Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 63 additions & 3 deletions src/Cli/Commands/RunCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using fuseraft.Core.Models;
using fuseraft.Infrastructure;
using fuseraft.Infrastructure.Plugins;
using fuseraft.Orchestration;

namespace fuseraft.Cli.Commands;

Expand Down Expand Up @@ -64,8 +65,14 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings
checkpoint = await ResolveCheckpointAsync(settings.Resume, sessionStore);
if (checkpoint is null) return 1;

// TurnIndex of the last message equals the highest turn number, accounting for
// any previous compactions where Messages.Count < total turns elapsed.
var turnsComplete = checkpoint.Messages.Count > 0
? checkpoint.Messages[^1].TurnIndex + 1
: 0;

AnsiConsole.MarkupLine($"[dim]Resuming session [bold]{checkpoint.SessionId}[/] " +
$"({checkpoint.Messages.Count} turns already complete)[/]");
$"({turnsComplete} turns already complete)[/]");
}

// Load config + build orchestrator
Expand All @@ -74,10 +81,11 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings
OrchestrationConfig config;
IOrchestrator orchestrator;
McpSessionManager mcpManager;
ConversationCompactor? compactor;

try
{
(orchestrator, config, mcpManager) =
(orchestrator, config, mcpManager, compactor) =
await OrchestratorBuilder.BuildAsync(configPath, loggerFactory, pluginRegistry);
}
catch (Exception ex)
Expand All @@ -104,6 +112,9 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings
// Resolve task
var task = checkpoint?.Task ?? settings.Task?.Trim();

if (checkpoint is not null && !string.IsNullOrWhiteSpace(settings.Task))
AnsiConsole.MarkupLine("[yellow]⚠ Positional task argument ignored when resuming — using the session's original task.[/]");

if (string.IsNullOrEmpty(task))
{
task = AnsiConsole.Prompt(
Expand All @@ -127,6 +138,14 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings
ConfigPath = configPath
};

// Compact before the stream starts if the existing history is already over the threshold.
// This covers the resume case where a prior session accumulated too many turns.
if (compactor?.ShouldCompact(checkpoint.Messages) == true)
{
checkpoint = await ApplyCompactionAsync(task, checkpoint, compactor, sessionStore);
AnsiConsole.MarkupLine("[dim]History compacted before resuming.[/]");
}

// Cancellation
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
Expand All @@ -146,7 +165,8 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings

while (!cts.IsCancellationRequested)
{
string? injection = null;
string? injection = null;
bool compactionNeeded = false;

try
{
Expand All @@ -164,6 +184,12 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunSettings
checkpoint.LastUpdatedAt = DateTime.UtcNow;
await sessionStore.SaveAsync(checkpoint, cts.Token);

if (compactor?.ShouldCompact(checkpoint.Messages) == true)
{
compactionNeeded = true;
break;
}

injection = PromptHitl();
if (injection == null) continue; // user pressed Enter — continue
if (injection == "\x00") { cts.Cancel(); break; } // user quit
Expand All @@ -190,6 +216,12 @@ await AnsiConsole.Status()
checkpoint.Messages.Add(msg);
checkpoint.LastUpdatedAt = DateTime.UtcNow;
await sessionStore.SaveAsync(checkpoint, cts.Token);

if (compactor?.ShouldCompact(checkpoint.Messages) == true)
{
compactionNeeded = true;
break;
}
}
});
}
Expand All @@ -214,6 +246,14 @@ await AnsiConsole.Status()
// If the user quit or cancelled, stop the outer loop.
if (cts.IsCancellationRequested) break;

// If history hit the compaction threshold, compact and restart the stream.
if (compactionNeeded)
{
checkpoint = await ApplyCompactionAsync(task, checkpoint, compactor!, sessionStore);
AnsiConsole.MarkupLine("[dim]History compacted — continuing session.[/]");
continue;
}

// If there's a HITL injection, add it as a user turn and restart.
if (injection != null && injection != "\x00")
{
Expand Down Expand Up @@ -255,6 +295,26 @@ await AnsiConsole.Status()
return succeeded ? 0 : 1;
}

// Compaction

