"""DB engine + sessionmaker — process-level singleton.

Previous code created a new Engine on every get_sessionmaker() call. Each
Engine carries its own connection pool, so pipeline + scheduler + api would
hold dozens of independent pools and easily exceed MySQL's max_connections
under load. Now we build one engine per (db_url, pid) and reuse.

Tests can call `_reset_for_tests()` to drop the cache between runs.
"""
from __future__ import annotations

import contextlib
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker

from govcrawler.settings import get_settings

# (db_url, pid) → Engine — keyed by pid so child processes (forked workers)
# get their own pool instead of inheriting a parent's broken sockets.
_engine_cache: dict[tuple[str, int], Engine] = {}
# Sessionmakers mirror the engine cache: one per (db_url, pid), bound to
# the corresponding cached engine.
_sm_cache: dict[tuple[str, int], sessionmaker[Session]] = {}


def get_engine() -> Engine:
    """Return the process-wide Engine for the current settings' db_url.

    Engines are cached per (db_url, pid) so forked workers build their own
    pool instead of reusing the parent's sockets. The cache insert uses
    ``dict.setdefault`` (atomic under the GIL) so two threads racing past
    the initial ``get`` cannot leak an engine: the loser's pool is disposed
    immediately instead of being silently overwritten.
    """
    s = get_settings()
    key = (s.db_url, os.getpid())
    eng = _engine_cache.get(key)
    if eng is None:
        candidate = create_engine(
            s.db_url,
            echo=False,
            future=True,
            pool_pre_ping=True,
            pool_recycle=3600,           # MySQL wait_timeout defaults to 8h; recycle well before that
            pool_size=10,
            max_overflow=10,
        )
        # Only one candidate wins the cache slot; any racer's engine is
        # disposed so its pool does not linger.
        eng = _engine_cache.setdefault(key, candidate)
        if eng is not candidate:
            candidate.dispose()
    return eng


def get_sessionmaker() -> sessionmaker[Session]:
    """Return the cached sessionmaker for the current (db_url, pid) pair.

    Lazily builds one sessionmaker per cache key, bound to the singleton
    engine from :func:`get_engine`; subsequent calls reuse it.
    """
    settings = get_settings()
    cache_key = (settings.db_url, os.getpid())
    if cache_key not in _sm_cache:
        _sm_cache[cache_key] = sessionmaker(
            bind=get_engine(), expire_on_commit=False, class_=Session
        )
    return _sm_cache[cache_key]


def _reset_for_tests() -> None:
    """Test-only: drop cached engines/sessionmakers. Called by conftest
    between fixtures that re-point db_url to a fresh sqlite file."""
    for eng in _engine_cache.values():
        # Best-effort dispose: a half-broken engine must not abort cleanup
        # of the remaining entries.
        with contextlib.suppress(Exception):
            eng.dispose()
    _engine_cache.clear()
    _sm_cache.clear()
