package com.gzzm.lobster.api;

import com.gzzm.lobster.runtime.StreamEmitter;
import com.gzzm.lobster.runtime.StreamEvent;
import com.gzzm.platform.commons.Tools;

import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * SSE (Server-Sent Events) stream emitter built on a Servlet {@link AsyncContext}.
 *
 * <p>Every {@link StreamEvent} is written to the response writer as one SSE frame and
 * flushed immediately.
 *
 * <p>Thread-safety: frame writes ({@link #emit(StreamEvent)}, {@link #heartbeat()}) and
 * termination ({@link #complete()}, {@link #closeObservation()}) are serialized on this
 * instance's monitor, so the async context is never completed while a frame is still
 * being written or flushed.
 */
public class SseStreamEmitter implements StreamEmitter {

    private final AsyncContext asyncContext;
    private final HttpServletResponse response;
    private final PrintWriter writer;
    /** Set exactly once by whichever of {@link #complete()} / {@link #closeObservation()} wins. */
    private final AtomicBoolean completed = new AtomicBoolean(false);

    /**
     * Prepares the response of the given async context for SSE streaming.
     *
     * <p>Sets the UTF-8 {@code text/event-stream} content type plus the
     * {@code Cache-Control}, {@code Connection} and {@code X-Accel-Buffering} headers, then
     * obtains the response writer.
     *
     * @param asyncContext the started async context whose response will carry the stream
     * @throws IOException if the response writer cannot be obtained
     */
    public SseStreamEmitter(AsyncContext asyncContext) throws IOException {
        this.asyncContext = asyncContext;
        this.response = (HttpServletResponse) asyncContext.getResponse();
        this.response.setCharacterEncoding("UTF-8");
        this.response.setContentType("text/event-stream; charset=UTF-8");
        this.response.setHeader("Cache-Control", "no-cache");
        this.response.setHeader("Connection", "keep-alive");
        this.response.setHeader("X-Accel-Buffering", "no");
        this.writer = response.getWriter();
    }

    /**
     * Writes one event as an SSE frame and flushes it. Does nothing once the emitter has
     * been completed. Never throws: any failure is logged and swallowed.
     *
     * <p>Note that {@link PrintWriter} reports I/O failures through
     * {@link PrintWriter#checkError()} rather than by throwing, so a dropped connection is
     * normally observed via {@link #isBroken()} / {@link #heartbeat()}, not here.
     *
     * @param event the event to send
     */
    @Override
    public synchronized void emit(StreamEvent event) {
        if (completed.get()) return;
        try {
            writer.write(event.toSseFrame());
            writer.flush();
        } catch (Throwable t) {
            logQuietly("[SseStreamEmitter] emit failed", t);
        }
    }

    /**
     * Detects client disconnection.
     *
     * <p>{@link java.io.PrintWriter#checkError()} returning {@code true} means an earlier
     * write or flush produced an {@link IOException} (typically a peer RST or a closed
     * browser tab). This only indicates that the SSE channel is gone; whether the
     * background task is cancelled is decided by the explicit cancel API.
     *
     * <p>NOTE(review): this still calls {@code checkError()} after the emitter has been
     * completed; callers should presumably stop polling once the stream is finished —
     * confirm against the call sites.
     *
     * @return {@code true} if the writer reports an error, or if querying it fails
     */
    @Override
    public boolean isBroken() {
        try { return writer.checkError(); }
        catch (Throwable ignore) { return true; }
    }

    /**
     * Sends an SSE comment line ({@code ": ping"}) as a keep-alive probe.
     *
     * @return {@code true} if the probe was written and the writer reports no error;
     *         {@code false} if the emitter is already completed or the write failed
     */
    @Override
    public synchronized boolean heartbeat() {
        if (completed.get()) return false;
        try {
            writer.write(": ping\n\n");
            writer.flush();
            return !writer.checkError();
        } catch (Throwable t) {
            logQuietly("[SseStreamEmitter] heartbeat failed", t);
            return false;
        }
    }

    /**
     * Finishes the stream: flushes pending output and completes the async context.
     * Only the first call to this method or {@link #closeObservation()} has any effect.
     */
    public void complete() {
        // Flip the flag before taking the monitor so writers queued behind an in-flight
        // frame see it and bail out instead of writing more data.
        if (!completed.compareAndSet(false, true)) return;
        // Wait for any in-flight emit()/heartbeat() so the response is never touched
        // after asyncContext.complete() has handed it back to the container.
        synchronized (this) {
            try { writer.flush(); } catch (Throwable ignore) { /* best effort */ }
            try { asyncContext.complete(); } catch (Throwable ignore) { /* best effort */ }
        }
    }

    /**
     * Stops streaming to this client by completing the async context, without the final
     * flush performed by {@link #complete()}. Only the first call to this method or
     * {@link #complete()} has any effect.
     */
    @Override
    public void closeObservation() {
        if (!completed.compareAndSet(false, true)) return;
        // Same ordering guarantee as complete(): never complete the context mid-write.
        synchronized (this) {
            try { asyncContext.complete(); } catch (Throwable ignore) { /* best effort */ }
        }
    }

    /** Logs a failure through {@link Tools#log}; a failure of the logger itself is ignored. */
    private static void logQuietly(String message, Throwable cause) {
        try { Tools.log(message, cause); } catch (Throwable ignore) { /* logging must never break streaming */ }
    }
}
