Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
"default_mode": "auto",
"max_iterations": 12,
"default_attempts": 1
},
"ats-cli": {
"channel": "ats-cli",
"repo": "/home/openclaw/projects/ats-cli",
"github": "difflabai/ats-skill",
"default_mode": "auto",
"max_iterations": 10,
"default_attempts": 1
}
},
"claude_bin": "/usr/bin/claude",
Expand Down
86 changes: 28 additions & 58 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { readFileSync, existsSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';
import { parseArgs } from 'node:util';
import { parseJsonObjects, parseHumanReadable, trimBuffer } from './lib/watch-parser.js';

const __dirname = dirname(fileURLToPath(import.meta.url));
const VERSION = '3.0.0';
Expand Down Expand Up @@ -591,46 +592,23 @@ async function watchMode() {
buffer += chunk.toString();

// Try to extract JSON objects from the buffer
// The ats watch output interleaves JSON objects with human-readable lines
let startIdx;
while ((startIdx = buffer.indexOf('{')) !== -1) {
// Find the matching closing brace
let depth = 0;
let endIdx = -1;
for (let i = startIdx; i < buffer.length; i++) {
if (buffer[i] === '{') depth++;
else if (buffer[i] === '}') {
depth--;
if (depth === 0) {
endIdx = i;
break;
}
}
}

if (endIdx === -1) break; // incomplete JSON, wait for more data

const jsonStr = buffer.slice(startIdx, endIdx + 1);
buffer = buffer.slice(endIdx + 1);
const jsonResult = parseJsonObjects(buffer);
for (const event of jsonResult.events) {
handleWatchEvent(event, name, channel, project);
}
buffer = jsonResult.remaining;

try {
const event = JSON.parse(jsonStr);
// Fallback: parse human-readable format if no JSON was found
if (jsonResult.events.length === 0) {
const textResult = parseHumanReadable(buffer);
for (const event of textResult.events) {
log('info', 'Fallback text parser matched task', { channel, taskId: event.task_id });
handleWatchEvent(event, name, channel, project);
} catch {
// Not valid JSON, skip
log('debug', 'Failed to parse watch JSON', { channel, json: jsonStr.slice(0, 200) });
}
buffer = textResult.remaining;
}

// If buffer gets too large without valid JSON, trim non-JSON prefix
if (buffer.length > 10000) {
const lastBrace = buffer.lastIndexOf('{');
if (lastBrace > 0) {
buffer = buffer.slice(lastBrace);
} else {
buffer = '';
}
}
buffer = trimBuffer(buffer);
});

watcher.stderr.on('data', (chunk) => {
Expand Down Expand Up @@ -670,31 +648,23 @@ async function watchMode() {

watcher.stdout.on('data', (chunk) => {
buffer += chunk.toString();
let startIdx;
while ((startIdx = buffer.indexOf('{')) !== -1) {
let depth = 0;
let endIdx = -1;
for (let i = startIdx; i < buffer.length; i++) {
if (buffer[i] === '{') depth++;
else if (buffer[i] === '}') {
depth--;
if (depth === 0) { endIdx = i; break; }
}
}
if (endIdx === -1) break;
const jsonStr = buffer.slice(startIdx, endIdx + 1);
buffer = buffer.slice(endIdx + 1);
try {
const event = JSON.parse(jsonStr);

const jsonResult = parseJsonObjects(buffer);
for (const event of jsonResult.events) {
handleWatchEvent(event, name, channel, project);
}
buffer = jsonResult.remaining;

if (jsonResult.events.length === 0) {
const textResult = parseHumanReadable(buffer);
for (const event of textResult.events) {
log('info', 'Fallback text parser matched task', { channel, taskId: event.task_id });
handleWatchEvent(event, name, channel, project);
} catch {
log('debug', 'Failed to parse watch JSON', { channel, json: jsonStr.slice(0, 200) });
}
buffer = textResult.remaining;
}
if (buffer.length > 10000) {
const lastBrace = buffer.lastIndexOf('{');
buffer = lastBrace > 0 ? buffer.slice(lastBrace) : '';
}

buffer = trimBuffer(buffer);
});

watcher.stderr.on('data', () => {});
Expand Down Expand Up @@ -734,7 +704,7 @@ async function watchMode() {

// === Handle a watch event ===
function handleWatchEvent(event, projectName, channel, project) {
const taskId = event.id || event.uuid;
const taskId = event.task_id || event.id || event.uuid;
if (!taskId) return;

// Skip if already seen
Expand Down
114 changes: 114 additions & 0 deletions lib/watch-parser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Watch mode parsers — extracted for testability.
*
* parseJsonObjects(buffer) - Extract JSON objects from a raw stdout buffer.
* parseHumanReadable(buffer) - Fallback: extract task.created events from
* human-readable ATS watch output.
*/

/**
 * Attempt to extract one or more complete JSON objects from `buffer`.
 *
 * The scanner is string-aware: `{` / `}` characters that occur inside a
 * JSON string literal (including escaped quotes like `\"`) do not affect
 * brace depth, so payloads such as `{"title":"a } b"}` are delimited
 * correctly. The previous naive brace counter mis-terminated on such
 * input, failed JSON.parse, and silently dropped the event.
 *
 * @param {string} buffer - Raw stdout accumulated so far.
 * @returns {{events: object[], remaining: string}} Parsed objects plus the
 *   unprocessed buffer tail (incomplete JSON is kept for the next chunk).
 */
export function parseJsonObjects(buffer) {
  const events = [];
  let scanPos = 0; // where to search for the next '{'
  let consumedPos = 0; // end of the last successfully parsed JSON object

  while (true) {
    const startIdx = buffer.indexOf('{', scanPos);
    if (startIdx === -1) break;

    // Find the matching closing brace, ignoring braces inside string
    // literals per RFC 8259 (strings are double-quoted, `\` escapes).
    let depth = 0;
    let inString = false;
    let escaped = false;
    let endIdx = -1;
    for (let i = startIdx; i < buffer.length; i++) {
      const ch = buffer[i];
      if (inString) {
        if (escaped) escaped = false;
        else if (ch === '\\') escaped = true;
        else if (ch === '"') inString = false;
        continue;
      }
      if (ch === '"') inString = true;
      else if (ch === '{') depth++;
      else if (ch === '}') {
        depth--;
        if (depth === 0) {
          endIdx = i;
          break;
        }
      }
    }

    if (endIdx === -1) {
      // Incomplete JSON — return everything from startIdx onward as remaining
      return { events, remaining: buffer.slice(startIdx) };
    }

    const jsonStr = buffer.slice(startIdx, endIdx + 1);
    scanPos = endIdx + 1;

    try {
      events.push(JSON.parse(jsonStr));
      consumedPos = endIdx + 1;
    } catch {
      // Not valid JSON, skip this candidate and keep scanning.
      // consumedPos is NOT advanced so the text is preserved for
      // downstream parsers (e.g. parseHumanReadable).
    }
  }

  // If no JSON events were parsed, preserve the original buffer so downstream
  // parsers (e.g. parseHumanReadable) can still process the full text.
  if (events.length === 0) {
    return { events, remaining: buffer };
  }
  return { events, remaining: buffer.slice(consumedPos) };
}

/**
 * Parse human-readable ATS watch output.
 *
 * Scans line-by-line for a "task.created" trigger, then looks within a
 * three-line window (trigger line plus the next two) for "Task #<id>: <title>".
 *
 * @param {string} buffer - Raw text output accumulated so far.
 * @returns {{events: Array<{task_id: number, title: string, event: string}>, remaining: string}}
 *   Extracted task.created events plus the unprocessed tail. When nothing
 *   matched, the full buffer is returned as `remaining`.
 */
export function parseHumanReadable(buffer) {
  const lines = buffer.split('\n');
  const events = [];
  let lastConsumed = -1; // index of the last line folded into an event

  for (let i = 0; i < lines.length; i++) {
    if (!/task\.created/.test(lines[i])) continue;

    const window = lines.slice(i, i + 3).join('\n');
    const m = window.match(/Task #(\d+):\s*(.+)/);
    if (m === null) continue;

    events.push({
      task_id: Number.parseInt(m[1], 10),
      title: m[2].trim(),
      event: 'task.created',
    });

    // How many lines into the window the "Task #" line sits
    // (0 = the trigger line itself).
    const offsetLines = window.slice(0, window.indexOf(m[0])).split('\n').length - 1;
    lastConsumed = i + offsetLines;
    i += offsetLines; // don't rescan the lines this event consumed
  }

  if (lastConsumed === -1) {
    return { events, remaining: buffer };
  }
  return { events, remaining: lines.slice(lastConsumed + 1).join('\n') };
}

/**
 * Trim an oversized buffer to avoid unbounded memory growth.
 *
 * @param {string} buffer - Accumulated stdout text.
 * @param {number} [maxLength=10000] - Size above which trimming kicks in.
 * @returns {string} The buffer unchanged when within bounds; otherwise the
 *   tail starting at the last '{' (a possible in-progress JSON object), or
 *   an empty string when no such tail exists past position 0.
 */
export function trimBuffer(buffer, maxLength = 10000) {
  if (buffer.length <= maxLength) {
    return buffer;
  }
  const keepFrom = buffer.lastIndexOf('{');
  if (keepFrom <= 0) {
    return '';
  }
  return buffer.slice(keepFrom);
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"ats-project-runner": "./index.js"
},
"scripts": {
"start": "node index.js"
"start": "node index.js",
"test": "node --test 'test/**/*.test.js'"
}
}
Loading
Loading