# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import gc
import json
import os
import pathlib
import subprocess
import sys
from typing import Any

import pytest
import torch

import vllm.model_executor.model_loader.tensorizer
from vllm import LLM, SamplingParams
from vllm.engine.arg_utils import EngineArgs
# yapf: disable
from vllm.model_executor.model_loader.tensorizer import (TensorizerConfig,
                                                         TensorSerializer,
                                                         is_vllm_tensorized,
                                                         open_stream,
                                                         tensorize_vllm_model)
from vllm.model_executor.model_loader.tensorizer_loader import (
    BLACKLISTED_TENSORIZER_ARGS)
# yapf: enable
from vllm.utils import PlaceholderModule

from ..utils import VLLM_PATH, RemoteOpenAIServer
from .conftest import DummyExecutor, assert_from_collective_rpc

try:
    import tensorizer
    from tensorizer import EncryptionParams
except ImportError:
    tensorizer = PlaceholderModule("tensorizer")  # type: ignore[assignment]
    EncryptionParams = tensorizer.placeholder_attr("EncryptionParams")


class TensorizerCaughtError(Exception):
    pass


EXAMPLES_PATH = VLLM_PATH / "examples"

pytest_plugins = "pytest_asyncio",

prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
# Create a sampling params object.
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, seed=0)


def patch_init_and_catch_error(self, obj, method_name,
                               expected_error: type[Exception]):
    original = getattr(obj, method_name, None)
    if original is None:
        raise ValueError("Method '{}' not found.".format(method_name))

    def wrapper(*args, **kwargs):
        try:
            return original(*args, **kwargs)
        except expected_error as err:
            raise TensorizerCaughtError from err

    setattr(obj, method_name, wrapper)

    self.load_model()


def assert_specific_tensorizer_error_is_raised(
    executor,
    obj: Any,
    method_name: str,
    expected_error: type[Exception],
):
    with pytest.raises(TensorizerCaughtError):
        executor.collective_rpc(patch_init_and_catch_error,
                                args=(
                                    obj,
                                    method_name,
                                    expected_error,
                                ))


def is_curl_installed():
    try:
        subprocess.check_call(['curl', '--version'])
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False


def write_keyfile(keyfile_path: str):
    encryption_params = EncryptionParams.random()
    pathlib.Path(keyfile_path).parent.mkdir(parents=True, exist_ok=True)
    with open(keyfile_path, 'wb') as f:
        f.write(encryption_params.key)


@pytest.mark.skipif(not is_curl_installed(), reason="cURL is not installed")
def test_deserialized_encrypted_vllm_model_has_same_outputs(
        model_ref, vllm_runner, tmp_path, model_path):
    args = EngineArgs(model=model_ref)
    with vllm_runner(model_ref) as vllm_model:
        key_path = tmp_path / model_ref / "model.key"
        write_keyfile(key_path)

        outputs = vllm_model.generate(prompts, sampling_params)

    config_for_serializing = TensorizerConfig(tensorizer_uri=str(model_path),
                                              encryption_keyfile=str(key_path))

    tensorize_vllm_model(args, config_for_serializing)

    config_for_deserializing = TensorizerConfig(
        tensorizer_uri=str(model_path), encryption_keyfile=str(key_path))

    with vllm_runner(model_ref,
                     load_format="tensorizer",
                     model_loader_extra_config=config_for_deserializing
                     ) as loaded_vllm_model:  # noqa: E501

        deserialized_outputs = loaded_vllm_model.generate(
            prompts, sampling_params)
        # noqa: E501

        assert outputs == deserialized_outputs


def test_deserialized_hf_model_has_same_outputs(hf_runner, vllm_runner,
                                                tmp_path, model_ref,
                                                model_path):
    with hf_runner(model_ref) as hf_model:
        max_tokens = 50
        outputs = hf_model.generate_greedy(prompts, max_tokens=max_tokens)
        with open_stream(model_path, "wb+") as stream:
            serializer = TensorSerializer(stream)
            serializer.write_module(hf_model.model)

    with vllm_runner(model_ref,
                     load_format="tensorizer",
                     model_loader_extra_config=TensorizerConfig(
                         tensorizer_uri=str(model_path),
                         num_readers=1,
                     )) as loaded_hf_model:
        deserialized_outputs = loaded_hf_model.generate_greedy(
            prompts, max_tokens=max_tokens)

        assert outputs == deserialized_outputs


