diff --git a/src/Cli/Commands/RunCommand.cs b/src/Cli/Commands/RunCommand.cs index 1e87cd2..005ac19 100644 --- a/src/Cli/Commands/RunCommand.cs +++ b/src/Cli/Commands/RunCommand.cs @@ -8,6 +8,7 @@ using fuseraft.Core.Models; using fuseraft.Infrastructure; using fuseraft.Infrastructure.Plugins; +using fuseraft.Orchestration; namespace fuseraft.Cli.Commands; @@ -64,8 +65,14 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings checkpoint = await ResolveCheckpointAsync(settings.Resume, sessionStore); if (checkpoint is null) return 1; + // TurnIndex of the last message equals the highest turn number, accounting for + // any previous compactions where Messages.Count < total turns elapsed. + var turnsComplete = checkpoint.Messages.Count > 0 + ? checkpoint.Messages[^1].TurnIndex + 1 + : 0; + AnsiConsole.MarkupLine($"[dim]Resuming session [bold]{checkpoint.SessionId}[/] " + - $"({checkpoint.Messages.Count} turns already complete)[/]"); + $"({turnsComplete} turns already complete)[/]"); } // Load config + build orchestrator @@ -74,10 +81,11 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings OrchestrationConfig config; IOrchestrator orchestrator; McpSessionManager mcpManager; + ConversationCompactor? compactor; try { - (orchestrator, config, mcpManager) = + (orchestrator, config, mcpManager, compactor) = await OrchestratorBuilder.BuildAsync(configPath, loggerFactory, pluginRegistry); } catch (Exception ex) @@ -104,6 +112,9 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings // Resolve task var task = checkpoint?.Task ?? settings.Task?.Trim(); + if (checkpoint is not null && !string.IsNullOrWhiteSpace(settings.Task)) + AnsiConsole.MarkupLine("[yellow]⚠ Positional task argument ignored when resuming — using the session's original task.[/]"); + if (string.IsNullOrEmpty(task)) { task = AnsiConsole.Prompt( @@ -127,6 +138,14 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings ConfigPath = configPath }; + // Compact before the stream starts if the existing history is already over the threshold. + // This covers the resume case where a prior session accumulated too many turns. + if (compactor?.ShouldCompact(checkpoint.Messages) == true) + { + checkpoint = await ApplyCompactionAsync(task, checkpoint, compactor, sessionStore); + AnsiConsole.MarkupLine("[dim]History compacted before resuming.[/]"); + } + // Cancellation using var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => @@ -146,7 +165,8 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings while (!cts.IsCancellationRequested) { - string? injection = null; + string? injection = null; + bool compactionNeeded = false; try { @@ -164,6 +184,12 @@ public override async Task ExecuteAsync(CommandContext context, RunSettings checkpoint.LastUpdatedAt = DateTime.UtcNow; await sessionStore.SaveAsync(checkpoint, cts.Token); + if (compactor?.ShouldCompact(checkpoint.Messages) == true) + { + compactionNeeded = true; + break; + } + injection = PromptHitl(); if (injection == null) continue; // user pressed Enter — continue if (injection == "\x00") { cts.Cancel(); break; } // user quit @@ -190,6 +216,12 @@ await AnsiConsole.Status() checkpoint.Messages.Add(msg); checkpoint.LastUpdatedAt = DateTime.UtcNow; await sessionStore.SaveAsync(checkpoint, cts.Token); + + if (compactor?.ShouldCompact(checkpoint.Messages) == true) + { + compactionNeeded = true; + break; + } } }); } @@ -214,6 +246,14 @@ await AnsiConsole.Status() // If the user quit or cancelled, stop the outer loop. if (cts.IsCancellationRequested) break; + // If history hit the compaction threshold, compact and restart the stream. + if (compactionNeeded) + { + checkpoint = await ApplyCompactionAsync(task, checkpoint, compactor!, sessionStore); + AnsiConsole.MarkupLine("[dim]History compacted — continuing session.[/]"); + continue; + } + // If there's a HITL injection, add it as a user turn and restart. if (injection != null && injection != "\x00") { @@ -255,6 +295,26 @@ await AnsiConsole.Status() return succeeded ? 0 : 1; } + // Compaction + + private static async Task ApplyCompactionAsync( + string task, + SessionCheckpoint checkpoint, + ConversationCompactor compactor, + ISessionStore sessionStore, + CancellationToken cancellationToken = default) + { + var (summary, retained) = await compactor.CompactAsync(task, checkpoint.Messages, cancellationToken); + + checkpoint.Messages.Clear(); + checkpoint.Messages.Add(summary); + checkpoint.Messages.AddRange(retained); + checkpoint.LastUpdatedAt = DateTime.UtcNow; + + await sessionStore.SaveAsync(checkpoint, cancellationToken); + return checkpoint; + } + // HITL prompt /// diff --git a/src/Cli/Commands/ValidateConfigCommand.cs b/src/Cli/Commands/ValidateConfigCommand.cs index b37f9af..6098eac 100644 --- a/src/Cli/Commands/ValidateConfigCommand.cs +++ b/src/Cli/Commands/ValidateConfigCommand.cs @@ -3,6 +3,7 @@ using Spectre.Console; using Spectre.Console.Cli; using fuseraft.Core.Models; +using fuseraft.Infrastructure.Plugins; namespace fuseraft.Cli.Commands; @@ -20,11 +21,8 @@ public sealed class ValidateConfigSettings : CommandSettings /// /// Validates an orchestration config file and reports all issues found. /// -public sealed class ValidateConfigCommand : Command +public sealed class ValidateConfigCommand(PluginRegistry pluginRegistry) : Command { - // Known built-in plugin names for strict validation. - private static readonly HashSet BuiltInPlugins = - ["FileSystem", "Shell", "Git", "Http", "Json"]; public override int Execute(CommandContext context, ValidateConfigSettings settings) { @@ -102,9 +100,11 @@ public override int Execute(CommandContext context, ValidateConfigSettings setti if (settings.Strict) { + var registered = pluginRegistry.RegisteredPlugins + .ToHashSet(StringComparer.OrdinalIgnoreCase); foreach (var plugin in agent.Plugins) - if (!BuiltInPlugins.Contains(plugin)) - issues.Add(("warning", $"Agent '{agent.Name}': plugin '{plugin}' is not a built-in.")); + if (!registered.Contains(plugin)) + issues.Add(("warning", $"Agent '{agent.Name}': plugin '{plugin}' is not registered.")); } } } diff --git a/src/Cli/OrchestratorBuilder.cs b/src/Cli/OrchestratorBuilder.cs index c93d9d6..e9075c0 100644 --- a/src/Cli/OrchestratorBuilder.cs +++ b/src/Cli/OrchestratorBuilder.cs @@ -22,7 +22,7 @@ public static class OrchestratorBuilder /// and returns a configured orchestrator together with the active session manager. /// The caller is responsible for disposing (via await using). /// - public static async Task<(IOrchestrator Orchestrator, OrchestrationConfig Config, McpSessionManager McpManager)> BuildAsync( + public static async Task<(IOrchestrator Orchestrator, OrchestrationConfig Config, McpSessionManager McpManager, ConversationCompactor? Compactor)> BuildAsync( string configPath, ILoggerFactory loggerFactory, PluginRegistry pluginRegistry, @@ -35,9 +35,7 @@ public static class OrchestratorBuilder .AddJsonFile(Path.GetFullPath(configPath), optional: false) .Build(); - var config = configuration.GetSection("Orchestration").Get() - ?? throw new InvalidOperationException( - $"File '{configPath}' is missing the top-level 'Orchestration' key."); + var config = BindConfig(configPath, configuration); if (config.Agents.Count == 0) throw new InvalidOperationException("Config must define at least one agent."); @@ -55,7 +53,21 @@ public static class OrchestratorBuilder var strategyFactory = new StrategyFactory(kernelFactory); var logger = loggerFactory.CreateLogger(); - return (new AgentOrchestrator(config, agentFactory, strategyFactory, logger), config, mcpManager); + ConversationCompactor? compactor = null; + if (config.Compaction is { } compactionConfig) + { + if (compactionConfig.KeepRecentTurns >= compactionConfig.TriggerTurnCount) + throw new InvalidOperationException( + $"Compaction.KeepRecentTurns ({compactionConfig.KeepRecentTurns}) must be " + + $"less than Compaction.TriggerTurnCount ({compactionConfig.TriggerTurnCount})."); + + var summaryModel = compactionConfig.Model ?? config.Agents[0].Model; + compactor = new ConversationCompactor( + kernelFactory, compactionConfig, summaryModel, + loggerFactory.CreateLogger()); + } + + return (new AgentOrchestrator(config, agentFactory, strategyFactory, logger), config, mcpManager, compactor); } /// @@ -117,11 +129,28 @@ public static OrchestrationConfig LoadConfig(string configPath) if (!File.Exists(configPath)) throw new FileNotFoundException($"Config file not found: {configPath}"); - return new ConfigurationBuilder() + var configuration = new ConfigurationBuilder() .AddJsonFile(Path.GetFullPath(configPath), optional: false) - .Build() - .GetSection("Orchestration") - .Get() - ?? throw new InvalidOperationException("Missing 'Orchestration' section."); + .Build(); + + return BindConfig(configPath, configuration); + } + + // Separates binding from loading so both BuildAsync and LoadConfig get the same + // helpful error message when a field type doesn't match the schema. + private static OrchestrationConfig BindConfig(string configPath, IConfiguration configuration) + { + OrchestrationConfig? config; + try + { + config = configuration.GetSection("Orchestration").Get(); + } + catch (Exception ex) + { + throw new InvalidOperationException($"Failed to bind '{configPath}': {ex.Message} Check that all field types match the expected schema.", ex); + } + + return config + ?? throw new InvalidOperationException($"File '{configPath}' is missing the top-level 'Orchestration' key."); } } diff --git a/src/Core/Models/AgentConfig.cs b/src/Core/Models/AgentConfig.cs index c98c4cb..9392e4e 100644 --- a/src/Core/Models/AgentConfig.cs +++ b/src/Core/Models/AgentConfig.cs @@ -31,8 +31,4 @@ public record AgentConfig /// public List Plugins { get; init; } = []; - /// - /// Arbitrary key/value metadata passed through to result messages. - /// - public Dictionary Metadata { get; init; } = []; } diff --git a/src/Core/Models/AgentMessage.cs b/src/Core/Models/AgentMessage.cs index a078dbb..25425ff 100644 --- a/src/Core/Models/AgentMessage.cs +++ b/src/Core/Models/AgentMessage.cs @@ -33,6 +33,13 @@ public record AgentMessage /// /// Token usage and estimated cost for this turn. Null for HITL messages. + /// For compaction summary messages, carries the + /// cumulative cost of all compacted turns so budget tracking remains accurate. /// public TokenUsage? Usage { get; init; } + + /// + /// True when this message is an LLM-generated summary that replaces earlier turns. + /// + public bool IsCompactionSummary { get; init; } } diff --git a/src/Core/Models/CompactionConfig.cs b/src/Core/Models/CompactionConfig.cs new file mode 100644 index 0000000..ad98f32 --- /dev/null +++ b/src/Core/Models/CompactionConfig.cs @@ -0,0 +1,33 @@ +namespace fuseraft.Core.Models; + +/// +/// Controls automatic conversation compaction. When the session history exceeds +/// messages, older turns are summarised by an LLM and +/// replaced with a single summary message; the most recent +/// turns are kept verbatim so agents retain immediate context. +/// +/// Compaction fires in two situations: +/// +/// Before the stream starts, when resuming a checkpoint that is already over the threshold. +/// Mid-session, after each checkpoint save, once the live history crosses the threshold. +/// +/// +public record CompactionConfig +{ + /// + /// Compact when the message count reaches this value. Default: 50. + /// + public int TriggerTurnCount { get; init; } = 50; + + /// + /// Number of most-recent turns to keep verbatim after compaction. Default: 10. + /// Must be less than . + /// + public int KeepRecentTurns { get; init; } = 10; + + /// + /// Model used to generate the compaction summary. + /// Defaults to the first agent's model when null. + /// + public ModelConfig? Model { get; init; } +} diff --git a/src/Core/Models/OrchestrationConfig.cs b/src/Core/Models/OrchestrationConfig.cs index 3d79515..22f8a80 100644 --- a/src/Core/Models/OrchestrationConfig.cs +++ b/src/Core/Models/OrchestrationConfig.cs @@ -51,4 +51,11 @@ public record OrchestrationConfig /// lists alongside the built-in plugins. /// public List McpServers { get; init; } = []; + + /// + /// Optional compaction settings. When present, conversation history is automatically + /// summarised once it exceeds the configured turn threshold, keeping the session alive + /// indefinitely without hitting context-window limits. Null (default) disables compaction. + /// + public CompactionConfig? Compaction { get; init; } } diff --git a/src/Infrastructure/Plugins/HttpPlugin.cs b/src/Infrastructure/Plugins/HttpPlugin.cs index aac4d45..8373268 100644 --- a/src/Infrastructure/Plugins/HttpPlugin.cs +++ b/src/Infrastructure/Plugins/HttpPlugin.cs @@ -1,5 +1,6 @@ using System.ComponentModel; using System.Net; +using System.Net.Sockets; using System.Text; using System.Text.Json; using Microsoft.Extensions.Logging; @@ -43,7 +44,7 @@ public HttpPlugin(HttpClient httpClient, IReadOnlyList? allowedHosts = n /// public HttpPlugin() { - _http = new HttpClient { Timeout = TimeSpan.FromSeconds(30) }; + _http = new HttpClient { Timeout = Timeout.InfiniteTimeSpan }; _http.DefaultRequestHeaders.UserAgent.ParseAdd("fuseraft/1.0"); _ownsClient = true; _logger = null; @@ -58,13 +59,14 @@ public async Task GetAsync( [Description("Full URL to fetch.")] string url, [Description( "Optional extra request headers as a JSON object, e.g. " + - "{\"Authorization\":\"Bearer token\",\"Accept\":\"application/json\"}.")] string? headers = null) + "{\"Authorization\":\"Bearer token\",\"Accept\":\"application/json\"}.")] string? headers = null, + [Description("Timeout in seconds before the request is abandoned. Defaults to 30.")] int timeoutSeconds = 30) { - var denial = CheckUrl(url); + var denial = await CheckUrlAsync(url); if (denial is not null) return denial; using var request = BuildRequest(HttpMethod.Get, url, headers); - return await SendAsync(request); + return await SendAsync(request, timeoutSeconds); } [KernelFunction("http_post")] @@ -73,14 +75,15 @@ public async Task PostAsync( [Description("Full URL.")] string url, [Description("Request body (JSON string, form data, etc.).")] string body, [Description("Content-Type header. Defaults to 'application/json'.")] string contentType = "application/json", - [Description("Extra request headers as a JSON object.")] string? headers = null) + [Description("Extra request headers as a JSON object.")] string? headers = null, + [Description("Timeout in seconds. Defaults to 30.")] int timeoutSeconds = 30) { - var denial = CheckUrl(url); + var denial = await CheckUrlAsync(url); if (denial is not null) return denial; using var request = BuildRequest(HttpMethod.Post, url, headers); request.Content = new StringContent(body, Encoding.UTF8, contentType); - return await SendAsync(request); + return await SendAsync(request, timeoutSeconds); } [KernelFunction("http_put")] @@ -89,41 +92,53 @@ public async Task PutAsync( [Description("Full URL.")] string url, [Description("Request body.")] string body, [Description("Content-Type. Defaults to 'application/json'.")] string contentType = "application/json", - [Description("Extra request headers as a JSON object.")] string? headers = null) + [Description("Extra request headers as a JSON object.")] string? headers = null, + [Description("Timeout in seconds. Defaults to 30.")] int timeoutSeconds = 30) { - var denial = CheckUrl(url); + var denial = await CheckUrlAsync(url); if (denial is not null) return denial; using var request = BuildRequest(HttpMethod.Put, url, headers); request.Content = new StringContent(body, Encoding.UTF8, contentType); - return await SendAsync(request); + return await SendAsync(request, timeoutSeconds); } [KernelFunction("http_delete")] [Description("Makes an HTTP DELETE request and returns the response status.")] public async Task DeleteAsync( [Description("Full URL.")] string url, - [Description("Extra request headers as a JSON object.")] string? headers = null) + [Description("Extra request headers as a JSON object.")] string? headers = null, + [Description("Timeout in seconds. Defaults to 30.")] int timeoutSeconds = 30) { - var denial = CheckUrl(url); + var denial = await CheckUrlAsync(url); if (denial is not null) return denial; using var request = BuildRequest(HttpMethod.Delete, url, headers); - return await SendAsync(request); + return await SendAsync(request, timeoutSeconds); } [KernelFunction("http_head")] [Description("Makes an HTTP HEAD request and returns response headers (no body).")] public async Task HeadAsync( [Description("Full URL.")] string url, - [Description("Extra request headers as a JSON object.")] string? headers = null) + [Description("Extra request headers as a JSON object.")] string? headers = null, + [Description("Timeout in seconds. Defaults to 30.")] int timeoutSeconds = 30) { - var denial = CheckUrl(url); + var denial = await CheckUrlAsync(url); if (denial is not null) return denial; using var request = BuildRequest(HttpMethod.Head, url, headers); - using var response = await _http.SendAsync(request); - return FormatHeaders(response); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); + try + { + using var response = await _http.SendAsync(request, cts.Token); + return FormatHeaders(response); + } + catch (TaskCanceledException) + { + _logger?.LogDebug("HTTP HEAD timed out: {Url}", request.RequestUri); + return PluginResult.Timeout($"HTTP request exceeded the {timeoutSeconds}s timeout."); + } } // Helpers @@ -132,7 +147,7 @@ public async Task HeadAsync( /// Returns a [DENIED] error when the URL host is not on the allowlist or resolves to a /// private/loopback address. Returns null when the request is permitted. /// - private string? CheckUrl(string url) + private async Task CheckUrlAsync(string url) { if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) return PluginResult.Error($"Invalid URL: {url}"); @@ -144,7 +159,8 @@ public async Task HeadAsync( return PluginResult.Denied($"Host '{uri.Host}' is not in the configured HTTP allowlist."); // Even allow-listed hostnames must not resolve to private/loopback ranges. - if (IsPrivateHost(uri.Host)) + // DNS is resolved here so hostnames like localtest.me (→ 127.0.0.1) are caught. + if (await ResolvesToPrivateAddressAsync(uri.Host)) return PluginResult.Denied($"Host '{uri.Host}' resolves to a private or loopback address."); } @@ -152,36 +168,52 @@ public async Task HeadAsync( } /// - /// Returns true when the host is a loopback name, a known private-range literal IP, - /// or an IPv6 loopback address. + /// Returns true when is or resolves to a loopback, link-local, + /// or RFC-1918 private address. Fails closed: an unresolvable hostname is treated as + /// private since the HTTP request would fail anyway. /// - private static bool IsPrivateHost(string host) + private static async Task ResolvesToPrivateAddressAsync(string host) { - // Hostname-based checks if (host.Equals("localhost", StringComparison.OrdinalIgnoreCase)) return true; - if (!IPAddress.TryParse(host, out var ip)) - return false; + // Fast path: literal IP address — no DNS needed. + if (IPAddress.TryParse(host, out var literal)) + return IsPrivateIp(literal); + + // Resolve the hostname and check every returned address. + IPAddress[] addresses; + try + { + addresses = await Dns.GetHostAddressesAsync(host); + } + catch (SocketException) + { + // Unresolvable — fail closed. The HTTP request would fail for the same reason. + return true; + } + + return addresses.Any(IsPrivateIp); + } - // Map IPv4-in-IPv6 to IPv4 for range checks + private static bool IsPrivateIp(IPAddress ip) + { if (ip.IsIPv4MappedToIPv6) ip = ip.MapToIPv4(); - if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork) + if (ip.AddressFamily == AddressFamily.InterNetwork) { - var bytes = ip.GetAddressBytes(); + var b = ip.GetAddressBytes(); return - bytes[0] == 127 || // 127.0.0.0/8 loopback - bytes[0] == 10 || // 10.0.0.0/8 RFC-1918 - (bytes[0] == 172 && bytes[1] >= 16 && bytes[1] <= 31) || // 172.16-31.0.0/12 RFC-1918 - (bytes[0] == 192 && bytes[1] == 168) || // 192.168.0.0/16 RFC-1918 - (bytes[0] == 169 && bytes[1] == 254); // 169.254.0.0/16 link-local + b[0] == 127 || // 127.0.0.0/8 loopback + b[0] == 10 || // 10.0.0.0/8 RFC-1918 + (b[0] == 172 && b[1] >= 16 && b[1] <= 31) || // 172.16–31.0.0/12 RFC-1918 + (b[0] == 192 && b[1] == 168) || // 192.168.0.0/16 RFC-1918 + (b[0] == 169 && b[1] == 254); // 169.254.0.0/16 link-local } // IPv6 loopback (::1) and link-local (fe80::/10) - return ip.Equals(IPAddress.IPv6Loopback) || - host.StartsWith("fe80", StringComparison.OrdinalIgnoreCase); + return ip.Equals(IPAddress.IPv6Loopback) || ip.IsIPv6LinkLocal; } private static HttpRequestMessage BuildRequest(HttpMethod method, string url, string? headersJson) @@ -197,8 +229,7 @@ private static HttpRequestMessage BuildRequest(HttpMethod method, string url, st { foreach (var (key, value) in dict) { - if (!request.Headers.TryAddWithoutValidation(key, value)) - request.Headers.TryAddWithoutValidation(key, value); + request.Headers.TryAddWithoutValidation(key, value); } } } @@ -212,14 +243,15 @@ private static HttpRequestMessage BuildRequest(HttpMethod method, string url, st return request; } - private async Task SendAsync(HttpRequestMessage request) + private async Task SendAsync(HttpRequestMessage request, int timeoutSeconds) { _logger?.LogDebug("HTTP {Method} {Url}", request.Method, request.RequestUri); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); try { - using var response = await _http.SendAsync(request); - var body = await response.Content.ReadAsStringAsync(); + using var response = await _http.SendAsync(request, cts.Token); + var body = await response.Content.ReadAsStringAsync(cts.Token); var statusLine = $"[HTTP {(int)response.StatusCode} {response.ReasonPhrase}]"; _logger?.LogDebug("{StatusLine} {Url} ({ContentType})", @@ -237,7 +269,7 @@ private async Task SendAsync(HttpRequestMessage request) catch (TaskCanceledException) { _logger?.LogDebug("HTTP request timed out: {Url}", request.RequestUri); - return PluginResult.Timeout("HTTP request exceeded the configured timeout."); + return PluginResult.Timeout($"HTTP request exceeded the {timeoutSeconds}s timeout."); } } diff --git a/src/Infrastructure/Plugins/PluginRegistry.cs b/src/Infrastructure/Plugins/PluginRegistry.cs index ac540e8..efe0c7d 100644 --- a/src/Infrastructure/Plugins/PluginRegistry.cs +++ b/src/Infrastructure/Plugins/PluginRegistry.cs @@ -145,7 +145,9 @@ public bool TryGet(string name, [NotNullWhen(true)] out object? plugin) private static HttpClient BuildSharedHttpClient() { - var client = new HttpClient { Timeout = TimeSpan.FromSeconds(30) }; + // Timeout.InfiniteTimeSpan — per-request timeouts are enforced via CancellationTokenSource + // inside HttpPlugin so agents can specify different timeouts per call. + var client = new HttpClient { Timeout = Timeout.InfiniteTimeSpan }; client.DefaultRequestHeaders.UserAgent.ParseAdd("fuseraft/1.0"); return client; } diff --git a/src/Infrastructure/Plugins/SearchPlugin.cs b/src/Infrastructure/Plugins/SearchPlugin.cs index 85ffce5..afc2dfb 100644 --- a/src/Infrastructure/Plugins/SearchPlugin.cs +++ b/src/Infrastructure/Plugins/SearchPlugin.cs @@ -86,6 +86,7 @@ public string SearchContent( var sb = new StringBuilder(); int totalMatches = 0; int filesWithMatches = 0; + int skippedFiles = 0; foreach (var file in Directory.EnumerateFiles(directory, filePattern, SearchOption.AllDirectories)) { @@ -93,7 +94,7 @@ public string SearchContent( string[] lines; try { lines = File.ReadAllLines(file); } - catch { continue; } // skip unreadable files (binary, locked, etc.) + catch { skippedFiles++; continue; } // skip unreadable files (binary, locked, etc.) var fileMatches = new List(); @@ -116,11 +117,16 @@ public string SearchContent( } if (totalMatches == 0) - return PluginResult.Info($"No matches found for '{query}' under {directory}"); + { + var noMatchNote = skippedFiles > 0 ? $" ({skippedFiles} unreadable file(s) skipped)" : string.Empty; + return PluginResult.Info($"No matches found for '{query}' under {directory}{noMatchNote}"); + } var header = $"[RESULTS] {totalMatches} match(es) in {filesWithMatches} file(s)"; if (totalMatches >= maxResults) - header += $" (limit reached — increase maxResults to see more)"; + header += " (limit reached — increase maxResults to see more)"; + if (skippedFiles > 0) + header += $" ({skippedFiles} unreadable file(s) skipped)"; return header + "\n\n" + sb.ToString().TrimEnd(); } @@ -158,6 +164,7 @@ public string SearchSymbol( var sb = new StringBuilder(); int totalMatches = 0; + int skippedFiles = 0; foreach (var file in Directory.EnumerateFiles(directory, filePattern, SearchOption.AllDirectories)) { @@ -165,7 +172,7 @@ public string SearchSymbol( string[] lines; try { lines = File.ReadAllLines(file); } - catch { continue; } + catch { skippedFiles++; continue; } for (int i = 0; i < lines.Length && totalMatches < maxResults; i++) { @@ -178,8 +185,15 @@ public string SearchSymbol( } if (totalMatches == 0) - return PluginResult.Info($"No definition found for '{symbol}' under {directory}"); + { + var noMatchNote = skippedFiles > 0 ? $" ({skippedFiles} unreadable file(s) skipped)" : string.Empty; + return PluginResult.Info($"No definition found for '{symbol}' under {directory}{noMatchNote}"); + } + + var header = $"[RESULTS] {totalMatches} definition(s) found for '{symbol}'"; + if (skippedFiles > 0) + header += $" ({skippedFiles} unreadable file(s) skipped)"; - return $"[RESULTS] {totalMatches} definition(s) found for '{symbol}':\n\n" + sb.ToString().TrimEnd(); + return header + ":\n\n" + sb.ToString().TrimEnd(); } } diff --git a/src/Infrastructure/Plugins/ShellPlugin.cs b/src/Infrastructure/Plugins/ShellPlugin.cs index 070633d..b57a33c 100644 --- a/src/Infrastructure/Plugins/ShellPlugin.cs +++ b/src/Infrastructure/Plugins/ShellPlugin.cs @@ -9,10 +9,9 @@ namespace fuseraft.Infrastructure.Plugins; /// SECURITY NOTE: This plugin runs arbitrary commands. In production, restrict agents /// to a sandbox directory and consider using a container runtime or a restricted shell. /// -/// When is provided, any explicitly supplied -/// workingDirectory argument is validated against the sandbox root and rejected if -/// it falls outside the tree. Commands that omit a working directory execute in the process -/// current directory, which should itself be set to the project root before launch. +/// When is provided, every command's working directory is +/// constrained to that root. Commands that omit a workingDirectory argument default +/// to the sandbox root rather than the process current directory. /// public sealed class ShellPlugin { @@ -106,23 +105,25 @@ public async Task WhichAsync([Description("Program name, e.g. 'dotnet', } [KernelFunction("shell_pwd")] - [Description("Returns the current working directory of the orchestration process.")] - public string GetWorkingDirectory() => Directory.GetCurrentDirectory(); + [Description("Returns the effective working directory used when no workingDirectory argument is supplied.")] + public string GetWorkingDirectory() => _sandboxRoot ?? Directory.GetCurrentDirectory(); // Helpers - // Validates that an explicitly requested working directory stays within the sandbox. + // Validates that the working directory stays within the sandbox. + // When a sandbox is active and no directory is specified, defaults to the sandbox root + // so commands never run in an uncontrolled directory. // Returns a [DENIED] error string on violation, null when safe. - // Also normalises the resolved path back into the out parameter for downstream use. private string? ValidateWorkingDirectory(string? workingDirectory, out string? resolved) { - if (workingDirectory is null || _sandboxRoot is null) + if (_sandboxRoot is null) { resolved = workingDirectory; return null; } - resolved = Path.GetFullPath(workingDirectory); + // Default to sandbox root when no directory is specified. + resolved = Path.GetFullPath(workingDirectory ?? _sandboxRoot); var sandboxPrefix = _sandboxRoot.TrimEnd(Path.DirectorySeparatorChar) + Path.DirectorySeparatorChar; var resolvedCheck = resolved.TrimEnd(Path.DirectorySeparatorChar) + Path.DirectorySeparatorChar; diff --git a/src/Orchestration/AgentOrchestrator.cs b/src/Orchestration/AgentOrchestrator.cs index 7df6a23..cc35a38 100644 --- a/src/Orchestration/AgentOrchestrator.cs +++ b/src/Orchestration/AgentOrchestrator.cs @@ -144,7 +144,11 @@ public async IAsyncEnumerable StreamAsync( var agentModels = config.Agents.ToDictionary( a => a.Name, a => a.Model.ModelId, StringComparer.OrdinalIgnoreCase); - int turn = priorHistory?.Count ?? 0; + // Seed from the last prior message's TurnIndex rather than priorHistory.Count so + // that compacted histories (where Count < total turns elapsed) produce correct indices. + int turn = priorHistory is { Count: > 0 } + ? priorHistory[^1].TurnIndex + 1 + : 0; decimal cumulativeCost = priorHistory? .Sum(m => m.Usage?.CostUsd ?? 0m) ?? 0m; @@ -169,12 +173,10 @@ public async IAsyncEnumerable StreamAsync( agentMessage.Usage?.CostUsd ?? 0m, Truncate(agentMessage.Content, 200)); - yield return agentMessage; - - // Check budget after yielding so the caller receives the turn that pushed - // cost over the limit (useful for debugging / transcript completeness). if (config.MaxCostUsd is { } limit && cumulativeCost > limit) throw new BudgetExceededException(cumulativeCost, limit); + + yield return agentMessage; } } diff --git a/src/Orchestration/ConversationCompactor.cs b/src/Orchestration/ConversationCompactor.cs new file mode 100644 index 0000000..ed10281 --- /dev/null +++ b/src/Orchestration/ConversationCompactor.cs @@ -0,0 +1,186 @@ +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.SemanticKernel; +using fuseraft.Core.Models; +using fuseraft.Infrastructure; + +namespace fuseraft.Orchestration; + +/// +/// Summarises older conversation turns into a single context message using an LLM, +/// retaining only the most recent turns verbatim. +/// +/// The summary is given Role = "user" so it is +/// re-injected into the group chat as context the agents can read. Its +/// carries the cumulative cost of all compacted turns +/// (plus the cost of the summary call itself) so that budget tracking remains exact +/// across compaction boundaries. +/// +public sealed class ConversationCompactor( + KernelFactory kernelFactory, + CompactionConfig config, + ModelConfig summaryModel, + ILogger logger) +{ + /// + /// Returns true when has reached or exceeded + /// the configured . + /// + public bool ShouldCompact(IReadOnlyList messages) + => messages.Count >= config.TriggerTurnCount; + + /// + /// Compacts into a summary plus a retained tail. + /// + /// The original task string, included in the summary prompt. + /// Full current history. + /// Cancellation token. + /// + /// A tuple of the summary and the retained tail messages. + /// Callers should replace the checkpoint message list with + /// [ summary, ..retained ]. + /// + public async Task<(AgentMessage Summary, IReadOnlyList Retained)> CompactAsync( + string task, + IReadOnlyList messages, + CancellationToken cancellationToken = default) + { + if (messages.Count == 0) + throw new ArgumentException("Cannot compact an empty message list.", nameof(messages)); + + // Always keep at least 1 message verbatim so the summary has a non-empty tail to + // anchor to, and never keep more messages than actually exist. + var keepCount = Math.Clamp(config.KeepRecentTurns, 1, messages.Count - 1); + var toCompact = messages.Take(messages.Count - keepCount).ToList(); + var toRetain = messages.Skip(messages.Count - keepCount).ToList(); + + logger.LogInformation( + "Compacting {Compacted} turns (0–{LastCompacted}) into a summary; retaining {Kept} recent turns.", + toCompact.Count, toCompact[^1].TurnIndex, toRetain.Count); + + var historyText = BuildHistoryText(toCompact); + var (summaryText, summaryUsage) = await GenerateSummaryAsync( + task, historyText, toCompact.Count, cancellationToken); + + // Carry the cost of all compacted turns forward into the summary message so that + // budget tracking (which sums Usage.CostUsd across priorHistory) stays accurate. + var compactedCost = toCompact.Sum(m => m.Usage?.CostUsd ?? 0m); + var totalCarriedCost = compactedCost + (summaryUsage?.CostUsd ?? 0m); + + var summary = new AgentMessage + { + AgentName = "System", + Content = FormatSummaryContent(toCompact[0].TurnIndex, toCompact[^1].TurnIndex, summaryText), + Role = "user", + // Use the last compacted turn's index so that priorHistory[^1].TurnIndex + 1 + // correctly seeds the next turn number in AgentOrchestrator. + TurnIndex = toCompact[^1].TurnIndex, + IsCompactionSummary = true, + Usage = new TokenUsage( + summaryUsage?.InputTokens ?? 0, + summaryUsage?.OutputTokens ?? 0, + totalCarriedCost) + }; + + logger.LogInformation( + "Compaction complete. Turns 0–{Last} replaced by summary. " + + "Carried cost: ${Cost:F4} (compacted ${Compacted:F4} + summary ${Summary:F4}).", + toCompact[^1].TurnIndex, + totalCarriedCost, compactedCost, summaryUsage?.CostUsd ?? 0m); + + return (summary, toRetain); + } + + // Internals + + private async Task<(string Text, TokenUsage? Usage)> GenerateSummaryAsync( + string task, + string historyText, + int turnCount, + CancellationToken cancellationToken) + { + var kernel = kernelFactory.Create(summaryModel); + var function = KernelFunctionFactory.CreateFromPrompt(SummaryPrompt); + + FunctionResult result; + try + { + result = await kernel.InvokeAsync(function, new KernelArguments + { + ["task"] = task, + ["turn_count"] = turnCount.ToString(), + ["history"] = historyText + }, cancellationToken); + } + catch (Exception ex) + { + throw new InvalidOperationException( + "Compaction failed: the summary LLM call did not complete successfully. " + + $"Inner: {ex.Message}", ex); + } + + var text = result.GetValue()?.Trim(); + + if (string.IsNullOrEmpty(text)) + throw new InvalidOperationException( + "Compaction failed: the summary LLM returned an empty response."); + + return (text, ExtractUsage(result)); + } + + private static TokenUsage? ExtractUsage(FunctionResult result) + { + // SK stores usage on the inner ChatMessageContent metadata when available. + if (result.Metadata is null) return null; + if (!result.Metadata.TryGetValue("Usage", out var raw) || raw is null) return null; + + if (raw is OpenAI.Chat.ChatTokenUsage u) + return new TokenUsage(u.InputTokenCount, u.OutputTokenCount, null); + + return null; + } + + private static string BuildHistoryText(IReadOnlyList messages) + { + var sb = new StringBuilder(); + foreach (var msg in messages) + { + var label = msg.IsCompactionSummary + ? $"[Prior Summary — covers turns 1–{msg.TurnIndex + 1}]" + : $"[{(msg.Role == "user" ? "Human" : msg.AgentName)} — Turn {msg.TurnIndex + 1}]"; + + sb.AppendLine(label); + sb.AppendLine(msg.Content); + sb.AppendLine(); + } + return sb.ToString(); + } + + private static string FormatSummaryContent(int firstTurn, int lastTurn, string summaryText) => + $"[CONVERSATION SUMMARY — covers turns {firstTurn + 1}–{lastTurn + 1}]\n\n{summaryText}"; + + private const string SummaryPrompt = """ + You are compacting an AI agent conversation to preserve context while reducing its size. + + Original task: + {{$task}} + + The following {{$turn_count}} conversation turns are being summarised and will no longer + be available in their original form — only this summary will remain: + + {{$history}} + + Write a thorough, specific summary that retains every detail an agent would need to + continue the task without access to the original turns: + + - All decisions made and their rationale + - All work completed: exact file paths created or modified, commands run, test results, + build output, and any other concrete artefacts + - All errors or failures encountered and exactly how each was resolved + - The current state of progress — what is done, what is partially done + - Any pending items, open questions, or known blockers + + Be precise and complete. Do not paraphrase away specifics such as file names, error + messages, or command output. Nothing omitted here can be recovered later. + """; +} diff --git a/src/Orchestration/Strategies/StrategyFactory.cs b/src/Orchestration/Strategies/StrategyFactory.cs index 535700d..f07e0da 100644 --- a/src/Orchestration/Strategies/StrategyFactory.cs +++ b/src/Orchestration/Strategies/StrategyFactory.cs @@ -34,20 +34,23 @@ private KernelFunctionSelectionStrategy CreateLLMSelection(SelectionStrategyConf var prompt = config.Prompt ?? BuildDefaultSelectionPrompt(); var function = KernelFunctionFactory.CreateFromPrompt(prompt); - var fallbackAgent = agents.Count > 0 ? agents[0] : null; + var agentNames = string.Join(", ", agents.Select(a => a.Name)); return new KernelFunctionSelectionStrategy(function, kernel) { - // Return the agent name from the LLM response; fall back to first agent on failure. ResultParser = result => { var name = result.GetValue()?.Trim() ?? string.Empty; var matched = agents.FirstOrDefault( a => string.Equals(a.Name, name, StringComparison.OrdinalIgnoreCase)); - return matched?.Name ?? fallbackAgent?.Name ?? string.Empty; + if (matched is null) + throw new InvalidOperationException( + $"LLM selection returned unrecognized agent name '{name}'. " + + $"Expected one of: {agentNames}"); + return matched.Name!; }, - InitialAgent = fallbackAgent, - UseInitialAgentAsFallback = true + InitialAgent = agents.Count > 0 ? agents[0] : null, + UseInitialAgentAsFallback = false }; } diff --git a/src/Program.cs b/src/Program.cs index 7a558c8..1356ac9 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -12,7 +12,7 @@ // Serilog is configured here and forwarded into Microsoft.Extensions.Logging // so that all SK and orchestration logs flow through the same pipeline. -var serilogLogger = new LoggerConfiguration() +Log.Logger = new LoggerConfiguration() .MinimumLevel.Information() .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) .MinimumLevel.Override("System", LogEventLevel.Warning) @@ -21,20 +21,16 @@ "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}") .CreateLogger(); -Log.Logger = serilogLogger; - // Service registration var services = new ServiceCollection(); - services.AddLogging(logging => { logging.ClearProviders(); - logging.AddSerilog(serilogLogger, dispose: false); + logging.AddSerilog(Log.Logger, dispose: false); logging.SetMinimumLevel(LogLevel.Information); }); -services.AddSingleton(sp => - new PluginRegistry(sp.GetRequiredService()).RegisterDefaults()); +services.AddSingleton(sp => new PluginRegistry(sp.GetRequiredService()).RegisterDefaults()); services.AddSingleton(); // Commands are resolved via DI, register them so Spectre can inject dependencies. diff --git a/src/ServiceExtensions.cs b/src/ServiceExtensions.cs index 0060288..f0694f9 100644 --- a/src/ServiceExtensions.cs +++ b/src/ServiceExtensions.cs @@ -24,7 +24,7 @@ public static IServiceCollection AddOrchestration(this IServiceCollection servic services.AddSingleton(config); services.AddSingleton(); - services.AddSingleton(static _ => new PluginRegistry().RegisterDefaults()); + services.AddSingleton(static _ => new PluginRegistry().RegisterDefaults()); services.AddSingleton(); services.AddSingleton(); services.AddSingleton();