diff --git a/src/paperless_migration/scripts/transform.py b/src/paperless_migration/scripts/transform.py index c9b178ece..d9aed3c1a 100644 --- a/src/paperless_migration/scripts/transform.py +++ b/src/paperless_migration/scripts/transform.py @@ -3,6 +3,7 @@ # "rich", # "ijson", # "typer-slim", +# "websockets", # ] # /// @@ -14,6 +15,7 @@ from pathlib import Path from typing import Any from typing import TypedDict +import ijson import typer from rich.console import Console from rich.progress import BarColumn @@ -22,14 +24,8 @@ from rich.progress import SpinnerColumn from rich.progress import TextColumn from rich.progress import TimeElapsedColumn from rich.table import Table - -try: - import ijson # type: ignore -except ImportError as exc: # pragma: no cover - handled at runtime - raise SystemExit( - "ijson is required for migration transform. " - "Install dependencies (e.g., `uv pip install ijson`).", - ) from exc +from websockets.sync.client import ClientConnection +from websockets.sync.client import connect app = typer.Typer(add_completion=False) console = Console() @@ -80,6 +76,8 @@ def migrate( "-o", callback=validate_output, ), + ws_url: str | None = typer.Option(None, "--ws"), + update_frequency: int = typer.Option(100, "--freq"), ) -> None: """ Process JSON fixtures with detailed summary and timing. @@ -94,6 +92,15 @@ def migrate( total_processed: int = 0 start_time: float = time.perf_counter() + ws: ClientConnection | None = None + if ws_url: + try: + ws = connect(ws_url) + except Exception as e: + console.print( + f"[yellow]Warning: Could not connect to WebSocket: {e}[/yellow]", + ) + progress = Progress( SpinnerColumn(), TextColumn("[bold blue]{task.description}"), @@ -103,33 +110,49 @@ def migrate( console=console, ) - with ( - progress, - input_path.open("rb") as infile, - output_path.open("w", encoding="utf-8") as outfile, - ): - task = progress.add_task("Processing fixture", start=True) - outfile.write("[\n") - first: bool = True + try: + with ( + progress, + input_path.open("rb") as infile, + output_path.open("w", encoding="utf-8") as outfile, + ): + task = progress.add_task("Processing fixture", start=True) + outfile.write("[\n") + first: bool = True - for i, obj in enumerate(ijson.items(infile, "item")): - fixture: FixtureObject = obj - model: str = fixture["model"] - total_processed += 1 + for i, obj in enumerate(ijson.items(infile, "item")): + fixture: FixtureObject = obj + model: str = fixture["model"] + total_processed += 1 - transform: TransformFn | None = TRANSFORMS.get(model) - if transform: - fixture = transform(fixture) - stats[model] += 1 + transform: TransformFn | None = TRANSFORMS.get(model) + if transform: + fixture = transform(fixture) + stats[model] += 1 - if not first: - outfile.write(",\n") - first = False + if not first: + outfile.write(",\n") + first = False - json.dump(fixture, outfile, ensure_ascii=False) - progress.advance(task, 1) + json.dump(fixture, outfile, ensure_ascii=False) + progress.advance(task, 1) - outfile.write("\n]\n") + if ws and (i % update_frequency == 0): + ws.send( + json.dumps( + { + "task": "processing", + "completed": total_processed, + "stats": dict(stats), + }, + ), + ) + + outfile.write("\n]\n") + + finally: + if ws: + ws.close() end_time: float = time.perf_counter() duration: float = end_time - start_time