Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/OpenClaw.Agent/OpenClaw.Agent.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="Microsoft.Extensions.AI" Version="10.3.0" />
<PackageReference Include="Microsoft.Extensions.AI.OpenAI" Version="10.3.0" />
<PackageReference Include="MQTTnet" Version="5.1.0.1559" />
<PackageReference Include="ModelContextProtocol" Version="1.1.0" />
</ItemGroup>
<ItemGroup>
<Content Include="Plugins/plugin-bridge.mjs" CopyToOutputDirectory="PreserveNewest" />
Expand Down
335 changes: 335 additions & 0 deletions src/OpenClaw.Agent/Plugins/McpServerToolRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using ModelContextProtocol;
using ModelContextProtocol.Client;
using OpenClaw.Agent.Tools;
using OpenClaw.Core.Abstractions;
using OpenClaw.Core.Plugins;
using OpenClaw.Core.Security;

namespace OpenClaw.Agent.Plugins;

/// <summary>
/// Discovers tools from configured MCP servers and registers them as native OpenClaw tools.
/// </summary>
public sealed class McpServerToolRegistry : IDisposable
{
private readonly McpPluginsConfig _config;
private readonly ILogger _logger;
private readonly SemaphoreSlim _loadSemaphore = new(1, 1);
private readonly List<DiscoveredMcpTool> _tools = [];
private readonly List<McpClient> _clients = [];
private bool _loaded;
private bool _registered;
private bool _disposed;

/// <summary>
/// Creates a registry for configured MCP servers.
/// </summary>
public McpServerToolRegistry(McpPluginsConfig config, ILogger logger)
{
_config = config;
_logger = logger;
}

/// <summary>
/// Connects to configured MCP servers and registers discovered tools into the native registry.
/// </summary>
public async Task RegisterToolsAsync(NativePluginRegistry nativeRegistry, CancellationToken ct)
{
ThrowIfDisposed();
await _loadSemaphore.WaitAsync(ct);
try
{
ThrowIfDisposed();
if (_registered)
return;

var tools = await LoadInternalAsync(ct);
foreach (var tool in tools)
nativeRegistry.RegisterExternalTool(tool.Tool, tool.PluginId, tool.Detail);

_registered = true;
}
finally
{
_loadSemaphore.Release();
}
}

internal async Task<IReadOnlyList<DiscoveredMcpTool>> LoadAsync(CancellationToken ct)
{
ThrowIfDisposed();
await _loadSemaphore.WaitAsync(ct);
try
{
ThrowIfDisposed();
return _loaded ? _tools : await LoadInternalAsync(ct);
}
finally
{
_loadSemaphore.Release();
}
}

private async Task<IReadOnlyList<DiscoveredMcpTool>> LoadInternalAsync(CancellationToken ct)
{
if (_loaded)
return _tools;

if (!_config.Enabled)
{
_loaded = true;
return _tools;
}

var discoveredTools = new List<DiscoveredMcpTool>();
var discoveredClients = new List<McpClient>();

try
{
foreach (var (serverId, serverConfig) in _config.Servers ?? [])
{
if (!serverConfig.Enabled)
continue;

var transport = CreateTransport(serverId, serverConfig);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(serverConfig.StartupTimeoutSeconds));
var client = await McpClient.CreateAsync(transport, cancellationToken: timeoutCts.Token);
discoveredClients.Add(client);

var displayName = string.IsNullOrWhiteSpace(serverConfig.Name) ? serverId : serverConfig.Name!;
var pluginId = $"mcp:{serverId}";

var tools = await LoadToolsFromClientAsync(client, serverId, pluginId, displayName, serverConfig, ct);

foreach (var tool in tools)
{
discoveredTools.Add(new DiscoveredMcpTool(
pluginId,
new McpNativeTool(client, tool.LocalName, tool.RemoteName, tool.Description, tool.InputSchemaText),
displayName));
}
}

_clients.AddRange(discoveredClients);
_tools.AddRange(discoveredTools);
_loaded = true;
return _tools;
}
catch
{
foreach (var client in discoveredClients)
{
try
{
DisposeClient(client);
}
catch
{
}
}
throw;
}
}

private async Task<IReadOnlyList<McpToolDescriptor>> LoadToolsFromClientAsync(
McpClient client,
string serverId,
string pluginId,
string displayName,
McpServerConfig config,
CancellationToken ct)
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(config.RequestTimeoutSeconds));
var response = await client.ListToolsAsync(cancellationToken: timeoutCts.Token);

var tools = new List<McpToolDescriptor>();
foreach (var tool in response)
{
var remoteName = tool.Name;
if (string.IsNullOrWhiteSpace(remoteName))
throw new InvalidOperationException($"MCP server '{displayName}' returned a tool entry with an empty name.");

var localName = ResolveToolName(serverId, config.ToolNamePrefix, remoteName);
var description = !string.IsNullOrWhiteSpace(tool.Description)
? $"{tool.Description} (from MCP server '{displayName}')"
: $"MCP tool '{remoteName}' from server '{displayName}'.";
var inputSchema = ResolveInputSchemaText(tool.JsonSchema);
tools.Add(new McpToolDescriptor(localName, remoteName, description, inputSchema));
}

