diff --git a/Cargo.lock b/Cargo.lock index 63fd068fde..cb6d0cf8a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2827,6 +2827,23 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "libdd-agent-client" +version = "31.0.0" +dependencies = [ + "bytes", + "flate2", + "httpmock", + "libdd-common", + "libdd-http-client", + "rustls", + "serde", + "serde_json", + "serial_test", + "thiserror 2.0.17", + "tokio", +] + [[package]] name = "libdd-alloc" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index aabcc1d8ac..ca36f40e31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "libdd-tinybytes", "libdd-dogstatsd-client", "libdd-http-client", + "libdd-agent-client", "libdd-log", "libdd-log-ffi", ] diff --git a/libdd-agent-client/Cargo.toml b/libdd-agent-client/Cargo.toml new file mode 100644 index 0000000000..f8f80c17cb --- /dev/null +++ b/libdd-agent-client/Cargo.toml @@ -0,0 +1,32 @@ +# Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "libdd-agent-client" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +description = "Datadog-agent-specialized HTTP client: language metadata injection, per-endpoint send methods, retry, and compression" +homepage = "https://github.com/DataDog/libdatadog/tree/main/libdd-agent-client" +repository = "https://github.com/DataDog/libdatadog/tree/main/libdd-agent-client" + +[lib] +bench = false + +[dependencies] +bytes = "1.4" +flate2 = "1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "2" +tokio = { version = "1.23", features = ["rt"] } +libdd-http-client = { path = "../libdd-http-client" } +libdd-common = { version = "3.0.2", path = "../libdd-common", default-features = false } + +[dev-dependencies] +httpmock = "0.8.0-alpha.1" +rustls = { version = "0.23", default-features = false, features = ["ring"] } +serial_test = "3.2" +tokio = { version = "1.23", features = ["rt", "macros"] } diff --git a/libdd-agent-client/src/agent_info.rs b/libdd-agent-client/src/agent_info.rs new file mode 100644 index 0000000000..e9193f9d8b --- /dev/null +++ b/libdd-agent-client/src/agent_info.rs @@ -0,0 +1,30 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types for [`crate::AgentClient::agent_info`]. + +/// Parsed response from a `GET /info` probe. +/// +/// Returned by [`crate::AgentClient::agent_info`]. Contains agent capabilities and headers. +#[derive(Debug, Clone)] +pub struct AgentInfo { + /// Available agent endpoints, e.g. `["/v0.4/traces", "/v0.5/traces"]`. + pub endpoints: Vec, + /// Whether the agent supports client-side P0 dropping. + pub client_drop_p0s: bool, + /// Raw agent configuration block. + pub config: serde_json::Value, + /// Agent version string, if reported. + pub version: Option, + /// Parsed from the `Datadog-Container-Tags-Hash` response header. + /// + /// Used by dd-trace-py to compute the base tag hash (`agent.py:17-23`). + pub container_tags_hash: Option, + /// Value of the `Datadog-Agent-State` response header from the last `/info` fetch. + /// + /// The agent updates this opaque token whenever its internal state changes (e.g. a + /// configuration reload). Clients that poll `/info` periodically can skip re-parsing + /// the response body by comparing this value to the one returned by the previous call + /// and only acting when it differs. + pub state_hash: Option, +} diff --git a/libdd-agent-client/src/builder.rs b/libdd-agent-client/src/builder.rs new file mode 100644 index 0000000000..5753351bae --- /dev/null +++ b/libdd-agent-client/src/builder.rs @@ -0,0 +1,537 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Builder for [`crate::AgentClient`]. + +use std::collections::HashMap; +use std::env; +#[cfg(unix)] +use std::path::PathBuf; +use std::time::Duration; +#[cfg(windows)] +use OsString; + +use libdd_http_client::RetryConfig; + +use crate::{error::BuildError, language_metadata::LanguageMetadata, AgentClient}; + +/// Default timeout for agent requests. +pub const DEFAULT_TIMEOUT_MS: u64 = 2_000; + +/// Default retry configuration: 2 retries (3 total attempts), 100 ms initial delay, +/// exponential backoff with full jitter. +//TODO: Do we really want something different from `RetryConfig::default()` for the agent? The only +//difference is the number of retries : 3 vs 2 +pub fn default_retry_config() -> RetryConfig { + RetryConfig::new() + .max_retries(2) + .initial_delay(Duration::from_millis(100)) + .with_jitter(true) +} + +/// Transport configuration for the agent client. +/// +/// Determines how the client connects to the Datadog agent. +/// Set via [`AgentClientBuilder::transport`] or the convenience helpers +/// [`AgentClientBuilder::http`], [`AgentClientBuilder::unix_socket`], etc. +#[derive(Debug, Clone)] +pub enum AgentTransport { + /// HTTP over TCP. + Http { + /// Hostname or IP address. + host: String, + /// Port number. + port: u16, + }, + /// Unix Domain Socket. + /// + /// HTTP requests are still formed with `Host: localhost`. The socket path governs only the + /// transport layer. + #[cfg(unix)] + UnixSocket { + /// Filesystem path to the socket file. + path: PathBuf, + }, + /// Windows Named Pipe. + #[cfg(windows)] + NamedPipe { + /// Named pipe path, e.g. `\\.\pipe\DD_APM_DRIVER`. + path: OsString, + }, +} + +impl Default for AgentTransport { + fn default() -> Self { + AgentTransport::Http { + host: "localhost".to_string(), + port: 8126, + } + } +} + +/// Builder for [`AgentClient`]. +/// +/// Obtain via [`AgentClient::builder`]. +/// +/// # Required fields +/// +/// - Transport: set via [`AgentClientBuilder::auto_detect`] (reads standard env vars and probes the +/// local socket) or an explicit convenience method ([`AgentClientBuilder::http`], +/// [`AgentClientBuilder::unix_socket`], [`AgentClientBuilder::windows_named_pipe`], +/// [`AgentClientBuilder::transport`]). +/// - [`AgentClientBuilder::language_metadata`]. +/// +/// # Test tokens +/// +/// Call [`AgentClientBuilder::test_agent_session_token`] to inject +/// `x-datadog-test-session-token` on every request. +#[derive(Debug, Default)] +pub struct AgentClientBuilder { + transport: Option, + test_token: Option, + timeout: Option, + language: Option, + retry: Option, + keep_alive: bool, + extra_headers: HashMap, +} + +impl AgentClientBuilder { + /// Create a new builder with default settings. + pub fn new() -> Self { + Self::default() + } + + /// Set the transport configuration. + pub fn transport(mut self, transport: AgentTransport) -> Self { + self.transport = Some(transport); + self + } + + /// Convenience: HTTP over TCP. + pub fn http(self, host: impl Into, port: u16) -> Self { + self.transport(AgentTransport::Http { + host: host.into(), + port, + }) + } + + /// Convenience: Unix Domain Socket. + #[cfg(unix)] + pub fn unix_socket(self, path: impl Into) -> Self { + self.transport(AgentTransport::UnixSocket { path: path.into() }) + } + + /// Convenience: Windows Named Pipe. + #[cfg(windows)] + pub fn windows_named_pipe(self, path: impl Into) -> Self { + self.transport(AgentTransport::NamedPipe { path: path.into() }) + } + + /// Auto-configure transport and timeout from the environment. + /// + /// Transport priority: + /// 1. `DD_TRACE_AGENT_URL` — parsed as `http://host:port` or `unix:///path`. + /// 2. `DD_AGENT_HOST` / `DD_TRACE_AGENT_PORT` — explicit host and/or port. + /// 3. `/var/run/datadog/apm.socket` — Unix domain socket if the file exists. + /// 4. `localhost:8126` — HTTP fallback. + /// + /// Timeout is read from `DD_TRACE_AGENT_TIMEOUT_SECONDS` (seconds, float), + /// defaulting to [`DEFAULT_TIMEOUT_MS`] when unset or unparseable. + #[cfg(unix)] + pub fn auto_detect(mut self) -> Self { + let transport = Self::transport_from_env().unwrap_or_else(|| { + let uds = PathBuf::from("/var/run/datadog/apm.socket"); + if uds.try_exists().unwrap_or(false) { + AgentTransport::UnixSocket { path: uds } + } else { + AgentTransport::Http { + host: "localhost".to_string(), + port: 8126, + } + } + }); + self.transport = Some(transport); + + self.timeout = Some( + env::var("DD_TRACE_AGENT_TIMEOUT_SECONDS") + .ok() + .and_then(|v| v.parse::().ok()) + .map(|secs| Duration::from_millis((secs * 1000.0) as u64)) + .unwrap_or(Duration::from_millis(DEFAULT_TIMEOUT_MS)), + ); + + self + } + + /// Read transport from env vars (`DD_TRACE_AGENT_URL`, then + /// `DD_AGENT_HOST`/`DD_TRACE_AGENT_PORT`). Returns `None` when none of the variables are + /// set. + #[cfg(unix)] + fn transport_from_env() -> Option { + if let Ok(url) = env::var("DD_TRACE_AGENT_URL") { + if let Some(t) = Self::parse_agent_url(&url) { + return Some(t); + } + } + + let host = env::var("DD_AGENT_HOST").ok(); + let port = env::var("DD_TRACE_AGENT_PORT") + .ok() + .and_then(|p| p.parse::().ok()); + + if host.is_some() || port.is_some() { + return Some(AgentTransport::Http { + host: host.unwrap_or_else(|| "localhost".to_string()), + port: port.unwrap_or(8126), + }); + } + + None + } + + /// Parse a Datadog agent URL into an [`AgentTransport`]. + /// + /// Supported schemes: `http://`, `https://`, `unix://`. + #[cfg(unix)] + fn parse_agent_url(url: &str) -> Option { + if let Some(after_scheme) = url.strip_prefix("unix://") { + // unix:///abs/path or unix://localhost/abs/path + let path = if after_scheme.starts_with('/') { + after_scheme + } else { + &after_scheme[after_scheme.find('/')?..] + }; + return Some(AgentTransport::UnixSocket { + path: PathBuf::from(path), + }); + } + + let rest = url + .strip_prefix("http://") + .or_else(|| url.strip_prefix("https://"))?; + + // Drop any trailing path (e.g. "host:port/") + let authority = rest.split('/').next().unwrap_or(rest); + let (host, port) = if let Some(colon) = authority.rfind(':') { + let port = authority[colon + 1..].parse::().ok().unwrap_or(8126); + (&authority[..colon], port) + } else { + (authority, 8126u16) + }; + + Some(AgentTransport::Http { + host: host.to_string(), + port, + }) + } + + /// Set the test session token. + /// + /// When set, `x-datadog-test-session-token: ` is injected on every request. + pub fn test_agent_session_token(mut self, token: impl Into) -> Self { + self.test_token = Some(token.into()); + self + } + + /// Set the request timeout. + /// + /// Defaults to [`DEFAULT_TIMEOUT_MS`] (2 000 ms) when not set. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + + /// Override the default retry configuration. + /// + /// Defaults to [`default_retry_config`]. + pub fn retry(mut self, config: RetryConfig) -> Self { + self.retry = Some(config); + self + } + + /// Set the language/runtime metadata injected into every request. Required. + pub fn language_metadata(mut self, meta: LanguageMetadata) -> Self { + self.language = Some(meta); + self + } + + /// Enable or disable HTTP keep-alive. Defaults to `false`. + /// + /// The Datadog agent has a low keep-alive timeout that causes "pipe closed" errors on every + /// second connection when keep-alive is enabled. The default of `false` is correct for all + /// periodic-flush writers (traces, stats, data streams). Set to `true` only for + /// high-frequency continuous senders (e.g. a streaming profiling exporter). + pub fn use_keep_alive(mut self, enabled: bool) -> Self { + self.keep_alive = enabled; + self + } + + // Compression + // + // Not exposed in this libv1. Gzip compression (level 6, matching dd-trace-py's trace writer at + // `writer.py:490`) will be added in a follow-up once the core send paths are stable. + // Per-method defaults (e.g. unconditional gzip for `send_pipeline_stats`) are already + // baked in; only the opt-in client-level `gzip(level)` builder knob is deferred. + + /// Additional custom headers to inject. + pub fn extra_headers(mut self, headers: HashMap) -> Self { + self.extra_headers = headers; + self + } + + /// Build the [`AgentClient`]. + pub fn build(self) -> Result { + let transport = self.transport.ok_or(BuildError::MissingTransport)?; + let language = self.language.ok_or(BuildError::MissingLanguageMetadata)?; + let timeout = self + .timeout + .unwrap_or(Duration::from_millis(DEFAULT_TIMEOUT_MS)); + let retry = self.retry.unwrap_or_else(default_retry_config); + + // Build the underlying HTTP client. + let http = Self::build_http_client(transport, timeout, retry) + .map_err(|e| BuildError::HttpClient(e.to_string()))?; + + // Pre-compute all static headers that are injected on every request. + let static_headers = + Self::build_static_headers(&language, self.test_token, self.extra_headers); + + Ok(AgentClient::new(http, static_headers)) + } + + fn build_http_client( + transport: AgentTransport, + timeout: Duration, + retry: RetryConfig, + ) -> Result { + let base_url = match &transport { + AgentTransport::Http { host, port } => format!("http://{}:{}", host, port), + #[cfg(unix)] + AgentTransport::UnixSocket { .. } => "http://localhost".to_string(), + #[cfg(windows)] + AgentTransport::NamedPipe { .. } => "http://localhost".to_string(), + }; + + let mut builder = libdd_http_client::HttpClient::builder() + .base_url(base_url) + .timeout(timeout) + // HTTP errors are handled by each send method, not by the underlying client. + // This allows methods like `agent_info` to interpret 404 as Ok(None) rather than + // an error, and avoids retrying on HTTP 4xx/5xx. + .treat_http_errors_as_errors(false) + .retry(retry); + + match transport { + AgentTransport::Http { .. } => {} + #[cfg(unix)] + AgentTransport::UnixSocket { path } => { + builder = builder.unix_socket(path); + } + #[cfg(windows)] + AgentTransport::NamedPipe { path } => { + builder = builder.windows_named_pipe(path); + } + } + + builder.build() + } + + fn build_static_headers( + language: &LanguageMetadata, + test_token: Option, + extra_headers: HashMap, + ) -> Vec<(String, String)> { + let mut headers = vec![ + ("Datadog-Meta-Lang".to_string(), language.language.clone()), + ( + "Datadog-Meta-Lang-Version".to_string(), + language.language_version.clone(), + ), + ( + "Datadog-Meta-Lang-Interpreter".to_string(), + language.interpreter.clone(), + ), + ( + "Datadog-Meta-Tracer-Version".to_string(), + language.tracer_version.clone(), + ), + ("User-Agent".to_string(), language.user_agent()), + ]; + + if let Some(token) = test_token { + headers.push(("x-datadog-test-session-token".to_string(), token)); + } + + headers.extend(Self::container_headers()); + headers.extend(extra_headers); + + headers + } + + /// Read container / entity-ID headers from the host environment. Always injects + /// `Datadog-External-Env` when `DD_EXTERNAL_ENV` is set. + fn container_headers() -> Vec<(String, String)> { + let mut headers = Vec::new(); + + if let Ok(env) = env::var("DD_EXTERNAL_ENV") { + if !env.is_empty() { + headers.push(("Datadog-External-Env".to_string(), env)); + } + } + + use libdd_common::entity_id; + + if let Some(container_id) = entity_id::get_container_id() { + headers.push(("Datadog-Container-Id".to_string(), container_id.to_owned())); + } + + if let Some(entity_id) = entity_id::get_entity_id() { + headers.push(("Datadog-Entity-ID".to_string(), entity_id.to_owned())); + } + + headers + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + + #[test] + fn default_transport_is_localhost_8126() { + let t = AgentTransport::default(); + match t { + AgentTransport::Http { host, port } => { + assert_eq!(host, "localhost"); + assert_eq!(port, 8126); + } + #[allow(unreachable_patterns)] + _ => panic!("unexpected default transport"), + } + } + + #[test] + fn default_retry_config_is_constructable() { + // Just verify default_retry_config() doesn't panic. + let _cfg = default_retry_config(); + } + + #[test] + fn builder_new_is_default() { + let b = AgentClientBuilder::new(); + assert!(b.transport.is_none()); + assert!(b.language.is_none()); + assert!(!b.keep_alive); + } + + #[test] + fn build_fails_without_transport() { + let result = AgentClientBuilder::new() + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build(); + assert!(matches!(result, Err(BuildError::MissingTransport))); + } + + #[test] + fn build_fails_without_language_metadata() { + let result = AgentClientBuilder::new().http("localhost", 8126).build(); + assert!(matches!(result, Err(BuildError::MissingLanguageMetadata))); + } + + #[test] + fn build_succeeds_with_required_fields() { + let _ = rustls::crypto::ring::default_provider().install_default(); + let result = AgentClientBuilder::new() + .http("localhost", 8126) + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build(); + assert!(result.is_ok()); + } + + #[cfg(unix)] + #[test] + #[serial_test::serial] + fn auto_detect_uses_dd_trace_agent_url_http() { + env::set_var("DD_TRACE_AGENT_URL", "http://myhost:9000"); + env::remove_var("DD_AGENT_HOST"); + env::remove_var("DD_TRACE_AGENT_PORT"); + let b = AgentClientBuilder::new().auto_detect(); + env::remove_var("DD_TRACE_AGENT_URL"); + assert!(matches!( + b.transport, + Some(AgentTransport::Http { ref host, port }) + if host == "myhost" && port == 9000 + )); + } + + #[cfg(unix)] + #[test] + #[serial_test::serial] + fn auto_detect_uses_dd_trace_agent_url_unix() { + env::set_var("DD_TRACE_AGENT_URL", "unix:///tmp/test.sock"); + env::remove_var("DD_AGENT_HOST"); + env::remove_var("DD_TRACE_AGENT_PORT"); + let b = AgentClientBuilder::new().auto_detect(); + env::remove_var("DD_TRACE_AGENT_URL"); + assert!(matches!( + b.transport, + Some(AgentTransport::UnixSocket { ref path }) + if path.to_str() == Some("/tmp/test.sock") + )); + } + + #[cfg(unix)] + #[test] + #[serial_test::serial] + fn auto_detect_uses_dd_agent_host_and_port() { + env::remove_var("DD_TRACE_AGENT_URL"); + env::set_var("DD_AGENT_HOST", "remotehost"); + env::set_var("DD_TRACE_AGENT_PORT", "7777"); + let b = AgentClientBuilder::new().auto_detect(); + env::remove_var("DD_AGENT_HOST"); + env::remove_var("DD_TRACE_AGENT_PORT"); + assert!(matches!( + b.transport, + Some(AgentTransport::Http { ref host, port }) + if host == "remotehost" && port == 7777 + )); + } + + #[cfg(unix)] + #[test] + #[serial_test::serial] + fn auto_detect_reads_timeout_from_env() { + env::remove_var("DD_TRACE_AGENT_URL"); + env::remove_var("DD_AGENT_HOST"); + env::remove_var("DD_TRACE_AGENT_PORT"); + env::set_var("DD_TRACE_AGENT_TIMEOUT_SECONDS", "5"); + let b = AgentClientBuilder::new().auto_detect(); + env::remove_var("DD_TRACE_AGENT_TIMEOUT_SECONDS"); + assert_eq!(b.timeout, Some(Duration::from_secs(5))); + } + + #[cfg(unix)] + #[test] + #[serial_test::serial] + fn auto_detect_uses_default_timeout_when_unset() { + env::remove_var("DD_TRACE_AGENT_URL"); + env::remove_var("DD_AGENT_HOST"); + env::remove_var("DD_TRACE_AGENT_PORT"); + env::remove_var("DD_TRACE_AGENT_TIMEOUT_SECONDS"); + let b = AgentClientBuilder::new().auto_detect(); + assert_eq!(b.timeout, Some(Duration::from_millis(DEFAULT_TIMEOUT_MS))); + } + + #[test] + fn extra_headers_stored() { + let mut headers = HashMap::new(); + headers.insert("X-Custom".to_string(), "value".to_string()); + let b = AgentClientBuilder::new().extra_headers(headers); + assert_eq!( + b.extra_headers.get("X-Custom").map(|s| s.as_str()), + Some("value") + ); + } +} diff --git a/libdd-agent-client/src/client.rs b/libdd-agent-client/src/client.rs new file mode 100644 index 0000000000..d63cf73e60 --- /dev/null +++ b/libdd-agent-client/src/client.rs @@ -0,0 +1,324 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! [`AgentClient`] and its send methods. + +use std::collections::HashMap; + +use bytes::Bytes; +use flate2::{write::GzEncoder, Compression}; +use libdd_http_client::{HttpClient, HttpMethod, HttpRequest}; +use serde_json::{from_slice, Value}; +use std::io::Write as _; + +use crate::{ + agent_info::AgentInfo, + builder::AgentClientBuilder, + error::SendError, + telemetry::TelemetryRequest, + traces::{AgentResponse, TraceFormat, TraceSendOptions}, +}; + +/// A Datadog-agent-specialized HTTP client. +/// +/// Wraps a configured [`libdd_http_client::HttpClient`] and injects Datadog-specific headers +/// automatically on every request: +/// +/// - Language metadata headers (`Datadog-Meta-Lang`, `Datadog-Meta-Lang-Version`, +/// `Datadog-Meta-Lang-Interpreter`, `Datadog-Meta-Tracer-Version`) from the [`LanguageMetadata`] +/// supplied when creating the client. +/// - `User-Agent` derived from [`LanguageMetadata::user_agent`]. +/// - Container/entity-ID headers (`Datadog-Container-Id`, `Datadog-Entity-ID`, +/// `Datadog-External-Env`) read from `/proc/self/cgroup` at startup. +/// - `x-datadog-test-session-token` when a test token was set. +/// - Any extra headers registered via [`AgentClientBuilder::extra_headers`]. +/// +/// Obtain via [`AgentClient::builder`]. +/// +/// [`LanguageMetadata`]: crate::LanguageMetadata +pub struct AgentClient { + http: HttpClient, + base_url: String, + static_headers: Vec<(String, String)>, +} + +impl AgentClient { + pub(crate) fn new(http: HttpClient, static_headers: Vec<(String, String)>) -> Self { + let base_url = http.config().base_url().to_string(); + Self { + http, + base_url, + static_headers, + } + } + + /// Create a new [`AgentClientBuilder`]. + pub fn builder() -> AgentClientBuilder { + AgentClientBuilder::new() + } + + /// Send a serialised trace payload to the agent with automatically injected headers. + /// + /// # Returns + /// + /// An [`AgentResponse`] with the HTTP status and the parsed `rate_by_service` sampling + /// rates from the agent response body. + pub async fn send_traces( + &self, + payload: Bytes, + trace_count: usize, + format: TraceFormat, + opts: TraceSendOptions, + ) -> Result { + let (path, content_type) = match format { + TraceFormat::MsgpackV5 => ("/v0.5/traces", "application/msgpack"), + TraceFormat::MsgpackV4 => ("/v0.4/traces", "application/msgpack"), + }; + + let mut request = HttpRequest::new(HttpMethod::Put, format!("{}{}", self.base_url, path)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", content_type) + .with_header("X-Datadog-Trace-Count", trace_count.to_string()) + .with_header("Datadog-Send-Real-Http-Status", "true"); + + if opts.computed_top_level { + request = request.with_header("Datadog-Client-Computed-Top-Level", "yes"); + } + + let response = self.http.send(request).await?; + + if response.status_code() >= 400 { + return Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }); + } + + let rate_by_service = parse_rate_by_service(response.body()); + Ok(AgentResponse { + status: response.status_code(), + rate_by_service, + }) + } + + /// Send span stats (APM concentrator buckets) to `/v0.6/stats`. + pub async fn send_stats(&self, payload: Bytes) -> Result<(), SendError> { + let request = HttpRequest::new(HttpMethod::Put, format!("{}/v0.6/stats", self.base_url)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/msgpack"); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send data-streams pipeline stats to `/v0.1/pipeline_stats`. + /// + /// The payload is **always** gzip-compressed regardless of the client-level compression + /// setting. This is a protocol requirement of the data-streams endpoint. + pub async fn send_pipeline_stats(&self, payload: Bytes) -> Result<(), SendError> { + let request = HttpRequest::new( + HttpMethod::Put, + format!("{}/v0.1/pipeline_stats", self.base_url), + ) + .with_body(gzip_compress(payload)?) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/msgpack") + .with_header("Content-Encoding", "gzip"); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send a telemetry event to the agent's telemetry proxy + /// (`telemetry/proxy/api/v2/apmtelemetry`). + pub async fn send_telemetry(&self, req: TelemetryRequest) -> Result<(), SendError> { + let request = HttpRequest::new( + HttpMethod::Post, + format!("{}/telemetry/proxy/api/v2/apmtelemetry", self.base_url), + ) + .with_body(req.body) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/json") + .with_header("DD-Telemetry-Request-Type", &req.request_type) + .with_header("DD-Telemetry-API-Version", &req.api_version) + .with_header( + "DD-Telemetry-Debug-Enabled", + if req.debug { "true" } else { "false" }, + ); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send an event via the agent's EVP (Event Platform) proxy. + /// + /// The agent forwards the request to `.datadoghq.com`. `subdomain` + /// controls the target intake (injected as `X-Datadog-EVP-Subdomain`); `path` is the + /// endpoint on that intake (e.g. `/api/v2/exposures`). + pub async fn send_evp_event( + &self, + subdomain: &str, + path: &str, + payload: Bytes, + content_type: &str, + ) -> Result<(), SendError> { + let request = HttpRequest::new(HttpMethod::Post, format!("{}{}", self.base_url, path)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", content_type) + .with_header("X-Datadog-EVP-Subdomain", subdomain); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Probe `GET /info` and return parsed agent capabilities. + /// + /// Returns `Ok(None)` when the agent returns 404 (remote-config / info not supported). + pub async fn agent_info(&self) -> Result, SendError> { + #[derive(serde::Deserialize)] + struct InfoResponse { + version: Option, + endpoints: Option>, + client_drop_p0s: Option, + config: Option, + } + + let request = HttpRequest::new(HttpMethod::Get, format!("{}/info", self.base_url)) + .with_headers(self.static_headers.iter().cloned()); + + let response = self.http.send(request).await?; + + if response.status_code() == 404 { + return Ok(None); + } + + if response.status_code() >= 400 { + return Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }); + } + + // Case-insensitive lookup of a response header value. + let header = |name: &str| -> Option { + response + .headers() + .iter() + .find(|(k, _)| k.eq_ignore_ascii_case(name)) + .map(|(_, v)| v.clone()) + }; + + let container_tags_hash = header("datadog-container-tags-hash"); + let state_hash = header("datadog-agent-state"); + + let info: InfoResponse = + from_slice(response.body()).map_err(|e| SendError::Encoding(e.to_string()))?; + + Ok(Some(AgentInfo { + endpoints: info.endpoints.unwrap_or_default(), + client_drop_p0s: info.client_drop_p0s.unwrap_or(false), + config: info.config.unwrap_or(Value::Null), + version: info.version, + container_tags_hash, + state_hash, + })) + } +} + +/// Parse `rate_by_service` from an agent trace response body. +fn parse_rate_by_service(body: &Bytes) -> Option> { + #[derive(serde::Deserialize)] + struct TraceResponse { + rate_by_service: Option>, + } + + from_slice::(body) + .ok() + .and_then(|r| r.rate_by_service) +} + +/// Return `Ok(())` for 2xx, or `Err(SendError::HttpError)` for anything else. +fn check_status(response: libdd_http_client::HttpResponse) -> Result<(), SendError> { + if response.status_code() >= 400 { + Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }) + } else { + Ok(()) + } +} + +/// Gzip-compress `payload` at level 6 (matching dd-trace-py's trace writer). +fn gzip_compress(payload: Bytes) -> Result { + let mut encoder = GzEncoder::new(Vec::new(), Compression::new(6)); + encoder + .write_all(&payload) + .map_err(|e| SendError::Encoding(e.to_string()))?; + let compressed = encoder + .finish() + .map_err(|e| SendError::Encoding(e.to_string()))?; + Ok(Bytes::from(compressed)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{AgentClient, LanguageMetadata}; + + fn ensure_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); + } + + fn test_client(port: u16) -> AgentClient { + ensure_crypto_provider(); + AgentClient::builder() + .http("localhost", port) + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build() + .unwrap() + } + + #[test] + fn builder_roundtrip() { + let client = test_client(8126); + assert!(client.base_url.contains("localhost")); + } + + #[test] + fn static_headers_contain_language_metadata() { + let client = test_client(8126); + let keys: Vec<&str> = client + .static_headers + .iter() + .map(|(k, _)| k.as_str()) + .collect(); + assert!(keys.contains(&"Datadog-Meta-Lang")); + assert!(keys.contains(&"Datadog-Meta-Lang-Version")); + assert!(keys.contains(&"User-Agent")); + } + + #[test] + fn gzip_compress_produces_valid_gzip() { + let input = Bytes::from_static(b"hello world"); + let compressed = gzip_compress(input).unwrap(); + // gzip magic bytes: 0x1f 0x8b + assert_eq!(&compressed[..2], &[0x1f, 0x8b]); + } + + #[test] + fn parse_rate_by_service_valid_json() { + let body = Bytes::from(r#"{"rate_by_service":{"service:env":0.5}}"#); + let rates = parse_rate_by_service(&body).unwrap(); + assert_eq!(rates.get("service:env"), Some(&0.5)); + } + + #[test] + fn parse_rate_by_service_absent_field() { + let body = Bytes::from(r#"{"other":"value"}"#); + assert!(parse_rate_by_service(&body).is_none()); + } +} diff --git a/libdd-agent-client/src/error.rs b/libdd-agent-client/src/error.rs new file mode 100644 index 0000000000..94009a7889 --- /dev/null +++ b/libdd-agent-client/src/error.rs @@ -0,0 +1,69 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Error types for [`crate::AgentClient`]. + +use std::io::{Error, ErrorKind}; +use bytes::Bytes; +use libdd_http_client::HttpClientError; +use thiserror::Error; + +/// Errors that can occur when building an [`crate::AgentClient`]. +#[derive(Debug, Error)] +pub enum BuildError { + /// No transport was configured. + #[error("transport is required")] + MissingTransport, + /// No language metadata was configured. + #[error("language metadata is required")] + MissingLanguageMetadata, + /// The underlying HTTP client could not be constructed. + #[error("HTTP client error: {0}")] + HttpClient(String), +} + +/// Errors that can occur when sending a request via [`crate::AgentClient`]. +#[derive(Debug, Error)] +pub enum SendError { + /// Connection refused, timeout, or I/O error. + #[error("transport error: {0}")] + Transport(#[source] std::io::Error), + /// The server returned an HTTP error status. Includes the raw status and body. + #[error("HTTP error {status}: {body:?}")] + HttpError { + /// HTTP status code returned by the server. + status: u16, + /// Raw response body. + body: Bytes, + }, + /// All retry attempts exhausted without a successful response. + #[error("retries exhausted: {last_error}")] + RetriesExhausted { + /// The last error encountered before giving up. + last_error: Box, + }, + /// Payload serialisation or compression failure. + #[error("encoding error: {0}")] + Encoding(String), +} + +impl From for SendError { + fn from(err: HttpClientError) -> Self { + match err { + HttpClientError::ConnectionFailed(s) => { + SendError::Transport(Error::new(ErrorKind::ConnectionRefused, s)) + } + HttpClientError::TimedOut => { + SendError::Transport(Error::new(ErrorKind::TimedOut, "request timed out")) + } + HttpClientError::IoError(s) => SendError::Transport(Error::other(s)), + HttpClientError::InvalidConfig(s) => { + SendError::Transport(Error::new(ErrorKind::InvalidInput, s)) + } + HttpClientError::RequestFailed { status, body } => SendError::HttpError { + status, + body: Bytes::from(body), + }, + } + } +} diff --git a/libdd-agent-client/src/language_metadata.rs b/libdd-agent-client/src/language_metadata.rs new file mode 100644 index 0000000000..eb599f90ee --- /dev/null +++ b/libdd-agent-client/src/language_metadata.rs @@ -0,0 +1,69 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Language/runtime metadata injected into every outgoing request. + +/// Language and runtime metadata that is automatically injected into every request as +/// `Datadog-Meta-*` headers and drives the `User-Agent` string. +#[derive(Debug, Clone)] +pub struct LanguageMetadata { + /// Value of `Datadog-Meta-Lang`, e.g. `"python"`. + pub language: String, + /// Value of `Datadog-Meta-Lang-Version`, e.g. `"3.12.1"`. + pub language_version: String, + /// Value of `Datadog-Meta-Lang-Interpreter`, e.g. `"CPython"`. + pub interpreter: String, + /// Value of `Datadog-Meta-Tracer-Version`, e.g. `"2.18.0"`. + pub tracer_version: String, +} + +impl LanguageMetadata { + /// Construct a new `LanguageMetadata`. + pub fn new( + language: impl Into, + language_version: impl Into, + interpreter: impl Into, + tracer_version: impl Into, + ) -> Self { + Self { + language: language.into(), + language_version: language_version.into(), + interpreter: interpreter.into(), + tracer_version: tracer_version.into(), + } + } + + /// Produces the `User-Agent` string passed to `Endpoint::to_request_builder()`. + /// + /// Format: `dd-trace-/`, e.g. `dd-trace-python/2.18.0`. + #[inline] + pub fn user_agent(&self) -> String { + format!("dd-trace-{}/{}", self.language, self.tracer_version) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn new_stores_fields() { + let m = LanguageMetadata::new("python", "3.12.1", "CPython", "2.18.0"); + assert_eq!(m.language, "python"); + assert_eq!(m.language_version, "3.12.1"); + assert_eq!(m.interpreter, "CPython"); + assert_eq!(m.tracer_version, "2.18.0"); + } + + #[test] + fn user_agent_format() { + let m = LanguageMetadata::new("python", "3.12.1", "CPython", "2.18.0"); + assert_eq!(m.user_agent(), "dd-trace-python/2.18.0"); + } + + #[test] + fn user_agent_ruby() { + let m = LanguageMetadata::new("ruby", "3.2.0", "MRI", "1.13.0"); + assert_eq!(m.user_agent(), "dd-trace-ruby/1.13.0"); + } +} diff --git a/libdd-agent-client/src/lib.rs b/libdd-agent-client/src/lib.rs new file mode 100644 index 0000000000..605c6d03ca --- /dev/null +++ b/libdd-agent-client/src/lib.rs @@ -0,0 +1,67 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! This crate provides a Datadog-agent-specific HTTP client sitting on top of the basic +//! `libdd-http-client` primitives. The API is higher-level and makes agent-specific settings +//! (headers, etc.) the default rather than opt-in boilerplate. +//! +//! # Quick start +//! +//! Call [`AgentClientBuilder::auto_detect`] to let the client configure transport and timeout +//! from the standard Datadog environment variables (`DD_TRACE_AGENT_URL`, `DD_AGENT_HOST`, +//! `DD_TRACE_AGENT_PORT`, `DD_TRACE_AGENT_TIMEOUT_SECONDS`), falling back to a local Unix +//! socket at `/var/run/datadog/apm.socket` when it exists, and finally to `localhost:8126`. +//! +//! ```rust,no_run +//! # #[cfg(unix)] +//! # fn example() -> Result<(), libdd_agent_client::BuildError> { +//! use libdd_agent_client::{AgentClient, LanguageMetadata}; +//! +//! let client = AgentClient::builder() +//! .auto_detect() +//! .language_metadata(LanguageMetadata::new( +//! "python", "3.12.1", "CPython", "2.18.0", +//! )) +//! .build()?; +//! # Ok(()) +//! # } +//! ``` +//! +//! # Explicit transport +//! +//! When the host and port are known at build time, set them directly: +//! +//! ```rust,no_run +//! # fn example() -> Result<(), libdd_agent_client::BuildError> { +//! use libdd_agent_client::{AgentClient, LanguageMetadata}; +//! +//! let client = AgentClient::builder() +//! .http("localhost", 8126) +//! .language_metadata(LanguageMetadata::new( +//! "python", "3.12.1", "CPython", "2.18.0", +//! )) +//! .build()?; +//! # Ok(()) +//! # } +//! ``` +//! +//! # Fork safety +//! +//! The underlying `libdd-http-client` uses the `hickory-dns` DNS resolver by default, which is +//! in-process and fork-safe. + +pub mod agent_info; +pub mod builder; +pub mod client; +pub mod error; +pub mod language_metadata; +pub mod telemetry; +pub mod traces; + +pub use agent_info::AgentInfo; +pub use builder::{AgentClientBuilder, AgentTransport}; +pub use client::AgentClient; +pub use error::{BuildError, SendError}; +pub use language_metadata::LanguageMetadata; +pub use telemetry::TelemetryRequest; +pub use traces::{AgentResponse, TraceFormat, TraceSendOptions}; diff --git a/libdd-agent-client/src/telemetry.rs b/libdd-agent-client/src/telemetry.rs new file mode 100644 index 0000000000..de1630b10b --- /dev/null +++ b/libdd-agent-client/src/telemetry.rs @@ -0,0 +1,26 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types specific to [`crate::AgentClient::send_telemetry`]. + +/// A single telemetry event to send via [`crate::AgentClient::send_telemetry`]. +/// +/// The three per-request headers `DD-Telemetry-Request-Type`, `DD-Telemetry-API-Version`, and +/// `DD-Telemetry-Debug-Enabled` are derived automatically from the struct by the client. +/// +/// The client always routes to the agent telemetry proxy endpoint +/// (`telemetry/proxy/api/v2/apmtelemetry`). +#[derive(Debug, Clone)] +pub struct TelemetryRequest { + /// Value for the `DD-Telemetry-Request-Type` header, e.g. `"app-started"`. + pub request_type: String, + /// Value for the `DD-Telemetry-API-Version` header, e.g. `"v2"`. + pub api_version: String, + /// Value for the `DD-Telemetry-Debug-Enabled` header. + pub debug: bool, + /// Pre-serialized JSON payload body. + /// + /// The caller is responsible for serializing the event body to JSON before constructing this + /// struct. The client sends these bytes as `Content-Type: application/json`. + pub body: bytes::Bytes, +} diff --git a/libdd-agent-client/src/traces.rs b/libdd-agent-client/src/traces.rs new file mode 100644 index 0000000000..28827b87f4 --- /dev/null +++ b/libdd-agent-client/src/traces.rs @@ -0,0 +1,46 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types specific to [`crate::AgentClient::send_traces`]. + +use std::collections::HashMap; + +/// Wire format of the trace payload. +/// +/// Determines both the `Content-Type` header and the target endpoint. +/// +/// # Format selection +/// +/// The caller is currently responsible for choosing the format. In practice this means starting +/// with [`TraceFormat::MsgpackV5`] and downgrading to [`TraceFormat::MsgpackV4`] when the agent +/// returns 404 or 415 (e.g. on Windows, or when AppSec/IAST is active). +/// +/// In a future version this negotiation may be moved into the client itself so that format +/// selection becomes automatic and callers no longer need to track the downgrade state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TraceFormat { + /// `application/msgpack` to `/v0.5/traces`. Preferred format. + MsgpackV5, + /// `application/msgpack` to `/v0.4/traces`. Fallback for Windows / AppSec. + MsgpackV4, +} + +/// Per-request options for [`crate::AgentClient::send_traces`]. +#[derive(Debug, Clone, Default)] +pub struct TraceSendOptions { + /// When `true`, appends `Datadog-Client-Computed-Top-Level: yes`. + /// + /// Signals to the agent that the client has already marked top-level spans, allowing the agent + /// to skip its own top-level computation. + pub computed_top_level: bool, +} + +/// Parsed response from the agent after a successful trace submission. +#[derive(Debug, Clone)] +pub struct AgentResponse { + /// HTTP status code returned by the agent. + pub status: u16, + /// Per-service sampling rates parsed from the `rate_by_service` field of the agent response + /// body, if present. + pub rate_by_service: Option>, +} diff --git a/libdd-agent-client/tests/agent_info.rs b/libdd-agent-client/tests/agent_info.rs new file mode 100644 index 0000000000..a81101a472 --- /dev/null +++ b/libdd-agent-client/tests/agent_info.rs @@ -0,0 +1,62 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use httpmock::prelude::*; + +#[tokio::test] +async fn parses_info_response() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(200).body( + r#"{ + "version": "7.50.0", + "endpoints": ["/v0.4/traces", "/v0.5/traces"], + "client_drop_p0s": true, + "config": {} + }"#, + ); + }); + + let info = common::client_for(&server) + .agent_info() + .await + .unwrap() + .expect("expected Some"); + + assert_eq!(info.version.as_deref(), Some("7.50.0")); + assert!(info.endpoints.contains(&"/v0.5/traces".to_string())); + assert!(info.client_drop_p0s); +} + +#[tokio::test] +async fn returns_none_on_404() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(404).body("not found"); + }); + + let result = common::client_for(&server).agent_info().await.unwrap(); + assert!(result.is_none()); +} + +#[tokio::test] +async fn extracts_container_tags_hash_header() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(200) + .header("Datadog-Container-Tags-Hash", "abc123") + .body(r#"{"endpoints":[],"client_drop_p0s":false}"#); + }); + + let info = common::client_for(&server) + .agent_info() + .await + .unwrap() + .unwrap(); + assert_eq!(info.container_tags_hash.as_deref(), Some("abc123")); +} diff --git a/libdd-agent-client/tests/common.rs b/libdd-agent-client/tests/common.rs new file mode 100644 index 0000000000..79f0869d9f --- /dev/null +++ b/libdd-agent-client/tests/common.rs @@ -0,0 +1,20 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use httpmock::MockServer; +use libdd_agent_client::{AgentClient, LanguageMetadata}; + +pub fn ensure_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); +} + +pub fn client_for(server: &MockServer) -> AgentClient { + ensure_crypto_provider(); + AgentClient::builder() + .http("localhost", server.port()) + .language_metadata(LanguageMetadata::new( + "python", "3.12.1", "CPython", "2.18.0", + )) + .build() + .expect("client build failed") +} diff --git a/libdd-agent-client/tests/evp_event.rs b/libdd-agent-client/tests/evp_event.rs new file mode 100644 index 0000000000..9df3167520 --- /dev/null +++ b/libdd-agent-client/tests/evp_event.rs @@ -0,0 +1,30 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +async fn posts_to_path_with_subdomain_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/api/v2/exposures") + .header("X-Datadog-EVP-Subdomain", "event-platform-intake"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_evp_event( + "event-platform-intake", + "/api/v2/exposures", + Bytes::from_static(b"{}"), + "application/json", + ) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/pipeline_stats.rs b/libdd-agent-client/tests/pipeline_stats.rs new file mode 100644 index 0000000000..7df264b637 --- /dev/null +++ b/libdd-agent-client/tests/pipeline_stats.rs @@ -0,0 +1,41 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +async fn puts_to_correct_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.1/pipeline_stats"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_pipeline_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn sets_gzip_encoding() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.1/pipeline_stats") + .header("Content-Encoding", "gzip"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_pipeline_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/static_headers.rs b/libdd-agent-client/tests/static_headers.rs new file mode 100644 index 0000000000..3d1ef30dfd --- /dev/null +++ b/libdd-agent-client/tests/static_headers.rs @@ -0,0 +1,68 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::{AgentClient, LanguageMetadata, TraceFormat, TraceSendOptions}; + +#[tokio::test] +async fn language_metadata_headers_injected_on_all_requests() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Meta-Lang", "python") + .header("Datadog-Meta-Lang-Version", "3.12.1") + .header("Datadog-Meta-Lang-Interpreter", "CPython") + .header("Datadog-Meta-Tracer-Version", "2.18.0") + .header("User-Agent", "dd-trace-python/2.18.0"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn test_token_injected_when_set() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("x-datadog-test-session-token", "my-token"); + then.status(200).body(r#"{}"#); + }); + + common::ensure_crypto_provider(); + let client = AgentClient::builder() + .http("localhost", server.port()) + .language_metadata(LanguageMetadata::new( + "python", "3.12.1", "CPython", "2.18.0", + )) + .test_agent_session_token("my-token") + .build() + .unwrap(); + + client + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/stats.rs b/libdd-agent-client/tests/stats.rs new file mode 100644 index 0000000000..81a3839780 --- /dev/null +++ b/libdd-agent-client/tests/stats.rs @@ -0,0 +1,41 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +async fn puts_to_v06_stats() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.6/stats"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn sets_msgpack_content_type() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.6/stats") + .header("Content-Type", "application/msgpack"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/telemetry.rs b/libdd-agent-client/tests/telemetry.rs new file mode 100644 index 0000000000..e663f77e49 --- /dev/null +++ b/libdd-agent-client/tests/telemetry.rs @@ -0,0 +1,55 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::TelemetryRequest; + +#[tokio::test] +async fn posts_to_telemetry_proxy() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/telemetry/proxy/api/v2/apmtelemetry"); + then.status(202).body(""); + }); + + common::client_for(&server) + .send_telemetry(TelemetryRequest { + request_type: "app-started".to_string(), + api_version: "v2".to_string(), + debug: false, + body: Bytes::from_static(b"{}"), + }) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn injects_per_request_headers() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/telemetry/proxy/api/v2/apmtelemetry") + .header("DD-Telemetry-Request-Type", "app-started") + .header("DD-Telemetry-API-Version", "v2") + .header("DD-Telemetry-Debug-Enabled", "false"); + then.status(202).body(""); + }); + + common::client_for(&server) + .send_telemetry(TelemetryRequest { + request_type: "app-started".to_string(), + api_version: "v2".to_string(), + debug: false, + body: Bytes::from_static(b"{}"), + }) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/traces.rs b/libdd-agent-client/tests/traces.rs new file mode 100644 index 0000000000..7dd0fe3791 --- /dev/null +++ b/libdd-agent-client/tests/traces.rs @@ -0,0 +1,173 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::{TraceFormat, TraceSendOptions}; + +#[tokio::test] +async fn v5_puts_to_correct_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(200).body(r#"{"rate_by_service":{}}"#); + }); + + let resp = common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 1, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); + assert_eq!(resp.status, 200); +} + +#[tokio::test] +async fn v4_puts_to_v4_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.4/traces"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 1, + TraceFormat::MsgpackV4, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn injects_trace_count_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("X-Datadog-Trace-Count", "42"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 42, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn injects_send_real_http_status_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Send-Real-Http-Status", "true"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn computed_top_level_injects_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Client-Computed-Top-Level", "yes"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions { + computed_top_level: true, + }, + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +async fn parses_rate_by_service() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(200) + .body(r#"{"rate_by_service":{"service:env":0.75}}"#); + }); + + let resp = common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + assert_eq!( + resp.rate_by_service + .as_ref() + .and_then(|m| m.get("service:env")), + Some(&0.75) + ); +} + +#[tokio::test] +async fn returns_http_error_on_5xx() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(503).body("overloaded"); + }); + + let err = common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap_err(); + + assert!(matches!( + err, + libdd_agent_client::SendError::HttpError { status: 503, .. } + )); +} diff --git a/libdd-http-client/src/request.rs b/libdd-http-client/src/request.rs index 5b27cc2f1d..3d38384492 100644 --- a/libdd-http-client/src/request.rs +++ b/libdd-http-client/src/request.rs @@ -134,6 +134,13 @@ impl HttpRequest { self } + /// Append headers to this request. + #[inline] + pub fn with_headers<'a, K, V>(mut self, it: impl IntoIterator) -> Self where K: Into, V: Into { + self.headers.extend(it.into_iter().map(|(k, v)| (k.into(), v.into()))); + self + } + /// Set the request body. #[inline] pub fn with_body(mut self, body: impl Into) -> Self {