def test_load_without_tensorizer_load_format(vllm_runner, capfd, model_ref):
    model = None
    try:
        model = vllm_runner(
            model_ref,
            model_loader_extra_config=TensorizerConfig(tensorizer_uri="test"))
    except RuntimeError:
        out, err = capfd.readouterr()
        combined_output = out + err
        assert ("ValueError: Model loader extra config "
                "is not supported for load "
                "format auto") in combined_output
    finally:
        del model
        gc.collect()
        torch.cuda.empty_cache()


def test_raise_value_error_on_invalid_load_format(vllm_runner, capfd,
                                                  model_ref):
    model = None
    try:
        model = vllm_runner(
            model_ref,
            load_format="safetensors",
            model_loader_extra_config=TensorizerConfig(tensorizer_uri="test"))
    except RuntimeError:
        out, err = capfd.readouterr()

        combined_output = out + err
        assert ("ValueError: Model loader extra config is not supported "
                "for load format safetensors") in combined_output
    finally:
        del model
        gc.collect()
        torch.cuda.empty_cache()


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires 2 GPUs")
def test_tensorizer_with_tp_path_without_template(vllm_runner, capfd):
    try:
        model_ref = "EleutherAI/pythia-1.4b"
        tensorized_path = f"s3://tensorized/{model_ref}/fp16/model.tensors"

        vllm_runner(
            model_ref,
            load_format="tensorizer",
            model_loader_extra_config=TensorizerConfig(
                tensorizer_uri=tensorized_path,
                num_readers=1,
                s3_endpoint="object.ord1.coreweave.com",
            ),
            tensor_parallel_size=2,
            disable_custom_all_reduce=True,
        )
    except RuntimeError:
        out, err = capfd.readouterr()
        combined_output = out + err
        assert ("ValueError: For a sharded model, tensorizer_uri "
                "should include a string format template like '%04d' "
                "to be formatted with the rank "
                "of the shard") in combined_output


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires 2 GPUs")
def test_deserialized_encrypted_vllm_model_with_tp_has_same_outputs(
        vllm_runner, tmp_path):
    model_ref = "EleutherAI/pythia-1.4b"
    # record outputs from un-sharded un-tensorized model
    with vllm_runner(
            model_ref,
            disable_custom_all_reduce=True,
            enforce_eager=True,
    ) as base_model:
        outputs = base_model.generate(prompts, sampling_params)

    # load model with two shards and serialize with encryption
    model_path = str(tmp_path / model_ref / "model-%02d.tensors")
    key_path = tmp_path / (model_ref + ".key")

    tensorizer_config = TensorizerConfig(
        tensorizer_uri=model_path,
        encryption_keyfile=str(key_path),
    )

    tensorize_vllm_model(
        engine_args=EngineArgs(
            model=model_ref,
            tensor_parallel_size=2,
            disable_custom_all_reduce=True,
            enforce_eager=True,
        ),
        tensorizer_config=tensorizer_config,
    )
    assert os.path.isfile(model_path % 0), "Serialization subprocess failed"
    assert os.path.isfile(model_path % 1), "Serialization subprocess failed"

    with vllm_runner(
            model_ref,
            tensor_parallel_size=2,
            load_format="tensorizer",
            disable_custom_all_reduce=True,
            enforce_eager=True,
            model_loader_extra_config=tensorizer_config) as loaded_vllm_model:
        deserialized_outputs = loaded_vllm_model.generate(
            prompts, sampling_params)

    assert outputs == deserialized_outputs


