fix: persist codexapp streaming state
This commit is contained in:
209
server.js
209
server.js
@@ -532,6 +532,8 @@ const pendingCrossConversationReplies = new Map();
|
||||
const pendingCodexAppUserInputs = new Map();
|
||||
let codexAppClient = null;
|
||||
let codexAppClientSignature = '';
|
||||
const CODEX_APP_STATE_FILE = 'codexapp-state.json';
|
||||
const CODEX_APP_STATE_FLUSH_DELAY_MS = 250;
|
||||
|
||||
// Track which session each ws is viewing: ws -> sessionId
|
||||
const wsSessionMap = new Map();
|
||||
@@ -2173,6 +2175,196 @@ function cleanRunDir(sessionId) {
|
||||
} catch {}
|
||||
}
|
||||
|
||||
function codexAppStatePath(sessionId) {
|
||||
return path.join(runDir(sessionId), CODEX_APP_STATE_FILE);
|
||||
}
|
||||
|
||||
function mapToPairs(value) {
|
||||
if (value instanceof Map) {
|
||||
return Array.from(value.entries()).map(([key, item]) => [String(key), String(item || '')]);
|
||||
}
|
||||
if (value && typeof value === 'object') {
|
||||
return Object.entries(value).map(([key, item]) => [String(key), String(item || '')]);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
function codexAppTurnKey(sessionId, state = {}) {
|
||||
const threadId = state.threadId || '';
|
||||
const turnId = state.turnId || '';
|
||||
const startedAt = state.startedAt || '';
|
||||
return `${sanitizeId(sessionId)}:${threadId}:${turnId}:${startedAt}`;
|
||||
}
|
||||
|
||||
function serializeCodexAppEntry(sessionId, entry) {
|
||||
return {
|
||||
version: 1,
|
||||
agent: 'codexapp',
|
||||
sessionId: sanitizeId(sessionId),
|
||||
cwd: entry.cwd || null,
|
||||
threadId: entry.threadId || null,
|
||||
turnId: entry.turnId || null,
|
||||
turnStatus: entry.turnStatus || 'running',
|
||||
startedAt: entry.startedAt || new Date().toISOString(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
clientUserMessageId: entry.clientUserMessageId || null,
|
||||
fullText: entry.fullText || '',
|
||||
toolCalls: Array.isArray(entry.toolCalls) ? entry.toolCalls : [],
|
||||
toolOutputDeltas: mapToPairs(entry.toolOutputDeltas),
|
||||
agentMessageItems: mapToPairs(entry.agentMessageItems),
|
||||
lastUsage: entry.lastUsage || null,
|
||||
lastError: entry.lastError || null,
|
||||
userAborted: !!entry.userAborted,
|
||||
};
|
||||
}
|
||||
|
||||
function writeCodexAppTurnState(sessionId, entry) {
|
||||
if (!entry || entry.codexAppStateCleaned) return;
|
||||
try {
|
||||
const dir = runDir(sessionId);
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
const statePath = codexAppStatePath(sessionId);
|
||||
const tmpPath = `${statePath}.${process.pid}.${Date.now()}.tmp`;
|
||||
fs.writeFileSync(tmpPath, JSON.stringify(serializeCodexAppEntry(sessionId, entry)));
|
||||
fs.renameSync(tmpPath, statePath);
|
||||
} catch (err) {
|
||||
plog('WARN', 'codex_app_state_persist_failed', {
|
||||
sessionId: String(sessionId || '').slice(0, 8),
|
||||
error: err?.message || String(err || ''),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function persistCodexAppTurnState(sessionId, entry, options = {}) {
|
||||
if (!sessionId || !entry || entry.codexAppStateCleaned) return;
|
||||
const immediate = !!options.immediate;
|
||||
if (immediate && entry.codexAppStateTimer) {
|
||||
clearTimeout(entry.codexAppStateTimer);
|
||||
entry.codexAppStateTimer = null;
|
||||
}
|
||||
if (immediate) {
|
||||
entry.codexAppStateDirty = false;
|
||||
writeCodexAppTurnState(sessionId, entry);
|
||||
return;
|
||||
}
|
||||
entry.codexAppStateDirty = true;
|
||||
if (entry.codexAppStateTimer) return;
|
||||
entry.codexAppStateTimer = setTimeout(() => {
|
||||
entry.codexAppStateTimer = null;
|
||||
if (!entry.codexAppStateDirty) return;
|
||||
entry.codexAppStateDirty = false;
|
||||
writeCodexAppTurnState(sessionId, entry);
|
||||
}, CODEX_APP_STATE_FLUSH_DELAY_MS);
|
||||
if (typeof entry.codexAppStateTimer.unref === 'function') entry.codexAppStateTimer.unref();
|
||||
}
|
||||
|
||||
function cleanupCodexAppTurnState(sessionId, entry) {
|
||||
if (entry?.codexAppStateTimer) {
|
||||
clearTimeout(entry.codexAppStateTimer);
|
||||
entry.codexAppStateTimer = null;
|
||||
}
|
||||
if (entry) entry.codexAppStateCleaned = true;
|
||||
cleanRunDir(sessionId);
|
||||
}
|
||||
|
||||
function loadCodexAppTurnState(sessionId) {
|
||||
try {
|
||||
const statePath = codexAppStatePath(sessionId);
|
||||
if (!fs.existsSync(statePath)) return null;
|
||||
const state = JSON.parse(fs.readFileSync(statePath, 'utf8'));
|
||||
if (!state || state.agent !== 'codexapp') return null;
|
||||
return state;
|
||||
} catch (err) {
|
||||
plog('WARN', 'codex_app_state_load_failed', {
|
||||
sessionId: String(sessionId || '').slice(0, 8),
|
||||
error: err?.message || String(err || ''),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function hasCodexAppTurnMessage(session, turnKey) {
|
||||
return Array.isArray(session?.messages)
|
||||
&& session.messages.some((message) => message?.codexAppTurnKey === turnKey);
|
||||
}
|
||||
|
||||
function recoverCodexAppTurnState(sessionId) {
|
||||
const state = loadCodexAppTurnState(sessionId);
|
||||
if (!state) {
|
||||
cleanRunDir(sessionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
const session = loadSession(sessionId);
|
||||
if (!session || !isCodexAppSession(session)) {
|
||||
cleanRunDir(sessionId);
|
||||
return true;
|
||||
}
|
||||
|
||||
const fullText = String(state.fullText || '');
|
||||
const toolCalls = Array.isArray(state.toolCalls) ? state.toolCalls : [];
|
||||
const hasRecoverableContent = fullText.trim() || toolCalls.length > 0;
|
||||
const turnKey = codexAppTurnKey(sessionId, state);
|
||||
let changed = false;
|
||||
|
||||
if (state.threadId) {
|
||||
setRuntimeSessionId(session, state.threadId);
|
||||
changed = true;
|
||||
}
|
||||
if (state.lastUsage) {
|
||||
session.totalUsage = state.lastUsage;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (hasRecoverableContent && !hasCodexAppTurnMessage(session, turnKey)) {
|
||||
const interrupted = state.turnStatus !== 'completed';
|
||||
session.messages.push({
|
||||
role: 'assistant',
|
||||
content: fullText,
|
||||
toolCalls,
|
||||
timestamp: state.updatedAt || new Date().toISOString(),
|
||||
codexAppTurnKey: turnKey,
|
||||
codexAppThreadId: state.threadId || null,
|
||||
codexAppTurnId: state.turnId || null,
|
||||
codexAppRecoveredPartial: interrupted,
|
||||
interrupted,
|
||||
});
|
||||
if (interrupted) {
|
||||
session.messages.push({
|
||||
role: 'system',
|
||||
content: 'Codex App 服务重启前的未完成输出已恢复,原运行任务已中断。',
|
||||
timestamp: new Date().toISOString(),
|
||||
codexAppTurnKey: `${turnKey}:notice`,
|
||||
});
|
||||
}
|
||||
session.hasUnread = true;
|
||||
changed = true;
|
||||
plog('INFO', 'codex_app_state_recovered', {
|
||||
sessionId: sessionId.slice(0, 8),
|
||||
threadId: state.threadId || null,
|
||||
turnId: state.turnId || null,
|
||||
responseLen: fullText.length,
|
||||
toolCallCount: toolCalls.length,
|
||||
interrupted,
|
||||
});
|
||||
} else {
|
||||
plog('INFO', 'codex_app_state_recovery_skipped', {
|
||||
sessionId: sessionId.slice(0, 8),
|
||||
threadId: state.threadId || null,
|
||||
turnId: state.turnId || null,
|
||||
duplicate: hasCodexAppTurnMessage(session, turnKey),
|
||||
hasRecoverableContent: !!hasRecoverableContent,
|
||||
});
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
session.updated = new Date().toISOString();
|
||||
saveSession(session);
|
||||
}
|
||||
cleanRunDir(sessionId);
|
||||
return true;
|
||||
}
|
||||
|
||||
function sendSessionList(ws) {
|
||||
try {
|
||||
const files = fs.readdirSync(SESSIONS_DIR).filter(f => f.endsWith('.json'));
|
||||
@@ -2900,6 +3092,11 @@ function recoverProcesses() {
|
||||
const session = loadSession(sessionId);
|
||||
const agent = getSessionAgent(session);
|
||||
|
||||
if (fs.existsSync(codexAppStatePath(sessionId))) {
|
||||
recoverCodexAppTurnState(sessionId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!fs.existsSync(pidPath)) {
|
||||
try { fs.rmSync(dir, { recursive: true }); } catch {}
|
||||
continue;
|
||||
@@ -4465,6 +4662,7 @@ function handleCodexAppNotification(notification) {
|
||||
}
|
||||
|
||||
const result = codexAppRuntime.processCodexAppNotification(routed.entry, notification, routed.sessionId);
|
||||
persistCodexAppTurnState(routed.sessionId, routed.entry, { immediate: !!result?.done });
|
||||
if (result?.done) {
|
||||
handleCodexAppTurnComplete(routed.sessionId);
|
||||
}
|
||||
@@ -4936,6 +5134,7 @@ function handleCodexAppMessage(ws, session, runtimeTextValue, resolvedAttachment
|
||||
startedAt: new Date().toISOString(),
|
||||
};
|
||||
activeCodexAppTurns.set(session.id, entry);
|
||||
persistCodexAppTurnState(session.id, entry, { immediate: true });
|
||||
sendSessionList(ws);
|
||||
|
||||
startCodexAppTurn(session.id, input).catch((err) => {
|
||||
@@ -4970,9 +5169,11 @@ async function startCodexAppTurn(sessionId, input) {
|
||||
setRuntimeSessionId(session, threadId);
|
||||
session.updated = new Date().toISOString();
|
||||
saveSession(session);
|
||||
persistCodexAppTurnState(sessionId, entry, { immediate: true });
|
||||
|
||||
const turn = await client.request('turn/start', codexAppTurnParams(session, input, threadId, entry.clientUserMessageId), 60000);
|
||||
if (turn?.turn?.id) entry.turnId = turn.turn.id;
|
||||
persistCodexAppTurnState(sessionId, entry, { immediate: true });
|
||||
}
|
||||
|
||||
function handleCodexAppTurnComplete(sessionId, options = {}) {
|
||||
@@ -4987,12 +5188,17 @@ function handleCodexAppTurnComplete(sessionId, options = {}) {
|
||||
: null;
|
||||
|
||||
const session = loadSession(sessionId);
|
||||
if (session && ((entry.fullText || '').trim() || (entry.toolCalls || []).length > 0)) {
|
||||
const turnKey = codexAppTurnKey(sessionId, entry);
|
||||
if (session && ((entry.fullText || '').trim() || (entry.toolCalls || []).length > 0) && !hasCodexAppTurnMessage(session, turnKey)) {
|
||||
session.messages.push({
|
||||
role: 'assistant',
|
||||
content: entry.fullText || '',
|
||||
toolCalls: entry.toolCalls || [],
|
||||
timestamp: new Date().toISOString(),
|
||||
codexAppTurnKey: turnKey,
|
||||
codexAppThreadId: entry.threadId || null,
|
||||
codexAppTurnId: entry.turnId || null,
|
||||
interrupted: !!options.interrupted,
|
||||
});
|
||||
session.updated = new Date().toISOString();
|
||||
if (!entry.ws) session.hasUnread = true;
|
||||
@@ -5000,6 +5206,7 @@ function handleCodexAppTurnComplete(sessionId, options = {}) {
|
||||
}
|
||||
|
||||
activeCodexAppTurns.delete(sessionId);
|
||||
cleanupCodexAppTurnState(sessionId, entry);
|
||||
if (entry.crossConversationReplyRequestId) {
|
||||
completeCrossConversationReply(entry.crossConversationReplyRequestId, entry, session);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user