From 359d5bc0558acf0c355e9a6151775ec70f86334a Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Mon, 19 Jan 2026 16:37:48 +0100 Subject: [PATCH 1/2] agents: implement gRPC retry policies Implement client-side retry policies for gRPC connections to improve resilience against transient network failures. The retry policy automatically retries failed requests on UNAVAILABLE status with exponential backoff. Retry policy configuration: - Max attempts: 5 (the original attempt plus up to 4 retries) - Initial backoff: 500ms - Max backoff: 5 seconds - Backoff multiplier: 2.0x - Retryable status: UNAVAILABLE The retry policy is configured via gRPC service config JSON and applies to all RPC methods. This ensures robust connectivity to the N|Solid Console even during temporary network issues or server restarts. --- agents/grpc/src/grpc_agent.cc | 11 ++++++ agents/grpc/src/grpc_client.cc | 34 +++++++++++++++++++ deps/opentelemetry-cpp/otlp-http-exporter.gyp | 4 ++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/agents/grpc/src/grpc_agent.cc b/agents/grpc/src/grpc_agent.cc index e6381c9988..2df0c56968 100644 --- a/agents/grpc/src/grpc_agent.cc +++ b/agents/grpc/src/grpc_agent.cc @@ -80,6 +80,12 @@ const int CONSOLE_ID_SIZE = 36; const seconds DEFAULT_GRPC_TIMEOUT = seconds{ 60 }; +// Retry policy configuration +constexpr size_t retry_max_attempts = 5; +constexpr auto retry_initial_backoff = std::chrono::milliseconds(500); +constexpr auto retry_max_backoff = std::chrono::seconds(5); +constexpr float retry_backoff_multiplier = 2.0f; + JSThreadMetrics::JSThreadMetrics(SharedEnvInst envinst): metrics_(ThreadMetrics::Create(envinst)) { } @@ -1068,6 +1074,11 @@ int GrpcAgent::config(const json& config) { } } + opts.retry_policy_max_attempts = retry_max_attempts; + opts.retry_policy_initial_backoff = retry_initial_backoff; + opts.retry_policy_max_backoff = retry_max_backoff; + opts.retry_policy_backoff_multiplier = retry_backoff_multiplier; + nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts, 
tls_keylog_file_); diff --git a/agents/grpc/src/grpc_client.cc b/agents/grpc/src/grpc_client.cc index a340fd5785..236e1115ba 100644 --- a/agents/grpc/src/grpc_client.cc +++ b/agents/grpc/src/grpc_client.cc @@ -73,6 +73,40 @@ std::shared_ptr grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); grpc_arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + static const auto kServiceConfigJson = std::string_view{R"( + { + "methodConfig": [ + { + "name": [{}], + "retryPolicy": { + "maxAttempts": %0000000000u, + "initialBackoff": "%0000000000.1fs", + "maxBackoff": "%0000000000.1fs", + "backoffMultiplier": %0000000000.1f, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } + } + ] + })"}; + + // Allocate string with buffer large enough to hold the formatted json config + auto service_config = std::string(kServiceConfigJson.size(), '\0'); + float initial_backoff = options.retry_policy_initial_backoff.count(); + float max_backoff = options.retry_policy_max_backoff.count(); + float backoff_multiplier = options.retry_policy_backoff_multiplier; + std::snprintf( + service_config.data(), + service_config.size(), + kServiceConfigJson.data(), + options.retry_policy_max_attempts, + std::min(std::max(initial_backoff, 0.f), 999999999.f), + std::min(std::max(max_backoff, 0.f), 999999999.f), + std::min(std::max(backoff_multiplier, 0.f), 999999999.f)); + + grpc_arguments.SetServiceConfigJSON(service_config); + return CreateCustomChannel(options.endpoint, MakeCredentials(options, tls_keylog_file), grpc_arguments); diff --git a/deps/opentelemetry-cpp/otlp-http-exporter.gyp b/deps/opentelemetry-cpp/otlp-http-exporter.gyp index a9ce0ef769..8be012662d 100644 --- a/deps/opentelemetry-cpp/otlp-http-exporter.gyp +++ b/deps/opentelemetry-cpp/otlp-http-exporter.gyp @@ -64,12 +64,13 @@ 'ENABLE_ASYNC_EXPORT', 'ENABLE_OTLP_GRPC_CREDENTIAL_PREVIEW', 'OPENTELEMETRY_STL_VERSION=2020', + 'ENABLE_OTLP_RETRY_PREVIEW', ], 'dependencies': [ '../protobuf/protobuf.gyp:protobuf', 
'../curl/curl.gyp:curl', '../grpc/grpc.gyp:grpc++', - '../protobuf/abseil.gyp:abseil_proto', + '../protobuf/abseil.gyp:abseil_proto', '../zlib/zlib.gyp:zlib', ], 'direct_dependent_settings': { @@ -77,6 +78,7 @@ 'ENABLE_ASYNC_EXPORT', 'ENABLE_OTLP_GRPC_CREDENTIAL_PREVIEW', 'OPENTELEMETRY_STL_VERSION=2020', + 'ENABLE_OTLP_RETRY_PREVIEW', ], 'include_dirs': [ 'api/include', From 5801d1610ce4899ec50fa5a4662c77145434ed94 Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Mon, 19 Jan 2026 16:38:02 +0100 Subject: [PATCH 2/2] test: add gRPC retry policy tests Add comprehensive test suite to verify gRPC retry policy behavior. Extend test infrastructure with fault injection capabilities to simulate transient failures and network issues. Test infrastructure enhancements: - Inject transient failures with configurable status codes and counts - Inject delays to simulate network latency - Track retry attempts via grpc-previous-rpc-attempts metadata - Clear all injected faults between tests Test coverage: - Successful retry after single transient UNAVAILABLE failure - Failure after exhausting max retries (5 attempts) on persistent errors - Verification of retry attempt counts via metadata headers - Multiple service types (ExportInfo, ExportMetricsCmd, ExportSpans) The fault injection system validates that the gRPC client properly retries on transient failures and respects retry limits. 
--- test/agents/test-grpc-retry-policies.mjs | 366 ++++++++++++++++++ test/agents/test-grpc-tracing.mjs | 4 +- test/common/nsolid-grpc-agent/index.js | 34 +- test/common/nsolid-grpc-agent/server.mjs | 146 +++++-- .../parallel/test-nsolid-grpc-heap-profile.js | 2 +- .../test-nsolid-grpc-heap-sampling.js | 2 +- test/parallel/test-nsolid-grpc-profile.js | 2 +- 7 files changed, 517 insertions(+), 39 deletions(-) create mode 100644 test/agents/test-grpc-retry-policies.mjs diff --git a/test/agents/test-grpc-retry-policies.mjs b/test/agents/test-grpc-retry-policies.mjs new file mode 100644 index 0000000000..ed05a95dad --- /dev/null +++ b/test/agents/test-grpc-retry-policies.mjs @@ -0,0 +1,366 @@ +// Flags: --expose-internals + +import { mustCall, mustSucceed } from '../common/index.mjs'; +import { GRPCServer, TestClient } from '../common/nsolid-grpc-agent/index.js'; + +import assert from 'node:assert'; + +const services = [ + { name: 'ExportInfo', trigger: (client, s, id) => s.info(id), event: 'info' }, + { name: 'ExportMetricsCmd', trigger: (client, s, id) => s.metrics(id), event: 'metrics_cmd' }, + { name: 'ExportSpans', trigger: (client, s, id) => client.trace('http'), event: 'spans' }, +]; + +const tests = []; + +tests.push({ + name: 'should retry on transient UNAVAILABLE and succeed', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + // Inject failure for the first call + grpcServer.injectFailure(svc.name, 'UNAVAILABLE', 1); + + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async (data) => { + const attemptsHeader = data.metadata['grpc-previous-rpc-attempts']; + assert(attemptsHeader && attemptsHeader[0] === '1', `Should have retried once for 
${svc.name}, got ${attemptsHeader}`); + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + })); + await client.trace('http'); + continue; + } + + svc.trigger(client, grpcServer, agentId).then(mustCall(async ({ data }) => { + const attemptsHeader = data.metadata['grpc-previous-rpc-attempts']; + assert(attemptsHeader && attemptsHeader[0] === '1', `Should have retried once for ${svc.name}, got ${attemptsHeader}`); + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + })); + } + })); + }); + }, +}); + +tests.push({ + name: 'should fail after max retries (5) on persistent UNAVAILABLE', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + // Inject failures for more than 5 attempts + grpcServer.injectFailure(svc.name, 'UNAVAILABLE', 20); + + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 10000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed as expected due to exhausted retries + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have failed after retries for ${svc.name}`); + } + })); + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) 
=> setTimeout(() => resolveTimeout('timeout'), 20000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed as expected due to exhausted retries + completed++; + if (completed === total) { + grpcServer.clearFaults(); + await client.shutdown(0); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have failed after retries for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should not retry on non-retryable DEADLINE_EXCEEDED', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + grpcServer.injectFailure(svc.name, 'DEADLINE_EXCEEDED', 1); + + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 1000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed immediately without retries + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new Error(`Should have failed immediately for ${svc.name}`); + } + })); + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 1000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, failed immediately without retries + completed++; + if 
(completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new Error(`Should have failed immediately for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should enforce custom deadline from NSOLID_GRPC_DEADLINE', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject 2s delay on services + for (const svc of services) { + grpcServer.injectDelay(svc.name, 2000); + } + + const env = { ...getEnv(port), NSOLID_GRPC_DEADLINE: '1' }; + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 2000)); + const success = new Promise((resolve) => grpcServer.on(svc.event, () => resolve('success'))); + Promise.race([success, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, timed out due to deadline + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + throw new Error(`Should have timed out for ${svc.name}`); + } + })); + continue; + } + + const triggerPromise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + const timeout = new Promise((resolveTimeout) => setTimeout(() => resolveTimeout('timeout'), 2000)); + Promise.race([triggerPromise, timeout]).then(mustCall(async (result) => { + if (result === 'timeout') { + // Good, timed out due to deadline + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + } else { + await client.shutdown(0); + throw new 
Error(`Should have timed out for ${svc.name}`); + } + })); + } + } + })); + }); + }, +}); + +tests.push({ + name: 'should use default 10s deadline when env not set', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject short delay, should succeed + for (const svc of services) { + grpcServer.injectDelay(svc.name, 500); + } + + const env = getEnv(port); + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + + await client.trace('http'); + continue; + } + + const promise = svc.trigger(client, grpcServer, agentId); + promise.then(mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + } + })); + }); + }, +}); + +tests.push({ + name: 'should handle invalid NSOLID_GRPC_DEADLINE gracefully', + test: async (getEnv) => { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + // Inject delay, should succeed with default 10s + for (const svc of services) { + grpcServer.injectDelay(svc.name, 1000); + } + + const env = { ...getEnv(port), NSOLID_GRPC_DEADLINE: 'invalid' }; + const client = new TestClient([], { env }); + const agentId = await client.id(); + + let completed = 0; + const total = services.length; + + for (const svc of services) { + if (svc.name === 'ExportSpans') { + grpcServer.on(svc.event, mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + 
grpcServer.close(); + resolve(); + } + })); + } + + const promise = svc.trigger(client, grpcServer, agentId); + if (svc.name !== 'ExportSpans') { + promise.then(mustCall(async () => { + completed++; + if (completed === total) { + await client.shutdown(0); + grpcServer.clearFaults(); + grpcServer.close(); + resolve(); + } + })); + } + } + })); + }); + }, +}); + +const testConfigs = [ + { + getEnv: (port) => ({ + NODE_DEBUG_NATIVE: 'nsolid_grpc_agent', + NSOLID_GRPC: `localhost:${port}`, + NSOLID_GRPC_INSECURE: 1, + NSOLID_TRACING_ENABLED: 1, + }), + }, +]; + +for (const testConfig of testConfigs) { + for (const { name, test } of tests) { + console.log(`[retry-policies] ${name}`); + await test(testConfig.getEnv); + } +} diff --git a/test/agents/test-grpc-tracing.mjs b/test/agents/test-grpc-tracing.mjs index 93a7f90967..ea4b6a9a1f 100644 --- a/test/agents/test-grpc-tracing.mjs +++ b/test/agents/test-grpc-tracing.mjs @@ -204,7 +204,7 @@ tests.push({ if (phase === 'done') return; - mergeResourceSpans(spans, resourceSpans); + mergeResourceSpans(spans.request, resourceSpans); if (phase === 'initial' && resourceSpans.length === 1 && @@ -273,7 +273,7 @@ tests.push({ if (phase === 'done') return; - mergeResourceSpans(spans, resourceSpans); + mergeResourceSpans(spans.request, resourceSpans); if (phase === 'initial' && resourceSpans.length === 1 && diff --git a/test/common/nsolid-grpc-agent/index.js b/test/common/nsolid-grpc-agent/index.js index d0153ae4c6..4185416e48 100644 --- a/test/common/nsolid-grpc-agent/index.js +++ b/test/common/nsolid-grpc-agent/index.js @@ -216,11 +216,13 @@ class GRPCServer extends EventEmitter { if (this.#server) { const requestId = randomUUID(); this.#server.send({ type: 'info', agentId, requestId }); - this.#server.once('message', (msg) => { - if (msg.type === 'info') { + const msgListener = (msg) => { + if (msg.type === 'info' && msg.data.msg.common.requestId === requestId) { + this.#server.off('message', msgListener); resolve({ requestId, data: 
msg.data }); } - }); + }; + this.#server.on('message', msgListener); } else { resolve(null); } @@ -232,11 +234,13 @@ class GRPCServer extends EventEmitter { if (this.#server) { const requestId = randomUUID(); this.#server.send({ type: 'metrics', agentId, requestId }); - this.#server.on('message', (msg) => { - if (msg.type === 'metrics_cmd') { + const msgListener = (msg) => { + if (msg.type === 'metrics_cmd' && msg.data.msg.common.requestId === requestId) { + this.#server.off('message', msgListener); resolve({ requestId, data: msg.data }); } - }); + }; + this.#server.on('message', msgListener); } else { resolve(null); } @@ -312,6 +316,24 @@ class GRPCServer extends EventEmitter { }); } + injectFailure(service, status = 'UNAVAILABLE', count = 1) { + if (this.#server) { + this.#server.send({ type: 'inject_failure', service, status, count }); + } + } + + injectDelay(service, delayMs = 0) { + if (this.#server) { + this.#server.send({ type: 'inject_delay', service, delay: delayMs }); + } + } + + clearFaults() { + if (this.#server) { + this.#server.send({ type: 'clear_faults' }); + } + } + close() { this.#server.send({ type: 'close' }); } diff --git a/test/common/nsolid-grpc-agent/server.mjs b/test/common/nsolid-grpc-agent/server.mjs index 1913e01b04..3bb26cb341 100644 --- a/test/common/nsolid-grpc-agent/server.mjs +++ b/test/common/nsolid-grpc-agent/server.mjs @@ -1,5 +1,6 @@ import assert from 'node:assert'; import path from 'node:path'; +import { setTimeout } from 'node:timers/promises'; import { parseArgs } from 'node:util'; import grpc from '@grpc/grpc-js'; import protoLoader from '@grpc/proto-loader'; @@ -26,6 +27,44 @@ const includeDirs = [path.resolve(import.meta.dirname, const commandCallMap = new Map(); +// Fault injection state +const faultInjections = new Map(); // service -> { status, remaining } +const delayInjections = new Map(); // service -> delayMs + +// Helper to check and inject fault for unary calls +function checkAndInjectFault(serviceName, callback) 
{ + const fault = faultInjections.get(serviceName); + if (fault && fault.remaining > 0) { + fault.remaining--; + const status = grpc.status[fault.status] || grpc.status.UNAVAILABLE; + console.log(`Injecting fault for ${serviceName}`, { code: status, message: `Injected fault: ${fault.status}` }); + callback({ code: status, message: `Injected fault: ${fault.status}` }); + return true; + } + return false; +} + +// Helper to check and inject fault for streaming calls +function checkAndInjectFaultStreaming(serviceName, call) { + const fault = faultInjections.get(serviceName); + if (fault && fault.remaining > 0) { + fault.remaining--; + const status = grpc.status[fault.status] || grpc.status.UNAVAILABLE; + call.destroy({ code: status, message: `Injected fault: ${fault.status}` }); + return true; + } + return false; +} + +// Helper to inject delay for unary calls +async function injectDelay(serviceName, callback) { + const delay = delayInjections.get(serviceName); + if (delay) { + await setTimeout(delay); + } + callback(); +} + // Create a local server to receive data from async function startServer(cb) { const server = new grpc.Server(); @@ -44,8 +83,11 @@ async function startServer(cb) { Export: (data, callback) => { console.dir(data.request, { depth: null }); // console.log('Logs received'); - callback(null, { message: 'Logs received' }); - cb(null, 'logs', data.request); + if (checkAndInjectFault('ExportLogs', callback)) return; + injectDelay('ExportLogs', () => { + callback(null, { message: 'Logs received' }); + cb(null, 'logs', data.request); + }); }, }); @@ -55,8 +97,12 @@ async function startServer(cb) { Export: (data, callback) => { // console.dir(data, { depth: null }); console.log('Metrics received'); - callback(null, { message: 'Metrics received' }); - cb(null, 'metrics', data.request); + if (checkAndInjectFault('ExportMetrics', callback)) return; + injectDelay('ExportMetrics', () => { + callback(null, { message: 'Metrics received' }); + 
console.dir(data.metadata, { depth: null }); + cb(null, 'metrics', { request: data.request, metadata: data.metadata }); + }); }, }); @@ -64,8 +110,11 @@ async function startServer(cb) { const packageObjectTrace = grpc.loadPackageDefinition(packageDefinitionTrace); server.addService(packageObjectTrace.opentelemetry.proto.collector.trace.v1.TraceService.service, { Export: (data, callback) => { - callback(null, { message: 'Trace received' }); - cb(null, 'spans', data.request); + if (checkAndInjectFault('ExportSpans', callback)) return; + injectDelay('ExportSpans', () => { + callback(null, { message: 'Trace received' }); + cb(null, 'spans', { request: data.request, metadata: data.metadata }); + }); }, }); @@ -91,6 +140,7 @@ async function startServer(cb) { ExportAsset: async (call) => { console.log('ExportAsset'); console.dir(call.metadata, { depth: null }); + if (checkAndInjectFaultStreaming('ExportAsset', call)) return; const asset = { common: null, threadId: null, @@ -122,19 +172,26 @@ async function startServer(cb) { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'loop_blocked', - data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportBlockedLoop', callback)) return; + injectDelay('ExportBlockedLoop', () => { + callback(null, {}); + process.send({ type: 'loop_blocked', + data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportCommandError: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); + if (checkAndInjectFault('ExportCommandError', callback)) return; + injectDelay('ExportCommandError', () => { + callback(null, {}); + }); }, ExportContinuousProfile: async (call) => { console.log('ExportContinuousProfile'); console.dir(call.metadata, { depth: null }); + if 
(checkAndInjectFaultStreaming('ExportContinuousProfile', call)) return; const asset = { common: null, threadId: null, @@ -169,57 +226,81 @@ async function startServer(cb) { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'exit', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportExit', callback)) return; + injectDelay('ExportExit', () => { + callback(null, {}); + process.send({ type: 'exit', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportInfo: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'info', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportInfo', callback)) return; + injectDelay('ExportInfo', () => { + callback(null, {}); + process.send({ type: 'info', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportMetrics: (call, callback) => { // Extract data from the request object - console.dir(call.request, { depth: null }); - console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'metrics_cmd', data: { msg: call.request, metadata: call.metadata } }); + // console.dir(call.request, { depth: null }); + // console.dir(call.metadata, { depth: null }); + if (checkAndInjectFault('ExportMetricsCmd', callback)) return; + injectDelay('ExportMetricsCmd', () => { + callback(null, {}); + process.send({ type: 'metrics_cmd', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportPackages: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'packages', data: { msg: call.request, metadata: 
call.metadata } }); + if (checkAndInjectFault('ExportPackages', callback)) return; + injectDelay('ExportPackages', () => { + callback(null, {}); + process.send({ type: 'packages', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportReconfigure: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'reconfigure', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportReconfigure', callback)) return; + injectDelay('ExportReconfigure', () => { + callback(null, {}); + process.send({ type: 'reconfigure', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportSourceCode: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'source_code', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportSourceCode', callback)) return; + injectDelay('ExportSourceCode', () => { + callback(null, {}); + process.send({ type: 'source_code', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportStartupTimes: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - callback(null, {}); - process.send({ type: 'startup_times', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportStartupTimes', callback)) return; + injectDelay('ExportStartupTimes', () => { + callback(null, {}); + process.send({ type: 'startup_times', data: { msg: call.request, metadata: call.metadata } }); + }); }, ExportUnblockedLoop: (call, callback) => { // Extract data from the request object console.dir(call.request, { depth: null }); console.dir(call.metadata, { depth: null }); - 
callback(null, {}); - process.send({ type: 'loop_unblocked', data: { msg: call.request, metadata: call.metadata } }); + if (checkAndInjectFault('ExportUnblockedLoop', callback)) return; + injectDelay('ExportUnblockedLoop', () => { + callback(null, {}); + process.send({ type: 'loop_unblocked', data: { msg: call.request, metadata: call.metadata } }); + }); }, }); @@ -272,6 +353,15 @@ process.on('message', (message) => { sendSourceCode(message.agentId, message.requestId, message.options); } else if (message.type === 'startup_times') { sendStartupTimes(message.agentId, message.requestId); + } else if (message.type === 'inject_failure') { + // Inject failure for a service: { service, status, count } + faultInjections.set(message.service, { status: message.status || 'UNAVAILABLE', remaining: message.count || 1 }); + } else if (message.type === 'inject_delay') { + // Inject delay for a service: { service, delay } + delayInjections.set(message.service, message.delay || 0); + } else if (message.type === 'clear_faults') { + faultInjections.clear(); + delayInjections.clear(); } else if (message.type === 'close') { server.forceShutdown(); process.exit(0); diff --git a/test/parallel/test-nsolid-grpc-heap-profile.js b/test/parallel/test-nsolid-grpc-heap-profile.js index 8edd26fe44..09a4b6ab84 100644 --- a/test/parallel/test-nsolid-grpc-heap-profile.js +++ b/test/parallel/test-nsolid-grpc-heap-profile.js @@ -125,5 +125,5 @@ setTimeout(() => { })); })); })); - }, 100); + }, 2 * 5000); // To make sure all the retries are done }, 100); diff --git a/test/parallel/test-nsolid-grpc-heap-sampling.js b/test/parallel/test-nsolid-grpc-heap-sampling.js index 913ecc65cb..486ce88367 100644 --- a/test/parallel/test-nsolid-grpc-heap-sampling.js +++ b/test/parallel/test-nsolid-grpc-heap-sampling.js @@ -169,5 +169,5 @@ setTimeout(() => { })); }, 100); })); - }, 500); + }, 2 * 5000); // To make sure all the retries are done }, 100); diff --git a/test/parallel/test-nsolid-grpc-profile.js 
b/test/parallel/test-nsolid-grpc-profile.js index 27c3954140..aa3545911e 100644 --- a/test/parallel/test-nsolid-grpc-profile.js +++ b/test/parallel/test-nsolid-grpc-profile.js @@ -114,5 +114,5 @@ setTimeout(() => { })); }, 100); })); - }, 500); + }, 2 * 5000); // To make sure all the retries are done }, 100);