"""
Test cases for spend log cleanup functionality
"""

from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock

import pytest

from litellm.proxy.db.db_transaction_queue.spend_log_cleanup import SpendLogCleanup


def test_spend_log_cleanup_cron_scheduling():
    """Cron expressions for spend log cleanup must parse; malformed ones must raise."""
    from apscheduler.triggers.cron import CronTrigger

    # Each supported schedule shape should yield a usable trigger object.
    valid_expressions = [
        "0 4 * * *",    # 4:00 AM daily
        "*/1 * * * *",  # every minute (useful for testing)
        "0 3 * * 0",    # 3 AM every Sunday
    ]
    for expression in valid_expressions:
        assert CronTrigger.from_crontab(expression) is not None

    # Garbage text and out-of-range fields (minute 60, hour 25) are rejected.
    for bad_expression in ("invalid cron", "60 25 * * *"):
        with pytest.raises(ValueError):
            CronTrigger.from_crontab(bad_expression)


def test_spend_log_cleanup_cron_scheduler_integration():
    """
    Integration test: Verify the proxy_server scheduler logic correctly adds
    cron-based cleanup job when maximum_spend_logs_cleanup_cron is configured.

    This tests the logic in proxy_server.py lines 4671-4717 without requiring
    a real database connection.
    """
    from apscheduler.triggers.cron import CronTrigger

    scheduler = MagicMock()
    prisma_client = MagicMock()
    cleanup_instance = MagicMock()

    # --- Case 1: cron configured -> cron-based job is registered ---
    settings_with_cron = {
        "maximum_spend_logs_retention_period": "7d",
        "maximum_spend_logs_cleanup_cron": "0 4 * * *",  # 4 AM daily
    }
    cron_expression = settings_with_cron.get("maximum_spend_logs_cleanup_cron")
    assert cron_expression is not None

    # Mirror the scheduling call made in proxy_server.py.
    scheduler.add_job(
        cleanup_instance.cleanup_old_spend_logs,
        CronTrigger.from_crontab(cron_expression),
        args=[prisma_client],
        id="spend_log_cleanup_job",
        replace_existing=True,
        misfire_grace_time=3600,
    )

    scheduler.add_job.assert_called_once()
    positional, keyword = scheduler.add_job.call_args

    # The second positional argument must be a CronTrigger instance.
    assert isinstance(positional[1], CronTrigger)
    assert keyword["id"] == "spend_log_cleanup_job"
    assert keyword["replace_existing"] is True

    # --- Case 2: no cron configured -> interval-based fallback ---
    scheduler.reset_mock()
    settings_without_cron = {
        "maximum_spend_logs_retention_period": "7d",
        # No cron, so it should fall back to interval
    }
    assert settings_without_cron.get("maximum_spend_logs_cleanup_cron") is None

    from litellm.litellm_core_utils.duration_parser import duration_in_seconds

    # Fallback interval defaults to one day when not set explicitly.
    interval_setting = settings_without_cron.get(
        "maximum_spend_logs_retention_interval", "1d"
    )
    scheduler.add_job(
        cleanup_instance.cleanup_old_spend_logs,
        "interval",
        seconds=duration_in_seconds(interval_setting),
        args=[prisma_client],
        id="spend_log_cleanup_job",
        replace_existing=True,
    )

    scheduler.add_job.assert_called_once()
    positional, keyword = scheduler.add_job.call_args
    assert positional[1] == "interval"
    assert keyword["seconds"] == 86400  # "1d" expressed in seconds


@pytest.mark.asyncio
async def test_should_delete_spend_logs():
    """_should_delete_spend_logs is True only when retention parses successfully."""
    # (general_settings, expected result) — covers unset, valid units, invalid.
    scenarios = [
        ({}, False),  # no retention configured
        ({"maximum_spend_logs_retention_period": "3600s"}, True),
        ({"maximum_spend_logs_retention_period": "30d"}, True),
        ({"maximum_spend_logs_retention_period": "24h"}, True),
        ({"maximum_spend_logs_retention_period": "invalid"}, False),
    ]
    for settings, expected in scenarios:
        cleaner = SpendLogCleanup(general_settings=settings)
        assert cleaner._should_delete_spend_logs() is expected


@pytest.mark.asyncio
async def test_cleanup_old_spend_logs_batch_deletion():
    """Cleanup issues DELETE batches until a batch removes zero rows."""
    # Prisma client whose raw SQL reports shrinking batch sizes, then 0.
    db = MagicMock()
    db.execute_raw = AsyncMock(side_effect=[1000, 500, 0])
    prisma_client = MagicMock()
    prisma_client.db = db

    # Pod lock manager that always grants the distributed lock.
    lock_manager = MagicMock()
    lock_manager.redis_cache = MagicMock()
    lock_manager.acquire_lock = AsyncMock(return_value=True)
    lock_manager.release_lock = AsyncMock()

    cleaner = SpendLogCleanup(
        general_settings={"maximum_spend_logs_retention_period": "7d"}
    )
    cleaner.pod_lock_manager = lock_manager
    assert cleaner._should_delete_spend_logs() is True

    await cleaner.cleanup_old_spend_logs(prisma_client)

    # Three raw-SQL calls: 1000 rows, 500 rows, then the terminating empty batch.
    assert db.execute_raw.call_count == 3

    # The deletes must target the spend-logs table keyed by request_id.
    first_sql = db.execute_raw.call_args_list[0][0][0]
    assert 'DELETE FROM "LiteLLM_SpendLogs"' in first_sql
    assert 'WHERE "request_id" IN' in first_sql


