Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ arrow = { version = "57.0.0", features = ["ipc_compression"] }
bigdecimal = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
metrics = "0.24"
opendal = "0.53"
jiff = { version = "0.2" }
2 changes: 2 additions & 0 deletions crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
thiserror = "1.0"
log = { version = "0.4", features = ["kv_std"] }
metrics = { workspace = true }
tokio = { workspace = true }
parking_lot = "0.12"
bytes = "1.10.1"
Expand All @@ -77,6 +78,7 @@ strum_macros = "0.26"
jiff = { workspace = true, features = ["js"] }

[dev-dependencies]
metrics-util = "0.20"
testcontainers = "0.25.0"
test-env-helpers = "0.2.2"

Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub use cluster::{ServerNode, ServerType};

pub mod config;
pub mod error;
pub mod metrics;

mod bucketing;
mod compression;
Expand Down
270 changes: 270 additions & 0 deletions crates/fluss/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metric name constants and helpers for fluss-rust client instrumentation.
//!
//! Uses the [`metrics`] crate facade pattern: library code emits metrics via
//! `counter!`/`gauge!`/`histogram!` macros, and the application installs a
//! recorder (e.g. `metrics-exporter-prometheus`) to collect them. When no
//! recorder is installed, all metric calls are no-ops with zero overhead.

use crate::rpc::ApiKey;

// ---------------------------------------------------------------------------
// Label keys
// ---------------------------------------------------------------------------

pub const LABEL_API_KEY: &str = "api_key";

// ---------------------------------------------------------------------------
// Connection / RPC metrics
//
// Java reference: ConnectionMetrics.java, ClientMetricGroup.java, MetricNames.java
//
// Byte counting matches Java semantics: both sides count only the API message
// body, excluding the protocol header and framing.
// Java: rawRequest.totalSize() / response.totalSize() (see MessageCodec.java).
// Rust: buf.len() - REQUEST_HEADER_LENGTH for sent bytes,
// buffer.len() - cursor.position() for received bytes.
// ---------------------------------------------------------------------------

pub const CLIENT_REQUESTS_TOTAL: &str = "fluss.client.requests.total";
pub const CLIENT_RESPONSES_TOTAL: &str = "fluss.client.responses.total";
pub const CLIENT_BYTES_SENT_TOTAL: &str = "fluss.client.bytes_sent.total";
pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total";
pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms";
pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight";

/// Returns a label value for reportable API keys, matching Java's
/// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`,
/// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to
/// avoid metric cardinality bloat.
pub(crate) fn api_key_label(api_key: ApiKey) -> Option<&'static str> {
match api_key {
ApiKey::ProduceLog => Some("produce_log"),
ApiKey::FetchLog => Some("fetch_log"),
ApiKey::PutKv => Some("put_kv"),
ApiKey::Lookup => Some("lookup"),
_ => None,
}
}

#[cfg(test)]
mod tests {
use super::*;
use metrics_util::debugging::DebuggingRecorder;

macro_rules! find_counter {
($entries:expr, $name:expr) => {
$entries.iter().find_map(|(key, _, _, val)| {
if key.key().name() == $name {
match val {
metrics_util::debugging::DebugValue::Counter(v) => Some(*v),
_ => None,
}
} else {
None
}
})
};
}

macro_rules! find_histogram {
($entries:expr, $name:expr) => {
$entries.iter().find_map(|(key, _, _, val)| {
if key.key().name() == $name {
match val {
metrics_util::debugging::DebugValue::Histogram(v) => {
Some(v.iter().map(|f| f.into_inner()).collect::<Vec<_>>())
}
_ => None,
}
} else {
None
}
})
};
}

macro_rules! find_gauge {
($entries:expr, $name:expr) => {
$entries.iter().find_map(|(key, _, _, val)| {
if key.key().name() == $name {
match val {
metrics_util::debugging::DebugValue::Gauge(g) => Some(g.into_inner()),
_ => None,
}
} else {
None
}
})
};
}

#[test]
fn reportable_api_keys_return_label() {
assert_eq!(api_key_label(ApiKey::ProduceLog), Some("produce_log"));
assert_eq!(api_key_label(ApiKey::FetchLog), Some("fetch_log"));
assert_eq!(api_key_label(ApiKey::PutKv), Some("put_kv"));
assert_eq!(api_key_label(ApiKey::Lookup), Some("lookup"));
}

#[test]
fn non_reportable_api_keys_return_none() {
assert_eq!(api_key_label(ApiKey::MetaData), None);
assert_eq!(api_key_label(ApiKey::CreateTable), None);
assert_eq!(api_key_label(ApiKey::Authenticate), None);
assert_eq!(api_key_label(ApiKey::ListDatabases), None);
assert_eq!(api_key_label(ApiKey::GetTable), None);
}

#[test]
fn reportable_request_records_all_connection_metrics() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let label = api_key_label(ApiKey::ProduceLog).unwrap();

metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => label).increment(1);
metrics::counter!(CLIENT_BYTES_SENT_TOTAL, LABEL_API_KEY => label).increment(256);
metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).increment(1.0);

