Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,6 @@ compile_commands.json
# Ignore asserts-cpp and nlohmann paths in src/
src/asserts-cpp/
src/nlohmann/

test/integrations/express/*/node_modules/
test/integrations/fastify/*/node_modules/
21 changes: 18 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ clean: ## Remove build artifacts.
$(MAKE) testclean
$(MAKE) test-addons-clean
$(MAKE) test-agents-prereqs-clean
$(MAKE) test-integrations-prereqs-clean
$(MAKE) bench-addons-clean

.PHONY: testclean
Expand All @@ -234,6 +235,8 @@ distclean: ## Remove all build and test artifacts.
$(RM) -r deps/icu
$(RM) -r deps/icu4c*.tgz deps/icu4c*.zip deps/icu-tmp
$(RM) $(BINARYTAR).* $(TARBALL).*
$(MAKE) test-agents-prereqs-clean
$(MAKE) test-integrations-prereqs-clean

.PHONY: check
check: test
Expand Down Expand Up @@ -318,7 +321,7 @@ v8: ## Build deps/v8.
tools/make-v8.sh $(V8_ARCH).$(BUILDTYPE_LOWER) $(V8_BUILD_OPTIONS)

.PHONY: jstest
jstest: build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests test-agents-prereqs ## Runs addon tests and JS tests.
jstest: build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests test-agents-prereqs test-integrations-prereqs ## Runs addon tests and JS tests.
NSOLID_DELAY_INIT="" \
$(PYTHON) tools/test.py $(PARALLEL_ARGS) --mode=$(BUILDTYPE_LOWER) \
$(TEST_CI_ARGS) \
Expand Down Expand Up @@ -596,7 +599,7 @@ test-ci-native: | benchmark/napi/.buildstamp test/addons/.buildstamp test/js-nat
.PHONY: test-ci-js
# This target should not use a native compiler at all
# Related CI job: node-test-commit-arm-fanned
test-ci-js: | clear-stalled ## Build and test JavaScript with building anything else.
test-ci-js: | clear-stalled test-agents-prereqs test-integrations-prereqs ## Build and test JavaScript with building anything else.
$(PYTHON) tools/test.py $(PARALLEL_ARGS) -p tap --logfile test.tap \
--mode=$(BUILDTYPE_LOWER) --flaky-tests=$(FLAKY_TESTS) \
--skip-tests=$(CI_SKIP_TESTS) \
Expand All @@ -611,7 +614,7 @@ test-ci-js: | clear-stalled ## Build and test JavaScript with building anything
.PHONY: test-ci
# Related CI jobs: most CI tests, excluding node-test-commit-arm-fanned
test-ci: LOGLEVEL := info ## Build and test everything (CI).
test-ci: | clear-stalled bench-addons-build build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests doc-only test-agents-prereqs
test-ci: | clear-stalled bench-addons-build build-addons build-js-native-api-tests build-node-api-tests build-sqlite-tests doc-only test-agents-prereqs test-integrations-prereqs
out/Release/cctest --gtest_output=xml:out/junit/cctest.xml
$(PYTHON) tools/test.py $(PARALLEL_ARGS) -p tap --logfile test.tap \
--mode=$(BUILDTYPE_LOWER) --flaky-tests=$(FLAKY_TESTS) \
Expand Down Expand Up @@ -1695,6 +1698,18 @@ test-agents-prereqs-clean:
$(RM) -r test/common/nsolid-zmq-agent/node_modules
$(RM) -r test/common/nsolid-otlp-agent/node_modules

# Install the third-party packages used by the integration tests: express 4,
# express 5 and fastify 5, each into its own per-version prefix under
# test/integrations/. The installs use the in-tree npm (./deps/npm) run by
# $(NODE), and --no-save --no-package-lock is passed to every install.
.PHONY: test-integrations-prereqs
test-integrations-prereqs:
env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install express@4 --prefix test/integrations/express/v4 --no-save --no-package-lock
env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install express@5 --prefix test/integrations/express/v5 --no-save --no-package-lock
env npm_config_nodedir=$(PWD) $(NODE) ./deps/npm install fastify@5 --prefix test/integrations/fastify/v5 --no-save --no-package-lock