@pytest.mark.asyncio
async def test_cleanup_old_spend_logs_retention_period_cutoff():
    """
    Test that logs are filtered using correct cutoff based on retention
    """
    # Prisma client whose raw SQL immediately reports an empty batch.
    db = MagicMock()
    db.execute_raw = AsyncMock(return_value=0)
    prisma_client = MagicMock()
    prisma_client.db = db

    # Pod lock manager that always grants the distributed lock.
    lock_manager = MagicMock()
    lock_manager.redis_cache = MagicMock()
    lock_manager.acquire_lock = AsyncMock(return_value=True)
    lock_manager.release_lock = AsyncMock()

    cleaner = SpendLogCleanup(
        general_settings={"maximum_spend_logs_retention_period": "24h"}
    )
    cleaner.pod_lock_manager = lock_manager
    assert cleaner._should_delete_spend_logs() is True

    await cleaner.cleanup_old_spend_logs(prisma_client)

    # Second positional argument of the raw query is the cutoff timestamp.
    actual_cutoff = db.execute_raw.call_args[0][1]
    expected_cutoff = datetime.now(timezone.utc) - timedelta(seconds=86400)
    drift_seconds = abs((actual_cutoff - expected_cutoff).total_seconds())
    assert drift_seconds < 1  # Allow 1 second difference for test execution time


@pytest.mark.asyncio
async def test_cleanup_old_spend_logs_no_retention_period():
    """
    Test that no logs are deleted when no retention period is set
    """
    prisma_client = MagicMock()
    prisma_client.db.execute_raw = AsyncMock()

    # Without a retention setting, cleanup must be a complete no-op.
    cleaner = SpendLogCleanup(general_settings={})
    await cleaner.cleanup_old_spend_logs(prisma_client)

    prisma_client.db.execute_raw.assert_not_called()


@pytest.mark.asyncio
async def test_lock_not_released_when_not_acquired():
    """
    Lock release should be skipped when _should_delete_spend_logs returns False
    before the lock is ever acquired.
    """
    prisma_client = MagicMock()
    prisma_client.db.execute_raw = AsyncMock()

    # Lock manager mocks let us observe acquire/release traffic.
    lock_manager = MagicMock()
    lock_manager.redis_cache = MagicMock()
    lock_manager.acquire_lock = AsyncMock(return_value=True)
    lock_manager.release_lock = AsyncMock()

    # Empty settings → _should_delete_spend_logs() is False before locking.
    cleaner = SpendLogCleanup(general_settings={})
    cleaner.pod_lock_manager = lock_manager

    await cleaner.cleanup_old_spend_logs(prisma_client)

    # Neither side of the lock protocol should have been touched.
    lock_manager.acquire_lock.assert_not_called()
    lock_manager.release_lock.assert_not_called()


@pytest.mark.asyncio
async def test_integer_retention_treated_as_days():
    """
    An integer value for maximum_spend_logs_retention_period should be treated
    as days (e.g., 3 → '3d' → 259200 seconds).
    """
    days = 3
    cleaner = SpendLogCleanup(
        general_settings={"maximum_spend_logs_retention_period": days}
    )
    assert cleaner._should_delete_spend_logs() is True
    # 3 integer days must become 3 * 86400 seconds of retention.
    assert cleaner.retention_seconds == days * 86400


def test_string_retention_still_works():
    """
    String values like '3d', '24h', '3600s' should continue to parse correctly.
    """
    # Mapping of retention string → expected parsed seconds.
    expected_by_setting = {
        "3d": 3 * 86400,
        "24h": 24 * 3600,
        "3600s": 3600,
        "2w": 2 * 604800,
    }
    for setting, expected_seconds in expected_by_setting.items():
        cleaner = SpendLogCleanup(
            general_settings={"maximum_spend_logs_retention_period": setting}
        )
        assert cleaner._should_delete_spend_logs() is True, f"Failed for {setting}"
        actual = cleaner.retention_seconds
        assert actual == expected_seconds, (
            f"Expected {expected_seconds} for {setting}, got {actual}"
        )


def test_cleanup_batch_size_env_var(monkeypatch):
    """Ensure batch size is configurable via environment variable"""
    import importlib

    import litellm.constants as constants_module
    import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module

    # Set env var and reload modules to pick up new value
    # NOTE: constants must be reloaded before the cleanup module, since the
    # cleanup module reads the batch-size value from constants at import time.
    monkeypatch.setenv("SPEND_LOG_CLEANUP_BATCH_SIZE", "25")
    importlib.reload(constants_module)
    importlib.reload(cleanup_module)

    cleaner = cleanup_module.SpendLogCleanup(general_settings={})
    assert cleaner.batch_size == 25

    # Remove env var and reload to restore default for other tests
    # (monkeypatch would undo the env var at teardown, but the modules must be
    # reloaded while the var is absent, so the cleanup is done explicitly here)
    monkeypatch.delenv("SPEND_LOG_CLEANUP_BATCH_SIZE", raising=False)
    importlib.reload(constants_module)
    importlib.reload(cleanup_module)
