feat: add cross conversation messaging
This commit is contained in:
@@ -11,6 +11,7 @@ const REPO_DIR = path.resolve(__dirname, '..');
|
||||
const SERVER_PATH = path.join(REPO_DIR, 'server.js');
|
||||
const MOCK_CLAUDE = path.join(REPO_DIR, 'scripts', 'mock-claude.js');
|
||||
const MOCK_CODEX = path.join(REPO_DIR, 'scripts', 'mock-codex.js');
|
||||
const HAS_SQLITE3 = spawnSync('sqlite3', ['-version'], { stdio: 'ignore' }).status === 0;
|
||||
|
||||
function mkdirp(dir) {
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
@@ -39,6 +40,7 @@ function assert(condition, message) {
|
||||
}
|
||||
|
||||
function sql(dbPath, statement) {
|
||||
if (!HAS_SQLITE3) throw new Error('sqlite3 is not available');
|
||||
const result = spawnSync('sqlite3', [dbPath, statement], { encoding: 'utf8' });
|
||||
if (result.status !== 0) throw new Error(result.stderr || `sqlite3 failed: ${statement}`);
|
||||
return result.stdout.trim();
|
||||
@@ -102,6 +104,43 @@ function connectWs(port, password) {
|
||||
});
|
||||
}
|
||||
|
||||
function assertWsUpgradeRejected(port, pathname) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let settled = false;
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}${pathname}`);
|
||||
const timer = setTimeout(() => finish(reject, new Error(`WebSocket upgrade was not rejected for ${pathname}`)), 5000);
|
||||
|
||||
function finish(done, value) {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
|
||||
try { ws.terminate(); } catch {}
|
||||
}
|
||||
done(value);
|
||||
}
|
||||
|
||||
ws.on('open', () => {
|
||||
finish(reject, new Error(`Unexpected WebSocket connection opened for ${pathname}`));
|
||||
});
|
||||
ws.on('unexpected-response', (req, res) => {
|
||||
res.resume();
|
||||
if (res.statusCode === 404) {
|
||||
finish(resolve);
|
||||
return;
|
||||
}
|
||||
finish(reject, new Error(`Expected 404 for ${pathname}, got ${res.statusCode}`));
|
||||
});
|
||||
ws.on('error', (err) => {
|
||||
if (/Unexpected server response: 404/.test(err.message || '')) {
|
||||
finish(resolve);
|
||||
return;
|
||||
}
|
||||
finish(reject, err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function uploadAttachment(port, token, { filename, mime, data }) {
|
||||
const response = await fetch(`http://127.0.0.1:${port}/api/attachments`, {
|
||||
method: 'POST',
|
||||
@@ -128,6 +167,22 @@ async function fetchAuthedJson(port, token, pathname) {
|
||||
return payload;
|
||||
}
|
||||
|
||||
async function callInternalMcp(port, token, payload) {
|
||||
const response = await fetch(`http://127.0.0.1:${port}/api/internal/mcp`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-CC-Web-MCP-Token': token,
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
});
|
||||
let body = null;
|
||||
try {
|
||||
body = await response.json();
|
||||
} catch {}
|
||||
return { status: response.status, body };
|
||||
}
|
||||
|
||||
function nextMessage(messages, ws, predicate, timeoutMs = 15000) {
|
||||
const callSite = (() => {
|
||||
const stack = String(new Error().stack || '').split('\n');
|
||||
@@ -220,86 +275,90 @@ function createFakeCodexHistory(homeDir) {
|
||||
];
|
||||
fs.writeFileSync(rolloutPath, `${rolloutLines.join('\n')}\n`);
|
||||
|
||||
const stateDb = path.join(homeDir, '.codex', 'state_5.sqlite');
|
||||
mkdirp(path.dirname(stateDb));
|
||||
sql(stateDb, `
|
||||
PRAGMA journal_mode = WAL;
|
||||
CREATE TABLE IF NOT EXISTS threads (
|
||||
id TEXT PRIMARY KEY,
|
||||
rollout_path TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
model_provider TEXT NOT NULL,
|
||||
cwd TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
sandbox_policy TEXT NOT NULL,
|
||||
approval_mode TEXT NOT NULL,
|
||||
tokens_used INTEGER NOT NULL DEFAULT 0,
|
||||
has_user_event INTEGER NOT NULL DEFAULT 0,
|
||||
archived INTEGER NOT NULL DEFAULT 0,
|
||||
archived_at INTEGER,
|
||||
git_sha TEXT,
|
||||
git_branch TEXT,
|
||||
git_origin_url TEXT,
|
||||
cli_version TEXT NOT NULL DEFAULT '',
|
||||
first_user_message TEXT NOT NULL DEFAULT '',
|
||||
agent_nickname TEXT,
|
||||
agent_role TEXT,
|
||||
memory_mode TEXT NOT NULL DEFAULT 'enabled'
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS stage1_outputs (
|
||||
thread_id TEXT PRIMARY KEY,
|
||||
source_updated_at INTEGER NOT NULL,
|
||||
raw_memory TEXT NOT NULL,
|
||||
rollout_summary TEXT NOT NULL,
|
||||
generated_at INTEGER NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
|
||||
thread_id TEXT NOT NULL,
|
||||
position INTEGER NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL,
|
||||
input_schema TEXT NOT NULL,
|
||||
PRIMARY KEY(thread_id, position)
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
ts_nanos INTEGER NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
target TEXT NOT NULL,
|
||||
message TEXT,
|
||||
module_path TEXT,
|
||||
file TEXT,
|
||||
line INTEGER,
|
||||
thread_id TEXT,
|
||||
process_uuid TEXT,
|
||||
estimated_bytes INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
INSERT INTO threads (id, rollout_path, created_at, updated_at, source, model_provider, cwd, title, sandbox_policy, approval_mode, cli_version)
|
||||
VALUES ('${threadId}', '${rolloutPath.replace(/'/g, "''")}', 1, 2, 'exec', 'OpenAI', '/tmp/project-b', 'Codex import prompt', '{}', 'never', '0.114.0');
|
||||
INSERT INTO logs (ts, ts_nanos, level, target, thread_id) VALUES (1, 0, 'INFO', 'test', '${threadId}');
|
||||
`);
|
||||
let stateDb = null;
|
||||
let logsDb = null;
|
||||
if (HAS_SQLITE3) {
|
||||
stateDb = path.join(homeDir, '.codex', 'state_5.sqlite');
|
||||
mkdirp(path.dirname(stateDb));
|
||||
sql(stateDb, `
|
||||
PRAGMA journal_mode = WAL;
|
||||
CREATE TABLE IF NOT EXISTS threads (
|
||||
id TEXT PRIMARY KEY,
|
||||
rollout_path TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
model_provider TEXT NOT NULL,
|
||||
cwd TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
sandbox_policy TEXT NOT NULL,
|
||||
approval_mode TEXT NOT NULL,
|
||||
tokens_used INTEGER NOT NULL DEFAULT 0,
|
||||
has_user_event INTEGER NOT NULL DEFAULT 0,
|
||||
archived INTEGER NOT NULL DEFAULT 0,
|
||||
archived_at INTEGER,
|
||||
git_sha TEXT,
|
||||
git_branch TEXT,
|
||||
git_origin_url TEXT,
|
||||
cli_version TEXT NOT NULL DEFAULT '',
|
||||
first_user_message TEXT NOT NULL DEFAULT '',
|
||||
agent_nickname TEXT,
|
||||
agent_role TEXT,
|
||||
memory_mode TEXT NOT NULL DEFAULT 'enabled'
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS stage1_outputs (
|
||||
thread_id TEXT PRIMARY KEY,
|
||||
source_updated_at INTEGER NOT NULL,
|
||||
raw_memory TEXT NOT NULL,
|
||||
rollout_summary TEXT NOT NULL,
|
||||
generated_at INTEGER NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
|
||||
thread_id TEXT NOT NULL,
|
||||
position INTEGER NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL,
|
||||
input_schema TEXT NOT NULL,
|
||||
PRIMARY KEY(thread_id, position)
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
ts_nanos INTEGER NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
target TEXT NOT NULL,
|
||||
message TEXT,
|
||||
module_path TEXT,
|
||||
file TEXT,
|
||||
line INTEGER,
|
||||
thread_id TEXT,
|
||||
process_uuid TEXT,
|
||||
estimated_bytes INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
INSERT INTO threads (id, rollout_path, created_at, updated_at, source, model_provider, cwd, title, sandbox_policy, approval_mode, cli_version)
|
||||
VALUES ('${threadId}', '${rolloutPath.replace(/'/g, "''")}', 1, 2, 'exec', 'OpenAI', '/tmp/project-b', 'Codex import prompt', '{}', 'never', '0.114.0');
|
||||
INSERT INTO logs (ts, ts_nanos, level, target, thread_id) VALUES (1, 0, 'INFO', 'test', '${threadId}');
|
||||
`);
|
||||
|
||||
const logsDb = path.join(homeDir, '.codex', 'logs_1.sqlite');
|
||||
sql(logsDb, `
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
ts_nanos INTEGER NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
target TEXT NOT NULL,
|
||||
message TEXT,
|
||||
module_path TEXT,
|
||||
file TEXT,
|
||||
line INTEGER,
|
||||
thread_id TEXT,
|
||||
process_uuid TEXT,
|
||||
estimated_bytes INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
INSERT INTO logs (ts, ts_nanos, level, target, thread_id) VALUES (1, 0, 'INFO', 'test', '${threadId}');
|
||||
`);
|
||||
logsDb = path.join(homeDir, '.codex', 'logs_1.sqlite');
|
||||
sql(logsDb, `
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
ts_nanos INTEGER NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
target TEXT NOT NULL,
|
||||
message TEXT,
|
||||
module_path TEXT,
|
||||
file TEXT,
|
||||
line INTEGER,
|
||||
thread_id TEXT,
|
||||
process_uuid TEXT,
|
||||
estimated_bytes INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
INSERT INTO logs (ts, ts_nanos, level, target, thread_id) VALUES (1, 0, 'INFO', 'test', '${threadId}');
|
||||
`);
|
||||
}
|
||||
|
||||
return { threadId, rolloutPath, stateDb, logsDb };
|
||||
}
|
||||
@@ -344,10 +403,12 @@ async function main() {
|
||||
|
||||
const port = await getFreePort();
|
||||
const password = 'Regression!234';
|
||||
const internalMcpToken = 'RegressionMcp!234';
|
||||
|
||||
await withServer({
|
||||
PORT: String(port),
|
||||
CC_WEB_PASSWORD: password,
|
||||
CC_WEB_INTERNAL_MCP_TOKEN: internalMcpToken,
|
||||
CC_WEB_CONFIG_DIR: configDir,
|
||||
CC_WEB_SESSIONS_DIR: sessionsDir,
|
||||
CC_WEB_LOGS_DIR: logsDir,
|
||||
@@ -355,6 +416,8 @@ async function main() {
|
||||
CLAUDE_PATH: MOCK_CLAUDE,
|
||||
CODEX_PATH: MOCK_CODEX,
|
||||
}, async () => {
|
||||
await assertWsUpgradeRejected(port, '/not-ws');
|
||||
|
||||
const { ws, messages, token } = await connectWs(port, password);
|
||||
|
||||
await nextMessage(messages, ws, (msg) => msg.type === 'session_list');
|
||||
@@ -409,6 +472,62 @@ async function main() {
|
||||
assert(codexSession.mode === 'plan', 'Codex new_session should follow requested mode');
|
||||
assert(codexSession.model === 'gpt-5.5(xhigh)', 'Codex new_session should read default model from ~/.codex/config.toml');
|
||||
|
||||
const mcpList = await callInternalMcp(port, internalMcpToken, {
|
||||
tool: 'ccweb_list_conversations',
|
||||
sourceSessionId: codexSession.sessionId,
|
||||
args: { agent: 'codex', limit: 20 },
|
||||
});
|
||||
assert(mcpList.status === 200 && mcpList.body?.ok, 'MCP conversation list should succeed');
|
||||
assert(mcpList.body.currentConversationId === codexSession.sessionId, 'MCP list should return current source conversation id');
|
||||
assert(mcpList.body.conversations.some((item) => item.id === codexSession.sessionId && !item.summary), 'MCP list should return lightweight session metadata without summary');
|
||||
|
||||
const crossTargetCwd = path.join(tempRoot, 'codex-mcp-cross-target');
|
||||
mkdirp(crossTargetCwd);
|
||||
ws.send(JSON.stringify({ type: 'new_session', agent: 'codex', cwd: crossTargetCwd, mode: 'yolo' }));
|
||||
const crossTargetSession = await nextMessage(messages, ws, (msg) => msg.type === 'session_info' && msg.agent === 'codex' && msg.cwd === crossTargetCwd);
|
||||
const crossSend = await callInternalMcp(port, internalMcpToken, {
|
||||
tool: 'ccweb_send_message',
|
||||
sourceSessionId: codexSession.sessionId,
|
||||
sourceHopCount: 0,
|
||||
args: {
|
||||
targetConversationId: crossTargetSession.sessionId,
|
||||
content: 'cross hello from mcp',
|
||||
},
|
||||
});
|
||||
assert(crossSend.status === 200 && crossSend.body?.ok, `MCP cross send should succeed: ${JSON.stringify(crossSend.body)}`);
|
||||
const crossUserBubble = await nextMessage(messages, ws, (msg) => (
|
||||
msg.type === 'session_message' &&
|
||||
msg.sessionId === crossTargetSession.sessionId &&
|
||||
msg.message?.crossConversation?.sourceSessionId === codexSession.sessionId &&
|
||||
msg.message?.content === 'cross hello from mcp'
|
||||
));
|
||||
assert(crossUserBubble.message.crossConversation.hopCount === 1, 'Cross message should persist hop count');
|
||||
await nextMessage(messages, ws, (msg) => msg.type === 'done' && msg.sessionId === crossTargetSession.sessionId);
|
||||
const storedCrossTarget = JSON.parse(fs.readFileSync(path.join(sessionsDir, `${crossTargetSession.sessionId}.json`), 'utf8'));
|
||||
const storedCrossMessage = storedCrossTarget.messages.find((message) => message.crossConversation?.messageId === crossSend.body.messageId);
|
||||
assert(storedCrossMessage?.content === 'cross hello from mcp', 'Cross message should be persisted in target session');
|
||||
assert(storedCrossMessage.crossConversation.sourceTitle === codexSession.title, 'Cross message should persist source title');
|
||||
assert(storedCrossTarget.messages.some((message) => message.role === 'assistant' && /来自/.test(String(message.content || ''))), 'Cross message runtime prompt should include source context for the target agent');
|
||||
|
||||
const hopLimit = await callInternalMcp(port, internalMcpToken, {
|
||||
tool: 'ccweb_send_message',
|
||||
sourceSessionId: crossTargetSession.sessionId,
|
||||
sourceHopCount: 1,
|
||||
args: {
|
||||
targetConversationId: codexSession.sessionId,
|
||||
content: 'this should be blocked by hop limit',
|
||||
},
|
||||
});
|
||||
assert(hopLimit.status === 400 && hopLimit.body?.code === 'hop_limit_exceeded', 'MCP cross send should enforce hop limit');
|
||||
|
||||
const processLogAfterMcp = fs.readFileSync(path.join(logsDir, 'process.log'), 'utf8');
|
||||
const mcpSpawnLine = processLogAfterMcp
|
||||
.trim()
|
||||
.split('\n')
|
||||
.find((line) => line.includes(`"event":"process_spawn"`) && line.includes(crossTargetSession.sessionId.slice(0, 8)));
|
||||
assert(mcpSpawnLine && mcpSpawnLine.includes('mcp_servers.ccweb.command') && mcpSpawnLine.includes('mcp_servers.ccweb.env_vars'), 'Codex spawn should inject ccweb MCP config');
|
||||
assert(!mcpSpawnLine.includes(internalMcpToken), 'Codex spawn log should not expose internal MCP token');
|
||||
|
||||
ws.send(JSON.stringify({ type: 'list_cwd_suggestions' }));
|
||||
const cwdSuggestions = await nextMessage(messages, ws, (msg) => msg.type === 'cwd_suggestions');
|
||||
assert(cwdSuggestions.defaultPath === homeDir, 'CWD suggestions should expose HOME as default path');
|
||||
@@ -573,7 +692,9 @@ async function main() {
|
||||
|
||||
assert(!fs.existsSync(path.join(sessionsDir, `${importedSessionId}.json`)), 'Deleting Codex session did not remove session JSON');
|
||||
assert(!fs.existsSync(codexFixture.rolloutPath), 'Deleting Codex session did not remove rollout file');
|
||||
assert(sql(codexFixture.stateDb, `select count(*) from threads where id='${codexFixture.threadId}'`) === '0', 'Deleting Codex session did not remove thread row');
|
||||
if (codexFixture.stateDb) {
|
||||
assert(sql(codexFixture.stateDb, `select count(*) from threads where id='${codexFixture.threadId}'`) === '0', 'Deleting Codex session did not remove thread row');
|
||||
}
|
||||
|
||||
ws.close();
|
||||
console.log('Regression checks passed.');
|
||||
|
||||
Reference in New Issue
Block a user