@pytest.mark.flaky(reruns=3)
def test_vllm_tensorized_model_has_same_outputs(model_ref, vllm_runner,
                                                tmp_path, model_path):
    gc.collect()
    torch.cuda.empty_cache()
    config = TensorizerConfig(tensorizer_uri=str(model_path))
    args = EngineArgs(model=model_ref)

    with vllm_runner(model_ref) as vllm_model:
        outputs = vllm_model.generate(prompts, sampling_params)

    tensorize_vllm_model(args, config)
    assert is_vllm_tensorized(config)

    with vllm_runner(model_ref,
                     load_format="tensorizer",
                     model_loader_extra_config=config) as loaded_vllm_model:
        deserialized_outputs = loaded_vllm_model.generate(
            prompts, sampling_params)
        # noqa: E501

        assert outputs == deserialized_outputs


def test_load_with_just_model_tensors(just_serialize_model_tensors, model_ref):
    # For backwards compatibility, ensure Tensorizer can be still be loaded
    # for inference by passing the model reference name, not a local/S3 dir,
    # and the location of the model tensors

    model_dir = just_serialize_model_tensors

    extra_config = {"tensorizer_uri": f"{model_dir}/model.tensors"}

    ## Start OpenAI API server
    args = [
        "--load-format",
        "tensorizer",
        "--model-loader-extra-config",
        json.dumps(extra_config),
    ]

    with RemoteOpenAIServer(model_ref, args):
        # This test only concerns itself with being able to load the model
        # and successfully initialize the server
        pass


def test_assert_serialization_kwargs_passed_to_tensor_serializer(tmp_path):

    serialization_params = {
        "limit_cpu_concurrency": 2,
    }
    model_ref = "facebook/opt-125m"
    model_path = tmp_path / (model_ref + ".tensors")
    config = TensorizerConfig(tensorizer_uri=str(model_path),
                              serialization_kwargs=serialization_params)
    llm = LLM(model=model_ref, )

    def serialization_test(self, *args, **kwargs):
        # This is performed in the ephemeral worker process, so monkey-patching
        # will actually work, and cleanup is guaranteed so don't
        # need to reset things

        original_dict = serialization_params
        to_compare = {}

        original = tensorizer.serialization.TensorSerializer.__init__

        def tensorizer_serializer_wrapper(self, *args, **kwargs):
            nonlocal to_compare
            to_compare = kwargs.copy()
            return original(self, *args, **kwargs)

        tensorizer.serialization.TensorSerializer.__init__ = (
            tensorizer_serializer_wrapper)

        tensorizer_config = TensorizerConfig(**kwargs["tensorizer_config"])
        self.save_tensorized_model(tensorizer_config=tensorizer_config, )
        return to_compare | original_dict == to_compare

    kwargs = {"tensorizer_config": config.to_serializable()}

    assert assert_from_collective_rpc(llm, serialization_test, kwargs)


def test_assert_deserialization_kwargs_passed_to_tensor_deserializer(
        tmp_path, capfd):

    deserialization_kwargs = {
        "num_readers": "bar",  # illegal value
    }

    serialization_params = {
        "limit_cpu_concurrency": 2,
    }

    model_ref = "facebook/opt-125m"
    model_path = tmp_path / (model_ref + ".tensors")
    config = TensorizerConfig(tensorizer_uri=str(model_path),
                              serialization_kwargs=serialization_params)

    args = EngineArgs(model=model_ref)
    tensorize_vllm_model(args, config)

    loader_tc = TensorizerConfig(
        tensorizer_uri=str(model_path),
        deserialization_kwargs=deserialization_kwargs,
    )

    engine_args = EngineArgs(
        model="facebook/opt-125m",
        load_format="tensorizer",
        model_loader_extra_config=loader_tc.to_serializable(),
    )

    vllm_config = engine_args.create_engine_config()
    executor = DummyExecutor(vllm_config)

    assert_specific_tensorizer_error_is_raised(
        executor,
        tensorizer.serialization.TensorDeserializer,
        "__init__",
        TypeError,
    )


