From d62a1f8907cafb78252d6b162cb4f552021c5661 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sun, 1 Mar 2026 18:40:53 +0100 Subject: [PATCH 1/4] Create metrics framework and add connection metrics --- Cargo.toml | 1 + crates/fluss/Cargo.toml | 2 + crates/fluss/src/lib.rs | 1 + crates/fluss/src/metrics.rs | 229 +++++++ crates/fluss/src/rpc/mod.rs | 1 + crates/fluss/src/rpc/server_connection.rs | 690 +++++++++++++++++++++- 6 files changed, 919 insertions(+), 5 deletions(-) create mode 100644 crates/fluss/src/metrics.rs diff --git a/Cargo.toml b/Cargo.toml index 77d71400..c24838eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,5 +40,6 @@ arrow = { version = "57.0.0", features = ["ipc_compression"] } bigdecimal = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +metrics = "0.24" opendal = "0.53" jiff = { version = "0.2" } diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index db1348a0..66d481ee 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -55,6 +55,7 @@ serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } thiserror = "1.0" log = { version = "0.4", features = ["kv_std"] } +metrics = { workspace = true } tokio = { workspace = true } parking_lot = "0.12" bytes = "1.10.1" @@ -77,6 +78,7 @@ strum_macros = "0.26" jiff = { workspace = true, features = ["js"] } [dev-dependencies] +metrics-util = "0.20" testcontainers = "0.25.0" test-env-helpers = "0.2.2" diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index 689c37ca..e77c38be 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -26,6 +26,7 @@ pub use cluster::{ServerNode, ServerType}; pub mod config; pub mod error; +pub mod metrics; mod bucketing; mod compression; diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs new file mode 100644 index 00000000..3c51f5c8 --- /dev/null +++ b/crates/fluss/src/metrics.rs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metric name constants and helpers for fluss-rust client instrumentation. +//! +//! Uses the [`metrics`] crate facade pattern: library code emits metrics via +//! `counter!`/`gauge!`/`histogram!` macros, and the application installs a +//! recorder (e.g. `metrics-exporter-prometheus`) to collect them. When no +//! recorder is installed, all metric calls are no-ops with zero overhead. + +use crate::rpc::ApiKey; + +// --------------------------------------------------------------------------- +// Label keys +// --------------------------------------------------------------------------- + +pub const LABEL_API_KEY: &str = "api_key"; + +// --------------------------------------------------------------------------- +// Connection / RPC metrics +// +// Java reference: ConnectionMetrics.java, ClientMetricGroup.java, MetricNames.java +// +// Note on bytes_received: Rust counts the response body length (after the +// ResponseHeader). Java uses ApiMessage.totalSize() which may include framing. +// Absolute values can differ slightly; the semantic (bytes received per +// response) is the same. +// --------------------------------------------------------------------------- + +pub const CLIENT_REQUESTS_TOTAL: &str = "fluss.client.requests.total"; +pub const CLIENT_RESPONSES_TOTAL: &str = "fluss.client.responses.total"; +pub const CLIENT_BYTES_SENT_TOTAL: &str = "fluss.client.bytes_sent.total"; +pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total"; +pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms"; +pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight"; + +/// Returns a label value for reportable API keys, matching Java's +/// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`, +/// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to +/// avoid metric cardinality bloat. +pub(crate) fn api_key_label(api_key: ApiKey) -> Option<&'static str> { + match api_key { + ApiKey::ProduceLog => Some("produce_log"), + ApiKey::FetchLog => Some("fetch_log"), + ApiKey::PutKv => Some("put_kv"), + ApiKey::Lookup => Some("lookup"), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use metrics_util::debugging::DebuggingRecorder; + + #[test] + fn reportable_api_keys_return_label() { + assert_eq!(api_key_label(ApiKey::ProduceLog), Some("produce_log")); + assert_eq!(api_key_label(ApiKey::FetchLog), Some("fetch_log")); + assert_eq!(api_key_label(ApiKey::PutKv), Some("put_kv")); + assert_eq!(api_key_label(ApiKey::Lookup), Some("lookup")); + } + + #[test] + fn non_reportable_api_keys_return_none() { + assert_eq!(api_key_label(ApiKey::MetaData), None); + assert_eq!(api_key_label(ApiKey::CreateTable), None); + assert_eq!(api_key_label(ApiKey::Authenticate), None); + assert_eq!(api_key_label(ApiKey::ListDatabases), None); + assert_eq!(api_key_label(ApiKey::GetTable), None); + } + + #[test] + fn reportable_request_records_all_connection_metrics() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let label = api_key_label(ApiKey::ProduceLog).unwrap(); + + metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => label).increment(1); + metrics::counter!(CLIENT_BYTES_SENT_TOTAL, LABEL_API_KEY => label).increment(256); + metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).increment(1.0); + + metrics::counter!(CLIENT_RESPONSES_TOTAL, LABEL_API_KEY => label).increment(1); + metrics::counter!(CLIENT_BYTES_RECEIVED_TOTAL, LABEL_API_KEY => label).increment(128); + metrics::histogram!(CLIENT_REQUEST_LATENCY_MS, LABEL_API_KEY => label).record(42.5); + metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).decrement(1.0); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + let find_counter = |name: &str| -> Option { + entries.iter().find_map(|(key, _, _, val)| { + if key.key().name() == name { + match val { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + } else { + None + } + }) + }; + + let find_histogram = |name: &str| -> Option> { + entries.iter().find_map(|(key, _, _, val)| { + if key.key().name() == name { + match val { + metrics_util::debugging::DebugValue::Histogram(v) => { + Some(v.iter().map(|f| f.into_inner()).collect()) + } + _ => None, + } + } else { + None + } + }) + }; + + assert_eq!(find_counter(CLIENT_REQUESTS_TOTAL), Some(1)); + assert_eq!(find_counter(CLIENT_RESPONSES_TOTAL), Some(1)); + assert_eq!(find_counter(CLIENT_BYTES_SENT_TOTAL), Some(256)); + assert_eq!(find_counter(CLIENT_BYTES_RECEIVED_TOTAL), Some(128)); + assert_eq!(find_histogram(CLIENT_REQUEST_LATENCY_MS), Some(vec![42.5])); + + let has_label = entries.iter().all(|(key, _, _, _)| { + key.key() + .labels() + .any(|l| l.key() == LABEL_API_KEY && l.value() == "produce_log") + }); + assert!(has_label, "all metrics must carry the api_key label"); + } + + #[test] + fn non_reportable_request_records_no_metrics() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let label = api_key_label(ApiKey::MetaData); + assert!(label.is_none()); + // When label is None, no metrics calls are made (matching request() logic). + }); + + let snapshot = snapshotter.snapshot(); + assert!( + snapshot.into_vec().is_empty(), + "non-reportable API keys must not produce metrics" + ); + } + + #[test] + fn inflight_gauge_nets_to_zero_after_balanced_calls() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let label = api_key_label(ApiKey::FetchLog).unwrap(); + + // Simulate 3 concurrent requests completing + for _ in 0..3 { + metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).increment(1.0); + } + for _ in 0..3 { + metrics::gauge!(CLIENT_REQUESTS_IN_FLIGHT, LABEL_API_KEY => label).decrement(1.0); + } + }); + + let snapshot = snapshotter.snapshot(); + for (key, _, _, val) in snapshot.into_vec() { + if key.key().name() == CLIENT_REQUESTS_IN_FLIGHT { + match val { + metrics_util::debugging::DebugValue::Gauge(g) => { + let value: f64 = g.into_inner(); + assert!( + value == 0.0, + "in-flight gauge should be 0 after balanced inc/dec, got: {value}" + ); + } + other => panic!("expected Gauge, got {other:?}"), + } + } + } + } + + #[test] + fn different_api_keys_produce_separate_metric_series() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let produce_label = api_key_label(ApiKey::ProduceLog).unwrap(); + let fetch_label = api_key_label(ApiKey::FetchLog).unwrap(); + + metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => produce_label).increment(5); + metrics::counter!(CLIENT_REQUESTS_TOTAL, LABEL_API_KEY => fetch_label).increment(3); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + let request_entries: Vec<_> = entries + .iter() + .filter(|(key, _, _, _)| key.key().name() == CLIENT_REQUESTS_TOTAL) + .collect(); + + assert_eq!( + request_entries.len(), + 2, + "produce_log and fetch_log should be separate metric series" + ); + } +} diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs index 86e13b1c..6f3a88d1 100644 --- a/crates/fluss/src/rpc/mod.rs +++ b/crates/fluss/src/rpc/mod.rs @@ -16,6 +16,7 @@ // under the License. mod api_key; +pub(crate) use api_key::ApiKey; mod api_version; pub mod error; mod fluss_api_error; diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 13c5d9ca..beb8c54f 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -351,6 +351,31 @@ where R: RequestBody + Send + WriteVersionedType>, R::ResponseBody: ReadVersionedType>>, { + let api_label = crate::metrics::api_key_label(R::API_KEY); + let start = std::time::Instant::now(); + let record_completion_metrics = |label: &'static str, response_bytes: u64| { + metrics::counter!( + crate::metrics::CLIENT_RESPONSES_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1); + metrics::counter!( + crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(response_bytes); + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => label + ) + .decrement(1.0); + metrics::histogram!( + crate::metrics::CLIENT_REQUEST_LATENCY_MS, + crate::metrics::LABEL_API_KEY => label + ) + .record(start.elapsed().as_secs_f64() * 1000.0); + }; + let request_id = self.request_id.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF; let header = RequestHeader { request_api_key: R::API_KEY, @@ -386,12 +411,76 @@ where ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()), } - self.send_message(buf).await?; + // Guard to decrement in-flight on cancellation, panic, or any exit without + // explicitly calling record_completion_metrics. Prevents gauge drift when + // the request future is dropped (e.g. tokio::select! timeout). + let mut in_flight_guard = if let Some(label) = api_label { + metrics::counter!( + crate::metrics::CLIENT_REQUESTS_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1); + metrics::counter!( + crate::metrics::CLIENT_BYTES_SENT_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(buf.len() as u64); + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1.0); + Some(scopeguard::guard(label, |l| { + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => l + ) + .decrement(1.0); + })) + } else { + None + }; + + let mut disarm_in_flight_guard = || { + if let Some(guard) = in_flight_guard.take() { + let _ = scopeguard::ScopeGuard::into_inner(guard); + } + }; + + if let Err(e) = self.send_message(buf).await { + if let Some(label) = api_label { + disarm_in_flight_guard(); + record_completion_metrics(label, 0); + } + return Err(e.into()); + } _cleanup_on_cancel.message_sent(); - let mut response = rx.await.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), - source: Some(Box::new(e)), - })??; + let mut response = match rx.await { + Ok(Ok(response)) => response, + Ok(Err(e)) => { + if let Some(label) = api_label { + disarm_in_flight_guard(); + record_completion_metrics(label, 0); + } + return Err(e.into()); + } + Err(e) => { + if let Some(label) = api_label { + disarm_in_flight_guard(); + record_completion_metrics(label, 0); + } + return Err(Error::UnexpectedError { + message: "Got recvError, some one close the channel".to_string(), + source: Some(Box::new(e)), + }); + } + }; + + let response_bytes = response.data.get_ref().len() as u64; + if let Some(label) = api_label { + disarm_in_flight_guard(); + record_completion_metrics(label, response_bytes); + } if let Some(error_response) = response.header.error_response { return Err(Error::FlussAPIError { @@ -561,3 +650,594 @@ impl Drop for CleanupRequestStateOnCancel { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::rpc::ApiKey; + use crate::rpc::api_version::ApiVersion; + use crate::rpc::frame::{ReadError, WriteError}; + use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; + use metrics_util::debugging::DebuggingRecorder; + use std::sync::OnceLock; + use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; + use tokio::sync::Mutex as AsyncMutex; + + // -- Test-only request/response types -------------------------------- + + struct TestProduceRequest; + struct TestProduceResponse; + + impl RequestBody for TestProduceRequest { + type ResponseBody = TestProduceResponse; + const API_KEY: ApiKey = ApiKey::ProduceLog; + const REQUEST_VERSION: ApiVersion = ApiVersion(0); + } + + impl WriteVersionedType> for TestProduceRequest { + fn write_versioned(&self, _w: &mut Vec, _v: ApiVersion) -> Result<(), WriteError> { + Ok(()) + } + } + + impl ReadVersionedType>> for TestProduceResponse { + fn read_versioned(_r: &mut Cursor>, _v: ApiVersion) -> Result { + Ok(TestProduceResponse) + } + } + + struct TestMetadataRequest; + struct TestMetadataResponse; + + impl RequestBody for TestMetadataRequest { + type ResponseBody = TestMetadataResponse; + const API_KEY: ApiKey = ApiKey::MetaData; + const REQUEST_VERSION: ApiVersion = ApiVersion(0); + } + + impl WriteVersionedType> for TestMetadataRequest { + fn write_versioned(&self, _w: &mut Vec, _v: ApiVersion) -> Result<(), WriteError> { + Ok(()) + } + } + + impl ReadVersionedType>> for TestMetadataResponse { + fn read_versioned(_r: &mut Cursor>, _v: ApiVersion) -> Result { + Ok(TestMetadataResponse) + } + } + + // -- Mock server ----------------------------------------------------- + + /// Reads framed requests and echoes back minimal success responses. + async fn mock_echo_server(mut stream: tokio::io::DuplexStream) { + loop { + let mut len_buf = [0u8; 4]; + if stream.read_exact(&mut len_buf).await.is_err() { + return; + } + let len = i32::from_be_bytes(len_buf) as usize; + + let mut payload = vec![0u8; len]; + if stream.read_exact(&mut payload).await.is_err() { + return; + } + + // Header layout: api_key(2) + api_version(2) + request_id(4) + let request_id = i32::from_be_bytes([payload[4], payload[5], payload[6], payload[7]]); + + // Response: resp_type(1, 0=success) + request_id(4) + let mut resp = Vec::with_capacity(5); + resp.push(0u8); + resp.extend_from_slice(&request_id.to_be_bytes()); + + let resp_len = (resp.len() as i32).to_be_bytes(); + if stream.write_all(&resp_len).await.is_err() + || stream.write_all(&resp).await.is_err() + || stream.flush().await.is_err() + { + return; + } + } + } + + /// Reads framed requests and echoes back error responses (resp_type=1). + async fn mock_error_server(mut stream: tokio::io::DuplexStream) { + use prost::Message; + + loop { + let mut len_buf = [0u8; 4]; + if stream.read_exact(&mut len_buf).await.is_err() { + return; + } + let len = i32::from_be_bytes(len_buf) as usize; + + let mut payload = vec![0u8; len]; + if stream.read_exact(&mut payload).await.is_err() { + return; + } + + let request_id = i32::from_be_bytes([payload[4], payload[5], payload[6], payload[7]]); + + let err = crate::proto::ErrorResponse { + error_code: 1, + error_message: Some("test error".to_string()), + }; + let mut err_buf = Vec::new(); + err.encode(&mut err_buf).expect("ErrorResponse encode"); + + let mut resp = Vec::with_capacity(5 + err_buf.len()); + resp.push(1u8); // ERROR_RESPONSE + resp.extend_from_slice(&request_id.to_be_bytes()); + resp.extend(err_buf); + + let resp_len = (resp.len() as i32).to_be_bytes(); + if stream.write_all(&resp_len).await.is_err() + || stream.write_all(&resp).await.is_err() + || stream.flush().await.is_err() + { + return; + } + } + } + + // -- Recorder setup -------------------------------------------------- + + /// Shared test recorder (installed once per test binary). + static TEST_SNAPSHOTTER: OnceLock = OnceLock::new(); + static TEST_LOCK: OnceLock> = OnceLock::new(); + + fn test_snapshotter() -> &'static metrics_util::debugging::Snapshotter { + TEST_SNAPSHOTTER.get_or_init(|| { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + recorder + .install() + .expect("debugging recorder install should succeed in this test binary"); + snapshotter + }) + } + + fn test_lock() -> &'static AsyncMutex<()> { + TEST_LOCK.get_or_init(|| AsyncMutex::new(())) + } + + // -- Tests ----------------------------------------------------------- + + #[tokio::test] + async fn request_records_metrics_for_reportable_api_key() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let (client, server) = tokio::io::duplex(4096); + tokio::spawn(mock_echo_server(server)); + + let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let request_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let response_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + + conn.request(TestProduceRequest).await.unwrap(); + + let after: Vec<_> = snapshotter.snapshot().into_vec(); + let request_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let response_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + assert_eq!( + request_after - request_before, + 1, + "produce_log request counter should increment by 1" + ); + assert_eq!( + response_after - response_before, + 1, + "produce_log completion counter should increment by 1" + ); + + let has_latency_sample = after.iter().any(|(key, _, _, value)| { + key.key().name() == crate::metrics::CLIENT_REQUEST_LATENCY_MS + && key + .key() + .labels() + .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log") + && matches!(value, metrics_util::debugging::DebugValue::Histogram(_)) + }); + assert!( + has_latency_sample, + "request latency histogram should be recorded for produce_log" + ); + } + + #[tokio::test] + async fn request_skips_metrics_for_non_reportable_api_key() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let (client, server) = tokio::io::duplex(4096); + tokio::spawn(mock_echo_server(server)); + + let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let request_sum_before: u64 = before + .iter() + .filter_map(|(key, _, _, value)| { + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .sum(); + let response_sum_before: u64 = before + .iter() + .filter_map(|(key, _, _, value)| { + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .sum(); + + conn.request(TestMetadataRequest).await.unwrap(); + + let snapshot: Vec<_> = snapshotter.snapshot().into_vec(); + let request_sum_after: u64 = snapshot + .iter() + .filter_map(|(key, _, _, value)| { + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .sum(); + let response_sum_after: u64 = snapshot + .iter() + .filter_map(|(key, _, _, value)| { + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .sum(); + assert_eq!( + request_sum_after, request_sum_before, + "non-reportable API keys must not change request counters" + ); + assert_eq!( + response_sum_after, response_sum_before, + "non-reportable API keys must not change response counters" + ); + + // No metric entry should carry a non-reportable API key label. + let non_reportable = snapshot.iter().any(|(key, _, _, _)| { + key.key() + .labels() + .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() == "metadata") + }); + assert!( + !non_reportable, + "non-reportable API keys must not appear in metrics" + ); + } + + #[tokio::test] + async fn request_records_completion_metrics_when_send_fails() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let (client, server) = tokio::io::duplex(64); + drop(server); // force write failure on request path + let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let request_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let response_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let bytes_received_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let result = conn.request(TestProduceRequest).await; + assert!( + result.is_err(), + "request should fail when transport is closed" + ); + let after: Vec<_> = snapshotter.snapshot().into_vec(); + let request_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let response_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let bytes_received_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let inflight_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_IN_FLIGHT || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Gauge(v) => Some(v.into_inner()), + _ => None, + } + }) + .unwrap_or(0.0); + + assert_eq!( + request_after - request_before, + 1, + "failed request should still count as request" + ); + assert_eq!( + response_after - response_before, + 1, + "failed request should still count as a completion like Java ConnectionMetrics" + ); + assert_eq!( + bytes_received_after - bytes_received_before, + 0, + "failed send should record zero received bytes" + ); + assert_eq!( + inflight_after, 0.0, + "in-flight gauge must return to zero after failure" + ); + } + + #[tokio::test] + async fn request_records_completion_metrics_when_server_returns_api_error() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let (client, server) = tokio::io::duplex(4096); + tokio::spawn(mock_error_server(server)); + + let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let response_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let bytes_received_before = before + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + + let result = conn.request(TestProduceRequest).await; + assert!( + matches!(result, Err(Error::FlussAPIError { .. })), + "request should fail with FlussAPIError when server returns error_response" + ); + + let after: Vec<_> = snapshotter.snapshot().into_vec(); + let response_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let bytes_received_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0); + let inflight_after = after + .iter() + .find_map(|(key, _, _, value)| { + let has_label = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" + }); + if key.key().name() != crate::metrics::CLIENT_REQUESTS_IN_FLIGHT || !has_label { + return None; + } + match value { + metrics_util::debugging::DebugValue::Gauge(v) => Some(v.into_inner()), + _ => None, + } + }) + .unwrap_or(0.0); + + assert_eq!( + response_after - response_before, + 1, + "API error response should count as completion like Java" + ); + assert!( + bytes_received_after > bytes_received_before, + "API error response should record received bytes (error body)" + ); + assert_eq!( + inflight_after, 0.0, + "in-flight gauge must return to zero after API error" + ); + } +} From 0345c69ff30a877424bb8fcc63e3276140bdf856 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Wed, 4 Mar 2026 21:31:38 +0100 Subject: [PATCH 2/4] address review feedback --- crates/fluss/src/rpc/server_connection.rs | 549 +++++++--------------- 1 file changed, 181 insertions(+), 368 deletions(-) diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index beb8c54f..87cd41f8 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -35,7 +35,7 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::atomic::{AtomicI32, Ordering}; use std::task::Poll; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, WriteHalf}; use tokio::sync::Mutex as AsyncMutex; use tokio::sync::oneshot::{Sender, channel}; @@ -217,6 +217,89 @@ struct ActiveRequest { channel: Sender>, } +/// Tracks per-request connection metrics and ensures in-flight gauge cleanup on drop. +struct RequestMetricsLifecycle { + label: Option<&'static str>, + start: Instant, + completed: bool, +} + +impl RequestMetricsLifecycle { + fn begin(api_key: crate::rpc::ApiKey, request_bytes: u64) -> Self { + let label = crate::metrics::api_key_label(api_key); + if let Some(label) = label { + // Match Java semantics: count request attempts before write/send. + metrics::counter!( + crate::metrics::CLIENT_REQUESTS_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1); + metrics::counter!( + crate::metrics::CLIENT_BYTES_SENT_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(request_bytes); + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1.0); + } + Self { + label, + start: Instant::now(), + completed: false, + } + } + + fn complete(&mut self, response_bytes: u64) { + let Some(label) = self.label else { + return; + }; + if self.completed { + return; + } + + metrics::counter!( + crate::metrics::CLIENT_RESPONSES_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(1); + metrics::counter!( + crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, + crate::metrics::LABEL_API_KEY => label + ) + .increment(response_bytes); + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => label + ) + .decrement(1.0); + metrics::histogram!( + crate::metrics::CLIENT_REQUEST_LATENCY_MS, + crate::metrics::LABEL_API_KEY => label + ) + .record(self.start.elapsed().as_secs_f64() * 1000.0); + self.completed = true; + } +} + +impl Drop for RequestMetricsLifecycle { + fn drop(&mut self) { + if self.completed { + return; + } + if let Some(label) = self.label { + metrics::gauge!( + crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, + crate::metrics::LABEL_API_KEY => label + ) + .decrement(1.0); + self.completed = true; + } + } +} + #[derive(Debug)] enum ConnectionState { /// Currently active requests by request ID. @@ -351,31 +434,6 @@ where R: RequestBody + Send + WriteVersionedType>, R::ResponseBody: ReadVersionedType>>, { - let api_label = crate::metrics::api_key_label(R::API_KEY); - let start = std::time::Instant::now(); - let record_completion_metrics = |label: &'static str, response_bytes: u64| { - metrics::counter!( - crate::metrics::CLIENT_RESPONSES_TOTAL, - crate::metrics::LABEL_API_KEY => label - ) - .increment(1); - metrics::counter!( - crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, - crate::metrics::LABEL_API_KEY => label - ) - .increment(response_bytes); - metrics::gauge!( - crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, - crate::metrics::LABEL_API_KEY => label - ) - .decrement(1.0); - metrics::histogram!( - crate::metrics::CLIENT_REQUEST_LATENCY_MS, - crate::metrics::LABEL_API_KEY => label - ) - .record(start.elapsed().as_secs_f64() * 1000.0); - }; - let request_id = self.request_id.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF; let header = RequestHeader { request_api_key: R::API_KEY, @@ -411,64 +469,21 @@ where ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()), } - // Guard to decrement in-flight on cancellation, panic, or any exit without - // explicitly calling record_completion_metrics. Prevents gauge drift when - // the request future is dropped (e.g. tokio::select! timeout). - let mut in_flight_guard = if let Some(label) = api_label { - metrics::counter!( - crate::metrics::CLIENT_REQUESTS_TOTAL, - crate::metrics::LABEL_API_KEY => label - ) - .increment(1); - metrics::counter!( - crate::metrics::CLIENT_BYTES_SENT_TOTAL, - crate::metrics::LABEL_API_KEY => label - ) - .increment(buf.len() as u64); - metrics::gauge!( - crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, - crate::metrics::LABEL_API_KEY => label - ) - .increment(1.0); - Some(scopeguard::guard(label, |l| { - metrics::gauge!( - crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, - crate::metrics::LABEL_API_KEY => l - ) - .decrement(1.0); - })) - } else { - None - }; - - let mut disarm_in_flight_guard = || { - if let Some(guard) = in_flight_guard.take() { - let _ = scopeguard::ScopeGuard::into_inner(guard); - } - }; + let mut request_metrics = RequestMetricsLifecycle::begin(R::API_KEY, buf.len() as u64); if let Err(e) = self.send_message(buf).await { - if let Some(label) = api_label { - disarm_in_flight_guard(); - record_completion_metrics(label, 0); - } + request_metrics.complete(0); return Err(e.into()); } _cleanup_on_cancel.message_sent(); let mut response = match rx.await { Ok(Ok(response)) => response, Ok(Err(e)) => { - if let Some(label) = api_label { - disarm_in_flight_guard(); - record_completion_metrics(label, 0); - } + request_metrics.complete(0); return Err(e.into()); } Err(e) => { - if let Some(label) = api_label { - disarm_in_flight_guard(); - record_completion_metrics(label, 0); - } + request_metrics.complete(0); return Err(Error::UnexpectedError { message: "Got recvError, some one close the channel".to_string(), source: Some(Box::new(e)), @@ -477,10 +492,7 @@ where }; let response_bytes = response.data.get_ref().len() as u64; - if let Some(label) = api_label { - disarm_in_flight_guard(); - record_completion_metrics(label, response_bytes); - } + request_metrics.complete(response_bytes); if let Some(error_response) = response.header.error_response { return Err(Error::FlussAPIError { @@ -659,7 +671,9 @@ mod tests { use crate::rpc::api_version::ApiVersion; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; - use metrics_util::debugging::DebuggingRecorder; + use metrics::{SharedString, Unit}; + use metrics_util::CompositeKey; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; use std::sync::OnceLock; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::sync::Mutex as AsyncMutex; @@ -803,83 +817,89 @@ mod tests { TEST_LOCK.get_or_init(|| AsyncMutex::new(())) } - // -- Tests ----------------------------------------------------------- - - #[tokio::test] - async fn request_records_metrics_for_reportable_api_key() { - let _test_guard = test_lock().lock().await; - let snapshotter = test_snapshotter(); - - let (client, server) = tokio::io::duplex(4096); - tokio::spawn(mock_echo_server(server)); - - let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + type SnapshotEntry = ( + CompositeKey, + Option, + Option, + DebugValue, + ); + + fn has_api_label(key: &CompositeKey, label: &str) -> bool { + key.key() + .labels() + .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() == label) + } - let before: Vec<_> = snapshotter.snapshot().into_vec(); - let request_before = before + fn counter_for_label(entries: &[SnapshotEntry], metric_name: &str, label: &str) -> u64 { + entries .iter() .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + if key.key().name() != metric_name || !has_api_label(key, label) { return None; } match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + DebugValue::Counter(v) => Some(*v), _ => None, } }) - .unwrap_or(0); - let response_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - - conn.request(TestProduceRequest).await.unwrap(); + .unwrap_or(0) + } - let after: Vec<_> = snapshotter.snapshot().into_vec(); - let request_after = after + fn gauge_for_label(entries: &[SnapshotEntry], metric_name: &str, label: &str) -> f64 { + entries .iter() .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { + if key.key().name() != metric_name || !has_api_label(key, label) { return None; } match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + DebugValue::Gauge(v) => Some(v.into_inner()), _ => None, } }) - .unwrap_or(0); - let response_after = after + .unwrap_or(0.0) + } + + fn counter_sum(entries: &[SnapshotEntry], metric_name: &str) -> u64 { + entries .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { + .filter_map(|(key, _, _, value)| { + if key.key().name() != metric_name { return None; } match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), + DebugValue::Counter(v) => Some(*v), _ => None, } }) - .unwrap_or(0); + .sum() + } + + // -- Tests ----------------------------------------------------------- + + #[tokio::test] + async fn request_records_metrics_for_reportable_api_key() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let (client, server) = tokio::io::duplex(4096); + tokio::spawn(mock_echo_server(server)); + + let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); + + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let request_before = + counter_for_label(&before, crate::metrics::CLIENT_REQUESTS_TOTAL, "produce_log"); + let response_before = + counter_for_label(&before, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); + + conn.request(TestProduceRequest).await.unwrap(); + + let after: Vec<_> = snapshotter.snapshot().into_vec(); + let request_after = + counter_for_label(&after, crate::metrics::CLIENT_REQUESTS_TOTAL, "produce_log"); + let response_after = + counter_for_label(&after, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); assert_eq!( request_after - request_before, 1, @@ -893,11 +913,8 @@ mod tests { let has_latency_sample = after.iter().any(|(key, _, _, value)| { key.key().name() == crate::metrics::CLIENT_REQUEST_LATENCY_MS - && key - .key() - .labels() - .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log") - && matches!(value, metrics_util::debugging::DebugValue::Histogram(_)) + && has_api_label(key, "produce_log") + && matches!(value, DebugValue::Histogram(_)) }); assert!( has_latency_sample, @@ -915,58 +932,14 @@ mod tests { let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); let before: Vec<_> = snapshotter.snapshot().into_vec(); - let request_sum_before: u64 = before - .iter() - .filter_map(|(key, _, _, value)| { - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .sum(); - let response_sum_before: u64 = before - .iter() - .filter_map(|(key, _, _, value)| { - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .sum(); + let request_sum_before = counter_sum(&before, crate::metrics::CLIENT_REQUESTS_TOTAL); + let response_sum_before = counter_sum(&before, crate::metrics::CLIENT_RESPONSES_TOTAL); conn.request(TestMetadataRequest).await.unwrap(); let snapshot: Vec<_> = snapshotter.snapshot().into_vec(); - let request_sum_after: u64 = snapshot - .iter() - .filter_map(|(key, _, _, value)| { - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .sum(); - let response_sum_after: u64 = snapshot - .iter() - .filter_map(|(key, _, _, value)| { - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .sum(); + let request_sum_after = counter_sum(&snapshot, crate::metrics::CLIENT_REQUESTS_TOTAL); + let response_sum_after = counter_sum(&snapshot, crate::metrics::CLIENT_RESPONSES_TOTAL); assert_eq!( request_sum_after, request_sum_before, "non-reportable API keys must not change request counters" @@ -977,11 +950,7 @@ mod tests { ); // No metric entry should carry a non-reportable API key label. - let non_reportable = snapshot.iter().any(|(key, _, _, _)| { - key.key() - .labels() - .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() == "metadata") - }); + let non_reportable = snapshot.iter().any(|(key, _, _, _)| has_api_label(key, "metadata")); assert!( !non_reportable, "non-reportable API keys must not appear in metrics" @@ -998,117 +967,26 @@ mod tests { let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); let before: Vec<_> = snapshotter.snapshot().into_vec(); - let request_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let response_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let bytes_received_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); + let request_before = + counter_for_label(&before, crate::metrics::CLIENT_REQUESTS_TOTAL, "produce_log"); + let response_before = + counter_for_label(&before, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); + let bytes_received_before = + counter_for_label(&before, crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, "produce_log"); let result = conn.request(TestProduceRequest).await; assert!( result.is_err(), "request should fail when transport is closed" ); let after: Vec<_> = snapshotter.snapshot().into_vec(); - let request_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let response_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let bytes_received_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let inflight_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_IN_FLIGHT || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Gauge(v) => Some(v.into_inner()), - _ => None, - } - }) - .unwrap_or(0.0); + let request_after = + counter_for_label(&after, crate::metrics::CLIENT_REQUESTS_TOTAL, "produce_log"); + let response_after = + counter_for_label(&after, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); + let bytes_received_after = + counter_for_label(&after, crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, "produce_log"); + let inflight_after = + gauge_for_label(&after, crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, "produce_log"); assert_eq!( request_after - request_before, @@ -1142,36 +1020,10 @@ mod tests { let conn = ServerConnectionInner::new(BufStream::new(client), usize::MAX, Arc::from("t")); let before: Vec<_> = snapshotter.snapshot().into_vec(); - let response_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let bytes_received_before = before - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); + let response_before = + counter_for_label(&before, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); + let bytes_received_before = + counter_for_label(&before, crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, "produce_log"); let result = conn.request(TestProduceRequest).await; assert!( @@ -1180,51 +1032,12 @@ mod tests { ); let after: Vec<_> = snapshotter.snapshot().into_vec(); - let response_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_RESPONSES_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let bytes_received_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Counter(v) => Some(*v), - _ => None, - } - }) - .unwrap_or(0); - let inflight_after = after - .iter() - .find_map(|(key, _, _, value)| { - let has_label = key.key().labels().any(|l| { - l.key() == crate::metrics::LABEL_API_KEY && l.value() == "produce_log" - }); - if key.key().name() != crate::metrics::CLIENT_REQUESTS_IN_FLIGHT || !has_label { - return None; - } - match value { - metrics_util::debugging::DebugValue::Gauge(v) => Some(v.into_inner()), - _ => None, - } - }) - .unwrap_or(0.0); + let response_after = + counter_for_label(&after, crate::metrics::CLIENT_RESPONSES_TOTAL, "produce_log"); + let bytes_received_after = + counter_for_label(&after, crate::metrics::CLIENT_BYTES_RECEIVED_TOTAL, "produce_log"); + let inflight_after = + gauge_for_label(&after, crate::metrics::CLIENT_REQUESTS_IN_FLIGHT, "produce_log"); assert_eq!( response_after - response_before, From 2c88f314400c4f510321f89e7d32e2b0be5ec491 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Thu, 5 Mar 2026 13:59:29 +0100 Subject: [PATCH 3/4] Update comment to reflect correct behaviour --- crates/fluss/src/metrics.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index 3c51f5c8..1dee70fd 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -35,10 +35,11 @@ pub const LABEL_API_KEY: &str = "api_key"; // // Java reference: ConnectionMetrics.java, ClientMetricGroup.java, MetricNames.java // -// Note on bytes_received: Rust counts the response body length (after the -// ResponseHeader). Java uses ApiMessage.totalSize() which may include framing. -// Absolute values can differ slightly; the semantic (bytes received per -// response) is the same. +// Note on byte counting: Rust counts the full message buffer (header + body). +// Java uses ApiMessage.totalSize() which counts only the API message body +// (excluding both framing and the protocol header). +// Rust's byte counters will be slightly higher than Java's +// for the same messages // --------------------------------------------------------------------------- pub const CLIENT_REQUESTS_TOTAL: &str = "fluss.client.requests.total"; From 44c4474c8eb16690d81193d5214d3b228bd9380e Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Thu, 5 Mar 2026 21:37:49 +0100 Subject: [PATCH 4/4] update response byte and request byte to exclude header and framing --- crates/fluss/src/metrics.rs | 10 +++++----- crates/fluss/src/rpc/message/header.rs | 3 +-- crates/fluss/src/rpc/server_connection.rs | 18 ++++++++++++------ 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index 1dee70fd..928cbe54 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -35,11 +35,11 @@ pub const LABEL_API_KEY: &str = "api_key"; // // Java reference: ConnectionMetrics.java, ClientMetricGroup.java, MetricNames.java // -// Note on byte counting: Rust counts the full message buffer (header + body). -// Java uses ApiMessage.totalSize() which counts only the API message body -// (excluding both framing and the protocol header). -// Rust's byte counters will be slightly higher than Java's -// for the same messages +// Byte counting matches Java semantics: both sides count only the API message +// body, excluding the protocol header and framing. +// Java: rawRequest.totalSize() / response.totalSize() (see MessageCodec.java). +// Rust: buf.len() - REQUEST_HEADER_LENGTH for sent bytes, +// buffer.len() - cursor.position() for received bytes. // --------------------------------------------------------------------------- pub const CLIENT_REQUESTS_TOTAL: &str = "fluss.client.requests.total"; diff --git a/crates/fluss/src/rpc/message/header.rs b/crates/fluss/src/rpc/message/header.rs index 77bda7c7..2f5848aa 100644 --- a/crates/fluss/src/rpc/message/header.rs +++ b/crates/fluss/src/rpc/message/header.rs @@ -23,8 +23,7 @@ use crate::rpc::message::{ReadVersionedType, WriteVersionedType}; use bytes::{Buf, BufMut}; use prost::Message; -#[allow(dead_code)] -const REQUEST_HEADER_LENGTH: i32 = 8; +pub(crate) const REQUEST_HEADER_LENGTH: usize = 8; const SUCCESS_RESPONSE: u8 = 0; #[allow(dead_code)] const ERROR_RESPONSE: u8 = 1; diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 87cd41f8..45732cfd 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -22,7 +22,8 @@ use crate::rpc::error::RpcError; use crate::rpc::error::RpcError::ConnectionError; use crate::rpc::frame::{AsyncMessageRead, AsyncMessageWrite}; use crate::rpc::message::{ - ReadVersionedType, RequestBody, RequestHeader, ResponseHeader, WriteVersionedType, + REQUEST_HEADER_LENGTH, ReadVersionedType, RequestBody, RequestHeader, ResponseHeader, + WriteVersionedType, }; use crate::rpc::transport::Transport; use futures::future::BoxFuture; @@ -469,7 +470,9 @@ where ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()), } - let mut request_metrics = RequestMetricsLifecycle::begin(R::API_KEY, buf.len() as u64); + // count only the API message body, excluding the protocol header. + let request_body_bytes = buf.len().saturating_sub(REQUEST_HEADER_LENGTH) as u64; + let mut request_metrics = RequestMetricsLifecycle::begin(R::API_KEY, request_body_bytes); if let Err(e) = self.send_message(buf).await { request_metrics.complete(0); @@ -491,7 +494,9 @@ where } }; - let response_bytes = response.data.get_ref().len() as u64; + // count only the API message body, excluding the response header. + let response_bytes = + (response.data.get_ref().len() as u64).saturating_sub(response.data.position()); request_metrics.complete(response_bytes); if let Some(error_response) = response.header.error_response { @@ -1044,9 +1049,10 @@ mod tests { 1, "API error response should count as completion like Java" ); - assert!( - bytes_received_after > bytes_received_before, - "API error response should record received bytes (error body)" + assert_eq!( + bytes_received_after - bytes_received_before, + 0, + "API error response should record zero body bytes like Java onRequestFailure" ); assert_eq!( inflight_after, 0.0,