From ea433d6735c94a200a7b7f636042bfcc0a770700 Mon Sep 17 00:00:00 2001 From: awsl233777 Date: Sun, 12 Apr 2026 08:58:02 +0000 Subject: [PATCH 01/42] feat: add task orchestration tab and task CLI --- cli.js | 1690 ++++++++++++++++- lib/task-orchestrator.js | 839 ++++++++ tests/e2e/run.js | 2 + tests/e2e/test-task-orchestration.js | 173 ++ tests/unit/config-tabs-ui.test.mjs | 18 + tests/unit/run.mjs | 1 + tests/unit/task-orchestrator.test.mjs | 156 ++ tests/unit/web-ui-behavior-parity.test.mjs | 36 +- web-ui/app.js | 33 +- web-ui/index.html | 1 + web-ui/modules/app.computed.dashboard.mjs | 3 + web-ui/modules/app.computed.index.mjs | 2 + web-ui/modules/app.computed.main-tabs.mjs | 51 + web-ui/modules/app.methods.index.mjs | 4 +- .../app.methods.task-orchestration.mjs | 402 ++++ web-ui/partials/index/layout-header.html | 55 +- .../partials/index/panel-orchestration.html | 258 +++ web-ui/session-helpers.mjs | 10 + web-ui/styles.css | 1 + web-ui/styles/task-orchestration.css | 219 +++ 20 files changed, 3844 insertions(+), 110 deletions(-) create mode 100644 lib/task-orchestrator.js create mode 100644 tests/e2e/test-task-orchestration.js create mode 100644 tests/unit/task-orchestrator.test.mjs create mode 100644 web-ui/modules/app.computed.main-tabs.mjs create mode 100644 web-ui/modules/app.methods.task-orchestration.mjs create mode 100644 web-ui/partials/index/panel-orchestration.html create mode 100644 web-ui/styles/task-orchestration.css diff --git a/cli.js b/cli.js index f664b7af..ec4c2259 100644 --- a/cli.js +++ b/cli.js @@ -65,6 +65,12 @@ const { validateWorkflowDefinition, executeWorkflowDefinition } = require('./lib/workflow-engine'); +const { + truncateText: truncateTaskText, + buildTaskPlan, + validateTaskPlan, + executeTaskPlan +} = require('./lib/task-orchestrator'); const { buildConfigHealthReport: buildConfigHealthReportCore } = require('./cli/config-health'); const { createAuthProfileController @@ -130,6 +136,9 @@ const CLAUDE_PROJECTS_DIR = path.join(os.homedir(), '.claude', 'projects'); const RECENT_CONFIGS_FILE = path.join(CONFIG_DIR, 'recent-configs.json'); const WORKFLOW_DEFINITIONS_FILE = path.join(CONFIG_DIR, 'codexmate-workflows.json'); const WORKFLOW_RUNS_FILE = path.join(CONFIG_DIR, 'codexmate-workflow-runs.jsonl'); +const TASK_QUEUE_FILE = path.join(CONFIG_DIR, 'codexmate-task-queue.json'); +const TASK_RUNS_FILE = path.join(CONFIG_DIR, 'codexmate-task-runs.jsonl'); +const TASK_RUN_DETAILS_DIR = path.join(CONFIG_DIR, 'codexmate-task-runs'); const DEFAULT_CLAUDE_MODEL = 'glm-4.7'; const DEFAULT_MODEL_CONTEXT_WINDOW = 190000; const DEFAULT_MODEL_AUTO_COMPACT_TOKEN_LIMIT = 185000; @@ -185,6 +194,8 @@ const MAX_SKILLS_ZIP_UNCOMPRESSED_BYTES = 512 * 1024 * 1024; const DEFAULT_EXTRACT_SUFFIXES = Object.freeze(['.json']); const DOWNLOAD_ARTIFACT_TTL_MS = 10 * 60 * 1000; const g_downloadArtifacts = new Map(); +const g_taskRunControllers = new Map(); +let g_taskQueueProcessor = null; const BUILTIN_PROXY_PROVIDER_NAME = 'codexmate-proxy'; const DEFAULT_BUILTIN_PROXY_SETTINGS = Object.freeze({ enabled: false, @@ -9425,6 +9436,111 @@ function createWebServer({ htmlPath, assetsDir, webDir, host, port, openBrowser }; } break; + case 'task-overview': + result = buildTaskOverviewPayload(params || {}); + break; + case 'task-plan': + { + const plan = coerceTaskPlanPayload(params || {}); + const validation = validatePreparedTaskPlan(plan); + result = { + ok: validation.ok, + plan, + issues: validation.issues || [], + warnings: validation.warnings || [] + }; + if (!validation.ok) { + result.error = validation.error || 'task plan validation failed'; + } + } + break; + case 'task-run': + { + const detach = !!(params && params.detach); + if (detach) { + const plan = coerceTaskPlanPayload(params || {}); + const validation = validatePreparedTaskPlan(plan); + if (!validation.ok) { + result = { + ok: false, + error: validation.error || 'task plan validation failed', + issues: validation.issues || [], + warnings: validation.warnings || [] + }; + break; + } + const taskId = typeof params.taskId === 'string' && params.taskId.trim() ? params.taskId.trim() : createTaskId(); + const runId = createTaskRunId(); + runTaskPlanInternal(plan, { taskId, runId }).catch(() => {}); + result = { + ok: true, + started: true, + detached: true, + taskId, + runId, + warnings: validation.warnings || [] + }; + } else { + result = await runTaskNow(params || {}); + } + } + break; + case 'task-runs': + { + const rawLimit = params && Number.isFinite(params.limit) ? params.limit : parseInt(params && params.limit, 10); + const limit = Number.isFinite(rawLimit) ? Math.max(1, Math.floor(rawLimit)) : 20; + result = { + runs: listTaskRunRecords(limit), + limit + }; + } + break; + case 'task-run-detail': + { + const runId = params && typeof params.runId === 'string' ? params.runId.trim() : ''; + if (!runId) { + result = { error: 'runId is required' }; + break; + } + const detail = readTaskRunDetail(runId); + result = detail || { error: `task run not found: ${runId}` }; + } + break; + case 'task-queue-add': + result = addTaskToQueue(params || {}); + break; + case 'task-queue-list': + { + const rawLimit = params && Number.isFinite(params.limit) ? params.limit : parseInt(params && params.limit, 10); + const limit = Number.isFinite(rawLimit) ? Math.max(1, Math.floor(rawLimit)) : 50; + result = { + tasks: listTaskQueueItems({ limit, status: params && params.status }), + limit + }; + } + break; + case 'task-queue-show': + { + const taskId = params && typeof params.taskId === 'string' ? params.taskId.trim() : ''; + if (!taskId) { + result = { error: 'taskId is required' }; + break; + } + result = getTaskQueueItem(taskId) || { error: `task not found: ${taskId}` }; + } + break; + case 'task-queue-start': + result = await startTaskQueueProcessing(params || {}); + break; + case 'task-retry': + result = await retryTaskRun(params || {}); + break; + case 'task-cancel': + result = cancelTaskRunOrQueue(params || {}); + break; + case 'task-logs': + result = getTaskLogs(params || {}); + break; default: result = { error: '未知操作' }; } @@ -10119,139 +10235,674 @@ async function cmdWorkflow(args = []) { throw new Error(`未知 workflow 子命令: ${subcommand}`); } -// #region parseCodexProxyOptions -function parseCodexProxyOptions(args = []) { +function printTaskHelp() { + console.log('\n用法: codexmate task [参数]'); + console.log(' codexmate task plan --target "实现任务编排 Tab" --follow-up "继续处理 review"'); + console.log(' codexmate task run --target "修复失败测试" --allow-write --concurrency 2'); + console.log(' codexmate task run --target "检查请求链路" --dry-run --plan-only'); + console.log(' codexmate task runs --limit 20'); + console.log(' codexmate task queue add --target "整理 workflow 入口" --allow-write'); + console.log(' codexmate task queue list'); + console.log(' codexmate task queue show '); + console.log(' codexmate task queue start [] [--detach]'); + console.log(' codexmate task retry '); + console.log(' codexmate task cancel '); + console.log(' codexmate task logs '); + console.log('参数:'); + console.log(' --target <文本> 任务目标文本'); + console.log(' --title <文本> 任务标题'); + console.log(' --notes <文本> 附加说明'); + console.log(' --plan 直接提供任务计划对象'); + console.log(' --workflow-id 复用现有 workflow(可重复)'); + console.log(' --follow-up <文本> 追加 follow-up(可重复)'); + console.log(' --allow-write 允许写入工作区'); + console.log(' --dry-run 仅计划/预演,不执行写入'); + console.log(' --plan-only 仅输出计划,不执行'); + console.log(' --engine 选择编排引擎'); + console.log(' --concurrency 并发度'); + console.log(' --auto-fix-rounds 自动修复回合数'); + console.log(' --limit runs/queue list 数量'); + console.log(' --task-id 指定任务 ID'); + console.log(' --run-id 指定运行 ID'); + console.log(' --status <状态> queue list 状态过滤'); + console.log(' --detach 后台启动任务或队列'); + console.log(' --json 以 JSON 输出'); + console.log(); +} + +function parseTaskCliOptions(args = []) { const options = { - passthroughArgs: [], - queuedFollowUps: [] + title: '', + target: '', + notes: '', + planRaw: '', + workflowIds: [], + followUps: [], + allowWrite: false, + dryRun: false, + planOnly: false, + engine: 'codex', + concurrency: 2, + autoFixRounds: 1, + limit: 20, + taskId: '', + runId: '', + status: '', + detach: false, + json: false }; - const argv = Array.isArray(args) ? args : []; - - const pushFollowUp = (value, optionName) => { - const raw = value === undefined || value === null ? '' : String(value); - if (!raw.trim()) { + const rest = []; + const pushValue = (key, value, optionName) => { + const text = value === undefined || value === null ? '' : String(value).trim(); + if (!text) { throw new Error(`${optionName} 需要提供非空内容`); } - options.queuedFollowUps.push(raw); + options[key].push(text); }; - - for (let i = 0; i < argv.length; i++) { - const arg = argv[i]; - if (arg === undefined || arg === null) { + for (let i = 0; i < args.length; i += 1) { + const arg = String(args[i] || ''); + if (!arg) continue; + if (arg === '--allow-write') { + options.allowWrite = true; continue; } - const text = String(arg); - if (text === '--') { - options.passthroughArgs.push(...argv.slice(i).map((item) => String(item))); - break; + if (arg === '--dry-run') { + options.dryRun = true; + continue; } - if (text === '--queued-follow-up' || text === '--follow-up') { - const next = argv[i + 1]; - if (next === undefined) { - throw new Error(`${text} 需要提供内容`); - } - pushFollowUp(next, text); + if (arg === '--plan-only') { + options.planOnly = true; + continue; + } + if (arg === '--detach') { + options.detach = true; + continue; + } + if (arg === '--json') { + options.json = true; + continue; + } + if (arg === '--title') { + options.title = String(args[i + 1] || '').trim(); i += 1; continue; } - if (text.startsWith('--queued-follow-up=')) { - pushFollowUp(text.slice('--queued-follow-up='.length), '--queued-follow-up'); + if (arg.startsWith('--title=')) { + options.title = arg.slice('--title='.length).trim(); continue; } - if (text.startsWith('--follow-up=')) { - pushFollowUp(text.slice('--follow-up='.length), '--follow-up'); + if (arg === '--target') { + options.target = String(args[i + 1] || '').trim(); + i += 1; continue; } - options.passthroughArgs.push(text); + if (arg.startsWith('--target=')) { + options.target = arg.slice('--target='.length).trim(); + continue; + } + if (arg === '--notes') { + options.notes = String(args[i + 1] || '').trim(); + i += 1; + continue; + } + if (arg.startsWith('--notes=')) { + options.notes = arg.slice('--notes='.length).trim(); + continue; + } + if (arg === '--plan') { + options.planRaw = String(args[i + 1] || '').trim(); + i += 1; + continue; + } + if (arg.startsWith('--plan=')) { + options.planRaw = arg.slice('--plan='.length).trim(); + continue; + } + if (arg === '--workflow-id') { + pushValue('workflowIds', args[i + 1], '--workflow-id'); + i += 1; + continue; + } + if (arg.startsWith('--workflow-id=')) { + pushValue('workflowIds', arg.slice('--workflow-id='.length), '--workflow-id'); + continue; + } + if (arg === '--follow-up') { + pushValue('followUps', args[i + 1], '--follow-up'); + i += 1; + continue; + } + if (arg.startsWith('--follow-up=')) { + pushValue('followUps', arg.slice('--follow-up='.length), '--follow-up'); + continue; + } + if (arg === '--engine') { + options.engine = normalizeTaskEngine(args[i + 1]); + i += 1; + continue; + } + if (arg.startsWith('--engine=')) { + options.engine = normalizeTaskEngine(arg.slice('--engine='.length)); + continue; + } + if (arg === '--concurrency') { + const value = parseInt(args[i + 1], 10); + if (Number.isFinite(value)) options.concurrency = value; + i += 1; + continue; + } + if (arg.startsWith('--concurrency=')) { + const value = parseInt(arg.slice('--concurrency='.length), 10); + if (Number.isFinite(value)) options.concurrency = value; + continue; + } + if (arg === '--auto-fix-rounds') { + const value = parseInt(args[i + 1], 10); + if (Number.isFinite(value)) options.autoFixRounds = value; + i += 1; + continue; + } + if (arg.startsWith('--auto-fix-rounds=')) { + const value = parseInt(arg.slice('--auto-fix-rounds='.length), 10); + if (Number.isFinite(value)) options.autoFixRounds = value; + continue; + } + if (arg === '--limit') { + const value = parseInt(args[i + 1], 10); + if (Number.isFinite(value)) options.limit = value; + i += 1; + continue; + } + if (arg.startsWith('--limit=')) { + const value = parseInt(arg.slice('--limit='.length), 10); + if (Number.isFinite(value)) options.limit = value; + continue; + } + if (arg === '--task-id') { + options.taskId = String(args[i + 1] || '').trim(); + i += 1; + continue; + } + if (arg.startsWith('--task-id=')) { + options.taskId = arg.slice('--task-id='.length).trim(); + continue; + } + if (arg === '--run-id') { + options.runId = String(args[i + 1] || '').trim(); + i += 1; + continue; + } + if (arg.startsWith('--run-id=')) { + options.runId = arg.slice('--run-id='.length).trim(); + continue; + } + if (arg === '--status') { + options.status = String(args[i + 1] || '').trim().toLowerCase(); + i += 1; + continue; + } + if (arg.startsWith('--status=')) { + options.status = arg.slice('--status='.length).trim().toLowerCase(); + continue; + } + rest.push(arg); } - - return options; + return { options, rest }; } -// #endregion parseCodexProxyOptions -function shellEscapePosixArg(value) { - const text = value === undefined || value === null ? '' : String(value); - return `'${text.replace(/'/g, `'\"'\"'`)}'`; +function buildTaskCliPayload(options = {}, rest = []) { + const payload = { + title: options.title || '', + target: options.target || '', + notes: options.notes || '', + workflowIds: Array.isArray(options.workflowIds) ? options.workflowIds.slice() : [], + followUps: Array.isArray(options.followUps) ? options.followUps.slice() : [], + allowWrite: options.allowWrite === true, + dryRun: options.dryRun === true, + engine: options.engine || 'codex', + concurrency: options.concurrency, + autoFixRounds: options.autoFixRounds, + taskId: options.taskId || '', + runId: options.runId || '' + }; + if (!payload.target && Array.isArray(rest) && rest.length > 0) { + payload.target = rest.join(' ').trim(); + } + if (options.planRaw) { + payload.plan = parseWorkflowInputArg(options.planRaw); + } + return payload; } -// #region buildScriptCommandArgs -function buildScriptCommandArgs(commandLine) { - const platform = process.platform; - // util-linux script needs -e/--return to propagate child exit code. - if (platform === 'linux' || platform === 'android') { - return ['-q', '-e', '-c', commandLine, '/dev/null']; +function printTaskPlanSummary(plan, warnings = []) { + console.log(`\n任务计划: ${plan.title || '(untitled)'}`); + console.log(` engine: ${plan.engine || 'codex'}`); + console.log(` allowWrite: ${plan.allowWrite === true ? 'yes' : 'no'}`); + console.log(` dryRun: ${plan.dryRun === true ? 'yes' : 'no'}`); + console.log(` concurrency: ${plan.concurrency || 1}`); + if (plan.target) { + console.log(` target: ${truncateTaskText(plan.target, 200)}`); } - // NetBSD supports -e/-c, matching util-linux style contract. - if (platform === 'netbsd') { - return ['-q', '-e', '-c', commandLine, '/dev/null']; + const waves = Array.isArray(plan.waves) ? plan.waves : []; + console.log(` waves: ${waves.length}`); + for (const wave of waves) { + const ids = Array.isArray(wave.nodeIds) ? wave.nodeIds.join(', ') : ''; + console.log(` - ${wave.label || `Wave ${wave.index + 1}`}: ${ids}`); } - // OpenBSD supports "-c " with a trailing output file path. - if (platform === 'openbsd') { - return ['-c', commandLine, '/dev/null']; + const nodes = Array.isArray(plan.nodes) ? plan.nodes : []; + for (const node of nodes) { + console.log(` - ${node.id} [${node.kind}] ${node.title || ''}`.trim()); + if (node.workflowId) { + console.log(` workflowId: ${node.workflowId}`); + } + if (Array.isArray(node.dependsOn) && node.dependsOn.length > 0) { + console.log(` dependsOn: ${node.dependsOn.join(', ')}`); + } } - // BSD/macOS script does not support util-linux "-c " syntax. - if (platform === 'darwin' || platform === 'freebsd') { - return ['-q', '/dev/null', 'sh', '-lc', commandLine]; + if (Array.isArray(warnings) && warnings.length > 0) { + console.log(' warnings:'); + warnings.forEach((item) => console.log(` - ${item}`)); } - throw new Error(`当前平台暂不支持 --follow-up 自动排队(platform=${platform})`); + console.log(); } -// #endregion buildScriptCommandArgs -// #region runProxyCommandWithQueuedFollowUps -async function runProxyCommandWithQueuedFollowUps(selectedBin, finalArgs = [], queuedFollowUps = []) { - if (!process.stdin || !process.stdin.isTTY) { - throw new Error('当前 stdin 不是 TTY,无法使用 --follow-up 自动排队。'); +function printTaskRunSummary(detail = {}) { + const run = detail.run && typeof detail.run === 'object' ? detail.run : {}; + console.log(`\n任务执行 ${run.status === 'success' ? '完成' : '结束'}: ${detail.title || detail.taskId || ''}`.trim()); + console.log(` taskId: ${detail.taskId || ''}`); + console.log(` runId: ${detail.runId || ''}`); + console.log(` status: ${run.status || detail.status || 'unknown'}`); + console.log(` duration: ${run.durationMs || 0}ms`); + if (run.summary) { + console.log(` summary: ${run.summary}`); } - - const scriptPath = resolveCommandPath('script'); - if (!scriptPath) { - throw new Error('未找到 script 命令,无法自动注入 queued follow-up 消息。'); + if (run.error) { + console.log(` error: ${run.error}`); } + const nodes = Array.isArray(run.nodes) ? run.nodes : []; + for (const node of nodes) { + console.log(` - ${node.id}: ${node.status || 'unknown'} attempts=${node.attemptCount || 0}`); + if (node.summary) { + console.log(` ${node.summary}`); + } + if (node.error && node.error !== node.summary) { + console.log(` error: ${node.error}`); + } + } + console.log(); +} - const commandLine = [selectedBin, ...finalArgs].map((item) => shellEscapePosixArg(item)).join(' '); - const scriptArgs = buildScriptCommandArgs(commandLine); - - return new Promise((resolve, reject) => { - let settled = false; - const child = spawn(scriptPath, scriptArgs, { - stdio: ['pipe', 'pipe', 'pipe'] - }); +async function cmdTask(args = []) { + const argv = Array.isArray(args) ? args : []; + if (argv.length === 0 || argv.includes('--help') || argv.includes('-h')) { + printTaskHelp(); + return; + } + const subcommand = String(argv[0] || '').trim().toLowerCase(); + const parsed = parseTaskCliOptions(argv.slice(1)); + const options = parsed.options; + const rest = parsed.rest; - const stdin = process.stdin; - const hadRawMode = !!stdin.isRaw; - let cleanedUp = false; - let waitingDrain = false; - let followUpsFlushed = false; - let outputReadyDetected = false; - const timers = []; - const pendingWrites = []; - let onChildStdinDrain = null; - let onChildStdinError = null; - const resolveOnce = (code) => { - if (settled) return; - settled = true; - resolve(code); - }; - const rejectOnce = (error) => { - if (settled) return; - settled = true; - reject(error); + if (subcommand === 'plan') { + const payload = buildTaskCliPayload(options, rest); + const plan = coerceTaskPlanPayload(payload); + const validation = validatePreparedTaskPlan(plan); + const result = { + ok: validation.ok, + plan, + issues: validation.issues || [], + warnings: validation.warnings || [] }; - const handleWriteFailure = (error) => { - const err = error instanceof Error ? error : new Error(String(error || 'unknown')); - cleanup(); - try { - if (!child.killed) { - child.kill('SIGTERM'); - } - } catch (_) { - // Ignore failure to terminate child after stdin write failure. + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + if (!validation.ok) { + throw new Error(validation.error || 'task plan validation failed'); } - rejectOnce(new Error(`写入 ${selectedBin} stdin 失败: ${err.message}`)); - }; - const flushPendingWrites = () => { - if (cleanedUp || child.stdin.destroyed) { - pendingWrites.length = 0; + printTaskPlanSummary(plan, validation.warnings || []); + } + if (!validation.ok) { + throw new Error(validation.error || 'task plan validation failed'); + } + return; + } + + if (subcommand === 'runs') { + const limit = Number.isFinite(options.limit) ? Math.max(1, Math.floor(options.limit)) : 20; + const runs = listTaskRunRecords(limit); + if (options.json) { + console.log(JSON.stringify({ runs, limit }, null, 2)); + return; + } + console.log(`\n最近任务运行(${runs.length}/${limit}):`); + for (const item of runs) { + console.log(` - [${item.status || 'unknown'}] ${item.title || item.taskId || ''} runId=${item.runId || ''} duration=${item.durationMs || 0}ms`); + if (item.summary) { + console.log(` ${item.summary}`); + } + if (item.error) { + console.log(` error: ${item.error}`); + } + } + console.log(); + return; + } + + if (subcommand === 'queue') { + const queueSubcommand = String(rest[0] || '').trim().toLowerCase(); + const tail = rest.slice(1); + if (!queueSubcommand) { + throw new Error('queue 子命令不能为空'); + } + if (queueSubcommand === 'add') { + const payload = buildTaskCliPayload(options, tail); + const result = addTaskToQueue(payload); + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + if (result.error) { + throw new Error(result.error); + } + console.log(`✓ 已加入队列: ${result.task.taskId}`); + console.log(` ${result.task.title || result.task.target || ''}`); + console.log(); + } + if (result.error) { + throw new Error(result.error); + } + return; + } + if (queueSubcommand === 'list') { + const limit = Number.isFinite(options.limit) ? Math.max(1, Math.floor(options.limit)) : 20; + const tasks = listTaskQueueItems({ limit, status: options.status || '' }); + if (options.json) { + console.log(JSON.stringify({ tasks, limit }, null, 2)); + return; + } + console.log(`\n任务队列(${tasks.length}/${limit}):`); + for (const item of tasks) { + console.log(` - [${item.status}] ${item.taskId} ${item.title || item.target || ''}`.trim()); + if (item.lastSummary) { + console.log(` ${item.lastSummary}`); + } + } + console.log(); + return; + } + if (queueSubcommand === 'show') { + const taskId = options.taskId || String(tail[0] || '').trim(); + if (!taskId) { + throw new Error('taskId is required'); + } + const task = getTaskQueueItem(taskId); + if (!task) { + throw new Error(`task not found: ${taskId}`); + } + console.log(JSON.stringify(task, null, 2)); + return; + } + if (queueSubcommand === 'start') { + const taskId = options.taskId || String(tail[0] || '').trim(); + const result = await startTaskQueueProcessing({ taskId, detach: options.detach }); + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else if (result.detached) { + console.log('✓ 队列处理已在后台启动'); + console.log(); + } else if (result.detail) { + printTaskRunSummary(result.detail); + } else { + console.log('队列中暂无可执行任务'); + console.log(); + } + return; + } + throw new Error(`未知 queue 子命令: ${queueSubcommand}`); + } + + if (subcommand === 'run') { + const payload = buildTaskCliPayload(options, rest); + if (options.planOnly) { + const plan = coerceTaskPlanPayload(payload); + const validation = validatePreparedTaskPlan(plan); + const result = { + ok: validation.ok, + plan, + issues: validation.issues || [], + warnings: validation.warnings || [] + }; + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + if (!validation.ok) { + throw new Error(validation.error || 'task plan validation failed'); + } + printTaskPlanSummary(plan, validation.warnings || []); + } + if (!validation.ok) { + throw new Error(validation.error || 'task plan validation failed'); + } + return; + } + if (options.detach) { + const plan = coerceTaskPlanPayload(payload); + const validation = validatePreparedTaskPlan(plan); + if (!validation.ok) { + throw new Error(validation.error || 'task plan validation failed'); + } + const taskId = options.taskId || createTaskId(); + const runId = createTaskRunId(); + runTaskPlanInternal(plan, { taskId, runId }).catch(() => {}); + const result = { ok: true, detached: true, taskId, runId, warnings: validation.warnings || [] }; + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + console.log(`✓ 后台任务已启动: taskId=${taskId} runId=${runId}`); + console.log(); + } + return; + } + const detail = await runTaskNow(payload); + if (options.json) { + console.log(JSON.stringify(detail, null, 2)); + } else { + printTaskRunSummary(detail); + } + if (detail.error || (detail.run && detail.run.status && detail.run.status !== 'success')) { + throw new Error(detail.error || (detail.run && detail.run.error) || 'task run failed'); + } + return; + } + + if (subcommand === 'retry') { + const runId = options.runId || String(rest[0] || '').trim(); + const result = await retryTaskRun({ runId, detach: options.detach }); + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else if (result.detached) { + console.log(`✓ 已后台重试: runId=${result.runId}`); + console.log(); + } else { + printTaskRunSummary(result); + } + if (result.error) { + throw new Error(result.error); + } + return; + } + + if (subcommand === 'cancel') { + const result = cancelTaskRunOrQueue({ + target: String(rest[0] || '').trim(), + taskId: options.taskId, + runId: options.runId + }); + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + if (result.error) { + throw new Error(result.error); + } + console.log('✓ 已发出取消请求'); + console.log(); + } + if (result.error) { + throw new Error(result.error); + } + return; + } + + if (subcommand === 'logs') { + const runId = options.runId || String(rest[0] || '').trim(); + const result = getTaskLogs({ runId }); + if (result.error) { + throw new Error(result.error); + } + if (options.json) { + console.log(JSON.stringify(result, null, 2)); + } else { + console.log(result.logs || '(no logs)'); + console.log(); + } + return; + } + + throw new Error(`未知 task 子命令: ${subcommand}`); +} + +// #region parseCodexProxyOptions +function parseCodexProxyOptions(args = []) { + const options = { + passthroughArgs: [], + queuedFollowUps: [] + }; + const argv = Array.isArray(args) ? args : []; + + const pushFollowUp = (value, optionName) => { + const raw = value === undefined || value === null ? '' : String(value); + if (!raw.trim()) { + throw new Error(`${optionName} 需要提供非空内容`); + } + options.queuedFollowUps.push(raw); + }; + + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; + if (arg === undefined || arg === null) { + continue; + } + const text = String(arg); + if (text === '--') { + options.passthroughArgs.push(...argv.slice(i).map((item) => String(item))); + break; + } + if (text === '--queued-follow-up' || text === '--follow-up') { + const next = argv[i + 1]; + if (next === undefined) { + throw new Error(`${text} 需要提供内容`); + } + pushFollowUp(next, text); + i += 1; + continue; + } + if (text.startsWith('--queued-follow-up=')) { + pushFollowUp(text.slice('--queued-follow-up='.length), '--queued-follow-up'); + continue; + } + if (text.startsWith('--follow-up=')) { + pushFollowUp(text.slice('--follow-up='.length), '--follow-up'); + continue; + } + options.passthroughArgs.push(text); + } + + return options; +} +// #endregion parseCodexProxyOptions + +function shellEscapePosixArg(value) { + const text = value === undefined || value === null ? '' : String(value); + return `'${text.replace(/'/g, `'\"'\"'`)}'`; +} + +// #region buildScriptCommandArgs +function buildScriptCommandArgs(commandLine) { + const platform = process.platform; + // util-linux script needs -e/--return to propagate child exit code. + if (platform === 'linux' || platform === 'android') { + return ['-q', '-e', '-c', commandLine, '/dev/null']; + } + // NetBSD supports -e/-c, matching util-linux style contract. + if (platform === 'netbsd') { + return ['-q', '-e', '-c', commandLine, '/dev/null']; + } + // OpenBSD supports "-c " with a trailing output file path. + if (platform === 'openbsd') { + return ['-c', commandLine, '/dev/null']; + } + // BSD/macOS script does not support util-linux "-c " syntax. + if (platform === 'darwin' || platform === 'freebsd') { + return ['-q', '/dev/null', 'sh', '-lc', commandLine]; + } + throw new Error(`当前平台暂不支持 --follow-up 自动排队(platform=${platform})`); +} +// #endregion buildScriptCommandArgs + +// #region runProxyCommandWithQueuedFollowUps +async function runProxyCommandWithQueuedFollowUps(selectedBin, finalArgs = [], queuedFollowUps = []) { + if (!process.stdin || !process.stdin.isTTY) { + throw new Error('当前 stdin 不是 TTY,无法使用 --follow-up 自动排队。'); + } + + const scriptPath = resolveCommandPath('script'); + if (!scriptPath) { + throw new Error('未找到 script 命令,无法自动注入 queued follow-up 消息。'); + } + + const commandLine = [selectedBin, ...finalArgs].map((item) => shellEscapePosixArg(item)).join(' '); + const scriptArgs = buildScriptCommandArgs(commandLine); + + return new Promise((resolve, reject) => { + let settled = false; + const child = spawn(scriptPath, scriptArgs, { + stdio: ['pipe', 'pipe', 'pipe'] + }); + + const stdin = process.stdin; + const hadRawMode = !!stdin.isRaw; + let cleanedUp = false; + let waitingDrain = false; + let followUpsFlushed = false; + let outputReadyDetected = false; + const timers = []; + const pendingWrites = []; + let onChildStdinDrain = null; + let onChildStdinError = null; + const resolveOnce = (code) => { + if (settled) return; + settled = true; + resolve(code); + }; + const rejectOnce = (error) => { + if (settled) return; + settled = true; + reject(error); + }; + const handleWriteFailure = (error) => { + const err = error instanceof Error ? error : new Error(String(error || 'unknown')); + cleanup(); + try { + if (!child.killed) { + child.kill('SIGTERM'); + } + } catch (_) { + // Ignore failure to terminate child after stdin write failure. + } + rejectOnce(new Error(`写入 ${selectedBin} stdin 失败: ${err.message}`)); + }; + const flushPendingWrites = () => { + if (cleanedUp || child.stdin.destroyed) { + pendingWrites.length = 0; return; } while (pendingWrites.length > 0) { @@ -11199,6 +11850,841 @@ async function runWorkflowById(workflowId, input = {}, options = {}) { }; } +function createTaskId() { + return `task-${Date.now()}-${crypto.randomBytes(3).toString('hex')}`; +} + +function createTaskRunId() { + return `tr-${Date.now()}-${crypto.randomBytes(3).toString('hex')}`; +} + +function normalizeTaskEngine(value) { + const normalized = typeof value === 'string' ? value.trim().toLowerCase() : ''; + return normalized === 'workflow' ? 'workflow' : 'codex'; +} + +function normalizeTaskFollowUps(input = []) { + const seen = new Set(); + const result = []; + for (const item of Array.isArray(input) ? input : []) { + const text = typeof item === 'string' ? item.trim() : ''; + if (!text || seen.has(text)) continue; + seen.add(text); + result.push(text); + } + return result; +} + +function buildTaskWorkflowCatalog() { + const listed = listWorkflowDefinitions(); + return { + workflows: Array.isArray(listed.workflows) + ? listed.workflows.map((item) => ({ + id: item.id, + name: item.name, + description: item.description, + readOnly: item.readOnly !== false, + stepCount: item.stepCount || 0 + })) + : [], + warnings: Array.isArray(listed.warnings) ? listed.warnings : [] + }; +} + +function normalizeTaskPlanRequest(params = {}) { + const source = params && typeof params === 'object' ? params : {}; + const rawWorkflowIds = Array.isArray(source.workflowIds) + ? source.workflowIds + : (typeof source.workflowId === 'string' && source.workflowId.trim() ? [source.workflowId.trim()] : []); + const rawFollowUps = Array.isArray(source.followUps) + ? source.followUps + : (typeof source.followUp === 'string' && source.followUp.trim() ? [source.followUp.trim()] : []); + return { + id: typeof source.id === 'string' ? source.id.trim() : '', + title: typeof source.title === 'string' ? source.title.trim() : '', + target: typeof source.target === 'string' ? source.target.trim() : '', + notes: typeof source.notes === 'string' ? source.notes.trim() : '', + cwd: typeof source.cwd === 'string' ? source.cwd.trim() : process.cwd(), + engine: normalizeTaskEngine(source.engine), + allowWrite: source.allowWrite === true, + dryRun: source.dryRun === true, + concurrency: Number.isFinite(source.concurrency) ? source.concurrency : parseInt(source.concurrency, 10), + autoFixRounds: Number.isFinite(source.autoFixRounds) ? source.autoFixRounds : parseInt(source.autoFixRounds, 10), + workflowIds: rawWorkflowIds, + followUps: normalizeTaskFollowUps(rawFollowUps) + }; +} + +function coerceTaskPlanPayload(params = {}) { + if (params && params.plan && typeof params.plan === 'object' && !Array.isArray(params.plan)) { + return cloneJson(params.plan, {}); + } + const request = normalizeTaskPlanRequest(params || {}); + const catalog = buildTaskWorkflowCatalog(); + const plan = buildTaskPlan(request, { + workflowCatalog: catalog.workflows, + cwd: request.cwd || process.cwd() + }); + return { + ...plan, + engine: normalizeTaskEngine(request.engine || plan.engine) + }; +} + +function validatePreparedTaskPlan(plan) { + const catalog = buildTaskWorkflowCatalog(); + const validation = validateTaskPlan(plan, { + workflowCatalog: catalog.workflows + }); + return { + ...validation, + warnings: catalog.warnings || [] + }; +} + +function normalizeTaskQueueItem(raw = {}) { + const plan = raw.plan && typeof raw.plan === 'object' && !Array.isArray(raw.plan) + ? cloneJson(raw.plan, {}) + : {}; + const taskId = typeof raw.taskId === 'string' ? raw.taskId.trim() : ''; + return { + taskId: taskId || createTaskId(), + title: typeof raw.title === 'string' ? raw.title.trim() : (typeof plan.title === 'string' ? plan.title.trim() : ''), + target: typeof raw.target === 'string' ? raw.target.trim() : (typeof plan.target === 'string' ? plan.target.trim() : ''), + status: typeof raw.status === 'string' ? raw.status.trim().toLowerCase() : 'queued', + createdAt: toIsoTime(raw.createdAt || Date.now(), ''), + updatedAt: toIsoTime(raw.updatedAt || raw.createdAt || Date.now(), ''), + engine: normalizeTaskEngine(raw.engine || plan.engine), + allowWrite: raw.allowWrite === true || plan.allowWrite === true, + dryRun: raw.dryRun === true || plan.dryRun === true, + concurrency: Number.isFinite(raw.concurrency) ? raw.concurrency : (Number.isFinite(plan.concurrency) ? plan.concurrency : 2), + autoFixRounds: Number.isFinite(raw.autoFixRounds) ? raw.autoFixRounds : (Number.isFinite(plan.autoFixRounds) ? plan.autoFixRounds : 1), + lastRunId: typeof raw.lastRunId === 'string' ? raw.lastRunId.trim() : '', + lastSummary: typeof raw.lastSummary === 'string' ? raw.lastSummary.trim() : '', + plan, + runStatus: typeof raw.runStatus === 'string' ? raw.runStatus.trim().toLowerCase() : '' + }; +} + +function readTaskQueueState() { + const parsed = readJsonObjectFromFile(TASK_QUEUE_FILE, {}); + if (!parsed.ok || !parsed.exists) { + return { + tasks: [] + }; + } + const source = parsed.data && typeof parsed.data === 'object' ? parsed.data : {}; + const tasks = Array.isArray(source.tasks) ? source.tasks.map((item) => normalizeTaskQueueItem(item)) : []; + return { tasks }; +} + +function writeTaskQueueState(state = {}) { + ensureDir(path.dirname(TASK_QUEUE_FILE)); + writeJsonAtomic(TASK_QUEUE_FILE, { + tasks: Array.isArray(state.tasks) ? state.tasks.map((item) => normalizeTaskQueueItem(item)) : [] + }); +} + +function upsertTaskQueueItem(item) { + const state = readTaskQueueState(); + const next = normalizeTaskQueueItem(item || {}); + const index = state.tasks.findIndex((entry) => entry.taskId === next.taskId); + if (index >= 0) { + state.tasks[index] = next; + } else { + state.tasks.push(next); + } + writeTaskQueueState(state); + return next; +} + +function getTaskQueueItem(taskId) { + const id = typeof taskId === 'string' ? taskId.trim() : ''; + if (!id) return null; + return readTaskQueueState().tasks.find((item) => item.taskId === id) || null; +} + +function listTaskQueueItems(options = {}) { + const state = readTaskQueueState(); + const limit = Number.isFinite(options.limit) ? Math.max(1, Math.floor(options.limit)) : 50; + const statusFilter = typeof options.status === 'string' ? options.status.trim().toLowerCase() : ''; + const statusRank = { + running: 0, + queued: 1, + failed: 2, + completed: 3, + cancelled: 4 + }; + return state.tasks + .filter((item) => !statusFilter || item.status === statusFilter) + .sort((a, b) => { + const rankDiff = (statusRank[a.status] ?? 99) - (statusRank[b.status] ?? 99); + if (rankDiff !== 0) return rankDiff; + return String(b.updatedAt || '').localeCompare(String(a.updatedAt || '')); + }) + .slice(0, limit); +} + +function appendTaskRunRecord(record) { + ensureDir(path.dirname(TASK_RUNS_FILE)); + fs.appendFileSync(TASK_RUNS_FILE, `${JSON.stringify(record)}\n`, { encoding: 'utf-8', mode: 0o600 }); +} + +function listTaskRunRecords(limit = 20) { + const max = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 20; + if (!fs.existsSync(TASK_RUNS_FILE)) { + return []; + } + let content = ''; + try { + content = fs.readFileSync(TASK_RUNS_FILE, 'utf-8'); + } catch (_) { + return []; + } + const rows = content + .split(/\r?\n/g) + .map((line) => line.trim()) + .filter(Boolean); + const parsed = []; + for (let i = rows.length - 1; i >= 0; i -= 1) { + try { + parsed.push(JSON.parse(rows[i])); + if (parsed.length >= max) { + break; + } + } catch (_) {} + } + return parsed; +} + +function getTaskRunDetailPath(runId) { + const safeId = typeof runId === 'string' ? runId.trim() : ''; + if (!safeId) { + return ''; + } + return path.join(TASK_RUN_DETAILS_DIR, `${safeId}.json`); +} + +function writeTaskRunDetail(detail = {}) { + const detailPath = getTaskRunDetailPath(detail.runId); + if (!detailPath) return; + ensureDir(path.dirname(detailPath)); + writeJsonAtomic(detailPath, detail); +} + +function readTaskRunDetail(runId) { + const detailPath = getTaskRunDetailPath(runId); + if (!detailPath) { + return null; + } + const parsed = readJsonObjectFromFile(detailPath, {}); + if (!parsed.ok || !parsed.exists) { + return null; + } + return parsed.data && typeof parsed.data === 'object' ? parsed.data : null; +} + +function collectTaskRunSummary(detail = {}) { + const run = detail.run && typeof detail.run === 'object' ? detail.run : {}; + const nodes = Array.isArray(run.nodes) ? run.nodes : []; + return { + runId: detail.runId || '', + taskId: detail.taskId || '', + title: detail.title || '', + target: detail.target || '', + engine: detail.engine || '', + allowWrite: detail.allowWrite === true, + dryRun: detail.dryRun === true, + concurrency: detail.concurrency || 0, + status: run.status || detail.status || '', + startedAt: run.startedAt || detail.startedAt || '', + endedAt: run.endedAt || detail.endedAt || '', + durationMs: run.durationMs || 0, + summary: run.summary || detail.summary || '', + error: run.error || detail.error || '', + nodeCount: nodes.length, + successCount: nodes.filter((node) => node.status === 'success').length, + failedCount: nodes.filter((node) => node.status === 'failed').length, + blockedCount: nodes.filter((node) => node.status === 'blocked').length, + cancelledCount: nodes.filter((node) => node.status === 'cancelled').length + }; +} + +function buildTaskOverviewPayload(options = {}) { + const queueLimit = Number.isFinite(options.queueLimit) ? Math.max(1, Math.floor(options.queueLimit)) : 20; + const runLimit = Number.isFinite(options.runLimit) ? Math.max(1, Math.floor(options.runLimit)) : 20; + const workflowCatalog = buildTaskWorkflowCatalog(); + const queue = listTaskQueueItems({ limit: queueLimit }); + const runs = listTaskRunRecords(runLimit); + return { + workflows: workflowCatalog.workflows, + warnings: workflowCatalog.warnings, + queue, + runs, + activeRunIds: Array.from(g_taskRunControllers.keys()) + }; +} + +function summarizeTaskLogs(logs = [], limit = 80) { + return (Array.isArray(logs) ? logs : []) + .slice(0, limit) + .map((item) => { + if (!item || typeof item !== 'object') { + return String(item || ''); + } + const at = item.at ? `[${item.at}] ` : ''; + const level = item.level ? `${String(item.level).toUpperCase()} ` : ''; + const message = item.message ? String(item.message) : ''; + return `${at}${level}${message}`.trim(); + }) + .filter(Boolean) + .join('\n'); +} + +function findCodexSessionId(value, depth = 0) { + if (depth > 6 || value === null || value === undefined) { + return ''; + } + if (Array.isArray(value)) { + for (const item of value) { + const found = findCodexSessionId(item, depth + 1); + if (found) return found; + } + return ''; + } + if (typeof value !== 'object') { + return ''; + } + const candidateKeys = ['session_id', 'sessionId', 'conversation_id', 'conversationId', 'thread_id', 'threadId']; + for (const key of candidateKeys) { + const candidate = value[key]; + if (typeof candidate === 'string' && candidate.trim()) { + return candidate.trim(); + } + } + for (const item of Object.values(value)) { + const found = findCodexSessionId(item, depth + 1); + if (found) return found; + } + return ''; +} + +function readCodexLastMessageFile(filePath) { + if (!filePath) return ''; + try { + return fs.readFileSync(filePath, 'utf-8').trim(); + } catch (_) { + return ''; + } +} + +async function runCodexExecTaskNode(node, context = {}) { + const codexPath = resolveCommandPath('codex') || 'codex'; + if (!commandExists(codexPath, '--version')) { + return { + success: false, + error: '未找到 codex CLI,请先安装并确保 PATH 可用', + summary: 'codex CLI 不可用', + output: null, + logs: [{ at: toIsoTime(Date.now()), level: 'error', message: 'codex CLI 不可用' }] + }; + } + const allowWrite = context.allowWrite === true && node.write === true; + const cwd = typeof context.cwd === 'string' && context.cwd.trim() ? context.cwd.trim() : process.cwd(); + const dependencyResults = Array.isArray(context.dependencyResults) ? context.dependencyResults : []; + const dependencyLines = dependencyResults + .map((item) => { + const summary = item && (item.summary || item.error) ? String(item.summary || item.error) : ''; + return summary ? `- ${item.id}: ${summary}` : ''; + }) + .filter(Boolean); + const previousAttempts = Array.isArray(context.previousAttempts) ? context.previousAttempts : []; + const lastAttempt = previousAttempts.length > 0 ? previousAttempts[previousAttempts.length - 1] : null; + const attempt = Number.isFinite(context.attempt) ? context.attempt : 1; + const promptParts = [String(node.prompt || '').trim()]; + if (dependencyLines.length > 0) { + promptParts.push(`前置节点摘要:\n${dependencyLines.join('\n')}`); + } + if (attempt > 1 && lastAttempt) { + promptParts.push(`上一轮失败摘要:\n${String(lastAttempt.error || lastAttempt.summary || '').trim()}`); + promptParts.push('请在保持目标不变的前提下修复上一轮失败并继续完成当前节点。'); + } + const finalPrompt = promptParts.filter(Boolean).join('\n\n'); + const tempRoot = path.join(TASK_RUN_DETAILS_DIR, 'tmp'); + ensureDir(tempRoot); + const tempDir = fs.mkdtempSync(path.join(tempRoot, 'codex-')); + const outputFile = path.join(tempDir, 'last-message.txt'); + const args = [ + '-a', 'never', + '-s', allowWrite ? 'workspace-write' : 'read-only', + '-C', cwd, + 'exec', + '--json', + '--skip-git-repo-check', + '--output-last-message', outputFile, + finalPrompt + ]; + const stdoutLines = []; + const stderrLines = []; + const parsedEvents = []; + let sessionId = ''; + const captureLines = (bucket, text) => { + const lines = String(text || '').split(/\r?\n/g).map((line) => line.trim()).filter(Boolean); + for (const line of lines) { + if (bucket.length < 120) { + bucket.push(truncateTaskText(line, 1200)); + } + try { + const payload = JSON.parse(line); + if (parsedEvents.length < 120) { + parsedEvents.push(payload); + } + if (!sessionId) { + sessionId = findCodexSessionId(payload); + } + } catch (_) {} + } + }; + const exit = await new Promise((resolve) => { + const child = spawn(codexPath, args, { + stdio: ['ignore', 'pipe', 'pipe'], + windowsHide: true + }); + if (typeof context.registerAbort === 'function') { + context.registerAbort(() => { + try { + child.kill('SIGTERM'); + } catch (_) {} + }); + } + child.stdout.on('data', (chunk) => { + captureLines(stdoutLines, chunk); + }); + child.stderr.on('data', (chunk) => { + captureLines(stderrLines, chunk); + }); + child.on('error', (error) => { + resolve({ code: 1, signal: '', error: error && error.message ? error.message : String(error || 'spawn failed') }); + }); + child.on('close', (code, signal) => { + resolve({ code: typeof code === 'number' ? code : 1, signal: signal || '', error: '' }); + }); + }); + const lastMessage = readCodexLastMessageFile(outputFile); + try { + if (fs.rmSync) { + fs.rmSync(tempDir, { recursive: true, force: true }); + } else { + fs.rmdirSync(tempDir, { recursive: true }); + } + } catch (_) {} + const success = exit.code === 0; + const errorMessage = success + ? '' + : (exit.error || stderrLines[stderrLines.length - 1] || stdoutLines[stdoutLines.length - 1] || `codex exec exited with code ${exit.code}`); + const summary = truncateTaskText(lastMessage || (success ? 'Codex 执行完成' : errorMessage), 400); + return { + success, + error: errorMessage, + summary, + output: { + exitCode: exit.code, + signal: exit.signal || '', + sessionId, + lastMessage, + events: parsedEvents, + stdoutPreview: stdoutLines, + stderrPreview: stderrLines + }, + logs: [ + ...stdoutLines.map((line) => ({ at: toIsoTime(Date.now()), level: 'info', message: line })), + ...stderrLines.map((line) => ({ at: toIsoTime(Date.now()), level: 'warn', message: line })) + ] + }; +} + +async function executeTaskNodeAdapter(node, context = {}) { + if (node.kind === 'workflow') { + const input = { + ...(node.input && typeof node.input === 'object' && !Array.isArray(node.input) ? cloneJson(node.input, {}) : {}), + task: { + title: context.plan && context.plan.title ? context.plan.title : '', + target: context.plan && context.plan.target ? context.plan.target : '', + dependencyResults: cloneJson(context.dependencyResults || [], []) + } + }; + const result = await runWorkflowById(node.workflowId, input, { + allowWrite: context.allowWrite === true, + dryRun: context.dryRun === true + }); + return { + success: result && result.success === true, + error: result && result.error ? result.error : '', + summary: truncateTaskText( + result && result.error + ? result.error + : `${result && result.workflowName ? result.workflowName : node.workflowId} ${result && result.success === true ? '完成' : '失败'}`, + 400 + ), + output: cloneJson(result, null), + logs: Array.isArray(result && result.steps) + ? result.steps.map((step) => ({ + at: step.startedAt || toIsoTime(Date.now()), + level: step.status === 'failed' ? 'error' : (step.status === 'skipped' ? 'warn' : 'info'), + message: `${step.id || step.tool || 'step'}: ${step.status || 'unknown'}${step.error ? ` (${step.error})` : ''}` + })) + : [] + }; + } + return runCodexExecTaskNode(node, context); +} + +async function runTaskPlanInternal(plan, options = {}) { + const validation = validatePreparedTaskPlan(plan); + if (!validation.ok) { + return { + error: validation.error || 'task plan validation failed', + issues: validation.issues || [], + warnings: validation.warnings || [] + }; + } + const taskId = typeof options.taskId === 'string' && options.taskId.trim() ? options.taskId.trim() : (plan.id || createTaskId()); + const runId = typeof options.runId === 'string' && options.runId.trim() ? options.runId.trim() : createTaskRunId(); + const controller = new AbortController(); + const baseDetail = { + runId, + taskId, + title: plan.title || '', + target: plan.target || '', + engine: normalizeTaskEngine(plan.engine), + allowWrite: plan.allowWrite === true, + dryRun: plan.dryRun === true, + concurrency: Number.isFinite(plan.concurrency) ? plan.concurrency : 2, + createdAt: toIsoTime(Date.now()), + updatedAt: toIsoTime(Date.now()), + warnings: validation.warnings || [], + plan: cloneJson(plan, {}) + }; + writeTaskRunDetail({ + ...baseDetail, + status: 'running', + run: { + status: 'running', + startedAt: toIsoTime(Date.now()), + endedAt: '', + durationMs: 0, + nodes: [], + logs: [] + } + }); + g_taskRunControllers.set(runId, { + runId, + taskId, + controller, + abort() { + try { + controller.abort(); + } catch (_) {} + } + }); + if (options.queueItem) { + upsertTaskQueueItem({ + ...options.queueItem, + taskId, + status: 'running', + runStatus: 'running', + lastRunId: runId, + lastSummary: '', + updatedAt: toIsoTime(Date.now()), + plan + }); + } + try { + const run = await executeTaskPlan(plan, { + concurrency: plan.concurrency, + signal: controller.signal, + executeNode: async (node, nodeContext) => executeTaskNodeAdapter(node, { + ...nodeContext, + plan, + taskId, + runId, + allowWrite: plan.allowWrite === true, + dryRun: plan.dryRun === true, + cwd: plan.cwd || process.cwd() + }), + onUpdate: async (snapshot) => { + const nextDetail = { + ...baseDetail, + updatedAt: toIsoTime(Date.now()), + status: snapshot.status || 'running', + run: snapshot + }; + writeTaskRunDetail(nextDetail); + if (options.queueItem) { + upsertTaskQueueItem({ + ...options.queueItem, + taskId, + status: snapshot.status === 'success' + ? 'completed' + : (snapshot.status === 'failed' ? 'failed' : (snapshot.status === 'cancelled' ? 'cancelled' : 'running')), + runStatus: snapshot.status || 'running', + lastRunId: runId, + lastSummary: snapshot.summary || '', + updatedAt: toIsoTime(Date.now()), + plan + }); + } + } + }); + const detail = { + ...baseDetail, + updatedAt: toIsoTime(Date.now()), + status: run.status || 'failed', + run + }; + writeTaskRunDetail(detail); + appendTaskRunRecord(collectTaskRunSummary(detail)); + if (options.queueItem) { + upsertTaskQueueItem({ + ...options.queueItem, + taskId, + status: run.status === 'success' + ? 'completed' + : (run.status === 'cancelled' ? 'cancelled' : 'failed'), + runStatus: run.status || '', + lastRunId: runId, + lastSummary: run.summary || run.error || '', + updatedAt: toIsoTime(Date.now()), + plan + }); + } + return detail; + } finally { + g_taskRunControllers.delete(runId); + } +} + +function addTaskToQueue(params = {}) { + const plan = coerceTaskPlanPayload(params || {}); + const validation = validatePreparedTaskPlan(plan); + if (!validation.ok) { + return { + error: validation.error || 'task plan validation failed', + issues: validation.issues || [], + warnings: validation.warnings || [] + }; + } + const taskId = typeof params.taskId === 'string' && params.taskId.trim() ? params.taskId.trim() : createTaskId(); + const item = upsertTaskQueueItem({ + taskId, + title: plan.title, + target: plan.target, + status: 'queued', + createdAt: toIsoTime(Date.now()), + updatedAt: toIsoTime(Date.now()), + engine: plan.engine, + allowWrite: plan.allowWrite === true, + dryRun: plan.dryRun === true, + concurrency: plan.concurrency || 2, + autoFixRounds: plan.autoFixRounds || 1, + lastRunId: '', + lastSummary: '', + runStatus: '', + plan + }); + return { + ok: true, + task: item, + warnings: validation.warnings || [] + }; +} + +async function runTaskNow(params = {}) { + const plan = coerceTaskPlanPayload(params || {}); + const detail = await runTaskPlanInternal(plan, { + taskId: typeof params.taskId === 'string' && params.taskId.trim() ? params.taskId.trim() : createTaskId(), + runId: typeof params.runId === 'string' && params.runId.trim() ? params.runId.trim() : createTaskRunId() + }); + return detail; +} + +async function startTaskQueueProcessing(options = {}) { + const taskId = typeof options.taskId === 'string' ? options.taskId.trim() : ''; + const detach = options.detach === true; + const runner = async () => { + let latestDetail = null; + while (true) { + const queue = listTaskQueueItems({ limit: 200, status: 'queued' }); + const nextItem = taskId + ? queue.find((item) => item.taskId === taskId) + : queue[0]; + if (!nextItem) { + break; + } + latestDetail = await runTaskPlanInternal(nextItem.plan, { + taskId: nextItem.taskId, + runId: createTaskRunId(), + queueItem: nextItem + }); + if (taskId) { + break; + } + } + return latestDetail; + }; + if (detach) { + if (g_taskQueueProcessor) { + return { + ok: true, + started: false, + alreadyRunning: true + }; + } + g_taskQueueProcessor = runner() + .catch(() => null) + .finally(() => { + g_taskQueueProcessor = null; + }); + return { + ok: true, + started: true, + detached: true + }; + } + const detail = await runner(); + return { + ok: true, + started: true, + detached: false, + detail + }; +} + +async function retryTaskRun(params = {}) { + const runId = typeof params.runId === 'string' ? params.runId.trim() : ''; + if (!runId) { + return { error: 'runId is required' }; + } + const detail = readTaskRunDetail(runId); + if (!detail || !detail.plan) { + return { error: `task run not found: ${runId}` }; + } + const plan = cloneJson(detail.plan, {}); + const detach = params.detach === true; + const nextRunId = createTaskRunId(); + if (detach) { + runTaskPlanInternal(plan, { + taskId: detail.taskId || createTaskId(), + runId: nextRunId + }).catch(() => {}); + return { + ok: true, + started: true, + detached: true, + runId: nextRunId, + taskId: detail.taskId || '' + }; + } + return runTaskPlanInternal(plan, { + taskId: detail.taskId || createTaskId(), + runId: nextRunId + }); +} + +function cancelTaskRunOrQueue(params = {}) { + const rawTarget = typeof params.target === 'string' ? params.target.trim() : ''; + const runId = typeof params.runId === 'string' ? params.runId.trim() : ''; + const taskId = typeof params.taskId === 'string' ? params.taskId.trim() : ''; + const target = rawTarget || runId || taskId; + if (!target) { + return { error: 'taskId or runId is required' }; + } + const controllerByRun = g_taskRunControllers.get(target); + if (controllerByRun) { + controllerByRun.abort(); + return { + ok: true, + cancelled: true, + runId: controllerByRun.runId, + taskId: controllerByRun.taskId, + mode: 'running' + }; + } + const queueItem = getTaskQueueItem(target); + if (queueItem) { + if (queueItem.status === 'queued') { + const next = upsertTaskQueueItem({ + ...queueItem, + status: 'cancelled', + runStatus: 'cancelled', + updatedAt: toIsoTime(Date.now()), + lastSummary: queueItem.lastSummary || '已取消' + }); + return { + ok: true, + cancelled: true, + task: next, + mode: 'queued' + }; + } + if (queueItem.lastRunId && g_taskRunControllers.has(queueItem.lastRunId)) { + const active = g_taskRunControllers.get(queueItem.lastRunId); + active.abort(); + return { + ok: true, + cancelled: true, + runId: active.runId, + taskId: active.taskId, + mode: 'running' + }; + } + return { + error: `task cannot be cancelled in current status: ${queueItem.status}` + }; + } + const detail = readTaskRunDetail(target); + if (detail && g_taskRunControllers.has(detail.runId)) { + const active = g_taskRunControllers.get(detail.runId); + active.abort(); + return { + ok: true, + cancelled: true, + runId: active.runId, + taskId: active.taskId, + mode: 'running' + }; + } + return { error: `task/run not found: ${target}` }; +} + +function getTaskLogs(params = {}) { + const runId = typeof params.runId === 'string' ? params.runId.trim() : ''; + if (!runId) { + return { error: 'runId is required' }; + } + const detail = readTaskRunDetail(runId); + if (!detail) { + return { error: `task run not found: ${runId}` }; + } + const run = detail.run && typeof detail.run === 'object' ? detail.run : {}; + const lines = []; + for (const node of Array.isArray(run.nodes) ? run.nodes : []) { + lines.push(`# ${node.id}${node.title ? ` ${node.title}` : ''}`); + const body = summarizeTaskLogs(node.logs || [], 120); + if (body) { + lines.push(body); + } else { + lines.push('(no logs)'); + } + lines.push(''); + } + return { + runId, + logs: lines.join('\n').trim(), + detail + }; +} + function createMcpTools(options = {}) { const allowWrite = !!options.allowWrite; const tools = []; @@ -12034,6 +13520,7 @@ async function main() { console.log(' codexmate add-model <模型> 添加模型'); console.log(' codexmate delete-model <模型> 删除模型'); console.log(' codexmate workflow MCP 工作流中心'); + console.log(' codexmate task 本地任务编排'); console.log(' codexmate run [--host ] [--no-browser] 启动 Web 界面'); console.log(' codexmate codex [参数...] [--follow-up <文本>|--queued-follow-up <文本> 可重复] 等同于 codex --yolo'); console.log(' 注: follow-up 自动排队仅支持 linux/android/netbsd/openbsd/darwin/freebsd 且 stdin 必须是 TTY,其他平台会报错'); @@ -12062,6 +13549,7 @@ async function main() { case 'auth': cmdAuth(args.slice(1)); break; case 'proxy': await cmdProxy(args.slice(1)); break; case 'workflow': await cmdWorkflow(args.slice(1)); break; + case 'task': await cmdTask(args.slice(1)); break; case 'run': cmdStart(parseStartOptions(args.slice(1))); break; case 'start': console.error('错误: 命令已更名为 "run",请使用: codexmate run'); diff --git a/lib/task-orchestrator.js b/lib/task-orchestrator.js new file mode 100644 index 00000000..98f09c9b --- /dev/null +++ b/lib/task-orchestrator.js @@ -0,0 +1,839 @@ +function isPlainObject(value) { + return !!value && typeof value === 'object' && !Array.isArray(value); +} + +function cloneJson(value, fallback = null) { + try { + return JSON.parse(JSON.stringify(value)); + } catch (_) { + return fallback; + } +} + +function normalizeText(value, maxLength = 4000) { + const text = value === undefined || value === null ? '' : String(value).trim(); + if (!text) { + return ''; + } + return text.length > maxLength ? text.slice(0, maxLength) : text; +} + +function normalizeId(value, fallback = '') { + const text = String(value || '').trim().toLowerCase(); + const normalized = text + .replace(/[^a-z0-9._-]+/g, '-') + .replace(/-{2,}/g, '-') + .replace(/^-+|-+$/g, ''); + return normalized || fallback; +} + +function normalizePositiveInteger(value, fallback, min = 1, max = 8) { + const numeric = Number.parseInt(String(value), 10); + if (!Number.isFinite(numeric)) { + return fallback; + } + return Math.min(max, Math.max(min, Math.floor(numeric))); +} + +function truncateText(value, maxLength = 240) { + const text = String(value || '').trim(); + if (!text) { + return ''; + } + if (text.length <= maxLength) { + return text; + } + return `${text.slice(0, Math.max(0, maxLength - 1))}…`; +} + +function uniqueStringList(input = []) { + const result = []; + const seen = new Set(); + for (const item of Array.isArray(input) ? input : []) { + const text = normalizeText(item, 400); + if (!text || seen.has(text)) { + continue; + } + seen.add(text); + result.push(text); + } + return result; +} + +function splitTargetIntoItems(target) { + const source = normalizeText(target, 12000); + if (!source) { + return []; + } + const lines = source + .split(/\r?\n/g) + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => line.replace(/^[-*+•]\s+/, '').replace(/^\d+[.)]\s+/, '').trim()) + .filter(Boolean); + if (lines.length >= 2) { + return uniqueStringList(lines).slice(0, 6); + } + + const segments = source + .split(/[;;。\n]+/g) + .map((part) => part.trim()) + .filter(Boolean) + .map((part) => part.replace(/^[-*+•]\s+/, '').replace(/^\d+[.)]\s+/, '').trim()) + .filter(Boolean); + if (segments.length >= 2) { + return uniqueStringList(segments).slice(0, 6); + } + + return [truncateText(source, 220)]; +} + +function buildTaskTitle(target, explicitTitle = '') { + const custom = normalizeText(explicitTitle, 160); + if (custom) { + return custom; + } + const normalizedTarget = normalizeText(target, 400); + if (!normalizedTarget) { + return '未命名任务'; + } + return truncateText(normalizedTarget.replace(/\s+/g, ' '), 96); +} + +function computePlanWaves(nodes = []) { + const list = Array.isArray(nodes) ? nodes : []; + const byId = new Map(); + const indegree = new Map(); + const outgoing = new Map(); + for (const node of list) { + const id = normalizeId(node && node.id, ''); + if (!id) continue; + byId.set(id, node); + indegree.set(id, 0); + outgoing.set(id, []); + } + for (const node of list) { + const id = normalizeId(node && node.id, ''); + if (!id) continue; + const dependsOn = uniqueStringList(node && node.dependsOn).filter((depId) => byId.has(depId)); + indegree.set(id, dependsOn.length); + for (const depId of dependsOn) { + outgoing.get(depId).push(id); + } + } + + const remaining = new Set(byId.keys()); + const waves = []; + while (remaining.size > 0) { + const ready = Array.from(remaining).filter((id) => (indegree.get(id) || 0) === 0); + if (ready.length === 0) { + waves.push(Array.from(remaining)); + break; + } + ready.sort(); + waves.push(ready); + for (const id of ready) { + remaining.delete(id); + const nextIds = outgoing.get(id) || []; + for (const nextId of nextIds) { + indegree.set(nextId, Math.max(0, (indegree.get(nextId) || 0) - 1)); + } + } + } + + return waves.map((wave, index) => ({ + index, + nodeIds: wave, + label: `Wave ${index + 1}` + })); +} + +function detectDependencyCycle(nodes = []) { + const list = Array.isArray(nodes) ? nodes : []; + const graph = new Map(); + for (const node of list) { + const id = normalizeId(node && node.id, ''); + if (!id) continue; + graph.set(id, uniqueStringList(node && node.dependsOn).map((depId) => normalizeId(depId, '')).filter(Boolean)); + } + const visiting = new Set(); + const visited = new Set(); + const visit = (id) => { + if (visited.has(id)) { + return false; + } + if (visiting.has(id)) { + return true; + } + visiting.add(id); + const deps = graph.get(id) || []; + for (const depId of deps) { + if (!graph.has(depId)) { + continue; + } + if (visit(depId)) { + return true; + } + } + visiting.delete(id); + visited.add(id); + return false; + }; + for (const id of graph.keys()) { + if (visit(id)) { + return true; + } + } + return false; +} + +function buildCodexNodePrompt(kind, context = {}) { + const title = normalizeText(context.title, 160); + const target = normalizeText(context.target, 4000); + const item = normalizeText(context.item, 1200); + const followUp = normalizeText(context.followUp, 1200); + const dependencySummaries = uniqueStringList(context.dependencySummaries || []).slice(0, 6); + const allowWrite = context.allowWrite === true; + const dependencyBlock = dependencySummaries.length > 0 + ? `\n已完成前置摘要:\n${dependencySummaries.map((entry, index) => `${index + 1}. ${entry}`).join('\n')}` + : ''; + const writeRule = allowWrite + ? '允许直接修改本地工作区,但必须控制变更范围并完成最小必要验证。' + : '只允许只读调查,不要修改任何文件,不要执行写入型操作。'; + + if (kind === 'analysis') { + return [ + `任务标题: ${title || '任务分析'}`, + `任务目标:\n${target}`, + writeRule, + '请先调查当前仓库与上下文,给出简洁的现状判断、主要风险、建议执行顺序,以及你认为最小可行的验证方案。', + '输出请保持简洁,聚焦可执行信息。' + ].join('\n\n'); + } + + if (kind === 'work') { + return [ + `任务标题: ${title || '执行子任务'}`, + `总目标:\n${target}`, + `当前子任务:\n${item || title || target}`, + dependencyBlock, + writeRule, + '请只处理当前子任务范围,避免越界修改。完成后说明实际改动、验证结果与剩余风险。' + ].join('\n\n'); + } + + if (kind === 'verify') { + return [ + `任务标题: ${title || '验证与总结'}`, + `总目标:\n${target}`, + dependencyBlock, + writeRule, + '请基于当前工作区状态执行最小必要验证,检查明显回归,并输出结果摘要、未完成项和建议下一步。' + ].join('\n\n'); + } + + return [ + `任务标题: ${title || '后续任务'}`, + `总目标:\n${target}`, + dependencyBlock, + writeRule, + `追加指令:\n${followUp || item || target}`, + '请严格围绕上述后续指令继续推进,并给出结果摘要。' + ].join('\n\n'); +} + +function normalizeWorkflowCatalog(workflowCatalog = []) { + const map = new Map(); + for (const item of Array.isArray(workflowCatalog) ? workflowCatalog : []) { + const id = normalizeId(item && item.id, ''); + if (!id) continue; + map.set(id, { + id, + name: normalizeText(item.name || id, 160) || id, + description: normalizeText(item.description, 600), + readOnly: item && item.readOnly !== false + }); + } + return map; +} + +function buildTaskPlan(request = {}, options = {}) { + const workflowMap = normalizeWorkflowCatalog(options.workflowCatalog || []); + const target = normalizeText(request.target, 4000); + const title = buildTaskTitle(target, request.title); + const engine = normalizeId(request.engine, 'codex') === 'workflow' ? 'workflow' : 'codex'; + const allowWrite = request.allowWrite === true; + const dryRun = request.dryRun === true; + const concurrency = normalizePositiveInteger(request.concurrency, 2, 1, 8); + const autoFixRounds = normalizePositiveInteger(request.autoFixRounds, 1, 0, 5); + const requestedWorkflowIds = uniqueStringList(request.workflowIds || []) + .map((id) => normalizeId(id, '')) + .filter((id) => id && workflowMap.has(id)); + const followUps = uniqueStringList(request.followUps || []); + const notes = normalizeText(request.notes, 2000); + const cwd = normalizeText(request.cwd || options.cwd, 1200); + const nodes = []; + let nodeSequence = 0; + const nextNodeId = (prefix) => `${prefix}-${String(++nodeSequence).padStart(2, '0')}`; + + if (requestedWorkflowIds.length > 0) { + let previousId = ''; + for (const workflowId of requestedWorkflowIds) { + const meta = workflowMap.get(workflowId); + const nodeId = nextNodeId('workflow'); + nodes.push({ + id: nodeId, + title: meta ? meta.name : workflowId, + kind: 'workflow', + workflowId, + dependsOn: previousId ? [previousId] : [], + write: !!(meta && meta.readOnly === false), + retryLimit: 0, + autoFixRounds, + input: { + target, + title, + notes + } + }); + previousId = nodeId; + } + } else { + const items = splitTargetIntoItems(target); + const analysisId = nextNodeId('analysis'); + nodes.push({ + id: analysisId, + title: '现状分析', + kind: 'codex', + prompt: buildCodexNodePrompt('analysis', { + title, + target, + allowWrite: false + }), + dependsOn: [], + write: false, + retryLimit: 0, + autoFixRounds: 0 + }); + + const executionNodeIds = []; + const workItems = items.length > 0 ? items : [target || title]; + for (const item of workItems) { + const nodeId = nextNodeId('work'); + executionNodeIds.push(nodeId); + nodes.push({ + id: nodeId, + title: truncateText(item, 72) || `执行 ${executionNodeIds.length}`, + kind: 'codex', + prompt: buildCodexNodePrompt('work', { + title, + target, + item, + allowWrite + }), + dependsOn: [analysisId], + write: allowWrite, + retryLimit: 0, + autoFixRounds + }); + } + + const verifyId = nextNodeId('verify'); + nodes.push({ + id: verifyId, + title: '验证与总结', + kind: 'codex', + prompt: buildCodexNodePrompt('verify', { + title, + target, + allowWrite: false + }), + dependsOn: executionNodeIds.slice(), + write: false, + retryLimit: 0, + autoFixRounds: 0 + }); + } + + let followUpDependsOn = nodes.length > 0 ? [nodes[nodes.length - 1].id] : []; + for (const followUp of followUps) { + const nodeId = nextNodeId('follow-up'); + nodes.push({ + id: nodeId, + title: truncateText(followUp, 72) || `Follow-up ${nodes.length + 1}`, + kind: 'codex', + prompt: buildCodexNodePrompt('follow-up', { + title, + target, + allowWrite, + followUp + }), + dependsOn: followUpDependsOn, + write: allowWrite, + retryLimit: 0, + autoFixRounds + }); + followUpDependsOn = [nodeId]; + } + + const plan = { + id: normalizeId(request.id, ''), + title, + target, + notes, + cwd, + engine, + allowWrite, + dryRun, + concurrency, + autoFixRounds, + workflowIds: requestedWorkflowIds, + followUps, + nodes, + waves: computePlanWaves(nodes) + }; + return plan; +} + +function validateTaskPlan(plan, options = {}) { + const issues = []; + if (!isPlainObject(plan)) { + return { + ok: false, + error: 'task plan must be an object', + issues: [{ code: 'task-plan-invalid', message: 'task plan must be an object' }] + }; + } + + const workflowMap = normalizeWorkflowCatalog(options.workflowCatalog || []); + const nodes = Array.isArray(plan.nodes) ? plan.nodes : []; + if (nodes.length === 0) { + issues.push({ code: 'task-plan-nodes-required', message: 'task plan must contain at least one node' }); + } + const nodeIds = new Set(); + for (const node of nodes) { + if (!isPlainObject(node)) { + issues.push({ code: 'task-node-invalid', message: 'task node must be an object' }); + continue; + } + const id = normalizeId(node.id, ''); + if (!id) { + issues.push({ code: 'task-node-id-required', message: 'task node id is required' }); + continue; + } + if (nodeIds.has(id)) { + issues.push({ code: 'task-node-id-duplicate', message: `duplicate task node id: ${id}` }); + continue; + } + nodeIds.add(id); + const kind = normalizeId(node.kind, ''); + if (kind !== 'workflow' && kind !== 'codex') { + issues.push({ code: 'task-node-kind-invalid', message: `task node ${id} has unsupported kind: ${node.kind || ''}` }); + } + if (kind === 'workflow') { + const workflowId = normalizeId(node.workflowId, ''); + if (!workflowId) { + issues.push({ code: 'task-node-workflow-required', message: `task node ${id} missing workflowId` }); + } else if (workflowMap.size > 0 && !workflowMap.has(workflowId)) { + issues.push({ code: 'task-node-workflow-unknown', message: `task node ${id} references unknown workflow: ${workflowId}` }); + } + } + if (kind === 'codex' && !normalizeText(node.prompt, 12000)) { + issues.push({ code: 'task-node-prompt-required', message: `task node ${id} missing prompt` }); + } + } + + if (issues.length === 0) { + for (const node of nodes) { + const id = normalizeId(node.id, ''); + const dependsOn = uniqueStringList(node.dependsOn || []).map((depId) => normalizeId(depId, '')); + for (const depId of dependsOn) { + if (!nodeIds.has(depId)) { + issues.push({ code: 'task-node-dependency-missing', message: `task node ${id} depends on missing node: ${depId}` }); + } + if (depId === id) { + issues.push({ code: 'task-node-dependency-self', message: `task node ${id} cannot depend on itself` }); + } + } + } + } + + if (issues.length === 0 && detectDependencyCycle(nodes)) { + issues.push({ code: 'task-plan-cycle-detected', message: 'task plan contains a dependency cycle' }); + } + + return { + ok: issues.length === 0, + error: issues[0] ? issues[0].message : '', + issues + }; +} + +function createNodeRunRecord(node, dependencyNodeIds = []) { + return { + id: normalizeId(node && node.id, ''), + title: normalizeText(node && node.title, 160), + kind: normalizeId(node && node.kind, ''), + workflowId: normalizeId(node && node.workflowId, ''), + dependsOn: uniqueStringList(node && node.dependsOn), + dependencyNodeIds: uniqueStringList(dependencyNodeIds), + write: node && node.write === true, + status: 'pending', + attemptCount: 0, + autoFixRounds: normalizePositiveInteger(node && node.autoFixRounds, 0, 0, 5), + retryLimit: normalizePositiveInteger(node && node.retryLimit, 0, 0, 5), + startedAt: '', + endedAt: '', + durationMs: 0, + summary: '', + error: '', + output: null, + logs: [], + attempts: [] + }; +} + +function createRunLog(level, message, extra = {}) { + return { + at: new Date().toISOString(), + level: level || 'info', + message: normalizeText(message, 1000), + ...cloneJson(extra, {}) + }; +} + +async function executeTaskPlan(plan, options = {}) { + const executeNode = typeof options.executeNode === 'function' + ? options.executeNode + : async () => ({ success: false, error: 'executeNode is not configured' }); + const onUpdate = typeof options.onUpdate === 'function' ? options.onUpdate : null; + const signal = options.signal || null; + const startedAtTs = Date.now(); + const concurrency = normalizePositiveInteger(options.concurrency || plan.concurrency, 2, 1, 8); + const nodeList = Array.isArray(plan && plan.nodes) ? plan.nodes : []; + const run = { + status: 'running', + startedAt: new Date(startedAtTs).toISOString(), + endedAt: '', + durationMs: 0, + concurrency, + waves: computePlanWaves(nodeList), + nodes: nodeList.map((node) => createNodeRunRecord(node)), + logs: [createRunLog('info', 'task run started', { concurrency })], + summary: '', + error: '' + }; + + const nodeMap = new Map(); + for (const node of run.nodes) { + nodeMap.set(node.id, node); + } + + const emitUpdate = async () => { + if (!onUpdate) { + return; + } + await onUpdate(cloneJson(run, run)); + }; + + const active = new Map(); + let writeLock = false; + + const finalizeNode = (nodeRun, payload, attemptStartedAt, attemptIndex) => { + const endedAtTs = Date.now(); + const success = !!(payload && payload.success === true); + const error = success ? '' : normalizeText(payload && payload.error, 2000) || `node failed: ${nodeRun.id}`; + nodeRun.attempts.push({ + index: attemptIndex, + startedAt: new Date(attemptStartedAt).toISOString(), + endedAt: new Date(endedAtTs).toISOString(), + durationMs: endedAtTs - attemptStartedAt, + success, + error, + summary: truncateText(payload && payload.summary, 400), + output: cloneJson(payload && payload.output, null), + logs: cloneJson(payload && payload.logs, []) + }); + nodeRun.logs = cloneJson(payload && payload.logs, []); + nodeRun.output = cloneJson(payload && payload.output, null); + nodeRun.summary = truncateText(payload && payload.summary, 400); + nodeRun.error = error; + nodeRun.startedAt = nodeRun.startedAt || new Date(attemptStartedAt).toISOString(); + nodeRun.endedAt = new Date(endedAtTs).toISOString(); + nodeRun.durationMs = endedAtTs - new Date(nodeRun.startedAt).getTime(); + return { success, error }; + }; + + const executeOneNode = async (nodeRun, nodeDef, hooks = {}) => { + nodeRun.status = 'running'; + if (!nodeRun.startedAt) { + nodeRun.startedAt = new Date().toISOString(); + } + nodeRun.attemptCount += 1; + const attemptIndex = nodeRun.attemptCount; + const attemptStartedAt = Date.now(); + const dependencyResults = (nodeRun.dependsOn || []) + .map((depId) => nodeMap.get(depId)) + .filter(Boolean) + .map((dep) => ({ + id: dep.id, + status: dep.status, + summary: dep.summary, + error: dep.error, + output: cloneJson(dep.output, null) + })); + const attemptLogs = [createRunLog('info', `starting node ${nodeRun.id}`, { attempt: attemptIndex })]; + let abortHandler = null; + try { + const payload = await executeNode(nodeDef, { + attempt: attemptIndex, + maxAttempts: 1 + Math.max(nodeRun.retryLimit, 0) + Math.max(nodeRun.autoFixRounds, 0), + dependencyResults, + signal, + registerAbort(handler) { + abortHandler = typeof handler === 'function' ? handler : null; + if (typeof hooks.onAbortChange === 'function') { + hooks.onAbortChange(abortHandler); + } + }, + previousAttempts: cloneJson(nodeRun.attempts, []) + }); + attemptLogs.push(...(Array.isArray(payload && payload.logs) ? payload.logs : [])); + const finalized = finalizeNode(nodeRun, { + ...payload, + logs: attemptLogs.concat(Array.isArray(payload && payload.logs) ? payload.logs : []) + }, attemptStartedAt, attemptIndex); + return { + success: finalized.success, + error: finalized.error, + abortHandler + }; + } catch (error) { + const message = error && error.message ? error.message : String(error || 'task node execution failed'); + const finalized = finalizeNode(nodeRun, { + success: false, + error: message, + output: null, + summary: message, + logs: attemptLogs.concat(createRunLog('error', message)) + }, attemptStartedAt, attemptIndex); + return { + success: false, + error: finalized.error, + abortHandler + }; + } + }; + + const getPendingNodeDefs = () => nodeList.filter((node) => { + const nodeRun = nodeMap.get(normalizeId(node && node.id, '')); + return !!nodeRun && nodeRun.status === 'pending'; + }); + + const hasFailedDependency = (nodeRun) => (nodeRun.dependsOn || []).some((depId) => { + const dep = nodeMap.get(depId); + return dep && (dep.status === 'failed' || dep.status === 'blocked' || dep.status === 'cancelled'); + }); + + const isReady = (nodeRun) => (nodeRun.dependsOn || []).every((depId) => { + const dep = nodeMap.get(depId); + return dep && (dep.status === 'success' || dep.status === 'skipped'); + }); + + const markBlockedNodes = () => { + for (const nodeRun of run.nodes) { + if (nodeRun.status !== 'pending') continue; + if (!hasFailedDependency(nodeRun)) continue; + nodeRun.status = 'blocked'; + nodeRun.error = 'blocked by failed dependency'; + nodeRun.summary = '前置节点失败,已阻塞'; + nodeRun.startedAt = nodeRun.startedAt || new Date().toISOString(); + nodeRun.endedAt = new Date().toISOString(); + run.logs.push(createRunLog('warn', `node blocked: ${nodeRun.id}`)); + } + }; + + const abortActiveNodes = () => { + for (const activeRun of active.values()) { + if (activeRun && typeof activeRun.abort === 'function') { + try { + activeRun.abort(); + } catch (_) {} + } + } + }; + + try { + while (true) { + if (signal && signal.aborted) { + abortActiveNodes(); + for (const nodeRun of run.nodes) { + if (nodeRun.status === 'pending') { + nodeRun.status = 'cancelled'; + nodeRun.error = 'cancelled before start'; + nodeRun.summary = '已取消'; + } else if (nodeRun.status === 'running') { + nodeRun.status = 'cancelled'; + nodeRun.error = 'cancelled while running'; + nodeRun.summary = '执行中取消'; + } + } + run.logs.push(createRunLog('warn', 'task run cancelled')); + run.status = 'cancelled'; + break; + } + + markBlockedNodes(); + const readyNodeDefs = getPendingNodeDefs().filter((node) => { + const nodeRun = nodeMap.get(normalizeId(node && node.id, '')); + return nodeRun && isReady(nodeRun); + }); + let started = false; + for (const nodeDef of readyNodeDefs) { + if (active.size >= concurrency) { + break; + } + const nodeRun = nodeMap.get(normalizeId(nodeDef && nodeDef.id, '')); + if (!nodeRun) continue; + const wantsWrite = nodeRun.write === true; + if (writeLock && wantsWrite) { + continue; + } + if (wantsWrite && active.size > 0) { + continue; + } + if (!wantsWrite && writeLock) { + continue; + } + started = true; + if (wantsWrite) { + writeLock = true; + } + const promise = (async () => { + let result = null; + const maxAttempts = 1 + Math.max(nodeRun.retryLimit, 0) + Math.max(nodeRun.autoFixRounds, 0); + for (let attempt = nodeRun.attemptCount + 1; attempt <= maxAttempts; attempt += 1) { + result = await executeOneNode(nodeRun, nodeDef, { + onAbortChange(handler) { + const current = active.get(nodeRun.id); + if (current) { + current._abort = typeof handler === 'function' ? handler : null; + } + } + }); + if (result.success) { + nodeRun.status = 'success'; + run.logs.push(createRunLog('info', `node completed: ${nodeRun.id}`)); + break; + } + if (signal && signal.aborted) { + nodeRun.status = 'cancelled'; + nodeRun.summary = '执行中取消'; + break; + } + if (attempt < maxAttempts) { + nodeRun.status = 'running'; + run.logs.push(createRunLog('warn', `node retry scheduled: ${nodeRun.id}`, { + nextAttempt: attempt + 1, + error: nodeRun.error + })); + continue; + } + nodeRun.status = 'failed'; + run.logs.push(createRunLog('error', `node failed: ${nodeRun.id}`, { error: nodeRun.error })); + } + return { + nodeId: nodeRun.id, + wantsWrite, + abort: result && result.abortHandler ? result.abortHandler : null + }; + })(); + active.set(nodeRun.id, { + promise, + abort() { + const current = active.get(nodeRun.id); + if (current && typeof current._abort === 'function') { + current._abort(); + } + }, + _abort: null, + wantsWrite + }); + await emitUpdate(); + } + + if (active.size === 0) { + const pending = run.nodes.some((nodeRun) => nodeRun.status === 'pending'); + if (!pending) { + break; + } + if (!started) { + markBlockedNodes(); + const stillPending = run.nodes.some((nodeRun) => nodeRun.status === 'pending'); + if (!stillPending) { + break; + } + run.logs.push(createRunLog('error', 'task run stalled because no nodes can be scheduled')); + run.status = 'failed'; + run.error = 'task run stalled because no nodes can be scheduled'; + break; + } + } + + if (active.size > 0) { + const settled = await Promise.race(Array.from(active.entries()).map(([nodeId, state]) => state.promise + .then((payload) => ({ nodeId, payload })) + .catch((error) => ({ nodeId, payload: { error: error && error.message ? error.message : String(error || 'task failed') } })))); + const settledState = active.get(settled.nodeId); + active.delete(settled.nodeId); + if (settledState && settledState.wantsWrite) { + writeLock = false; + } + await emitUpdate(); + } + } + } catch (error) { + run.status = 'failed'; + run.error = error && error.message ? error.message : String(error || 'task run failed'); + run.logs.push(createRunLog('error', run.error)); + } + + const endedAtTs = Date.now(); + run.endedAt = new Date(endedAtTs).toISOString(); + run.durationMs = endedAtTs - startedAtTs; + if (!run.status || run.status === 'running') { + if (run.nodes.some((node) => node.status === 'failed')) { + run.status = 'failed'; + } else if (run.nodes.some((node) => node.status === 'cancelled')) { + run.status = 'cancelled'; + } else if (run.nodes.every((node) => node.status === 'success' || node.status === 'skipped')) { + run.status = 'success'; + } else if (run.nodes.some((node) => node.status === 'blocked')) { + run.status = 'failed'; + } else { + run.status = 'failed'; + } + } + const lastCompletedNode = [...run.nodes].reverse().find((node) => node.summary || node.error); + run.summary = truncateText( + lastCompletedNode && (lastCompletedNode.summary || lastCompletedNode.error) + ? (lastCompletedNode.summary || lastCompletedNode.error) + : (run.status === 'success' ? '任务执行完成' : '任务执行失败'), + 400 + ); + if (!run.error && run.status !== 'success') { + const failedNode = run.nodes.find((node) => node.status === 'failed' || node.status === 'blocked' || node.status === 'cancelled'); + run.error = failedNode ? (failedNode.error || failedNode.summary) : 'task run failed'; + } + await emitUpdate(); + return run; +} + +module.exports = { + truncateText, + splitTargetIntoItems, + computePlanWaves, + buildTaskPlan, + validateTaskPlan, + executeTaskPlan +}; diff --git a/tests/e2e/run.js b/tests/e2e/run.js index 91fddb33..d9eafe5c 100644 --- a/tests/e2e/run.js +++ b/tests/e2e/run.js @@ -23,6 +23,7 @@ const testHealthSpeed = require('./test-health-speed'); const testMessages = require('./test-messages'); const testMcp = require('./test-mcp'); const testWorkflow = require('./test-workflow'); +const testTaskOrchestration = require('./test-task-orchestration'); const testInvalidConfig = require('./test-invalid-config'); const testWebUiAssets = require('./test-web-ui-assets'); const testWebUiSessionBrowser = require('./test-web-ui-session-browser'); @@ -134,6 +135,7 @@ async function main() { await testMessages(ctx); await testMcp(ctx); await testWorkflow(ctx); + await testTaskOrchestration(ctx); await testWebUiAssets(ctx); await testWebUiSessionBrowser(ctx); diff --git a/tests/e2e/test-task-orchestration.js b/tests/e2e/test-task-orchestration.js new file mode 100644 index 00000000..e02ff9b5 --- /dev/null +++ b/tests/e2e/test-task-orchestration.js @@ -0,0 +1,173 @@ +const { assert, runSync, fs, path } = require('./helpers'); + +function parseJsonOutput(rawText) { + const text = String(rawText || '').trim(); + if (!text) { + return {}; + } + try { + return JSON.parse(text); + } catch (_) { + const start = text.indexOf('{'); + const end = text.lastIndexOf('}'); + if (start >= 0 && end > start) { + return JSON.parse(text.slice(start, end + 1)); + } + throw new Error(`invalid json output: ${text.slice(0, 200)}`); + } +} + +module.exports = async function testTaskOrchestration(ctx) { + const { api, node, cliPath, env, tmpHome } = ctx; + + const planResult = runSync(node, [ + cliPath, + 'task', + 'plan', + '--target', + '检查当前配置并输出摘要', + '--follow-up', + '整理结论', + '--json' + ], { env }); + assert(planResult.status === 0, `task plan failed: ${planResult.stderr || planResult.stdout}`); + const planPayload = parseJsonOutput(planResult.stdout); + assert(planPayload.ok === true, 'task plan should validate'); + assert(planPayload.plan && Array.isArray(planPayload.plan.nodes), 'task plan should include nodes'); + assert(planPayload.plan.nodes.length >= 2, 'task plan should include multiple nodes'); + + const runResult = runSync(node, [ + cliPath, + 'task', + 'run', + '--target', + '诊断当前配置', + '--workflow-id', + 'diagnose-config', + '--engine', + 'workflow', + '--json' + ], { env }); + assert(runResult.status === 0, `task run failed: ${runResult.stderr || runResult.stdout}`); + const runPayload = parseJsonOutput(runResult.stdout); + assert(runPayload.run && runPayload.run.status === 'success', 'task run should succeed with diagnose-config workflow'); + assert(typeof runPayload.runId === 'string' && runPayload.runId, 'task run should return runId'); + assert(typeof runPayload.taskId === 'string' && runPayload.taskId, 'task run should return taskId'); + + const runsResult = runSync(node, [cliPath, 'task', 'runs', '--limit', '10', '--json'], { env }); + assert(runsResult.status === 0, `task runs failed: ${runsResult.stderr || runsResult.stdout}`); + const runsPayload = parseJsonOutput(runsResult.stdout); + assert(Array.isArray(runsPayload.runs), 'task runs should return runs array'); + assert(runsPayload.runs.some((item) => item.runId === runPayload.runId), 'task runs should include latest run'); + + const queueAddResult = runSync(node, [ + cliPath, + 'task', + 'queue', + 'add', + '--target', + '再次诊断当前配置', + '--workflow-id', + 'diagnose-config', + '--engine', + 'workflow', + '--json' + ], { env }); + assert(queueAddResult.status === 0, `task queue add failed: ${queueAddResult.stderr || queueAddResult.stdout}`); + const queueAddPayload = parseJsonOutput(queueAddResult.stdout); + assert(queueAddPayload.ok === true, 'task queue add should succeed'); + assert(queueAddPayload.task && queueAddPayload.task.taskId, 'queue add should return task'); + + const queueShowResult = runSync(node, [ + cliPath, + 'task', + 'queue', + 'show', + queueAddPayload.task.taskId + ], { env }); + assert(queueShowResult.status === 0, `task queue show failed: ${queueShowResult.stderr || queueShowResult.stdout}`); + const queueShowPayload = parseJsonOutput(queueShowResult.stdout); + assert(queueShowPayload.taskId === queueAddPayload.task.taskId, 'task queue show should resolve task'); + + const queueStartResult = runSync(node, [ + cliPath, + 'task', + 'queue', + 'start', + queueAddPayload.task.taskId, + '--json' + ], { env }); + assert(queueStartResult.status === 0, `task queue start failed: ${queueStartResult.stderr || queueStartResult.stdout}`); + const queueStartPayload = parseJsonOutput(queueStartResult.stdout); + assert(queueStartPayload.ok === true, 'task queue start should succeed'); + assert(queueStartPayload.detail && queueStartPayload.detail.run && queueStartPayload.detail.run.status === 'success', 'queued task should complete successfully'); + + const logsResult = runSync(node, [ + cliPath, + 'task', + 'logs', + queueStartPayload.detail.runId, + '--json' + ], { env }); + assert(logsResult.status === 0, `task logs failed: ${logsResult.stderr || logsResult.stdout}`); + const logsPayload = parseJsonOutput(logsResult.stdout); + assert(typeof logsPayload.logs === 'string', 'task logs should return log text'); + assert(logsPayload.logs.includes('# workflow-01') || logsPayload.logs.includes('# diagnose-config') || logsPayload.logs.includes('# workflow'), 'task logs should include node heading'); + + const queueListResult = runSync(node, [cliPath, 'task', 'queue', 'list', '--json'], { env }); + assert(queueListResult.status === 0, `task queue list failed: ${queueListResult.stderr || queueListResult.stdout}`); + const queueListPayload = parseJsonOutput(queueListResult.stdout); + assert(Array.isArray(queueListPayload.tasks), 'task queue list should return tasks array'); + assert(queueListPayload.tasks.some((item) => item.taskId === queueAddPayload.task.taskId), 'task queue list should include queued task record'); + + const taskRunsFile = path.join(tmpHome, '.codex', 'codexmate-task-runs.jsonl'); + const taskQueueFile = path.join(tmpHome, '.codex', 'codexmate-task-queue.json'); + assert(fs.existsSync(taskRunsFile), 'task runs file should be created'); + assert(fs.existsSync(taskQueueFile), 'task queue file should be created'); + + const apiOverview = await api('task-overview'); + assert(Array.isArray(apiOverview.queue), 'task-overview API should return queue'); + assert(Array.isArray(apiOverview.runs), 'task-overview API should return runs'); + + const apiPlan = await api('task-plan', { + target: '检查配置后输出摘要', + followUps: ['整理结果'] + }); + assert(apiPlan.ok === true, 'task-plan API should validate'); + assert(apiPlan.plan && Array.isArray(apiPlan.plan.waves), 'task-plan API should return waves'); + + const apiQueueAdd = await api('task-queue-add', { + target: '排队执行配置诊断', + workflowIds: ['diagnose-config'], + engine: 'workflow' + }); + assert(apiQueueAdd.ok === true, 'task-queue-add API should succeed'); + + const apiQueueStart = await api('task-queue-start', { + taskId: apiQueueAdd.task.taskId, + detach: false + }); + assert(apiQueueStart.ok === true, 'task-queue-start API should succeed'); + assert(apiQueueStart.detail && apiQueueStart.detail.run && apiQueueStart.detail.run.status === 'success', 'API queue start should execute task'); + + const apiRunDetail = await api('task-run-detail', { runId: apiQueueStart.detail.runId }); + assert(apiRunDetail && apiRunDetail.runId === apiQueueStart.detail.runId, 'task-run-detail API should return detail'); + + const apiRetry = await api('task-retry', { + runId: apiQueueStart.detail.runId, + detach: false + }); + assert(apiRetry && apiRetry.run && apiRetry.run.status === 'success', 'task-retry API should rerun task'); + + const apiLogs = await api('task-logs', { runId: apiRetry.runId }); + assert(typeof apiLogs.logs === 'string', 'task-logs API should return logs'); + + const apiCancelQueued = await api('task-queue-add', { + target: '待取消任务', + workflowIds: ['diagnose-config'], + engine: 'workflow' + }); + assert(apiCancelQueued.ok === true, 'second task-queue-add API should succeed'); + const apiCancel = await api('task-cancel', { taskId: apiCancelQueued.task.taskId }); + assert(apiCancel.ok === true, 'task-cancel API should cancel queued task'); +}; diff --git a/tests/unit/config-tabs-ui.test.mjs b/tests/unit/config-tabs-ui.test.mjs index cd3663e6..00c943c3 100644 --- a/tests/unit/config-tabs-ui.test.mjs +++ b/tests/unit/config-tabs-ui.test.mjs @@ -14,6 +14,7 @@ test('config template keeps expected config tabs in top and side navigation', () const openclawModal = readProjectFile('web-ui/partials/index/modal-openclaw-config.html'); const sessionsPanel = readProjectFile('web-ui/partials/index/panel-sessions.html'); const usagePanel = readProjectFile('web-ui/partials/index/panel-usage.html'); + const orchestrationPanel = readProjectFile('web-ui/partials/index/panel-orchestration.html'); const baseTheme = readProjectFile('web-ui/styles/base-theme.css'); const controlsForms = readProjectFile('web-ui/styles/controls-forms.css'); const sideRail = html.match(/