From 1c99e55069399004d3d599695a658df1c519596d Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Thu, 29 Jan 2026 10:06:02 -0800 Subject: [PATCH] Initial version hacked up by Opus --- src/paperless_migration/asgi.py | 28 ++ src/paperless_migration/consumers.py | 245 ++++++++++++++ src/paperless_migration/routing.py | 13 + src/paperless_migration/scripts/transform.py | 181 ---------- src/paperless_migration/scripts/wipe_db.py | 61 ---- .../{scripts => services}/__init__.py | 0 src/paperless_migration/services/importer.py | 186 +++++++++++ src/paperless_migration/services/transform.py | 173 ++++++++++ src/paperless_migration/services/wipe_db.py | 115 +++++++ src/paperless_migration/settings.py | 36 +- .../paperless_migration/migration_home.html | 310 ++++++++++++++++-- src/paperless_migration/urls.py | 12 +- src/paperless_migration/views.py | 247 ++++---------- 13 files changed, 1128 insertions(+), 479 deletions(-) create mode 100644 src/paperless_migration/asgi.py create mode 100644 src/paperless_migration/consumers.py create mode 100644 src/paperless_migration/routing.py delete mode 100644 src/paperless_migration/scripts/transform.py delete mode 100644 src/paperless_migration/scripts/wipe_db.py rename src/paperless_migration/{scripts => services}/__init__.py (100%) create mode 100644 src/paperless_migration/services/importer.py create mode 100644 src/paperless_migration/services/transform.py create mode 100644 src/paperless_migration/services/wipe_db.py diff --git a/src/paperless_migration/asgi.py b/src/paperless_migration/asgi.py new file mode 100644 index 000000000..906b36b31 --- /dev/null +++ b/src/paperless_migration/asgi.py @@ -0,0 +1,28 @@ +"""ASGI application for migration mode with WebSocket support.""" + +from __future__ import annotations + +import os + +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter +from channels.routing import URLRouter +from 
channels.security.websocket import AllowedHostsOriginValidator +from django.core.asgi import get_asgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "paperless_migration.settings") + +# Initialize Django ASGI application early to ensure settings are loaded +django_asgi_app = get_asgi_application() + +# Import routing after Django is initialized +from paperless_migration.routing import websocket_urlpatterns # noqa: E402 + +application = ProtocolTypeRouter( + { + "http": django_asgi_app, + "websocket": AllowedHostsOriginValidator( + AuthMiddlewareStack(URLRouter(websocket_urlpatterns)), + ), + }, +) diff --git a/src/paperless_migration/consumers.py b/src/paperless_migration/consumers.py new file mode 100644 index 000000000..dd3d58b76 --- /dev/null +++ b/src/paperless_migration/consumers.py @@ -0,0 +1,245 @@ +"""WebSocket consumers for migration operations.""" + +from __future__ import annotations + +import json +import logging +import os +import shutil +import tempfile +from pathlib import Path +from typing import Any + +from channels.generic.websocket import AsyncWebsocketConsumer +from django.conf import settings + +from paperless_migration.services.importer import ImportService +from paperless_migration.services.transform import TransformService + +logger = logging.getLogger(__name__) + + +class MigrationConsumerBase(AsyncWebsocketConsumer): + """Base consumer with common authentication and messaging logic.""" + + async def connect(self) -> None: + """Authenticate and accept or reject the connection.""" + user = self.scope.get("user") + session = self.scope.get("session", {}) + + if not user or not user.is_authenticated: + logger.warning("WebSocket connection rejected: not authenticated") + await self.close(code=4001) + return + + if not user.is_superuser: + logger.warning("WebSocket connection rejected: not superuser") + await self.close(code=4003) + return + + if not session.get("migration_code_ok"): + logger.warning("WebSocket connection 
rejected: migration code not verified")
+            await self.close(code=4002)
+            return
+
+        await self.accept()
+        logger.info("WebSocket connection accepted for user: %s", user.username)
+
+    async def disconnect(self, close_code: int) -> None:
+        """Handle disconnection."""
+        logger.debug("WebSocket disconnected with code: %d", close_code)
+
+    async def receive(self, text_data: str | None = None, **kwargs: Any) -> None:
+        """Handle incoming messages - triggers the operation.
+
+        Expects a JSON object such as ``{"action": "start"}``. Frames without
+        text are ignored; malformed JSON, non-object payloads and unknown
+        actions are answered with an ``error`` message instead of raising.
+        """
+        if text_data is None:
+            return
+
+        try:
+            data = json.loads(text_data)
+        except json.JSONDecodeError:
+            await self.send_error("Invalid JSON message")
+            return
+
+        # Valid JSON is not necessarily an object (e.g. ``[]`` or ``"start"``);
+        # without this guard ``data.get`` would raise AttributeError.
+        if not isinstance(data, dict):
+            await self.send_error("Invalid message: expected a JSON object")
+            return
+
+        action = data.get("action")
+        if action == "start":
+            await self.run_operation()
+        else:
+            await self.send_error(f"Unknown action: {action}")
+
+    async def run_operation(self) -> None:
+        """Override in subclasses to run the specific operation."""
+        raise NotImplementedError
+
+    async def send_message(self, msg_type: str, **kwargs: Any) -> None:
+        """Send a typed JSON message to the client.
+
+        The payload is ``{"type": msg_type, **kwargs}`` serialized with
+        ``json.dumps``, so every keyword value must be JSON-serializable.
+        """
+        await self.send(text_data=json.dumps({"type": msg_type, **kwargs}))
+
+    async def send_log(self, message: str, level: str = "info") -> None:
+        """Send a log message."""
+        await self.send_message("log", message=message, level=level)
+
+    async def send_progress(
+        self,
+        current: int,
+        total: int | None = None,
+        label: str = "",
+    ) -> None:
+        """Send a progress update."""
+        await self.send_message(
+            "progress",
+            current=current,
+            total=total,
+            label=label,
+        )
+
+    async def send_stats(self, stats: dict[str, Any]) -> None:
+        """Send statistics update."""
+        await self.send_message("stats", **stats)
+
+    async def send_complete(
+        self,
+        duration: float,
+        *,
+        success: bool,
+        **kwargs: Any,
+    ) -> None:
+        """Send completion message."""
+        await self.send_message(
+            "complete",
+            success=success,
+            duration=duration,
+            **kwargs,
+        )
+
+    async def send_error(self, message: str) -> None:
+        """Send an error message."""
+        await
self.send_message("error", message=message) + + +class TransformConsumer(MigrationConsumerBase): + """WebSocket consumer for transform operations.""" + + async def run_operation(self) -> None: + """Run the transform operation.""" + input_path = Path(settings.MIGRATION_EXPORT_PATH) + output_path = Path(settings.MIGRATION_TRANSFORMED_PATH) + frequency = settings.MIGRATION_PROGRESS_FREQUENCY + + if not input_path.exists(): + await self.send_error(f"Export file not found: {input_path}") + return + + if output_path.exists(): + await self.send_error( + f"Output file already exists: {output_path}. " + "Delete it first to re-run transform.", + ) + return + + await self.send_log("Starting transform operation...") + + service = TransformService( + input_path=input_path, + output_path=output_path, + update_frequency=frequency, + ) + + try: + async for update in service.run_async(): + match update["type"]: + case "progress": + await self.send_progress( + current=update["completed"], + label=f"{update['completed']:,} rows processed", + ) + if update.get("stats"): + await self.send_stats({"transformed": update["stats"]}) + case "complete": + await self.send_complete( + success=True, + duration=update["duration"], + total_processed=update["total_processed"], + stats=update["stats"], + speed=update["speed"], + ) + case "error": + await self.send_error(update["message"]) + case "log": + await self.send_log( + update["message"], + update.get("level", "info"), + ) + except Exception as exc: + logger.exception("Transform operation failed") + await self.send_error(f"Transform failed: {exc}") + + +class ImportConsumer(MigrationConsumerBase): + """WebSocket consumer for import operations.""" + + async def run_operation(self) -> None: + """Run the import operation (wipe, migrate, import).""" + export_path = Path(settings.MIGRATION_EXPORT_PATH) + transformed_path = Path(settings.MIGRATION_TRANSFORMED_PATH) + imported_marker = Path(settings.MIGRATION_IMPORTED_PATH) + source_dir = 
export_path.parent
+
+        if not export_path.exists():
+            await self.send_error("Export file not found. Upload or re-check export.")
+            return
+
+        if not transformed_path.exists():
+            await self.send_error("Transformed file not found. Run transform first.")
+            return
+
+        await self.send_log("Preparing import operation...")
+
+        # Backup original manifest and swap in transformed version
+        backup_path: Path | None = None
+        # True once the backup holds a full copy of the original manifest;
+        # until then the temp file is empty and must never be moved back.
+        backup_complete = False
+        try:
+            backup_fd, backup_name = tempfile.mkstemp(
+                prefix="manifest.v2.",
+                suffix=".json",
+                dir=source_dir,
+            )
+            os.close(backup_fd)
+            backup_path = Path(backup_name)
+            shutil.copy2(export_path, backup_path)
+            backup_complete = True
+            shutil.copy2(transformed_path, export_path)
+            await self.send_log("Manifest files prepared")
+        except Exception as exc:
+            # Undo whatever part of the swap already happened before reporting,
+            # so a failed preparation neither leaks the temp file nor leaves
+            # the transformed manifest in place of the original.
+            if backup_path is not None:
+                try:
+                    if backup_complete:
+                        shutil.move(str(backup_path), str(export_path))
+                    else:
+                        backup_path.unlink(missing_ok=True)
+                except OSError as cleanup_exc:
+                    logger.warning(
+                        "Failed to clean up backup manifest: %s",
+                        cleanup_exc,
+                    )
+            await self.send_error(f"Failed to prepare import manifest: {exc}")
+            return
+
+        service = ImportService(
+            source_dir=source_dir,
+            imported_marker=imported_marker,
+        )
+
+        try:
+            async for update in service.run_async():
+                match update["type"]:
+                    case "phase":
+                        await self.send_log(f"Phase: {update['phase']}", level="info")
+                    case "log":
+                        await self.send_log(
+                            update["message"],
+                            update.get("level", "info"),
+                        )
+                    case "complete":
+                        await self.send_complete(
+                            success=update["success"],
+                            duration=update["duration"],
+                        )
+                    case "error":
+                        await self.send_error(update["message"])
+        except Exception as exc:
+            logger.exception("Import operation failed")
+            await self.send_error(f"Import failed: {exc}")
+        finally:
+            # Restore original manifest
+            if backup_path and backup_path.exists():
+                try:
+                    shutil.move(str(backup_path), str(export_path))
+                except Exception as exc:
+                    logger.warning("Failed to restore backup manifest: %s", exc)
diff --git a/src/paperless_migration/routing.py b/src/paperless_migration/routing.py
new file mode 100644
index 000000000..72bcb6d3d
--- /dev/null
+++ b/src/paperless_migration/routing.py
@@ -0,0 +1,13 @@
+"""WebSocket URL routing for migration operations."""
+
+from __future__ import annotations
+
+from django.urls import path + +from paperless_migration.consumers import ImportConsumer +from paperless_migration.consumers import TransformConsumer + +websocket_urlpatterns = [ + path("ws/migration/transform/", TransformConsumer.as_asgi()), + path("ws/migration/import/", ImportConsumer.as_asgi()), +] diff --git a/src/paperless_migration/scripts/transform.py b/src/paperless_migration/scripts/transform.py deleted file mode 100644 index d9aed3c1a..000000000 --- a/src/paperless_migration/scripts/transform.py +++ /dev/null @@ -1,181 +0,0 @@ -# /// script -# dependencies = [ -# "rich", -# "ijson", -# "typer-slim", -# "websockets", -# ] -# /// - -import json -import time -from collections import Counter -from collections.abc import Callable -from pathlib import Path -from typing import Any -from typing import TypedDict - -import ijson -import typer -from rich.console import Console -from rich.progress import BarColumn -from rich.progress import Progress -from rich.progress import SpinnerColumn -from rich.progress import TextColumn -from rich.progress import TimeElapsedColumn -from rich.table import Table -from websockets.sync.client import ClientConnection -from websockets.sync.client import connect - -app = typer.Typer(add_completion=False) -console = Console() - - -class FixtureObject(TypedDict): - model: str - pk: int - fields: dict[str, Any] - - -TransformFn = Callable[[FixtureObject], FixtureObject] - - -def transform_documents_document(obj: FixtureObject) -> FixtureObject: - fields: dict[str, Any] = obj["fields"] - fields.pop("storage_type", None) - content: Any = fields.get("content") - fields["content_length"] = len(content) if isinstance(content, str) else 0 - return obj - - -TRANSFORMS: dict[str, TransformFn] = { - "documents.document": transform_documents_document, -} - - -def validate_output(value: Path) -> Path: - if value.exists(): - raise typer.BadParameter(f"Output file '{value}' already exists.") - return value - - -@app.command() -def migrate( - 
input_path: Path = typer.Option( - ..., - "--input", - "-i", - exists=True, - file_okay=True, - dir_okay=False, - readable=True, - ), - output_path: Path = typer.Option( - ..., - "--output", - "-o", - callback=validate_output, - ), - ws_url: str | None = typer.Option(None, "--ws"), - update_frequency: int = typer.Option(100, "--freq"), -) -> None: - """ - Process JSON fixtures with detailed summary and timing. - """ - if input_path.resolve() == output_path.resolve(): - console.print( - "[bold red]Error:[/bold red] Input and output paths cannot be the same file.", - ) - raise typer.Exit(code=1) - - stats: Counter[str] = Counter() - total_processed: int = 0 - start_time: float = time.perf_counter() - - ws: ClientConnection | None = None - if ws_url: - try: - ws = connect(ws_url) - except Exception as e: - console.print( - f"[yellow]Warning: Could not connect to WebSocket: {e}[/yellow]", - ) - - progress = Progress( - SpinnerColumn(), - TextColumn("[bold blue]{task.description}"), - BarColumn(), - TextColumn("{task.completed:,} rows"), - TimeElapsedColumn(), - console=console, - ) - - try: - with ( - progress, - input_path.open("rb") as infile, - output_path.open("w", encoding="utf-8") as outfile, - ): - task = progress.add_task("Processing fixture", start=True) - outfile.write("[\n") - first: bool = True - - for i, obj in enumerate(ijson.items(infile, "item")): - fixture: FixtureObject = obj - model: str = fixture["model"] - total_processed += 1 - - transform: TransformFn | None = TRANSFORMS.get(model) - if transform: - fixture = transform(fixture) - stats[model] += 1 - - if not first: - outfile.write(",\n") - first = False - - json.dump(fixture, outfile, ensure_ascii=False) - progress.advance(task, 1) - - if ws and (i % update_frequency == 0): - ws.send( - json.dumps( - { - "task": "processing", - "completed": total_processed, - "stats": dict(stats), - }, - ), - ) - - outfile.write("\n]\n") - - finally: - if ws: - ws.close() - - end_time: float = time.perf_counter() 
- duration: float = end_time - start_time - - # Final Statistics Table - console.print("\n[bold green]Processing Complete[/bold green]") - - table = Table(show_header=True, header_style="bold magenta") - table.add_column("Metric", style="dim") - table.add_column("Value", justify="right") - - table.add_row("Total Time", f"{duration:.2f} seconds") - table.add_row("Total Processed", f"{total_processed:,} rows") - table.add_row( - "Processing Speed", - f"{total_processed / duration:.0f} rows/sec" if duration > 0 else "N/A", - ) - - for model, count in stats.items(): - table.add_row(f"Transformed: {model}", f"{count:,}") - - console.print(table) - - -if __name__ == "__main__": - app() diff --git a/src/paperless_migration/scripts/wipe_db.py b/src/paperless_migration/scripts/wipe_db.py deleted file mode 100644 index 9b87a33e0..000000000 --- a/src/paperless_migration/scripts/wipe_db.py +++ /dev/null @@ -1,61 +0,0 @@ -import django -from django.apps import apps -from django.db import connection -from django.db.migrations.recorder import MigrationRecorder - - -def _target_tables() -> list[str]: - tables = { - model._meta.db_table for model in apps.get_models(include_auto_created=True) - } - tables.add(MigrationRecorder.Migration._meta.db_table) - existing = set(connection.introspection.table_names()) - return sorted(tables & existing) - - -def _drop_sqlite_tables() -> None: - tables = _target_tables() - with connection.cursor() as cursor: - cursor.execute("PRAGMA foreign_keys=OFF;") - for table in tables: - cursor.execute(f'DROP TABLE IF EXISTS "{table}";') - cursor.execute("PRAGMA foreign_keys=ON;") - - -def _drop_postgres_tables() -> None: - tables = _target_tables() - if not tables: - return - with connection.cursor() as cursor: - for table in tables: - cursor.execute(f'DROP TABLE IF EXISTS "{table}" CASCADE;') - - -def _drop_mysql_tables() -> None: - tables = _target_tables() - with connection.cursor() as cursor: - cursor.execute("SET FOREIGN_KEY_CHECKS=0;") - for table 
in tables: - cursor.execute(f"DROP TABLE IF EXISTS `{table}`;") - cursor.execute("SET FOREIGN_KEY_CHECKS=1;") - - -def main() -> None: - django.setup() - vendor = connection.vendor - print(f"Wiping database for {vendor}...") # noqa: T201 - - if vendor == "sqlite": - _drop_sqlite_tables() - elif vendor == "postgresql": - _drop_postgres_tables() - elif vendor == "mysql": - _drop_mysql_tables() - else: - raise SystemExit(f"Unsupported database vendor: {vendor}") - - print("Database wipe complete.") # noqa: T201 - - -if __name__ == "__main__": - main() diff --git a/src/paperless_migration/scripts/__init__.py b/src/paperless_migration/services/__init__.py similarity index 100% rename from src/paperless_migration/scripts/__init__.py rename to src/paperless_migration/services/__init__.py diff --git a/src/paperless_migration/services/importer.py b/src/paperless_migration/services/importer.py new file mode 100644 index 000000000..dd1eb6f97 --- /dev/null +++ b/src/paperless_migration/services/importer.py @@ -0,0 +1,186 @@ +"""Import service for loading transformed data into v3 database.""" + +from __future__ import annotations + +import subprocess +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING +from typing import TypedDict + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from collections.abc import Generator + + +class ProgressUpdate(TypedDict, total=False): + """Progress update message structure.""" + + type: str + phase: str + message: str + level: str + success: bool + duration: float + return_code: int + + +@dataclass +class ImportService: + """Service for importing transformed data into v3 database. + + This service orchestrates the three-phase import process: + 1. Wipe the existing database + 2. Run Django migrations for v3 schema + 3. 
Import the transformed data
+    """
+
+    source_dir: Path
+    imported_marker: Path
+    manage_path: Path | None = None
+
+    def __post_init__(self) -> None:
+        """Fill in ``manage_path`` when the caller did not supply one."""
+        if self.manage_path is None:
+            # Default to manage.py in the src directory
+            self.manage_path = (
+                Path(__file__).resolve().parent.parent.parent / "manage.py"
+            )
+
+    def _get_env(self) -> dict[str, str]:
+        """Get environment variables for subprocess calls.
+
+        Returns a copy of the current environment with the settings module
+        and migration-mode flag overridden; ``os.environ`` is not modified.
+        """
+        import os
+
+        env = os.environ.copy()
+        # NOTE(review): this environment is used for every subprocess started
+        # through _run_command, including the database wipe - confirm that
+        # "paperless.settings" is the intended settings module for that step.
+        env["DJANGO_SETTINGS_MODULE"] = "paperless.settings"
+        env["PAPERLESS_MIGRATION_MODE"] = "0"
+        return env
+
+    def _run_command(
+        self,
+        args: list[str],
+        label: str,
+    ) -> Generator[ProgressUpdate, None, int]:
+        """Run a command and yield log lines. Returns the return code.
+
+        stderr is merged into stdout and each output line is yielded as an
+        info-level ``log`` update. If the generator is closed before the
+        child exits, the child is killed and reaped.
+        """
+        yield {"type": "log", "message": f"Running: {label}", "level": "info"}
+
+        process = subprocess.Popen(
+            args,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            bufsize=1,
+            text=True,
+            # Child output is only relayed as log text; do not let a stray
+            # undecodable byte abort the whole run.
+            errors="replace",
+            env=self._get_env(),
+        )
+
+        try:
+            if process.stdout:
+                for line in process.stdout:
+                    yield {
+                        "type": "log",
+                        "message": line.rstrip(),
+                        "level": "info",
+                    }
+            return process.wait()
+        finally:
+            # Runs on normal completion and when the generator is closed
+            # early: make sure the child is gone, reaped, and its pipe closed.
+            if process.poll() is None:
+                process.kill()
+                process.wait()
+            if process.stdout:
+                process.stdout.close()
+
+    def run_sync(self) -> Generator[ProgressUpdate, None, None]:
+        """Run the import synchronously, yielding progress updates.
+
+        This orchestrates:
+        1. Database wipe
+        2. Django migrations
+        3.
Document import + """ + start_time = time.perf_counter() + + # Phase 1: Wipe database + yield {"type": "phase", "phase": "wipe"} + wipe_cmd = [ + sys.executable, + "-m", + "paperless_migration.services.wipe_db", + ] + wipe_code = yield from self._run_command(wipe_cmd, "Database wipe") + + if wipe_code != 0: + yield { + "type": "error", + "message": f"Database wipe failed with code {wipe_code}", + } + return + + yield {"type": "log", "message": "Database wipe complete", "level": "info"} + + # Phase 2: Run migrations + yield {"type": "phase", "phase": "migrate"} + migrate_cmd = [ + sys.executable, + str(self.manage_path), + "migrate", + "--noinput", + ] + migrate_code = yield from self._run_command(migrate_cmd, "Django migrations") + + if migrate_code != 0: + yield { + "type": "error", + "message": f"Migrations failed with code {migrate_code}", + } + return + + yield {"type": "log", "message": "Migrations complete", "level": "info"} + + # Phase 3: Import data + yield {"type": "phase", "phase": "import"} + import_cmd = [ + sys.executable, + str(self.manage_path), + "document_importer", + str(self.source_dir), + "--data-only", + ] + import_code = yield from self._run_command(import_cmd, "Document import") + + if import_code != 0: + yield { + "type": "error", + "message": f"Import failed with code {import_code}", + } + return + + # Mark import as complete + try: + self.imported_marker.parent.mkdir(parents=True, exist_ok=True) + self.imported_marker.write_text("ok\n", encoding="utf-8") + except Exception as exc: + yield { + "type": "log", + "message": f"Warning: Could not write import marker: {exc}", + "level": "warning", + } + + end_time = time.perf_counter() + duration = end_time - start_time + + yield { + "type": "complete", + "success": True, + "duration": duration, + } + + async def run_async(self) -> AsyncGenerator[ProgressUpdate, None]: + """Run the import asynchronously, yielding progress updates. 
+
+        This wraps the synchronous implementation to work with async consumers.
+        Each step of the synchronous generator is executed in a worker thread
+        so that waiting on subprocess output does not block the event loop.
+        """
+        import asyncio
+
+        updates = self.run_sync()
+        # next() is given a sentinel default because StopIteration cannot be
+        # propagated through the future that asyncio.to_thread() awaits.
+        exhausted = object()
+        try:
+            while True:
+                update = await asyncio.to_thread(next, updates, exhausted)
+                if update is exhausted:
+                    break
+                yield update
+        finally:
+            # Close the generator so its cleanup (killing a still-running
+            # child process) happens now rather than at garbage collection.
+            try:
+                updates.close()
+            except ValueError:
+                # Cancelled while a worker thread was still inside next():
+                # the generator is "already executing" and cannot be closed
+                # from here; it is cleaned up once it is collected.
+                pass
diff --git a/src/paperless_migration/services/transform.py b/src/paperless_migration/services/transform.py
new file mode 100644
index 000000000..600937b5e
--- /dev/null
+++ b/src/paperless_migration/services/transform.py
@@ -0,0 +1,173 @@
+"""Transform service for converting v2 exports to v3 format."""
+
+from __future__ import annotations
+
+import json
+import time
+from collections import Counter
+from collections.abc import AsyncGenerator
+from collections.abc import Callable
+from collections.abc import Generator
+from dataclasses import dataclass
+from dataclasses import field
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import TypedDict
+
+import ijson
+
+if TYPE_CHECKING:
+    from pathlib import Path
+
+
+class FixtureObject(TypedDict):
+    """Structure of a Django fixture object."""
+
+    model: str
+    pk: int
+    fields: dict[str, Any]
+
+
+class ProgressUpdate(TypedDict, total=False):
+    """Progress update message structure."""
+
+    type: str
+    completed: int
+    stats: dict[str, int]
+    message: str
+    level: str
+    duration: float
+    total_processed: int
+    speed: float
+
+
+TransformFn = Callable[[FixtureObject], FixtureObject]
+
+
+def transform_documents_document(obj: FixtureObject) -> FixtureObject:
+    """Transform a documents.document fixture object for v3 schema."""
+    fields: dict[str, Any] = obj["fields"]
+    fields.pop("storage_type", None)
+    content: Any = fields.get("content")
+    fields["content_length"] = len(content) if isinstance(content, str) else 0
+    return obj
+
+
+# Registry of model-specific transforms
+TRANSFORMS: dict[str, TransformFn] = {
+    "documents.document": transform_documents_document,
+}
+
+
+@dataclass
+class TransformService:
+    """Service for transforming v2 exports to v3 format.
+
+    This service processes JSON fixtures incrementally using ijson for
+    memory-efficient streaming, and yields progress updates suitable
+    for WebSocket transmission.
+    """
+
+    input_path: Path
+    output_path: Path
+    update_frequency: int = 100
+    _stats: Counter[str] = field(default_factory=Counter, init=False)
+    _total_processed: int = field(default=0, init=False)
+
+    def validate(self) -> str | None:
+        """Validate preconditions for transform. Returns error message or None.
+
+        Checks, in order: the input exists, input and output are different
+        files, the output does not exist yet, and the update frequency is a
+        positive number of rows.
+        """
+        if not self.input_path.exists():
+            return f"Input file not found: {self.input_path}"
+        # Must run before the "already exists" check: when both paths name the
+        # same existing file that check would always fire first and this
+        # more specific message could never be returned.
+        if self.input_path.resolve() == self.output_path.resolve():
+            return "Input and output paths cannot be the same file"
+        if self.output_path.exists():
+            return f"Output file already exists: {self.output_path}"
+        # The frequency is used as a modulus while streaming rows.
+        if self.update_frequency < 1:
+            return "Update frequency must be a positive integer"
+        return None
+
+    def _process_fixture(self, obj: FixtureObject) -> FixtureObject:
+        """Apply any registered transforms to a fixture object.
+
+        Objects whose model has no entry in ``TRANSFORMS`` are returned
+        unchanged; transformed objects are counted per model in ``_stats``.
+        """
+        model: str = obj["model"]
+        transform: TransformFn | None = TRANSFORMS.get(model)
+        if transform:
+            obj = transform(obj)
+            self._stats[model] += 1
+        return obj
+
+    def run_sync(self) -> Generator[ProgressUpdate, None, None]:
+        """Run the transform synchronously, yielding progress updates.
+
+        This is the core implementation that processes the JSON file
+        and yields progress updates at regular intervals.
+        """
+        error = self.validate()
+        if error:
+            yield {"type": "error", "message": error}
+            return
+
+        self._stats.clear()
+        self._total_processed = 0
+        # Used as a modulus below; never let a zero/negative setting reach it.
+        frequency = max(1, self.update_frequency)
+        start_time = time.perf_counter()
+
+        yield {"type": "log", "message": "Opening input file...", "level": "info"}
+
+        try:
+            with (
+                self.input_path.open("rb") as infile,
+                self.output_path.open("w", encoding="utf-8") as outfile,
+            ):
+                outfile.write("[\n")
+                first = True
+
+                # use_float=True: by default ijson yields decimal.Decimal for
+                # non-integer numbers, which json.dump() cannot serialize.
+                for obj in ijson.items(infile, "item", use_float=True):
+                    fixture: FixtureObject = obj
+                    fixture = self._process_fixture(fixture)
+                    self._total_processed += 1
+
+                    if not first:
+                        outfile.write(",\n")
+                    first = False
+
+                    json.dump(fixture, outfile, ensure_ascii=False)
+
+                    # Yield progress every `frequency` rows, keyed on the
+                    # number of rows actually processed.
+                    if self._total_processed % frequency == 0:
+                        yield {
+                            "type": "progress",
+                            "completed": self._total_processed,
+                            "stats": dict(self._stats),
+                        }
+
+                outfile.write("\n]\n")
+
+        except GeneratorExit:
+            # The caller stopped iterating mid-run. GeneratorExit is not an
+            # Exception subclass, so without this branch the truncated output
+            # would remain and block the next run ("already exists").
+            self.output_path.unlink(missing_ok=True)
+            raise
+        except Exception as exc:
+            # Clean up partial output on error
+            self.output_path.unlink(missing_ok=True)
+            yield {"type": "error", "message": str(exc)}
+            return
+
+        end_time = time.perf_counter()
+        duration = end_time - start_time
+        speed = self._total_processed / duration if duration > 0 else 0
+
+        yield {
+            "type": "complete",
+            "duration": duration,
+            "total_processed": self._total_processed,
+            "stats": dict(self._stats),
+            "speed": speed,
+        }
+
+    async def run_async(self) -> AsyncGenerator[ProgressUpdate, None]:
+        """Run the transform asynchronously, yielding progress updates.
+
+        This wraps the synchronous implementation to work with async consumers.
+        The actual I/O is done synchronously since ijson doesn't support async,
+        but we yield control periodically to keep the event loop responsive.
+
+        Each step of the synchronous generator is executed in a worker thread
+        via ``asyncio.to_thread`` so file parsing between two updates does
+        not stall the event loop.
+        """
+        import asyncio
+
+        updates = self.run_sync()
+        # next() is given a sentinel default because StopIteration cannot be
+        # propagated through the future that asyncio.to_thread() awaits.
+        exhausted = object()
+        try:
+            while True:
+                update = await asyncio.to_thread(next, updates, exhausted)
+                if update is exhausted:
+                    break
+                yield update
+        finally:
+            # Close the generator now so its own cleanup runs deterministically.
+            try:
+                updates.close()
+            except ValueError:
+                # Cancelled while a worker thread was still inside next():
+                # the generator is "already executing" and cannot be closed
+                # from here; it is cleaned up once it is collected.
+                pass
diff --git a/src/paperless_migration/services/wipe_db.py b/src/paperless_migration/services/wipe_db.py
new file mode 100644
index 000000000..ae5c31f0d
--- /dev/null
+++ b/src/paperless_migration/services/wipe_db.py
@@ -0,0 +1,115 @@
+"""Database wipe service for migration import process.
+
+This module can be run as a script via:
+    python -m paperless_migration.services.wipe_db
+
+It uses the paperless_migration settings to wipe all tables
+before running v3 migrations.
+"""
+
+from __future__ import annotations
+
+import logging
+import sys
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from django.db.backends.base.base import BaseDatabaseWrapper
+
+logger = logging.getLogger(__name__)
+
+
+def _get_target_tables(connection: BaseDatabaseWrapper) -> list[str]:
+    """Get list of tables to drop that exist in the database.
+
+    The result is the sorted intersection of the tables declared by installed
+    models (plus the migration recorder table) and the tables the database
+    introspection reports.
+    """
+    from django.apps import apps
+    from django.db.migrations.recorder import MigrationRecorder
+
+    model_tables = {
+        model._meta.db_table for model in apps.get_models(include_auto_created=True)
+    }
+    model_tables.add(MigrationRecorder.Migration._meta.db_table)
+    existing_tables = set(connection.introspection.table_names())
+    return sorted(model_tables & existing_tables)
+
+
+def _drop_sqlite_tables(connection: BaseDatabaseWrapper) -> int:
+    """Drop tables for SQLite database. Returns count of tables dropped."""
+    tables = _get_target_tables(connection)
+    with connection.cursor() as cursor:
+        cursor.execute("PRAGMA foreign_keys=OFF;")
+        try:
+            for table in tables:
+                cursor.execute(f'DROP TABLE IF EXISTS "{table}";')
+        finally:
+            # Re-enable enforcement even when a DROP fails, so the connection
+            # is not left with foreign keys switched off.
+            cursor.execute("PRAGMA foreign_keys=ON;")
+    return len(tables)
+
+
+def _drop_postgres_tables(connection: BaseDatabaseWrapper) -> int:
+    """Drop tables for PostgreSQL database.
Returns count of tables dropped."""
+    tables = _get_target_tables(connection)
+    if not tables:
+        return 0
+    with connection.cursor() as cursor:
+        for table in tables:
+            cursor.execute(f'DROP TABLE IF EXISTS "{table}" CASCADE;')
+    return len(tables)
+
+
+def _drop_mysql_tables(connection: BaseDatabaseWrapper) -> int:
+    """Drop tables for MySQL/MariaDB database. Returns count of tables dropped."""
+    tables = _get_target_tables(connection)
+    with connection.cursor() as cursor:
+        cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
+        try:
+            for table in tables:
+                cursor.execute(f"DROP TABLE IF EXISTS `{table}`;")
+        finally:
+            # Re-enable the checks even when a DROP fails, so the session is
+            # not left with foreign key enforcement switched off.
+            cursor.execute("SET FOREIGN_KEY_CHECKS=1;")
+    return len(tables)
+
+
+def wipe_database() -> tuple[bool, str]:
+    """Wipe all application tables from the database.
+
+    Dispatches on ``connection.vendor``; any exception raised while dropping
+    tables is logged and reported through the return value, not re-raised.
+
+    Returns:
+        Tuple of (success: bool, message: str)
+    """
+    from django.db import connection
+
+    vendor = connection.vendor
+    logger.info("Wiping database for vendor: %s", vendor)
+
+    try:
+        match vendor:
+            case "sqlite":
+                count = _drop_sqlite_tables(connection)
+            case "postgresql":
+                count = _drop_postgres_tables(connection)
+            case "mysql":
+                count = _drop_mysql_tables(connection)
+            case _:
+                return False, f"Unsupported database vendor: {vendor}"
+
+        message = f"Dropped {count} tables from {vendor} database"
+        logger.info(message)
+        return True, message
+
+    except Exception as exc:
+        message = f"Failed to wipe database: {exc}"
+        logger.exception(message)
+        return False, message
+
+
+def main() -> int:
+    """Entry point when run as a script.
+
+    Returns the process exit status: 0 when the wipe succeeded, 1 otherwise.
+    """
+    import os
+
+    import django
+
+    # setdefault: a DJANGO_SETTINGS_MODULE already present in the environment
+    # takes precedence over the migration settings named here.
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "paperless_migration.settings")
+    django.setup()
+
+    success, message = wipe_database()
+    print(message)  # noqa: T201
+    return 0 if success else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/src/paperless_migration/settings.py
b/src/paperless_migration/settings.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging import os -import secrets from pathlib import Path from typing import Any @@ -41,6 +40,11 @@ DATA_DIR = __get_path("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data") EXPORT_DIR = __get_path("PAPERLESS_EXPORT_DIR", BASE_DIR.parent / "export") +def _parse_redis_url() -> str: + """Parse Redis URL from environment with sensible defaults.""" + return os.getenv("PAPERLESS_REDIS_URL", "redis://localhost:6379") + + def _parse_db_settings() -> dict[str, dict[str, Any]]: databases: dict[str, dict[str, Any]] = { "default": { @@ -97,9 +101,7 @@ def _parse_db_settings() -> dict[str, dict[str, Any]]: DATABASES = _parse_db_settings() -SECRET_KEY = os.getenv( - "PAPERLESS_SECRET_KEY", -) +SECRET_KEY = os.getenv("PAPERLESS_SECRET_KEY") AUTH_PASSWORD_VALIDATORS = [ { @@ -128,6 +130,7 @@ INSTALLED_APPS = [ "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", + "channels", "allauth", "allauth.account", "allauth.socialaccount", @@ -166,6 +169,24 @@ TEMPLATES = [ }, ] +# ASGI application for Channels +ASGI_APPLICATION = "paperless_migration.asgi.application" + +# Channel layers configuration using Redis +REDIS_URL = _parse_redis_url() + +CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels_redis.core.RedisChannelLayer", + "CONFIG": { + "hosts": [REDIS_URL], + "capacity": 1500, + "expiry": 10, + }, + }, +} + +# Keep WSGI for compatibility WSGI_APPLICATION = "paperless_migration.wsgi.application" AUTHENTICATION_BACKENDS = [ @@ -203,9 +224,16 @@ MIGRATION_TRANSFORMED_PATH = __get_path( ) MIGRATION_IMPORTED_PATH = Path(EXPORT_DIR / "import.completed").resolve() +# Progress update frequency (rows between WebSocket updates) +MIGRATION_PROGRESS_FREQUENCY = int( + os.getenv("PAPERLESS_MIGRATION_PROGRESS_FREQUENCY", "100"), +) + # One-time access code required for migration logins; stable across autoreload _code = os.getenv("PAPERLESS_MIGRATION_ACCESS_CODE") if 
not _code: + import secrets + _code = secrets.token_urlsafe(12) os.environ["PAPERLESS_MIGRATION_ACCESS_CODE"] = _code MIGRATION_ACCESS_CODE = _code diff --git a/src/paperless_migration/templates/paperless_migration/migration_home.html b/src/paperless_migration/templates/paperless_migration/migration_home.html index d973662c7..5b7da7d57 100644 --- a/src/paperless_migration/templates/paperless_migration/migration_home.html +++ b/src/paperless_migration/templates/paperless_migration/migration_home.html @@ -16,12 +16,12 @@ --bs-border-color: #dee2e6; --bs-link-color: var(--pngx-primary); --bs-link-color-rgb: 23, 84, 31; - } + } @media (prefers-color-scheme: dark) { :root { color-scheme: light; } } - .btn-primary:disabled { - --bs-btn-disabled-bg: #4d7352; - --bs-btn-disabled-border-color: #4d7352; + .btn-primary:disabled { + --bs-btn-disabled-bg: #4d7352; + --bs-btn-disabled-border-color: #4d7352; } body { @@ -32,9 +32,9 @@ min-height: 100vh; } - svg.logo .text { - fill: #161616 !important; - } + svg.logo .text { + fill: #161616 !important; + } .hero-card, .card-step { @@ -106,6 +106,87 @@ color: #fff; border-color: #17541f; } + + .console-log { + background: #0f1a12; + color: #d1e7d6; + border-radius: 12px; + min-height: 180px; + max-height: 400px; + padding: 12px; + font-size: 0.85rem; + font-family: 'Consolas', 'Monaco', monospace; + overflow: auto; + white-space: pre-wrap; + word-break: break-word; + } + + .console-log .log-error { color: #ff6b6b; } + .console-log .log-warning { color: #ffd93d; } + .console-log .log-success { color: #6bcb77; } + .console-log .log-info { color: #4d96ff; } + + .progress-bar-container { + height: 24px; + background: rgba(23, 84, 31, 0.1); + border-radius: 12px; + overflow: hidden; + margin-bottom: 0.5rem; + } + + .progress-bar-fill { + height: 100%; + background: linear-gradient(90deg, #17541f, #2c7a3c); + border-radius: 12px; + transition: width 0.3s ease; + display: flex; + align-items: center; + justify-content: center; + color: 
white; + font-size: 0.75rem; + font-weight: 600; + min-width: fit-content; + padding: 0 8px; + } + + .stats-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(120px, 1fr)); + gap: 0.5rem; + margin-top: 0.5rem; + } + + .stat-item { + background: rgba(23, 84, 31, 0.05); + border-radius: 8px; + padding: 0.5rem; + text-align: center; + } + + .stat-value { + font-size: 1.25rem; + font-weight: 700; + color: #17541f; + } + + .stat-label { + font-size: 0.75rem; + color: #666; + } + + .ws-status { + display: inline-flex; + align-items: center; + gap: 0.5rem; + padding: 0.25rem 0.75rem; + border-radius: 999px; + font-size: 0.8rem; + font-weight: 500; + } + + .ws-status.connected { background: #d4edda; color: #155724; } + .ws-status.disconnected { background: #f8d7da; color: #721c24; } + .ws-status.connecting { background: #fff3cd; color: #856404; } @@ -118,7 +199,7 @@ {% include "paperless-ngx/snippets/svg_logo.html" with extra_attrs="width='280' class='logo'" %}

Migration Mode

-

Paperless-ngx v2 → v3

+

Paperless-ngx v2 to v3

Migrate your data from Paperless-ngx version 2 to version 3.

@@ -184,8 +265,8 @@ + -
@@ -219,7 +300,7 @@

Transform

Convert the export into the v3-ready structure.

-
+
{% csrf_token %}
+ {% if transformed_exists %} +
+ {% csrf_token %} + +
+ {% endif %}
@@ -253,6 +343,7 @@ type="submit" name="action" value="import" + id="btn-import" {% if not transformed_exists or imported_exists %}disabled aria-disabled="true"{% endif %} > Import transformed data @@ -272,8 +363,8 @@
Run the v2 export from your Paperless instance, e.g.: docker run --rm ghcr.io/paperless-ngx/paperless-ngx:2.20.6 document_exporter --data-only - (see documentation). Once the manifest.json is in-place, upload it or (especially for larger files) place it directly at the expected location and click “Re-check export”. -

⚠️ The export must be generated with version Paperless-ngx v2.20.6

+ (see documentation). Once the manifest.json is in place, upload it or (especially for larger files) place it directly at the expected location and click "Re-check export". +

Warning: The export must be generated with Paperless-ngx v2.20.6.

{% endif %} @@ -281,38 +372,187 @@
Migration console
- Live output + + + Ready +
-
Ready
+ + + +
Ready to begin migration...
- {% if stream_action %} - - {% endif %} + + ws.onclose = function(event) { + if (event.code !== 1000) { + const reason = event.code === 4001 ? 'Not authenticated' + : event.code === 4002 ? 'Migration code not verified' + : event.code === 4003 ? 'Superuser access required' + : 'Connection closed (code: ' + event.code + ')'; + appendLog(reason, 'error'); + } + setWsStatus('disconnected', 'Disconnected'); + }; + } + + // Check if we should auto-start a WebSocket action + {% if ws_action %} + startWebSocket('{{ ws_action }}'); + {% endif %} + + // Expose for manual triggering if needed + window.startMigrationWs = startWebSocket; + })(); + diff --git a/src/paperless_migration/urls.py b/src/paperless_migration/urls.py index f25c096ba..657b38d30 100644 --- a/src/paperless_migration/urls.py +++ b/src/paperless_migration/urls.py @@ -1,5 +1,8 @@ +"""URL configuration for migration mode.""" + +from __future__ import annotations + from django.conf import settings -from django.conf.urls.static import static from django.contrib.staticfiles.urls import staticfiles_urlpatterns from django.urls import include from django.urls import path @@ -10,12 +13,9 @@ urlpatterns = [ path("accounts/login/", views.migration_login, name="account_login"), path("accounts/", include("allauth.urls")), path("migration/", views.migration_home, name="migration_home"), - path("migration/transform/stream", views.transform_stream, name="transform_stream"), - path("migration/import/stream", views.import_stream, name="import_stream"), - # redirect root to migration home - path("", views.migration_home, name="migration_home"), + # Redirect root to migration home + path("", views.migration_home, name="home"), ] if settings.DEBUG: urlpatterns += staticfiles_urlpatterns() - urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/src/paperless_migration/views.py b/src/paperless_migration/views.py index a1e19ac9a..5e2874b01 100644 --- a/src/paperless_migration/views.py +++ 
b/src/paperless_migration/views.py @@ -1,30 +1,41 @@ -import os -import shutil -import subprocess -import sys -import tempfile -from pathlib import Path +"""Views for migration mode web interface.""" +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from django.conf import settings from django.contrib import messages from django.contrib.auth import authenticate from django.contrib.auth import login from django.contrib.auth.decorators import login_required from django.http import HttpResponseForbidden -from django.http import StreamingHttpResponse from django.shortcuts import redirect from django.shortcuts import render from django.views.decorators.http import require_http_methods -from paperless_migration import settings +if TYPE_CHECKING: + from django.http import HttpRequest + from django.http import HttpResponse -@login_required -@require_http_methods(["GET", "POST"]) -def migration_home(request): +def _check_migration_access(request: HttpRequest) -> HttpResponse | None: + """Check if user has migration access. 
Returns error response or None.""" if not request.session.get("migration_code_ok"): return HttpResponseForbidden("Access code required") if not request.user.is_superuser: return HttpResponseForbidden("Superuser access required") + return None + + +@login_required +@require_http_methods(["GET", "POST"]) +def migration_home(request: HttpRequest) -> HttpResponse: + """Main migration dashboard view.""" + error_response = _check_migration_access(request) + if error_response: + return error_response export_path = Path(settings.MIGRATION_EXPORT_PATH) transformed_path = Path(settings.MIGRATION_TRANSFORMED_PATH) @@ -32,13 +43,10 @@ def migration_home(request): if request.method == "POST": action = request.POST.get("action") + if action == "check": messages.success(request, "Checked export paths.") - elif action == "transform": - messages.info(request, "Starting transform… live output below.") - request.session["start_stream_action"] = "transform" - if imported_marker.exists(): - imported_marker.unlink() + elif action == "upload": upload = request.FILES.get("export_file") if not upload: @@ -52,27 +60,53 @@ def migration_home(request): messages.success(request, f"Uploaded to {export_path}.") except Exception as exc: messages.error(request, f"Failed to save file: {exc}") + + elif action == "transform": + if imported_marker.exists(): + imported_marker.unlink() + # Signal to start WebSocket connection for transform + request.session["start_ws_action"] = "transform" + messages.info(request, "Starting transform via WebSocket...") + elif action == "import": - messages.info(request, "Starting import… live output below.") - request.session["start_stream_action"] = "import" + # Signal to start WebSocket connection for import + request.session["start_ws_action"] = "import" + messages.info(request, "Starting import via WebSocket...") + + elif action == "reset_transform": + if transformed_path.exists(): + try: + transformed_path.unlink() + messages.success(request, "Transformed file 
deleted.") + except Exception as exc: + messages.error(request, f"Failed to delete transformed file: {exc}") + if imported_marker.exists(): + try: + imported_marker.unlink() + except Exception: + pass + else: messages.error(request, "Unknown action.") + return redirect("migration_home") - stream_action = request.session.pop("start_stream_action", None) + ws_action = request.session.pop("start_ws_action", None) + context = { "export_path": export_path, "export_exists": export_path.exists(), "transformed_path": transformed_path, "transformed_exists": transformed_path.exists(), "imported_exists": imported_marker.exists(), - "stream_action": stream_action, + "ws_action": ws_action, } return render(request, "paperless_migration/migration_home.html", context) @require_http_methods(["GET", "POST"]) -def migration_login(request): +def migration_login(request: HttpRequest) -> HttpResponse: + """Migration-specific login view requiring access code.""" if request.method == "POST": username = request.POST.get("login", "") password = request.POST.get("password", "") @@ -96,174 +130,3 @@ def migration_login(request): return redirect(settings.LOGIN_REDIRECT_URL) return render(request, "account/login.html") - - -@login_required -@require_http_methods(["GET"]) -def transform_stream(request): - if not request.session.get("migration_code_ok"): - return HttpResponseForbidden("Access code required") - if not request.user.is_superuser: - return HttpResponseForbidden("Superuser access required") - - input_path = Path(settings.MIGRATION_EXPORT_PATH) - output_path = Path(settings.MIGRATION_TRANSFORMED_PATH) - - cmd = [ - sys.executable, - "-m", - "paperless_migration.scripts.transform", - "--input", - str(input_path), - "--output", - str(output_path), - ] - - def event_stream(): - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - bufsize=1, - text=True, - ) - try: - yield "data: Starting transform...\n\n" - if process.stdout: - for line in 
process.stdout: - yield f"data: {line.rstrip()}\n\n" - process.wait() - yield f"data: Transform finished with code {process.returncode}\n\n" - finally: - if process and process.poll() is None: - process.kill() - - return StreamingHttpResponse( - event_stream(), - content_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "X-Accel-Buffering": "no", - }, - ) - - -@login_required -@require_http_methods(["GET"]) -def import_stream(request): - if not request.session.get("migration_code_ok"): - return HttpResponseForbidden("Access code required") - if not request.user.is_superuser: - return HttpResponseForbidden("Superuser access required") - - export_path = Path(settings.MIGRATION_EXPORT_PATH) - transformed_path = Path(settings.MIGRATION_TRANSFORMED_PATH) - imported_marker = Path(settings.MIGRATION_IMPORTED_PATH) - manage_path = Path(settings.BASE_DIR) / "manage.py" - source_dir = export_path.parent - - env = os.environ.copy() - env["DJANGO_SETTINGS_MODULE"] = "paperless.settings" - env["PAPERLESS_MIGRATION_MODE"] = "0" - - def event_stream(): - if not export_path.exists(): - yield "data: Missing export manifest.json; upload or re-check export.\n\n" - return - if not transformed_path.exists(): - yield "data: Missing transformed manifest.v3.json; run transform first.\n\n" - return - - backup_path: Path | None = None - try: - backup_fd, backup_name = tempfile.mkstemp( - prefix="manifest.v2.", - suffix=".json", - dir=source_dir, - ) - os.close(backup_fd) - backup_path = Path(backup_name) - shutil.copy2(export_path, backup_path) - shutil.copy2(transformed_path, export_path) - except Exception as exc: - yield f"data: Failed to prepare import manifest: {exc}\n\n" - return - - def run_cmd(args, label): - yield f"data: {label}\n\n" - process = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - bufsize=1, - text=True, - env=env, - ) - try: - if process.stdout: - for line in process.stdout: - yield f"data: {line.rstrip()}\n\n" - 
process.wait() - return process.returncode - finally: - if process and process.poll() is None: - process.kill() - - wipe_cmd = [ - sys.executable, - "-m", - "paperless_migration.scripts.wipe_db", - ] - migrate_cmd = [ - sys.executable, - str(manage_path), - "migrate", - "--noinput", - ] - import_cmd = [ - sys.executable, - str(manage_path), - "document_importer", - str(source_dir), - "--data-only", - ] - try: - wipe_code = yield from run_cmd( - wipe_cmd, - "Wiping database...", - ) - if wipe_code != 0: - yield f"data: Wipe finished with code {wipe_code}\n\n" - return - - migrate_code = yield from run_cmd( - migrate_cmd, - "Running migrations...", - ) - if migrate_code != 0: - yield f"data: Migrate finished with code {migrate_code}\n\n" - return - - import_code = yield from run_cmd( - import_cmd, - "Starting import...", - ) - if import_code == 0: - imported_marker.parent.mkdir(parents=True, exist_ok=True) - imported_marker.write_text("ok\n", encoding="utf-8") - yield f"data: Import finished with code {import_code}\n\n" - finally: - if backup_path and backup_path.exists(): - try: - shutil.move(backup_path, export_path) - except Exception: - pass - - return StreamingHttpResponse( - event_stream(), - content_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "X-Accel-Buffering": "no", - }, - )