Files
cc-web/lib/codex-app-worker-client.js
2026-06-16 09:09:23 +08:00

207 lines
5.9 KiB
JavaScript

'use strict';
const path = require('path');
const { fork } = require('child_process');
function createCodexAppWorkerClient(options = {}) {
const workerPath = options.workerPath || path.join(__dirname, 'codex-app-worker.js');
const onNotification = typeof options.onNotification === 'function' ? options.onNotification : () => {};
const onServerRequest = typeof options.onServerRequest === 'function' ? options.onServerRequest : null;
const onExit = typeof options.onExit === 'function' ? options.onExit : () => {};
const onLog = typeof options.onLog === 'function' ? options.onLog : () => {};
let worker = null;
let nextId = 1;
let configured = false;
let workerExited = false;
let appServerRunning = false;
const pending = new Map();
function rejectAllPending(err) {
for (const [, item] of pending) {
clearTimeout(item.timer);
item.reject(err);
}
pending.clear();
}
function specPayload() {
return {
command: options.command,
args: Array.isArray(options.args) ? options.args : [],
env: options.env || process.env,
cwd: options.cwd || process.cwd(),
clientInfo: options.clientInfo || null,
};
}
function ensureWorker() {
if (worker && !workerExited) return worker;
workerExited = false;
configured = false;
appServerRunning = false;
worker = fork(workerPath, [], {
cwd: options.cwd || process.cwd(),
env: process.env,
stdio: ['ignore', 'ignore', 'ignore', 'ipc'],
});
worker.on('message', (message = {}) => {
if (Object.prototype.hasOwnProperty.call(message, 'id')) {
const item = pending.get(message.id);
if (!item) return;
pending.delete(message.id);
clearTimeout(item.timer);
if (message.error) {
const err = new Error(message.error.message || 'Codex App worker 请求失败。');
err.code = message.error.code;
err.data = message.error.data;
item.reject(err);
} else {
item.resolve(message.result || {});
}
return;
}
if (message.type === 'notification') {
onNotification(message.notification);
return;
}
if (message.type === 'serverRequest') {
handleServerRequest(message);
return;
}
if (message.type === 'exit') {
appServerRunning = false;
onExit(message.info || {});
return;
}
if (message.type === 'log') {
onLog(message.level || 'INFO', message.event || 'codex_app_worker_log', message.data || {});
}
});
worker.on('exit', (code, signal) => {
workerExited = true;
configured = false;
appServerRunning = false;
rejectAllPending(new Error(`Codex App worker 已退出: code=${code ?? 'null'} signal=${signal || 'null'}`));
onExit({ code, signal, stderr: 'Codex App worker process exited' });
});
worker.on('error', (err) => {
workerExited = true;
configured = false;
appServerRunning = false;
rejectAllPending(err);
onExit({ code: null, signal: null, stderr: err.message });
});
return worker;
}
function sendWorker(type, payload = {}, timeoutMs = 300000) {
const proc = ensureWorker();
const id = nextId++;
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
pending.delete(id);
reject(new Error(`Codex App worker 请求超时: ${type}`));
}, timeoutMs);
pending.set(id, { resolve, reject, timer, type });
try {
proc.send({ id, type, ...payload });
} catch (err) {
clearTimeout(timer);
pending.delete(id);
reject(err);
}
});
}
function sendWorkerNotification(type, payload = {}) {
const proc = ensureWorker();
proc.send({ type, ...payload });
}
function handleServerRequest(message) {
const requestId = message.requestId;
if (!requestId) return;
if (!onServerRequest) {
sendWorkerNotification('serverRequestResult', {
requestId,
error: { code: -32601, message: 'cc-web 暂不支持 Codex app-server 请求。' },
});
return;
}
Promise.resolve()
.then(() => onServerRequest(message.request || {}))
.then((result) => {
sendWorkerNotification('serverRequestResult', { requestId, result: result || {} });
})
.catch((err) => {
sendWorkerNotification('serverRequestResult', {
requestId,
error: { code: -32603, message: err?.message || 'cc-web 处理 Codex App worker 请求失败。' },
});
});
}
async function configureIfNeeded() {
if (configured) return;
await sendWorker('configure', { spec: specPayload() }, 30000);
configured = true;
}
async function start() {
await configureIfNeeded();
const result = await sendWorker('start', {}, 30000);
appServerRunning = true;
return result;
}
async function request(method, params = {}, timeoutMs = 300000) {
await configureIfNeeded();
return sendWorker('request', { method, params, timeoutMs }, timeoutMs + 1000);
}
function notification(method, params = {}) {
sendWorkerNotification('notification', { method, params });
}
async function reloadMcpServers() {
await configureIfNeeded();
return sendWorker('reloadMcpServers', {}, 30000);
}
function stop() {
appServerRunning = false;
configured = false;
if (worker && !workerExited) {
try { worker.send({ type: 'stop' }); } catch {}
setTimeout(() => {
try {
if (worker && !worker.killed) worker.kill('SIGKILL');
} catch {}
}, 3000);
}
rejectAllPending(new Error('Codex App worker 已停止。'));
}
function isRunning() {
return !!worker && !workerExited && appServerRunning;
}
return {
start,
stop,
request,
notification,
reloadMcpServers,
isRunning,
pid: () => worker?.pid || null,
};
}
module.exports = { createCodexAppWorkerClient };