def test_assert_stream_kwargs_passed_to_tensor_deserializer(tmp_path, capfd):

    deserialization_kwargs = {
        "num_readers": 1,
    }

    serialization_params = {
        "limit_cpu_concurrency": 2,
    }

    model_ref = "facebook/opt-125m"
    model_path = tmp_path / (model_ref + ".tensors")
    config = TensorizerConfig(tensorizer_uri=str(model_path),
                              serialization_kwargs=serialization_params)

    args = EngineArgs(model=model_ref)
    tensorize_vllm_model(args, config)

    stream_kwargs = {"mode": "foo"}

    loader_tc = TensorizerConfig(
        tensorizer_uri=str(model_path),
        deserialization_kwargs=deserialization_kwargs,
        stream_kwargs=stream_kwargs,
    )

    engine_args = EngineArgs(
        model="facebook/opt-125m",
        load_format="tensorizer",
        model_loader_extra_config=loader_tc.to_serializable(),
    )

    vllm_config = engine_args.create_engine_config()
    executor = DummyExecutor(vllm_config)

    assert_specific_tensorizer_error_is_raised(
        executor,
        vllm.model_executor.model_loader.tensorizer,
        "open_stream",
        ValueError,
    )


@pytest.mark.asyncio
async def test_serialize_and_serve_entrypoints(tmp_path):
    model_ref = "facebook/opt-125m"

    suffix = "test"
    try:
        result = subprocess.run([
            sys.executable,
            f"{VLLM_PATH}/examples/others/tensorize_vllm_model.py", "--model",
            model_ref, "serialize", "--serialized-directory",
            str(tmp_path), "--suffix", suffix, "--serialization-kwargs",
            '{"limit_cpu_concurrency": 4}'
        ],
                                check=True,
                                capture_output=True,
                                text=True)
    except subprocess.CalledProcessError as e:
        print("Tensorizing failed.")
        print("STDOUT:\n", e.stdout)
        print("STDERR:\n", e.stderr)
        raise

    assert "Successfully serialized" in result.stdout

    # Next, try to serve with vllm serve
    model_uri = tmp_path / "vllm" / model_ref / suffix / "model.tensors"

    model_loader_extra_config = {
        "tensorizer_uri": str(model_uri),
        "stream_kwargs": {
            "force_http": False,
        },
        "deserialization_kwargs": {
            "verify_hash": True,
            "num_readers": 8,
        }
    }

    cmd = [
        "-m", "vllm.entrypoints.cli.main", "serve", "--host", "localhost",
        "--load-format", "tensorizer", model_ref,
        "--model-loader-extra-config",
        json.dumps(model_loader_extra_config, indent=2)
    ]

    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )

    assert proc.stdout is not None
    fut = proc.stdout.readuntil(b"Application startup complete.")

    try:
        await asyncio.wait_for(fut, 180)
    except asyncio.TimeoutError:
        pytest.fail("Server did not start successfully")
    finally:
        proc.terminate()
    await proc.communicate()


@pytest.mark.parametrize("illegal_value", BLACKLISTED_TENSORIZER_ARGS)
def test_blacklisted_parameter_for_loading(tmp_path, vllm_runner, capfd,
                                           illegal_value):

    serialization_params = {
        "limit_cpu_concurrency": 2,
    }

    model_ref = "facebook/opt-125m"
    model_path = tmp_path / (model_ref + ".tensors")
    config = TensorizerConfig(tensorizer_uri=str(model_path),
                              serialization_kwargs=serialization_params)

    args = EngineArgs(model=model_ref)
    tensorize_vllm_model(args, config)

    loader_tc = {"tensorizer_uri": str(model_path), illegal_value: "foo"}

    try:
        vllm_runner(
            model_ref,
            load_format="tensorizer",
            model_loader_extra_config=loader_tc,
        )
    except RuntimeError:
        out, err = capfd.readouterr()
        combined_output = out + err
        assert (f"ValueError: {illegal_value} is not an allowed "
                f"Tensorizer argument.") in combined_output
