from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from litellm._logging import verbose_proxy_logger
from litellm.caching import DualCache
from litellm.proxy._types import (
    KeyRequestBase,
    LiteLLM_ManagementEndpoint_MetadataFields,
    LiteLLM_ManagementEndpoint_MetadataFields_Premium,
    LiteLLM_OrganizationTable,
    LiteLLM_ProjectTable,
    LiteLLM_TeamTable,
    LiteLLM_UserTable,
    LitellmUserRoles,
    NewProjectRequest,
    UpdateProjectRequest,
    UserAPIKeyAuth,
)
from litellm.proxy.utils import _premium_user_check

if TYPE_CHECKING:
    from litellm.proxy._types import NewProjectRequest, UpdateProjectRequest
    from litellm.proxy.utils import PrismaClient, ProxyLogging


def _user_has_admin_view(user_api_key_dict: UserAPIKeyAuth) -> bool:
    return (
        user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN
        or user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY
    )


def _is_user_team_admin(
    user_api_key_dict: UserAPIKeyAuth, team_obj: LiteLLM_TeamTable
) -> bool:
    for member in team_obj.members_with_roles:
        if (
            member.user_id is not None and member.user_id == user_api_key_dict.user_id
        ) and member.role == "admin":
            return True

    return False


async def _is_user_org_admin_for_team(
    user_api_key_dict: UserAPIKeyAuth, team_obj: LiteLLM_TeamTable
) -> bool:
    """
    Check if user is an org admin for the team's organization.

    Returns True if:
    - The team belongs to an organization, AND
    - The user has org_admin role in that organization
    """
    if not team_obj.organization_id or not user_api_key_dict.user_id:
        return False

    from litellm.proxy.auth.auth_checks import get_user_object
    from litellm.proxy.proxy_server import (
        prisma_client,
        proxy_logging_obj,
        user_api_key_cache,
    )

    caller_user = await get_user_object(
        user_id=user_api_key_dict.user_id,
        prisma_client=prisma_client,
        user_api_key_cache=user_api_key_cache,
        user_id_upsert=False,
        proxy_logging_obj=proxy_logging_obj,
    )
    if caller_user is None:
        return False

    for m in caller_user.organization_memberships or []:
        if (
            m.organization_id == team_obj.organization_id
            and m.user_role == LitellmUserRoles.ORG_ADMIN.value
        ):
            return True

    return False


def _team_member_has_permission(
    user_api_key_dict: UserAPIKeyAuth,
    team_obj: LiteLLM_TeamTable,
    permission: str,
) -> bool:
    """Check if a non-admin team member has a specific permission on a team."""
    if not team_obj.team_member_permissions:
        return False
    if permission not in team_obj.team_member_permissions:
        return False
    for member in team_obj.members_with_roles:
        if (
            member.user_id is not None
            and member.user_id == user_api_key_dict.user_id
        ):
            return True
    return False


async def _user_has_admin_privileges(
    user_api_key_dict: UserAPIKeyAuth,
    prisma_client: Optional["PrismaClient"] = None,
    user_api_key_cache: Optional["DualCache"] = None,
    proxy_logging_obj: Optional["ProxyLogging"] = None,
) -> bool:
    """
    Check if user has admin privileges (proxy admin, team admin, or org admin).

    Args:
        user_api_key_dict: User API key authentication object
        prisma_client: Prisma client for database operations
        user_api_key_cache: Cache for user API keys
        proxy_logging_obj: Proxy logging object

    Returns:
        True if user is proxy admin, team admin for any team, or org admin for any organization
    """
    # Check if user is proxy admin
    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN:
        return True

    # If no database connection, can't check team/org admin status
    if prisma_client is None or user_api_key_dict.user_id is None:
        return False

    # Get user object to check team and org admin status
    from litellm.caching import DualCache as DualCacheImport
    from litellm.proxy.auth.auth_checks import get_user_object

    try:
        user_obj = await get_user_object(
            user_id=user_api_key_dict.user_id,
            prisma_client=prisma_client,
            user_api_key_cache=user_api_key_cache or DualCacheImport(),
            user_id_upsert=False,
            proxy_logging_obj=proxy_logging_obj,
        )

        if user_obj is None:
            return False

        # Check if user is org admin for any organization
        if user_obj.organization_memberships is not None:
            for membership in user_obj.organization_memberships:
                if membership.user_role == LitellmUserRoles.ORG_ADMIN.value:
                    return True

        # Check if user is team admin for any team
        if user_obj.teams is not None and len(user_obj.teams) > 0:
            # Get all teams user is in
            teams = await prisma_client.db.litellm_teamtable.find_many(
                where={"team_id": {"in": user_obj.teams}}
            )

            for team in teams:
                team_obj = LiteLLM_TeamTable(**team.model_dump())
                if _is_user_team_admin(
                    user_api_key_dict=user_api_key_dict, team_obj=team_obj
                ):
                    return True

    except Exception as e:
        # If there's an error checking, default to False for security
        verbose_proxy_logger.debug(
            f"Error checking admin privileges for user {user_api_key_dict.user_id}: {e}"
        )
        return False

    return False