_logger.LogInformation("MCP server enabled: {ServerId} ({DisplayName}) with {ToolCount} tool(s)",
serverId, displayName, tools.Count);
return tools;
}

private static string ResolveToolName(string serverId, string? toolNamePrefix, string remoteName)
{
var prefix = toolNamePrefix;
if (prefix is null)
prefix = $"{SanitizePrefixPart(serverId)}.";

return string.IsNullOrEmpty(prefix) ? remoteName : prefix + remoteName;
}

private static string SanitizePrefixPart(string value)
{
if (string.IsNullOrWhiteSpace(value))
return "mcp";

var sb = new StringBuilder(value.Length);
foreach (var ch in value)
{
if (char.IsLetterOrDigit(ch) || ch is '_' or '-' or '.')
sb.Append(char.ToLowerInvariant(ch));
else
sb.Append('_');
}

return sb.Length == 0 ? "mcp" : sb.ToString();
}

private static string ResolveInputSchemaText(JsonElement inputSchema)
{
if (inputSchema.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
return "{}";

return inputSchema.GetRawText();
}

public void Dispose()
{
bool acquired = false;
try
{
acquired = _loadSemaphore.Wait(TimeSpan.FromSeconds(5));
if (!acquired)
{
_logger.LogWarning("McpServerToolRegistry.Dispose() timed out waiting for load semaphore, waiting indefinitely to ensure load completes");
_loadSemaphore.Wait();
acquired = true;
}
}
catch (ObjectDisposedException)
{
_logger.LogWarning("McpServerToolRegistry.Dispose() encountered disposed semaphore, load may have completed concurrently");
return;
}

try
{
if (_disposed)
return;

_disposed = true;
foreach (var client in _clients)
{
try
{
DisposeClient(client);
}
catch
{
}
}
_clients.Clear();
}
finally
{
if (acquired)
_loadSemaphore.Release();
}
}

private static IClientTransport CreateTransport(string serverId, McpServerConfig config)
{
var transport = config.NormalizeTransport();
return transport switch
{
"stdio" => new StdioClientTransport(new StdioClientTransportOptions
{
Command = config.Command!,
Arguments = config.Arguments ?? [],
WorkingDirectory = config.WorkingDirectory,
EnvironmentVariables = ResolveEnv(config.Environment),
Name = serverId,
}),
"http" => new HttpClientTransport(new HttpClientTransportOptions
{
Endpoint = new Uri(config.Url!),
AdditionalHeaders = ResolveHeaders(config.Headers),
Name = serverId,
}),
_ => throw new InvalidOperationException($"Unsupported MCP transport '{config.Transport}' for server '{serverId}'.")
};
}

private static Dictionary<string, string?>? ResolveEnv(Dictionary<string, string>? environment)
{
if (environment is null || environment.Count == 0)
return null;

var resolved = new Dictionary<string, string?>(StringComparer.Ordinal);
foreach (var (name, rawValue) in environment)
{
if (rawValue is null)
{
resolved[name] = null;
continue;
}
var value = SecretResolver.Resolve(rawValue);
if (value is null && rawValue.StartsWith("env:", StringComparison.Ordinal))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResolveEnv assumes rawValue is non-null; if configuration binding yields a null dictionary value, rawValue.StartsWith(...) will throw and prevent MCP discovery. This same null-value risk exists in ResolveHeaders too.

Severity: medium

Other Locations
  • src/OpenClaw.Agent/Plugins/McpServerToolRegistry.cs:292

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

throw new InvalidOperationException($"Environment variable '{name}' references unset env var '{rawValue[4..]}'");
resolved[name] = value ?? rawValue;
}

return resolved;
}

private static Dictionary<string, string>? ResolveHeaders(Dictionary<string, string>? headers)
{
if (headers is null || headers.Count == 0)
return null;

var resolved = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (var (name, rawValue) in headers)
{
if (rawValue is null)
{
resolved[name] = string.Empty;
continue;
}
var value = SecretResolver.Resolve(rawValue);
if (value is null && rawValue.StartsWith("env:", StringComparison.Ordinal))
throw new InvalidOperationException($"Header '{name}' references unset env var '{rawValue[4..]}'");
resolved[name] = value ?? rawValue;
}

return resolved;
}

private void ThrowIfDisposed()
{
if (_disposed)
throw new ObjectDisposedException(nameof(McpServerToolRegistry));
}

private static void DisposeClient(McpClient client)
{
if (client is IAsyncDisposable asyncDisposable)
{
asyncDisposable.DisposeAsync().AsTask().GetAwaiter().GetResult();
return;
}

if (client is IDisposable disposable)
disposable.Dispose();
}

internal sealed record DiscoveredMcpTool(string PluginId, ITool Tool, string Detail);
private sealed record McpToolDescriptor(string LocalName, string RemoteName, string Description, string InputSchemaText);
}
Loading
Loading