private static async Task<SessionCheckpoint> ApplyCompactionAsync(
string task,
SessionCheckpoint checkpoint,
ConversationCompactor compactor,
ISessionStore sessionStore,
CancellationToken cancellationToken = default)
{
var (summary, retained) = await compactor.CompactAsync(task, checkpoint.Messages, cancellationToken);

checkpoint.Messages.Clear();
checkpoint.Messages.Add(summary);
checkpoint.Messages.AddRange(retained);
checkpoint.LastUpdatedAt = DateTime.UtcNow;

await sessionStore.SaveAsync(checkpoint, cancellationToken);
return checkpoint;
}

// HITL prompt

/// <summary>
Expand Down
12 changes: 6 additions & 6 deletions src/Cli/Commands/ValidateConfigCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Spectre.Console;
using Spectre.Console.Cli;
using fuseraft.Core.Models;
using fuseraft.Infrastructure.Plugins;

namespace fuseraft.Cli.Commands;

Expand All @@ -20,11 +21,8 @@ public sealed class ValidateConfigSettings : CommandSettings
/// <summary>
/// Validates an orchestration config file and reports all issues found.
/// </summary>
public sealed class ValidateConfigCommand : Command<ValidateConfigSettings>
public sealed class ValidateConfigCommand(PluginRegistry pluginRegistry) : Command<ValidateConfigSettings>
{
// Known built-in plugin names for strict validation.
private static readonly HashSet<string> BuiltInPlugins =
["FileSystem", "Shell", "Git", "Http", "Json"];

public override int Execute(CommandContext context, ValidateConfigSettings settings)
{
Expand Down Expand Up @@ -102,9 +100,11 @@ public override int Execute(CommandContext context, ValidateConfigSettings setti

if (settings.Strict)
{
var registered = pluginRegistry.RegisteredPlugins
.ToHashSet(StringComparer.OrdinalIgnoreCase);
foreach (var plugin in agent.Plugins)
if (!BuiltInPlugins.Contains(plugin))
issues.Add(("warning", $"Agent '{agent.Name}': plugin '{plugin}' is not a built-in."));
if (!registered.Contains(plugin))
issues.Add(("warning", $"Agent '{agent.Name}': plugin '{plugin}' is not registered."));
}
}
}
Expand Down
49 changes: 39 additions & 10 deletions src/Cli/OrchestratorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class OrchestratorBuilder
/// and returns a configured orchestrator together with the active session manager.
/// The caller is responsible for disposing <paramref name="mcpManager"/> (via <c>await using</c>).
/// </summary>
public static async Task<(IOrchestrator Orchestrator, OrchestrationConfig Config, McpSessionManager McpManager)> BuildAsync(
public static async Task<(IOrchestrator Orchestrator, OrchestrationConfig Config, McpSessionManager McpManager, ConversationCompactor? Compactor)> BuildAsync(
string configPath,
ILoggerFactory loggerFactory,
PluginRegistry pluginRegistry,
Expand All @@ -35,9 +35,7 @@ public static class OrchestratorBuilder
.AddJsonFile(Path.GetFullPath(configPath), optional: false)
.Build();

var config = configuration.GetSection("Orchestration").Get<OrchestrationConfig>()
?? throw new InvalidOperationException(
$"File '{configPath}' is missing the top-level 'Orchestration' key.");
var config = BindConfig(configPath, configuration);

if (config.Agents.Count == 0)
throw new InvalidOperationException("Config must define at least one agent.");
Expand All @@ -55,7 +53,21 @@ public static class OrchestratorBuilder
var strategyFactory = new StrategyFactory(kernelFactory);
var logger = loggerFactory.CreateLogger<AgentOrchestrator>();

return (new AgentOrchestrator(config, agentFactory, strategyFactory, logger), config, mcpManager);
ConversationCompactor? compactor = null;
if (config.Compaction is { } compactionConfig)
{
if (compactionConfig.KeepRecentTurns >= compactionConfig.TriggerTurnCount)
throw new InvalidOperationException(
$"Compaction.KeepRecentTurns ({compactionConfig.KeepRecentTurns}) must be " +
$"less than Compaction.TriggerTurnCount ({compactionConfig.TriggerTurnCount}).");

var summaryModel = compactionConfig.Model ?? config.Agents[0].Model;
compactor = new ConversationCompactor(
kernelFactory, compactionConfig, summaryModel,
loggerFactory.CreateLogger<ConversationCompactor>());
}

return (new AgentOrchestrator(config, agentFactory, strategyFactory, logger), config, mcpManager, compactor);
}

/// <summary>
Expand Down Expand Up @@ -117,11 +129,28 @@ public static OrchestrationConfig LoadConfig(string configPath)
if (!File.Exists(configPath))
throw new FileNotFoundException($"Config file not found: {configPath}");

return new ConfigurationBuilder()
var configuration = new ConfigurationBuilder()
.AddJsonFile(Path.GetFullPath(configPath), optional: false)
.Build()
.GetSection("Orchestration")
.Get<OrchestrationConfig>()
?? throw new InvalidOperationException("Missing 'Orchestration' section.");
.Build();

return BindConfig(configPath, configuration);
}

// Separates binding from loading so both BuildAsync and LoadConfig get the same
// helpful error message when a field type doesn't match the schema.
private static OrchestrationConfig BindConfig(string configPath, IConfiguration configuration)
{
OrchestrationConfig? config;
try
{
config = configuration.GetSection("Orchestration").Get<OrchestrationConfig>();
}
catch (Exception ex)
{
throw new InvalidOperationException($"Failed to bind '{configPath}': {ex.Message} Check that all field types match the expected schema.", ex);
}

return config
?? throw new InvalidOperationException($"File '{configPath}' is missing the top-level 'Orchestration' key.");
}
}
4 changes: 0 additions & 4 deletions src/Core/Models/AgentConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,4 @@ public record AgentConfig
/// </summary>
public List<string> Plugins { get; init; } = [];