def _org_admin_can_invite_user(
    admin_user_obj: LiteLLM_UserTable,
    target_user_obj: LiteLLM_UserTable,
) -> bool:
    """
    Check if an org admin can invite the target user.
    Target user must be in at least one org where the admin has org admin role.

    Args:
        admin_user_obj: The admin user's full object (from get_user_object)
        target_user_obj: The target user's full object (from get_user_object)

    Returns:
        True if target user is in an org where admin has org admin role
    """
    if admin_user_obj.organization_memberships is None:
        return False
    admin_org_ids = {
        m.organization_id
        for m in admin_user_obj.organization_memberships
        if m.user_role == LitellmUserRoles.ORG_ADMIN.value
    }
    if not admin_org_ids:
        return False
    if target_user_obj.organization_memberships is None:
        return False
    target_org_ids = {
        m.organization_id for m in target_user_obj.organization_memberships
    }
    return bool(admin_org_ids & target_org_ids)


async def _team_admin_can_invite_user(
    user_api_key_dict: UserAPIKeyAuth,
    admin_user_obj: LiteLLM_UserTable,
    target_user_obj: LiteLLM_UserTable,
    prisma_client: "PrismaClient",
) -> bool:
    """
    Check if a team admin can invite the target user.
    Target user must be in at least one team where the admin has team admin role.

    Args:
        user_api_key_dict: The admin user's API key auth object
        admin_user_obj: The admin user's full object (from get_user_object)
        target_user_obj: The target user's full object (from get_user_object)
        prisma_client: Prisma client for database operations

    Returns:
        True if target user is in a team where admin has team admin role
    """
    if not admin_user_obj.teams or len(admin_user_obj.teams) == 0:
        return False
    if not target_user_obj.teams or len(target_user_obj.teams) == 0:
        return False

    teams = await prisma_client.db.litellm_teamtable.find_many(
        where={"team_id": {"in": admin_user_obj.teams}}
    )
    admin_team_ids = [
        team.team_id
        for team in teams
        if _is_user_team_admin(
            user_api_key_dict=user_api_key_dict,
            team_obj=LiteLLM_TeamTable(**team.model_dump()),
        )
    ]
    if not admin_team_ids:
        return False
    target_team_ids = set(target_user_obj.teams)
    return bool(set(admin_team_ids) & target_team_ids)


async def admin_can_invite_user(
    target_user_id: str,
    user_api_key_dict: UserAPIKeyAuth,
    prisma_client: Optional["PrismaClient"] = None,
    user_api_key_cache: Optional["DualCache"] = None,
    proxy_logging_obj: Optional["ProxyLogging"] = None,
) -> bool:
    """
    Check if the admin can create an invitation for the target user.
    - Proxy admins: can invite any user
    - Org admins: can only invite users in their org(s)
    - Team admins: can only invite users in their team(s)

    Uses get_user_object for caching of both admin and target user objects.

    Args:
        target_user_id: The user_id of the user to invite
        user_api_key_dict: The admin user's API key auth object
        prisma_client: Prisma client for database operations
        user_api_key_cache: Cache for user API keys
        proxy_logging_obj: Proxy logging object

    Returns:
        True if user can invite the target user
    """
    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN:
        return True

    if prisma_client is None or user_api_key_dict.user_id is None:
        return False

    from litellm.caching import DualCache as DualCacheImport
    from litellm.proxy.auth.auth_checks import get_user_object

    try:
        cache = user_api_key_cache or DualCacheImport()
        admin_user_obj = await get_user_object(
            user_id=user_api_key_dict.user_id,
            prisma_client=prisma_client,
            user_api_key_cache=cache,
            user_id_upsert=False,
            proxy_logging_obj=proxy_logging_obj,
        )
        if admin_user_obj is None:
            return False

        target_user_obj = await get_user_object(
            user_id=target_user_id,
            prisma_client=prisma_client,
            user_api_key_cache=cache,
            user_id_upsert=False,
            proxy_logging_obj=proxy_logging_obj,
        )
        if target_user_obj is None:
            return False

        if _org_admin_can_invite_user(admin_user_obj, target_user_obj):
            return True

        if await _team_admin_can_invite_user(
            user_api_key_dict=user_api_key_dict,
            admin_user_obj=admin_user_obj,
            target_user_obj=target_user_obj,
            prisma_client=prisma_client,
        ):
            return True

        return False
    except Exception as e:
        verbose_proxy_logger.debug(
            f"Error checking invite permission for user {user_api_key_dict.user_id}: {e}"
        )
        return False


