import sys, os
import asyncio
from dotenv import load_dotenv

load_dotenv()

# Tests for custom logger callbacks on the litellm proxy

sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the grandparent directory (repo root) to the system path
import pytest, time
import litellm
from litellm import embedding, completion, completion_cost, Timeout
from litellm import RateLimitError

# test /chat/completion request to the proxy
from fastapi.testclient import TestClient
from fastapi import FastAPI
from litellm.proxy.proxy_server import (
    router,
    save_worker_config,
    initialize,
)  # proxy server router and startup helpers

filepath = os.path.dirname(os.path.abspath(__file__))
python_file_path = f"{filepath}/test_configs/custom_callbacks.py"
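
# For context: these tests assume test_configs/custom_callbacks.py exposes a litellm
# CustomLogger instance named `my_custom_logger`. A minimal sketch of the assumed shape
# (illustrative only -- the real implementation lives in that file):
#
#     from litellm.integrations.custom_logger import CustomLogger
#
#     class MyCustomHandler(CustomLogger):
#         def __init__(self):
#             self.async_success = False
#             self.async_success_embedding = False
#             self.async_completion_kwargs = None
#             self.async_embedding_kwargs = None
#             self.streaming_response_obj = None
#
#         async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
#             # the only place the async_success* flags flip to True
#             if kwargs.get("call_type") == "embedding":
#                 self.async_success_embedding = True
#                 self.async_embedding_kwargs = kwargs
#             else:
#                 self.async_success = True
#                 self.async_completion_kwargs = kwargs
#                 if kwargs.get("stream"):
#                     self.streaming_response_obj = response_obj
#
#     my_custom_logger = MyCustomHandler()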


@pytest.fixture
def client():
    filepath = os.path.dirname(os.path.abspath(__file__))
    config_fp = f"{filepath}/test_configs/test_custom_logger.yaml"
    app = FastAPI()
    asyncio.run(initialize(config=config_fp))  # load the proxy config before mounting routes
    app.include_router(router)  # mount the proxy routes on the test app
    return TestClient(app)
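
# NOTE: test_configs/test_custom_logger.yaml is presumed to define the model aliases
# exercised below ("azure-embedding-model", "Azure OpenAI GPT-4 Canada") along with
# the model_info values the tests assert on.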


# Bearer token used to authenticate requests against the proxy
token = os.getenv("PROXY_MASTER_KEY")

headers = {"Authorization": f"Bearer {token}"}


print("Testing proxy custom logger")


@pytest.mark.skipif(
    os.environ.get("OPENAI_API_KEY") is None,
    reason="OPENAI_API_KEY not set - skipping integration test"
)
def test_embedding(client):
    try:
        litellm.set_verbose = False
        from litellm.proxy.types_utils.utils import get_instance_fn

        my_custom_logger = get_instance_fn(
            value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
        )
        print("id of initialized custom logger", id(my_custom_logger))
        litellm.callbacks = [my_custom_logger]  # register the custom logger with litellm
        print("initialized proxy")
        print(litellm.callbacks)

        # assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback
        print("my_custom_logger", my_custom_logger)
        assert my_custom_logger.async_success_embedding is False  # no embedding call has run yet

        test_data = {"model": "azure-embedding-model", "input": ["hello"]}
        response = client.post("/embeddings", json=test_data, headers=headers)
        print("made request", response.status_code, response.text)
        print(
            "vars my custom logger /embeddings",
            vars(my_custom_logger),
            "id",
            id(my_custom_logger),
        )
        assert (
            my_custom_logger.async_success_embedding is True
        )  # checks if the status of async_success is True, only the async_log_success_event can set this to true
        assert (
            my_custom_logger.async_embedding_kwargs["model"] == "text-embedding-ada-002"
        )  # checks if kwargs passed to async_log_success_event are correct
        kwargs = my_custom_logger.async_embedding_kwargs
        litellm_params = kwargs.get("litellm_params")
        
        # Test 1: Verify metadata is populated correctly
        metadata = litellm_params.get("metadata", None)
        print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata"))
        assert metadata is not None, "metadata should be present in litellm_params"
        assert "user_api_key" in metadata, "user_api_key should be in metadata"
        assert "headers" in metadata, "headers should be in metadata"
        
        # Test 2: Verify proxy_server_request contains the original request details
        proxy_server_request = litellm_params.get("proxy_server_request")
        assert proxy_server_request is not None, "proxy_server_request should exist"
        assert proxy_server_request.get("url") == "http://testserver/embeddings", "url should match"
        assert proxy_server_request.get("method") == "POST", "method should be POST"
        assert "headers" in proxy_server_request, "headers should be present"
        assert "body" in proxy_server_request, "body should be present"
        
        # Test 3: Verify request body contains the original input data
        body = proxy_server_request["body"]
        assert body.get("model") == "azure-embedding-model", "model should match original request"
        assert body.get("input") == ["hello"], "input should match original request"
        
        # Test 4: Verify model_info is populated
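        # (The expected values -- id "hello", mode "embedding", input_cost_per_token
        # 0.002 -- are presumed to come from the model_info block for this model in
        # test_configs/test_custom_logger.yaml.)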
        model_info = litellm_params.get("model_info")
        assert model_info is not None, "model_info should exist"
        assert model_info.get("mode") == "embedding", "mode should be embedding"
        assert model_info.get("id") == "hello", "id should match"
        assert model_info.get("input_cost_per_token") == 0.002, "input cost should match"
        result = response.json()
        print(f"Received response: {result}")
        print("Passed Embedding custom logger on proxy!")
    except Exception as e:
        pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")


@pytest.mark.skipif(
    os.environ.get("OPENAI_API_KEY") is None,
    reason="OPENAI_API_KEY not set - skipping integration test"
)
def test_chat_completion(client):
    try:
        litellm.set_verbose = False
        from litellm.proxy.types_utils.utils import get_instance_fn

        my_custom_logger = get_instance_fn(
            value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
        )

        print("id of initialized custom logger", id(my_custom_logger))

        litellm.callbacks = [my_custom_logger]  # register the custom logger with litellm
        print(litellm.callbacks)

        # assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback

        print("LiteLLM Callbacks", litellm.callbacks)
        print("my_custom_logger", my_custom_logger)
        assert my_custom_logger.async_success is False  # no chat completion has run yet

        test_data = {
            "model": "Azure OpenAI GPT-4 Canada",
            "messages": [
                {"role": "user", "content": "write a litellm poem"},
            ],
            "max_tokens": 10,
        }

        response = client.post("/chat/completions", json=test_data, headers=headers)
        print("made request", response.status_code, response.text)
        print("LiteLLM Callbacks", litellm.callbacks)
        time.sleep(1)  # sleep while waiting for callback to run

        print(
            "my_custom_logger in /chat/completions",
            my_custom_logger,
            "id",
            id(my_custom_logger),
        )
        print("vars my custom logger, ", vars(my_custom_logger))
        assert (
            my_custom_logger.async_success is True
        )  # only the async_log_success_event callback can set this to True
        assert (
            my_custom_logger.async_completion_kwargs["model"] == "gpt-4.1-nano"
        )  # checks if kwargs passed to async_log_success_event are correct
        print(
            "\n\n Custom Logger Async Completion args",
            my_custom_logger.async_completion_kwargs,
        )
        litellm_params = my_custom_logger.async_completion_kwargs.get("litellm_params")
        
        # Test 1: Verify metadata is populated correctly
        metadata = litellm_params.get("metadata", None)
        print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata"))
        assert metadata is not None, "metadata should be present"
        assert "user_api_key" in metadata, "user_api_key should be in metadata"
        assert "user_api_key_metadata" in metadata, "user_api_key_metadata should be in metadata"
        assert "headers" in metadata, "headers should be in metadata"
        
        # Test 2: Verify model_info is populated
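        # (As in test_embedding, id "gm", mode "chat", and input_cost_per_token 0.0002
        # are presumed to mirror the model_info block in test_configs/test_custom_logger.yaml.)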
        config_model_info = litellm_params.get("model_info")
        assert config_model_info is not None, "model_info should exist"
        assert config_model_info.get("id") == "gm", "model id should match"
        assert config_model_info.get("mode") == "chat", "mode should be chat"
        assert config_model_info.get("input_cost_per_token") == 0.0002, "input cost should match"
        
        # Test 3: Verify proxy_server_request contains request details
        proxy_server_request_object = litellm_params.get("proxy_server_request")
        assert proxy_server_request_object is not None, "proxy_server_request should exist"
        assert proxy_server_request_object.get("url") == "http://testserver/chat/completions", "url should match"
        assert proxy_server_request_object.get("method") == "POST", "method should be POST"
        
        # Test 4: Verify the authorization header is scrubbed from the logged headers
        assert "authorization" not in proxy_server_request_object["headers"], "authorization header should not be logged"
        
        # Test 5: Verify request body contains original input data
        body = proxy_server_request_object.get("body", {})
        assert body.get("model") == "Azure OpenAI GPT-4 Canada", "model should match original request"
        assert body.get("messages") == [{"role": "user", "content": "write a litellm poem"}], "messages should match"
        assert body.get("max_tokens") == 10, "max_tokens should match"
        result = response.json()
        print(f"Received response: {result}")
        print("\nPassed /chat/completions with Custom Logger!")
    except Exception as e:
        pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")


@pytest.mark.skipif(
    os.environ.get("OPENAI_API_KEY") is None,
    reason="OPENAI_API_KEY not set - skipping integration test"
)
def test_chat_completion_stream(client):
    try:
        litellm.set_verbose = False
        from litellm.proxy.types_utils.utils import get_instance_fn

        my_custom_logger = get_instance_fn(
            value="custom_callbacks.my_custom_logger", config_file_path=python_file_path
        )

        print("id of initialized custom logger", id(my_custom_logger))

        litellm.callbacks = [my_custom_logger]  # register the custom logger with litellm
        import json

        print("initialized proxy")
        print(litellm.callbacks)

        print("LiteLLM Callbacks", litellm.callbacks)
        print("my_custom_logger", my_custom_logger)

        assert (
            my_custom_logger.streaming_response_obj is None
        )  # no streaming response obj is set before the call

        test_data = {
            "model": "Azure OpenAI GPT-4 Canada",
            "messages": [
                {"role": "user", "content": "write 1 line poem about LiteLLM"},
            ],
            "max_tokens": 40,
            "stream": True,  # streaming  call
        }

        response = client.post("/chat/completions", json=test_data, headers=headers)
        print("made request", response.status_code, response.text)
        complete_response = ""
        for line in response.iter_lines():
            if line:
                # Process the streaming data line here
                print("\n\n Line", line)
                if isinstance(line, bytes):  # iter_lines may yield bytes depending on the httpx version
                    line = line.decode("utf-8")

                json_data = line.replace("data: ", "", 1)  # strip the SSE "data: " prefix

                if "[DONE]" in json_data:
                    break

                # Parse the JSON string
                data = json.loads(json_data)

                print("\n\n decode_data", data)

                # Streaming chunks carry content in choices[0]["delta"], not ["message"]
                content = data["choices"][0]["delta"].get("content", None) or ""

                # Process the content as needed
                print("Content:", content)

                complete_response += content

        print("\n\nHERE is the complete streaming response string", complete_response)
        print("\n\nHERE IS the streaming Response from callback\n\n")
        print(my_custom_logger.streaming_response_obj)
        time.sleep(0.5)  # give the async callback a moment to finish

        streamed_response = my_custom_logger.streaming_response_obj
        assert (
            complete_response == streamed_response["choices"][0]["message"]["content"]
        )  # the callback should receive the fully rebuilt response

    except Exception as e:
        pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")