/// <summary>
/// Arbitrary key/value metadata passed through to result messages.
/// </summary>
public Dictionary<string, string> Metadata { get; init; } = [];
}
7 changes: 7 additions & 0 deletions src/Core/Models/AgentMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public record AgentMessage

/// <summary>
/// Token usage and estimated cost for this turn. Null for HITL messages.
/// For compaction summary messages, <see cref="TokenUsage.CostUsd"/> carries the
/// cumulative cost of all compacted turns so budget tracking remains accurate.
/// </summary>
public TokenUsage? Usage { get; init; }

/// <summary>
/// True when this message is an LLM-generated summary that replaces earlier turns.
/// </summary>
public bool IsCompactionSummary { get; init; }
}
33 changes: 33 additions & 0 deletions src/Core/Models/CompactionConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace fuseraft.Core.Models;

/// <summary>
/// Controls automatic conversation compaction. When the session history exceeds
/// <see cref="TriggerTurnCount"/> messages, older turns are summarised by an LLM and
/// replaced with a single summary message; the most recent <see cref="KeepRecentTurns"/>
/// turns are kept verbatim so agents retain immediate context.
///
/// Compaction fires in two situations:
/// <list type="bullet">
/// <item>Before the stream starts, when resuming a checkpoint that is already over the threshold.</item>
/// <item>Mid-session, after each checkpoint save, once the live history crosses the threshold.</item>
/// </list>
/// </summary>
public record CompactionConfig
{
/// <summary>
/// Compact when the message count reaches this value. Default: 50.
/// </summary>
public int TriggerTurnCount { get; init; } = 50;

/// <summary>
/// Number of most-recent turns to keep verbatim after compaction. Default: 10.
/// Must be less than <see cref="TriggerTurnCount"/>.
/// </summary>
public int KeepRecentTurns { get; init; } = 10;

/// <summary>
/// Model used to generate the compaction summary.
/// Defaults to the first agent's model when null.
/// </summary>
public ModelConfig? Model { get; init; }
}
7 changes: 7 additions & 0 deletions src/Core/Models/OrchestrationConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,11 @@ public record OrchestrationConfig
/// lists alongside the built-in plugins.
/// </summary>
public List<McpServerConfig> McpServers { get; init; } = [];

/// <summary>
/// Optional compaction settings. When present, conversation history is automatically
/// summarised once it exceeds the configured turn threshold, keeping the session alive
/// indefinitely without hitting context-window limits. Null (default) disables compaction.
/// </summary>
public CompactionConfig? Compaction { get; init; }
}
Loading
Loading