# Remove the node_modules directories created by test-integrations-prereqs
# (one per framework/version prefix).
.PHONY: test-integrations-prereqs-clean
test-integrations-prereqs-clean:
$(RM) -r test/integrations/express/v4/node_modules
$(RM) -r test/integrations/express/v5/node_modules
$(RM) -r test/integrations/fastify/v5/node_modules

HAS_DOCKER ?= $(shell command -v docker > /dev/null 2>&1; [ $$? -eq 0 ] && echo 1 || echo 0)

.PHONY: gen-openssl
Expand Down
69 changes: 59 additions & 10 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ namespace node {
namespace nsolid {
namespace grpc {

using ThreadMetricsMap = std::map<uint64_t, ThreadMetrics::MetricsStor>;
using CachedThreadMetricsMap = std::map<uint64_t, CachedThreadMetrics>;

constexpr uint64_t span_timer_interval = 1000;
constexpr size_t span_msg_q_min_size = 1000;
Expand Down Expand Up @@ -250,7 +250,7 @@ void PopulateInfoEvent(grpcagent::InfoEvent* info_event,

void PopulateMetricsEvent(grpcagent::MetricsEvent* metrics_event,
const ProcessMetrics::MetricsStor& proc_metrics,
const ThreadMetricsMap& env_metrics,
const CachedThreadMetricsMap& env_metrics,
const char* req_id) {
// Fill in the fields of the MetricsResponse.
PopulateCommon(metrics_event->mutable_common(), "metrics", req_id);
Expand All @@ -261,8 +261,18 @@ void PopulateMetricsEvent(grpcagent::MetricsEvent* metrics_event,

// As this is the cached data we're sending, we pass the same value for
// prev_stor.
otlp::fill_proc_metrics(metrics, proc_metrics, proc_metrics, false);
for (const auto& [env_id, env_metrics_stor] : env_metrics) {
for (const auto& [env_id, cached_metrics] : env_metrics) {
const auto& env_metrics_stor = cached_metrics.stor;

otlp::fill_env_metrics(metrics, env_metrics_stor, false);
// Add exponential histogram metrics for HTTP latency.
otlp::fill_http_histograms(
metrics,
env_metrics_stor,
cached_metrics.http_client_points,
cached_metrics.http_server_points,
false,
cached_metrics.hist_start_ts_ms);
}

data.scope_metric_data_ =
Expand Down Expand Up @@ -865,14 +875,17 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst,
EnvInst::Scope scp(envinst);
if (scp.Success()) {
bool creation = std::get<1>(tup);
uint64_t thread_id = GetThreadId(envinst);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(thread_id),
std::forward_as_tuple(envinst));
ASSERT(pair.second);
} else {
agent->env_metrics_map_.erase(GetThreadId(envinst));
agent->env_metrics_map_.erase(thread_id);
agent->thr_metrics_hist_prev_end_ts_ms_.erase(thread_id);
agent->thr_metrics_cache_.erase(thread_id);
}
}
}
Expand Down Expand Up @@ -939,10 +952,35 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst,
data.resource_ = otlp::GetResource();
std::vector<MetricData> metrics;

ThreadMetricsStor stor;
while (agent->thr_metrics_msg_q_.dequeue(stor)) {
ExtMetricsStor ext_stor;
while (agent->thr_metrics_msg_q_.dequeue(ext_stor)) {
auto& stor = ext_stor.stor;
otlp::fill_env_metrics(metrics, stor, false);
agent->thr_metrics_cache_.insert_or_assign(stor.thread_id, std::move(stor));
uint64_t thread_id = stor.thread_id;
uint64_t start_ts_ms = 0;
auto prev_it = agent->thr_metrics_hist_prev_end_ts_ms_.find(thread_id);
if (prev_it != agent->thr_metrics_hist_prev_end_ts_ms_.end()) {
start_ts_ms = prev_it->second;
}
// Add exponential histogram metrics for HTTP latency.
otlp::fill_http_histograms(
metrics,
stor,
ext_stor.http_client_points,
ext_stor.http_server_points,
false,
start_ts_ms);
agent->thr_metrics_hist_prev_end_ts_ms_.insert_or_assign(
thread_id,
stor.timestamp);
agent->thr_metrics_cache_.insert_or_assign(
thread_id,
CachedThreadMetrics{
std::move(stor),
start_ts_ms,
ext_stor.http_client_points,
ext_stor.http_server_points
});
Comment on lines +955 to +983
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Drop queued samples for threads that are already gone.

env_msg_cb_() now erases thr_metrics_cache_ on thread removal, but thr_metrics_msg_q_ can already contain an ExtMetricsStor harvested just before that delete callback ran. If the delete async is processed first, this loop exports the sample and inserts the dead thread right back into thr_metrics_cache_. Skip or purge queued entries whose thread_id is no longer present in env_metrics_map_ before exporting/caching them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents/grpc/src/grpc_agent.cc` around lines 955 - 983, The loop processing
dequeued ExtMetricsStor items currently re-inserts metrics for threads that were
removed (env_msg_cb_ erased thr_metrics_cache_) because queued items can arrive
after deletion; before calling otlp::fill_env_metrics,
otlp::fill_http_histograms or updating thr_metrics_cache_ and
thr_metrics_hist_prev_end_ts_ms_, check whether ext_stor.stor.thread_id exists
in env_metrics_map_ (or equivalent live-thread set) and if not simply skip/purge
this dequeued item (continue) so dead threads are not exported or re-added to
thr_metrics_cache_; apply this check around the body handling ExtMetricsStor
(using ExtMetricsStor, thr_metrics_msg_q_.dequeue, thr_metrics_cache_,
thr_metrics_hist_prev_end_ts_ms_, and env_metrics_map_ names to locate the
code).

}

data.scope_metric_data_ =
Expand Down Expand Up @@ -991,8 +1029,18 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst,
return;
}

if (agent->thr_metrics_msg_q_.enqueue(metrics->Get()) == 1) {
ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_));
uint64_t thread_id = metrics->thread_id();
auto envinst_sp = EnvInst::GetInst(thread_id);
if (envinst_sp != nullptr) {
ExtMetricsStor ext_stor{
metrics->Get(),
envinst_sp->http_client_histogram_points(),
envinst_sp->http_server_histogram_points()
};

if (agent->thr_metrics_msg_q_.enqueue(std::move(ext_stor)) == 1) {
ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_));
}
}
}

Expand Down Expand Up @@ -2080,6 +2128,7 @@ void GrpcAgent::setup_blocked_loop_hooks() {

int GrpcAgent::setup_metrics_timer(uint64_t period) {
if (period == 0) {
thr_metrics_hist_prev_end_ts_ms_.clear();
return metrics_timer_.stop();
}

Expand Down
27 changes: 22 additions & 5 deletions agents/grpc/src/grpc_agent.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#ifndef AGENTS_GRPC_SRC_GRPC_AGENT_H_
#define AGENTS_GRPC_SRC_GRPC_AGENT_H_

#include <nsolid.h>
#include <nsolid/async_ts_queue.h>
#include <nsolid/thread_safe.h>
#include "nsolid.h"
#include "nsolid/nsolid_metrics_types.h"
#include "nsolid/async_ts_queue.h"
#include "nsolid/thread_safe.h"
#include <memory>
#include <vector>
#include "grpcpp/grpcpp.h"
#include "./proto/nsolid_service.grpc.pb.h"
#include "opentelemetry/version.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "../../src/profile_collector.h"
#include "asset_stream.h"
Expand Down Expand Up @@ -50,6 +53,19 @@ using UniqRecordables = std::vector<UniqRecordable>;
using SharedGrpcAgent = std::shared_ptr<GrpcAgent>;
using WeakGrpcAgent = std::weak_ptr<GrpcAgent>;

// One thread-metrics sample bundled with the HTTP latency histogram points
// that accompany it. This is the element type of
// GrpcAgent::thr_metrics_msg_q_.
//
// NOTE: instances are built with positional aggregate initialization, so the
// member order is part of the contract.
struct ExtMetricsStor {
// The thread metrics sample itself.
ThreadMetrics::MetricsStor stor;
// Histogram points for HTTP client requests.
// NOTE(review): presumably may be null/empty when nothing was recorded;
// confirm against the producer.
SharedPointDataAttributes http_client_points;
// Histogram points for HTTP server requests (same caveat as above).
SharedPointDataAttributes http_server_points;
};

// Last exported metrics of a thread, as stored in
// GrpcAgent::thr_metrics_cache_ (keyed by thread id), together with what is
// needed to re-emit its HTTP latency histograms.
//
// NOTE: instances are built with positional aggregate initialization, so the
// member order is part of the contract.
struct CachedThreadMetrics {
// The cached thread metrics sample.
ThreadMetrics::MetricsStor stor;
// Start of the histogram reporting window, in milliseconds. 0 means that no
// explicit start timestamp is available.
uint64_t hist_start_ts_ms = 0;
// Histogram points for HTTP client requests belonging to `stor`.
SharedPointDataAttributes http_client_points;
// Histogram points for HTTP server requests belonging to `stor`.
SharedPointDataAttributes http_server_points;
};

struct JSThreadMetrics {
explicit JSThreadMetrics(SharedEnvInst envinst);
SharedThreadMetrics metrics_;
Expand Down Expand Up @@ -324,11 +340,12 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
ProcessMetrics::MetricsStor proc_prev_stor_;
std::map<uint64_t, JSThreadMetrics> env_metrics_map_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics::MetricsStor> thr_metrics_msg_q_;
TSQueue<ExtMetricsStor> thr_metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::unique_ptr<opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter>
metrics_exporter_;
std::map<uint64_t, ThreadMetrics::MetricsStor> thr_metrics_cache_;
std::map<uint64_t, CachedThreadMetrics> thr_metrics_cache_;
std::map<uint64_t, uint64_t> thr_metrics_hist_prev_end_ts_ms_;

// For the Configuration API
nsuv::ns_async config_msg_;
Expand Down
64 changes: 64 additions & 0 deletions agents/otlp/src/otlp_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "opentelemetry/semconv/incubating/process_attributes.h"
#include "opentelemetry/semconv/incubating/service_attributes.h"
#include "opentelemetry/semconv/incubating/thread_attributes.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/semconv/http_metrics.h"
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/sdk/trace/recordable.h"
Expand All @@ -28,6 +30,7 @@ using opentelemetry::sdk::instrumentationscope::InstrumentationScope;
using LogsRecordable = opentelemetry::sdk::logs::Recordable;
using opentelemetry::sdk::common::OwnedAttributeType;
using opentelemetry::sdk::metrics::AggregationTemporality;
using opentelemetry::sdk::metrics::Base2ExponentialHistogramPointData;
using opentelemetry::sdk::metrics::MetricData;
using opentelemetry::sdk::metrics::InstrumentDescriptor;
using opentelemetry::sdk::metrics::InstrumentType;
Expand All @@ -45,6 +48,10 @@ using opentelemetry::trace::SpanKind;
using opentelemetry::trace::TraceFlags;
using opentelemetry::trace::TraceId;
using opentelemetry::trace::propagation::detail::HexToBinary;
using opentelemetry::semconv::http::kMetricHttpClientRequestDuration;
using opentelemetry::semconv::http::kMetricHttpServerRequestDuration;
using opentelemetry::semconv::http::unitMetricHttpClientRequestDuration;
using opentelemetry::semconv::http::unitMetricHttpServerRequestDuration;
using opentelemetry::semconv::process::kProcessOwner;
using opentelemetry::semconv::service::kServiceName;
using opentelemetry::semconv::service::kServiceInstanceId;
Expand Down Expand Up @@ -361,6 +368,63 @@ NSOLID_ENV_METRICS_NUMBERS(V)
attrs);
}

// Appends the HTTP client and HTTP server request-duration histograms of one
// thread to `metrics`.
//
// For each of the two point sets that is non-null and non-empty, a single
// MetricData (delta temporality, histogram instrument, double values) is
// pushed. Every point is copied and its attributes are extended with the
// thread id and thread name taken from `stor`.
//
// Parameters:
//   metrics             Output vector the MetricData entries are appended to.
//   stor                Thread sample; supplies `timestamp` (interpreted as
//                       milliseconds, used as the end of the reporting
//                       window), `thread_id` and `thread_name`.
//   http_client_points  Points for the HTTP client duration histogram.
//   http_server_points  Points for the HTTP server duration histogram.
//   use_snake_case      Not consulted by this function; the metric names are
//                       the semantic-convention constants in both cases.
//   start_timestamp_ms  Start of the reporting window in milliseconds. 0 means
//                       "use the process start time".
void fill_http_histograms(
    std::vector<MetricData>& metrics,  // NOLINT(runtime/references)
    const ThreadMetrics::MetricsStor& stor,
    SharedPointDataAttributes http_client_points,
    SharedPointDataAttributes http_server_points,
    bool use_snake_case,
    uint64_t start_timestamp_ms) {
  // The reporting window always ends at the sample's timestamp.
  const time_point end{
      duration_cast<time_point::duration>(
          milliseconds(static_cast<uint64_t>(stor.timestamp)))};

  // Default window start is the process start; an explicit start overrides it.
  time_point start = process_start;
  if (start_timestamp_ms != 0) {
    start = time_point{
        duration_cast<time_point::duration>(milliseconds(start_timestamp_ms))};
    // Clamp so that an explicit start never lies after the end of the window.
    if (start > end) {
      start = end;
    }
  }

  // Merge thread-level attributes into each point and emit one MetricData
  // per histogram type containing all per-attribute-combo points.
  // The point set is taken by const reference: the lambda only reads it, so
  // there is no reason to pay for a shared_ptr refcount copy per call.
  auto emit = [&metrics, &stor, &start, &end](
      const char* metric_name,
      const char* unit,
      const SharedPointDataAttributes& points) {
    // Nothing recorded for this histogram: emit no MetricData at all.
    if (!points || points->empty()) return;

    std::vector<PointDataAttributes> enriched;
    enriched.reserve(points->size());
    for (const auto& pt : *points) {
      // Work on a copy so the shared point set is left untouched.
      PointAttributes attrs = pt.attributes;
      attrs.insert({ kThreadId, static_cast<int64_t>(stor.thread_id) });
      attrs.insert({ kThreadName, stor.thread_name });
      enriched.push_back({ std::move(attrs), pt.point_data });
    }

    MetricData metric_data{
      InstrumentDescriptor{
        metric_name,
        "",
        unit,
        InstrumentType::kHistogram,
        InstrumentValueType::kDouble },
      AggregationTemporality::kDelta,
      SystemTimestamp{ start },
      SystemTimestamp{ end },
      std::move(enriched)
    };
    metrics.push_back(std::move(metric_data));
  };

  emit(kMetricHttpClientRequestDuration,
       unitMetricHttpClientRequestDuration,
       http_client_points);
  emit(kMetricHttpServerRequestDuration,
       unitMetricHttpServerRequestDuration,
       http_server_points);
}

void fill_log_recordable(LogsRecordable* recordable,
const LogWriteInfo& info) {
recordable->SetBody(info.msg);
Expand Down
10 changes: 10 additions & 0 deletions agents/otlp/src/otlp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#define AGENTS_OTLP_SRC_OTLP_COMMON_H_

#include "nsolid.h"
#include "nsolid/nsolid_metrics_types.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/resource/resource.h"

// Class pre-declaration
Expand Down Expand Up @@ -56,6 +58,14 @@ void fill_env_metrics(std::vector<opentelemetry::sdk::metrics::MetricData>&,
const ThreadMetrics::MetricsStor& stor,
bool use_snake_case = true);

void fill_http_histograms(
std::vector<opentelemetry::sdk::metrics::MetricData>&,
const ThreadMetrics::MetricsStor& stor,
SharedPointDataAttributes http_client_points,
SharedPointDataAttributes http_server_points,
bool use_snake_case = true,
uint64_t start_timestamp_ms = 0);

void fill_log_recordable(OPENTELEMETRY_NAMESPACE::sdk::logs::Recordable*,
const LogWriteInfo&);

Expand Down
2 changes: 2 additions & 0 deletions deps/opentelemetry-cpp/otlp-http-exporter.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
'sdk/src/common/global_log_handler.cc',
'sdk/src/logs/exporter.cc',
'sdk/src/logs/readable_log_record.cc',
'sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc',
'sdk/src/metrics/aggregation/base2_exponential_histogram_indexer.cc',
'sdk/src/metrics/data/circular_buffer.cc',
'sdk/src/resource/resource.cc',
'sdk/src/resource/resource_detector.cc',
Expand Down
Loading
Loading