package com.gzzm.lobster.api;

import com.gzzm.lobster.common.JsonUtil;
import com.gzzm.lobster.common.LobsterException;
import com.gzzm.lobster.common.RunStatus;
import com.gzzm.lobster.common.StreamEventType;
import com.gzzm.lobster.identity.UserContext;
import com.gzzm.lobster.identity.UserContextHolder;
import com.gzzm.lobster.run.Run;
import com.gzzm.lobster.run.RunDao;
import com.gzzm.lobster.run.RunStreamEventService;
import com.gzzm.lobster.runtime.AgentRuntime;
import com.gzzm.lobster.runtime.RunScope;
import com.gzzm.lobster.runtime.StreamEvent;
import com.gzzm.lobster.thread.ThreadService;
import com.gzzm.platform.commons.Tools;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * SSE observation endpoint for existing runs.
 *
 * <p>Run creation is handled by {@code POST /ai/api/runs}. This servlet only
 * attaches a browser to an existing run's event stream. Browser disconnects do
 * not control the backend task lifecycle.
 */
public class RunStreamServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    /** Total time spent retrying to attach to the in-memory stream of a running run. */
    private static final long ACTIVE_ATTACH_RETRY_MS = 10_000L;
    /** Pause between two attach attempts. */
    private static final long ACTIVE_ATTACH_RETRY_INTERVAL_MS = 100L;
    /** Minimum spacing of heartbeats sent while waiting to attach. */
    private static final long ACTIVE_ATTACH_HEARTBEAT_MS = 1_000L;
    /** Pause between two polls of the persisted event store while tailing. */
    private static final long PERSISTED_POLL_INTERVAL_MS = 1_000L;
    /** Number of extra polls made for a {@code run_ended} event once the run stopped running. */
    private static final int TERMINAL_DRAIN_ATTEMPTS = 15;
    /** Pause before each of the extra terminal-drain polls. */
    private static final long TERMINAL_DRAIN_INTERVAL_MS = 200L;

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        dispatch(request, response);
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        dispatch(request, response);
    }

    /**
     * Shared entry point for GET and POST.
     *
     * <p>Exceptions the servlet container already understands are rethrown
     * unchanged; any other checked exception is wrapped in a
     * {@link ServletException} with the original as its cause.
     */
    private void dispatch(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        try {
            handle(request, response);
        } catch (IOException | ServletException | RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new ServletException(e);
        }
    }

    /**
     * Resolves the calling user and delegates to {@link #handleAuthenticated}.
     * Responds with 401 when no user can be resolved; otherwise the
     * {@link UserContextHolder} is cleared once the request thread is done.
     */
    private void handle(HttpServletRequest request, HttpServletResponse response)
            throws Exception {
        UserContext user = UserContextHolder.bridgeFromRequest(request, response);
        if (user == null) {
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Unauthenticated");
            return;
        }
        try {
            handleAuthenticated(request, response, user);
        } finally {
            UserContextHolder.clear();
        }
    }

    /**
     * Validates the request and, if it is acceptable, switches it to async mode
     * and streams the run's events from a container-managed thread.
     *
     * <p>{@code runId}, {@code threadId} and {@code afterSeq} are read from the
     * JSON body first and from the query/form parameters second. Responses sent
     * before streaming starts: 400 when an id is missing, 503 when a required
     * bean is unavailable, 404 when the run does not exist or belongs to another
     * thread, and 403 when the ownership check or lease reconciliation throws.
     */
    private void handleAuthenticated(HttpServletRequest request, HttpServletResponse response,
                                     UserContext user) throws Exception {
        Map<String, Object> body = readBody(request);
        String runId = strOr(body.get("runId"), request.getParameter("runId"));
        String threadId = strOr(body.get("threadId"), request.getParameter("threadId"));
        long afterSeq = longFrom(body.get("afterSeq"), request.getParameter("afterSeq"));

        if (threadId == null || threadId.isEmpty()) {
            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "threadId required");
            return;
        }
        if (runId == null || runId.isEmpty()) {
            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "runId required");
            return;
        }

        ThreadService threadService = Tools.getBean(ThreadService.class);
        AgentRuntime agentRuntime = Tools.getBean(AgentRuntime.class);
        RunDao runDao = Tools.getBean(RunDao.class);
        if (threadService == null || agentRuntime == null) {
            // A missing bean is a server-side problem; do not let the resulting
            // NullPointerException be reported to the client as "forbidden".
            response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "runtime unavailable");
            return;
        }
        try {
            threadService.requireOwnedThread(user, threadId);
            Run run = runDao == null ? null : runDao.getRun(runId);
            if (run == null || !threadId.equals(run.getThreadId())) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND, "run not found");
                return;
            }
            agentRuntime.reconcileRunLease(run);
        } catch (Exception e) {
            // sendError on an already committed response throws IllegalStateException,
            // so only report the failure while the response is still writable.
            if (!response.isCommitted()) {
                response.sendError(HttpServletResponse.SC_FORBIDDEN, safeMsg(e));
            }
            return;
        }

        final AsyncContext actx = request.startAsync(request, response);
        actx.setTimeout(0);
        final String userId = user.getUserId();
        actx.start(() -> RunScope.runWith(user, () ->
                streamRun(actx, agentRuntime, userId, threadId, runId, afterSeq)));
    }

    /**
     * Streams one run to the client and always completes the async context.
     *
     * <p>Order of attempts: attach to the active run; if that fails, retry for a
     * bounded time via {@link #waitAndStreamActiveRun}; if that also fails, tail
     * the persisted events. Any failure is reported to the client as an
     * {@code error} stream event (best effort) and logged; nothing is rethrown.
     */
    private void streamRun(AsyncContext actx, AgentRuntime agentRuntime, String userId,
                           String threadId, String runId, long afterSeq) {
        SseStreamEmitter emitter = null;
        try {
            emitter = new SseStreamEmitter(actx);
            boolean attached = agentRuntime.streamActiveRun(userId, threadId, runId,
                    afterSeq, emitter);
            if (!attached) {
                RunStreamEventService liveEventService = Tools.getBean(RunStreamEventService.class);
                RunDao liveRunDao = Tools.getBean(RunDao.class);
                if (liveEventService == null) {
                    throw new LobsterException("run_event.persistence_unavailable",
                            "Run stream event persistence is unavailable");
                }
                if (liveRunDao == null) {
                    throw new LobsterException("run.persistence_unavailable",
                            "Run persistence is unavailable");
                }
                attached = waitAndStreamActiveRun(agentRuntime, liveRunDao, userId,
                        threadId, runId, afterSeq, emitter);
                if (!attached) {
                    logQuietly("[RunStreamServlet] active run not attached; fallback to persisted events: "
                            + runId, null);
                    tailPersistedEvents(agentRuntime, liveRunDao, liveEventService,
                            runId, afterSeq, emitter);
                }
            }
        } catch (Throwable t) {
            emitErrorQuietly(emitter, t);
            logQuietly("[RunStreamServlet] stream failed", t);
        } finally {
            if (emitter != null) {
                emitter.complete();
            } else {
                // The emitter could not even be constructed; release the request directly.
                try {
                    actx.complete();
                } catch (Throwable ignored) {
                    // best effort: nothing more can be done for this request
                }
            }
        }
    }

    /** Sends an {@code error} stream event describing {@code failure}; never throws. */
    private void emitErrorQuietly(SseStreamEmitter emitter, Throwable failure) {
        if (emitter == null) {
            return;
        }
        try {
            Map<String, Object> payload = new LinkedHashMap<>();
            payload.put("code", failure instanceof LobsterException
                    ? ((LobsterException) failure).getCode() : "runtime.exception");
            payload.put("message", safeMsg(failure));
            emitter.emit(StreamEvent.of(StreamEventType.error, payload));
        } catch (Throwable ignored) {
            // best effort: the client may already be gone
        }
    }

    /** Logs through {@link Tools}; a failure of the logger itself is deliberately ignored. */
    private void logQuietly(String message, Throwable cause) {
        try {
            if (cause == null) {
                Tools.log(message);
            } else {
                Tools.log(message, cause);
            }
        } catch (Throwable ignored) {
            // logging must never break the stream
        }
    }

    /**
     * Repeatedly tries to attach to the active run for up to
     * {@link #ACTIVE_ATTACH_RETRY_MS}, sending a heartbeat roughly once per
     * {@link #ACTIVE_ATTACH_HEARTBEAT_MS} in between.
     *
     * @return {@code true} when the run was streamed or a heartbeat reported
     *         failure (so the caller must not fall back); {@code false} when the
     *         run is no longer running or the deadline passed without attaching
     */
    private boolean waitAndStreamActiveRun(AgentRuntime agentRuntime, RunDao runDao,
                                           String userId, String threadId, String runId,
                                           long afterSeq, SseStreamEmitter emitter)
            throws Exception {
        long start = System.currentTimeMillis();
        long deadline = start + ACTIVE_ATTACH_RETRY_MS;
        long nextHeartbeat = start + ACTIVE_ATTACH_HEARTBEAT_MS;
        while (System.currentTimeMillis() < deadline) {
            Run latest = runDao == null ? null : runDao.getRun(runId);
            if (agentRuntime != null) {
                latest = agentRuntime.reconcileRunLease(latest);
            }
            if (!isStillStreaming(latest)) {
                return false;
            }
            if (agentRuntime != null
                    && agentRuntime.streamActiveRun(userId, threadId, runId, afterSeq, emitter)) {
                return true;
            }
            long now = System.currentTimeMillis();
            if (now >= nextHeartbeat) {
                if (!emitter.heartbeat()) {
                    // Heartbeat failed: report "handled" so no fallback streaming is attempted.
                    return true;
                }
                nextHeartbeat = now + ACTIVE_ATTACH_HEARTBEAT_MS;
            }
            pause(ACTIVE_ATTACH_RETRY_INTERVAL_MS);
        }
        return false;
    }

    /**
     * Polls the persisted event store and forwards new events until a
     * {@code run_ended} event is emitted, a heartbeat fails, or the run is no
     * longer running. In the last case the store is drained a few more times and,
     * if still no terminal event shows up, terminal events are synthesized via
     * {@link #emitMissingTerminalEvents}.
     */
    private void tailPersistedEvents(AgentRuntime agentRuntime, RunDao runDao,
                                     RunStreamEventService eventService, String runId,
                                     long afterSeq, SseStreamEmitter emitter)
            throws Exception {
        long cursor = afterSeq;
        while (true) {
            EmitResult emitted = emitPersistedEvents(eventService, runId, cursor, emitter);
            cursor = emitted.cursor;
            if (emitted.terminalSeen || !emitter.heartbeat()) {
                return;
            }
            Run latest = runDao == null ? null : runDao.getRun(runId);
            if (agentRuntime != null) {
                latest = agentRuntime.reconcileRunLease(latest);
            }
            if (!isStillStreaming(latest)) {
                EmitResult drained = drainTerminalEvents(eventService, runId, cursor, emitter);
                if (!drained.terminalSeen) {
                    emitMissingTerminalEvents(agentRuntime, eventService, latest, runId, emitter);
                }
                return;
            }
            pause(PERSISTED_POLL_INTERVAL_MS);
        }
    }

    /**
     * Emits the events the store returns for {@code runId} after {@code afterSeq},
     * stopping after the first {@code run_ended} event.
     *
     * <p>A {@code null} list and {@code null} entries are skipped instead of being
     * handed to the emitter.
     *
     * @return the highest sequence number seen (never below {@code afterSeq}) and
     *         whether a {@code run_ended} event was emitted
     */
    private EmitResult emitPersistedEvents(RunStreamEventService eventService, String runId,
                                           long afterSeq, SseStreamEmitter emitter) {
        long cursor = afterSeq;
        List<StreamEvent> events = eventService.listAfter(runId, cursor);
        if (events == null) {
            return new EmitResult(cursor, false);
        }
        for (StreamEvent ev : events) {
            if (ev == null) {
                continue;
            }
            emitter.emit(ev);
            cursor = Math.max(cursor, eventSeq(ev));
            if (ev.getType() == StreamEventType.run_ended) {
                return new EmitResult(cursor, true);
            }
        }
        return new EmitResult(cursor, false);
    }

    /**
     * Polls the store up to {@link #TERMINAL_DRAIN_ATTEMPTS} more times, waiting
     * {@link #TERMINAL_DRAIN_INTERVAL_MS} before each poll, and stops early when a
     * heartbeat fails or a {@code run_ended} event has been emitted.
     */
    private EmitResult drainTerminalEvents(RunStreamEventService eventService, String runId,
                                           long cursor, SseStreamEmitter emitter)
            throws Exception {
        long current = cursor;
        for (int attempt = 0; attempt < TERMINAL_DRAIN_ATTEMPTS && emitter.heartbeat(); attempt++) {
            pause(TERMINAL_DRAIN_INTERVAL_MS);
            EmitResult next = emitPersistedEvents(eventService, runId, current, emitter);
            if (next.terminalSeen) {
                return next;
            }
            current = next.cursor;
        }
        return new EmitResult(current, false);
    }

    /** Outcome of one pass over the persisted events. */
    private static final class EmitResult {
        /** Highest event sequence number emitted so far. */
        final long cursor;
        /** Whether a {@code run_ended} event was emitted in this pass. */
        final boolean terminalSeen;

        EmitResult(long cursor, boolean terminalSeen) {
            this.cursor = cursor;
            this.terminalSeen = terminalSeen;
        }
    }

    /**
     * For a run that is known and no longer running but has no persisted
     * {@code run_ended} event, asks the runtime to record terminal observation
     * events and forwards them to the client. Does nothing otherwise.
     */
    private void emitMissingTerminalEvents(AgentRuntime agentRuntime,
                                           RunStreamEventService eventService,
                                           Run run,
                                           String runId,
                                           SseStreamEmitter emitter) {
        if (agentRuntime == null || eventService == null || emitter == null) {
            return;
        }
        if (run == null || isStillStreaming(run)) {
            return;
        }
        if (hasRunEndedEvent(eventService, runId)) {
            return;
        }
        List<StreamEvent> terminal = agentRuntime.recordTerminalObservationEvents(run);
        if (terminal == null) {
            return;
        }
        for (StreamEvent event : terminal) {
            if (event != null) {
                emitter.emit(event);
            }
        }
    }

    /** Returns whether the store holds a {@code run_ended} event for {@code runId}. */
    private boolean hasRunEndedEvent(RunStreamEventService eventService, String runId) {
        if (eventService == null || runId == null || runId.isEmpty()) {
            return false;
        }
        List<StreamEvent> events = eventService.listAfter(runId, 0L);
        if (events == null) {
            return false;
        }
        for (StreamEvent event : events) {
            if (event != null && event.getType() == StreamEventType.run_ended) {
                return true;
            }
        }
        return false;
    }

    /** A run counts as streaming only while its status is {@link RunStatus#running}. */
    private boolean isStillStreaming(Run run) {
        return run != null && run.getStatus() == RunStatus.running;
    }

    /** Reads the {@code eventSeq} payload entry of an event; 0 when absent or not numeric. */
    private long eventSeq(StreamEvent event) {
        if (event == null || event.getPayload() == null) {
            return 0L;
        }
        Long seq = toLong(event.getPayload().get("eventSeq"));
        return seq == null ? 0L : seq;
    }

    /**
     * Parses the request body as a JSON object when the content type mentions
     * {@code application/json}. Returns an empty map for any other content type,
     * an empty body, a read error or unparsable JSON.
     */
    private Map<String, Object> readBody(HttpServletRequest request) {
        String ct = request.getContentType();
        // Locale.ROOT: the default locale must not influence header matching
        // (e.g. Turkish lower-cases 'I' to a dotless 'ı').
        if (ct == null || !ct.toLowerCase(Locale.ROOT).contains("application/json")) {
            return Collections.emptyMap();
        }
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = request.getReader()) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
        } catch (IOException e) {
            return Collections.emptyMap();
        }
        String raw = sb.toString();
        if (raw.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            return JsonUtil.fromJsonToMap(raw);
        } catch (Throwable t) {
            return Collections.emptyMap();
        }
    }

    /**
     * Returns {@code primary} as a string when it is a non-empty string or any
     * non-string value; otherwise returns {@code fallback}.
     */
    private String strOr(Object primary, String fallback) {
        if (primary instanceof String) {
            String text = (String) primary;
            return text.isEmpty() ? fallback : text;
        }
        return primary != null ? String.valueOf(primary) : fallback;
    }

    /**
     * Resolves a long from the body value, then from the query value, then 0.
     * A body value that is present but not numeric no longer hides a valid query value.
     */
    private long longFrom(Object bodyVal, String queryVal) {
        Long fromBody = toLong(bodyVal);
        if (fromBody != null) {
            return fromBody;
        }
        Long fromQuery = toLong(queryVal);
        return fromQuery == null ? 0L : fromQuery;
    }

    /**
     * Converts a number or a numeric string (surrounding whitespace ignored) to a
     * {@code Long}; returns {@code null} when the value is absent or not numeric.
     */
    private Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        String text = String.valueOf(value).trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Sleeps for {@code millis}. On interruption the thread's interrupt flag is
     * restored and a {@link LobsterException} with code
     * {@code runtime.interrupted} is thrown.
     */
    private void pause(long millis) throws LobsterException {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new LobsterException("runtime.interrupted", "interrupted");
        }
    }

    /** Returns the throwable's message, or its simple class name when it has none. */
    private String safeMsg(Throwable t) {
        if (t == null) {
            return "unknown";
        }
        String m = t.getMessage();
        return (m == null || m.isEmpty()) ? t.getClass().getSimpleName() : m;
    }
}
