package com.gzzm.lobster.runtime;

import com.gzzm.lobster.audit.AuditService;
import com.gzzm.lobster.artifact.Artifact;
import com.gzzm.lobster.artifact.ArtifactService;
import com.gzzm.lobster.common.*;
import com.gzzm.lobster.config.AgentProfile;
import com.gzzm.lobster.config.AgentProfileDao;
import com.gzzm.lobster.config.LobsterConfig;
import com.gzzm.lobster.context.ContextAssembler;
import com.gzzm.lobster.context.ContextAssembly;
import com.gzzm.lobster.guardrails.AgentLoopDetector;
import com.gzzm.lobster.guardrails.ClaimConsistencyChecker;
import com.gzzm.lobster.guardrails.ContentFilter;
import com.gzzm.lobster.guardrails.InternalInfoSanitizer;
import com.gzzm.lobster.identity.UserContext;
import com.gzzm.lobster.llm.*;
import com.gzzm.lobster.pending.PendingRequest;
import com.gzzm.lobster.pending.PendingRequestService;
import com.gzzm.lobster.plan.PlanService;
import com.gzzm.lobster.quota.ConcurrencyGuard;
import com.gzzm.lobster.run.Run;
import com.gzzm.lobster.run.RunDao;
import com.gzzm.lobster.run.RunStreamEventService;
import com.gzzm.lobster.thread.ThreadMessage;
import com.gzzm.lobster.thread.ThreadDao;
import com.gzzm.lobster.thread.ThreadRoom;
import com.gzzm.lobster.thread.ThreadService;
import com.gzzm.lobster.tool.*;
import com.gzzm.lobster.tool.mcp.McpRuntimeToolCache;
import com.gzzm.lobster.tool.mcp.McpServerConfig;
import com.gzzm.lobster.tool.mcp.McpToolCache;
import com.gzzm.lobster.workspace.ResourceMetadata;
import com.gzzm.lobster.workspace.WorkspaceResource;
import com.gzzm.lobster.workspace.WorkspaceService;
import com.gzzm.platform.commons.Tools;
import net.cyan.arachne.annotation.Service;
import net.cyan.nest.annotation.Inject;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * AgentRuntime —— 大龙虾核心运行时 / Agent Runtime core.
 *
 * <p>实现 ReAct 主循环（可选 Plan 增强）：
 * <pre>
 * 接收输入
 *   → 写入 thread transcript
 *   → 组装上下文（ContextAssembler）
 *   → 流式调用 LLM，token 级 delta 实时推送前端
 *   → 若有 tool_calls，执行 ToolExecutor（每步前 check 取消）
 *   → 工具结果回写 transcript，流式推送
 *   → 循环直到：无 tool_calls / maxTurns / loop / pending / cancelled / error
 * </pre>
 *
 * <p>严格遵循：
 * <ul>
 *   <li>thread-first：连续性来自 thread，不依赖 run 内部变量</li>
 *   <li>run-thin：run 只是观测记录</li>
 *   <li>runtime 不做 completion gate、不做内容质量裁决</li>
 *   <li>异步/交互型工具通过 pending request 自然结束当前 run</li>
 *   <li>用户 cancel 会在下一个 LLM/工具边界立刻生效</li>
 * </ul>
 *
 * <p>V2（2026-04）升级点：
 * <ul>
 *   <li>{@link #cancelFlags} 从 AtomicBoolean 升级为 AtomicReference&lt;CancelReason&gt;
 *       —— 区分 USER / TIMEOUT / BUDGET / UPSTREAM，LlmRuntime 据此决定 fallback</li>
 *   <li>新增 {@link RunContext} + {@link RunEventBus}：主循环附带 publish 生命周期事件，
 *       不持久化，进程内 SSE / audit 订阅。旧的 {@code emitter.xxx} 调用全部保留</li>
 *   <li>ToolContext 携带 deadline + cancelFlag，{@link ToolExecutorDispatcher} 据此做
 *       timeout 和 cancel 传播</li>
 *   <li>{@link AgentLoopDetector} 改用 ProgressSignal 识别强信号（plan 迁移 / artifact 产出）</li>
 *   <li>显式 cancel 通过 {@link #cancel(String)} 传播；SSE 客户端断连不再终止后台 run</li>
 * </ul>
 */
@Service(url = "/ai/runtime/service")
public class AgentRuntime {

    // Collaborators injected by the nest container (@Inject).
    // NOTE: runDao, agentProfileDao, threadDao, artifactService, workspaceService and
    // pendingRequestService also have same-named accessor methods in this class that first
    // try Tools.getBean(...) on the calling thread and only fall back to these injected
    // instances; code in this file reads them through those accessors.
    @Inject private ThreadService threadService;
    @Inject private ContextAssembler contextAssembler;
    @Inject private ToolExecutorDispatcher toolExecutorDispatcher;
    @Inject private ToolRegistry toolRegistry;
    @Inject private LlmRuntime llmRuntime;
    @Inject private ModelRouter modelRouter;
    @Inject private RunDao runDao;
    @Inject private AgentProfileDao agentProfileDao;
    @Inject private ThreadDao threadDao;
    @Inject private AuditService auditService;
    @Inject private PlanService planService;
    @Inject private PendingRequestService pendingRequestService;
    @Inject private RunStreamEventService runStreamEventService;
    @Inject private ClaimConsistencyChecker claimChecker;
    @Inject private ContentFilter contentFilter;
    @Inject private InternalInfoSanitizer sanitizer;
    @Inject private ArtifactService artifactService;
    @Inject private McpRuntimeToolCache mcpRuntimeToolCache;

    // Acquired per (userId, threadId) around every synchronous run(...) call.
    @Inject private ConcurrencyGuard concurrencyGuard;
    @Inject private WorkspaceService workspaceService;

    /**
     * Resolves a bean from the platform container on the calling thread, falling back to
     * the instance injected into this service.
     *
     * <p>thunwind DAO cross-thread protection — see feedback_thunwind_dao_thread_binding.
     * The lookup is repeated on every call because this runtime executes on several of its
     * own thread pools, and the container bean is preferred over the injected one there.
     * Any failure of the lookup (or a {@code null} result) is tolerated by returning the
     * fallback.
     *
     * @param type             bean type to look up
     * @param injectedFallback instance to use when the lookup yields nothing
     * @param <T>              bean type
     * @return the container bean, or {@code injectedFallback}
     */
    private static <T> T threadBoundBean(Class<T> type, T injectedFallback) {
        try {
            T bean = Tools.getBean(type);
            if (bean != null) return bean;
        } catch (Throwable ignore) {
            // Deliberate best effort: fall back to the injected instance.
        }
        return injectedFallback;
    }

    /** Thread-bound {@link RunDao}; see {@link #threadBoundBean}. */
    private RunDao runDao() {
        return threadBoundBean(RunDao.class, runDao);
    }

    /** Thread-bound {@link AgentProfileDao}; see {@link #threadBoundBean}. */
    private AgentProfileDao agentProfileDao() {
        return threadBoundBean(AgentProfileDao.class, agentProfileDao);
    }

    /** Thread-bound {@link ThreadDao}; see {@link #threadBoundBean}. */
    private ThreadDao threadDao() {
        return threadBoundBean(ThreadDao.class, threadDao);
    }

    /** Thread-bound {@link ArtifactService}; see {@link #threadBoundBean}. */
    private ArtifactService artifactService() {
        return threadBoundBean(ArtifactService.class, artifactService);
    }

    /** Thread-bound {@link WorkspaceService}; see {@link #threadBoundBean}. */
    private WorkspaceService workspaceService() {
        return threadBoundBean(WorkspaceService.class, workspaceService);
    }

    /** Thread-bound {@link PendingRequestService}; see {@link #threadBoundBean}. */
    private PendingRequestService pendingRequestService() {
        return threadBoundBean(PendingRequestService.class, pendingRequestService);
    }

    /** Default cap on loop turns per run. NOTE(review): not used in this part of the file — presumably applied when no per-profile limit is set; confirm. */
    private static final int DEFAULT_MAX_TURNS = 20;
    /**
     * Output reserve in tokens: the budget subtracted from the model's contextWindow for
     * assistant output plus tool calls.
     * When the model's ModelProfile.maxOutputTokens is set it takes precedence; otherwise
     * this default is used.
     */
    private static final int DEFAULT_OUTPUT_RESERVE_TOKENS = 4000;
    /** Safety margin: compensates for TokenEstimator error, adapter wrapping, and unusually long system templates. */
    private static final int CONTEXT_SAFETY_MARGIN_TOKENS = 2000;
    /** Per-turn LLM stream idle-timeout fallback, in milliseconds. */
    private static final long DEFAULT_LLM_TURN_TIMEOUT_MS = 120_000L;
    /**
     * Extra headroom (ms) added on top of the run-level deadline.
     * Per the original note, the run deadline is maxTurns * per-turn limit + this headroom;
     * NOTE(review): the computation itself is not in this part of the file — confirm.
     */
    private static final long RUN_DEADLINE_HEADROOM_MS = 60_000L;
    /** Period (ms) of the lease-maintenance task scheduled in the instance initializer. */
    private static final long RUN_HEARTBEAT_INTERVAL_MS = 15_000L;
    /** Presumably the heartbeat age (ms) after which a run lease counts as stale; not used in this part of the file — confirm. */
    private static final long RUN_LEASE_STALE_MS = 90_000L;
    /** How long (ms) a thread's activeRunId claim is trusted even though its run row cannot be loaded yet. */
    private static final long ACTIVE_RUN_RECORD_GRACE_MS = 15_000L;
    /** Identifier of this runtime instance; written to Run.workerId when this instance claims a run. */
    private final String runtimeWorkerId = "worker-" + UUID.randomUUID().toString().replace("-", "");

    /**
     * Cancel references of active runs (V2: each carries a {@link CancelReason}).
     *
     * <p>After {@link #cancel(String, CancelReason)} sets a reason, the main loop checks it
     * before every turn and before every tool call. Streaming LLM calls observe the flag
     * through the injected handler and discard the rest of that turn's output.
     * LlmRuntime uses the {@link CancelReason} to decide whether to fall back to another model.
     */
    private final ConcurrentHashMap<String, AtomicReference<CancelReason>> cancelFlags = new ConcurrentHashMap<>();
    /** In-memory {@link ActiveRun} state keyed by runId. */
    private final ConcurrentHashMap<String, ActiveRun> activeRuns = new ConcurrentHashMap<>();
    /**
     * Per-threadId monitor objects that serialize {@link #startDetached} for one thread.
     * NOTE(review): no removal is visible in this part of the file, so entries may
     * accumulate per threadId for the life of the service — confirm.
     */
    private final ConcurrentHashMap<String, Object> threadStartLocks = new ConcurrentHashMap<>();

    /**
     * In-memory bookkeeping for one run registered with this runtime.
     *
     * <p>Holds the run identity, the lease heartbeat, the next stream-event sequence
     * number, a bounded replay window of recently emitted events ({@link #recentEvents}),
     * and events not yet persisted ({@link #bufferedEvents}). The two deques are plain
     * {@link ArrayDeque}s and are not thread-safe by themselves; {@code RecordingStreamEmitter}
     * mutates them while holding this object's monitor.
     */
    private static final class ActiveRun {
        /** Maximum number of events kept in {@link #recentEvents}; oldest are dropped first. */
        private static final int RECENT_STREAM_EVENT_LIMIT = 20000;

        final String runId;
        final String threadId;
        final String userId;
        volatile String modelId;
        volatile long heartbeatAtMs;
        volatile boolean workerStarted;
        volatile boolean closed;
        final AtomicLong nextEventSeq = new AtomicLong(1L);
        final MultiplexStreamEmitter emitter;
        final CountDownLatch done = new CountDownLatch(1);
        final Deque<StreamEvent> recentEvents = new ArrayDeque<>();
        final Deque<StreamEvent> bufferedEvents = new ArrayDeque<>();

        /**
         * @param nextEventSeq first sequence number to hand out; values below 1 are raised to 1
         */
        ActiveRun(String runId, String threadId, String userId, MultiplexStreamEmitter emitter, long nextEventSeq) {
            this.runId = runId;
            this.threadId = threadId;
            this.userId = userId;
            this.heartbeatAtMs = System.currentTimeMillis();
            this.nextEventSeq.set(Math.max(1L, nextEventSeq));
            this.emitter = emitter;
        }

        /**
         * Returns a copy of {@code event} whose payload carries {@code eventSeq = seq}.
         * The original event is not modified. A {@code null} payload is treated as empty
         * (previously this threw a NullPointerException, although {@link #eventSeq}
         * explicitly tolerates null payloads).
         */
        StreamEvent withEventSeq(StreamEvent event, long seq) {
            Map<String, Object> payload = new LinkedHashMap<>();
            if (event.getPayload() != null) {
                payload.putAll(event.getPayload());
            }
            payload.put("eventSeq", seq);
            return StreamEvent.of(event.getType(), payload);
        }

        /** Appends an event to the replay window, trimming it to {@link #RECENT_STREAM_EVENT_LIMIT}. */
        void remember(StreamEvent event) {
            if (event == null) return;
            recentEvents.addLast(event);
            while (recentEvents.size() > RECENT_STREAM_EVENT_LIMIT) {
                recentEvents.removeFirst();
            }
        }

        /** Events in the replay window whose eventSeq is strictly greater than {@code afterSeq}, in order. */
        List<StreamEvent> recentAfter(long afterSeq) {
            List<StreamEvent> out = new ArrayList<>();
            for (StreamEvent event : recentEvents) {
                if (eventSeq(event) > afterSeq) out.add(event);
            }
            return out;
        }

        /** Not-yet-persisted events whose eventSeq is strictly greater than {@code afterSeq}, in order. */
        List<StreamEvent> bufferedAfter(long afterSeq) {
            List<StreamEvent> out = new ArrayList<>();
            for (StreamEvent event : bufferedEvents) {
                if (eventSeq(event) > afterSeq) out.add(event);
            }
            return out;
        }

        /** Queues an event for persistence; {@code null} is ignored. */
        void buffer(StreamEvent event) {
            if (event != null) bufferedEvents.addLast(event);
        }

        /**
         * Persists buffered events in order. Each event is removed only after it was
         * recorded, so if {@code record} throws, that event and the rest stay buffered.
         *
         * @throws LobsterException {@code run_event.persistence_unavailable} when {@code eventService} is null
         */
        void flushBuffered(RunStreamEventService eventService) {
            if (eventService == null) {
                throw new LobsterException("run_event.persistence_unavailable",
                        "Run stream event persistence is unavailable");
            }
            while (!bufferedEvents.isEmpty()) {
                StreamEvent event = bufferedEvents.peekFirst();
                long seq = eventSeq(event);
                eventService.record(userId, threadId, runId, seq, event);
                bufferedEvents.removeFirst();
            }
        }

        /**
         * Reads the {@code eventSeq} payload entry as a long; returns 0 when the event,
         * its payload or the entry is missing, or when the value is not a valid number.
         */
        private static long eventSeq(StreamEvent event) {
            if (event == null || event.getPayload() == null) return 0L;
            Object raw = event.getPayload().get("eventSeq");
            if (raw instanceof Number) return ((Number) raw).longValue();
            if (raw == null) return 0L;
            try {
                return Long.parseLong(String.valueOf(raw));
            } catch (NumberFormatException ignore) {
                return 0L;
            }
        }
    }

    /**
     * Immutable snapshot of a run's lease state: whether the lease was active, the last
     * heartbeat timestamp (presumably epoch millis — confirm with the producer), and the
     * staleness threshold in milliseconds.
     */
    public static final class RunLeaseSnapshot {
        private final boolean active;
        private final long heartbeatAtMs;
        private final long staleAfterMs;

        RunLeaseSnapshot(boolean active, long heartbeatAtMs, long staleAfterMs) {
            this.staleAfterMs = staleAfterMs;
            this.heartbeatAtMs = heartbeatAtMs;
            this.active = active;
        }

        /** @return whether the lease was active when the snapshot was taken */
        public boolean isActive() {
            return this.active;
        }

        /** @return the heartbeat timestamp captured in the snapshot */
        public long getHeartbeatAtMs() {
            return this.heartbeatAtMs;
        }

        /** @return the staleness threshold captured in the snapshot, in milliseconds */
        public long getStaleAfterMs() {
            return this.staleAfterMs;
        }
    }

    private static final class RecordingStreamEmitter implements StreamEmitter {
        private final ActiveRun activeRun;
        private final String userId;
        private final RunStreamEventService eventService;

        RecordingStreamEmitter(ActiveRun activeRun, String userId,
                               RunStreamEventService eventService) {
            this.activeRun = activeRun;
            this.userId = userId;
            this.eventService = eventService;
        }

        @Override
        public void emit(StreamEvent event) {
            if (event == null) return;
            if (eventService == null) {
                throw new LobsterException("run_event.persistence_unavailable",
                        "Run stream event persistence is unavailable");
            }
            StreamEvent enriched;
            synchronized (activeRun) {
                if (activeRun.closed) return;
                long seq = activeRun.nextEventSeq.getAndIncrement();
                enriched = activeRun.withEventSeq(event, seq);
                activeRun.remember(enriched);
                activeRun.buffer(enriched);
                if (isFlushBoundary(enriched)) {
                    activeRun.flushBuffered(eventService);
                }
                activeRun.emitter.emit(enriched);
            }
        }

        @Override
        public void flushBufferedEvents() {
            synchronized (activeRun) {
                activeRun.flushBuffered(eventService);
            }
        }

        private boolean isFlushBoundary(StreamEvent event) {
            if (event == null) return false;
            StreamEventType type = event.getType();
            return type == StreamEventType.run_started
                    || type == StreamEventType.pending_request
                    || type == StreamEventType.run_ended
                    || type == StreamEventType.error;
        }
    }

    /** Thrown when this worker no longer owns the run it was executing. */
    private static final class RunOwnershipLostException extends RuntimeException {
        private static final String MESSAGE_PREFIX = "Run worker ownership lost: ";

        RunOwnershipLostException(String runId) {
            super(MESSAGE_PREFIX + runId);
        }
    }

    /**
     * Background executor for streaming LLM calls.
     *
     * <p>Moves the blocking HTTP stream read off the main loop thread so the main loop can
     * actually poll the cancel flag. Daemon threads ensure the pool never blocks JVM shutdown.
     */
    private final ExecutorService llmStreamPool =
            Executors.newCachedThreadPool(daemonThreadFactory("lobster-llm-stream"));

    /** Executor running detached run workers (see submitRunWorker); daemon threads. */
    private final ExecutorService runWorkerPool =
            Executors.newCachedThreadPool(daemonThreadFactory("lobster-run-worker"));

    /** Single daemon thread running the periodic lease-maintenance task. */
    private final ScheduledExecutorService runLeasePool =
            Executors.newSingleThreadScheduledExecutor(daemonThreadFactory("lobster-run-lease"));

    /**
     * Builds a thread factory whose threads all carry {@code threadName} and are daemons.
     *
     * @param threadName name given to every thread the factory creates
     * @return a daemon thread factory
     */
    private static java.util.concurrent.ThreadFactory daemonThreadFactory(final String threadName) {
        return task -> {
            Thread worker = new Thread(task, threadName);
            worker.setDaemon(true);
            return worker;
        };
    }

    {
        // Periodic lease maintenance: refresh heartbeats of runs held by this runtime and
        // try to recover stale runnable runs. Every failure is logged and swallowed here
        // because scheduleAtFixedRate suppresses all later executions once a task throws.
        Runnable leaseTick = () -> {
            try {
                heartbeatActiveRuns();
                recoverStaleRunnableRuns();
            } catch (Throwable t) {
                try {
                    Tools.log("[AgentRuntime] run lease heartbeat failed", t);
                } catch (Throwable ignore) {
                    /* ignore */
                }
            }
        };
        runLeasePool.scheduleAtFixedRate(leaseTick,
                RUN_HEARTBEAT_INTERVAL_MS, RUN_HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Executes one run loop synchronously on the calling thread.
     *
     * <p>A {@code null} emitter falls back to a {@link NoopStreamEmitter} (non-streaming).
     * The loop runs while holding a {@link ConcurrencyGuard} slot for the request's user
     * and thread; the limits {@code 3} and {@code 1} are interpreted by
     * {@code ConcurrencyGuard.acquire}.
     *
     * @throws LobsterException {@code runtime.no_thread} when the request has no thread
     */
    public RunResult run(RunRequest request, StreamEmitter emitter) throws Exception {
        StreamEmitter sink = (emitter != null) ? emitter : new NoopStreamEmitter();
        ThreadRoom thread = request.getThread();
        if (thread == null) {
            throw new LobsterException("runtime.no_thread", "Run requires thread");
        }
        String userId = request.getUser().getUserId();
        try (ConcurrencyGuard.Acquisition slot = concurrencyGuard.acquire(userId, thread.getThreadId(), 3, 1)) {
            return runInternal(request, sink);
        }
    }

    /**
     * Start a run in a backend worker and return its runId immediately.
     *
     * <p>The resulting run is observable through persisted stream events and
     * /runs/stream reconnect; callers must not depend on the initiating HTTP
     * request staying open.
     *
     * <p>Under a per-thread start lock this method:
     * (1) returns an existing run for a repeated {@code clientRequestId};
     * (2) returns the thread's current active run, unless this request is a resume
     *     continuation whose source is exactly that run;
     * (3) atomically claims the thread's active-run slot (or replaces the source run's claim);
     * (4) creates or updates the durable {@link Run} row;
     * (5) closes the continuation source run;
     * (6) registers the in-memory {@link ActiveRun}.
     * The worker is submitted after the lock is released. If submission fails the durable
     * run is left as-is for lease recovery and the runId is still returned.
     *
     * @param request run request; must carry a thread
     * @return the runId of the started run, or of the existing run this request resolves to
     * @throws LobsterException {@code runtime.no_thread} when the request has no thread; a
     *         {@link LobsterException} raised while starting is rethrown unchanged; any other
     *         failure is wrapped as {@code runtime.run_persist_failed}
     */
    public String startDetached(RunRequest request) {
        ThreadRoom thread = request == null ? null : request.getThread();
        if (thread == null) throw new LobsterException("runtime.no_thread", "Run requires thread");
        String threadId = thread.getThreadId();
        final String runId = request.getRunId() == null || request.getRunId().isEmpty()
                ? IdGenerator.runId() : request.getRunId();
        final RunRequest detached = request.withRunId(runId);
        Object startLock = threadStartLocks.computeIfAbsent(threadId, k -> new Object());
        // Set once this call owns the thread's active-run slot. Every durable write below
        // happens after that point, so it also means "this call may have written run state".
        boolean claimedThread = false;
        ActiveRun preRegistered = null;
        try {
            synchronized (startLock) {
                Run idempotent = findRunByClientRequestId(detached);
                if (idempotent != null) {
                    Run reconciled = reconcileRunLease(idempotent);
                    return reconciled == null ? idempotent.getRunId() : reconciled.getRunId();
                }
                Run active = activeRunForThread(threadId);
                if (active != null) {
                    String sourceRunId = detached.getContinuationSourceRunId();
                    boolean continuingFromActive = detached.isResumeContinuation()
                            && sourceRunId != null && sourceRunId.equals(active.getRunId());
                    if (!continuingFromActive) {
                        if (detached.isResumeContinuation()
                                && sourceRunId != null && !sourceRunId.isEmpty()) {
                            throw new LobsterException("run.continuation_conflict",
                                    "Continuation source run is not the active run");
                        }
                        return active.getRunId();
                    }
                    // Hand the thread's claim over from the source run to this run.
                    int replaced = threadDao().replaceActiveRun(runId, new Date(), threadId,
                            detached.getUser().getUserId(), sourceRunId);
                    if (replaced <= 0) {
                        Run latest = activeRunForThread(threadId);
                        return latest == null ? active.getRunId() : latest.getRunId();
                    }
                    claimedThread = true;
                } else {
                    if (blocksNewRunForOpenPending(detached)) {
                        throw new LobsterException("run.pending_open",
                                "Thread has an open pending request; resolve it before starting a new run");
                    }
                    int claimed = threadDao().claimActiveRun(runId, new Date(), threadId,
                            detached.getUser().getUserId());
                    if (claimed <= 0) {
                        Run latest = activeRunForThread(threadId);
                        if (latest != null) return latest.getRunId();
                        throw new LobsterException("run.claim_conflict",
                                "Thread already has an active run claim");
                    }
                    claimedThread = true;
                }
                Run existing = runDao().getRun(runId);
                Date now = new Date();
                if (existing == null) {
                    Run run = new Run();
                    run.setRunId(runId);
                    run.setThreadId(threadId);
                    run.setUserId(detached.getUser().getUserId());
                    run.setTriggerType(detached.getTriggerSource());
                    run.setTriggerRef(detached.getTriggerRef());
                    run.setClientRequestId(detached.getClientRequestId());
                    run.setContinuationSourceRunId(detached.getContinuationSourceRunId());
                    run.setRequestPayloadJson(serializeRunRequest(detached));
                    run.setStatus(RunStatus.running);
                    run.setTurns(0);
                    run.setStartedAt(now);
                    run.setHeartbeatAt(run.getStartedAt());
                    run.setWorkerId(runtimeWorkerId);
                    run.setClaimedAt(now);
                    runDao().save(run);
                } else if (detached.isResumeContinuation()) {
                    // Reuse the existing row and reset it to a fresh running state.
                    existing.setClientRequestId(detached.getClientRequestId());
                    existing.setContinuationSourceRunId(detached.getContinuationSourceRunId());
                    existing.setRequestPayloadJson(serializeRunRequest(detached));
                    existing.setStatus(RunStatus.running);
                    existing.setExitReason(null);
                    existing.setEndedAt(null);
                    existing.setHeartbeatAt(now);
                    existing.setWorkerId(runtimeWorkerId);
                    existing.setClaimedAt(now);
                    existing.setErrorCode(null);
                    existing.setErrorMessage(null);
                    runDao().save(existing);
                } else {
                    // Existing row, not a continuation: refresh request data and ownership only.
                    existing.setClientRequestId(detached.getClientRequestId());
                    existing.setContinuationSourceRunId(detached.getContinuationSourceRunId());
                    existing.setRequestPayloadJson(serializeRunRequest(detached));
                    existing.setWorkerId(runtimeWorkerId);
                    existing.setClaimedAt(now);
                    existing.setHeartbeatAt(now);
                    runDao().save(existing);
                }
                closeContinuationSourceRun(detached, now);
                preRegistered = registerActiveRun(runId, threadId,
                        detached.getUser().getUserId(), null);
            }
        } catch (Throwable t) {
            if (preRegistered != null) {
                finishActiveRun(preRegistered);
                activeRuns.remove(runId, preRegistered);
            }
            if (claimedThread) {
                if (!restoreContinuationSourceClaim(detached, threadId, runId)) {
                    try { threadDao().clearActiveRunIfMatches(new Date(), threadId, runId); } catch (Throwable ignore) { /* ignore */ }
                }
                // Only a call that claimed the slot can have created or updated the run row.
                // A request rejected before claiming (continuation conflict, open pending
                // request, claim conflict) must not cancel a pre-existing run that merely
                // shares the requested runId.
                markDetachedRunNotAccepted(runId, detached, t);
            }
            try { Tools.log("[AgentRuntime] pre-create detached run failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
            if (t instanceof LobsterException) throw (LobsterException) t;
            throw new LobsterException("runtime.run_persist_failed",
                    "Failed to create durable run record", t);
        }
        if (!submitRunWorker(runId, detached)) {
            // The durable row stays running; lease recovery can claim it once its heartbeat
            // is stale. Only drop the local registration if the worker never started.
            ActiveRun active = activeRuns.get(runId);
            if (active != null && !active.workerStarted && activeRuns.remove(runId, active)) {
                finishActiveRun(active);
            }
            LobsterException failure = new LobsterException("runtime.worker_submit_failed",
                    "Failed to submit detached run worker");
            try { Tools.log("[AgentRuntime] detached run worker submit failed; durable run will be recovered: " + runId, failure); }
            catch (Throwable ignore) { /* ignore */ }
        }
        return runId;
    }

    /**
     * Looks up a run previously created for the same (user, thread, clientRequestId), so a
     * retried start request resolves to the original run.
     *
     * @return the first match, or {@code null} when the request has no clientRequestId,
     *         lacks a thread or user, nothing matches, or the lookup fails (failure is logged)
     */
    private Run findRunByClientRequestId(RunRequest request) {
        String requestKey = (request == null) ? null : request.getClientRequestId();
        if (requestKey == null || requestKey.isEmpty()) {
            return null;
        }
        ThreadRoom thread = request.getThread();
        UserContext user = request.getUser();
        if (thread == null || user == null) {
            return null;
        }
        List<Run> matches;
        try {
            matches = runDao().listByClientRequestId(user.getUserId(), thread.getThreadId(), requestKey);
        } catch (Throwable t) {
            try {
                Tools.log("[AgentRuntime] idempotent run lookup failed: " + requestKey, t);
            } catch (Throwable ignore) {
                /* ignore */
            }
            return null;
        }
        if (matches == null || matches.isEmpty()) {
            return null;
        }
        return matches.get(0);
    }

    /**
     * Rolls a failed continuation start back to its source run: puts the source run back
     * into {@code waiting_user} and hands the thread's active-run claim from {@code runId}
     * back to it.
     *
     * <p>A source run that reached a terminal state other than the "ended / pending" state
     * written by {@link #closeContinuationSourceRun} (e.g. it was cancelled concurrently) is
     * not resurrected. In that case this method returns {@code false}, so the caller clears
     * the failed claim instead.
     *
     * @return {@code true} if the claim was handed back to the source run; {@code false}
     *         when not applicable, the source must not be restored, or the rollback failed
     *         (failures are logged)
     */
    private boolean restoreContinuationSourceClaim(RunRequest detached, String threadId, String runId) {
        if (detached == null || !detached.isResumeContinuation()) return false;
        String sourceRunId = detached.getContinuationSourceRunId();
        if (sourceRunId == null || sourceRunId.isEmpty() || sourceRunId.equals(runId)) return false;
        try {
            Run source = runDao().getRun(sourceRunId);
            if (source != null) {
                boolean closedByContinuation = source.getStatus() == RunStatus.ended
                        && source.getExitReason() == RunExitReason.pending;
                if (isTerminalStatus(source.getStatus()) && !closedByContinuation) {
                    // Terminated for another reason: do not flip it back to waiting_user.
                    return false;
                }
                source.setStatus(RunStatus.waiting_user);
                source.setExitReason(null);
                source.setEndedAt(null);
                source.setHeartbeatAt(new Date());
                runDao().save(source);
            }
            threadDao().replaceActiveRun(sourceRunId, new Date(), threadId,
                    detached.getUser().getUserId(), runId);
            return true;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] restore continuation source failed: " + sourceRunId, t); }
            catch (Throwable ignore) { /* ignore */ }
            return false;
        }
    }

    /**
     * For a resume continuation with a distinct source run, moves the source run from
     * {@code waiting_user} to {@code ended} with exit reason {@code pending}.
     *
     * <p>A source that is already ended/pending is accepted as-is. Does nothing for
     * non-continuation requests or when the source id is empty or equals the request's runId.
     *
     * @param now end timestamp to record; {@code null} means "now"
     * @throws LobsterException {@code run.source_close_failed} when the source is in any other state
     */
    private void closeContinuationSourceRun(RunRequest detached, Date now) throws Exception {
        boolean isContinuation = detached != null && detached.isResumeContinuation();
        if (!isContinuation) return;
        String sourceRunId = detached.getContinuationSourceRunId();
        boolean hasDistinctSource = sourceRunId != null && !sourceRunId.isEmpty()
                && !sourceRunId.equals(detached.getRunId());
        if (!hasDistinctSource) return;

        Date endedAt = (now != null) ? now : new Date();
        if (runDao().finishIfStatus(RunStatus.ended, endedAt, RunExitReason.pending,
                sourceRunId, RunStatus.waiting_user) > 0) {
            return;
        }
        // The conditional update matched nothing; tolerate a source already closed the same way.
        Run source = runDao().getRun(sourceRunId);
        boolean alreadyClosed = source != null
                && source.getStatus() == RunStatus.ended
                && source.getExitReason() == RunExitReason.pending;
        if (!alreadyClosed) {
            throw new LobsterException("run.source_close_failed",
                    "Failed to close continuation source run");
        }
    }

    /**
     * Persists a detached run as {@code error} after its worker failed.
     *
     * <p>Skipped for missing inputs, for ownership-lost failures, and for runs already in a
     * terminal state. If the row is missing (or cannot be read) a new one is built from the
     * request. Afterwards the thread's active-run claim is cleared. Persistence failures are
     * logged, never thrown.
     */
    private void markDetachedRunFailed(String runId, RunRequest request, Throwable t) {
        if (runId == null || runId.isEmpty() || request == null || isOwnershipLost(t)) return;
        try {
            Run run;
            try {
                run = runDao().getRun(runId);
            } catch (Throwable lookupFailure) {
                run = null; // an unreadable row is treated as missing; a fresh one is written below
            }
            if (run == null) {
                run = new Run();
                run.setRunId(runId);
                ThreadRoom thread = request.getThread();
                if (thread != null) run.setThreadId(thread.getThreadId());
                UserContext user = request.getUser();
                if (user != null) run.setUserId(user.getUserId());
                run.setTriggerType(request.getTriggerSource());
                run.setTriggerRef(request.getTriggerRef());
                run.setClientRequestId(request.getClientRequestId());
                run.setContinuationSourceRunId(request.getContinuationSourceRunId());
                run.setTurns(0);
                run.setStartedAt(new Date());
            } else if (isTerminalStatus(run.getStatus())) {
                return;
            }
            String code = (t instanceof LobsterException)
                    ? ((LobsterException) t).getCode()
                    : "runtime.detached_worker";
            run.setStatus(RunStatus.error);
            run.setExitReason(RunExitReason.error);
            run.setEndedAt(new Date());
            run.setHeartbeatAt(run.getEndedAt());
            run.setErrorCode(code);
            run.setErrorMessage(truncate(safeMsg(t), 2000));
            runDao().save(run);
            clearThreadActiveRun(run);
        } catch (Throwable persistFailure) {
            try {
                Tools.log("[AgentRuntime] persist detached failure failed: " + runId, persistFailure);
            } catch (Throwable ignore) {
                /* ignore */
            }
        }
    }

    /**
     * Marks a detached run as {@code cancelled} because its start was not accepted, then
     * clears the thread's active-run claim.
     *
     * <p>No-op for missing inputs, a missing row, or a row already in a terminal state.
     * Persistence failures are logged, never thrown.
     */
    private void markDetachedRunNotAccepted(String runId, RunRequest request, Throwable t) {
        if (request == null || runId == null || runId.isEmpty()) return;
        try {
            Run run = runDao().getRun(runId);
            boolean nothingToCancel = run == null || isTerminalStatus(run.getStatus());
            if (nothingToCancel) return;
            String code = (t instanceof LobsterException)
                    ? ((LobsterException) t).getCode()
                    : "runtime.worker_submit_failed";
            run.setStatus(RunStatus.cancelled);
            run.setExitReason(RunExitReason.cancelled);
            run.setEndedAt(new Date());
            run.setHeartbeatAt(run.getEndedAt());
            run.setErrorCode(code);
            run.setErrorMessage(truncate(safeMsg(t), 2000));
            runDao().save(run);
            clearThreadActiveRun(run);
        } catch (Throwable persistFailure) {
            try {
                Tools.log("[AgentRuntime] persist detached not-accepted failed: " + runId, persistFailure);
            } catch (Throwable ignore) {
                /* ignore */
            }
        }
    }

    /** @return whether {@code status} is one of the final states {@code ended}, {@code error}, {@code cancelled} */
    private boolean isTerminalStatus(RunStatus status) {
        if (status == RunStatus.ended) {
            return true;
        }
        return status == RunStatus.cancelled || status == RunStatus.error;
    }

    /**
     * Returns the run currently active (running or waiting_user) on a thread, or {@code null}.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>The thread's {@code activeRunId} claim. If its run row, after
     *       {@code reconcileRunLease}, is active, that run is returned.</li>
     *   <li>If the claim exists but its row is missing and the claim is younger than
     *       {@code ACTIVE_RUN_RECORD_GRACE_MS}, an unsaved placeholder {@link Run} with status
     *       running is returned. This covers a run whose row is still being created.</li>
     *   <li>Otherwise the stale claim is cleared (best effort), and the thread's runs are scanned.
     *       The first one that reconciles to an active status is returned, and the thread
     *       claim is re-established for it (best effort).</li>
     * </ol>
     * NOTE(review): reconcileRunLease is defined outside this view; it presumably adjusts the
     * run according to its lease/heartbeat and may return null — confirm.
     *
     * @param threadId thread to inspect; null or empty yields {@code null}
     * @return the active run, a placeholder for a very recent claim, or {@code null}
     */
    public Run activeRunForThread(String threadId) throws Exception {
        if (threadId == null || threadId.isEmpty()) return null;
        ThreadRoom thread = null;
        try { thread = threadDao().getThread(threadId); } catch (Throwable ignore) { /* fallback to run scan */ }
        String activeRunId = thread == null ? null : thread.getActiveRunId();
        if (activeRunId != null && !activeRunId.isEmpty()) {
            Run claimed = runDao().getRun(activeRunId);
            Run reconciled = reconcileRunLease(claimed);
            if (reconciled != null && isActiveStatus(reconciled.getStatus())) {
                return reconciled;
            }
            // Row not persisted yet but the claim is fresh: report a synthetic running run
            // so callers do not start a competing run during the grace window.
            if (claimed == null && isRecentActiveRunClaim(thread)) {
                Run placeholder = new Run();
                placeholder.setRunId(activeRunId);
                placeholder.setThreadId(threadId);
                placeholder.setUserId(thread.getUserId());
                placeholder.setStatus(RunStatus.running);
                placeholder.setStartedAt(thread.getActiveRunUpdatedAt());
                placeholder.setHeartbeatAt(thread.getActiveRunUpdatedAt());
                return placeholder;
            }
            // The claim points at a run that is no longer active: drop it only if it still matches.
            try { threadDao().clearActiveRunIfMatches(new Date(), threadId, activeRunId); } catch (Throwable ignore) { /* ignore */ }
        }
        // Fallback scan over all runs of the thread (also covers rows created without a thread claim).
        List<Run> runs = runDao().listByThread(threadId);
        if (runs == null) return null;
        for (Run r : runs) {
            if (r == null) continue;
            Run reconciled = reconcileRunLease(r);
            if (reconciled != null && isActiveStatus(reconciled.getStatus())) {
                try {
                    threadDao().claimActiveRun(reconciled.getRunId(), new Date(),
                            reconciled.getThreadId(), reconciled.getUserId());
                } catch (Throwable ignore) { /* best effort for legacy rows */ }
                return reconciled;
            }
        }
        return null;
    }

    /**
     * @return whether the thread's active-run claim was updated less than
     *         {@code ACTIVE_RUN_RECORD_GRACE_MS} ago (timestamps in the future do not count)
     */
    private boolean isRecentActiveRunClaim(ThreadRoom thread) {
        Date claimedAt = (thread == null) ? null : thread.getActiveRunUpdatedAt();
        if (claimedAt == null) {
            return false;
        }
        long ageMs = System.currentTimeMillis() - claimedAt.getTime();
        return 0L <= ageMs && ageMs < ACTIVE_RUN_RECORD_GRACE_MS;
    }

    /** @return whether {@code status} still occupies the thread: {@code running} or {@code waiting_user} */
    private boolean isActiveStatus(RunStatus status) {
        boolean waiting = status == RunStatus.waiting_user;
        return waiting || status == RunStatus.running;
    }

    /**
     * Submits the detached run to {@code runWorkerPool}. The worker runs it under
     * {@code RunScope} for the request's user.
     *
     * <p>Worker failures are logged and persisted through {@link #markDetachedRunFailed},
     * except ownership loss, which is only logged. Unchecked exceptions from {@code run} are
     * propagated unwrapped. Previously {@code catch (Exception)} also wrapped them in a
     * RuntimeException, which hid the {@link LobsterException} error code from
     * {@code markDetachedRunFailed}. Checked exceptions are still wrapped.
     *
     * @return {@code true} if the task was accepted by the pool; {@code false} if submission
     *         failed (the durable run is intentionally left in running state for lease recovery)
     */
    private boolean submitRunWorker(final String runId, final RunRequest detached) {
        try {
            runWorkerPool.submit(() -> {
                try {
                    RunScope.runWith(detached.getUser(), () -> {
                        try {
                            run(detached, null);
                        } catch (RuntimeException e) {
                            // Keep LobsterException / RunOwnershipLostException visible to the handler below.
                            throw e;
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                } catch (Throwable t) {
                    if (isOwnershipLost(t)) {
                        try { Tools.log("[AgentRuntime] detached run ownership lost: " + runId); } catch (Throwable ignore) { /* ignore */ }
                        return;
                    }
                    try { Tools.log("[AgentRuntime] detached run failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
                    markDetachedRunFailed(runId, detached, t);
                }
            });
            return true;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] submit detached run failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
            // Keep the durable run in running state. A rejected local worker is a
            // lease failure, not a task terminal state; recovery can claim it once
            // the heartbeat is stale.
            return false;
        }
    }

    /**
     * Serializes {@code request} using its own append-user-input and resume-continuation flags.
     * A null request is delegated with (append=true, resume=false), which yields {@code null}.
     */
    private String serializeRunRequest(RunRequest request) {
        if (request == null) {
            return serializeRunRequest(null, true, false);
        }
        boolean append = request.shouldAppendUserInput();
        boolean resume = request.isResumeContinuation();
        return serializeRunRequest(request, append, resume);
    }

    /**
     * Serializes a run request into the JSON payload stored on the Run row; the keys written
     * here are the ones {@code restoreRunRequest} reads back.
     *
     * @param request            request to serialize; {@code null} yields {@code null}
     * @param appendUserInput    value stored under "appendUserInput"
     * @param resumeContinuation value stored under "resumeContinuation"
     * @return the JSON payload, or {@code null} when {@code request} is null
     */
    private String serializeRunRequest(RunRequest request, boolean appendUserInput, boolean resumeContinuation) {
        if (request == null) return null;
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("runId", request.getRunId());
        payload.put("threadId", request.getThread() == null ? null : request.getThread().getThreadId());
        payload.put("userInput", request.getUserInput());
        payload.put("triggerSource", request.getTriggerSource());
        payload.put("triggerRef", request.getTriggerRef());
        payload.put("clientRequestId", request.getClientRequestId());
        payload.put("attachmentMediaIds", request.getAttachmentMediaIds());
        payload.put("attachedResourceIds", request.getAttachedResourceIds());
        payload.put("kbEnabled", request.isKbEnabled());
        payload.put("kbMode", request.getKbMode());
        payload.put("kbScopeIds", request.getKbScopeIds());
        payload.put("appendUserInput", appendUserInput);
        payload.put("resumeContinuation", resumeContinuation);
        payload.put("continuationSourceRunId", request.getContinuationSourceRunId());
        UserContext user = request.getUser();
        if (user != null) {
            Map<String, Object> u = new LinkedHashMap<>();
            u.put("userId", user.getUserId());
            u.put("externalUserId", user.getExternalUserId());
            u.put("deptId", user.getDeptId());
            u.put("orgId", user.getOrgId());
            u.put("displayName", user.getDisplayName());
            // A user without a roles collection must not abort serialization (copying a null
            // collection would NPE); store an empty list, which restore treats as "no roles".
            List<String> roles = new ArrayList<>();
            if (user.getRoles() != null) roles.addAll(user.getRoles());
            u.put("roles", roles);
            payload.put("user", u);
        }
        return JsonUtil.toJson(payload);
    }

    /**
     * Rebuilds a {@link RunRequest} from the JSON payload persisted on {@code run}.
     *
     * <p>Missing payload fields fall back to the Run row's own columns (thread, user, trigger,
     * client request id) or to fixed defaults. The restored request is turned into a resume
     * continuation unless the payload says the user input still has to be appended and it is not
     * itself a resume.
     *
     * @return the restored request, or {@code null} when there is no payload, it does not parse
     *         to a map, or the thread cannot be loaded
     */
    @SuppressWarnings("unchecked")
    private RunRequest restoreRunRequest(Run run) throws Exception {
        String json = run == null ? null : run.getRequestPayloadJson();
        if (json == null || json.isEmpty()) {
            return null;
        }
        Map<String, Object> payload = JsonUtil.fromJsonToMap(json);
        if (payload == null) {
            return null;
        }
        ThreadRoom thread = threadService.tryLoad(str(payload.get("threadId"), run.getThreadId()));
        if (thread == null) {
            return null;
        }

        Object rawUser = payload.get("user");
        Map<String, Object> userFields = rawUser instanceof Map
                ? (Map<String, Object>) rawUser
                : Collections.<String, Object>emptyMap();
        // Same filtering as the payload lists: null entries dropped, others stringified, order kept.
        Set<String> roles = new LinkedHashSet<>(stringList(userFields.get("roles")));
        UserContext user = new UserContext(
                str(userFields.get("userId"), run.getUserId()),
                str(userFields.get("externalUserId"), null),
                str(userFields.get("deptId"), null),
                str(userFields.get("orgId"), thread.getOrgId()),
                str(userFields.get("displayName"), null),
                roles);

        RunRequest restored = new RunRequest(thread, user,
                str(payload.get("userInput"), ""),
                str(payload.get("triggerSource"), run.getTriggerType()),
                str(payload.get("triggerRef"), run.getTriggerRef()),
                stringList(payload.get("attachmentMediaIds")),
                stringList(payload.get("attachedResourceIds")),
                bool(payload.get("kbEnabled")),
                str(payload.get("kbMode"), "auto"),
                stringList(payload.get("kbScopeIds")))
                .withRunId(run.getRunId())
                .withClientRequestId(str(payload.get("clientRequestId"), run.getClientRequestId()));

        boolean appendUserInput = boolDefault(payload.get("appendUserInput"), false);
        boolean resumeContinuation = boolDefault(payload.get("resumeContinuation"), true);
        boolean treatAsResume = resumeContinuation || !appendUserInput;
        if (!treatAsResume) {
            return restored;
        }
        return restored.asResumeContinuation(str(payload.get("continuationSourceRunId"), null), appendUserInput);
    }

    /**
     * Decides whether a new (non-resume, non-"pending_resolve") run must be refused because its
     * thread still has an open pending request. Lookup failures are logged and do not block.
     */
    private boolean blocksNewRunForOpenPending(RunRequest request) {
        boolean exempt = request == null
                || request.isResumeContinuation()
                || "pending_resolve".equals(request.getTriggerSource());
        if (exempt) {
            return false;
        }
        ThreadRoom thread = request.getThread();
        String threadId = thread == null ? null : thread.getThreadId();
        if (threadId == null || threadId.isEmpty()) {
            return false;
        }
        try {
            PendingRequestService pending = pendingRequestService();
            if (pending == null) {
                return false;
            }
            return pending.threadHasOpen(threadId);
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] pending guard failed: " + threadId, t); } catch (Throwable ignore) { /* ignore */ }
            return false;
        }
    }

    /**
     * True when {@code request} is a resume continuation of {@code runId} itself: either no
     * source run id was recorded, or it equals {@code runId}.
     */
    private boolean isSameRunRecovery(RunRequest request, String runId) {
        if (request != null && request.isResumeContinuation()) {
            String sourceId = request.getContinuationSourceRunId();
            return sourceId == null || sourceId.isEmpty() || sourceId.equals(runId);
        }
        return false;
    }

    /**
     * Number of turns this execution may still take. Same-run recovery subtracts the turns
     * already consumed; the result is never negative.
     */
    private int turnBudgetFor(RunRequest request, String runId, int maxTurns, int initialTurns) {
        int alreadyUsed = isSameRunRecovery(request, runId) ? Math.max(0, initialTurns) : 0;
        return Math.max(0, maxTurns - alreadyUsed);
    }

    /** Stringifies {@code value}, using {@code fallback} for null or an empty string. */
    private static String str(Object value, String fallback) {
        if (value != null) {
            String text = String.valueOf(value);
            if (!text.isEmpty()) {
                return text;
            }
        }
        return fallback;
    }

    /** Lenient boolean read; a missing value counts as {@code false}. */
    private static boolean bool(Object value) {
        final boolean whenMissing = false;
        return boolDefault(value, whenMissing);
    }

    /**
     * Lenient boolean read: Boolean values are unboxed, null yields {@code fallback}, and any
     * other value goes through {@link Boolean#parseBoolean} on its string form.
     */
    private static boolean boolDefault(Object value, boolean fallback) {
        if (value instanceof Boolean) {
            return ((Boolean) value).booleanValue();
        }
        return value == null ? fallback : Boolean.parseBoolean(String.valueOf(value));
    }

    /**
     * Converts an {@link Iterable} into a new list of the string forms of its non-null elements.
     * Anything that is not iterable yields an immutable empty list.
     */
    private static List<String> stringList(Object value) {
        if (value instanceof Iterable) {
            List<String> items = new ArrayList<>();
            Iterator<?> it = ((Iterable<?>) value).iterator();
            while (it.hasNext()) {
                Object element = it.next();
                if (element != null) {
                    items.add(String.valueOf(element));
                }
            }
            return items;
        }
        return Collections.emptyList();
    }

    /**
     * Best-effort clearing of the thread's active-run pointer, applied only if it still points
     * at {@code run}. Failures are logged, never thrown.
     */
    private void clearThreadActiveRun(Run run) {
        boolean incomplete = run == null || run.getThreadId() == null || run.getRunId() == null;
        if (incomplete) {
            return;
        }
        try {
            threadDao().clearActiveRunIfMatches(new Date(), run.getThreadId(), run.getRunId());
        } catch (Throwable t) {
            try {
                Tools.log("[AgentRuntime] clear active run failed: " + run.getRunId(), t);
            } catch (Throwable ignore) {
                /* ignore */
            }
        }
    }

    /**
     * Re-reads the run and throws {@link RunOwnershipLostException} unless it still exists, is
     * {@code running}, and is held by this worker. A null run or run id is a no-op.
     */
    private void assertRunOwner(Run run) throws Exception {
        if (run == null || run.getRunId() == null) {
            return;
        }
        String runId = run.getRunId();
        Run latest = runDao().getRun(runId);
        boolean owned = latest != null
                && latest.getStatus() == RunStatus.running
                && runtimeWorkerId.equals(latest.getWorkerId());
        if (!owned) {
            throw new RunOwnershipLostException(runId);
        }
    }

    /**
     * Non-throwing variant of the ownership check: true only if the run is {@code running} and
     * held by this worker. Any lookup error is treated as "not owner".
     */
    private boolean isCurrentRunOwnerQuietly(String runId) {
        if (runId == null || runId.isEmpty()) {
            return false;
        }
        try {
            Run latest = runDao().getRun(runId);
            if (latest == null || latest.getStatus() != RunStatus.running) {
                return false;
            }
            return runtimeWorkerId.equals(latest.getWorkerId());
        } catch (Throwable t) {
            return false;
        }
    }

    /**
     * Writes turn count and heartbeat (now, if unset), conditional on this worker still owning
     * the running run; zero updated rows raises {@link RunOwnershipLostException}.
     */
    private void persistProgressOwned(Run run) throws Exception {
        if (run == null || run.getRunId() == null) {
            return;
        }
        Date heartbeat = run.getHeartbeatAt();
        if (heartbeat == null) {
            heartbeat = new Date();
        }
        int rows = runDao().progressOwned(run.getTurns(), heartbeat, run.getRunId(),
                RunStatus.running, runtimeWorkerId);
        if (rows > 0) {
            return;
        }
        throw new RunOwnershipLostException(run.getRunId());
    }

    /**
     * Stores the run's request payload JSON, conditional on this worker still owning the running
     * run; zero updated rows raises {@link RunOwnershipLostException}.
     */
    private void persistRequestPayloadOwned(Run run) throws Exception {
        if (run == null || run.getRunId() == null) {
            return;
        }
        String runId = run.getRunId();
        int rows = runDao().updateRequestPayloadOwned(run.getRequestPayloadJson(),
                runId, RunStatus.running, runtimeWorkerId);
        if (rows <= 0) {
            throw new RunOwnershipLostException(runId);
        }
    }

    /**
     * Stores the run's selected model id, conditional on this worker still owning the running
     * run; zero updated rows raises {@link RunOwnershipLostException}.
     */
    private void persistModelOwned(Run run) throws Exception {
        if (run == null || run.getRunId() == null) {
            return;
        }
        String runId = run.getRunId();
        int rows = runDao().updateModelOwned(run.getModelId(), runId,
                RunStatus.running, runtimeWorkerId);
        if (rows <= 0) {
            throw new RunOwnershipLostException(runId);
        }
    }

    /**
     * Moves an owned running run to {@code waiting_user} with exit reason {@code pending},
     * recording turns and heartbeat (now, if unset). Zero updated rows raises
     * {@link RunOwnershipLostException}.
     */
    private void persistSuspendedOwned(Run run) throws Exception {
        if (run == null || run.getRunId() == null) {
            return;
        }
        Date heartbeat = run.getHeartbeatAt();
        if (heartbeat == null) {
            heartbeat = new Date();
        }
        int rows = runDao().suspendOwned(RunStatus.waiting_user, RunExitReason.pending,
                run.getTurns(), heartbeat, run.getRunId(), RunStatus.running, runtimeWorkerId);
        if (rows > 0) {
            return;
        }
        throw new RunOwnershipLostException(run.getRunId());
    }

    /**
     * Finishes an owned running run with the given terminal status, exit reason and end time,
     * also writing turns and error code/message. Zero updated rows raises
     * {@link RunOwnershipLostException}.
     */
    private void persistTerminalOwned(Run run, RunStatus status, RunExitReason exitReason,
                                      Date endedAt) throws Exception {
        if (run == null) {
            return;
        }
        String runId = run.getRunId();
        if (runId == null) {
            return;
        }
        int rows = runDao().finishOwned(status, endedAt, exitReason, run.getTurns(),
                run.getErrorCode(), run.getErrorMessage(), runId,
                RunStatus.running, runtimeWorkerId);
        if (rows <= 0) {
            throw new RunOwnershipLostException(runId);
        }
    }

    /**
     * Returns whether {@code t}, or any throwable in its cause chain, is a
     * {@link RunOwnershipLostException}.
     *
     * <p>Visited throwables are tracked by identity. A cause chain can form a cycle (for example
     * two exceptions linked to each other via {@code initCause}), and without this guard such a
     * chain would make the walk loop forever.
     *
     * @param t the throwable to inspect; {@code null} yields {@code false}
     */
    private boolean isOwnershipLost(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof RunOwnershipLostException) return true;
        }
        return false;
    }

    /**
     * Scans the thread's lightweight transcript for a user-role message tagged with
     * {@code runId}. Lookup failures are logged and reported as "not found".
     */
    private boolean hasUserMessageForRun(ThreadRoom thread, String runId) {
        if (thread == null || runId == null || runId.isEmpty()) {
            return false;
        }
        try {
            List<ThreadMessage> transcript = threadService.loadLightweightTranscript(thread.getThreadId());
            if (transcript != null) {
                for (ThreadMessage message : transcript) {
                    boolean isRunUserMessage = message != null
                            && message.getRole() == MessageRole.user
                            && runId.equals(message.getRunId());
                    if (isRunUserMessage) {
                        return true;
                    }
                }
            }
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] check run user message failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
        }
        return false;
    }

    /**
     * Executes one run on this worker. It claims the durable Run row (status running, workerId,
     * claimedAt, heartbeat), registers or adopts the in-memory ActiveRun and cancel flag, and
     * bridges the run event bus to the recording emitter. It then delegates the turn loop to
     * {@code runCore}.
     *
     * <p>Cancellation is re-checked repeatedly before the worker is marked started: the
     * persisted status, the in-memory cancel flag, and the RunContext. Any hit short-circuits
     * through {@code completeCancelledBeforeStart}. On ownership loss the throwable is rethrown,
     * after a "reconnecting" hint if the run is not yet terminal. On any other failure it calls
     * {@code markDetachedRunFailed}, emits error / thread projection / runEnded(error), then
     * rethrows. The finally block unregisters the ActiveRun handled here and removes the
     * cancel flag.
     *
     * @param request the run request; its runId is reused when present, otherwise a new id is generated
     * @param emitter caller-supplied emitter; attached to the ActiveRun only when observable
     * @return the result of runCore, or a cancelled result when cancelled before start
     * @throws Exception from persistence or runCore (including ownership loss)
     */
    private RunResult runInternal(RunRequest request, StreamEmitter emitter) throws Exception {
        ThreadRoom thread = request.getThread();

        // 1) Record the run (reuse the request's runId when supplied)
        String runId = request.getRunId() == null || request.getRunId().isEmpty()
                ? IdGenerator.runId() : request.getRunId();
        Run run = null;
        try { run = runDao().getRun(runId); } catch (Throwable ignore) { /* create below */ }
        if (run != null && run.getStatus() == RunStatus.cancelled) {
            return completeCancelledBeforeStart(runId, thread, run);
        }
        // Register a cancel flag for this runId, or adopt one that is already registered.
        // It is re-checked below, both before and after the row is persisted.
        AtomicReference<CancelReason> earlyCancelRef = new AtomicReference<>();
        AtomicReference<CancelReason> existingCancelRef = cancelFlags.putIfAbsent(runId, earlyCancelRef);
        if (existingCancelRef != null) earlyCancelRef = existingCancelRef;
        if (earlyCancelRef.get() != null) {
            return completeCancelledBeforeStart(runId, thread, run);
        }
        // Re-read the persisted row to catch a cancel written after the first read.
        try {
            Run latest = runDao().getRun(runId);
            if (latest != null) {
                run = latest;
                if (latest.getStatus() == RunStatus.cancelled) {
                    return completeCancelledBeforeStart(runId, thread, latest);
                }
            }
        } catch (Throwable ignore) { /* best effort race guard */ }
        if (run == null) run = new Run();
        // Same-run recovery continues from the persisted turn count; otherwise start from 0.
        int initialTurns = isSameRunRecovery(request, runId) && run.getTurns() != null ? run.getTurns() : 0;
        run.setRunId(runId);
        run.setThreadId(thread.getThreadId());
        run.setUserId(request.getUser().getUserId());
        run.setTriggerType(request.getTriggerSource());
        run.setTriggerRef(request.getTriggerRef());
        run.setClientRequestId(request.getClientRequestId());
        run.setContinuationSourceRunId(request.getContinuationSourceRunId());
        run.setRequestPayloadJson(serializeRunRequest(request.withRunId(runId)));
        // Reset terminal/error fields and claim the row for this worker.
        run.setStatus(RunStatus.running);
        run.setExitReason(null);
        run.setEndedAt(null);
        run.setTurns(initialTurns);
        run.setErrorCode(null);
        run.setErrorMessage(null);
        Date now = new Date();
        if (run.getStartedAt() == null) run.setStartedAt(now);
        run.setHeartbeatAt(now);
        run.setWorkerId(runtimeWorkerId);
        run.setClaimedAt(now);
        if (earlyCancelRef.get() != null) {
            return completeCancelledBeforeStart(runId, thread, run);
        }
        runDao().save(run);
        if (earlyCancelRef.get() != null) {
            return completeCancelledBeforeStart(runId, thread, run);
        }
        // 2) Read max turns (used to compute the deadline); falls back to DEFAULT_MAX_TURNS
        int maxTurns = DEFAULT_MAX_TURNS;
        try {
            AgentProfile agent = agentProfileDao().getDefaultByOrg(request.getUser().getOrgId());
            if (agent != null && agent.getMaxTurnsPerRun() != null) maxTurns = agent.getMaxTurnsPerRun();
        } catch (Throwable ignore) { /* ignore */ }

        // 3) Build the RunContext, event bus and cancel reference
        long turnTimeoutMs = llmTurnTimeoutMs();
        // Overall deadline: one LLM turn timeout per allowed turn, plus fixed headroom.
        long deadlineAt = System.currentTimeMillis()
                + maxTurns * turnTimeoutMs + RUN_DEADLINE_HEADROOM_MS;
        InMemoryRunEventBus bus = new InMemoryRunEventBus();
        RunContext rctx = new RunContext(run.getRunId(), thread.getThreadId(),
                request.getUser().getUserId(), request.getUser().getOrgId(),
                maxTurns, deadlineAt, bus);
        // Carry any early cancel into the RunContext, then make rctx.cancel the registered flag.
        CancelReason earlyCancel = earlyCancelRef.get();
        if (earlyCancel != null) rctx.cancel.compareAndSet(null, earlyCancel);
        cancelFlags.put(run.getRunId(), rctx.cancel);
        if (earlyCancelRef.get() != null || rctx.isCancelled()) {
            return completeCancelledBeforeStart(runId, thread, run);
        }
        ActiveRun activeRun = null;
        // Only an observable (non-null, non-Noop) emitter is attached to the ActiveRun.
        StreamEmitter directEmitter = isObservableEmitter(emitter) ? emitter : null;
        try {
            if (runStreamEventService == null) {
                throw new LobsterException("run_event.persistence_unavailable",
                        "Run stream event persistence is unavailable");
            }
            // Adopt an ActiveRun already registered for this runId, if any.
            activeRun = activeRuns.get(run.getRunId());
            if (activeRun == null && rctx.isCancelled()) {
                return completeCancelledBeforeStart(runId, thread, run);
            }
            if (activeRun == null) {
                activeRun = registerActiveRun(run.getRunId(), thread.getThreadId(),
                        request.getUser().getUserId(), directEmitter);
            } else if (directEmitter != null) {
                activeRun.emitter.add(directEmitter);
            }
            if (rctx.isCancelled()) {
                return completeCancelledBeforeStart(runId, thread, run);
            }
            // completeCancelledBeforeStart records a synthetic terminal event only when this is set.
            activeRun.workerStarted = true;
            if (rctx.isCancelled()) {
                return completeCancelledBeforeStart(runId, thread, run);
            }
            // From here on, events go through the recording emitter (presumably persisted via
            // runStreamEventService; confirm in RecordingStreamEmitter).
            emitter = new RecordingStreamEmitter(activeRun, request.getUser().getUserId(), runStreamEventService);

            // 3.1 Bridge RunEvents back to the StreamEmitter; the subscription lives for the run's lifetime
            final StreamEmitter emitterRef = emitter;
            final String threadIdRef = thread.getThreadId();
            AutoCloseable busSub = bus.subscribe(ev -> bridgeEventToEmitter(ev, emitterRef, threadIdRef));

            try {
                int turnBudget = turnBudgetFor(request, run.getRunId(), maxTurns, initialTurns);
                return runCore(request, thread, run, rctx, emitter, maxTurns, initialTurns, turnBudget);
            } finally {
                try { busSub.close(); } catch (Throwable ignore) { /* ignore */ }
            }
        } catch (Throwable t) {
            // NOTE(review): if the failure happened before the recording emitter was installed,
            // `emitter` is still the caller-supplied one (possibly null). The calls below are
            // wrapped in try/catch except emitThreadProjection; confirm it tolerates that.
            if (isOwnershipLost(t)) {
                // Another worker now owns the run: hint only while it is not terminal, then rethrow.
                try {
                    Run latest = runDao().getRun(run.getRunId());
                    if (latest == null || !isTerminalStatus(latest.getStatus())) {
                        emitter.systemHint(thread.getThreadId(), run.getRunId(), "run ownership moved; reconnecting");
                    }
                } catch (Throwable ignore) { /* ignore */ }
                throw t;
            }
            // Any other failure: mark the run failed, emit error + projection + runEnded(error), rethrow.
            markDetachedRunFailed(run.getRunId(), request, t);
            try { emitter.error(thread.getThreadId(), run.getRunId(), "runtime.exception", safeMsg(t)); } catch (Throwable ignore) { /* ignore */ }
            emitThreadProjection(thread.getThreadId(), run.getRunId(), emitter);
            try { emitter.runEnded(thread.getThreadId(), run.getRunId(), RunExitReason.error.name()); } catch (Throwable ignore) { /* ignore */ }
            throw t;
        } finally {
            // Detach the caller's emitter, finish/unregister the ActiveRun (only if it is still
            // the mapped instance), and drop the cancel flag.
            if (activeRun != null) {
                if (directEmitter != null) activeRun.emitter.remove(directEmitter);
                finishActiveRun(activeRun);
                activeRuns.remove(run.getRunId(), activeRun);
            }
            cancelFlags.remove(run.getRunId());
        }
    }

    /**
     * Flushes buffered stream events while holding the ActiveRun's monitor (failures are only
     * logged), then counts down its {@code done} latch. A null argument is ignored.
     */
    private void finishActiveRun(ActiveRun activeRun) {
        if (activeRun == null) {
            return;
        }
        synchronized (activeRun) {
            try {
                activeRun.flushBuffered(runStreamEventService);
            } catch (Throwable flushError) {
                try {
                    Tools.log("[AgentRuntime] flush buffered stream events failed: " + activeRun.runId, flushError);
                } catch (Throwable ignore) {
                    /* ignore */
                }
            }
        }
        activeRun.done.countDown();
    }

    /**
     * Registers a new ActiveRun for {@code runId}, whose stream sequence continues after the
     * highest sequence already stored. If another ActiveRun wins the race, that one is returned
     * instead. In both cases an observable {@code initialEmitter} is attached to the returned
     * run's multiplexer.
     *
     * @throws LobsterException when stream event persistence is unavailable
     */
    private ActiveRun registerActiveRun(String runId, String threadId, String userId,
                                        StreamEmitter initialEmitter) {
        if (runStreamEventService == null) {
            throw new LobsterException("run_event.persistence_unavailable",
                    "Run stream event persistence is unavailable");
        }
        long firstSeq = 1L + runStreamEventService.maxSeq(runId);
        boolean observable = isObservableEmitter(initialEmitter);
        MultiplexStreamEmitter mux = new MultiplexStreamEmitter();
        if (observable) {
            mux.add(initialEmitter);
        }
        ActiveRun fresh = new ActiveRun(runId, threadId, userId, mux, firstSeq);
        ActiveRun winner = activeRuns.putIfAbsent(runId, fresh);
        if (winner == null) {
            return fresh;
        }
        if (observable) {
            winner.emitter.add(initialEmitter);
        }
        return winner;
    }

    /** An emitter is observable when it is present and not the no-op implementation. */
    private boolean isObservableEmitter(StreamEmitter emitter) {
        if (emitter == null) {
            return false;
        }
        return !(emitter instanceof NoopStreamEmitter);
    }

    /**
     * Finalizes a run that was cancelled before its turn loop began.
     *
     * <p>The steps are:
     * <ol>
     *   <li>Best-effort move of the persisted row from {@code running} to {@code cancelled}.</li>
     *   <li>Clear the thread's active-run pointer.</li>
     *   <li>Unregister the ActiveRun.</li>
     *   <li>If a worker had already started streaming, flush buffered events and record and
     *       emit a synthetic cancelled terminal event.</li>
     *   <li>Finish the ActiveRun and drop the cancel flag.</li>
     * </ol>
     *
     * @param runId  id of the cancelled run
     * @param thread owning thread (used for the result's thread id)
     * @param run    last known Run row, may be null; refreshed from storage when possible
     * @return a cancelled RunResult carrying the latest known turn count (0 if unknown)
     */
    private RunResult completeCancelledBeforeStart(String runId, ThreadRoom thread, Run run) throws Exception {
        Date now = new Date();
        try {
            runDao().finishIfStatus(RunStatus.cancelled, now, RunExitReason.cancelled,
                    runId, RunStatus.running);
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] finish early-cancelled run failed: " + runId, t); }
            catch (Throwable ignore) { /* ignore */ }
        }
        Run latest = run;
        try {
            Run refreshed = runDao().getRun(runId);
            if (refreshed != null) latest = refreshed;
        } catch (Throwable ignore) { /* fallback */ }
        if (latest != null) clearThreadActiveRun(latest);
        ActiveRun cancelledActive = activeRuns.remove(runId);
        AtomicReference<CancelReason> cancelRef = cancelFlags.get(runId);
        CancelReason flagged = cancelRef == null ? null : cancelRef.get();
        // A flag can be registered but still unset, e.g. the placeholder installed before the
        // persisted "cancelled" status was observed. Fall back to USER in that case instead of
        // recording a null reason on the terminal event.
        CancelReason reason = flagged == null ? CancelReason.USER : flagged;
        if (cancelledActive != null && cancelledActive.workerStarted) {
            try {
                synchronized (cancelledActive) {
                    cancelledActive.flushBuffered(runStreamEventService);
                }
                List<StreamEvent> events = recordSyntheticCancelledTerminalEvent(runId, reason);
                for (StreamEvent ev : events) cancelledActive.emitter.emit(ev);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] record early-cancel terminal event failed: " + runId, t); }
                catch (Throwable ignore) { /* ignore */ }
            }
        }
        if (cancelledActive != null) finishActiveRun(cancelledActive);
        cancelFlags.remove(runId);
        return new RunResult(runId, thread.getThreadId(),
                latest == null || latest.getTurns() == null ? 0 : latest.getTurns(),
                "", RunExitReason.cancelled, null);
    }

    private RunResult runCore(RunRequest request, ThreadRoom thread, Run run,
                              RunContext rctx, StreamEmitter emitter, int maxTurns,
                              int initialTurns, int turnBudget) throws Exception {

        // 4) 用户输入入 transcript —— 对话框上传的资源 id 在此刻展开成 prelude
        String effectiveUserInput = prependAttachmentPrelude(
                request.getUserInput(), request.getAttachedResourceIds());
        String currentUserInput = effectiveUserInput;
        // 多模态：attachmentMediaIds 里有图片 → 校验是图片资源 → 持久化 mediaIds 进
        // attachmentsJson；buildTranscriptView 重新读盘转 base64 dataUrl 给 vision 模型.
        // 非图片 mediaId 静默丢弃（前端不应该混进来；混进来也不影响主流程）.
        List<String> imageMediaIds = filterImageMediaIds(request.getAttachmentMediaIds(), request.getUser().getUserId());
        String attachmentsJson = imageMediaIds.isEmpty() ? null
                : JsonUtil.toJson(Collections.singletonMap("imageMediaIds", imageMediaIds));
        if (request.shouldAppendUserInput() && !hasUserMessageForRun(thread, run.getRunId())) {
            assertRunOwner(run);
            threadService.appendMessage(thread, MessageRole.user,
                    effectiveUserInput, request.getTriggerSource(), run.getRunId(),
                    null, null, null, null, attachmentsJson);
            run.setRequestPayloadJson(serializeRunRequest(request.withRunId(run.getRunId()),
                    false, request.isResumeContinuation()));
            persistRequestPayloadOwned(run);
        } else if (request.shouldAppendUserInput()) {
            run.setRequestPayloadJson(serializeRunRequest(request.withRunId(run.getRunId()),
                    false, request.isResumeContinuation()));
            persistRequestPayloadOwned(run);
        }

        // 5) 选模型——有图片就要求 multimodal，让 ModelRouter 过滤
        ModelSelectionContext selCtx = new ModelSelectionContext();
        selCtx.setOrgId(request.getUser().getOrgId());
        selCtx.setUserId(request.getUser().getUserId());
        selCtx.setRequiresTools(true);
        selCtx.setRequiresMultimodal(!imageMediaIds.isEmpty());
        ModelRouteResult route = modelRouter.route(selCtx);
        run.setModelId(route.getPrimary().getModelId());
        persistModelOwned(run);
        ActiveRun activeRun = activeRuns.get(run.getRunId());
        if (activeRun != null) activeRun.modelId = route.getPrimary().getModelId();
        emitter.runStarted(thread.getThreadId(), run.getRunId(), route.getPrimary().getModelId());
        rctx.events.publish(new RunEvent.RunStarted(run.getRunId(), thread.getThreadId(),
                route.getPrimary().getModelId()));

        AgentLoopDetector loopDetector = new AgentLoopDetector();
        int turn = 0;
        int totalTurns = Math.max(0, initialTurns);
        String lastAssistantText = "";
        RunExitReason exitReason = RunExitReason.normal;
        String pendingRequestId = null;
        Map<String, Object> lastPlanSnapshot = null;
        boolean completedWithoutToolCalls = false;

        LlmCallRequest llmReq = new LlmCallRequest();
        llmReq.setRunId(run.getRunId());
        llmReq.setThreadId(thread.getThreadId());
        llmReq.setUserId(request.getUser().getUserId());
        llmReq.setOrgId(request.getUser().getOrgId());

        ContextAssembler.RunSnapshot contextRunSnapshot = contextAssembler.createRunSnapshot(thread);

        while (turn < turnBudget) {
            // —— 取消边界 —— //
            if (rctx.isCancelled()) { exitReason = RunExitReason.cancelled; break; }
            if (rctx.deadlineAtMs > 0 && System.currentTimeMillis() > rctx.deadlineAtMs) {
                rctx.requestCancel(CancelReason.TIMEOUT);
                exitReason = RunExitReason.cancelled;
                break;
            }

            turn++;
            totalTurns++;
            rctx.turn.set(totalTurns);
            run.setTurns(totalTurns);
            run.setHeartbeatAt(new Date());
            persistProgressOwned(run);
            rctx.events.publish(new RunEvent.TurnStarted(run.getRunId(), thread.getThreadId(), totalTurns));

            // 5.1 组装上下文
            // 预算按当前路由模型的 contextWindow 动态计算：契合 1M / 200k / 128k 模型差异，
            // 避免一刀切硬编码导致长窗口模型上下文被早早折叠 / 短窗口模型被 API 端 422.
            // ModelProfile 缺 contextWindow 时退回 LobsterConfig.defaultContextBudgetTokens 兜底.
            int budgetTokens = computeContextBudget(route);
            ContextAssembly assembly = contextAssembler.assemble(
                    thread, currentUserInput, budgetTokens, route,
                    request.isKbEnabled(), request.getKbMode(), request.getKbScopeIds(),
                    request.getUser(), contextRunSnapshot);
            // 知识库工具开关：kbEnabled=false 时把 OA 知识库工具从工具表过滤掉，
            // 模型完全看不到这两个工具就不会乱调；强制注入模式（forced）由 ContextAssembler
            // 已经把 hits 拼进 user prelude 里，依然把工具留出去，让模型可以追加查更多.
            boolean exposeExternalizedReader = hasExternalizedContentRef(assembly.getSendView());
            List<ToolExposureDecision> exposureDecisions = new ArrayList<>();
            ToolRegistry.ToolListFilter toolFilter = def -> {
                ToolExposureDecision decision = toolExposureDecision(def, request, exposeExternalizedReader);
                exposureDecisions.add(decision);
                return decision.exposed;
            };
            List<ToolSpec> toolSpecs = toolRegistry.toToolSpecs(toolFilter);
            emitToolExposureDiagnostics(emitter, thread.getThreadId(), run.getRunId(), exposureDecisions);

            // 5.2 流式调用 LLM
            rctx.events.publish(new RunEvent.LlmStarted(run.getRunId(), thread.getThreadId(),
                    route.getPrimary().getModelId()));
            LlmResponse llmResp;
            try {
                llmResp = streamOneTurn(
                        llmReq, route, assembly.getSendView(), toolSpecs,
                        thread, run.getRunId(), emitter, rctx,
                        request.getUser().getOrgId());
            } catch (RunCancelledException rce) {
                exitReason = RunExitReason.cancelled;
                rctx.events.publish(new RunEvent.RunCancelled(run.getRunId(),
                        thread.getThreadId(), rce.getReason()));
                break;
            } catch (LobsterException e) {
                exitReason = RunExitReason.error;
                run.setErrorCode(e.getCode());
                run.setErrorMessage(e.getMessage());
                try { Tools.log("[AgentRuntime] LLM turn failed: " + e.getCode(), e); } catch (Throwable ignore) { /* ignore */ }
                emitter.error(thread.getThreadId(), run.getRunId(), e.getCode(), e.getMessage());
                rctx.events.publish(new RunEvent.RunError(run.getRunId(), thread.getThreadId(),
                        e.getCode(), e.getMessage(), e));
                break;
            } catch (Throwable t) {
                exitReason = RunExitReason.error;
                run.setErrorCode("llm.exception");
                run.setErrorMessage(safeMsg(t));
                try { Tools.log("[AgentRuntime] LLM turn threw " + t.getClass().getName(), t); } catch (Throwable ignore) { /* ignore */ }
                emitter.error(thread.getThreadId(), run.getRunId(), "llm.exception", safeMsg(t));
                rctx.events.publish(new RunEvent.RunError(run.getRunId(), thread.getThreadId(),
                        "llm.exception", safeMsg(t), t));
                break;
            }

            if (rctx.isCancelled()) { exitReason = RunExitReason.cancelled; break; }
            assertRunOwner(run);

            String assistantText = llmResp.getAssistantText();
            lastAssistantText = assistantText;
            rctx.usage.addPromptTokens(llmResp.getInputTokens());
            rctx.usage.addCompletionTokens(llmResp.getOutputTokens());
            rctx.events.publish(new RunEvent.LlmCompleted(run.getRunId(), thread.getThreadId(),
                    llmResp.getFinishReason(), llmResp.getInputTokens(), llmResp.getOutputTokens()));

            // 5.3 assistant 消息落盘
            String toolCallsJson = serializeToolCalls(llmResp.getToolCalls());
            // thinking-mode 模型的 reasoning_content 必须持久化——下一轮 ContextAssembler 装回
            // LobsterMessage、adapter 再序列化到请求 body，否则 DeepSeek 等返 400.
            threadService.appendMessage(thread, MessageRole.assistant,
                    assistantText, "assistant", run.getRunId(), null, null, toolCallsJson,
                    llmResp.getReasoningContent());

            // 5.4 无 tool_calls 则自然结束
            if (llmResp.getToolCalls().isEmpty()) {
                exitReason = RunExitReason.normal;
                completedWithoutToolCalls = true;
                break;
            }

            // 5.5 循环检测
            AgentLoopDetector.LoopCheckResult lc =
                    loopDetector.recordAndCheckIdenticalLoop(llmResp.getToolCalls());
            if (lc.isTripped()) {
                exitReason = "loop_detected".equals(lc.getReason())
                        ? RunExitReason.loop_detected : RunExitReason.no_progress;
                // 先给本轮 tool_calls 补"中止"结果消息，再插入 system 提示——
                // 否则下一轮上下文里会有孤立 assistant(tool_calls)，LLM 400.
                writeAbortedToolResults(thread, run.getRunId(), llmResp.getToolCalls(),
                        Collections.emptySet(), "loop detected: " + lc.getDetail());
                threadService.appendMessage(thread, MessageRole.system,
                        "[检测到重复操作，已中止：" + lc.getDetail() + "]",
                        "runtime_guard", run.getRunId(), null, null, null);
                emitter.systemHint(thread.getThreadId(), run.getRunId(),
                        "检测到重复操作，本次执行已中止");
                rctx.events.publish(new RunEvent.TurnEnded(run.getRunId(), thread.getThreadId(),
                        totalTurns, "loop_detected"));
                break;
            }

            // 5.6 执行工具调用
            List<ToolResult> turnResults = new ArrayList<>();
            boolean pendingFound = false;
            boolean artifactProduced = false;
            boolean planChanged = false;
            String firstPendingId = null;
            // 记录本轮已落 tool_result 的 call.id，用于中途 cancel 时给未执行的 call 补中止结果
            Set<String> executedToolCallIds = new HashSet<>();
            for (ToolCall call : llmResp.getToolCalls()) {
                if (rctx.isCancelled()) break;
                assertRunOwner(run);
                emitter.toolCall(thread.getThreadId(), run.getRunId(),
                        call.getId(), call.getName(), call.getArgumentsJson());
                rctx.events.publish(new RunEvent.ToolCallStarted(run.getRunId(), thread.getThreadId(),
                        call.getId(), call.getName(), call.getArgumentsJson()));

                // V3: 工具上下文带 deadline + cancelFlag + progress sink.
                // progress sink 把 long-running 工具（code_exec 等）的阶段事件桥接到 SSE.
                // 闭包里绑 toolCallId，让前端能把 progress 挂到对应 tool-card 下而不是全局 toast.
                final StreamEmitter emitterForProgress = emitter;
                final String threadIdForProgress = thread.getThreadId();
                final String runIdForProgress = run.getRunId();
                final String toolNameForProgress = call.getName();
                final String toolCallIdForProgress = call.getId();
                com.gzzm.lobster.tool.ProgressSink progressSink =
                        (stage, message, detail) -> {
                            if (!isCurrentRunOwnerQuietly(runIdForProgress)) {
                                return;
                            }
                            try {
                                java.util.Map<String, Object> p = new java.util.LinkedHashMap<>();
                                p.put("toolCallId", toolCallIdForProgress);
                                p.put("tool", toolNameForProgress);
                                p.put("stage", stage);
                                if (detail != null && !detail.isEmpty()) {
                                    if (detail.containsKey("event")) p.putAll(detail);
                                    else p.put("detail", detail);
                                }
                                emitterForProgress.systemHint(threadIdForProgress, runIdForProgress,
                                        message, p);
                            } catch (Throwable ignore) { /* progress 不阻塞主流程 */ }
                        };
                // 把 RunRequest 上的 KB 设定塞进 ToolContext 的 attributes 旁路，
                // OaKnowledgeTools 取 kb.scopeIds 直接传给 client，无需让 LLM 在 args 里重复带.
                Map<String, Object> tcAttrs = new LinkedHashMap<>();
                tcAttrs.put("kb.enabled",  request.isKbEnabled());
                tcAttrs.put("kb.mode",     request.getKbMode());
                tcAttrs.put("kb.scopeIds", request.getKbScopeIds());
                ToolContext tc = new ToolContext(request.getUser(), thread.getThreadId(),
                        run.getRunId(), call.getId(), call.getName(),
                        rctx.deadlineAtMs, rctx.cancel, progressSink, tcAttrs);

                long toolStart = System.currentTimeMillis();
                ToolResult result;
                try {
                    result = toolExecutorDispatcher.dispatch(tc, call);
                } catch (Throwable t) {
                    result = ToolResult.error("tool dispatch failed: " + safeMsg(t));
                }
                long toolDur = System.currentTimeMillis() - toolStart;
                assertRunOwner(run);
                turnResults.add(result);
                rctx.usage.incToolCalls();

                // 进展信号：artifact / plan
                if (result.getArtifactIds() != null && !result.getArtifactIds().isEmpty()) {
                    artifactProduced = true;
                }
                if (result.getStatus() == ToolResultStatus.pending) {
                    pendingFound = true;
                    if (firstPendingId == null) firstPendingId = result.getPendingRequestId();
                    String typeName = result.getPendingType() == null
                            ? null : result.getPendingType().name();
                    String pendingTitle = result.getPendingTitle();
                    if (pendingTitle == null || pendingTitle.isEmpty()) {
                        pendingTitle = result.getMessage();
                    }
                    PendingRequestService pendingSvc = pendingRequestService();
                    PendingRequest pending = pendingSvc == null
                            ? null : pendingSvc.get(result.getPendingRequestId());
                    if (pending != null) {
                        Map<String, Object> pendingProjection = toPendingProjection(pending);
                        pendingProjection.put("toolName", call.getName());
                        emitter.pendingRequest(thread.getThreadId(), run.getRunId(), pendingProjection);
                    } else {
                        emitter.pendingRequest(thread.getThreadId(), run.getRunId(),
                                result.getPendingRequestId(), typeName,
                                pendingTitle, call.getName());
                    }
                    rctx.events.publish(new RunEvent.PendingCreated(run.getRunId(), thread.getThreadId(),
                            result.getPendingRequestId(), typeName, pendingTitle, call.getName()));
                }
                // 把执行耗时贴进 tool message JSON：历史回放 + admin trace 都能读到，
                // LLM 也能感知 "上一个工具花了多久"（决定是否值得重试 / 改 prompt）.
                String content = injectToolDurationIntoContent(result.toToolMessageContent(), toolDur);
                threadService.appendMessage(thread, MessageRole.tool,
                        content, "tool_result", run.getRunId(),
                        call.getId(), call.getName(), null);
                executedToolCallIds.add(call.getId());
                // KB 工具的 hits 走 data 旁路，让前端渲染引用气泡（不污染 LLM 看到的 summary）.
                Map<String, Object> resultDataForUi = isOaKnowledgeTool(call.getName())
                        ? result.getData()
                        : null;
                emitter.toolResult(thread.getThreadId(), run.getRunId(),
                        call.getId(), call.getName(),
                        result.getStatus().name(), result.getMessage(),
                        resultDataForUi, toolDur);
                emitArtifactResourceUpserts(thread, run.getRunId(), result, emitter);
                rctx.events.publish(new RunEvent.ToolCallCompleted(run.getRunId(), thread.getThreadId(),
                        call.getId(), call.getName(), result.getStatus().name(),
                        result.getMessage(), toolDur));

                if ("update_plan".equals(call.getName()) && result.getStatus() == ToolResultStatus.ok) {
                    Map<String, Object> snap = result.getData();
                    if (hasPlanStateChange(lastPlanSnapshot, snap)) planChanged = true;
                    lastPlanSnapshot = snap;
                    emitter.planUpdate(thread.getThreadId(), run.getRunId(), snap);
                }
            }

            if (rctx.isCancelled()) {
                // 中途 cancel 可能在 tool 循环里 break 出来，遗留未执行 tool_call —— 补中止结果
                CancelReason cr = rctx.cancelReason();
                writeAbortedToolResults(thread, run.getRunId(), llmResp.getToolCalls(),
                        executedToolCallIds,
                        "run cancelled: " + (cr == null ? "USER" : cr.name()));
                exitReason = RunExitReason.cancelled;
                break;
            }

            // 5.7 Claim 一致性
            ClaimConsistencyChecker.CheckResult claim = claimChecker.check(assistantText, turnResults);
            if (!claim.isPass()) {
                threadService.appendMessage(thread, MessageRole.system,
                        "[提示] assistant 出现了完成性声明但缺少工具证据：" + claim.getReason(),
                        "runtime_guard", run.getRunId(), null, null, null);
            }

            // 5.8 进展检测 —— V2 ProgressSignal
            AgentLoopDetector.ProgressSignal signal =
                    new AgentLoopDetector.ProgressSignal(planChanged, artifactProduced, pendingFound);
            AgentLoopDetector.LoopCheckResult prog = loopDetector.recordProgress(signal);
            if (prog.isTripped()) {
                exitReason = RunExitReason.no_progress;
                threadService.appendMessage(thread, MessageRole.system,
                        "[多轮无实质进展，已中止：" + prog.getDetail() + "]",
                        "runtime_guard", run.getRunId(), null, null, null);
                emitter.systemHint(thread.getThreadId(), run.getRunId(),
                        "多轮无进展，本次执行已中止");
                rctx.events.publish(new RunEvent.TurnEnded(run.getRunId(), thread.getThreadId(),
                        totalTurns, "no_progress"));
                break;
            }

            rctx.events.publish(new RunEvent.TurnEnded(run.getRunId(), thread.getThreadId(),
                    totalTurns, pendingFound ? "pending" : null));

            // 5.9 pending 则本轮结束
            if (pendingFound) {
                exitReason = RunExitReason.pending;
                pendingRequestId = firstPendingId;
                break;
            }
        }

        if (exitReason == RunExitReason.normal && !completedWithoutToolCalls && turn >= turnBudget) {
            assertRunOwner(run);
            PendingRequest p = createMaxTurnsPending(thread, request, run, totalTurns, maxTurns);
            pendingRequestId = p.getRequestId();
            suspendForMaxTurnsContinuation(thread, request, run, rctx, p, totalTurns, emitter);
            emitThreadProjection(thread.getThreadId(), run.getRunId(), emitter);
            emitter.runEnded(thread.getThreadId(), run.getRunId(), RunExitReason.pending.name());
            rctx.events.publish(new RunEvent.RunEnded(run.getRunId(), thread.getThreadId(),
                    RunExitReason.pending.name()));
            auditService.record(request.getUser(), thread.getThreadId(), run.getRunId(),
                    "run.suspended", "pending_request", pendingRequestId, "max_turns", null);
            return new RunResult(run.getRunId(), thread.getThreadId(), totalTurns,
                    lastAssistantText, RunExitReason.pending, pendingRequestId);
        }

        Date terminalAt = new Date();
        switch (exitReason) {
            case error:
                run.setStatus(RunStatus.error);
                run.setEndedAt(terminalAt);
                break;
            case cancelled:
                run.setStatus(RunStatus.cancelled);
                run.setEndedAt(terminalAt);
                break;
            case pending:
                run.setStatus(RunStatus.waiting_user);
                run.setEndedAt(null);
                break;
            default:
                run.setStatus(RunStatus.ended);
                run.setEndedAt(terminalAt);
                break;
        }
        run.setExitReason(exitReason);
        if (run.getStatus() == RunStatus.waiting_user) {
            persistSuspendedOwned(run);
        } else {
            persistTerminalOwned(run, run.getStatus(), exitReason, terminalAt);
        }
        if (run.getStatus() != RunStatus.waiting_user) clearThreadActiveRun(run);

        // 首轮正常结束时自动生成标题——放在 runEnded 前发，保证 SSE 流还没关，
        // 前端收到 thread_renamed 能同步更新 ThreadList.
        if (exitReason == RunExitReason.normal) {
            maybeAutoGenerateTitle(thread, request, effectiveUserInput, lastAssistantText, emitter);
        }

        emitThreadProjection(thread.getThreadId(), run.getRunId(), emitter);
        emitter.runEnded(thread.getThreadId(), run.getRunId(), exitReason.name());
        rctx.events.publish(new RunEvent.RunEnded(run.getRunId(), thread.getThreadId(),
                exitReason.name()));
        auditService.record(request.getUser(), thread.getThreadId(), run.getRunId(),
                "run.end", "thread", thread.getThreadId(), exitReason.name(), null);

        return new RunResult(run.getRunId(), thread.getThreadId(), totalTurns,
                lastAssistantText, exitReason, pendingRequestId);
    }

    /**
     * Pushes projection events for every artifact produced by one tool result.
     *
     * <p>For each non-empty id in {@code result.getArtifactIds()}:
     * <ul>
     *   <li>loads the artifact;</li>
     *   <li>skips it unless it belongs to {@code thread};</li>
     *   <li>emits an artifact upsert;</li>
     *   <li>if a workspace resource whose sourceId equals the artifact id exists, also emits a
     *       resource upsert.</li>
     * </ul>
     *
     * <p>Best-effort: a failure for one artifact is logged and the loop moves on. Nothing is
     * propagated to the run loop.
     *
     * @param thread  owning thread; no-op when null or when its id is null
     * @param runId   run id attached to the emitted events
     * @param result  tool result whose artifact ids are projected; no-op when null
     * @param emitter SSE emitter; no-op when null
     */
    private void emitArtifactResourceUpserts(ThreadRoom thread, String runId, ToolResult result,
                                             StreamEmitter emitter) {
        if (thread == null || result == null || emitter == null) return;
        List<String> ids = result.getArtifactIds();
        if (ids == null || ids.isEmpty()) return;
        String threadId = thread.getThreadId();
        // Without a thread id no artifact can be matched to this thread.
        if (threadId == null) return;
        ArtifactService artifacts;
        try {
            artifacts = artifactService();
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] emit artifact/resource projection failed: artifact service lookup", t); }
            catch (Throwable ignore) { /* projection must not block run */ }
            return;
        }
        // Same tolerance as artifactProjection: a missing service means "nothing to project".
        // Without this check, every artifact id would produce a logged NPE.
        if (artifacts == null) return;
        for (String artifactId : ids) {
            if (artifactId == null || artifactId.isEmpty()) continue;
            try {
                Artifact artifact = artifacts.get(artifactId);
                if (artifact == null || !threadId.equals(artifact.getThreadId())) continue;
                emitter.artifactUpsert(threadId, runId, toArtifactProjection(artifact));
                WorkspaceResource resource = findResourceForArtifact(threadId, artifactId);
                if (resource != null) {
                    emitter.resourceUpsert(threadId, runId, toResourceProjection(resource));
                }
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] emit artifact/resource projection failed: " + artifactId, t); }
                catch (Throwable ignore) { /* projection must not block run */ }
            }
        }
    }

    /**
     * Finds the workspace resource whose sourceId equals the given artifact id.
     *
     * <p>Only the first 500 resources returned by {@code listResources(threadId, null, 0, 500)}
     * are scanned. Null entries in that list are ignored.
     *
     * @param threadId   thread whose workspace is searched
     * @param artifactId artifact id to match against {@code getSourceId()}
     * @return the first matching resource; null when the workspace service is unavailable,
     *         an argument is null, the listing is null, or nothing matches
     * @throws Exception propagated from the workspace service
     */
    private WorkspaceResource findResourceForArtifact(String threadId, String artifactId) throws Exception {
        WorkspaceService workspace = workspaceService();
        boolean missingInput = workspace == null || threadId == null || artifactId == null;
        if (missingInput) {
            return null;
        }
        List<WorkspaceResource> candidates = workspace.listResources(threadId, null, 0, 500);
        if (candidates != null) {
            for (WorkspaceResource candidate : candidates) {
                if (candidate == null) {
                    continue;
                }
                if (artifactId.equals(candidate.getSourceId())) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Best-effort emission of a full thread projection event.
     *
     * <p>Any failure while building or emitting the event is logged and swallowed, so the
     * caller's terminal events still go out.
     *
     * @param threadId thread to project; no-op when null
     * @param runId    run id attached to the event
     * @param emitter  SSE emitter; no-op when null
     */
    private void emitThreadProjection(String threadId, String runId, StreamEmitter emitter) {
        if (emitter == null || threadId == null) {
            return;
        }
        try {
            StreamEvent event = threadProjectionEvent(threadId, runId);
            emitter.emit(event);
        } catch (Throwable failure) {
            try {
                Tools.log("[AgentRuntime] emit thread projection failed: " + threadId, failure);
            } catch (Throwable ignore) {
                /* projection must not block terminal event */
            }
        }
    }

    /**
     * Wraps the thread projection into a {@code thread_projection} stream event.
     *
     * <p>The payload has the keys threadId, runId and projection, in that order.
     *
     * @throws Exception propagated from the projection builders
     */
    private StreamEvent threadProjectionEvent(String threadId, String runId) throws Exception {
        Map<String, Object> projection = buildThreadProjection(threadId);
        Map<String, Object> eventPayload = new LinkedHashMap<>();
        eventPayload.put("threadId", threadId);
        eventPayload.put("runId", runId);
        eventPayload.put("projection", projection);
        return StreamEvent.of(StreamEventType.thread_projection, eventPayload);
    }

    /**
     * Assembles the thread snapshot from four sections: messages, resources, artifacts and
     * open pending requests.
     *
     * <p>The sections are loaded in that order, and the map keeps the same order.
     *
     * @throws Exception propagated from any section loader
     */
    private Map<String, Object> buildThreadProjection(String threadId) throws Exception {
        List<Map<String, Object>> messages = messageProjection(threadId);
        List<Map<String, Object>> resources = resourceProjection(threadId);
        List<Map<String, Object>> artifacts = artifactProjection(threadId);
        List<Map<String, Object>> openPending = pendingProjection(threadId);

        Map<String, Object> snapshot = new LinkedHashMap<>();
        snapshot.put("messages", messages);
        snapshot.put("resources", resources);
        snapshot.put("artifacts", artifacts);
        snapshot.put("pendingRequests", openPending);
        return snapshot;
    }

    /**
     * Projects the thread's lightweight transcript into plain maps.
     *
     * @return one map per message; empty when the transcript is null
     * @throws Exception propagated from the thread service
     */
    private List<Map<String, Object>> messageProjection(String threadId) throws Exception {
        List<Map<String, Object>> projected = new ArrayList<>();
        List<ThreadMessage> transcript = threadService.loadLightweightTranscript(threadId);
        if (transcript != null) {
            for (ThreadMessage message : transcript) {
                projected.add(toMessageProjection(message));
            }
        }
        return projected;
    }

    /**
     * Projects every artifact of the thread into plain maps.
     *
     * @return one map per artifact; empty when the artifact service is unavailable or
     *         returns null
     * @throws Exception propagated from the artifact service
     */
    private List<Map<String, Object>> artifactProjection(String threadId) throws Exception {
        List<Map<String, Object>> projected = new ArrayList<>();
        ArtifactService service = artifactService();
        if (service == null) {
            return projected;
        }
        List<Artifact> found = service.listByThread(threadId);
        if (found != null) {
            for (Artifact artifact : found) {
                projected.add(toArtifactProjection(artifact));
            }
        }
        return projected;
    }

    /**
     * Projects the thread's workspace resources into plain maps.
     *
     * <p>Only the first page of 50 resources is used: {@code listResources(threadId, null, 0, 50)}.
     *
     * @return one map per resource; empty when the workspace service is unavailable or
     *         returns null
     * @throws Exception propagated from the workspace service
     */
    private List<Map<String, Object>> resourceProjection(String threadId) throws Exception {
        List<Map<String, Object>> projected = new ArrayList<>();
        WorkspaceService workspace = workspaceService();
        if (workspace == null) {
            return projected;
        }
        List<WorkspaceResource> page = workspace.listResources(threadId, null, 0, 50);
        if (page != null) {
            for (WorkspaceResource resource : page) {
                projected.add(toResourceProjection(resource));
            }
        }
        return projected;
    }

    /**
     * Projects the thread's open pending requests into plain maps.
     *
     * @return one map per open pending request; empty when the pending service is
     *         unavailable or returns null
     * @throws Exception propagated from the pending request service
     */
    private List<Map<String, Object>> pendingProjection(String threadId) throws Exception {
        List<Map<String, Object>> projected = new ArrayList<>();
        PendingRequestService service = pendingRequestService();
        if (service == null) {
            return projected;
        }
        List<PendingRequest> open = service.listOpenByThread(threadId);
        if (open != null) {
            for (PendingRequest pending : open) {
                projected.add(toPendingProjection(pending));
            }
        }
        return projected;
    }

    /**
     * Converts a thread message into an insertion-ordered map for the client.
     *
     * <p>{@code reasoningContent} is included only when it is non-empty. The role is
     * exported by enum name, or null when absent.
     */
    private Map<String, Object> toMessageProjection(ThreadMessage m) {
        Map<String, Object> projection = new LinkedHashMap<>();
        projection.put("messageId", m.getMessageId());
        projection.put("role", m.getRole() != null ? m.getRole().name() : null);
        projection.put("content", m.getContent());
        projection.put("runId", m.getRunId());
        projection.put("toolCallId", m.getToolCallId());
        projection.put("toolName", m.getToolName());
        boolean hasReasoning = m.getReasoningContent() != null && !m.getReasoningContent().isEmpty();
        if (hasReasoning) {
            projection.put("reasoningContent", m.getReasoningContent());
        }
        projection.put("attachmentsJson", m.getAttachmentsJson());
        projection.put("seq", m.getSeq());
        projection.put("createTime", m.getCreateTime());
        return projection;
    }

    /**
     * Converts an artifact into an insertion-ordered map for the client.
     *
     * <p>A null status is reported as {@code "active"}. A null artifact type is reported as null.
     */
    private Map<String, Object> toArtifactProjection(Artifact a) {
        Map<String, Object> projection = new LinkedHashMap<>();
        projection.put("artifactId", a.getArtifactId());
        projection.put("title", a.getTitle());
        projection.put("artifactType", a.getArtifactType() != null ? a.getArtifactType().name() : null);
        projection.put("format", a.getFormat());
        projection.put("sourceRunId", a.getSourceRunId());
        projection.put("version", a.getVersion());
        projection.put("status", a.getStatus() != null ? a.getStatus().name() : "active");
        projection.put("createTime", a.getCreateTime());
        projection.put("updateTime", a.getUpdateTime());
        return projection;
    }

    /**
     * Converts a workspace resource into an insertion-ordered map for the client.
     *
     * <p>Optional fields:
     * <ul>
     *   <li>{@code oaFileType} and {@code oaFileTypeLabel} are added only for OA files whose
     *       type can be determined;</li>
     *   <li>{@code metadataJson} and {@code createTime} are added only when non-null.</li>
     * </ul>
     * The lifecycle entries from {@link ResourceMetadata#artifactLifecycle} are merged in
     * before {@code createTime}.
     */
    private Map<String, Object> toResourceProjection(WorkspaceResource r) {
        Map<String, Object> projection = new LinkedHashMap<>();
        projection.put("resourceId", r.getResourceId());
        projection.put("sourceId", r.getSourceId());
        projection.put("displayName", r.getDisplayName());
        projection.put("sourceType", r.getSourceType() != null ? r.getSourceType().name() : null);

        String fileType = oaFileType(r);
        if (fileType != null) {
            projection.put("oaFileType", fileType);
            projection.put("oaFileTypeLabel", oaFileTypeLabel(r, fileType));
        }

        projection.put("artifactType", r.getArtifactType());
        projection.put("mimeType", r.getMimeType());
        projection.put("status", r.getStatus() != null ? r.getStatus().name() : null);
        if (r.getMetadataJson() != null) {
            projection.put("metadataJson", r.getMetadataJson());
        }
        projection.putAll(ResourceMetadata.artifactLifecycle(r));
        if (r.getCreateTime() != null) {
            projection.put("createTime", r.getCreateTime());
        }
        return projection;
    }

    /**
     * Determines the OA file type of a resource.
     *
     * <p>Only resources whose source type is OA_FILE have a type. The explicit metadata value
     * is preferred. If it is missing, an origin containing "mail" (case-insensitive,
     * {@code Locale.ROOT}) yields {@code "MAIL"}.
     *
     * @return the type string, or null when none applies
     */
    private String oaFileType(WorkspaceResource r) {
        boolean isOaFile = r != null && r.getSourceType() == ResourceSourceType.OA_FILE;
        if (!isOaFile) {
            return null;
        }
        String declared = ResourceMetadata.getOaFileType(r);
        if (declared != null && !declared.isEmpty()) {
            return declared;
        }
        String origin = r.getOrigin();
        boolean looksLikeMail = origin != null && origin.toLowerCase(Locale.ROOT).contains("mail");
        return looksLikeMail ? "MAIL" : null;
    }

    /**
     * Returns the human-readable label for an OA file type.
     *
     * <p>An explicit label from the resource metadata wins. Otherwise MAIL and DOCUMENT
     * (case-insensitive) map to fixed Chinese labels, and any other type is returned unchanged.
     */
    private String oaFileTypeLabel(WorkspaceResource r, String type) {
        String explicit = ResourceMetadata.getOaFileTypeLabel(r);
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        if ("MAIL".equalsIgnoreCase(type)) {
            return "邮件";
        }
        return "DOCUMENT".equalsIgnoreCase(type) ? "公文" : type;
    }

    /**
     * Converts a pending request into an insertion-ordered map for the client.
     *
     * <p>{@code runId} repeats {@code sourceRunId}. Presumably this is an alias for consumers
     * that read {@code runId}; verify against the frontend.
     *
     * <p>A non-empty payload JSON is parsed into a map. If parsing fails for any reason, the raw
     * string is exported instead.
     */
    private Map<String, Object> toPendingProjection(PendingRequest p) {
        Map<String, Object> projection = new LinkedHashMap<>();
        projection.put("requestId", p.getRequestId());
        projection.put("type", p.getType() != null ? p.getType().name() : null);
        projection.put("title", p.getTitle());
        projection.put("prompt", p.getPrompt());
        projection.put("allowedActionsJson", p.getAllowedActionsJson());
        projection.put("sourceRunId", p.getSourceRunId());
        projection.put("runId", p.getSourceRunId());
        projection.put("toolCallId", p.getToolCallId());
        if (p.getPayloadJson() != null && !p.getPayloadJson().isEmpty()) {
            Object payload;
            try {
                payload = JsonUtil.fromJsonToMap(p.getPayloadJson());
            } catch (Throwable unparsable) {
                payload = p.getPayloadJson();
            }
            projection.put("payload", payload);
        }
        projection.put("status", p.getStatus() != null ? p.getStatus().name() : null);
        projection.put("createTime", p.getCreateTime());
        return projection;
    }

    /**
     * Creates the confirm_action pending request offered when a run exhausts its turn budget.
     *
     * <p>The payload carries kind={@code max_turns_continue}, sourceRunId, turns, maxTurns and
     * triggerSource. The allowed actions are CONFIRM and CANCEL. Every failure is logged
     * before it is rethrown.
     *
     * @throws LobsterException {@code pending.service_unavailable} when no pending service is
     *         available; {@code run.max_turns_pending_failed} wrapping any other failure
     */
    private PendingRequest createMaxTurnsPending(ThreadRoom thread, RunRequest request,
                                                 Run run, int turns, int maxTurns) {
        try {
            String sourceRunId = run.getRunId();
            Map<String, Object> continuation = new LinkedHashMap<>();
            continuation.put("kind", "max_turns_continue");
            continuation.put("sourceRunId", sourceRunId);
            continuation.put("turns", turns);
            continuation.put("maxTurns", maxTurns);
            continuation.put("triggerSource", request.getTriggerSource());

            PendingRequestService service = pendingRequestService();
            if (service == null) {
                throw new LobsterException("pending.service_unavailable",
                        "Pending request service is unavailable");
            }
            return service.create(thread, request.getUser(), PendingRequestType.confirm_action,
                    "已达到最大执行轮数，是否继续？",
                    "本次任务已达到 maxTurnsPerRun 限制。确认后将从当前会话状态继续执行，并重置新的轮次预算；取消则停止继续执行。",
                    continuation, Arrays.asList("CONFIRM", "CANCEL"), null,
                    sourceRunId, "runtime.max_turns");
        } catch (Throwable failure) {
            try {
                Tools.log("[AgentRuntime] create max-turns pending failed", failure);
            } catch (Throwable ignore) {
                /* ignore */
            }
            throw failure instanceof LobsterException
                    ? (LobsterException) failure
                    : new LobsterException("run.max_turns_pending_failed",
                            "Failed to create max-turns continuation pending request", failure);
        }
    }

    /**
     * Parks a run in {@code waiting_user} behind the max-turns continuation pending request.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>sets status, exit reason ({@code pending}), turn count and heartbeat on the run;</li>
     *   <li>persists the run via {@code persistSuspendedOwned};</li>
     *   <li>records a {@code run.waiting_user} audit entry;</li>
     *   <li>emits the pending request (toolName {@code runtime.max_turns});</li>
     *   <li>publishes PendingCreated;</li>
     *   <li>sends a {@code run_waiting_user} hint.</li>
     * </ol>
     * Does nothing when {@code pending} is null.
     *
     * @throws Exception propagated from persistence
     */
    private void suspendForMaxTurnsContinuation(ThreadRoom thread, RunRequest request,
                                                Run run, RunContext rctx, PendingRequest pending,
                                                int totalTurns, StreamEmitter emitter) throws Exception {
        if (pending == null) return;
        run.setStatus(RunStatus.waiting_user);
        run.setExitReason(RunExitReason.pending);
        run.setTurns(totalTurns);
        run.setHeartbeatAt(new Date());
        persistSuspendedOwned(run);
        auditService.record(request.getUser(), thread.getThreadId(), run.getRunId(),
                "run.waiting_user", "pending_request", pending.getRequestId(), "max_turns", null);
        Map<String, Object> pendingProjection = toPendingProjection(pending);
        pendingProjection.put("toolName", "runtime.max_turns");
        emitter.pendingRequest(thread.getThreadId(), run.getRunId(), pendingProjection);
        // Null-safe, like toPendingProjection and the tool-pending path. The run is already
        // persisted as waiting_user at this point, so an NPE here would skip the
        // PendingCreated event and the hint.
        String typeName = pending.getType() == null ? null : pending.getType().name();
        rctx.events.publish(new RunEvent.PendingCreated(run.getRunId(), thread.getThreadId(),
                pending.getRequestId(), typeName, pending.getTitle(), "runtime.max_turns"));
        emitter.systemHint(thread.getThreadId(), run.getRunId(), "run_waiting_user");
    }

    /**
     * Bridges selected {@link RunEvent}s to the {@link StreamEmitter}.
     *
     * <p>Only {@code RunCancelled} is bridged today. It becomes a system hint
     * {@code "run cancelled: <reason>"}, where the reason defaults to USER. Every other event
     * type is ignored, because the main loop already emits it explicitly; bridging it again
     * would duplicate the SSE stream. The event bus stays for in-process subscribers.
     */
    private static void bridgeEventToEmitter(RunEvent ev, StreamEmitter emitter, String threadId) {
        if (!(ev instanceof RunEvent.RunCancelled)) {
            // Already emitted by the main loop; nothing to bridge.
            return;
        }
        RunEvent.RunCancelled cancelled = (RunEvent.RunCancelled) ev;
        String reasonName = cancelled.reason == null ? "USER" : cancelled.reason.name();
        emitter.systemHint(threadId, cancelled.runId, "run cancelled: " + reasonName);
    }

    /**
     * Reports whether {@code name} is one of the OA knowledge-base tools.
     *
     * <p>These tools follow the KB master switch. Returns false for null.
     */
    private static boolean isOaKnowledgeTool(String name) {
        if (name == null) {
            return false;
        }
        switch (name) {
            case "oa_search_knowledge":
            case "oa_get_knowledge_detail":
                return true;
            default:
                return false;
        }
    }

    /**
     * Reports whether {@code name} is the externalized-content reader tool.
     *
     * <p>That tool is exposed to the model only when the send-view actually contains a
     * backend ref marker. Returns false for null.
     */
    private static boolean isExternalizedContentTool(String name) {
        return name != null && name.equals("read_externalized_content");
    }

    /**
     * Reports whether any message content carries an externalized-content marker.
     *
     * <p>The markers are {@code ref=message/}, {@code ref=tool-result/} and
     * {@code full body externalized}. Null messages and null contents are skipped. Returns
     * false for a null or empty list.
     */
    private static boolean hasExternalizedContentRef(List<LobsterMessage> messages) {
        if (messages == null) {
            return false;
        }
        for (LobsterMessage message : messages) {
            if (message == null) {
                continue;
            }
            String content = message.getContent();
            if (content == null) {
                continue;
            }
            boolean marked = content.contains("ref=message/")
                    || content.contains("ref=tool-result/")
                    || content.contains("full body externalized");
            if (marked) {
                return true;
            }
        }
        return false;
    }

    /**
     * Mutable record of one tool-exposure verdict and its diagnostic context.
     *
     * <p>Tools disabled in the admin console are not exposed to the model.
     * ToolPermissionChecker still re-checks permissions before execution.
     */
    private static final class ToolExposureDecision {
        /** Tool name from the definition; null for an invalid definition. */
        String toolName;
        /** Tool category (builtin vs MCP, etc.). */
        ToolCategory category;
        /** MCP server id, from governance config or the runtime snapshot. */
        String serverId;
        /** Schema hash from the MCP tool cache, when known. */
        String schemaHash;
        /** Effective risk level reported in diagnostics. */
        ToolRiskLevel riskLevel;
        /** Effective confirmation requirement reported in diagnostics. */
        Boolean requireConfirm;
        /** Whether the tool is offered to the model. */
        boolean exposed;
        /** Machine-readable reason code for the verdict. */
        String reason;

        /** Serializes the decision into an insertion-ordered map; enums are exported by name. */
        Map<String, Object> toMap() {
            String categoryName = category != null ? category.name() : null;
            String riskName = riskLevel != null ? riskLevel.name() : null;
            Map<String, Object> view = new LinkedHashMap<>();
            view.put("toolName", toolName);
            view.put("category", categoryName);
            view.put("serverId", serverId);
            view.put("schemaHash", schemaHash);
            view.put("riskLevel", riskName);
            view.put("requireConfirm", requireConfirm);
            view.put("exposed", exposed ? Boolean.TRUE : Boolean.FALSE);
            view.put("reason", reason);
            return view;
        }
    }

    /**
     * Decides whether one tool definition is offered to the model in this run, and records why.
     *
     * <p>The checks run in this order, and the first one that hides the tool wins:
     * <ol>
     *   <li>missing definition or tool name: {@code invalid_tool};</li>
     *   <li>KB switched off for an OA knowledge tool: {@code kb_disabled};</li>
     *   <li>externalized-content reader without a ref marker: {@code no_externalized_content_ref};</li>
     *   <li>governance row explicitly disabled: {@code tool_disabled};</li>
     *   <li>MCP tools only: {@code mcp_snapshot_unavailable}, {@code mcp_server_disabled} or
     *       {@code mcp_tool_disabled}.</li>
     * </ol>
     * When the governance DAO or row is missing, or the lookup throws, non-MCP tools stay
     * exposed ({@code governance_default_allow}) and MCP tools are hidden.
     *
     * <p>Governance and MCP-cache values override the definition's riskLevel and
     * requireConfirm only when they are non-null. A null column therefore no longer erases the
     * definition's value.
     *
     * @param def                      tool definition; may be null
     * @param request                  current run request (read for the KB switch)
     * @param exposeExternalizedReader whether read_externalized_content may be exposed
     * @return the decision; never null
     */
    private ToolExposureDecision toolExposureDecision(BuiltinToolDefinition def,
                                                      RunRequest request,
                                                      boolean exposeExternalizedReader) {
        ToolExposureDecision d = new ToolExposureDecision();
        if (def == null || def.getToolName() == null) {
            d.exposed = false;
            d.reason = "invalid_tool";
            return d;
        }
        d.toolName = def.getToolName();
        d.category = def.getCategory();
        d.riskLevel = def.getRiskLevel();
        d.requireConfirm = Boolean.valueOf(def.isRequireConfirm());
        if (!request.isKbEnabled() && isOaKnowledgeTool(def.getToolName())) {
            d.exposed = false;
            d.reason = "kb_disabled";
            return d;
        }
        if (!exposeExternalizedReader && isExternalizedContentTool(def.getToolName())) {
            d.exposed = false;
            d.reason = "no_externalized_content_ref";
            return d;
        }
        boolean mcpTool = def.getCategory() == ToolCategory.MCP;
        try {
            ToolDefinitionConfigDao dao = Tools.getBean(ToolDefinitionConfigDao.class);
            if (dao == null) {
                d.exposed = !mcpTool;
                d.reason = mcpTool ? "governance_unavailable" : "governance_default_allow";
                return d;
            }
            ToolDefinitionConfig row = dao.getByName(def.getToolName());
            if (row == null) {
                d.exposed = !mcpTool;
                d.reason = mcpTool ? "governance_missing" : "governance_default_allow";
                return d;
            }
            d.serverId = row.getMcpServerId();
            // Override definition defaults only with values the governance row actually sets
            // (the same rule the MCP-cache riskLevel handling below already followed).
            ToolRiskLevel rowRisk = row.getRiskLevel();
            if (rowRisk != null) d.riskLevel = rowRisk;
            Boolean rowConfirm = row.getRequireConfirm();
            if (rowConfirm != null) d.requireConfirm = rowConfirm;
            if (Boolean.FALSE.equals(row.getEnabled())) {
                d.exposed = false;
                d.reason = "tool_disabled";
                return d;
            }
            if (mcpTool) {
                McpRuntimeToolCache.Snapshot snapshot = mcpRuntimeToolCache().get(def.getToolName());
                if (snapshot == null) {
                    d.exposed = false;
                    d.reason = "mcp_snapshot_unavailable";
                    return d;
                }
                McpServerConfig server = snapshot.getServer();
                McpToolCache cache = snapshot.getTool();
                d.serverId = server == null ? d.serverId : server.getServerId();
                d.schemaHash = cache == null ? null : cache.getSchemaHash();
                if (cache != null) {
                    ToolRiskLevel cacheRisk = cache.getRiskLevel();
                    if (cacheRisk != null) d.riskLevel = cacheRisk;
                    Boolean cacheConfirm = cache.getRequireConfirm();
                    if (cacheConfirm != null) d.requireConfirm = cacheConfirm;
                }
                if (server == null || Boolean.FALSE.equals(server.getEnabled())) {
                    d.exposed = false;
                    d.reason = "mcp_server_disabled";
                    return d;
                }
                if (cache == null || Boolean.FALSE.equals(cache.getEnabled())) {
                    d.exposed = false;
                    d.reason = "mcp_tool_disabled";
                    return d;
                }
            }
            d.exposed = true;
            d.reason = "allowed";
            return d;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] tool exposure decision failed", t); }
            catch (Throwable ignore) { /* logging must not break exposure decisions */ }
            // Fail open for builtin tools, fail closed for MCP tools.
            d.exposed = !mcpTool;
            d.reason = mcpTool ? "exposure_check_failed" : "governance_default_allow";
            return d;
        }
    }

    /**
     * Emits an {@code mcp_discovery} system hint that lists the exposure verdicts for MCP tools.
     *
     * <p>Non-MCP and null decisions are ignored, and nothing is emitted if no MCP decision
     * remains. The detail carries exposedTools, filteredTools and toolPolicyDecision (exposed
     * entries first, then filtered).
     */
    private void emitToolExposureDiagnostics(StreamEmitter emitter, String threadId, String runId,
                                             List<ToolExposureDecision> decisions) {
        if (emitter == null || decisions == null || decisions.isEmpty()) {
            return;
        }
        List<Map<String, Object>> exposedTools = new ArrayList<>();
        List<Map<String, Object>> filteredTools = new ArrayList<>();
        for (ToolExposureDecision decision : decisions) {
            boolean isMcp = decision != null && decision.category == ToolCategory.MCP;
            if (!isMcp) {
                continue;
            }
            List<Map<String, Object>> bucket = decision.exposed ? exposedTools : filteredTools;
            bucket.add(decision.toMap());
        }
        if (exposedTools.isEmpty() && filteredTools.isEmpty()) {
            return;
        }
        List<Map<String, Object>> combined = new ArrayList<>(exposedTools.size() + filteredTools.size());
        combined.addAll(exposedTools);
        combined.addAll(filteredTools);

        Map<String, Object> detail = new LinkedHashMap<>();
        detail.put("event", "mcp_discovery");
        detail.put("exposedTools", exposedTools);
        detail.put("filteredTools", filteredTools);
        detail.put("toolPolicyDecision", combined);
        emitter.systemHint(threadId, runId, "mcp_discovery", detail);
    }

    /**
     * Resolves the MCP runtime tool cache.
     *
     * <p>Resolution order: container bean, then the {@code mcpRuntimeToolCache} field, then a
     * freshly constructed instance. A lookup failure falls through silently. The fresh
     * instance is built on every call while the field is null; presumably it holds no
     * snapshots (TODO confirm).
     */
    private McpRuntimeToolCache mcpRuntimeToolCache() {
        McpRuntimeToolCache fromContainer = null;
        try {
            fromContainer = Tools.getBean(McpRuntimeToolCache.class);
        } catch (Throwable ignore) {
            /* fallback */
        }
        if (fromContainer != null) {
            return fromContainer;
        }
        if (mcpRuntimeToolCache != null) {
            return mcpRuntimeToolCache;
        }
        return new McpRuntimeToolCache();
    }

    /**
     * Adds a {@code durationMs} field to a tool message's JSON object content.
     *
     * <p>The content normally comes from {@link com.gzzm.lobster.tool.ToolResult#toToolMessageContent()}.
     * The method parses it as a map, adds the field and re-serializes it.
     *
     * <p>The input is returned unchanged when:
     * <ul>
     *   <li>the duration is negative (unknown);</li>
     *   <li>the content is null or empty;</li>
     *   <li>the content does not parse to a map;</li>
     *   <li>any step throws.</li>
     * </ul>
     *
     * @param json       tool message content
     * @param durationMs measured execution time in milliseconds; negative means unknown
     * @return the augmented JSON, or {@code json} itself when nothing was injected
     */
    @SuppressWarnings("unchecked")
    private static String injectToolDurationIntoContent(String json, long durationMs) {
        boolean nothingToInject = durationMs < 0 || json == null || json.isEmpty();
        if (nothingToInject) {
            return json;
        }
        try {
            Object decoded = JsonUtil.fromJson(json, Map.class);
            if (decoded instanceof Map) {
                Map<String, Object> fields = (Map<String, Object>) decoded;
                fields.put("durationMs", durationMs);
                return JsonUtil.toJson(fields);
            }
            return json;
        } catch (Throwable ignore) {
            return json;
        }
    }

    /**
     * Detects plan item state transitions between two plan snapshots.
     *
     * <p>Compares the {@code items} lists of the previous and the current snapshot. Any of the
     * following counts as "strong progress":
     * <ul>
     *   <li>a change in the number of items;</li>
     *   <li>an item at the same position whose {@code status} or {@code itemId} differs.</li>
     * </ul>
     * The first plan ever issued ({@code prev == null}) also counts as progress.
     *
     * <p>Items are expected to be maps. A null or non-map item is compared with
     * {@link Objects#equals} instead of being blindly cast, so a malformed snapshot can no longer
     * throw {@link ClassCastException} out of this check.
     *
     * @param prev previous plan snapshot, or {@code null} if no plan has been issued yet
     * @param curr current plan snapshot; {@code null} (or a snapshot without an {@code items} list)
     *             yields {@code false}
     * @return {@code true} if the plan shows a state change
     */
    @SuppressWarnings("unchecked")
    private static boolean hasPlanStateChange(Map<String, Object> prev, Map<String, Object> curr) {
        if (curr == null) return false;
        if (prev == null) return true;     // the first plan ever issued counts as progress
        Object prevItems = prev.get("items");
        Object currItems = curr.get("items");
        if (!(currItems instanceof List)) return false;
        List<Object> cl = (List<Object>) currItems;
        List<Object> pl = prevItems instanceof List ? (List<Object>) prevItems : Collections.emptyList();
        if (cl.size() != pl.size()) return true;
        for (int i = 0; i < cl.size(); i++) {
            Object c = cl.get(i);
            Object p = pl.get(i);
            if (!(c instanceof Map) || !(p instanceof Map)) {
                // Null or non-map entries: plain equality instead of a cast that could throw.
                // Two nulls are equal; null vs non-null is a change (same as before).
                if (!Objects.equals(c, p)) return true;
                continue;
            }
            Map<String, Object> cm = (Map<String, Object>) c;
            Map<String, Object> pm = (Map<String, Object>) p;
            if (!Objects.equals(cm.get("status"), pm.get("status"))) return true;
            if (!Objects.equals(cm.get("itemId"), pm.get("itemId"))) return true;
        }
        return false;
    }

    /**
     * Streams one LLM turn and converts a hard cancel into {@link RunCancelledException}.
     *
     * <p>{@code chatStream} runs on a background thread from {@code llmStreamPool}. The calling
     * thread polls every 200 ms for three conditions:
     * <ul>
     *   <li>cancellation;</li>
     *   <li>the per-turn stream idle timeout;</li>
     *   <li>the run deadline.</li>
     * </ul>
     * The handler exposes the cancel signal to the adapter through
     * {@link StreamingResponseHandler#isCancelled()}. When the cancel reason is set and is not
     * {@code UPSTREAM}, a {@link RunCancelledException} is thrown; per the original design
     * LlmRuntime relies on that to skip fallback.
     *
     * <p>Text deltas are sanitised, forwarded to the emitter and published as
     * {@code RunEvent.LlmDelta}. The raw text is also buffered so a cancelled turn can return what
     * was produced so far.
     *
     * @return the model's response. On an UPSTREAM (or reason-less) cancel, a synthetic response
     *         with finish reason {@code "cancelled"} holding the buffered text. If the stream ended
     *         with neither a response nor an error, a synthetic {@code "timeout"} response.
     * @throws RunCancelledException on a non-UPSTREAM cancel or when the run deadline is exceeded
     * @throws LobsterException      on idle timeout ({@code llm.timeout}), interruption
     *                               ({@code runtime.interrupted}) or stream failure
     *                               ({@code llm.stream}, unless the error already was one)
     */
    private LlmResponse streamOneTurn(
            LlmCallRequest llmReq, ModelRouteResult route,
            List<LobsterMessage> messages, List<ToolSpec> tools,
            ThreadRoom thread, String runId,
            StreamEmitter emitter, RunContext rctx, String orgId) {

        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<LlmResponse> responseRef = new AtomicReference<>();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        // The stream thread appends to this buffer while this thread reads it. In the cancel path
        // below the read can happen while the stream thread is still running (no happens-before via
        // `done`). A synchronized StringBuffer is therefore used instead of a StringBuilder.
        final StringBuffer bufferedText = new StringBuffer();
        final AtomicLong lastStreamActivityMs = new AtomicLong(System.currentTimeMillis());

        StreamingResponseHandler handler = new StreamingResponseHandler() {
            @Override public void onDelta(String delta) {
                if (rctx.isCancelled() || delta == null || delta.isEmpty()) return;
                lastStreamActivityMs.set(System.currentTimeMillis());
                bufferedText.append(delta);
                String safe = sanitizer.sanitize(delta);
                emitter.assistantText(thread.getThreadId(), runId, safe);
                rctx.events.publish(new RunEvent.LlmDelta(runId, thread.getThreadId(), safe));
            }
            @Override public void onReasoningDelta(String delta) {
                // Two things are deliberately NOT done here:
                //   1) No InternalInfoSanitizer pass. It would rewrite "I should call read_file"
                //      into "I should call 【内部能力】" and mangle the model's reasoning. Tool names
                //      are shown explicitly in the process card anyway, so nothing extra leaks.
                //   2) Not appended to bufferedText. That buffer is the fallback final answer
                //      returned when the user cancels. Reasoning is not part of the final answer,
                //      and mixing it in would make the cancelled assistant message contain raw
                //      thinking.
                if (rctx.isCancelled() || delta == null || delta.isEmpty()) return;
                lastStreamActivityMs.set(System.currentTimeMillis());
                emitter.assistantThinking(thread.getThreadId(), runId, delta);
            }
            @Override public void onToolCall(ToolCall toolCall) {
                lastStreamActivityMs.set(System.currentTimeMillis());
                // The adapter may already have parsed the tool_call here; the main loop emits
                // tool calls once more in a unified way, so nothing is emitted from this callback.
            }
            @Override public void onWriteFileContentDelta(String toolCallId, int toolIndex, String contentDelta) {
                if (rctx.isCancelled() || contentDelta == null || contentDelta.isEmpty()) return;
                lastStreamActivityMs.set(System.currentTimeMillis());
                emitter.writeFileContentDelta(thread.getThreadId(), runId, toolCallId, toolIndex, contentDelta);
            }
            @Override public void onComplete(LlmResponse response) {
                responseRef.set(response);
                done.countDown();
            }
            @Override public void onError(Throwable error) {
                errorRef.set(error);
                done.countDown();
            }
            @Override public boolean isCancelled() {
                return rctx.isCancelled();
            }
        };

        final long turnTimeoutMs = llmTurnTimeoutMs();
        final java.util.concurrent.Future<?> streamFuture = llmStreamPool.submit(() -> {
            try {
                llmRuntime.chatStream(llmReq, route, messages, tools, handler);
            } catch (Throwable t) {
                // Keep an error already reported through onError; otherwise record this one.
                errorRef.compareAndSet(null, t);
                done.countDown();
            }
        });

        try {
            while (!done.await(200, TimeUnit.MILLISECONDS)) {
                if (rctx.isCancelled()) {
                    streamFuture.cancel(true);
                    CancelReason reason = rctx.cancelReason();
                    if (reason != null && reason != CancelReason.UPSTREAM) {
                        throw new RunCancelledException(reason);
                    }
                    return new LlmResponse(bufferedText.toString(),
                            Collections.<ToolCall>emptyList(),
                            0, 0, "cancelled",
                            route.getPrimary().getModelId(), "");
                }
                long now = System.currentTimeMillis();
                // Idle timeout: measured from the last stream activity, not from turn start.
                if (turnTimeoutMs > 0 && now - lastStreamActivityMs.get() > turnTimeoutMs) {
                    streamFuture.cancel(true);
                    throw new LobsterException("llm.timeout",
                            "LLM turn idle timeout after " + turnTimeoutMs + "ms without stream activity");
                }
                if (rctx.deadlineAtMs > 0 && now > rctx.deadlineAtMs) {
                    rctx.requestCancel(CancelReason.TIMEOUT);
                    streamFuture.cancel(true);
                    throw new RunCancelledException(CancelReason.TIMEOUT, "run deadline exceeded");
                }
            }
        } catch (InterruptedException ie) {
            streamFuture.cancel(true);
            Thread.currentThread().interrupt();
            throw new LobsterException("runtime.interrupted", "interrupted");
        }

        // The stream finished, but a cancel may have been requested in the meantime.
        if (rctx.isCancelled()) {
            CancelReason reason = rctx.cancelReason();
            if (reason != null && reason != CancelReason.UPSTREAM) {
                throw new RunCancelledException(reason);
            }
            return new LlmResponse(bufferedText.toString(),
                    Collections.<ToolCall>emptyList(),
                    0, 0, "cancelled",
                    route.getPrimary().getModelId(), "");
        }
        if (errorRef.get() != null) {
            Throwable t = errorRef.get();
            if (t instanceof RunCancelledException) throw (RunCancelledException) t;
            if (t instanceof LobsterException) throw (LobsterException) t;
            throw new LobsterException("llm.stream", "stream failed: " + safeMsg(t), t);
        }
        LlmResponse response = responseRef.get();
        if (response == null) {
            return new LlmResponse(bufferedText.toString(),
                    Collections.<ToolCall>emptyList(),
                    0, 0, "timeout",
                    route.getPrimary().getModelId(), "");
        }

        if (contentFilter != null
                && response.getAssistantText() != null
                && !response.getAssistantText().isEmpty()) {
            String safe = sanitizer.sanitize(response.getAssistantText());
            ContentFilter.FilterResult fr = contentFilter.check(safe, orgId);
            // NOTE(review): a BLOCK verdict only emits a hint. The text was already streamed via
            // onDelta, and the unmodified response is still returned to the caller. Confirm this
            // is the intended policy.
            if (fr != null && fr.getVerdict() == ContentFilter.Verdict.BLOCK) {
                emitter.systemHint(thread.getThreadId(), runId,
                        "输出被内容过滤策略拦截：" + fr.getReason());
            }
        }
        return response;
    }

    /**
     * Returns the per-turn stream idle timeout in milliseconds.
     *
     * <p>The configured value is used when it is positive; otherwise
     * {@code DEFAULT_LLM_TURN_TIMEOUT_MS} is returned.
     */
    private static long llmTurnTimeoutMs() {
        long configured = LobsterConfig.getLlmTurnTimeoutMs();
        if (configured <= 0) {
            return DEFAULT_LLM_TURN_TIMEOUT_MS;
        }
        return configured;
    }

    /**
     * Prepends an index of this turn's attached resources to the user input.
     *
     * <p>Each resource becomes one bullet line: display name (falling back to the id), the id, and
     * the optional kind. That is followed by one of:
     * <ul>
     *   <li>the stored summary, if any;</li>
     *   <li>otherwise up to five top outline sections;</li>
     *   <li>otherwise a placeholder hint.</li>
     * </ul>
     * Resources that cannot be loaded are skipped, so a local failure never blocks the user's
     * message. When nothing is rendered, the input is returned unchanged.
     *
     * @param userInput   the user's message; may be {@code null}
     * @param resourceIds ids of attached resources; {@code null}/empty means no prelude
     * @return the input prefixed with an {@code <attachments>} block, or the input unchanged
     */
    private String prependAttachmentPrelude(String userInput, List<String> resourceIds) {
        if (resourceIds == null || resourceIds.isEmpty()) return userInput;
        StringBuilder lines = new StringBuilder();
        int count = 0;
        for (String resourceId : resourceIds) {
            if (resourceId == null || resourceId.isEmpty()) continue;
            WorkspaceResource resource;
            try {
                WorkspaceService workspace = workspaceService();
                resource = (workspace != null) ? workspace.getResource(resourceId) : null;
            } catch (Throwable ignored) {
                continue; // lookup failures are skipped on purpose
            }
            if (resource == null) continue;

            String displayName = resource.getDisplayName();
            String kind = ResourceMetadata.getKind(resource);
            String summary = ResourceMetadata.getSummary(resource);
            List<String> sections = ResourceMetadata.getTopSections(resource);

            lines.append("- ").append(displayName != null ? displayName : resourceId)
                 .append(" (").append(resourceId);
            if (kind != null && !kind.isEmpty()) {
                lines.append(", ").append(kind);
            }
            lines.append(")");

            boolean hasSummary = summary != null && !summary.isEmpty();
            boolean hasSections = sections != null && !sections.isEmpty();
            if (hasSummary) {
                lines.append("：").append(summary);
            } else if (hasSections) {
                int shown = Math.min(5, sections.size());
                lines.append("：");
                lines.append(String.join(" / ", sections.subList(0, shown)));
                if (sections.size() > shown) lines.append(" …");
            } else {
                lines.append("：[尚无摘要，可用 read_file / save_resource_summary 补全]");
            }
            lines.append('\n');
            count++;
        }
        if (count == 0) return userInput;

        StringBuilder out = new StringBuilder("<attachments>\n");
        out.append(lines).append("</attachments>\n\n");
        if (userInput != null) out.append(userInput);
        return out.toString();
    }

    /**
     * Default title given to new threads (see ThreadService.createThread). A thread still carrying
     * this exact title is a candidate for automatic renaming after its first turn.
     */
    private static final String DEFAULT_THREAD_TITLE = "新对话";

    /**
     * Auto-generates a short thread title after the first turn, but only while the thread still
     * carries {@link #DEFAULT_THREAD_TITLE}.
     *
     * <p>All failures are swallowed and logged. Title generation is a nice-to-have and must never
     * affect the main flow.
     *
     * <p>Routing: ModelRouter is asked with {@code taskType="summary"} and
     * {@code requiresFastResponse=true} instead of reusing the main conversation's route, so a short
     * title does not burn tokens on an expensive model. If routing fails, the title is silently
     * skipped.
     *
     * <p>Only the first user message feeds the prompt, so the assistant's answer cannot skew the
     * title.
     *
     * @param firstUserInput     the user's first message (trimmed and capped at 400 chars)
     * @param firstAssistantText currently unused; kept for a possible future mixed strategy
     */
    private void maybeAutoGenerateTitle(ThreadRoom thread, RunRequest request,
                                        String firstUserInput, String firstAssistantText,
                                        StreamEmitter emitter) {
        try {
            if (thread == null || !DEFAULT_THREAD_TITLE.equals(thread.getTitle())) return;

            String userTrim = trimForTitle(firstUserInput, 400);
            if (userTrim.isEmpty()) return;

            // Dedicated title route; a routing failure silently abandons the title.
            ModelRouteResult titleRoute;
            try {
                ModelSelectionContext selection = new ModelSelectionContext();
                selection.setOrgId(request.getUser().getOrgId());
                selection.setUserId(request.getUser().getUserId());
                selection.setTaskType("summary");
                selection.setRequiresFastResponse(true);
                titleRoute = modelRouter.route(selection);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] title route failed, skip auto title: " + safeMsg(t)); } catch (Throwable ignore) { /* ignore */ }
                return;
            }

            // Production title prompt (system + user). The triple quotes mark the user message's
            // boundaries so models do not continue the prompt text itself.
            String systemPrompt =
                    "你是一个对话标题生成助手。根据用户的首条消息，生成一个简洁、准确的对话标题。\n\n" +
                    "要求：\n" +
                    "1. 长度：4-15 个字（中文）或 2-8 个词（英文），与用户输入语言保持一致\n" +
                    "2. 内容：概括对话的核心主题或意图，能让用户日后快速识别这个对话\n" +
                    "3. 风格：陈述式短语，不使用标点符号（问号、句号、感叹号、引号等一律不要）\n" +
                    "4. 禁止：不要加\"关于\"、\"如何\"等冗余前缀；不要输出解释、引号或多个候选\n" +
                    "5. 格式：直接输出标题文本，不要任何前后缀\n\n" +
                    "仅输出标题本身，不要输出其他任何内容。";
            String userPrompt = "用户的首条消息：\n\"\"\"\n" + userTrim + "\n\"\"\"\n\n请生成标题：";
            List<LobsterMessage> prompt = new ArrayList<>();
            prompt.add(LobsterMessage.system(systemPrompt));
            prompt.add(LobsterMessage.user(userPrompt));

            LlmCallRequest titleRequest = new LlmCallRequest();
            titleRequest.setRunId("title-" + thread.getThreadId());
            titleRequest.setThreadId(thread.getThreadId());
            titleRequest.setUserId(request.getUser().getUserId());
            titleRequest.setOrgId(request.getUser().getOrgId());

            LlmResponse titleResponse = llmRuntime.chat(titleRequest, titleRoute, prompt, Collections.emptyList());
            String title = sanitizeGeneratedTitle(titleResponse != null ? titleResponse.getAssistantText() : null);
            if (title == null || title.isEmpty()) return;

            threadService.rename(request.getUser(), thread.getThreadId(), title);
            thread.setTitle(title);
            emitter.threadRenamed(thread.getThreadId(), title);
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] auto title generation failed: " + thread.getThreadId(), t); } catch (Throwable ignore) { /* ignore */ }
        }
    }

    /**
     * Cleans an LLM-generated title so it fits the product rules.
     *
     * <ol>
     *   <li>Keeps only the first line, since models occasionally append explanations or
     *       alternatives.</li>
     *   <li>Repeatedly strips wrapping quotes / brackets / book-title marks and sentence or
     *       separator punctuation from both ends until neither end changes. Input such as
     *       {@code "Title".} is therefore fully unwrapped instead of keeping a dangling quote.</li>
     *   <li>Truncates to at most 30 chars without splitting a surrogate pair. Fifteen Chinese
     *       characters is the expected length; 30 is the hard cap.</li>
     * </ol>
     *
     * @param raw raw model output; may be {@code null}
     * @return the cleaned title, or {@code null} if nothing usable remains
     */
    private static String sanitizeGeneratedTitle(String raw) {
        if (raw == null) return null;
        String s = raw.trim();
        if (s.isEmpty()) return null;
        // First line only — models occasionally emit multi-line answers with explanations.
        int nl = s.indexOf('\n');
        if (nl > 0) s = s.substring(0, nl).trim();
        // One symmetric set of wrapping marks is used for both ends. The previous trailing set
        // listed '】' twice and lacked '【', '(', '（' and '['.
        final String wrappers = "\"'“”‘’《》〈〉「」『』【】()（）[]";
        final String punct = "。.!！?？,，;；:：、";
        // Interleave wrapper and punctuation stripping until stable, so e.g. "Title". loses both
        // the trailing '.' and the quote it exposes.
        boolean changed = true;
        while (changed && !s.isEmpty()) {
            changed = false;
            char first = s.charAt(0);
            if (wrappers.indexOf(first) >= 0 || punct.indexOf(first) >= 0) {
                s = s.substring(1).trim();
                changed = true;
            }
            if (!s.isEmpty()) {
                char last = s.charAt(s.length() - 1);
                if (wrappers.indexOf(last) >= 0 || punct.indexOf(last) >= 0) {
                    s = s.substring(0, s.length() - 1).trim();
                    changed = true;
                }
            }
        }
        if (s.length() > 30) {
            int end = 30;
            // Never cut between a high and low surrogate (e.g. an emoji straddling the cap).
            if (Character.isHighSurrogate(s.charAt(end - 1)) && Character.isLowSurrogate(s.charAt(end))) {
                end--;
            }
            s = s.substring(0, end).trim();
        }
        return s.isEmpty() ? null : s;
    }

    /**
     * Trims whitespace from {@code s} and caps it at {@code cap} chars for the title prompt.
     *
     * <p>The cut point never splits a surrogate pair: if the cap would fall inside one, the
     * result is one char shorter instead of ending in a dangling high surrogate.
     *
     * @param s   input text; {@code null} yields {@code ""}
     * @param cap maximum number of chars to keep
     * @return the trimmed, possibly truncated text (never {@code null})
     */
    private static String trimForTitle(String s, int cap) {
        if (s == null) return "";
        String t = s.trim();
        if (t.length() <= cap) return t;
        int end = cap;
        // Back off one char if the cut would leave half of a surrogate pair (e.g. an emoji).
        if (end > 0 && Character.isHighSurrogate(t.charAt(end - 1))
                && Character.isLowSurrogate(t.charAt(end))) {
            end--;
        }
        return t.substring(0, end);
    }

    /**
     * Writes synthetic {@code tool} result messages for tool calls that were generated but never
     * executed.
     *
     * <p>This keeps the history from containing an assistant(tool_calls) message without matching
     * tool messages. Such a history makes the next /chat/completions request fail with HTTP 400
     * ("An assistant message with 'tool_calls' must be followed by tool messages...").
     *
     * <p>Typical triggers: the loop detector aborts, the tool loop is cancelled midway, or the
     * tool executor fails as a whole.
     *
     * <p>The body is structured JSON with {@code status=aborted}, so it stays distinguishable from
     * normal tool results. Append failures are logged per call and do not stop the remaining calls.
     *
     * @param executedIds ids of calls that already have a real result; may be {@code null}
     * @param abortReason message to record; {@code null} uses a generic message
     */
    private void writeAbortedToolResults(ThreadRoom thread, String runId,
                                         List<ToolCall> calls, Set<String> executedIds,
                                         String abortReason) {
        if (calls == null || calls.isEmpty()) return;
        String message = abortReason != null ? abortReason : "tool call not executed";
        for (ToolCall call : calls) {
            if (call == null) continue;
            String callId = call.getId();
            if (callId == null) continue;
            boolean alreadyExecuted = executedIds != null && executedIds.contains(callId);
            if (alreadyExecuted) continue;

            Map<String, Object> payload = new LinkedHashMap<>();
            payload.put("status", "aborted");
            payload.put("message", message);
            String content = JsonUtil.toJson(payload);
            try {
                threadService.appendMessage(thread, MessageRole.tool,
                        content, "tool_result", runId,
                        callId, call.getName(), null);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] write aborted tool_result failed: " + callId, t); } catch (Throwable ignore) { /* ignore */ }
            }
        }
    }

    /**
     * Serialises tool calls to a JSON array of {@code {"id", "name", "arguments"}} objects.
     *
     * <p>Null entries are skipped instead of failing with a NullPointerException. This matches
     * {@code writeAbortedToolResults}, which also ignores null calls.
     *
     * @param calls tool calls to serialise; may be {@code null}
     * @return the JSON array, or {@code null} when there is nothing to serialise
     */
    private String serializeToolCalls(List<ToolCall> calls) {
        if (calls == null || calls.isEmpty()) return null;
        List<Map<String, Object>> list = new ArrayList<>(calls.size());
        for (ToolCall c : calls) {
            if (c == null) continue;
            Map<String, Object> m = new LinkedHashMap<>();
            m.put("id", c.getId());
            m.put("name", c.getName());
            m.put("arguments", c.getArgumentsJson());
            list.add(m);
        }
        return list.isEmpty() ? null : JsonUtil.toJson(list);
    }

    /**
     * Cancels a running run with {@link CancelReason#USER}.
     *
     * <p>Kept for backward compatibility: the legacy entry point always meant a user-initiated
     * cancel. Delegates to {@link #cancel(String, CancelReason)}.
     *
     * @param runId run to cancel
     */
    public void cancel(String runId) throws Exception {
        final CancelReason userInitiated = CancelReason.USER;
        cancel(runId, userInitiated);
    }

    /**
     * Cancels a run with an explicit {@link CancelReason} (V2 entry point).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Set the in-memory cancel flag (the first recorded reason wins) and wake any thread
     *       waiting on the active run's monitor.</li>
     *   <li>Persist the cancellation. The run row is moved to {@code cancelled} only from
     *       {@code running} or {@code waiting_user}.</li>
     *   <li>If that transition happened: store the reason and request time, cancel open pending
     *       requests sourced from this run, and clear the thread's active-run pointer.</li>
     *   <li>Record a synthetic cancelled terminal event. For a run active in this process, also
     *       emit it to attached clients, then finish and unregister the active run.</li>
     *   <li>Kill the run's sandbox. This step is not reached if persisting threw.</li>
     * </ol>
     *
     * <p>Per the original design, the main loop sees the flag at its next LLM/tool boundary. The
     * reason travels with the exception to LlmRuntime, where non-UPSTREAM reasons suppress
     * fallback.
     *
     * @param runId  run to cancel
     * @param reason cancel reason; {@code null} is treated as {@link CancelReason#USER}
     * @throws LobsterException the original LobsterException, or {@code run.cancel_failed}, when
     *                          persisting the cancellation fails
     */
    public void cancel(String runId, CancelReason reason) throws Exception {
        if (reason == null) reason = CancelReason.USER;
        // compareAndSet(null, ...) keeps the first reason if the run was already flagged.
        AtomicReference<CancelReason> ref = cancelFlags.computeIfAbsent(runId,
                k -> new AtomicReference<CancelReason>());
        ref.compareAndSet(null, reason);
        ActiveRun active = activeRuns.get(runId);
        if (active != null) {
            // Wake any thread blocked in wait() on this active run's monitor.
            synchronized (active) {
                active.notifyAll();
            }
        }
        boolean persistedCancel = false;
        try {
            Date now = new Date();
            // finishIfStatus is called with the expected current status, so only a run that is
            // still running / waiting_user is moved to cancelled.
            int n = runDao().finishIfStatus(RunStatus.cancelled, now, RunExitReason.cancelled,
                    runId, RunStatus.running);
            n += runDao().finishIfStatus(RunStatus.cancelled, now, RunExitReason.cancelled,
                    runId, RunStatus.waiting_user);
            persistedCancel = n > 0;
            Run run = runDao().getRun(runId);
            if (persistedCancel) {
                // Follow-up bookkeeping only when this call actually performed the transition.
                if (run != null) {
                    run.setCancelReason(reason.name());
                    if (run.getCancelRequestedAt() == null) run.setCancelRequestedAt(now);
                    runDao().save(run);
                }
                cancelOpenPendingForRun(runId);
                clearThreadActiveRun(run);
            }
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] cancel DAO failed", t); } catch (Throwable ignore) { /* ignore */ }
            if (t instanceof LobsterException) throw (LobsterException) t;
            throw new LobsterException("run.cancel_failed", "Failed to persist run cancellation", t);
        }
        if (persistedCancel && active != null) {
            // Run active in this process: record the terminal event(s), push them to attached
            // clients, then always finish and unregister the active run.
            try {
                List<StreamEvent> events = recordSyntheticCancelledTerminalEvent(runId, reason, active);
                for (StreamEvent ev : events) active.emitter.emit(ev);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] record cancel terminal event failed: " + runId, t); }
                catch (Throwable ignore) { /* ignore */ }
                active.closed = true;
            } finally {
                finishActiveRun(active);
                activeRuns.remove(runId, active);
            }
        } else if (active == null && persistedCancel) {
            // No ActiveRun in this process: only persist the terminal event, then drop the flag.
            try {
                recordSyntheticCancelledTerminalEvent(runId, reason);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] record detached cancel terminal event failed: " + runId, t); }
                catch (Throwable ignore) { /* ignore */ }
            } finally {
                cancelFlags.remove(runId, ref);
            }
        } else if (active == null) {
            // Nothing transitioned and nothing runs here: drop the flag this call may have created.
            cancelFlags.remove(runId, ref);
        }
        // Remaining case (active run, no DB transition): the flag is kept so the running loop sees it.
        killSandboxForRun(runId);
    }

    /**
     * Best-effort: asks the optional SandboxService, if one is registered, to kill the sandboxes
     * tied to {@code runId}. Any failure is ignored.
     */
    private void killSandboxForRun(String runId) {
        try {
            com.gzzm.lobster.sandbox.SandboxService sandbox =
                    Tools.getBean(com.gzzm.lobster.sandbox.SandboxService.class);
            if (sandbox == null) return;
            sandbox.killByRun(runId);
        } catch (Throwable ignored) {
            // the sandbox module is optional; its absence or failure must not affect cancellation
        }
    }

    /**
     * Best-effort: cancels the open pending requests whose source is {@code runId}.
     *
     * <p>A missing id is a no-op; failures are logged, not thrown.
     */
    private void cancelOpenPendingForRun(String runId) {
        boolean missingId = runId == null || runId.isEmpty();
        if (missingId) return;
        try {
            PendingRequestService pending = pendingRequestService();
            if (pending == null) return;
            pending.cancelOpenBySourceRun(runId);
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] cancel pending by source run failed: " + runId, t); }
            catch (Throwable ignore) { /* ignore */ }
        }
    }

    /**
     * Attaches a new SSE client to a run that is currently executing in this process.
     *
     * <p>This blocks until the run completes, so the servlet can keep the SSE response open. A
     * browser or page disconnect only detaches that one client and does not cancel the run.
     *
     * <p>Replay works as follows:
     * <ul>
     *   <li>Events after {@code afterSeq} are collected from the persisted event store and from the
     *       active run's in-memory buffers.</li>
     *   <li>The buffers are snapshotted both before and after the store read, each time while
     *       holding the run's monitor.</li>
     *   <li>The results are merged and de-duplicated by {@code eventSeq}.</li>
     *   <li>Registering the client, sending the replay and sending a {@code stream_reconnected}
     *       hint all happen while holding the run's monitor. NOTE(review): this only orders the
     *       replay ahead of live events if live emission also synchronizes on the ActiveRun —
     *       confirm.</li>
     * </ul>
     *
     * @param afterSeq only events with {@code eventSeq > afterSeq} are replayed
     * @return {@code false} if the run is not active in this process; {@code true} once the run
     *         finished or the client's heartbeat failed
     * @throws LobsterException {@code runtime.forbidden} if user/thread do not match the run;
     *         {@code run_event.persistence_unavailable} if no event store is available;
     *         {@code runtime.interrupted} if the waiting thread is interrupted
     */
    public boolean streamActiveRun(String userId, String threadId, String runId, long afterSeq, StreamEmitter emitter) throws Exception {
        ActiveRun active = activeRuns.get(runId);
        if (active == null) return false;
        if (userId == null || !userId.equals(active.userId)
                || threadId == null || !threadId.equals(active.threadId)) {
            throw new LobsterException("runtime.forbidden", "run not visible");
        }
        if (runStreamEventService == null) {
            throw new LobsterException("run_event.persistence_unavailable",
                    "Run stream event persistence is unavailable");
        }
        boolean attached = false;
        List<StreamEvent> bufferedBeforeReplay;
        List<StreamEvent> recentBeforeReplay;
        // Snapshot in-memory buffers before the persisted read (done outside the lock).
        synchronized (active) {
            bufferedBeforeReplay = active.bufferedAfter(afterSeq);
            recentBeforeReplay = active.recentAfter(afterSeq);
        }
        List<StreamEvent> persistedReplay = runStreamEventService.listAfter(runId, afterSeq);
        synchronized (active) {
            active.emitter.add(emitter);
            attached = true;
            try {
                // Re-read the buffers under the lock to cover events produced during the store query.
                List<StreamEvent> replay = mergeReplayEvents(persistedReplay,
                        bufferedBeforeReplay, recentBeforeReplay,
                        active.bufferedAfter(afterSeq), active.recentAfter(afterSeq),
                        afterSeq);
                for (StreamEvent ev : replay) active.emitter.emitTo(emitter, ev);
                Map<String, Object> reconnected = new LinkedHashMap<>();
                reconnected.put("threadId", active.threadId);
                reconnected.put("runId", active.runId);
                reconnected.put("message", "stream_reconnected");
                active.emitter.emitTo(emitter, StreamEvent.of(StreamEventType.system_hint, reconnected));
            } catch (Throwable t) {
                // Replay failed: detach this client again before propagating.
                active.emitter.remove(emitter);
                attached = false;
                throw t;
            }
        }
        try {
            // Wait for completion, checking once per second; a failed heartbeat means the client left.
            while (true) {
                if (active.done.await(1L, TimeUnit.SECONDS)) return true;
                if (emitter != null && !emitter.heartbeat()) return true;
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new LobsterException("runtime.interrupted", "interrupted");
        } finally {
            if (attached) active.emitter.remove(emitter);
        }
    }

    /**
     * Merges replay candidates from the persisted store and the in-memory buffers into one list
     * ordered by {@code eventSeq}.
     *
     * <p>Only events with {@code eventSeq > afterSeq} are kept. Sources are applied in parameter
     * order, so for a duplicate seq the event from the later source wins.
     *
     * @return events sorted by ascending sequence number, without duplicates
     */
    private static List<StreamEvent> mergeReplayEvents(List<StreamEvent> persisted,
                                                       List<StreamEvent> bufferedBefore,
                                                       List<StreamEvent> recentBefore,
                                                       List<StreamEvent> bufferedAfter,
                                                       List<StreamEvent> recentAfter,
                                                       long afterSeq) {
        List<List<StreamEvent>> sources = new ArrayList<>(5);
        sources.add(persisted);
        sources.add(bufferedBefore);
        sources.add(recentBefore);
        sources.add(bufferedAfter);
        sources.add(recentAfter);

        TreeMap<Long, StreamEvent> ordered = new TreeMap<>();
        for (List<StreamEvent> source : sources) {
            addReplayEvents(ordered, source, afterSeq);
        }
        return new ArrayList<>(ordered.values());
    }

    /**
     * Adds every event with {@code eventSeq > afterSeq} to {@code out}, keyed by its sequence
     * number. An event with an already-present seq replaces the earlier one.
     *
     * @param events candidate events; {@code null} is ignored
     */
    private static void addReplayEvents(TreeMap<Long, StreamEvent> out,
                                        List<StreamEvent> events,
                                        long afterSeq) {
        if (events == null) return;
        for (StreamEvent candidate : events) {
            long candidateSeq = streamEventSeq(candidate);
            if (candidateSeq <= afterSeq) continue;
            out.put(candidateSeq, candidate);
        }
    }

    /**
     * Reads the {@code eventSeq} value from an event's payload.
     *
     * @return the sequence number, or {@code 0} when the event, payload or value is missing, or
     *         when the value is neither a {@link Number} nor a parseable long string
     */
    private static long streamEventSeq(StreamEvent event) {
        if (event == null || event.getPayload() == null) return 0L;
        Object seqValue = event.getPayload().get("eventSeq");
        if (seqValue == null) return 0L;
        if (seqValue instanceof Number) return ((Number) seqValue).longValue();
        try {
            return Long.parseLong(seqValue.toString());
        } catch (Throwable ignored) {
            return 0L;
        }
    }

    /**
     * Attaches an SSE client to a run active in this process, taking the thread id from the
     * active run itself.
     *
     * @return {@code false} when the run is not active here; otherwise the result of the
     *         full-argument overload
     */
    public boolean streamActiveRun(String userId, String runId, long afterSeq, StreamEmitter emitter) throws Exception {
        ActiveRun active = activeRuns.get(runId);
        if (active == null) {
            return false;
        }
        return streamActiveRun(userId, active.threadId, runId, afterSeq, emitter);
    }

    /**
     * Attaches an SSE client to a run active in this process and replays from the beginning
     * ({@code afterSeq = 0}).
     */
    public boolean streamActiveRun(String userId, String runId, StreamEmitter emitter) throws Exception {
        final long fromStart = 0L;
        return streamActiveRun(userId, runId, fromStart, emitter);
    }

    /** Returns whether {@code runId} is currently registered as active in this process. */
    public boolean isRunActive(String runId) {
        if (runId == null) {
            return false;
        }
        return activeRuns.containsKey(runId);
    }

    /**
     * Returns a lease snapshot for {@code runId}.
     *
     * <p>If the run is active in this process, the snapshot is active and carries the run's last
     * heartbeat. Otherwise it is inactive with heartbeat 0. The stale threshold is always
     * {@code RUN_LEASE_STALE_MS}.
     */
    public RunLeaseSnapshot runLease(String runId) {
        ActiveRun active = null;
        if (runId != null) {
            active = activeRuns.get(runId);
        }
        if (active != null) {
            return new RunLeaseSnapshot(true, active.heartbeatAtMs, RUN_LEASE_STALE_MS);
        }
        return new RunLeaseSnapshot(false, 0L, RUN_LEASE_STALE_MS);
    }

    /**
     * Reconciles a persisted {@code running} run whose worker may no longer be alive.
     *
     * <p>Runs that are not {@code running}, that are active in this process, or that have no
     * usable timestamp are returned unchanged. Otherwise the last heartbeat (falling back to the
     * start time) is compared with {@code RUN_LEASE_STALE_MS}. If the lease is stale:
     * <ol>
     *   <li>This worker first tries to claim and resume the run from its persisted request.</li>
     *   <li>If that fails, the latest row is re-read. When another worker advanced the heartbeat
     *       meanwhile and this worker did not claim the run, that row is returned as-is.</li>
     *   <li>Otherwise the run is marked {@code runtime.worker_lost}.</li>
     * </ol>
     * DAO failures after the recovery attempt are logged and the input run is returned.
     *
     * @param run persisted run row; may be {@code null}
     * @return the latest known state of the run
     */
    public Run reconcileRunLease(Run run) {
        if (run == null || run.getStatus() != RunStatus.running) return run;
        ActiveRun active = activeRuns.get(run.getRunId());
        if (active != null) return run;
        long heartbeatAt = 0L;
        if (run.getHeartbeatAt() != null) heartbeatAt = run.getHeartbeatAt().getTime();
        if (heartbeatAt <= 0L && run.getStartedAt() != null) heartbeatAt = run.getStartedAt().getTime();
        if (heartbeatAt <= 0L) return run;
        long age = System.currentTimeMillis() - heartbeatAt;
        if (age < RUN_LEASE_STALE_MS) return run;
        // NOTE(review): staleBefore = observed heartbeat + 1ms presumably restricts the claim to rows
        // whose heartbeat has not advanced past the one seen here — confirm against
        // RunDao.claimStaleRunnable.
        if (recoverPersistedRun(run, new Date(heartbeatAt + 1L))) {
            try {
                Run recovered = runDao().getRun(run.getRunId());
                return recovered == null ? run : recovered;
            } catch (Throwable ignore) {
                return run;
            }
        }
        try {
            Run latest = runDao().getRun(run.getRunId());
            if (latest != null && latest.getStatus() == RunStatus.running) {
                // A run this worker claimed but could not resume is always marked lost; a run whose
                // heartbeat moved on under another worker is left alone.
                boolean claimedByThisRecovery = runtimeWorkerId.equals(latest.getWorkerId());
                boolean advancedByAnotherWorker = latest.getHeartbeatAt() != null
                        && latest.getHeartbeatAt().getTime() > heartbeatAt;
                if (!claimedByThisRecovery && advancedByAnotherWorker) return latest;
                return markRunWorkerLost(latest);
            }
            return latest == null ? run : latest;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] reconcile stale run failed: " + run.getRunId(), t); } catch (Throwable ignore) { /* ignore */ }
            return run;
        }
    }

    /**
     * Sweeps up to 10 persisted {@code running} runs whose lease is older than
     * {@code RUN_LEASE_STALE_MS} and tries to resume each one that is not active in this process.
     *
     * <p>A run that cannot be resumed but is still running and owned by this worker is marked as
     * {@code runtime.worker_lost}. All failures are logged and swallowed.
     */
    private void recoverStaleRunnableRuns() {
        try {
            Date cutoff = new Date(System.currentTimeMillis() - RUN_LEASE_STALE_MS);
            List<Run> candidates = runDao().listStaleRunnable(RunStatus.running, cutoff, 0, 10);
            if (candidates == null) return;
            for (Run candidate : candidates) {
                if (candidate == null) continue;
                if (activeRuns.containsKey(candidate.getRunId())) continue;
                if (recoverPersistedRun(candidate, cutoff)) continue;
                // Not resumable: if this worker owns the still-running row, mark it lost.
                try {
                    Run latest = runDao().getRun(candidate.getRunId());
                    boolean ownedAndRunning = latest != null
                            && latest.getStatus() == RunStatus.running
                            && runtimeWorkerId.equals(latest.getWorkerId());
                    if (ownedAndRunning) markRunWorkerLost(latest);
                } catch (Throwable t) {
                    try { Tools.log("[AgentRuntime] mark unrecoverable run failed: " + candidate.getRunId(), t); }
                    catch (Throwable ignore) { /* ignore */ }
                }
            }
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] recover stale runnable runs failed", t); } catch (Throwable ignore) { /* ignore */ }
        }
    }

    /**
     * Tries to take over a persisted {@code running} run and resume it on this worker.
     *
     * <p>Returns {@code true} immediately if the run is already active in this process. Otherwise
     * the run must carry a persisted request payload. The row is then claimed for
     * {@code runtimeWorkerId} via {@code claimStaleRunnable}, using {@code staleBefore} (or now)
     * as the threshold. Next, the request is restored from the claimed row and a run worker is
     * submitted.
     *
     * @param run         persisted run row
     * @param staleBefore claim threshold; {@code null} means "now"
     * @return {@code true} if the run is active here or a worker was submitted; {@code false} on
     *         any precondition failure, lost claim, unrestorable request or error (errors are logged)
     */
    private boolean recoverPersistedRun(Run run, Date staleBefore) {
        if (run == null) return false;
        String runId = run.getRunId();
        if (runId == null || runId.isEmpty()) return false;
        if (activeRuns.containsKey(runId)) return true;
        if (run.getRequestPayloadJson() == null || run.getRequestPayloadJson().isEmpty()) return false;
        try {
            Date now = new Date();
            Date threshold = staleBefore != null ? staleBefore : now;
            int claimed = runDao().claimStaleRunnable(runtimeWorkerId, now, runId,
                    RunStatus.running, threshold);
            if (claimed <= 0) return false;
            Run claimedRun = runDao().getRun(runId);
            RunRequest request = restoreRunRequest(claimedRun != null ? claimedRun : run);
            if (request == null) return false;
            try { Tools.log("[AgentRuntime] recovering durable run: " + runId); } catch (Throwable ignore) { /* ignore */ }
            return submitRunWorker(runId, request.withRunId(runId));
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] recover durable run failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
            return false;
        }
    }

    /**
     * Marks a run whose backend worker is no longer active as failed, persists it, and publishes
     * synthetic terminal stream events.
     *
     * <p>Status and exit reason become {@code error}. The end time and heartbeat are both stamped
     * with the same instant. The error code {@code runtime.worker_lost} and an explanatory message
     * are recorded. The run is then saved, {@code clearThreadActiveRun} and
     * {@code recordSyntheticTerminalEvents} are invoked.
     *
     * @param run the run to mark; {@code null} is ignored
     * @return the same (mutated) run instance, or {@code null} when {@code run} is {@code null}
     * @throws Exception propagated from saving the run or clearing the thread's active run
     */
    private Run markRunWorkerLost(Run run) throws Exception {
        if (run != null) {
            final Date lostAt = new Date();
            run.setStatus(RunStatus.error);
            run.setExitReason(RunExitReason.error);
            run.setEndedAt(lostAt);
            // Heartbeat mirrors the end time so the row no longer looks stale-but-alive.
            run.setHeartbeatAt(run.getEndedAt());
            run.setErrorCode("runtime.worker_lost");
            run.setErrorMessage("Run worker heartbeat expired; backend execution is no longer active");
            runDao().save(run);
            clearThreadActiveRun(run);
            recordSyntheticTerminalEvents(run);
            return run;
        }
        return null;
    }

    /**
     * Best-effort: appends synthetic terminal stream events for a run that ended with an error.
     *
     * <p>Three events are written with consecutive sequence numbers after the highest persisted
     * one for the run:
     * <ol>
     *   <li>an {@code error} event carrying the run's error code and message;</li>
     *   <li>a thread-projection event;</li>
     *   <li>a {@code run_ended} event with exit reason {@code error}.</li>
     * </ol>
     * If the projection cannot be recorded, the failure is logged and {@code run_ended} takes
     * its slot. Any other failure is logged and swallowed. No-op when {@code run} is
     * {@code null} or no stream event service is available.
     *
     * @param run the terminated run
     */
    private void recordSyntheticTerminalEvents(Run run) {
        if (run == null || runStreamEventService == null) {
            return;
        }
        try {
            final String runId = run.getRunId();
            long nextSeq = runStreamEventService.maxSeq(runId) + 1L;

            final Map<String, Object> errorPayload = new LinkedHashMap<>();
            errorPayload.put("threadId", run.getThreadId());
            errorPayload.put("runId", runId);
            errorPayload.put("code", run.getErrorCode());
            errorPayload.put("message", run.getErrorMessage());
            runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                    nextSeq, StreamEvent.of(StreamEventType.error, errorPayload));
            nextSeq++;

            try {
                runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                        nextSeq, threadProjectionEvent(run.getThreadId(), runId));
                nextSeq++;
            } catch (Throwable projectionError) {
                // Projection is optional; the terminal event below must still be written.
                try { Tools.log("[AgentRuntime] record stale terminal projection failed: " + runId, projectionError); }
                catch (Throwable ignore) { /* keep terminal event */ }
            }

            final Map<String, Object> endedPayload = new LinkedHashMap<>();
            endedPayload.put("threadId", run.getThreadId());
            endedPayload.put("runId", runId);
            endedPayload.put("exitReason", RunExitReason.error.name());
            runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                    nextSeq, StreamEvent.of(StreamEventType.run_ended, endedPayload));
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] record stale terminal events failed: " + run.getRunId(), t); } catch (Throwable ignore) { /* ignore */ }
        }
    }

    /**
     * Ensures reconnect observers see a terminal boundary, even when older runs ended before
     * terminal stream projection existed, or when a worker failed after persisting the transcript
     * but before emitting {@code run_ended}.
     *
     * <p>For a run that is neither {@code running} nor {@code waiting_user}, this appends, after
     * the highest persisted sequence number, a thread-projection event followed by a
     * {@code run_ended} event. The exit reason comes from {@code terminalExitReason}. A failure
     * while recording the projection is logged and skipped, and {@code run_ended} then takes
     * its sequence slot.
     *
     * @param run the run to close off for observers; may be {@code null}
     * @return the non-null events returned by the event service, in recording order; empty when
     *         {@code run} is null, has no id, or is still running / waiting for the user
     * @throws LobsterException {@code run_event.persistence_unavailable} when no stream event
     *         service is configured
     */
    public List<StreamEvent> recordTerminalObservationEvents(Run run) {
        final List<StreamEvent> recorded = new ArrayList<>();
        if (run == null) {
            return recorded;
        }
        final String runId = run.getRunId();
        if (runId == null || runId.isEmpty()) {
            return recorded;
        }
        final boolean stillLive = run.getStatus() == RunStatus.running
                || run.getStatus() == RunStatus.waiting_user;
        if (stillLive) {
            return recorded;
        }
        if (runStreamEventService == null) {
            throw new LobsterException("run_event.persistence_unavailable",
                    "Run stream event persistence is unavailable");
        }
        long nextSeq = runStreamEventService.maxSeq(runId) + 1L;
        try {
            StreamEvent projection = runStreamEventService.record(run.getUserId(), run.getThreadId(),
                    runId, nextSeq, threadProjectionEvent(run.getThreadId(), runId));
            if (projection != null) {
                recorded.add(projection);
            }
            nextSeq++;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] record terminal fallback projection failed: " + runId, t); }
            catch (Throwable ignore) { /* keep terminal event */ }
        }
        final Map<String, Object> endedPayload = new LinkedHashMap<>();
        endedPayload.put("threadId", run.getThreadId());
        endedPayload.put("runId", runId);
        endedPayload.put("exitReason", terminalExitReason(run));
        final StreamEvent endedEvent = runStreamEventService.record(run.getUserId(), run.getThreadId(),
                runId, nextSeq, StreamEvent.of(StreamEventType.run_ended, endedPayload));
        if (endedEvent != null) {
            recorded.add(endedEvent);
        }
        return recorded;
    }

    /**
     * Resolves the exit-reason name to publish in a {@code run_ended} event.
     *
     * @param run the finished run; may be {@code null}
     * @return the run's explicit exit reason when set; otherwise {@code cancelled} or
     *         {@code error} derived from the run status; {@code normal} in every other case
     *         (including a {@code null} run)
     */
    private String terminalExitReason(Run run) {
        if (run != null) {
            if (run.getExitReason() != null) {
                return run.getExitReason().name();
            }
            if (run.getStatus() == RunStatus.cancelled) {
                return RunExitReason.cancelled.name();
            }
            if (run.getStatus() == RunStatus.error) {
                return RunExitReason.error.name();
            }
        }
        return RunExitReason.normal.name();
    }

    /**
     * Records the synthetic cancellation terminal events without a caller-reserved sequence
     * range; the first sequence number is then derived from the events already persisted for
     * the run.
     *
     * @param runId  id of the cancelled run
     * @param reason cancellation reason; may be {@code null}
     * @return the recorded events
     */
    private List<StreamEvent> recordSyntheticCancelledTerminalEvent(String runId, CancelReason reason) {
        final long noReservedStartSeq = 0L;
        return recordSyntheticCancelledTerminalEvent(runId, reason, noReservedStartSeq);
    }

    /**
     * Records the synthetic cancellation terminal events for a run that may still be tracked as
     * active on this worker.
     *
     * <p>Without an {@code active} handle, this delegates to the overload that derives the first
     * sequence number from persisted events. Otherwise, while holding the monitor of
     * {@code active}, it does three things:
     * <ul>
     *   <li>flushes the run's buffered events;</li>
     *   <li>marks the run closed;</li>
     *   <li>reserves three consecutive sequence numbers from {@code nextEventSeq}, one each for
     *       the system hint, the thread projection and the {@code run_ended} event written by
     *       the {@code long startSeq} overload.</li>
     * </ul>
     *
     * @param runId  id of the cancelled run
     * @param reason cancellation reason; may be {@code null}
     * @param active in-memory handle for the run, or {@code null} if none
     * @return the recorded events, as returned by the {@code startSeq} overload
     */
    private List<StreamEvent> recordSyntheticCancelledTerminalEvent(String runId, CancelReason reason, ActiveRun active) {
        if (active == null) return recordSyntheticCancelledTerminalEvent(runId, reason);
        // Flush, close, reserve and record all happen while holding `active`'s monitor.
        // NOTE(review): this only keeps other events from being numbered in between if every
        // other emitter for this run also synchronizes on `active` — confirm in ActiveRun.
        synchronized (active) {
            active.flushBuffered(runStreamEventService);
            active.closed = true;
            // getAndAdd returns the first of the three reserved sequence numbers.
            long seq = active.nextEventSeq.getAndAdd(3L);
            return recordSyntheticCancelledTerminalEvent(runId, reason, seq);
        }
    }

    /**
     * Persists the synthetic events that close out a cancelled run:
     * <ol>
     *   <li>a {@code system_hint} ("run cancelled: REASON");</li>
     *   <li>a thread-projection event;</li>
     *   <li>a {@code run_ended} event with exit reason {@code cancelled}.</li>
     * </ol>
     *
     * @param runId    id of the cancelled run; null/empty yields an empty list
     * @param reason   cancellation reason used in the hint text; {@code null} is reported as USER
     * @param startSeq first sequence number of a caller-reserved range (the {@code ActiveRun}
     *                 overload reserves three); {@code <= 0} means "start at maxSeq(runId) + 1"
     * @return the non-null events returned by the event service, in recording order; empty when
     *         the run id is blank or the run cannot be found
     * @throws LobsterException {@code run_event.persistence_unavailable} when no stream event
     *         service is configured; {@code run_event.cancel_terminal_failed} wrapping any other
     *         failure (a LobsterException raised inside is rethrown unchanged)
     */
    private List<StreamEvent> recordSyntheticCancelledTerminalEvent(String runId, CancelReason reason, long startSeq) {
        List<StreamEvent> events = new ArrayList<>();
        if (runId == null || runId.isEmpty()) return events;
        if (runStreamEventService == null) {
            throw new LobsterException("run_event.persistence_unavailable",
                    "Run stream event persistence is unavailable");
        }
        try {
            Run run = runDao().getRun(runId);
            if (run == null) return events;
            // Use the reserved range when the caller provided one; otherwise append after the last persisted event.
            long seq = startSeq > 0L ? startSeq : runStreamEventService.maxSeq(runId) + 1L;
            Map<String, Object> hint = new LinkedHashMap<>();
            hint.put("threadId", run.getThreadId());
            hint.put("runId", runId);
            hint.put("message", "run cancelled: " + (reason == null ? "USER" : reason.name()));
            StreamEvent hintEvent = runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                    seq, StreamEvent.of(StreamEventType.system_hint, hint));
            if (hintEvent != null) events.add(hintEvent);

            long endedSeq = seq + 1L;
            try {
                StreamEvent projectionEvent = runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                        endedSeq, threadProjectionEvent(run.getThreadId(), runId));
                if (projectionEvent != null) events.add(projectionEvent);
                endedSeq++;
            } catch (Throwable projectionError) {
                // Projection is optional; the run_ended event below must still be written.
                try { Tools.log("[AgentRuntime] record cancelled terminal projection failed: " + runId, projectionError); }
                catch (Throwable ignore) { /* keep terminal event */ }
                // With a caller-reserved range, run_ended keeps the last reserved slot (seq + 2) and
                // seq + 1 is left unused; otherwise it takes seq + 1 (the value endedSeq already holds).
                // NOTE(review): the reason for skipping seq + 1 is not visible here — possibly to avoid
                // colliding with a projection row persisted before the exception surfaced; confirm.
                endedSeq = startSeq > 0L ? seq + 2L : seq + 1L;
            }

            Map<String, Object> ended = new LinkedHashMap<>();
            ended.put("threadId", run.getThreadId());
            ended.put("runId", runId);
            ended.put("exitReason", RunExitReason.cancelled.name());
            StreamEvent endedEvent = runStreamEventService.record(run.getUserId(), run.getThreadId(), runId,
                    endedSeq, StreamEvent.of(StreamEventType.run_ended, ended));
            if (endedEvent != null) events.add(endedEvent);
            return events;
        } catch (LobsterException e) {
            // Already a domain error with its own code; do not re-wrap.
            throw e;
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] record cancelled terminal event failed: " + runId, t); } catch (Throwable ignore) { /* ignore */ }
            throw new LobsterException("run_event.cancel_terminal_failed",
                    "Failed to persist cancellation terminal event", t);
        }
    }

    /**
     * Refreshes the heartbeat of every run currently tracked in {@code activeRuns}.
     *
     * <p>For each tracked run, the heartbeat is written to rows owned by this worker in either
     * {@code running} or {@code waiting_user} status. When neither update touches a row and the
     * run's worker has already started, the run is removed from {@code activeRuns} (presumably
     * because ownership was lost or the run ended elsewhere). Then:
     * <ul>
     *   <li>unless the persisted run is already terminal, its cancel flag is set to
     *       {@code TIMEOUT} (only if no reason was set yet);</li>
     *   <li>{@code finishActiveRun} is invoked.</li>
     * </ul>
     * A failure for one run is logged and does not stop the loop.
     */
    private void heartbeatActiveRuns() {
        if (activeRuns.isEmpty()) {
            return;
        }
        final Date beatAt = new Date();
        final long beatAtMs = beatAt.getTime();
        for (ActiveRun tracked : activeRuns.values()) {
            if (tracked == null) {
                continue;
            }
            try {
                final int touched =
                        runDao().heartbeatOwned(beatAt, tracked.runId, RunStatus.running, runtimeWorkerId)
                        + runDao().heartbeatOwned(beatAt, tracked.runId, RunStatus.waiting_user, runtimeWorkerId);
                if (touched > 0) {
                    tracked.heartbeatAtMs = beatAtMs;
                    continue;
                }
                // Only started workers are reaped, and only by whoever wins the removal.
                if (!tracked.workerStarted || !activeRuns.remove(tracked.runId, tracked)) {
                    continue;
                }
                Run persisted = null;
                try { persisted = runDao().getRun(tracked.runId); } catch (Throwable ignore) { /* fallback */ }
                final boolean alreadyTerminal = persisted != null && isTerminalStatus(persisted.getStatus());
                if (!alreadyTerminal) {
                    // Ask the local loop to stop; keep any reason that was set earlier.
                    AtomicReference<CancelReason> flag = cancelFlags.get(tracked.runId);
                    if (flag != null) {
                        flag.compareAndSet(null, CancelReason.TIMEOUT);
                    }
                }
                finishActiveRun(tracked);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] heartbeat run failed: " + tracked.runId, t); } catch (Throwable ignore) { /* ignore */ }
            }
        }
    }

    /**
     * Returns whether the given run has been flagged as cancelled.
     *
     * @param runId run id to look up
     * @return {@code true} if a cancel flag exists for the run and carries a reason
     */
    public boolean isCancelled(String runId) {
        AtomicReference<CancelReason> holder = cancelFlags.get(runId);
        if (holder == null) {
            return false;
        }
        return holder.get() != null;
    }

    /**
     * V2: returns the cancellation reason of a run.
     *
     * @param runId run id to look up
     * @return the recorded reason, or {@code null} if the run is not cancelled or the id is unknown
     */
    public CancelReason cancelReasonOf(String runId) {
        AtomicReference<CancelReason> holder = cancelFlags.get(runId);
        if (holder == null) {
            return null;
        }
        return holder.get();
    }

    /**
     * Returns a non-null description of a throwable.
     *
     * @param t the throwable; may be {@code null}
     * @return its message, or its simple class name when the message is {@code null};
     *         {@code "unknown"} when {@code t} itself is {@code null}
     */
    private String safeMsg(Throwable t) {
        if (t == null) {
            return "unknown";
        }
        final String message = t.getMessage();
        if (message != null) {
            return message;
        }
        return t.getClass().getSimpleName();
    }

    /**
     * Truncates {@code s} to at most {@code maxLen} UTF-16 code units.
     *
     * <p>If the cut would fall between the high and low halves of a surrogate pair, it moves one
     * char earlier. The result therefore never ends with an unpaired high surrogate, which
     * UTF-8 encoders typically replace with {@code '?'} or reject.
     *
     * @param s      text to truncate; {@code null} is returned unchanged
     * @param maxLen maximum length; a non-positive value disables truncation
     * @return {@code s} itself when no truncation is needed, otherwise a prefix of length
     *         {@code maxLen} (or {@code maxLen - 1} when a surrogate pair straddles the cut)
     */
    private String truncate(String s, int maxLen) {
        if (s == null || maxLen <= 0 || s.length() <= maxLen) return s;
        int end = maxLen;
        // s.length() > maxLen >= 1, so both charAt(end - 1) and charAt(end) are in range.
        if (Character.isHighSurrogate(s.charAt(end - 1)) && Character.isLowSurrogate(s.charAt(end))) {
            end--;
        }
        return s.substring(0, end);
    }

    /**
     * Filters {@code mediaIds} down to the ids that really refer to images. The frontend may
     * mistakenly send non-image resources as {@code attachmentMediaIds}.
     *
     * <p>A resource passes only when all of these hold:
     * <ul>
     *   <li>it is a {@code USER_UPLOAD};</li>
     *   <li>its MIME type is {@code image/*}, compared case-insensitively because MIME type and
     *       subtype names are case-insensitive;</li>
     *   <li>it belongs to {@code userId}.</li>
     * </ul>
     * A lookup failure for an individual id is logged and that id is skipped. If the workspace
     * service itself is unavailable, nothing passes.
     *
     * @param mediaIds candidate resource ids; {@code null}/empty entries are ignored
     * @param userId   owner every accepted resource must belong to
     * @return accepted ids in input order; an empty list when nothing qualifies
     */
    private List<String> filterImageMediaIds(List<String> mediaIds, String userId) {
        if (mediaIds == null || mediaIds.isEmpty() || userId == null) return Collections.emptyList();
        // Loop-invariant: resolve the service once instead of per id.
        WorkspaceService ws;
        try {
            ws = workspaceService();
        } catch (Throwable t) {
            try { Tools.log("[AgentRuntime] filterImageMediaIds: workspace service unavailable due to " + t.getMessage()); }
            catch (Throwable ignore) { /* ignore */ }
            return Collections.emptyList();
        }
        if (ws == null) return Collections.emptyList();
        List<String> out = new ArrayList<>();
        for (String rid : mediaIds) {
            if (rid == null || rid.isEmpty()) continue;
            try {
                WorkspaceResource r = ws.getResource(rid);
                if (r == null) continue;
                if (!userId.equals(r.getUserId())) continue;
                if (r.getSourceType() != ResourceSourceType.USER_UPLOAD) continue;
                String mime = r.getMimeType();
                // MIME types are case-insensitive ("Image/PNG" is a valid image type).
                if (mime == null || !mime.trim().toLowerCase(Locale.ROOT).startsWith("image/")) continue;
                out.add(rid);
            } catch (Throwable t) {
                try { Tools.log("[AgentRuntime] filterImageMediaIds: skip " + rid + " due to " + t.getMessage()); }
                catch (Throwable ignore) { /* ignore */ }
            }
        }
        return out;
    }

    /**
     * Computes the context budget dynamically from the routed model's
     * {@code ModelProfile.contextWindow}.
     *
     * <p>Semantics: respect the model's physical limit, and never let the fallback overflow a
     * small-window model.
     * <ul>
     *   <li>contextWindow not set → use the configured fallback. Caution: in theory this can
     *       overflow the model, but with the config missing the fallback is all we can trust.</li>
     *   <li>contextWindow - reserve - safety &gt; 0 → use the dynamic value, <b>not</b>
     *       max(fallback, dynamic). This avoids an 8k-contextWindow model being pushed up to
     *       fallback=16k and causing an API 422.</li>
     *   <li>dynamic value ≤ 0 (misconfigured profile: output reserve larger than the window) →
     *       use the fallback, clamped to contextWindow.</li>
     * </ul>
     *
     * <p>The subtraction is done in {@code long}. With {@code int} arithmetic, an output reserve
     * near {@code Integer.MAX_VALUE} combined with a safety margin larger than the window wraps
     * around to a large positive value. That value would be returned as the budget instead of
     * taking the misconfiguration branch.
     *
     * @param route routing result; {@code null} or one without a primary model yields the fallback
     * @return the token budget for assembled context
     */
    private int computeContextBudget(ModelRouteResult route) {
        int fallback = LobsterConfig.getDefaultContextBudgetTokens();
        if (route == null || route.getPrimary() == null) return fallback;
        Integer ctx = route.getPrimary().getContextWindow();
        if (ctx == null || ctx <= 0) return fallback;
        Integer maxOut = route.getPrimary().getMaxOutputTokens();
        int outputReserve = (maxOut == null || maxOut <= 0)
                ? DEFAULT_OUTPUT_RESERVE_TOKENS : maxOut;
        long dynamic = (long) ctx - outputReserve - CONTEXT_SAFETY_MARGIN_TOKENS;
        if (dynamic > 0) return (int) Math.min(dynamic, (long) Integer.MAX_VALUE);
        // Profile misconfigured (output reserve + margin > contextWindow): use the fallback,
        // clamped to the physical limit.
        return Math.min(fallback, ctx);
    }
}