def _set_object_metadata_field(
    object_data: Union[
        LiteLLM_TeamTable,
        KeyRequestBase,
        LiteLLM_OrganizationTable,
        LiteLLM_ProjectTable,
        "NewProjectRequest",
        "UpdateProjectRequest",
    ],
    field_name: str,
    value: Any,
) -> None:
    """
    Helper function to set metadata fields that require premium user checks

    Args:
        object_data: The team/key/organization/project data object to modify
        field_name: Name of the metadata field to set
        value: Value to set for the field
    """
    if field_name in LiteLLM_ManagementEndpoint_MetadataFields_Premium:
        _premium_user_check(field_name)

    object_data.metadata = object_data.metadata or {}
    object_data.metadata[field_name] = value


async def _upsert_budget_and_membership(
    tx,
    *,
    team_id: str,
    user_id: str,
    max_budget: Optional[float],
    existing_budget_id: Optional[str],
    user_api_key_dict: UserAPIKeyAuth,
    tpm_limit: Optional[int] = None,
    rpm_limit: Optional[int] = None,
):
    """
    Helper function to Create/Update or Delete the budget within the team membership
    Args:
        tx: The transaction object
        team_id: The ID of the team
        user_id: The ID of the user
        max_budget: The maximum budget for the team
        existing_budget_id: The ID of the existing budget, if any
        user_api_key_dict: User API Key dictionary containing user information
        tpm_limit: Tokens per minute limit for the team member
        rpm_limit: Requests per minute limit for the team member

    If max_budget, tpm_limit, and rpm_limit are all None, the user's budget is removed from the team membership.
    If any of these values exist, a budget is updated or created and linked to the team membership.
    """
    if max_budget is None and tpm_limit is None and rpm_limit is None:
        # disconnect the budget since all limits are None
        await tx.litellm_teammembership.update(
            where={"user_id_team_id": {"user_id": user_id, "team_id": team_id}},
            data={"litellm_budget_table": {"disconnect": True}},
        )
        return

    # create a new budget
    create_data: Dict[str, Any] = {
        "created_by": user_api_key_dict.user_id or "",
        "updated_by": user_api_key_dict.user_id or "",
    }
    if max_budget is not None:
        create_data["max_budget"] = max_budget
    if tpm_limit is not None:
        create_data["tpm_limit"] = tpm_limit
    if rpm_limit is not None:
        create_data["rpm_limit"] = rpm_limit

    new_budget = await tx.litellm_budgettable.create(
        data=create_data,
        include={"team_membership": True},
    )
    # upsert the team membership with the new/updated budget
    await tx.litellm_teammembership.upsert(
        where={
            "user_id_team_id": {
                "user_id": user_id,
                "team_id": team_id,
            }
        },
        data={
            "create": {
                "user_id": user_id,
                "team_id": team_id,
                "litellm_budget_table": {
                    "connect": {"budget_id": new_budget.budget_id},
                },
            },
            "update": {
                "litellm_budget_table": {
                    "connect": {"budget_id": new_budget.budget_id},
                },
            },
        },
    )


def _update_metadata_field(updated_kv: dict, field_name: str) -> None:
    """
    Helper function to update metadata fields that require premium user checks in the update endpoint

    Args:
        updated_kv: The key-value dict being used for the update
        field_name: Name of the metadata field being updated
    """
    if field_name in LiteLLM_ManagementEndpoint_MetadataFields_Premium:
        value = updated_kv.get(field_name)
        # Skip the premium check for empty collections ([] or {}).
        # The UI sends these as defaults even when the user hasn't configured
        # any enterprise features (see issue #20304).  However, we still
        # proceed with the update so that users can intentionally clear a
        # previously-set field by sending an empty list/dict.
        if value is not None and value != [] and value != {}:
            _premium_user_check()

    if field_name in updated_kv and updated_kv[field_name] is not None:
        # remove field from updated_kv
        _value = updated_kv.pop(field_name)
        if "metadata" in updated_kv and updated_kv["metadata"] is not None:
            updated_kv["metadata"][field_name] = _value
        else:
            updated_kv["metadata"] = {field_name: _value}


def _has_non_empty_value(value: Any) -> bool:
    """Check if a value has real content (not None, not empty list, not blank string)."""
    if value is None:
        return False
    if isinstance(value, list) and len(value) == 0:
        return False
    if isinstance(value, str) and value.strip() == "":
        return False
    return True


def _update_metadata_fields(updated_kv: dict) -> None:
    """
    Helper function to update all metadata fields (both premium and standard).

    Args:
        updated_kv: The key-value dict being used for the update
    """
    for field in LiteLLM_ManagementEndpoint_MetadataFields_Premium:
        if field in updated_kv and updated_kv[field] is not None:
            _update_metadata_field(updated_kv=updated_kv, field_name=field)

    for field in LiteLLM_ManagementEndpoint_MetadataFields:
        if field in updated_kv and updated_kv[field] is not None:
            _update_metadata_field(updated_kv=updated_kv, field_name=field)
