#!/usr/bin/env python3
"""Bulk ingest existing documents into the RAG system.

Usage:
    python -m scripts.bulk_ingest --file-dir /data/files --meta-file /data/metadata.jsonl
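
Optional flags:
    --concurrency N   number of concurrent ingest tasks (default: 4)
    --no-skip         re-ingest documents that are already present in the metadata index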

The metadata JSONL file should have one JSON object per line:
    {"doc_id": "xxx", "title": "...", "doc_number": "...", "filename": "xxx.pdf", ...}

Supported formats include PDF, Word, Excel, PPT, WPS, OFD, images, and plain text.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import sys
import time
from pathlib import Path

# Ensure the project root is on sys.path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from app.config import settings
from app.core.ingest_pipeline import create_pipeline
from app.utils.logger import get_logger

logger = get_logger(__name__)


async def bulk_ingest(
    file_dir: Path,
    meta_file: Path,
    *,
    concurrency: int = 4,
    skip_existing: bool = True,
) -> None:
    """Ingest documents in parallel with controlled concurrency."""
    pipeline = create_pipeline()
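    # The semaphore bounds how many documents are ingested at the same time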
    semaphore = asyncio.Semaphore(concurrency)

    # Read metadata
    documents = []
    with open(meta_file, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                doc = json.loads(line)
                documents.append(doc)
            except json.JSONDecodeError as e:
                logger.warning(f"Skipping line {line_num}: {e}")

    logger.info(f"Loaded {len(documents)} documents from {meta_file}")

    # Check existing if needed
    if skip_existing:
        existing_ids = set()
        try:
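            # A single terms aggregation collects every ingested doc_id (bucket limit: 1,000,000)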
            result = await pipeline._es.raw.search(
                index=settings.es_meta_index,
                body={
                    "query": {"match_all": {}},
                    "size": 0,
                    "aggs": {"doc_ids": {"terms": {"field": "doc_id", "size": 1000000}}},
                },
            )
            for bucket in result["aggregations"]["doc_ids"]["buckets"]:
                existing_ids.add(bucket["key"])
        except Exception as e:
            logger.warning(f"Could not fetch existing doc_ids; no documents will be skipped: {e}")

        before = len(documents)
        documents = [d for d in documents if d["doc_id"] not in existing_ids]
        logger.info(f"Skipping {before - len(documents)} already-ingested documents")

    # Track progress
    total = len(documents)
    completed = 0
    failed = 0
    start_time = time.time()

    async def process_one(doc_meta: dict) -> None:
        nonlocal completed, failed
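        # Counter increments are single synchronous statements, so they are safe on one event loop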
        async with semaphore:
            doc_id = doc_meta["doc_id"]
            # Support both new "filename" and legacy "pdf_filename" keys
            filename = doc_meta.get("filename") or doc_meta.get("pdf_filename", f"{doc_id}.pdf")
            file_path = file_dir / filename

            if not file_path.exists():
                logger.warning(f"File not found, skipping: {file_path}")
                failed += 1
                return

            try:
                result = await pipeline.ingest_document(
                    doc_id=doc_id,
                    file_path=file_path,
                    metadata=doc_meta,
                )
            except Exception as e:
                logger.error(f"Ingest raised an exception for {doc_id}: {e}")
                failed += 1
                return

            if result["status"] == "completed":
                completed += 1
            else:
                failed += 1

            if (completed + failed) % 50 == 0:
                elapsed = time.time() - start_time
                rate = (completed + failed) / elapsed
                logger.info(
                    f"Progress: {completed + failed}/{total} "
                    f"({completed} ok, {failed} failed) "
                    f"@ {rate:.1f} docs/sec"
                )

    # Process all documents
    tasks = [process_one(doc) for doc in documents]
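    # return_exceptions=True keeps one failing task from cancelling the rest of the batch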
    await asyncio.gather(*tasks, return_exceptions=True)

    elapsed = time.time() - start_time
    logger.info(
        f"Bulk ingest complete: {completed}/{total} succeeded, "
        f"{failed} failed, {elapsed:.1f}s elapsed"
    )

    # Cleanup: close the ES and embedding HTTP clients so pending connections shut down cleanly
    await pipeline._es.close()
    await pipeline._embedding._client.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="Bulk ingest documents into RAG system")
    parser.add_argument(
        "--file-dir",
        type=Path,
        required=True,
        help="Directory containing document files",
    )
    parser.add_argument(
        "--meta-file",
        type=Path,
        required=True,
        help="JSONL file with document metadata (one JSON per line)",
    )
    parser.add_argument(
        "--concurrency",
        type=int,
        default=4,
        help="Number of concurrent ingest tasks (default: 4)",
    )
    parser.add_argument(
        "--no-skip",
        action="store_true",
        help="Do not skip already-ingested documents",
    )
    args = parser.parse_args()

    asyncio.run(
        bulk_ingest(
            file_dir=args.file_dir,
            meta_file=args.meta_file,
            concurrency=args.concurrency,
            skip_existing=not args.no_skip,
        )
    )


if __name__ == "__main__":
    main()
