diff --git a/.gitignore b/.gitignore index 3f5b5b7658..b958f1b05a 100644 --- a/.gitignore +++ b/.gitignore @@ -181,3 +181,6 @@ compile_commands.json # Ignore asserts-cpp and nlohmann paths in src/ src/asserts-cpp/ src/nlohmann/ + +test/integrations/express/*/node_modules/ +test/integrations/fastify/*/node_modules/ diff --git a/Makefile b/Makefile index 40e19544f1..fe5ed0eb35 100644 --- a/Makefile +++ b/Makefile @@ -214,6 +214,7 @@ clean: ## Remove build artifacts. $(MAKE) testclean $(MAKE) test-addons-clean $(MAKE) test-agents-prereqs-clean + $(MAKE) test-integrations-prereqs-clean $(MAKE) bench-addons-clean .PHONY: testclean @@ -234,6 +235,8 @@ distclean: ## Remove all build and test artifacts. $(RM) -r deps/icu $(RM) -r deps/icu4c*.tgz deps/icu4c*.zip deps/icu-tmp $(RM) $(BINARYTAR).* $(TARBALL).* + $(MAKE) test-agents-prereqs-clean + $(MAKE) test-integrations-prereqs-clean .PHONY: check check: test @@ -318,7 +321,7 @@ v8: ## Build deps/v8. tools/make-v8.sh $(V8_ARCH).$(BUILDTYPE_LOWER) $(V8_BUILD_OPTIONS) .PHONY: jstest -jstest: build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests test-agents-prereqs ## Runs addon tests and JS tests. +jstest: build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests test-agents-prereqs test-integrations-prereqs ## Runs addon tests and JS tests. NSOLID_DELAY_INIT="" \ $(PYTHON) tools/test.py $(PARALLEL_ARGS) --mode=$(BUILDTYPE_LOWER) \ $(TEST_CI_ARGS) \ @@ -596,7 +599,7 @@ test-ci-native: | benchmark/napi/.buildstamp test/addons/.buildstamp test/js-nat .PHONY: test-ci-js # This target should not use a native compiler at all # Related CI job: node-test-commit-arm-fanned -test-ci-js: | clear-stalled ## Build and test JavaScript with building anything else. +test-ci-js: | clear-stalled test-agents-prereqs test-integrations-prereqs ## Build and test JavaScript with building anything else. $(PYTHON) tools/test.py $(PARALLEL_ARGS) -p tap --logfile test.tap \ --mode=$(BUILDTYPE_LOWER) --flaky-tests=$(FLAKY_TESTS) \ --skip-tests=$(CI_SKIP_TESTS) \ @@ -611,7 +614,7 @@ test-ci-js: | clear-stalled ## Build and test JavaScript with building anything .PHONY: test-ci # Related CI jobs: most CI tests, excluding node-test-commit-arm-fanned test-ci: LOGLEVEL := info ## Build and test everything (CI). -test-ci: | clear-stalled bench-addons-build build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests doc-only test-agents-prereqs +test-ci: | clear-stalled bench-addons-build build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests doc-only test-agents-prereqs test-integrations-prereqs out/Release/cctest --gtest_output=xml:out/junit/cctest.xml $(PYTHON) tools/test.py $(PARALLEL_ARGS) -p tap --logfile test.tap \ --mode=$(BUILDTYPE_LOWER) --flaky-tests=$(FLAKY_TESTS) \ @@ -1695,6 +1698,18 @@ test-agents-prereqs-clean: $(RM) -r test/common/nsolid-zmq-agent/node_modules $(RM) -r test/common/nsolid-otlp-agent/node_modules +.PHONY: test-integrations-prereqs +test-integrations-prereqs: + env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install express@4 --prefix test/integrations/express/v4 --no-save --no-package-lock + env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install express@5 --prefix test/integrations/express/v5 --no-save --no-package-lock + env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install fastify@5 --prefix test/integrations/fastify/v5 --no-save --no-package-lock + +.PHONY: test-integrations-prereqs-clean +test-integrations-prereqs-clean: + $(RM) -r test/integrations/express/v4/node_modules + $(RM) -r test/integrations/express/v5/node_modules + $(RM) -r test/integrations/fastify/v5/node_modules + HAS_DOCKER ?= $(shell command -v docker > /dev/null 2>&1; [ $$? -eq 0 ] && echo 1 || echo 0) .PHONY: gen-openssl diff --git a/agents/grpc/src/grpc_agent.cc b/agents/grpc/src/grpc_agent.cc index e6381c9988..03922e2cbf 100644 --- a/agents/grpc/src/grpc_agent.cc +++ b/agents/grpc/src/grpc_agent.cc @@ -61,7 +61,7 @@ namespace node { namespace nsolid { namespace grpc { -using ThreadMetricsMap = std::map; +using CachedThreadMetricsMap = std::map; constexpr uint64_t span_timer_interval = 1000; constexpr size_t span_msg_q_min_size = 1000; @@ -250,7 +250,7 @@ void PopulateInfoEvent(grpcagent::InfoEvent* info_event, void PopulateMetricsEvent(grpcagent::MetricsEvent* metrics_event, const ProcessMetrics::MetricsStor& proc_metrics, - const ThreadMetricsMap& env_metrics, + const CachedThreadMetricsMap& env_metrics, const char* req_id) { // Fill in the fields of the MetricsResponse. PopulateCommon(metrics_event->mutable_common(), "metrics", req_id); @@ -261,8 +261,18 @@ void PopulateMetricsEvent(grpcagent::MetricsEvent* metrics_event, // As this is the cached we're sending, we pass the same value for prev_stor. otlp::fill_proc_metrics(metrics, proc_metrics, proc_metrics, false); - for (const auto& [env_id, env_metrics_stor] : env_metrics) { + for (const auto& [env_id, cached_metrics] : env_metrics) { + const auto& env_metrics_stor = cached_metrics.stor; + otlp::fill_env_metrics(metrics, env_metrics_stor, false); + // Add exponential histogram metrics for HTTP latency. + otlp::fill_http_histograms( + metrics, + env_metrics_stor, + cached_metrics.http_client_points, + cached_metrics.http_server_points, + false, + cached_metrics.hist_start_ts_ms); } data.scope_metric_data_ = @@ -865,14 +875,17 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst, EnvInst::Scope scp(envinst); if (scp.Success()) { bool creation = std::get<1>(tup); + uint64_t thread_id = GetThreadId(envinst); if (creation) { auto pair = agent->env_metrics_map_.emplace( std::piecewise_construct, - std::forward_as_tuple(GetThreadId(envinst)), + std::forward_as_tuple(thread_id), std::forward_as_tuple(envinst)); ASSERT(pair.second); } else { - agent->env_metrics_map_.erase(GetThreadId(envinst)); + agent->env_metrics_map_.erase(thread_id); + agent->thr_metrics_hist_prev_end_ts_ms_.erase(thread_id); + agent->thr_metrics_cache_.erase(thread_id); } } } @@ -939,10 +952,35 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst, data.resource_ = otlp::GetResource(); std::vector metrics; - ThreadMetricsStor stor; - while (agent->thr_metrics_msg_q_.dequeue(stor)) { + ExtMetricsStor ext_stor; + while (agent->thr_metrics_msg_q_.dequeue(ext_stor)) { + auto& stor = ext_stor.stor; otlp::fill_env_metrics(metrics, stor, false); - agent->thr_metrics_cache_.insert_or_assign(stor.thread_id, std::move(stor)); + uint64_t thread_id = stor.thread_id; + uint64_t start_ts_ms = 0; + auto prev_it = agent->thr_metrics_hist_prev_end_ts_ms_.find(thread_id); + if (prev_it != agent->thr_metrics_hist_prev_end_ts_ms_.end()) { + start_ts_ms = prev_it->second; + } + // Add exponential histogram metrics for HTTP latency. + otlp::fill_http_histograms( + metrics, + stor, + ext_stor.http_client_points, + ext_stor.http_server_points, + false, + start_ts_ms); + agent->thr_metrics_hist_prev_end_ts_ms_.insert_or_assign( + thread_id, + stor.timestamp); + agent->thr_metrics_cache_.insert_or_assign( + thread_id, + CachedThreadMetrics{ + std::move(stor), + start_ts_ms, + ext_stor.http_client_points, + ext_stor.http_server_points + }); } data.scope_metric_data_ = @@ -991,8 +1029,18 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst, return; } - if (agent->thr_metrics_msg_q_.enqueue(metrics->Get()) == 1) { - ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_)); + uint64_t thread_id = metrics->thread_id(); + auto envinst_sp = EnvInst::GetInst(thread_id); + if (envinst_sp != nullptr) { + ExtMetricsStor ext_stor{ + metrics->Get(), + envinst_sp->http_client_histogram_points(), + envinst_sp->http_server_histogram_points() + }; + + if (agent->thr_metrics_msg_q_.enqueue(std::move(ext_stor)) == 1) { + ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_)); + } } } @@ -2080,6 +2128,7 @@ void GrpcAgent::setup_blocked_loop_hooks() { int GrpcAgent::setup_metrics_timer(uint64_t period) { if (period == 0) { + thr_metrics_hist_prev_end_ts_ms_.clear(); return metrics_timer_.stop(); } diff --git a/agents/grpc/src/grpc_agent.h b/agents/grpc/src/grpc_agent.h index 690f8e162c..f8bfc785ab 100644 --- a/agents/grpc/src/grpc_agent.h +++ b/agents/grpc/src/grpc_agent.h @@ -1,13 +1,16 @@ #ifndef AGENTS_GRPC_SRC_GRPC_AGENT_H_ #define AGENTS_GRPC_SRC_GRPC_AGENT_H_ -#include -#include -#include +#include "nsolid.h" +#include "nsolid/nsolid_metrics_types.h" +#include "nsolid/async_ts_queue.h" +#include "nsolid/thread_safe.h" #include +#include #include "grpcpp/grpcpp.h" #include "./proto/nsolid_service.grpc.pb.h" #include "opentelemetry/version.h" +#include "opentelemetry/sdk/metrics/data/metric_data.h" #include "opentelemetry/sdk/trace/recordable.h" #include "../../src/profile_collector.h" #include "asset_stream.h" @@ -50,6 +53,19 @@ using UniqRecordables = std::vector; using SharedGrpcAgent = std::shared_ptr; using WeakGrpcAgent = std::weak_ptr; +struct ExtMetricsStor { + ThreadMetrics::MetricsStor stor; + SharedPointDataAttributes http_client_points; + SharedPointDataAttributes http_server_points; +}; + +struct CachedThreadMetrics { + ThreadMetrics::MetricsStor stor; + uint64_t hist_start_ts_ms = 0; + SharedPointDataAttributes http_client_points; + SharedPointDataAttributes http_server_points; +}; + struct JSThreadMetrics { explicit JSThreadMetrics(SharedEnvInst envinst); SharedThreadMetrics metrics_; @@ -324,11 +340,12 @@ class GrpcAgent: public std::enable_shared_from_this, ProcessMetrics::MetricsStor proc_prev_stor_; std::map env_metrics_map_; nsuv::ns_async metrics_msg_; - TSQueue thr_metrics_msg_q_; + TSQueue thr_metrics_msg_q_; nsuv::ns_timer metrics_timer_; std::unique_ptr metrics_exporter_; - std::map thr_metrics_cache_; + std::map thr_metrics_cache_; + std::map thr_metrics_hist_prev_end_ts_ms_; // For the Configuration API nsuv::ns_async config_msg_; diff --git a/agents/otlp/src/otlp_common.cc b/agents/otlp/src/otlp_common.cc index de303192ef..9069dc007d 100644 --- a/agents/otlp/src/otlp_common.cc +++ b/agents/otlp/src/otlp_common.cc @@ -8,6 +8,8 @@ #include "opentelemetry/semconv/incubating/process_attributes.h" #include "opentelemetry/semconv/incubating/service_attributes.h" #include "opentelemetry/semconv/incubating/thread_attributes.h" +#include "opentelemetry/metrics/sync_instruments.h" +#include "opentelemetry/semconv/http_metrics.h" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/logs/recordable.h" #include "opentelemetry/sdk/trace/recordable.h" @@ -28,6 +30,7 @@ using opentelemetry::sdk::instrumentationscope::InstrumentationScope; using LogsRecordable = opentelemetry::sdk::logs::Recordable; using opentelemetry::sdk::common::OwnedAttributeType; using opentelemetry::sdk::metrics::AggregationTemporality; +using opentelemetry::sdk::metrics::Base2ExponentialHistogramPointData; using opentelemetry::sdk::metrics::MetricData; using opentelemetry::sdk::metrics::InstrumentDescriptor; using opentelemetry::sdk::metrics::InstrumentType; @@ -45,6 +48,10 @@ using opentelemetry::trace::SpanKind; using opentelemetry::trace::TraceFlags; using opentelemetry::trace::TraceId; using opentelemetry::trace::propagation::detail::HexToBinary; +using opentelemetry::semconv::http::kMetricHttpClientRequestDuration; +using opentelemetry::semconv::http::kMetricHttpServerRequestDuration; +using opentelemetry::semconv::http::unitMetricHttpClientRequestDuration; +using opentelemetry::semconv::http::unitMetricHttpServerRequestDuration; using opentelemetry::semconv::process::kProcessOwner; using opentelemetry::semconv::service::kServiceName; using opentelemetry::semconv::service::kServiceInstanceId; @@ -361,6 +368,63 @@ NSOLID_ENV_METRICS_NUMBERS(V) attrs); } +void fill_http_histograms( + std::vector& metrics, // NOLINT(runtime/references) + const ThreadMetrics::MetricsStor& stor, + SharedPointDataAttributes http_client_points, + SharedPointDataAttributes http_server_points, + bool use_snake_case, + uint64_t start_timestamp_ms) { + time_point end{ + duration_cast( + milliseconds(static_cast(stor.timestamp)))}; + time_point start = process_start; + if (start_timestamp_ms != 0) { + start = time_point{ + duration_cast(milliseconds(start_timestamp_ms))}; + if (start > end) { + start = end; + } + } + + // Merge thread-level attributes into each point and emit one MetricData + // per histogram type containing all per-attribute-combo points. + auto emit = [&metrics, &stor, &start, &end]( + const char* metric_name, + const char* unit, + SharedPointDataAttributes points) { + if (!points || points->empty()) return; + std::vector enriched; + enriched.reserve(points->size()); + for (const auto& pt : *points) { + PointAttributes attrs = pt.attributes; + attrs.insert({ kThreadId, static_cast(stor.thread_id) }); + attrs.insert({ kThreadName, stor.thread_name }); + enriched.push_back({ std::move(attrs), pt.point_data }); + } + MetricData metric_data{ + InstrumentDescriptor{ + metric_name, + "", + unit, + InstrumentType::kHistogram, + InstrumentValueType::kDouble }, + AggregationTemporality::kDelta, + SystemTimestamp{ start }, + SystemTimestamp{ end }, + std::move(enriched) + }; + metrics.push_back(std::move(metric_data)); + }; + + emit(kMetricHttpClientRequestDuration, + unitMetricHttpClientRequestDuration, + http_client_points); + emit(kMetricHttpServerRequestDuration, + unitMetricHttpServerRequestDuration, + http_server_points); +} + void fill_log_recordable(LogsRecordable* recordable, const LogWriteInfo& info) { recordable->SetBody(info.msg); diff --git a/agents/otlp/src/otlp_common.h b/agents/otlp/src/otlp_common.h index 8c8bd8cdc6..5940abb4fc 100644 --- a/agents/otlp/src/otlp_common.h +++ b/agents/otlp/src/otlp_common.h @@ -2,7 +2,9 @@ #define AGENTS_OTLP_SRC_OTLP_COMMON_H_ #include "nsolid.h" +#include "nsolid/nsolid_metrics_types.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" +#include "opentelemetry/sdk/metrics/data/point_data.h" #include "opentelemetry/sdk/resource/resource.h" // Class pre-declaration @@ -56,6 +58,14 @@ void fill_env_metrics(std::vector&, const ThreadMetrics::MetricsStor& stor, bool use_snake_case = true); +void fill_http_histograms( + std::vector&, + const ThreadMetrics::MetricsStor& stor, + SharedPointDataAttributes http_client_points, + SharedPointDataAttributes http_server_points, + bool use_snake_case = true, + uint64_t start_timestamp_ms = 0); + void fill_log_recordable(OPENTELEMETRY_NAMESPACE::sdk::logs::Recordable*, const LogWriteInfo&); diff --git a/deps/opentelemetry-cpp/otlp-http-exporter.gyp b/deps/opentelemetry-cpp/otlp-http-exporter.gyp index a9ce0ef769..0b9cc2bc0a 100644 --- a/deps/opentelemetry-cpp/otlp-http-exporter.gyp +++ b/deps/opentelemetry-cpp/otlp-http-exporter.gyp @@ -35,6 +35,8 @@ 'sdk/src/common/global_log_handler.cc', 'sdk/src/logs/exporter.cc', 'sdk/src/logs/readable_log_record.cc', + 'sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc', + 'sdk/src/metrics/aggregation/base2_exponential_histogram_indexer.cc', 'sdk/src/metrics/data/circular_buffer.cc', 'sdk/src/resource/resource.cc', 'sdk/src/resource/resource_detector.cc', diff --git a/lib/_http_client.js b/lib/_http_client.js index cf1013a04f..76469721de 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -43,6 +43,13 @@ const { nsolid_consts, } = nsolidApi; +const { + kHttpMethodMap, + kHttpMethodOther, + kHttpVersionMap, + kHttpVersionOther, +} = require('internal/nsolid_http_consts'); + const net = require('net'); const assert = require('internal/assert'); const { @@ -856,7 +863,13 @@ function parserOnIncomingClient(res, shouldKeepAlive) { } nsolid_counts[kHttpClientCount]++; - nsolidApi.pushClientBucket(now() - req[nsolid_tracer_s]); + nsolidApi.pushClientBucket( + now() - req[nsolid_tracer_s], + kHttpMethodMap[req.method] ?? kHttpMethodOther, + res.statusCode, + req.host || '', + req.socket?.remotePort || 0, + kHttpVersionMap[res.httpVersion] ?? kHttpVersionOther); if (req[kClientRequestStatistics] && hasObserver('http')) { stopPerf(req, kClientRequestStatistics, { diff --git a/lib/_http_server.js b/lib/_http_server.js index d8edfe1c6d..fe486d6b67 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -40,8 +40,18 @@ const { nsolid_tracer_s, nsolid_span_id_s, nsolid_consts, + nsolid_route_s, } = internalBinding('nsolid_api'); +const { + kHttpMethodMap, + kHttpMethodOther, + kHttpVersionMap, + kHttpVersionOther, + kHttpSchemeHttp, + kHttpSchemeHttps, +} = require('internal/nsolid_http_consts'); + const net = require('net'); const EE = require('events'); const assert = require('internal/assert'); @@ -309,7 +319,18 @@ function pushEndTraceData(res) { nsolid_counts[kHttpServerAbortCount]++; } - binding.pushServerBucket(now() - res[nsolid_tracer_s]); + const req = res.req; + // Extract route template from unified nsolid_route_s symbol + // Both Express (via diagnostic channel) and Fastify (via diagnostic channel) set this + const route = req?.[nsolid_route_s] || ''; + const statusCode = res._headerSent ? res.statusCode : 0; + binding.pushServerBucket( + now() - res[nsolid_tracer_s], + kHttpMethodMap[req?.method] ?? kHttpMethodOther, + statusCode, + req?.socket?.encrypted ? kHttpSchemeHttps : kHttpSchemeHttp, + kHttpVersionMap[req?.httpVersion] ?? kHttpVersionOther, + route); const span = res[nsolid_span_id_s]; if (span && generateSpan(kSpanHttpServer)) { if (res._headerSent) { diff --git a/lib/internal/nsolid_diag.js b/lib/internal/nsolid_diag.js index 5563216641..324c067797 100644 --- a/lib/internal/nsolid_diag.js +++ b/lib/internal/nsolid_diag.js @@ -1,11 +1,18 @@ 'use strict'; +const { + Number, + SafeWeakMap, +} = primordials; + const nsolidApi = internalBinding('nsolid_api'); const { nsolid_counts, + nsolid_net_prot_s, nsolid_span_id_s, nsolid_tracer_s, nsolid_consts, + nsolid_route_s, } = nsolidApi; const { now } = require('internal/perf/utils'); @@ -20,6 +27,8 @@ const { nsolidTracer, } = require('internal/nsolid_trace'); +const { URL } = require('internal/url'); + const dc = require('diagnostics_channel'); const { @@ -35,10 +44,22 @@ const { kSpanHttpStatusCode, } = nsolid_consts; +const { + kHttpMethodMap, + kHttpMethodOther, + kHttpVersion11, + kHttpVersion2, + kHttpSchemeHttp, + kHttpSchemeHttps, +} = require('internal/nsolid_http_consts'); + const undiciFetch = dc.tracingChannel('undici:fetch'); const http2Client = dc.tracingChannel('http2.client'); const http2Server = dc.tracingChannel('http2.server'); +// WeakMap to store HTTP version per socket (undici client connections) +const socketVersionMap = new SafeWeakMap(); + // To lazy load the http2 constants let http2Constants; @@ -133,16 +154,26 @@ nsolidTracer.on('flagsUpdated', () => { } }); -dc.subscribe('undici:client:connected', ({ connectParams }) => { +dc.subscribe('undici:client:connected', ({ connectParams, socket }) => { const api = getApi(); const span = api.trace.getSpan(api.context.active()); + const version = connectParams.version === 'h1' ? '1.1' : '2'; + const versionConst = + connectParams.version === 'h1' ? kHttpVersion11 : kHttpVersion2; + // Store the canonical enum value on the socket for later retrieval in + // sendHeaders and histogram attribution. + socketVersionMap.set(socket, versionConst); if (span && span.type === kSpanHttpClient) { - const version = connectParams.version === 'h1' ? '1.1' : '2'; span._pushSpanDataString(kSpanHttpProtocolVersion, version); return api.context.active(); } }); +dc.subscribe('undici:client:sendHeaders', ({ request, socket }) => { + const versionConst = socketVersionMap.get(socket); + request[nsolid_net_prot_s] = versionConst; +}); + dc.subscribe('undici:request:create', ({ request }) => { request[nsolid_tracer_s] = now(); const api = getApi(); @@ -154,7 +185,24 @@ dc.subscribe('undici:request:create', ({ request }) => { dc.subscribe('undici:request:headers', ({ request, response }) => { nsolid_counts[kHttpClientCount]++; - nsolidApi.pushClientBucket(now() - request[nsolid_tracer_s]); + let host = ''; + let port = 0; + if (request.origin) { + try { + const url = new URL(request.origin); + host = url.hostname; + port = Number(url.port) || (url.protocol === 'https:' ? 443 : 80); + } catch { + // Ignore malformed origin + } + } + nsolidApi.pushClientBucket( + now() - request[nsolid_tracer_s], + kHttpMethodMap[request.method] ?? kHttpMethodOther, + response.statusCode, + host, + port, + request[nsolid_net_prot_s]); if (generateSpan(kSpanHttpClient)) { const span = request[nsolid_span_id_s]; if (span) { @@ -189,9 +237,12 @@ dc.subscribe('undici:request:error', ({ request, error }) => { }); dc.subscribe('http2.client.stream.created', ({ stream }) => { + http2Constants ||= require('internal/http2/core').constants; stream[nsolid_tracer_s] = { start: now(), response: false, + method: stream.sentHeaders?.[http2Constants.HTTP2_HEADER_METHOD] || '', + status: 0, }; if (generateSpan(kSpanHttpClient)) { @@ -206,6 +257,8 @@ dc.subscribe('http2.client.stream.created', ({ stream }) => { dc.subscribe('http2.client.stream.finish', ({ stream, headers }) => { stream[nsolid_tracer_s].response = true; http2Constants ||= require('internal/http2/core').constants; + stream[nsolid_tracer_s].status = + Number(headers[http2Constants.HTTP2_HEADER_STATUS]) || 0; if (generateSpan(kSpanHttpClient)) { const span = stream[nsolid_span_id_s]; if (span) { @@ -238,7 +291,14 @@ dc.subscribe('http2.client.stream.close', ({ stream, code }) => { const span = stream[nsolid_span_id_s]; if (code === http2Constants.NGHTTP2_NO_ERROR && tracingInfo.response) { nsolid_counts[kHttpClientCount]++; - nsolidApi.pushClientBucket(now() - tracingInfo.start); + const session = stream.session; + nsolidApi.pushClientBucket( + now() - tracingInfo.start, + kHttpMethodMap[tracingInfo.method] ?? kHttpMethodOther, + tracingInfo.status, + session?.socket?.remoteAddress || '', + session?.socket?.remotePort || 0, + kHttpVersion2); } else { nsolid_counts[kHttpClientAbortCount]++; if (span) { @@ -252,9 +312,12 @@ dc.subscribe('http2.client.stream.close', ({ stream, code }) => { }); dc.subscribe('http2.server.stream.start', ({ stream, headers }) => { + http2Constants ||= require('internal/http2/core').constants; stream[nsolid_tracer_s] = { start: now(), response: false, + method: headers[http2Constants.HTTP2_HEADER_METHOD] || '', + status: 0, }; if (generateSpan(kSpanHttpServer)) { @@ -268,6 +331,9 @@ dc.subscribe('http2.server.stream.start', ({ stream, headers }) => { dc.subscribe('http2.server.stream.finish', ({ stream, headers }) => { stream[nsolid_tracer_s].response = true; + http2Constants ||= require('internal/http2/core').constants; + stream[nsolid_tracer_s].status = + Number(stream.sentHeaders?.[http2Constants.HTTP2_HEADER_STATUS]) || 0; }); dc.subscribe('http2.server.stream.error', ({ stream, error }) => { @@ -290,7 +356,15 @@ dc.subscribe('http2.server.stream.close', ({ stream, code }) => { const span = stream[nsolid_span_id_s]; if (code === http2Constants.NGHTTP2_NO_ERROR && tracingInfo.response) { nsolid_counts[kHttpServerCount]++; - nsolidApi.pushServerBucket(now() - tracingInfo.start); + const encrypted = stream.session?.socket?.encrypted; + const route = stream[nsolid_route_s] || ''; + nsolidApi.pushServerBucket( + now() - tracingInfo.start, + kHttpMethodMap[tracingInfo.method] ?? kHttpMethodOther, + tracingInfo.status, + encrypted ? kHttpSchemeHttps : kHttpSchemeHttp, + kHttpVersion2, + route); } else { nsolid_counts[kHttpServerAbortCount]++; if (span) { @@ -309,3 +383,25 @@ dc.subscribe('http2.server.stream.close', ({ stream, code }) => { span.end(); } }); + +dc.subscribe('tracing:fastify.request.handler:start', ({ request, route }) => { + if (route?.url && request?.raw) { + // Store the route template using unified nsolid_route_s symbol + request.raw[nsolid_route_s] = route.url; + } +}); + +// Subscribe to HTTP response finish to capture Express routes +// Express sets req.route.path during request processing, available by response time +dc.subscribe('http.server.response.finish', ({ request, response }) => { + if (request?.route?.path && !request[nsolid_route_s]) { + // Compose full route path including mounted router prefixes + const baseUrl = request.baseUrl || ''; + const routePath = request.route.path; + // Avoid double slashes when concatenating (e.g., baseUrl ends with / and routePath starts with /) + const fullPath = baseUrl.endsWith('/') && routePath.startsWith('/') + ? baseUrl + routePath.slice(1) + : baseUrl + routePath; + request[nsolid_route_s] = fullPath; + } +}); diff --git a/lib/internal/nsolid_http_consts.js b/lib/internal/nsolid_http_consts.js new file mode 100644 index 0000000000..692717645c --- /dev/null +++ b/lib/internal/nsolid_http_consts.js @@ -0,0 +1,46 @@ +'use strict'; + +const { nsolid_consts } = internalBinding('nsolid_api'); + +// HttpMethod enum values matching MetricsStream::HttpMethod in nsolid.h +const kHttpMethodMap = { + __proto__: null, + GET: nsolid_consts.kHttpMethodGet, + HEAD: nsolid_consts.kHttpMethodHead, + POST: nsolid_consts.kHttpMethodPost, + PUT: nsolid_consts.kHttpMethodPut, + DELETE: nsolid_consts.kHttpMethodDelete, + CONNECT: nsolid_consts.kHttpMethodConnect, + OPTIONS: nsolid_consts.kHttpMethodOptions, + TRACE: nsolid_consts.kHttpMethodTrace, + PATCH: nsolid_consts.kHttpMethodPatch, +}; +const kHttpMethodOther = nsolid_consts.kHttpMethodOther; + +// HttpProtocolVersion enum values matching MetricsStream::HttpProtocolVersion +const kHttpVersionMap = { + '__proto__': null, + '1.0': nsolid_consts.kHttpVersion10, + '1.1': nsolid_consts.kHttpVersion11, + '2': nsolid_consts.kHttpVersion2, +}; +const kHttpVersion10 = nsolid_consts.kHttpVersion10; +const kHttpVersion11 = nsolid_consts.kHttpVersion11; +const kHttpVersion2 = nsolid_consts.kHttpVersion2; +const kHttpVersionOther = nsolid_consts.kHttpVersionOther; + +// HttpUrlScheme enum values matching MetricsStream::HttpUrlScheme +const kHttpSchemeHttp = nsolid_consts.kHttpSchemeHttp; +const kHttpSchemeHttps = nsolid_consts.kHttpSchemeHttps; + +module.exports = { + kHttpMethodMap, + kHttpMethodOther, + kHttpVersionMap, + kHttpVersion10, + kHttpVersion11, + kHttpVersion2, + kHttpVersionOther, + kHttpSchemeHttp, + kHttpSchemeHttps, +}; diff --git a/lib/nsolid.js b/lib/nsolid.js index 08f3e50d29..0054ee7590 100644 --- a/lib/nsolid.js +++ b/lib/nsolid.js @@ -767,6 +767,8 @@ function updateConfig(config = {}) { if (normalized !== undefined) { nsolidConfig.assetsEnabled = normalized; } + } else if (key === 'interval') { + nsolidConfig.interval = +config.interval; } else { nsolidConfig[key] = config[key]; } @@ -811,10 +813,6 @@ function updateConfig(config = {}) { // initConfig object so it doesn't waste memory. initConfig = null; - if (typeof nsolidConfig.interval === 'number') { - binding.setMetricsInterval(nsolidConfig.interval); - } - if (typeof nsolidConfig.pubkey === 'string' && nsolidConfig.pubkey.length !== 40) { debug('[updateConfig] invalid pubkey; must be 40 bytes'); diff --git a/src/nsolid.h b/src/nsolid.h index 1b44a1ce90..c86ae459df 100644 --- a/src/nsolid.h +++ b/src/nsolid.h @@ -7,6 +7,7 @@ #include #include +#include /** * @file nsolid.h @@ -849,11 +850,50 @@ class NODE_EXTERN MetricsStream { kGc = 120 /**< kGcRegular | kGcForced | kGcFull | kGcMajor */ }; + enum class HttpMethod : uint8_t { + kGet, + kHead, + kPost, + kPut, + kDelete, + kConnect, + kOptions, + kTrace, + kPatch, + kOther + }; + + enum class HttpProtocolVersion : uint8_t { + k10, /**< HTTP/1.0 */ + k11, /**< HTTP/1.1 */ + k2, /**< HTTP/2 */ + kOther + }; + + enum class HttpUrlScheme : uint8_t { + kHttp, + kHttps, + kOther + }; + + struct HttpDatapointAttrs { + HttpMethod method; /**< http.request.method */ + uint16_t status_code; /**< http.response.status_code */ + std::string server_address; /**< server.address (client only) */ + uint16_t server_port; /**< server.port (client only) */ + HttpUrlScheme url_scheme; /**< url.scheme (server only) */ + HttpProtocolVersion protocol_version; /**< network.protocol.version */ + std::string route; /**< http.route (server only, route template) */ + }; + + using DatapointAttrs = std::variant; + struct Datapoint { uint64_t thread_id; double timestamp; /**< Datapoint timestamp in milliseconds */ Type type; double value; + DatapointAttrs attrs; /**< Type-specific attributes (monostate = none) */ }; using metrics_stream_bucket = std::vector; diff --git a/src/nsolid/nsolid_api.cc b/src/nsolid/nsolid_api.cc index 6439ca16ee..51c3952a70 100644 --- a/src/nsolid/nsolid_api.cc +++ b/src/nsolid/nsolid_api.cc @@ -77,6 +77,85 @@ using v8::WeakCallbackInfo; using v8::WeakCallbackType; +using opentelemetry::sdk::metrics::Aggregation; +using opentelemetry::sdk::metrics::AttributesHashMap; +using opentelemetry::sdk::metrics::Base2ExponentialHistogramAggregation; +using opentelemetry::sdk::metrics::Base2ExponentialHistogramPointData; +using opentelemetry::sdk::metrics::MetricAttributes; +using opentelemetry::sdk::metrics::PointDataAttributes; + +static const char* HttpMethodToString(MetricsStream::HttpMethod m) { + switch (m) { + case MetricsStream::HttpMethod::kGet: return "GET"; + case MetricsStream::HttpMethod::kHead: return "HEAD"; + case MetricsStream::HttpMethod::kPost: return "POST"; + case MetricsStream::HttpMethod::kPut: return "PUT"; + case MetricsStream::HttpMethod::kDelete: return "DELETE"; + case MetricsStream::HttpMethod::kConnect: return "CONNECT"; + case MetricsStream::HttpMethod::kOptions: return "OPTIONS"; + case MetricsStream::HttpMethod::kTrace: return "TRACE"; + case MetricsStream::HttpMethod::kPatch: return "PATCH"; + case MetricsStream::HttpMethod::kOther: return "_OTHER"; + } + return "_OTHER"; +} + +static const char* HttpProtocolVersionToString( + MetricsStream::HttpProtocolVersion v) { + switch (v) { + case MetricsStream::HttpProtocolVersion::k10: return "1.0"; + case MetricsStream::HttpProtocolVersion::k11: return "1.1"; + case MetricsStream::HttpProtocolVersion::k2: return "2"; + case MetricsStream::HttpProtocolVersion::kOther: return ""; + } + return ""; +} + +static const char* HttpUrlSchemeToString(MetricsStream::HttpUrlScheme s) { + switch (s) { + case MetricsStream::HttpUrlScheme::kHttp: return "http"; + case MetricsStream::HttpUrlScheme::kHttps: return "https"; + case MetricsStream::HttpUrlScheme::kOther: return ""; + } + return ""; +} + +// Build OTel MetricAttributes from HttpDatapointAttrs for attribute-keyed +// histogram aggregation. +static MetricAttributes BuildMetricAttributes( + const MetricsStream::HttpDatapointAttrs& http, + MetricsStream::Type type) { + MetricAttributes attrs; + attrs.insert({"http.request.method", + std::string(HttpMethodToString(http.method))}); + if (http.status_code != 0) { + attrs.insert({"http.response.status_code", + static_cast(http.status_code)}); + } + const char* version = HttpProtocolVersionToString(http.protocol_version); + if (version[0] != '\0') { + attrs.insert({"network.protocol.version", std::string(version)}); + } + if (type == MetricsStream::Type::kHttpClient) { + if (!http.server_address.empty()) { + attrs.insert({"server.address", http.server_address}); + } + if (http.server_port != 0) { + attrs.insert({"server.port", static_cast(http.server_port)}); + } + } else { + const char* scheme = HttpUrlSchemeToString(http.url_scheme); + if (scheme[0] != '\0') { + attrs.insert({"url.scheme", std::string(scheme)}); + } + // Add http.route for server-side metrics when route template is available + if (!http.route.empty()) { + attrs.insert({"http.route", http.route}); + } + } + return attrs; +} + static void calculateHttpDnsPtiles( std::vector& bucket, // NOLINT(runtime/references) std::atomic& median, // NOLINT(runtime/references) @@ -87,7 +166,6 @@ static void calculatePtiles(std::vector* vals, double* med, double* nn); // any thread have been blocked longer than the threadshold passed to // OnBlockedLoopHook(). constexpr uint64_t blocked_loop_interval = 100; -uint64_t gen_ptiles_interval = 5000; constexpr uint64_t datapoints_q_interval = 100; constexpr size_t datapoints_q_max_size = 100; @@ -204,13 +282,19 @@ int EnvInst::RunCommand(SharedEnvInst envinst_sp, -void EnvInst::PushClientBucket(double value) { - send_datapoint(MetricsStream::Type::kHttpClient, value); +void EnvInst::PushClientBucket(double value, + MetricsStream::HttpDatapointAttrs&& attrs) { + send_datapoint(MetricsStream::Type::kHttpClient, + value, + std::move(attrs)); } -void EnvInst::PushServerBucket(double value) { - send_datapoint(MetricsStream::Type::kHttpServer, value); +void EnvInst::PushServerBucket(double value, + MetricsStream::HttpDatapointAttrs&& attrs) { + send_datapoint(MetricsStream::Type::kHttpServer, + value, + std::move(attrs)); } @@ -1207,6 +1291,13 @@ void EnvList::UpdateConfig(const nlohmann::json& config) { update_continuous_profiler(contCpuProfile, contCpuProfileInterval); } + it = config.find("interval"); + if (it != config.end() && !it->is_null()) { + // If interval changes, next timer cb the timer is reset. + uint64_t interval = it->get(); + gen_ptiles_interval_.store(interval, std::memory_order_relaxed); + } + it = config.find("otlp"); if (it != config.end() && !it->is_null()) { otlp::OTLPAgent::Inst()->start(); @@ -1428,7 +1519,7 @@ void EnvList::datapoint_cb_(std::queue&& q) { MetricsStream::Datapoint& dp = q.front(); auto envinst_sp = EnvInst::GetInst(dp.thread_id); if (envinst_sp != nullptr) - envinst_sp->add_metric_datapoint_(dp.type, dp.value); + envinst_sp->add_metric_datapoint_(dp.type, dp.value, dp.attrs); bucket.emplace_back(std::move(dp)); q.pop(); @@ -1448,14 +1539,16 @@ void EnvList::datapoint_cb_(std::queue&& q) { void EnvInst::send_datapoint(MetricsStream::Type type, - double value) { + double value, + MetricsStream::DatapointAttrs attrs) { double ts = 0; if (has_metrics_stream_hooks_) { ts = (PERFORMANCE_NOW() - env()->time_origin()) / 1e6 + env()->time_origin_timestamp() / 1e3; } - EnvList::Inst()->datapoints_q_.Enqueue({ thread_id_, ts, type, value }); + EnvList::Inst()->datapoints_q_.Enqueue( + { thread_id_, ts, type, value, std::move(attrs) }); } @@ -1667,7 +1760,9 @@ void EnvList::env_list_routine_(ns_thread*, EnvList* envlist) { blocked_loop_timer_cb_, blocked_loop_interval, blocked_loop_interval); CHECK_EQ(er, 0); er = envlist->gen_ptiles_timer_.start( - gen_ptiles_cb_, gen_ptiles_interval, gen_ptiles_interval); + gen_ptiles_cb_, + envlist->gen_ptiles_interval_.load(std::memory_order_relaxed), + envlist->gen_ptiles_interval_.load(std::memory_order_relaxed)); CHECK_EQ(er, 0); envlist->blocked_loop_timer_.unref(); envlist->gen_ptiles_timer_.unref(); @@ -1755,7 +1850,7 @@ void EnvList::blocked_loop_timer_cb_(ns_timer*) { // This is a dirty simple percentile tracker. Since we support streaming metrics // a better solution is to stream them all and do more advanced percentile // calculations where it won't affect the process. -void EnvList::gen_ptiles_cb_(ns_timer*) { +void EnvList::gen_ptiles_cb_(ns_timer* timer) { EnvList* envlist = EnvList::Inst(); decltype(EnvList::env_map_) env_map; @@ -1777,6 +1872,43 @@ void EnvList::gen_ptiles_cb_(ns_timer*) { calculateHttpDnsPtiles(envinst_sp->server_bucket_, envinst_sp->http_server_median_, envinst_sp->http_server99_ptile_); + + // Harvest per-attribute exponential histogram points and reset hashmaps. + auto harvest = [](std::unique_ptr& hashmap, + std::shared_ptr& points_ptr) { + auto new_points = std::make_shared(); + hashmap->GetAllEnteries( + [&new_points](const MetricAttributes& attrs, + Aggregation& agg) -> bool { + auto point = agg.ToPoint(); + auto* hist = opentelemetry::nostd::get_if< + Base2ExponentialHistogramPointData>(&point); + if (hist) { + new_points->push_back({ attrs, std::move(*hist) }); + } + return true; + }); + points_ptr = std::move(new_points); + // Reset for the next interval. + hashmap = std::make_unique( + kHttpHistogramCardinalityLimit); + }; + + harvest(envinst_sp->http_client_hashmap_, + envinst_sp->http_client_hist_points_); + harvest(envinst_sp->http_server_hashmap_, + envinst_sp->http_server_hist_points_); + } + + // Restart timer if interval changed. + uint64_t interval = + uv_timer_get_repeat(reinterpret_cast(timer->base_handle())); + uint64_t next_interval = + envlist->gen_ptiles_interval_.load(std::memory_order_relaxed); + if (next_interval != interval) { + int er = envlist->gen_ptiles_timer_.start( + gen_ptiles_cb_, next_interval, next_interval); + CHECK_EQ(er, 0); } } @@ -1986,8 +2118,20 @@ void EnvInst::CustomCommandReqWeakCallback( } -void EnvInst::add_metric_datapoint_(MetricsStream::Type type, double value) { +void EnvInst::add_metric_datapoint_(MetricsStream::Type type, + double value, + MetricsStream::DatapointAttrs attrs) { DCHECK(utils::are_threads_equal(uv_thread_self(), EnvList::Inst()->thread())); + auto aggregate_into_hashmap = [&value]( + AttributesHashMap* hashmap, + MetricAttributes&& metric_attrs) { + auto factory = []() -> std::unique_ptr { + return std::make_unique(); + }; + auto* agg = hashmap->GetOrSetDefault(std::move(metric_attrs), factory); + agg->Aggregate(value); + }; + switch (type) { case MetricsStream::Type::kDns: { @@ -1997,11 +2141,23 @@ void EnvInst::add_metric_datapoint_(MetricsStream::Type type, double value) { case MetricsStream::Type::kHttpClient: { client_bucket_.push_back(value); + auto* http = std::get_if(&attrs); + if (http) { + aggregate_into_hashmap( + http_client_hashmap_.get(), + BuildMetricAttributes(*http, type)); + } } break; case MetricsStream::Type::kHttpServer: { server_bucket_.push_back(value); + auto* http = std::get_if(&attrs); + if (http) { + aggregate_into_hashmap( + http_server_hashmap_.get(), + BuildMetricAttributes(*http, type)); + } } break; case MetricsStream::Type::kGcForced: @@ -2269,19 +2425,58 @@ static void WriteLog(const FunctionCallbackInfo& args) { void BindingData::SlowPushClientBucket( const FunctionCallbackInfo& args) { DCHECK(args[0]->IsNumber()); + DCHECK(args[1]->IsUint32()); + DCHECK(args[2]->IsUint32()); + DCHECK(args[3]->IsString()); + DCHECK(args[4]->IsUint32()); + DCHECK(args[5]->IsUint32()); + Utf8Value server_address(args.GetIsolate(), args[3]); PushClientBucketImpl(PrincipalRealm::GetBindingData(args), - args[0].As()->Value()); -} - - -void BindingData::FastPushClientBucket(v8::Local receiver, - double val) { - PushClientBucketImpl(FromJSObject(receiver), val); -} - - -void BindingData::PushClientBucketImpl(BindingData* data, double val) { - data->env()->envinst_->PushClientBucket(val); + args[0].As()->Value(), + args[1].As()->Value(), + args[2].As()->Value(), + std::string(*server_address, server_address.length()), + args[4].As()->Value(), + args[5].As()->Value()); +} + + +void BindingData::FastPushClientBucket( + v8::Local receiver, + double val, + uint32_t method, + uint32_t status_code, + const v8::FastOneByteString& server_address, + uint32_t server_port, + uint32_t protocol_version) { + PushClientBucketImpl( + FromJSObject(receiver), + val, + method, + status_code, + std::string(server_address.data, server_address.length), + server_port, + protocol_version); +} + + +void BindingData::PushClientBucketImpl(BindingData* data, + double val, + uint32_t method, + uint32_t status_code, + const std::string& server_address, + uint32_t server_port, + uint32_t protocol_version) { + MetricsStream::HttpDatapointAttrs attrs{ + static_cast(method), + static_cast(status_code), + server_address, + static_cast(server_port), + MetricsStream::HttpUrlScheme::kOther, + static_cast(protocol_version), + std::string() + }; + data->env()->envinst_->PushClientBucket(val, std::move(attrs)); } @@ -2307,19 +2502,62 @@ void BindingData::PushDnsBucketImpl(BindingData* data, double val) { void BindingData::SlowPushServerBucket( const FunctionCallbackInfo& args) { DCHECK(args[0]->IsNumber()); + DCHECK(args[1]->IsUint32()); + DCHECK(args[2]->IsUint32()); + DCHECK(args[3]->IsUint32()); + DCHECK(args[4]->IsUint32()); + Isolate* isolate = args.GetIsolate(); + std::string route; + if (args[5]->IsString()) { + Local route_s = args[5].As(); + route = *String::Utf8Value(isolate, route_s); + } PushServerBucketImpl(PrincipalRealm::GetBindingData(args), - args[0].As()->Value()); -} - - -void BindingData::FastPushServerBucket(v8::Local receiver, - double val) { - PushServerBucketImpl(FromJSObject(receiver), val); -} - - -void BindingData::PushServerBucketImpl(BindingData* data, double val) { - data->env()->envinst_->PushServerBucket(val); + args[0].As()->Value(), + args[1].As()->Value(), + args[2].As()->Value(), + args[3].As()->Value(), + args[4].As()->Value(), + route); +} + + +void BindingData::FastPushServerBucket( + v8::Local receiver, + double val, + uint32_t method, + uint32_t status_code, + uint32_t url_scheme, + uint32_t protocol_version, + const FastOneByteString& route) { + PushServerBucketImpl( + FromJSObject(receiver), + val, + method, + status_code, + url_scheme, + protocol_version, + std::string(route.data, route.length)); +} + + +void BindingData::PushServerBucketImpl(BindingData* data, + double val, + uint32_t method, + uint32_t status_code, + uint32_t url_scheme, + uint32_t protocol_version, + const std::string& route) { + MetricsStream::HttpDatapointAttrs attrs{ + static_cast(method), + static_cast(status_code), + "", + 0, + static_cast(url_scheme), + static_cast(protocol_version), + route + }; + data->env()->envinst_->PushServerBucket(val, std::move(attrs)); } @@ -2861,13 +3099,6 @@ static void ResumeMetrics(const FunctionCallbackInfo& args) { } -static void SetMetricsInterval(const FunctionCallbackInfo& args) { - CHECK(args[0]->IsNumber()); - double interval = args[0].As()->Value(); - gen_ptiles_interval = static_cast(interval); -} - - static void OnCustomCommand(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsFunction()); @@ -3279,7 +3510,6 @@ void BindingData::Initialize(Local target, SetMethod(context, target, "getKernelVersion", GetKernelVersion); SetMethod(context, target, "pauseMetrics", PauseMetrics); SetMethod(context, target, "resumeMetrics", ResumeMetrics); - SetMethod(context, target, "setMetricsInterval", SetMetricsInterval); SetMethod(context, target, "onCustomCommand", OnCustomCommand); SetMethod(context, target, "customCommandResponse", CustomCommandResponse); SetMethod(context, @@ -3337,6 +3567,44 @@ void BindingData::Initialize(Local target, NSOLID_SPAN_ATTRS(V) #undef V +#define NSOLID_EXPORT_CONST(Name, ValueExpr) \ + consts_enum->Set(context, \ + OneByteString(isolate, #Name), \ + Integer::New(isolate, \ + static_cast(ValueExpr))).Check(); + +#define NSOLID_HTTP_METHOD_CONSTS(V) \ + V(kHttpMethodGet, MetricsStream::HttpMethod::kGet) \ + V(kHttpMethodHead, MetricsStream::HttpMethod::kHead) \ + V(kHttpMethodPost, MetricsStream::HttpMethod::kPost) \ + V(kHttpMethodPut, MetricsStream::HttpMethod::kPut) \ + V(kHttpMethodDelete, MetricsStream::HttpMethod::kDelete) \ + V(kHttpMethodConnect, MetricsStream::HttpMethod::kConnect) \ + V(kHttpMethodOptions, MetricsStream::HttpMethod::kOptions) \ + V(kHttpMethodTrace, MetricsStream::HttpMethod::kTrace) \ + V(kHttpMethodPatch, MetricsStream::HttpMethod::kPatch) \ + V(kHttpMethodOther, MetricsStream::HttpMethod::kOther) + +#define NSOLID_HTTP_VERSION_CONSTS(V) \ + V(kHttpVersion10, MetricsStream::HttpProtocolVersion::k10) \ + V(kHttpVersion11, MetricsStream::HttpProtocolVersion::k11) \ + V(kHttpVersion20, MetricsStream::HttpProtocolVersion::k2) \ + V(kHttpVersionOther, MetricsStream::HttpProtocolVersion::kOther) + +#define NSOLID_HTTP_SCHEME_CONSTS(V) \ + V(kHttpSchemeHttp, MetricsStream::HttpUrlScheme::kHttp) \ + V(kHttpSchemeHttps, MetricsStream::HttpUrlScheme::kHttps) \ + V(kHttpSchemeOther, MetricsStream::HttpUrlScheme::kOther) + + NSOLID_HTTP_METHOD_CONSTS(NSOLID_EXPORT_CONST) + NSOLID_HTTP_VERSION_CONSTS(NSOLID_EXPORT_CONST) + NSOLID_HTTP_SCHEME_CONSTS(NSOLID_EXPORT_CONST) + +#undef NSOLID_HTTP_SCHEME_CONSTS +#undef NSOLID_HTTP_VERSION_CONSTS +#undef NSOLID_HTTP_METHOD_CONSTS +#undef NSOLID_EXPORT_CONST + consts_enum->Set(context, OneByteString(isolate, "kTraceFieldCount"), Integer::New(isolate, EnvInst::kFieldCount)).Check(); @@ -3365,6 +3633,18 @@ void BindingData::Initialize(Local target, OneByteString(isolate, "nsolid_span_id_s"), Symbol::New( isolate, OneByteString(isolate, "nsolid_span_id"))).Check(); + + target->Set( + context, + OneByteString(isolate, "nsolid_route_s"), + Symbol::New( + isolate, OneByteString(isolate, "nsolid_route"))).Check(); + + target->Set( + context, + OneByteString(isolate, "nsolid_net_prot_s"), + Symbol::New( + isolate, OneByteString(isolate, "nsolid_net_prot"))).Check(); } @@ -3416,7 +3696,6 @@ void BindingData::RegisterExternalReferences( registry->Register(GetKernelVersion); registry->Register(PauseMetrics); registry->Register(ResumeMetrics); - registry->Register(SetMetricsInterval); registry->Register(OnCustomCommand); registry->Register(CustomCommandResponse); registry->Register(AttachRequestToCustomCommand); diff --git a/src/nsolid/nsolid_api.h b/src/nsolid/nsolid_api.h index c5e8095a4f..b510cacfac 100644 --- a/src/nsolid/nsolid_api.h +++ b/src/nsolid/nsolid_api.h @@ -28,6 +28,7 @@ // We can export it via ADDONS_PREREQS in the Makefile and link against it with // our native module builds that depend on it #include "nlohmann/json.hpp" +#include "nsolid/nsolid_metrics_types.h" namespace node { @@ -110,7 +111,6 @@ class EnvInst { NSOLID_DELETE_DEFAULT_CONSTRUCTORS(EnvInst) struct CmdQueueStor; - using user_cb_sig = void(*)(SharedEnvInst, void*); using voided_cb_sig = void(*)(CmdQueueStor*); using optional_string = std::pair; @@ -209,8 +209,10 @@ class EnvInst { const char* value, bool is_return); - void PushClientBucket(double value); - void PushServerBucket(double value); + void PushClientBucket(double value, + MetricsStream::HttpDatapointAttrs&& attrs); + void PushServerBucket(double value, + MetricsStream::HttpDatapointAttrs&& attrs); void PushDnsBucket(double value); std::string GetModuleInfo(); @@ -273,6 +275,13 @@ class EnvInst { bool can_call_into_js() const; uint32_t* get_trace_flags() { return &trace_flags_; } + // Returns the last harvested HTTP client/server latency histogram points, + // keyed by attribute combination. Updated every metrics interval. + SharedPointDataAttributes + http_client_histogram_points() const { return http_client_hist_points_; } + SharedPointDataAttributes + http_server_histogram_points() const { return http_server_hist_points_; } + std::atomic metrics_paused = { false }; // Track the values of JSMetricsFields. @@ -327,8 +336,12 @@ class EnvInst { static void custom_command_(SharedEnvInst envinst_sp, const std::string req_id); - void add_metric_datapoint_(MetricsStream::Type, double); - void send_datapoint(MetricsStream::Type, double); + void add_metric_datapoint_(MetricsStream::Type type, + double value, + MetricsStream::DatapointAttrs attrs = {}); + void send_datapoint(MetricsStream::Type type, + double value, + MetricsStream::DatapointAttrs attrs = {}); static void get_event_loop_stats_(EnvInst* envinst, ThreadMetrics::MetricsStor* stor); @@ -398,6 +411,14 @@ class EnvInst { std::vector dns_bucket_; std::vector client_bucket_; std::vector server_bucket_; + UniqueAttributesHashMap + http_client_hashmap_ = + std::make_unique(kHttpHistogramCardinalityLimit); + UniqueAttributesHashMap + http_server_hashmap_ = + std::make_unique(kHttpHistogramCardinalityLimit); + SharedPointDataAttributes http_client_hist_points_; + SharedPointDataAttributes http_server_hist_points_; std::atomic dns_median_ = { 0 }; std::atomic dns99_ptile_ = { 0 }; std::atomic http_client_median_ = { 0 }; @@ -709,6 +730,7 @@ class EnvList { std::atomic min_blocked_threshold_ = { UINT64_MAX }; // TODO(trevnorris): Temporary until Console supports streaming metrics nsuv::ns_timer gen_ptiles_timer_; + std::atomic gen_ptiles_interval_ = { 5000 }; // exit data std::atomic exiting_ = { false }; std::atomic exit_code_; diff --git a/src/nsolid/nsolid_bindings.h b/src/nsolid/nsolid_bindings.h index 24c8de622d..5b18ba995a 100644 --- a/src/nsolid/nsolid_bindings.h +++ b/src/nsolid/nsolid_bindings.h @@ -24,8 +24,20 @@ class BindingData : public SnapshotableObject { static void SlowPushClientBucket( const v8::FunctionCallbackInfo& args); - static void FastPushClientBucket(v8::Local receiver, double val); - static void PushClientBucketImpl(BindingData* data, double val); + static void FastPushClientBucket(v8::Local receiver, + double val, + uint32_t method, + uint32_t status_code, + const v8::FastOneByteString& server_address, + uint32_t server_port, + uint32_t protocol_version); + static void PushClientBucketImpl(BindingData* data, + double val, + uint32_t method, + uint32_t status_code, + const std::string& server_address, + uint32_t server_port, + uint32_t protocol_version); static void SlowPushDnsBucket( const v8::FunctionCallbackInfo& args); @@ -34,8 +46,20 @@ class BindingData : public SnapshotableObject { static void SlowPushServerBucket( const v8::FunctionCallbackInfo& args); - static void FastPushServerBucket(v8::Local receiver, double val); - static void PushServerBucketImpl(BindingData* data, double val); + static void FastPushServerBucket(v8::Local receiver, + double val, + uint32_t method, + uint32_t status_code, + uint32_t url_scheme, + uint32_t protocol_version, + const v8::FastOneByteString& route); + static void PushServerBucketImpl(BindingData* data, + double val, + uint32_t method, + uint32_t status_code, + uint32_t url_scheme, + uint32_t protocol_version, + const std::string& route); static void SlowPushSpanDataDouble( const v8::FunctionCallbackInfo& args); diff --git a/src/nsolid/nsolid_metrics_types.h b/src/nsolid/nsolid_metrics_types.h new file mode 100644 index 0000000000..657b99f78e --- /dev/null +++ b/src/nsolid/nsolid_metrics_types.h @@ -0,0 +1,33 @@ +#ifndef SRC_NSOLID_NSOLID_METRICS_TYPES_H_ +#define SRC_NSOLID_NSOLID_METRICS_TYPES_H_ + +#include "opentelemetry/sdk/metrics/data/point_data.h" +#include "opentelemetry/sdk/metrics/aggregation/base2_exponential_histogram_aggregation.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +#include +#include + +namespace node { +namespace nsolid { + +// Type alias for histogram point data attributes vector +using PointDataAttributesVector = + std::vector; + +// Shared pointer to point data attributes (used for histogram caching) +using SharedPointDataAttributes = + std::shared_ptr; + +// Type alias for attributes hashmap (used for metric aggregation) +using AttributesHashMap = opentelemetry::sdk::metrics::AttributesHashMap; + +// Unique pointer to attributes hashmap +using UniqueAttributesHashMap = std::unique_ptr; + +constexpr size_t kHttpHistogramCardinalityLimit = + opentelemetry::sdk::metrics::kAggregationCardinalityLimit; + +} // namespace nsolid +} // namespace node + +#endif // SRC_NSOLID_NSOLID_METRICS_TYPES_H_ diff --git a/src/nsolid/nsolid_trace.h b/src/nsolid/nsolid_trace.h index 14cade65cc..c35ee86ff7 100644 --- a/src/nsolid/nsolid_trace.h +++ b/src/nsolid/nsolid_trace.h @@ -72,7 +72,8 @@ namespace tracing { V(kSpanHttpMethod, std::string, method, http.method) \ V(kSpanHttpProtocolVersion, std::string, version, network.protocol.version) \ V(kSpanHttpReqUrl, std::string, req_url, http.url) \ - V(kSpanHttpStatusMessage, std::string, status_text, http.status_text) + V(kSpanHttpStatusMessage, std::string, status_text, http.status_text) \ + V(kSpanHttpRoute, std::string, route, http.route) #define NSOLID_SPAN_HTTP_ATTRS_NUMBERS(V) \ V(kSpanHttpStatusCode, uint64_t, status_code, http.status_code) diff --git a/test/addons/nsolid-dispatchqueue/binding.gyp b/test/addons/nsolid-dispatchqueue/binding.gyp index 1f082bad1e..85bb95a82c 100644 --- a/test/addons/nsolid-dispatchqueue/binding.gyp +++ b/test/addons/nsolid-dispatchqueue/binding.gyp @@ -7,6 +7,8 @@ 'defines': [ 'NODE_WANT_INTERNALS=1' ], 'include_dirs': [ '../../../deps/nsuv/include', + '../../../deps/opentelemetry-cpp/api/include', + '../../../deps/opentelemetry-cpp/sdk/include', '../../../deps/protobuf/src', '../../../deps/protobuf/third_party/abseil-cpp', '../../../deps/v8', diff --git a/test/addons/nsolid-eventloop-cmd/binding.gyp b/test/addons/nsolid-eventloop-cmd/binding.gyp index 1f082bad1e..85bb95a82c 100644 --- a/test/addons/nsolid-eventloop-cmd/binding.gyp +++ b/test/addons/nsolid-eventloop-cmd/binding.gyp @@ -7,6 +7,8 @@ 'defines': [ 'NODE_WANT_INTERNALS=1' ], 'include_dirs': [ '../../../deps/nsuv/include', + '../../../deps/opentelemetry-cpp/api/include', + '../../../deps/opentelemetry-cpp/sdk/include', '../../../deps/protobuf/src', '../../../deps/protobuf/third_party/abseil-cpp', '../../../deps/v8', diff --git a/test/addons/nsolid-log-hooks/binding.gyp b/test/addons/nsolid-log-hooks/binding.gyp index 1f082bad1e..85bb95a82c 100644 --- a/test/addons/nsolid-log-hooks/binding.gyp +++ b/test/addons/nsolid-log-hooks/binding.gyp @@ -7,6 +7,8 @@ 'defines': [ 'NODE_WANT_INTERNALS=1' ], 'include_dirs': [ '../../../deps/nsuv/include', + '../../../deps/opentelemetry-cpp/api/include', + '../../../deps/opentelemetry-cpp/sdk/include', '../../../deps/protobuf/src', '../../../deps/protobuf/third_party/abseil-cpp', '../../../deps/v8', diff --git a/test/addons/nsolid-statsdagent/binding.gyp b/test/addons/nsolid-statsdagent/binding.gyp index 8ee4d6b440..1b817b5b99 100644 --- a/test/addons/nsolid-statsdagent/binding.gyp +++ b/test/addons/nsolid-statsdagent/binding.gyp @@ -7,6 +7,8 @@ 'include_dirs': [ '../../../src/', '../../../deps/nsuv/include/', + '../../../deps/opentelemetry-cpp/api/include', + '../../../deps/opentelemetry-cpp/sdk/include', '../../../deps/protobuf/src', '../../../deps/protobuf/third_party/abseil-cpp', '../../../deps/v8', diff --git a/test/agents/test-grpc-http-histograms.mjs b/test/agents/test-grpc-http-histograms.mjs new file mode 100644 index 0000000000..b1e6042d5e --- /dev/null +++ b/test/agents/test-grpc-http-histograms.mjs @@ -0,0 +1,277 @@ +// Flags: --expose-internals +import { mustCallAtLeast, mustSucceed } from '../common/index.mjs'; +import assert from 'node:assert'; +import { + GRPCServer, + TestClient, +} from '../common/nsolid-grpc-agent/index.js'; +import validators from 'internal/validators'; + +const { + validateArray, + validateNumber, +} = validators; + +// Expected exponential histogram metrics exported by the GrpcAgent. +// The GrpcAgent uses use_snake_case=false, so names are camelCase. +const expectedHistograms = [ + ['http.client.request.duration', 's'], + ['http.server.request.duration', 's'], +]; + +// Semconv attribute keys expected on each data point (besides thread attrs). +const commonHttpAttrs = [ + 'http.request.method', + 'http.response.status_code', + 'network.protocol.version', +]; +const clientOnlyAttrs = ['server.address']; +const serverOnlyAttrs = ['url.scheme']; + +function getAttr(attributes, key) { + return attributes.find((a) => a.key === key); +} + +function checkExponentialHistogramDataPoint(name, dataPoint) { + // Validate timestamps. + const startTime = BigInt(dataPoint.startTimeUnixNano); + assert.ok(startTime, `${name}: startTimeUnixNano should be set`); + const time = BigInt(dataPoint.timeUnixNano); + assert.ok(time, `${name}: timeUnixNano should be set`); + assert.ok(time > startTime, `${name}: timeUnixNano > startTimeUnixNano`); + + // Validate attributes. + validateArray(dataPoint.attributes, `${name}.attributes`); + + // Thread attributes. + const threadIdAttr = getAttr(dataPoint.attributes, 'thread.id'); + assert.ok(threadIdAttr, `${name}: should have thread.id attribute`); + assert.strictEqual(threadIdAttr.value.intValue, '0'); + const threadNameAttr = getAttr(dataPoint.attributes, 'thread.name'); + assert.ok(threadNameAttr, `${name}: should have thread.name attribute`); + + // Common HTTP semconv attributes. + for (const key of commonHttpAttrs) { + assert.ok(getAttr(dataPoint.attributes, key), + `${name}: should have ${key} attribute`); + } + + // Validate http.request.method value. + const methodAttr = getAttr(dataPoint.attributes, 'http.request.method'); + assert.strictEqual(methodAttr.value.stringValue, 'GET', + `${name}: http.request.method should be GET`); + + // Validate http.response.status_code value. + const statusAttr = getAttr(dataPoint.attributes, 'http.response.status_code'); + assert.strictEqual(statusAttr.value.intValue, '200', + `${name}: http.response.status_code should be 200`); + + // Validate network.protocol.version value. + const versionAttr = getAttr(dataPoint.attributes, 'network.protocol.version'); + assert.strictEqual(versionAttr.value.stringValue, '1.1', + `${name}: network.protocol.version should be 1.1`); + + // Type-specific attributes. + const isClient = name.includes('Client') || name.includes('client'); + if (isClient) { + for (const key of clientOnlyAttrs) { + assert.ok(getAttr(dataPoint.attributes, key), + `${name}: client should have ${key} attribute`); + } + const addrAttr = getAttr(dataPoint.attributes, 'server.address'); + assert.strictEqual(addrAttr.value.stringValue, '127.0.0.1', + `${name}: server.address should be 127.0.0.1`); + } else { + for (const key of serverOnlyAttrs) { + assert.ok(getAttr(dataPoint.attributes, key), + `${name}: server should have ${key} attribute`); + } + const schemeAttr = getAttr(dataPoint.attributes, 'url.scheme'); + assert.strictEqual(schemeAttr.value.stringValue, 'http', + `${name}: url.scheme should be http`); + } + + // Validate histogram fields: count, sum, scale. + // count is a string (uint64 via proto longs: String). + const count = parseInt(dataPoint.count, 10); + assert.ok(count > 0, `${name}: count should be > 0, got ${count}`); + + // 'sum' should be present and > 0 (latency values are positive). + validateNumber(dataPoint.sum, `${name}.sum`); + assert.ok(dataPoint.sum > 0, `${name}: sum should be > 0`); + + // 'scale' should be a number. + validateNumber(dataPoint.scale, `${name}.scale`); + + // 'positive' buckets should have data (latency is always positive). + assert.ok(dataPoint.positive, `${name}: positive buckets should exist`); + validateNumber(dataPoint.positive.offset, `${name}.positive.offset`); + validateArray(dataPoint.positive.bucketCounts, `${name}.positive.bucketCounts`); + assert.ok(dataPoint.positive.bucketCounts.length > 0, `${name}: positive.bucketCounts should not be empty`); + + // 'min' and 'max' should be present and > 0. + validateNumber(dataPoint.min, `${name}.min`); + assert.ok(dataPoint.min > 0, `${name}: min should be > 0`); + validateNumber(dataPoint.max, `${name}.max`); + assert.ok(dataPoint.max > 0, `${name}: max should be > 0`); + assert.ok(dataPoint.max >= dataPoint.min, `${name}: max should be >= min`); +} + +function checkHistogramMetrics(metricsData) { + const resourceMetrics = metricsData.resourceMetrics; + if (!resourceMetrics || resourceMetrics.length === 0) return null; + + const scopeMetrics = resourceMetrics[0].scopeMetrics; + if (!scopeMetrics || scopeMetrics.length === 0) return null; + + const metrics = scopeMetrics[0].metrics; + if (!metrics) return null; + + // Find all exponential histogram metrics with count > 0. + const remaining = [...expectedHistograms]; + for (const metric of metrics) { + if (metric.data !== 'exponentialHistogram') continue; + + const idx = remaining.findIndex((m) => m[0] === metric.name); + if (idx === -1) continue; + + const [name, unit] = remaining[idx]; + assert.strictEqual(metric.unit, unit, `${name}: unit should be '${unit}'`); + + // Validate aggregation temporality (delta). + assert.strictEqual(metric.exponentialHistogram.aggregationTemporality, + 'AGGREGATION_TEMPORALITY_DELTA', + `${name}: should use delta temporality`); + + const dataPoints = metric.exponentialHistogram.dataPoints; + validateArray(dataPoints, `${name}.dataPoints`); + assert.ok(dataPoints.length > 0, `${name}: should have at least one data point`); + + // Find a data point with count > 0 (histogram has actual data). + const dp = dataPoints.find((d) => parseInt(d.count, 10) > 0); + if (dp) { + checkExponentialHistogramDataPoint(name, dp); + remaining.splice(idx, 1); + } + } + + return remaining.length === 0; +} + +function collectHistogramWindows(metricsData) { + const resourceMetrics = metricsData.resourceMetrics; + if (!resourceMetrics || resourceMetrics.length === 0) { + return new Map(); + } + + const scopeMetrics = resourceMetrics[0].scopeMetrics; + if (!scopeMetrics || scopeMetrics.length === 0) { + return new Map(); + } + + const metrics = scopeMetrics[0].metrics; + if (!metrics) { + return new Map(); + } + + const windows = new Map(); + for (const metric of metrics) { + if (metric.data !== 'exponentialHistogram') continue; + + const expected = expectedHistograms.find((m) => m[0] === metric.name); + if (!expected) continue; + + const dataPoints = metric.exponentialHistogram.dataPoints; + validateArray(dataPoints, `${metric.name}.dataPoints`); + + // We keep HTTP traffic flowing, so each interval should carry data. + const dp = dataPoints.find((d) => parseInt(d.count, 10) > 0); + if (!dp) continue; + + windows.set(metric.name, { + start: BigInt(dp.startTimeUnixNano), + end: BigInt(dp.timeUnixNano), + }); + } + + return windows; +} + +async function runTest({ getEnv }) { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + console.log('GRPC server started', port); + const env = getEnv(port); + const opts = { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }; + const child = new TestClient([], opts); + await child.id(); + await child.config({ app: 'histogram_test' }); + + // Prime histogram streams with initial requests. + const NUM_HTTP_TRANSACTIONS = 5; + for (let i = 0; i < NUM_HTTP_TRANSACTIONS; i++) { + await child.trace('http'); + } + + // Keep HTTP traffic flowing so each metrics interval has histogram data. + const trafficTimer = setInterval(() => { + child.trace('http').catch(() => {}); + }, 200); + + const previousWindows = new Map(); + const continuityValidated = new Set(); + + // Listen for metrics until we find exponential histograms with data. + let shutdownCalled = false; + grpcServer.on('metrics', mustCallAtLeast((data) => { + if (shutdownCalled) return; + const done = checkHistogramMetrics(data); + const windows = collectHistogramWindows(data); + for (const [metricName, window] of windows) { + const previous = previousWindows.get(metricName); + if (previous) { + assert.strictEqual( + window.start, + previous.end, + `${metricName}: current startTimeUnixNano should match previous timeUnixNano`, + ); + continuityValidated.add(metricName); + } + previousWindows.set(metricName, window); + } + + if (done && continuityValidated.size === expectedHistograms.length) { + shutdownCalled = true; + clearInterval(trafficTimer); + console.log('All exponential histograms validated'); + child.shutdown(0).then(() => { + grpcServer.close(); + resolve(); + }); + } + }, 1)); + })); + }); +} + +const testConfigs = [ + { + getEnv: (port) => { + return { + NODE_DEBUG_NATIVE: 'nsolid_grpc_agent', + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 1000, + }; + }, + }, +]; + +for (const testConfig of testConfigs) { + await runTest(testConfig); + console.log('Test passed!'); +} diff --git a/test/integrations/express/v4/package.json b/test/integrations/express/v4/package.json new file mode 100644 index 0000000000..d499595bcb --- /dev/null +++ b/test/integrations/express/v4/package.json @@ -0,0 +1,8 @@ +{ + "name": "nsolid-integration-express-v4", + "version": "1.0.0", + "private": true, + "dependencies": { + "express": "^4.21.0" + } +} diff --git a/test/integrations/express/v4/test-http-route.mjs b/test/integrations/express/v4/test-http-route.mjs new file mode 100644 index 0000000000..68f9abcb70 --- /dev/null +++ b/test/integrations/express/v4/test-http-route.mjs @@ -0,0 +1,187 @@ +// Flags: --expose-internals +import { mustSucceed } from '../../../common/index.mjs'; +import assert from 'node:assert'; +import { existsSync } from 'node:fs'; +import { fileURLToPath } from 'node:url'; +import { dirname, join } from 'node:path'; +import { + GRPCServer, +} from '../../../common/nsolid-grpc-agent/index.js'; +import validators from 'internal/validators'; +import nsolid from 'nsolid'; + +const { + validateArray, +} = validators; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +// Skip test if dependencies not installed +if (!existsSync(join(__dirname, 'node_modules'))) { + console.log('SKIP: node_modules not found. Run "make test-integrations-prereqs" first.'); + process.exit(0); +} + +const { default: express } = await import('express'); + +function getAttr(attributes, key) { + return attributes.find((a) => a.key === key); +} + +function checkHttpRouteAttribute(metricsData, expectedRoutes) { + const resourceMetrics = metricsData.resourceMetrics; + if (!resourceMetrics || resourceMetrics.length === 0) return []; + + const foundRoutes = []; + + // Iterate over all resourceMetrics and their scopeMetrics + for (const resource of resourceMetrics) { + const scopeMetrics = resource.scopeMetrics; + if (!scopeMetrics || scopeMetrics.length === 0) continue; + + for (const scope of scopeMetrics) { + const metrics = scope.metrics; + if (!metrics) continue; + + for (const metric of metrics) { + if (metric.name !== 'http.server.request.duration') continue; + if (metric.data !== 'exponentialHistogram') continue; + + const dataPoints = metric.exponentialHistogram.dataPoints; + validateArray(dataPoints, 'dataPoints'); + + for (const dp of dataPoints) { + const count = parseInt(dp.count, 10); + if (count === 0) continue; + + // Check for http.route attribute + const routeAttr = getAttr(dp.attributes, 'http.route'); + if (routeAttr) { + const routeValue = routeAttr.value.stringValue; + console.log(`Found route: ${routeValue} (count: ${count})`); + if (expectedRoutes.has(routeValue) && !foundRoutes.includes(routeValue)) { + foundRoutes.push(routeValue); + } + } + } + } + } + } + + return foundRoutes; +} + +async function runTest() { + // Start gRPC server as child process + const grpcServer = new GRPCServer(); + + const grpcPort = await new Promise((resolve, reject) => { + grpcServer.start(mustSucceed((port) => { + console.log('gRPC server started on port', port); + resolve(port); + })); + }); + + // Configure NSolid to connect to our gRPC server + process.env.NSOLID_GRPC_INSECURE = '1'; + process.env.NODE_DEBUG_NATIVE = 'nsolid_grpc_agent'; + + // Initialize NSolid + nsolid.start({ grpc: `localhost:${grpcPort}`, interval: 500 }); + + // Create Express server with multiple routes including mounted router + const app = express(); + const apiRouter = express.Router(); + + // Mounted router routes - these test the baseUrl + route.path composition + apiRouter.get('/users/:id', (req, res) => { + res.json({ userId: req.params.id }); + }); + + apiRouter.get('/posts/:postId', (req, res) => { + res.json({ postId: req.params.postId }); + }); + + // Mount the router at /api + app.use('/api', apiRouter); + + // Direct route (not mounted) + app.get('/health', (req, res) => { + res.json({ status: 'ok' }); + }); + + // Track connections for forceful close + const connections = new Set(); + const server = await new Promise((resolve) => { + const s = app.listen(0, '127.0.0.1', () => { + console.log('Express server started on port', s.address().port); + resolve(s); + }); + s.on('connection', (conn) => { + connections.add(conn); + conn.on('close', () => connections.delete(conn)); + }); + }); + + const serverPort = server.address().port; + + // Track which routes we've seen + // Note: Mounted routes should have full path /api/users/:id, not just /users/:id + const expectedRoutes = new Set(['/api/users/:id', '/api/posts/:postId', '/health']); + const foundRoutes = new Set(); + + // Listen for metrics + const metricsPromise = new Promise((resolve, reject) => { + grpcServer.on('metrics', (data) => { + const found = checkHttpRouteAttribute(data, expectedRoutes); + for (const route of found) { + if (!foundRoutes.has(route)) { + foundRoutes.add(route); + console.log(`Validated route: ${route} (${foundRoutes.size}/${expectedRoutes.size})`); + } + } + + if (foundRoutes.size === expectedRoutes.size) { + console.log('All routes validated!'); + resolve(); + } + }); + }); + + // Make HTTP requests to trigger routes + console.log('Making HTTP requests...'); + + // Request to mounted routes /api/users/:id + let response = await fetch(`http://127.0.0.1:${serverPort}/api/users/123`); + assert.strictEqual(response.status, 200); + console.log('Requested /api/users/123'); + + // Request to mounted routes /api/posts/:postId + response = await fetch(`http://127.0.0.1:${serverPort}/api/posts/456`); + assert.strictEqual(response.status, 200); + console.log('Requested /api/posts/456'); + + // Request to direct route /health + response = await fetch(`http://127.0.0.1:${serverPort}/health`); + assert.strictEqual(response.status, 200); + console.log('Requested /health'); + + // Wait for all metrics to be reported + await metricsPromise; + + // Cleanup + console.log('Cleaning up...'); + for (const conn of connections) { + conn.destroy(); + } + + await new Promise((resolve) => { + server.close(() => { + grpcServer.close(); + resolve(); + }); + }); +} + +await runTest(); +console.log('Express v4 route test passed!'); diff --git a/test/integrations/express/v5/package.json b/test/integrations/express/v5/package.json new file mode 100644 index 0000000000..a1b0f0cefa --- /dev/null +++ b/test/integrations/express/v5/package.json @@ -0,0 +1,8 @@ +{ + "name": "nsolid-integration-express-v5", + "version": "1.0.0", + "private": true, + "dependencies": { + "express": "^5.0.0" + } +} diff --git a/test/integrations/express/v5/test-http-route.mjs b/test/integrations/express/v5/test-http-route.mjs new file mode 100644 index 0000000000..aa93e4733d --- /dev/null +++ b/test/integrations/express/v5/test-http-route.mjs @@ -0,0 +1,187 @@ +// Flags: --expose-internals +import { mustSucceed } from '../../../common/index.mjs'; +import assert from 'node:assert'; +import { existsSync } from 'node:fs'; +import { fileURLToPath } from 'node:url'; +import { dirname, join } from 'node:path'; +import { + GRPCServer, +} from '../../../common/nsolid-grpc-agent/index.js'; +import validators from 'internal/validators'; +import nsolid from 'nsolid'; + +const { + validateArray, +} = validators; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +// Skip test if dependencies not installed +if (!existsSync(join(__dirname, 'node_modules'))) { + console.log('SKIP: node_modules not found. Run "make test-integrations-prereqs" first.'); + process.exit(0); +} + +const { default: express } = await import('express'); + +function getAttr(attributes, key) { + return attributes.find((a) => a.key === key); +} + +function checkHttpRouteAttribute(metricsData, expectedRoutes) { + const resourceMetrics = metricsData.resourceMetrics; + if (!resourceMetrics || resourceMetrics.length === 0) return []; + + const foundRoutes = []; + + // Iterate over all resourceMetrics and their scopeMetrics + for (const resource of resourceMetrics) { + const scopeMetrics = resource.scopeMetrics; + if (!scopeMetrics || scopeMetrics.length === 0) continue; + + for (const scope of scopeMetrics) { + const metrics = scope.metrics; + if (!metrics) continue; + + for (const metric of metrics) { + if (metric.name !== 'http.server.request.duration') continue; + if (metric.data !== 'exponentialHistogram') continue; + + const dataPoints = metric.exponentialHistogram.dataPoints; + validateArray(dataPoints, 'dataPoints'); + + for (const dp of dataPoints) { + const count = parseInt(dp.count, 10); + if (count === 0) continue; + + // Check for http.route attribute + const routeAttr = getAttr(dp.attributes, 'http.route'); + if (routeAttr) { + const routeValue = routeAttr.value.stringValue; + console.log(`Found route: ${routeValue} (count: ${count})`); + if (expectedRoutes.has(routeValue) && !foundRoutes.includes(routeValue)) { + foundRoutes.push(routeValue); + } + } + } + } + } + } + + return foundRoutes; +} + +async function runTest() { + // Start gRPC server as child process + const grpcServer = new GRPCServer(); + + const grpcPort = await new Promise((resolve, reject) => { + grpcServer.start(mustSucceed((port) => { + console.log('gRPC server started on port', port); + resolve(port); + })); + }); + + // Configure NSolid to connect to our gRPC server + process.env.NSOLID_GRPC_INSECURE = '1'; + process.env.NODE_DEBUG_NATIVE = 'nsolid_grpc_agent'; + + // Initialize NSolid + nsolid.start({ grpc: `localhost:${grpcPort}`, interval: 500 }); + + // Create Express server with multiple routes including mounted router + const app = express(); + const apiRouter = express.Router(); + + // Mounted router routes - these test the baseUrl + route.path composition + apiRouter.get('/users/:id', (req, res) => { + res.json({ userId: req.params.id }); + }); + + apiRouter.get('/posts/:postId', (req, res) => { + res.json({ postId: req.params.postId }); + }); + + // Mount the router at /api + app.use('/api', apiRouter); + + // Direct route (not mounted) + app.get('/health', (req, res) => { + res.json({ status: 'ok' }); + }); + + // Track connections for forceful close + const connections = new Set(); + const server = await new Promise((resolve) => { + const s = app.listen(0, '127.0.0.1', () => { + console.log('Express server started on port', s.address().port); + resolve(s); + }); + s.on('connection', (conn) => { + connections.add(conn); + conn.on('close', () => connections.delete(conn)); + }); + }); + + const serverPort = server.address().port; + + // Track which routes we've seen + // Note: Mounted routes should have full path /api/users/:id, not just /users/:id + const expectedRoutes = new Set(['/api/users/:id', '/api/posts/:postId', '/health']); + const foundRoutes = new Set(); + + // Listen for metrics + const metricsPromise = new Promise((resolve, reject) => { + grpcServer.on('metrics', (data) => { + const found = checkHttpRouteAttribute(data, expectedRoutes); + for (const route of found) { + if (!foundRoutes.has(route)) { + foundRoutes.add(route); + console.log(`Validated route: ${route} (${foundRoutes.size}/${expectedRoutes.size})`); + } + } + + if (foundRoutes.size === expectedRoutes.size) { + console.log('All routes validated!'); + resolve(); + } + }); + }); + + // Make HTTP requests to trigger routes + console.log('Making HTTP requests...'); + + // Request to mounted routes /api/users/:id + let response = await fetch(`http://127.0.0.1:${serverPort}/api/users/123`); + assert.strictEqual(response.status, 200); + console.log('Requested /api/users/123'); + + // Request to mounted routes /api/posts/:postId + response = await fetch(`http://127.0.0.1:${serverPort}/api/posts/456`); + assert.strictEqual(response.status, 200); + console.log('Requested /api/posts/456'); + + // Request to direct route /health + response = await fetch(`http://127.0.0.1:${serverPort}/health`); + assert.strictEqual(response.status, 200); + console.log('Requested /health'); + + // Wait for all metrics to be reported + await metricsPromise; + + // Cleanup + console.log('Cleaning up...'); + for (const conn of connections) { + conn.destroy(); + } + + await new Promise((resolve) => { + server.close(() => { + grpcServer.close(); + resolve(); + }); + }); +} + +await runTest(); +console.log('Express v5 route test passed!'); diff --git a/test/integrations/fastify/v5/package.json b/test/integrations/fastify/v5/package.json new file mode 100644 index 0000000000..6a382e8b58 --- /dev/null +++ b/test/integrations/fastify/v5/package.json @@ -0,0 +1,8 @@ +{ + "name": "nsolid-integration-fastify-v5", + "version": "1.0.0", + "private": true, + "dependencies": { + "fastify": "^5.0.0" + } +} diff --git a/test/integrations/fastify/v5/test-http-route.mjs b/test/integrations/fastify/v5/test-http-route.mjs new file mode 100644 index 0000000000..9ce7b6a916 --- /dev/null +++ b/test/integrations/fastify/v5/test-http-route.mjs @@ -0,0 +1,160 @@ +// Flags: --expose-internals +import { mustSucceed } from '../../../common/index.mjs'; +import assert from 'node:assert'; +import { existsSync } from 'node:fs'; +import { fileURLToPath } from 'node:url'; +import { dirname, join } from 'node:path'; +import { + GRPCServer, +} from '../../../common/nsolid-grpc-agent/index.js'; +import validators from 'internal/validators'; +import nsolid from 'nsolid'; + +const { + validateArray, +} = validators; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +// Skip test if dependencies not installed +if (!existsSync(join(__dirname, 'node_modules'))) { + console.log('SKIP: node_modules not found. Run "make test-integrations-prereqs" first.'); + process.exit(0); +} + +const { default: Fastify } = await import('fastify'); + +function getAttr(attributes, key) { + return attributes.find((a) => a.key === key); +} + +function checkHttpRouteAttribute(metricsData, expectedRoutes) { + const resourceMetrics = metricsData.resourceMetrics; + if (!resourceMetrics || resourceMetrics.length === 0) return []; + + const foundRoutes = []; + + // Iterate over all resourceMetrics and their scopeMetrics + for (const resource of resourceMetrics) { + const scopeMetrics = resource.scopeMetrics; + if (!scopeMetrics || scopeMetrics.length === 0) continue; + + for (const scope of scopeMetrics) { + const metrics = scope.metrics; + if (!metrics) continue; + + for (const metric of metrics) { + if (metric.name !== 'http.server.request.duration') continue; + if (metric.data !== 'exponentialHistogram') continue; + + const dataPoints = metric.exponentialHistogram.dataPoints; + validateArray(dataPoints, 'dataPoints'); + + for (const dp of dataPoints) { + const count = parseInt(dp.count, 10); + if (count === 0) continue; + + const routeAttr = getAttr(dp.attributes, 'http.route'); + if (routeAttr) { + const routeValue = routeAttr.value.stringValue; + console.log(`Found route: ${routeValue} (count: ${count})`); + if (expectedRoutes.has(routeValue) && !foundRoutes.includes(routeValue)) { + foundRoutes.push(routeValue); + } + } + } + } + } + } + + return foundRoutes; +} + +async function runTest() { + // Start gRPC server as child process + const grpcServer = new GRPCServer(); + + const grpcPort = await new Promise((resolve, reject) => { + grpcServer.start(mustSucceed((port) => { + console.log('gRPC server started on port', port); + resolve(port); + })); + }); + + // Configure NSolid to connect to our gRPC server + process.env.NSOLID_GRPC_INSECURE = '1'; + process.env.NODE_DEBUG_NATIVE = 'nsolid_grpc_agent'; + + // Initialize NSolid + nsolid.start({ grpc: `localhost:${grpcPort}`, interval: 500 }); + + const fastify = Fastify({ logger: false }); + + fastify.get('/users/:id', async (request, reply) => { + return { userId: request.params.id }; + }); + + fastify.get('/posts/:postId', async (request, reply) => { + return { postId: request.params.postId }; + }); + + fastify.get('/health', async (request, reply) => { + return { status: 'ok' }; + }); + + // Create Fastify server with multiple routes + await fastify.listen({ port: 0, host: '127.0.0.1' }); + const serverPort = fastify.server.address().port; + console.log('Fastify server started on port', serverPort); + + // Track which routes we've seen + const expectedRoutes = new Set(['/users/:id', '/posts/:postId', '/health']); + const foundRoutes = new Set(); + + // Listen for metrics + const metricsPromise = new Promise((resolve, reject) => { + grpcServer.on('metrics', (data) => { + const found = checkHttpRouteAttribute(data, expectedRoutes); + for (const route of found) { + if (!foundRoutes.has(route)) { + foundRoutes.add(route); + console.log(`Validated route: ${route} (${foundRoutes.size}/${expectedRoutes.size})`); + } + } + + if (foundRoutes.size === expectedRoutes.size) { + console.log('All routes validated!'); + resolve(); + } + }); + }); + + // Make HTTP requests to trigger routes + console.log('Making HTTP requests...'); + + // Request to /users/:id + let response = await fetch(`http://127.0.0.1:${serverPort}/users/123`); + assert.strictEqual(response.status, 200); + console.log('Requested /users/123'); + + // Request to /posts/:postId + response = await fetch(`http://127.0.0.1:${serverPort}/posts/456`); + assert.strictEqual(response.status, 200); + console.log('Requested /posts/456'); + + // Request to /health + response = await fetch(`http://127.0.0.1:${serverPort}/health`); + assert.strictEqual(response.status, 200); + console.log('Requested /health'); + + // Wait for all metrics to be reported + await metricsPromise; + + // Cleanup + console.log('Cleaning up...'); + await fastify.close(); + grpcServer.close(); +} + +await runTest(); +console.log('Fastify v5 route test passed!'); diff --git a/test/integrations/testcfg.py b/test/integrations/testcfg.py new file mode 100644 index 0000000000..97e7367056 --- /dev/null +++ b/test/integrations/testcfg.py @@ -0,0 +1,25 @@ +import sys, os +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) +import testpy + +class IntegrationTestConfiguration(testpy.SimpleTestConfiguration): + """Test configuration that finds test-*.mjs files in nested subdirectories.""" + + def Ls(self, path): + """Recursively find all test-*.mjs files in subdirectories, excluding node_modules.""" + result = [] + for root, dirs, files in os.walk(path): + # Skip node_modules directories + dirs[:] = [d for d in dirs if d != 'node_modules'] + for f in files: + if testpy.LS_RE.match(f): + # Get relative path from the test root + rel_dir = os.path.relpath(root, path) + if rel_dir == '.': + result.append(f) + else: + result.append(os.path.join(rel_dir, f)) + return result + +def GetConfiguration(context, root): + return IntegrationTestConfiguration(context, root, 'integrations') diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 60dec089a8..6aec316be1 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -132,6 +132,7 @@ expected.atRunTime = new Set([ 'NativeModule internal/nsolid_loader', 'NativeModule internal/nsolid_promise_tracking', 'NativeModule internal/nsolid_diag', + 'NativeModule internal/nsolid_http_consts', 'NativeModule internal/nsolid_trace', 'NativeModule internal/otel/api', 'NativeModule internal/otel/context',