from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
import os
from pathlib import Path
import re
from typing import Any

STATE_VERSION = 1
EXPORT_SETTINGS_NAME = "settings_new.json"
EXPORT_MANIFEST_NAME = "settings_new.manifest.json"
EDITOR_STATE_NAME = "settings_new.state.json"
PROVISIONAL_SINGLETON_PREFIX = "__draft_singleton__::"
DEFAULT_SEARCH_LIMIT = 25
# Upper bound applied to the ``limit`` argument of ``search_settings``.
MAX_SEARCH_LIMIT = 100
# Number of most-recent operation log entries kept in the editor state file.
MAX_OPERATION_HISTORY = 200

SPACE_RE = re.compile(r"\s+")
NON_ALNUM_RE = re.compile(r"[^0-9a-z]+")
TITLE_PREFIX_RE = re.compile(r"^\s*(?:(?:no|nr|n)\.?\s*)?\d+[a-z]?\s*[\.\):,\-]*\s*", re.IGNORECASE)


class ConcordanceEditorError(RuntimeError):
    """Raised when the draft workflow cannot satisfy a request."""


@dataclass(frozen=True)
class ConcordanceEditorPaths:
    """Bundle of the file locations used by the concordance draft workflow."""

    # Live ``settings.json`` that drafts are based on.
    live_settings_path: Path
    # Directory holding every draft artefact.
    draft_root: Path
    # Draft copy of the settings (``settings_new.json``).
    draft_settings_path: Path
    # Export manifest written by ``prepare_export``.
    draft_manifest_path: Path
    # Editor state / operation log for the draft.
    draft_state_path: Path

    @classmethod
    def from_roots(cls, live_root: Path, draft_root: Path) -> "ConcordanceEditorPaths":
        """Build the path bundle from a live root and a draft root directory.

        Both roots are resolved to absolute paths before the file names are
        appended.
        """
        live_root = Path(live_root).resolve()
        draft_root = Path(draft_root).resolve()
        return cls(
            live_settings_path=live_root / "settings.json",
            draft_root=draft_root,
            draft_settings_path=draft_root / EXPORT_SETTINGS_NAME,
            draft_manifest_path=draft_root / EXPORT_MANIFEST_NAME,
            draft_state_path=draft_root / EDITOR_STATE_NAME,
        )


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix (no microseconds)."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def atomic_write_text(path: Path, text: str) -> None:
    """Write *text* to *path* as UTF-8 via a sibling ``.tmp`` file and ``os.replace``.

    The parent directory is created when missing. If writing or replacing
    fails, the temporary file is removed (best effort) and the original
    exception is re-raised.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    temp = path.with_name(f"{path.name}.tmp")
    try:
        temp.write_text(text, encoding="utf-8")
        os.replace(temp, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave a stray temp
        # file behind; the original error is what the caller needs to see.
        try:
            temp.unlink()
        except OSError:
            pass
        raise


def write_json(path: Path, payload: Any) -> None:
    """Serialise *payload* as indented, non-ASCII-escaped JSON and write it atomically."""
    atomic_write_text(path, json.dumps(payload, indent=2, ensure_ascii=False) + "\n")


def read_json(path: Path, *, default: Any | None = None) -> Any:
    """Load JSON from *path*, trying several text encodings.

    Args:
        path: File to read.
        default: Value returned when the file does not exist or contains only
            whitespace.

    Returns:
        The decoded JSON value, or *default*.

    Raises:
        ConcordanceEditorError: If the text is not valid JSON, or if no
            candidate encoding can decode the file.
    """
    if not path.exists():
        return default
    last_error: Exception | None = None
    # "utf-8-sig" reads UTF-8 with or without a byte-order mark; plain "utf-8"
    # would keep the BOM in the text and json.loads would then reject it.
    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            text = path.read_text(encoding=encoding)
        except UnicodeDecodeError as exc:
            last_error = exc
            continue
        if not text.strip():
            return default
        try:
            return json.loads(text)
        except json.JSONDecodeError as exc:
            raise ConcordanceEditorError(
                f"{path.name} is not valid JSON: {exc.msg} at line {exc.lineno} column {exc.colno}"
            ) from exc
    raise ConcordanceEditorError(f"Could not decode {path}: {last_error}")


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def key_document(setting: dict) -> str:
    """Return the document label of *setting*, with ``, v.<volume>`` appended when needed.

    The suffix is not duplicated when the document already ends with it
    (case-insensitive). With a volume but no document, ``v.<volume>`` is
    returned on its own.
    """
    document = str(setting.get("document") or "").strip()
    volume = str(setting.get("volume") or "").strip()
    if not volume:
        return document
    suffix = f", v.{volume}"
    if document.lower().endswith(suffix.lower()):
        return document
    if not document:
        return f"v.{volume}"
    return f"{document}{suffix}"


def setting_key(setting: dict) -> str:
    """Return the ``source || document || page`` key that identifies a setting row."""
    return (
        f"{str(setting.get('source') or '').strip()} || "
        f"{key_document(setting)} || "
        f"{str(setting.get('page') or '').strip()}"
    )


def strip_enum(value: str) -> str:
    """Remove a leading enumeration matched by ``TITLE_PREFIX_RE`` (e.g. ``No. 12 -``) from *value*."""
    return TITLE_PREFIX_RE.sub("", str(value or "").strip())


def slug(value: str) -> str:
    """Return a case-folded, dash-separated slug of *value*; ``"piece"`` when nothing is left."""
    lowered = NON_ALNUM_RE.sub("-", str(value or "").casefold()).strip("-")
    return lowered or "piece"


def first_link(setting: dict, field: str) -> str:
    """Return the first non-empty *field* value found in ``Sections[*].parts[*]``.

    Sections or parts that are not lists (for example ``null`` in the JSON)
    and entries that are not dicts are skipped. Returns ``""`` when no value
    is found.
    """
    sections = setting.get("Sections")
    if not isinstance(sections, list):
        return ""
    for section in sections:
        if not isinstance(section, dict):
            continue
        parts = section.get("parts")
        if not isinstance(parts, list):
            continue
        for part in parts:
            if not isinstance(part, dict):
                continue
            value = str(part.get(field) or "").strip()
            if value:
                return value
    return ""


def display_composer(setting: dict) -> str:
    """Return ``composer``, else ``orig_comp``, skipping blank/"anonymous" values; else ``"Anonymous"``."""
    composer = str(setting.get("composer") or "").strip()
    if composer and composer.casefold() != "anonymous":
        return composer
    original = str(setting.get("orig_comp") or "").strip()
    if original and original.casefold() != "anonymous":
        return original
    return "Anonymous"


def title_label(setting: dict) -> str:
    """Return ``title`` and ``subtitle`` joined by a space, or ``"Untitled"`` when both are blank."""
    title = str(setting.get("title") or "").strip()
    subtitle = str(setting.get("subtitle") or "").strip()
    return " ".join(part for part in (title, subtitle) if part).strip() or "Untitled"


def label_for_setting(setting: dict) -> str:
    """Return the ``source | document | page`` display label for a setting row."""
    return (
        f"{str(setting.get('source') or '').strip()} | "
        f"{key_document(setting)} | "
        f"{str(setting.get('page') or '').strip()}"
    )


def normalize_text(value: str) -> str:
    """Case-fold *value*, replace runs outside ``[0-9a-z]`` with one space, and trim."""
    cleaned = NON_ALNUM_RE.sub(" ", str(value or "").casefold())
    return SPACE_RE.sub(" ", cleaned).strip()


def search_tokens(value: str) -> list[str]:
    """Return the normalized words of *value*, excluding digit-only words."""
    return [token for token in normalize_text(value).split() if token and not token.isdigit()]


def provisional_singleton_key(setting_key_value: str) -> str:
    """Return the provisional piece key used for a setting without a stored concordance."""
    return f"{PROVISIONAL_SINGLETON_PREFIX}{setting_key_value}"


def is_provisional_singleton(piece_key: str) -> bool:
    """Return whether *piece_key* starts with the provisional singleton prefix."""
    return str(piece_key or "").startswith(PROVISIONAL_SINGLETON_PREFIX)


def piece_key_for_row(setting: dict) -> str:
    """Return the stored ``concordances`` value, or a provisional singleton key when it is blank."""
    stored = str(setting.get("concordances") or "").strip()
    return stored or provisional_singleton_key(setting_key(setting))


def generated_piece_key(rows: list[dict]) -> str:
    """Generate a ``manual-<slug>-<digest>`` piece key for *rows*.

    The digest is the first 10 hex characters of the SHA-1 of the sorted
    setting keys (used as an identifier only). The slug is derived from the
    composer and title of the first row.
    """
    ordered_keys = sorted(setting_key(row) for row in rows)
    digest = hashlib.sha1("|".join(ordered_keys).encode("utf-8")).hexdigest()[:10]
    representative = rows[0] if rows else {}
    composer = display_composer(representative)
    title = strip_enum(str(representative.get("title") or "")) or str(representative.get("subtitle") or "")
    label = slug(f"{composer} {title}".strip())
    return f"manual-{label}-{digest}"


def count_membership_changes(live_settings: list[dict], draft_settings: list[dict]) -> int:
    """Count setting keys whose stored ``concordances`` value differs between live and draft.

    A key present on only one side is compared against an empty string.
    """
    live_by_key = {setting_key(row): str(row.get("concordances") or "").strip() for row in live_settings}
    draft_by_key = {setting_key(row): str(row.get("concordances") or "").strip() for row in draft_settings}
    keys = set(live_by_key) | set(draft_by_key)
    return sum(1 for key in keys if live_by_key.get(key, "") != draft_by_key.get(key, ""))


def _coerce_int(value: Any, default: int) -> int:
    """Return ``int(value or default)``, falling back to *default* when conversion fails."""
    try:
        return int(value or default)
    except (TypeError, ValueError):
        return default


class ConcordanceDraftStore:
    """File-backed store for editing concordance groupings in a draft copy of the settings."""

    def __init__(self, paths: ConcordanceEditorPaths) -> None:
        """Remember the path bundle; no file is touched here."""
        self.paths = paths

    def live_settings(self) -> list[dict]:
        """Load the live settings list.

        Raises:
            ConcordanceEditorError: If the file is missing, empty, invalid, or
                does not contain a JSON list.
        """
        payload = read_json(self.paths.live_settings_path)
        if not isinstance(payload, list):
            raise ConcordanceEditorError("settings.json must contain a list of setting objects.")
        return payload

    def draft_exists(self) -> bool:
        """Return whether the draft settings file exists."""
        return self.paths.draft_settings_path.exists()

    def draft_settings(self) -> list[dict]:
        """Load the draft settings list.

        Raises:
            ConcordanceEditorError: If the file is missing, empty, invalid, or
                does not contain a JSON list.
        """
        payload = read_json(self.paths.draft_settings_path)
        if not isinstance(payload, list):
            raise ConcordanceEditorError("settings_new.json must contain a list of setting objects.")
        return payload

    def active_settings(self) -> list[dict]:
        """Return the draft settings when a draft exists, otherwise the live settings."""
        return self.draft_settings() if self.draft_exists() else self.live_settings()

    def load_state(self) -> dict:
        """Load and normalise the editor state file.

        Returns ``{}`` when the stored payload is not a JSON object. Otherwise
        returns a dict with every known field coerced to ``str``/``int`` and
        the operation log trimmed to the most recent entries. Non-numeric
        ``version`` / ``operation_count`` values fall back to their defaults.
        """
        payload = read_json(self.paths.draft_state_path, default={})
        if not isinstance(payload, dict):
            return {}
        operations = payload.get("operations", [])
        if not isinstance(operations, list):
            operations = []
        return {
            "version": _coerce_int(payload.get("version"), STATE_VERSION),
            "base_settings_sha256": str(payload.get("base_settings_sha256") or "").strip(),
            "created_at": str(payload.get("created_at") or "").strip(),
            "created_by": str(payload.get("created_by") or "").strip(),
            "updated_at": str(payload.get("updated_at") or "").strip(),
            "updated_by": str(payload.get("updated_by") or "").strip(),
            "last_exported_at": str(payload.get("last_exported_at") or "").strip(),
            "last_exported_by": str(payload.get("last_exported_by") or "").strip(),
            "operation_count": _coerce_int(payload.get("operation_count"), 0),
            "operations": [entry for entry in operations if isinstance(entry, dict)][-MAX_OPERATION_HISTORY:],
        }

    def save_state(self, payload: dict) -> None:
        """Write *payload* to the editor state file."""
        write_json(self.paths.draft_state_path, payload)

    def session_status(self) -> dict:
        """Return a snapshot describing the draft session.

        The draft is reported as stale when it exists, the state records a
        base hash, and that hash differs from the current live settings hash.

        Raises:
            ConcordanceEditorError: If the live or draft settings cannot be
                loaded or contain invalid/duplicate rows.
        """
        live_settings = self.live_settings()
        live_hash = sha256_file(self.paths.live_settings_path)
        state = self.load_state()
        draft_exists = self.draft_exists()
        draft_hash = sha256_file(self.paths.draft_settings_path) if draft_exists else ""
        base_hash = str(state.get("base_settings_sha256") or "").strip()
        stale = bool(draft_exists and base_hash and base_hash.lower() != live_hash.lower())
        working_settings = self.draft_settings() if draft_exists else live_settings
        context = self._build_context(working_settings)
        membership_change_count = (
            count_membership_changes(live_settings, working_settings) if draft_exists else 0
        )
        return {
            "draft_exists": draft_exists,
            "resume_available": bool(draft_exists and not stale),
            "stale": stale,
            "live_settings_sha256": live_hash,
            "draft_settings_sha256": draft_hash,
            "base_settings_sha256": base_hash,
            "draft_root": str(self.paths.draft_root),
            "settings_path": str(self.paths.draft_settings_path if draft_exists else self.paths.live_settings_path),
            "manifest_path": str(self.paths.draft_manifest_path),
            "state_path": str(self.paths.draft_state_path),
            "summary": context["summary"],
            "membership_change_count": membership_change_count,
            "updated_at": str(state.get("updated_at") or ""),
            "updated_by": str(state.get("updated_by") or ""),
            "created_at": str(state.get("created_at") or ""),
            "created_by": str(state.get("created_by") or ""),
            "last_exported_at": str(state.get("last_exported_at") or ""),
            "last_exported_by": str(state.get("last_exported_by") or ""),
            "operation_count": int(state.get("operation_count") or 0),
            "download_urls": {
                "settings_new": "/concordances/download/settings_new.json",
                "manifest": "/concordances/download/settings_new.manifest.json",
            },
        }

    def start_session(self, user_name: str, *, discard: bool = False) -> dict:
        """Resume the existing draft or create a fresh one from the live settings.

        Args:
            user_name: Recorded as creator/updater when state is seeded.
            discard: When true, any existing draft is overwritten.

        Returns:
            The session status after the session has been started.

        Raises:
            ConcordanceEditorError: If an existing draft is stale and
                *discard* is false.
        """
        if self.draft_exists() and not discard:
            status = self.session_status()
            if status["stale"]:
                raise ConcordanceEditorError(
                    "The existing draft is based on an older settings.json and must be discarded first."
                )
            if not self.paths.draft_state_path.exists():
                self._seed_state(user_name, sha256_file(self.paths.live_settings_path))
                # The status above was computed before the state file existed;
                # recompute so the caller sees the seeded metadata.
                return self.session_status()
            return status
        live_settings = self.live_settings()
        write_json(self.paths.draft_settings_path, live_settings)
        self._seed_state(user_name, sha256_file(self.paths.live_settings_path))
        self._delete_if_exists(self.paths.draft_manifest_path)
        return self.session_status()

    def _seed_state(self, user_name: str, base_hash: str) -> None:
        """Write a fresh state file with an empty operation log based on *base_hash*."""
        now = utc_now()
        self.paths.draft_root.mkdir(parents=True, exist_ok=True)
        self.save_state(
            {
                "version": STATE_VERSION,
                "base_settings_sha256": base_hash,
                "created_at": now,
                "created_by": user_name,
                "updated_at": now,
                "updated_by": user_name,
                "last_exported_at": "",
                "last_exported_by": "",
                "operation_count": 0,
                "operations": [],
            }
        )

    def search_settings(self, query: str = "", *, limit: int = DEFAULT_SEARCH_LIMIT) -> dict:
        """Search the active settings and return ranked previews.

        A row matches when it contains at least one non-numeric query word or
        the whole normalized query as a substring. Digits-only queries (e.g. a
        page number) are therefore matched by substring. A query that
        normalizes to nothing matches every row.

        Args:
            query: Free-text query.
            limit: Maximum number of results, clamped to ``1..MAX_SEARCH_LIMIT``.

        Returns:
            Dict with ``query``, ``result_count`` and ``results``.
        """
        settings = self.active_settings()
        context = self._build_context(settings)
        query_text = str(query or "").strip()
        tokens = search_tokens(query_text)
        norm_query = normalize_text(query_text)
        limited = max(1, min(int(limit or DEFAULT_SEARCH_LIMIT), MAX_SEARCH_LIMIT))
        scored: list[tuple[float, str, dict]] = []
        for row in settings:
            preview = self._setting_preview(row, context, include_members=False)
            search_text = " | ".join(
                [
                    preview["title"],
                    preview["subtitle"],
                    preview["composer"],
                    preview["original_composer"],
                    preview["source"],
                    preview["document"],
                    preview["page"],
                    preview["label"],
                ]
            )
            normalized = normalize_text(search_text)
            token_set = set(search_tokens(search_text))
            if norm_query:
                # Filter on the normalized query rather than on ``tokens``:
                # digit-only queries yield no tokens but must still filter.
                hits = sum(1 for token in tokens if token in token_set)
                phrase_match = norm_query in normalized
                if not hits and not phrase_match:
                    continue
                full_phrase = 1 if phrase_match else 0
                starts = 1 if normalized.startswith(norm_query) else 0
                score = hits * 10 + full_phrase * 3 + starts
            else:
                score = 1
            scored.append((float(score), preview["label"], preview))
        # Highest score first, then larger concordance groups, then label.
        ranked = [
            preview
            for _score, _label, preview in sorted(
                scored,
                key=lambda item: (
                    -item[0],
                    -int(item[2]["concordance_count"] or 0),
                    item[1].casefold(),
                ),
            )[:limited]
        ]
        return {
            "query": query_text,
            "result_count": len(ranked),
            "results": ranked,
        }

    def setting_detail(self, setting_key_value: str) -> dict:
        """Return the preview (including group members) for one setting key.

        Raises:
            ConcordanceEditorError: If the key is not present in the active settings.
        """
        settings = self.active_settings()
        context = self._build_context(settings)
        row = context["settings_by_key"].get(setting_key_value)
        if row is None:
            raise ConcordanceEditorError(f"Unknown setting key: {setting_key_value}")
        return self._setting_preview(row, context, include_members=True)

    def apply_operation(
        self,
        *,
        left_setting_key: str,
        operation: str,
        right_setting_key: str = "",
        user_name: str,
    ) -> dict:
        """Apply one concordance operation to the draft and log it.

        Supported operations:
            ``make_singleton``: blank the left setting's stored concordance.
            ``merge_just_setting``: move only the left setting into the right
                setting's piece.
            ``merge_setting_groups``: move every member of the left setting's
                piece into the right setting's piece.

        Returns:
            Dict with the refreshed left/right previews, the target piece key,
            the affected setting keys and the session status. ``right_setting``
            is ``None`` when no right key was given or the key is unknown
            (only possible for ``make_singleton``, which does not use it).

        Raises:
            ConcordanceEditorError: If no draft exists, the draft is stale, a
                required setting key is unknown, or the operation is unsupported.
        """
        self._require_draft()
        state = self.load_state()
        status = self.session_status()
        if status["stale"]:
            raise ConcordanceEditorError("Discard the stale draft before applying more concordance changes.")
        settings = self.draft_settings()
        context = self._build_context(settings)
        left_row = context["settings_by_key"].get(left_setting_key)
        if left_row is None:
            raise ConcordanceEditorError(f"Unknown left-hand setting key: {left_setting_key}")
        operation = str(operation or "").strip()
        affected_rows: list[dict]
        target_piece_key = ""
        if operation == "make_singleton":
            left_row["concordances"] = ""
            affected_rows = [left_row]
        elif operation in {"merge_just_setting", "merge_setting_groups"}:
            right_row = context["settings_by_key"].get(right_setting_key)
            if right_row is None:
                raise ConcordanceEditorError(f"Unknown right-hand setting key: {right_setting_key}")
            left_piece_key = context["setting_to_piece"][left_setting_key]
            right_piece_key = context["setting_to_piece"][right_setting_key]
            if left_piece_key == right_piece_key:
                # Already in the same piece: nothing to move.
                affected_rows = []
                target_piece_key = context["stored_piece_by_effective_piece"].get(right_piece_key, "")
            else:
                if operation == "merge_just_setting":
                    affected_rows = [left_row]
                else:
                    affected_rows = [
                        context["settings_by_key"][member_key]
                        for member_key in context["piece_to_members"][left_piece_key]
                    ]
                target_piece_key = self._materialize_target_piece_key(
                    settings=settings,
                    context=context,
                    right_piece_key=right_piece_key,
                    additional_rows=affected_rows,
                )
                for row in affected_rows:
                    row["concordances"] = target_piece_key
        else:
            raise ConcordanceEditorError(f"Unsupported operation: {operation}")

        write_json(self.paths.draft_settings_path, settings)
        # Any previously prepared manifest no longer describes the draft.
        self._delete_if_exists(self.paths.draft_manifest_path)

        now = utc_now()
        operation_rows = [setting_key(row) for row in affected_rows]
        state["updated_at"] = now
        state["updated_by"] = user_name
        state["operation_count"] = int(state.get("operation_count") or 0) + 1
        operations = list(state.get("operations") or [])
        operations.append(
            {
                "timestamp": now,
                "user": user_name,
                "operation": operation,
                "left_setting_key": left_setting_key,
                "right_setting_key": right_setting_key,
                "target_piece_key": target_piece_key,
                "affected_setting_keys": operation_rows,
            }
        )
        state["operations"] = operations[-MAX_OPERATION_HISTORY:]
        self.save_state(state)

        refreshed = self.setting_detail(left_setting_key)
        # Operations never alter setting keys, so the pre-operation index is
        # still valid for deciding whether the right key can be looked up.
        # Looking up an unknown key here would raise after the change was
        # already written to disk.
        right_detail = None
        if right_setting_key and right_setting_key in context["settings_by_key"]:
            right_detail = self.setting_detail(right_setting_key)
        return {
            "ok": True,
            "left_setting": refreshed,
            "operation": operation,
            "target_piece_key": target_piece_key,
            "affected_setting_keys": operation_rows,
            "session": self.session_status(),
            "right_setting": right_detail,
        }

    def prepare_export(self, user_name: str) -> dict:
        """Write the export manifest for the draft and record the export in the state.

        Raises:
            ConcordanceEditorError: If no draft exists or the draft is stale.
        """
        self._require_draft()
        status = self.session_status()
        if status["stale"]:
            raise ConcordanceEditorError(
                "This draft is stale relative to the current settings.json and should be discarded."
            )
        draft_settings = self.draft_settings()
        live_settings = self.live_settings()
        state = self.load_state()
        settings_hash = sha256_file(self.paths.draft_settings_path)
        manifest = {
            "type": "concordance_editor_export_manifest",
            "version": 1,
            "created_at": utc_now(),
            "created_by": user_name,
            "base_settings_sha256": str(state.get("base_settings_sha256") or ""),
            "settings_new_sha256": settings_hash,
            "setting_count": len(draft_settings),
            "blank_concordance_count": sum(
                1 for row in draft_settings if not str(row.get("concordances") or "").strip()
            ),
            "membership_change_count": count_membership_changes(live_settings, draft_settings),
            "operation_count": int(state.get("operation_count") or 0),
        }
        write_json(self.paths.draft_manifest_path, manifest)
        state["last_exported_at"] = manifest["created_at"]
        state["last_exported_by"] = user_name
        state["updated_at"] = manifest["created_at"]
        state["updated_by"] = user_name
        self.save_state(state)
        return {
            "ok": True,
            "manifest": manifest,
            "session": self.session_status(),
        }

    def _require_draft(self) -> None:
        """Raise ``ConcordanceEditorError`` when no draft settings file exists."""
        if not self.draft_exists():
            raise ConcordanceEditorError("No draft exists yet. Start or resume a draft first.")

    def _delete_if_exists(self, path: Path) -> None:
        """Delete *path*; a missing file is not an error."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def _build_context(self, settings: list[dict]) -> dict:
        """Index *settings* by setting key and by effective piece key.

        Returns:
            Dict with ``settings_by_key``, ``setting_to_piece``,
            ``piece_to_members`` (member keys sorted),
            ``stored_piece_by_effective_piece`` (only pieces with a stored
            key) and ``summary`` counts.

        Raises:
            ConcordanceEditorError: If a row is not a dict or two rows share
                the same setting key.
        """
        settings_by_key: dict[str, dict] = {}
        setting_to_piece: dict[str, str] = {}
        piece_to_members: defaultdict[str, list[str]] = defaultdict(list)
        stored_piece_by_effective_piece: dict[str, str] = {}
        for row in settings:
            if not isinstance(row, dict):
                raise ConcordanceEditorError("settings payload must contain setting objects only.")
            row_key = setting_key(row)
            if row_key in settings_by_key:
                raise ConcordanceEditorError(f"Duplicate setting key in draft: {row_key}")
            settings_by_key[row_key] = row
            effective_piece_key = piece_key_for_row(row)
            setting_to_piece[row_key] = effective_piece_key
            piece_to_members[effective_piece_key].append(row_key)
            stored_piece_key = str(row.get("concordances") or "").strip()
            if stored_piece_key:
                stored_piece_by_effective_piece[effective_piece_key] = stored_piece_key
        for members in piece_to_members.values():
            members.sort()
        member_counts = [len(members) for members in piece_to_members.values()]
        summary = {
            "setting_count": len(settings_by_key),
            "piece_count": len(piece_to_members),
            "singleton_count": sum(1 for count in member_counts if count == 1),
            "multi_setting_piece_count": sum(1 for count in member_counts if count > 1),
            "largest_piece_size": max(member_counts, default=0),
            "blank_concordance_count": sum(
                1 for row in settings if not str(row.get("concordances") or "").strip()
            ),
        }
        return {
            "settings_by_key": settings_by_key,
            "setting_to_piece": setting_to_piece,
            "piece_to_members": dict(piece_to_members),
            "stored_piece_by_effective_piece": stored_piece_by_effective_piece,
            "summary": summary,
        }

    def _setting_preview(self, row: dict, context: dict, *, include_members: bool) -> dict:
        """Build the preview dict for *row* using the indexes in *context*.

        When *include_members* is true, ``members`` lists every setting that
        shares the row's effective piece key (including the row itself);
        otherwise it is an empty list.
        """
        row_key = setting_key(row)
        effective_piece_key = context["setting_to_piece"][row_key]
        member_keys = context["piece_to_members"][effective_piece_key]
        members = []
        if include_members:
            for member_key in member_keys:
                member_row = context["settings_by_key"][member_key]
                members.append(
                    {
                        "setting_key": member_key,
                        "label": label_for_setting(member_row),
                        "title": title_label(member_row),
                        "composer": display_composer(member_row),
                        "source": str(member_row.get("source") or "").strip(),
                        "document": key_document(member_row),
                        "page": str(member_row.get("page") or "").strip(),
                    }
                )
        return {
            "setting_key": row_key,
            "label": label_for_setting(row),
            "title": str(row.get("title") or "").strip(),
            "subtitle": str(row.get("subtitle") or "").strip(),
            "composer": str(row.get("composer") or "").strip(),
            "original_composer": str(row.get("orig_comp") or "").strip(),
            "display_composer": display_composer(row),
            "source": str(row.get("source") or "").strip(),
            "document": key_document(row),
            "page": str(row.get("page") or "").strip(),
            "stored_concordances": str(row.get("concordances") or "").strip(),
            "effective_piece_key": effective_piece_key,
            "is_provisional_singleton": is_provisional_singleton(effective_piece_key),
            "concordance_count": len(member_keys),
            "primary_pdf": first_link(row, "PDF"),
            "primary_midi": first_link(row, "midi"),
            "primary_fronimo": first_link(row, "Fronimo") or first_link(row, "fronimo"),
            "members": members,
        }

    def _materialize_target_piece_key(
        self,
        *,
        settings: list[dict],
        context: dict,
        right_piece_key: str,
        additional_rows: list[dict],
    ) -> str:
        """Return the stored key of the right-hand piece, creating one if it has none.

        When the right-hand piece has no stored key, a key is generated from
        its members plus *additional_rows* and written to all of those rows in
        place. *settings* is accepted for interface compatibility and is not
        read here.

        Raises:
            ConcordanceEditorError: If the right-hand piece has no members.
        """
        existing_key = context["stored_piece_by_effective_piece"].get(right_piece_key, "")
        if existing_key:
            return existing_key
        right_member_keys = context["piece_to_members"].get(right_piece_key, [])
        if not right_member_keys:
            raise ConcordanceEditorError("Right-hand piece has no members.")
        materialized_rows = [
            context["settings_by_key"][member_key]
            for member_key in right_member_keys
        ]
        materialized_rows.extend(additional_rows)
        new_piece_key = generated_piece_key(materialized_rows)
        for row in materialized_rows:
            row["concordances"] = new_piece_key
        return new_piece_key