# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from typing import Any

import opensearchpy
from opensearchpy._async.helpers.actions import aiter
from opensearchpy.serializer import serializer


class AsyncConnections:
    _conns: Any

    """
    Class responsible for holding connections to different clusters. Used as a
    singleton in this module.
    """

    def __init__(self) -> None:
        self._kwargs: Any = {}
        self._conns: Any = {}

    async def configure(self, **kwargs: Any) -> None:
        """
        Configure multiple connections at once, useful for passing in config
        dictionaries obtained from other sources, like Django's settings or a
        configuration management tool.

        Example::

            async_connections.configure(
                default={'hosts': 'localhost'},
                dev={'hosts': ['opensearchdev1.example.com:9200'], 'sniff_on_start': True},
            )

        Connections will only be constructed lazily when requested through
        ``get_connection``.
        """
        async for k in aiter(list(self._conns)):
            # try and preserve existing client to keep the persistent connections alive
            if k in self._kwargs and kwargs.get(k, None) == self._kwargs[k]:
                continue
            del self._conns[k]
        self._kwargs = kwargs

    async def add_connection(self, alias: str, conn: Any) -> None:
        """
        Add a connection object, it will be passed through as-is.
        """
        self._conns[alias] = conn

    async def remove_connection(self, alias: str) -> None:
        """
        Remove connection from the registry. Raises ``KeyError`` if connection
        wasn't found.
        """
        errors = 0
        async for d in aiter((self._conns, self._kwargs)):
            try:
                del d[alias]
            except KeyError:
                errors += 1

        if errors == 2:
            raise KeyError(f"There is no connection with alias {alias!r}.")

    async def create_connection(self, alias: str = "default", **kwargs: Any) -> Any:
        """
        Construct an instance of ``opensearchpy.AsyncOpenSearch`` and register
        it under given alias.
        """
        kwargs.setdefault("serializer", serializer)
        conn = self._conns[alias] = opensearchpy.AsyncOpenSearch(**kwargs)
        return conn

    async def get_connection(self, alias: str = "default") -> Any:
        """
        Retrieve a connection, construct it if necessary (only configuration
        was passed to us). If a non-string alias has been passed through we
        assume it's already a client instance and will just return it as-is.

        Raises ``KeyError`` if no client (or its definition) is registered
        under the alias.
        """
        # do not check isinstance(AsyncOpenSearch) so that people can wrap their
        # clients
        if not isinstance(alias, str):
            return alias

        # connection already established
        try:
            return self._conns[alias]
        except KeyError:
            pass

        # if not, try to create it
        try:
            return await self.create_connection(alias, **self._kwargs[alias])
        except KeyError:
            # no connection and no kwargs to set one up
            raise KeyError(f"There is no connection with alias {alias!r}.")


async_connections = AsyncConnections()
configure = async_connections.configure
add_connection = async_connections.add_connection
remove_connection = async_connections.remove_connection
create_connection = async_connections.create_connection
get_connection = async_connections.get_connection
