The websocket included script

This commit is contained in:
Trenton H
2026-01-29 09:39:52 -08:00
parent b8af971652
commit b44eea6508

View File

@@ -3,6 +3,7 @@
# "rich",
# "ijson",
# "typer-slim",
# "websockets",
# ]
# ///
@@ -14,6 +15,7 @@ from pathlib import Path
from typing import Any
from typing import TypedDict
import ijson
import typer
from rich.console import Console
from rich.progress import BarColumn
@@ -22,14 +24,8 @@ from rich.progress import SpinnerColumn
from rich.progress import TextColumn
from rich.progress import TimeElapsedColumn
from rich.table import Table
try:
import ijson # type: ignore
except ImportError as exc: # pragma: no cover - handled at runtime
raise SystemExit(
"ijson is required for migration transform. "
"Install dependencies (e.g., `uv pip install ijson`).",
) from exc
from websockets.sync.client import ClientConnection
from websockets.sync.client import connect
app = typer.Typer(add_completion=False)
console = Console()
@@ -80,6 +76,8 @@ def migrate(
"-o",
callback=validate_output,
),
ws_url: str | None = typer.Option(None, "--ws"),
update_frequency: int = typer.Option(100, "--freq"),
) -> None:
"""
Process JSON fixtures with detailed summary and timing.
@@ -94,6 +92,15 @@ def migrate(
total_processed: int = 0
start_time: float = time.perf_counter()
ws: ClientConnection | None = None
if ws_url:
try:
ws = connect(ws_url)
except Exception as e:
console.print(
f"[yellow]Warning: Could not connect to WebSocket: {e}[/yellow]",
)
progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
@@ -103,33 +110,49 @@ def migrate(
console=console,
)
with (
progress,
input_path.open("rb") as infile,
output_path.open("w", encoding="utf-8") as outfile,
):
task = progress.add_task("Processing fixture", start=True)
outfile.write("[\n")
first: bool = True
try:
with (
progress,
input_path.open("rb") as infile,
output_path.open("w", encoding="utf-8") as outfile,
):
task = progress.add_task("Processing fixture", start=True)
outfile.write("[\n")
first: bool = True
for i, obj in enumerate(ijson.items(infile, "item")):
fixture: FixtureObject = obj
model: str = fixture["model"]
total_processed += 1
for i, obj in enumerate(ijson.items(infile, "item")):
fixture: FixtureObject = obj
model: str = fixture["model"]
total_processed += 1
transform: TransformFn | None = TRANSFORMS.get(model)
if transform:
fixture = transform(fixture)
stats[model] += 1
transform: TransformFn | None = TRANSFORMS.get(model)
if transform:
fixture = transform(fixture)
stats[model] += 1
if not first:
outfile.write(",\n")
first = False
if not first:
outfile.write(",\n")
first = False
json.dump(fixture, outfile, ensure_ascii=False)
progress.advance(task, 1)
json.dump(fixture, outfile, ensure_ascii=False)
progress.advance(task, 1)
outfile.write("\n]\n")
if ws and (i % update_frequency == 0):
ws.send(
json.dumps(
{
"task": "processing",
"completed": total_processed,
"stats": dict(stats),
},
),
)
outfile.write("\n]\n")
finally:
if ws:
ws.close()
end_time: float = time.perf_counter()
duration: float = end_time - start_time