metrics::counter!(CLIENT_RESPONSES_TOTAL, LABEL_API_KEY => label).increment(1);
metrics::counter!(CLIENT_BYTES_RECEIVED_TOTAL, LABEL_API_KEY => label).increment(128);
metrics::histogram!(CLIENT_REQUEST_LATENCY_MS, LABEL_API_KEY => label).record(42.5);
metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).decrement(1.0);
});

let snapshot = snapshotter.snapshot();
let entries: Vec<_> = snapshot.into_vec();

assert_eq!(find_counter!(entries, CLIENT_REQUESTS_TOTAL), Some(1));
assert_eq!(find_counter!(entries, CLIENT_RESPONSES_TOTAL), Some(1));
assert_eq!(find_counter!(entries, CLIENT_BYTES_SENT_TOTAL), Some(256));
assert_eq!(
find_counter!(entries, CLIENT_BYTES_RECEIVED_TOTAL),
Some(128)
);
assert_eq!(
find_histogram!(entries, CLIENT_REQUEST_LATENCY_MS),
Some(vec![42.5])
);
assert_eq!(find_gauge!(entries, CLIENT_REQUESTS_IN_FLIGHT), Some(0.0));

let has_label = entries.iter().all(|(key, _, _, _)| {
key.key()
.labels()
.any(|l| l.key() == LABEL_API_KEY && l.value() == "produce_log")
});
assert!(has_label, "all metrics must carry the api_key label");
}

#[test]
fn non_reportable_request_records_no_metrics() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let label = api_key_label(ApiKey::MetaData);
assert!(label.is_none());
// When label is None, no metrics calls are made (matching request() logic).
});

let snapshot = snapshotter.snapshot();
assert!(
snapshot.into_vec().is_empty(),
"non-reportable API keys must not produce metrics"
);
}

#[test]
fn inflight_gauge_nets_to_zero_after_balanced_calls() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let label = api_key_label(ApiKey::FetchLog).unwrap();

// Simulate 3 concurrent requests completing
for _ in 0..3 {
metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).increment(1.0);
}
for _ in 0..3 {
metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).decrement(1.0);
}
});

let snapshot = snapshotter.snapshot();
let entries: Vec<_> = snapshot.into_vec();
assert_eq!(
find_gauge!(entries, CLIENT_REQUESTS_IN_FLIGHT),
Some(0.0),
"in-flight gauge should be 0 after balanced inc/dec"
);
}

#[test]
fn different_api_keys_produce_separate_metric_series() {
use std::collections::HashMap;

let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let produce_label = api_key_label(ApiKey::ProduceLog).unwrap();
let fetch_label = api_key_label(ApiKey::FetchLog).unwrap();

metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => produce_label).increment(5);
metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => fetch_label).increment(3);
});

let snapshot = snapshotter.snapshot();
let entries: Vec<_> = snapshot.into_vec();

let request_entries: Vec<_> = entries
.iter()
.filter(|(key, _, _, _)| key.key().name() == CLIENT_REQUESTS_TOTAL)
.collect();

assert_eq!(
request_entries.len(),
2,
"produce_log and fetch_log should be separate metric series"
);

let mut counter_by_api_key: HashMap<String, u64> = HashMap::new();
for (key, _, _, val) in request_entries {
let api_key = key
.key()
.labels()
.find(|label| label.key() == LABEL_API_KEY)
.map(|label| label.value())
.expect("requests total metric must include api_key label");

let counter_value = match val {
metrics_util::debugging::DebugValue::Counter(v) => *v,
other => panic!("expected Counter, got {other:?}"),
};

counter_by_api_key.insert(api_key.to_string(), counter_value);
}

assert_eq!(counter_by_api_key.get("produce_log"), Some(&5));
assert_eq!(counter_by_api_key.get("fetch_log"), Some(&3));
}
}
3 changes: 1 addition & 2 deletions crates/fluss/src/rpc/message/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use crate::rpc::message::{ReadVersionedType, WriteVersionedType};
use bytes::{Buf, BufMut};
use prost::Message;

#[allow(dead_code)]
const REQUEST_HEADER_LENGTH: i32 = 8;
pub(crate) const REQUEST_HEADER_LENGTH: usize = 8;
const SUCCESS_RESPONSE: u8 = 0;
#[allow(dead_code)]
const ERROR_RESPONSE: u8 = 1;
Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

mod api_key;
pub(crate) use api_key::ApiKey;
mod api_version;
pub mod error;
mod fluss_api_error;
Expand Down
Loading