diff --git a/dragonfly-client-util/src/net/mod.rs b/dragonfly-client-util/src/net/mod.rs index 5ad5db9d..b432daa5 100644 --- a/dragonfly-client-util/src/net/mod.rs +++ b/dragonfly-client-util/src/net/mod.rs @@ -16,6 +16,7 @@ use local_ip_address::{local_ip, local_ipv6}; use std::net::IpAddr; +use std::path::PathBuf; #[cfg(target_os = "linux")] use std::{io, mem, os::unix::io::RawFd}; @@ -36,6 +37,22 @@ pub fn format_url(scheme: &str, ip: IpAddr, port: u16) -> String { format!("{}://{}", scheme, format_socket_addr(ip, port)) } +/// scheme_for_tls returns the gRPC dial scheme: `https` when client TLS is fully +/// configured (ca_cert, cert, and key all set, the same gate as +/// load_client_tls_config), otherwise `http`. tonic derives transport security +/// from the URL scheme alone, so it must match whether a TLS config is attached. +pub fn scheme_for_tls( + ca_cert: &Option, + cert: &Option, + key: &Option, +) -> &'static str { + if ca_cert.is_some() && cert.is_some() && key.is_some() { + "https" + } else { + "http" + } +} + /// Get the local IP address of the machine. /// /// Attempts to retrieve the local IPv4 address first. If unavailable or if the @@ -153,4 +170,18 @@ mod tests { let ip = preferred_local_ip(); assert!(ip.is_some()); } + + #[test] + fn test_scheme_for_tls() { + let ca = Some(PathBuf::from("/etc/tls/ca.pem")); + let cert = Some(PathBuf::from("/etc/tls/cert.pem")); + let key = Some(PathBuf::from("/etc/tls/key.pem")); + + assert_eq!(scheme_for_tls(&ca, &cert, &key), "https"); + + assert_eq!(scheme_for_tls(&None, &cert, &key), "http"); + assert_eq!(scheme_for_tls(&ca, &None, &key), "http"); + assert_eq!(scheme_for_tls(&ca, &cert, &None), "http"); + assert_eq!(scheme_for_tls(&None, &None, &None), "http"); + } } diff --git a/dragonfly-client/src/dynconfig/mod.rs b/dragonfly-client/src/dynconfig/mod.rs index 60261f49..b60e40b0 100644 --- a/dragonfly-client/src/dynconfig/mod.rs +++ b/dragonfly-client/src/dynconfig/mod.rs @@ -21,7 +21,7 @@ use dragonfly_api::manager::v2::{ }; use dragonfly_client_config::{dfdaemon::Config, CARGO_PKG_VERSION, GIT_COMMIT_SHORT_HASH}; use dragonfly_client_core::{Error, Result}; -use dragonfly_client_util::net::format_url; +use dragonfly_client_util::net::{format_url, scheme_for_tls}; use dragonfly_client_util::shutdown; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -310,8 +310,13 @@ impl Dynconfig { } } + let scheme = scheme_for_tls( + &self.config.scheduler.ca_cert, + &self.config.scheduler.cert, + &self.config.scheduler.key, + ); let addr = format_url( - "http", + scheme, IpAddr::from_str(&scheduler.ip)?, scheduler.port as u16, ); diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index fa8aa7fd..28565e10 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -35,6 +35,7 @@ use dragonfly_api::scheduler::v2::{ use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::error::{ErrorType, OrErr}; use dragonfly_client_core::{Error, Result}; +use dragonfly_client_util::net::scheme_for_tls; use hashring::HashRing; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -44,7 +45,6 @@ use tokio::task::JoinSet; use tonic::service::interceptor::InterceptedService; use tonic::transport::Channel; use tracing::{debug, error, info, instrument, Instrument}; -use url::Url; use super::interceptor::InjectTracingInterceptor; @@ -178,25 +178,14 @@ impl SchedulerClient { for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); + let config = self.config.clone(); async fn announce_host( + config: Arc, addr: SocketAddr, request: tonic::Request, ) -> Result<()> { debug!("announce host to {}", addr); - - // Connect to the scheduler. - let channel = Channel::from_shared(format!("http://{}", addr)) - .map_err(|_| Error::InvalidURI(addr.to_string()))? - .buffer_size(super::BUFFER_SIZE) - .connect_timeout(super::CONNECT_TIMEOUT) - .timeout(super::REQUEST_TIMEOUT) - .connect() - .await - .inspect_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - }) - .or_err(ErrorType::ConnectError)?; - + let channel = connect_to_scheduler(config, addr).await?; let mut client = SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) @@ -205,7 +194,8 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); + join_set + .spawn(announce_host(config, *available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set @@ -232,25 +222,14 @@ impl SchedulerClient { for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); + let config = self.config.clone(); async fn announce_host( + config: Arc, addr: SocketAddr, request: tonic::Request, ) -> Result<()> { info!("announce host to {:?}", addr); - - // Connect to the scheduler. - let channel = Channel::from_shared(format!("http://{}", addr)) - .map_err(|_| Error::InvalidURI(addr.to_string()))? - .buffer_size(super::BUFFER_SIZE) - .connect_timeout(super::CONNECT_TIMEOUT) - .timeout(super::REQUEST_TIMEOUT) - .connect() - .await - .inspect_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - }) - .or_err(ErrorType::ConnectError)?; - + let channel = connect_to_scheduler(config, addr).await?; let mut client = SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) @@ -259,7 +238,8 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); + join_set + .spawn(announce_host(config, *available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set @@ -291,25 +271,14 @@ impl SchedulerClient { for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); + let config = self.config.clone(); async fn delete_host( + config: Arc, addr: SocketAddr, request: tonic::Request, ) -> Result<()> { info!("delete host from {}", addr); - - // Connect to the scheduler. - let channel = Channel::from_shared(format!("http://{}", addr)) - .map_err(|_| Error::InvalidURI(addr.to_string()))? - .buffer_size(super::BUFFER_SIZE) - .connect_timeout(super::CONNECT_TIMEOUT) - .timeout(super::REQUEST_TIMEOUT) - .connect() - .await - .inspect_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - }) - .or_err(ErrorType::ConnectError)?; - + let channel = connect_to_scheduler(config, addr).await?; let mut client = SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) @@ -318,7 +287,8 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(delete_host(*available_scheduler_addr, request).in_current_span()); + join_set + .spawn(delete_host(config, *available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set @@ -595,52 +565,7 @@ impl SchedulerClient { drop(addrs); info!("picked {:?}", addr); - let addr = format!("http://{}", addr); - let domain_name = Url::parse(addr.as_str())? - .host_str() - .ok_or(Error::InvalidParameter) - .inspect_err(|_err| { - error!("invalid address: {}", addr); - })? - .to_string(); - - let channel = match self - .config - .scheduler - .load_client_tls_config(domain_name.as_str()) - .await? - { - Some(client_tls_config) => Channel::from_shared(addr.clone()) - .map_err(|_| Error::InvalidURI(addr.clone()))? - .tls_config(client_tls_config)? - .buffer_size(super::BUFFER_SIZE) - .connect_timeout(super::CONNECT_TIMEOUT) - .timeout(super::REQUEST_TIMEOUT) - .tcp_keepalive(Some(super::TCP_KEEPALIVE)) - .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL) - .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) - .connect() - .await - .inspect_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - }) - .or_err(ErrorType::ConnectError)?, - None => Channel::from_shared(addr.clone()) - .map_err(|_| Error::InvalidURI(addr.clone()))? - .buffer_size(super::BUFFER_SIZE) - .connect_timeout(super::CONNECT_TIMEOUT) - .timeout(super::REQUEST_TIMEOUT) - .tcp_keepalive(Some(super::TCP_KEEPALIVE)) - .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL) - .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) - .connect() - .await - .inspect_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - }) - .or_err(ErrorType::ConnectError)?, - }; - + let channel = connect_to_scheduler(self.config.clone(), addr.addr).await?; Ok( SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) @@ -751,3 +676,43 @@ impl SchedulerClient { request } } + +/// connect_to_scheduler opens a gRPC channel to a scheduler address, using +/// mTLS when scheduler TLS credentials are configured. The URL scheme reflects +/// the TLS config because tonic's `Channel` derives transport behavior from +/// the scheme; an `http://` URL with `tls_config(...)` still dials cleartext. +async fn connect_to_scheduler(config: Arc, addr: SocketAddr) -> Result { + let scheme = scheme_for_tls( + &config.scheduler.ca_cert, + &config.scheduler.cert, + &config.scheduler.key, + ); + let domain_name = addr.ip().to_string(); + let addr = format!("{}://{}", scheme, addr); + + let mut endpoint = Channel::from_shared(addr.clone()) + .map_err(|_| Error::InvalidURI(addr.clone()))? + .buffer_size(super::BUFFER_SIZE) + .connect_timeout(super::CONNECT_TIMEOUT) + .timeout(super::REQUEST_TIMEOUT) + .tcp_keepalive(Some(super::TCP_KEEPALIVE)) + .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL) + .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT); + + if let Some(client_tls_config) = config + .scheduler + .load_client_tls_config(domain_name.as_str()) + .await? + { + endpoint = endpoint.tls_config(client_tls_config)?; + } + + endpoint + .connect() + .await + .inspect_err(|err| { + error!("connect to {} failed: {}", addr, err); + }) + .or_err(ErrorType::ConnectError) + .map_err(Into::into) +} diff --git a/dragonfly-client/src/resource/parent_selector.rs b/dragonfly-client/src/resource/parent_selector.rs index e179d230..cc857037 100644 --- a/dragonfly-client/src/resource/parent_selector.rs +++ b/dragonfly-client/src/resource/parent_selector.rs @@ -22,7 +22,7 @@ use dragonfly_api::dfdaemon::v2::SyncHostRequest; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; use dragonfly_client_util::id_generator::IDGenerator; -use dragonfly_client_util::net::format_url; +use dragonfly_client_util::net::{format_url, scheme_for_tls}; use dragonfly_client_util::shutdown::{self, Shutdown}; use rand::distr::weighted::WeightedIndex; use rand::distr::Distribution; @@ -219,10 +219,15 @@ impl ParentSelector { continue; } + let scheme = scheme_for_tls( + &self.config.upload.client.ca_cert, + &self.config.upload.client.cert, + &self.config.upload.client.key, + ); let dfdaemon_upload_client = match DfdaemonUploadClient::new( self.config.clone(), format_url( - "http", + scheme, IpAddr::from_str(&parent_host.ip)?, parent_host.port as u16, ), @@ -541,10 +546,15 @@ impl PersistentParentSelector { continue; } + let scheme = scheme_for_tls( + &self.config.upload.client.ca_cert, + &self.config.upload.client.cert, + &self.config.upload.client.key, + ); let dfdaemon_upload_client = match DfdaemonUploadClient::new( self.config.clone(), format_url( - "http", + scheme, IpAddr::from_str(&parent_host.ip)?, parent_host.port as u16, ), @@ -866,10 +876,15 @@ impl PersistentCacheParentSelector { continue; } + let scheme = scheme_for_tls( + &self.config.upload.client.ca_cert, + &self.config.upload.client.cert, + &self.config.upload.client.key, + ); let dfdaemon_upload_client = match DfdaemonUploadClient::new( self.config.clone(), format_url( - "http", + scheme, IpAddr::from_str(&parent_host.ip)?, parent_host.port as u16, ), diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index ffe544ca..e4aaafe1 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -23,7 +23,7 @@ use dragonfly_api::dfdaemon::v2::{ use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; use dragonfly_client_storage::metadata; -use dragonfly_client_util::net::format_url; +use dragonfly_client_util::net::{format_url, scheme_for_tls}; use std::net::IpAddr; use std::str::FromStr; use std::sync::Arc; @@ -211,10 +211,15 @@ impl PieceCollector { Error::InvalidPeer(parent.id.clone()) })?; + let scheme = scheme_for_tls( + &config.upload.client.ca_cert, + &config.upload.client.cert, + &config.upload.client.key, + ); // Create a dfdaemon client. let dfdaemon_upload_client = DfdaemonUploadClient::new( config, - format_url("http", IpAddr::from_str(&host.ip)?, host.port as u16), + format_url(scheme, IpAddr::from_str(&host.ip)?, host.port as u16), false, ) .await @@ -505,10 +510,15 @@ impl PersistentPieceCollector { Error::InvalidPeer(parent.id.clone()) })?; + let scheme = scheme_for_tls( + &config.upload.client.ca_cert, + &config.upload.client.cert, + &config.upload.client.key, + ); // Create a dfdaemon client. let dfdaemon_upload_client = DfdaemonUploadClient::new( config, - format_url("http", IpAddr::from_str(&host.ip)?, host.port as u16), + format_url(scheme, IpAddr::from_str(&host.ip)?, host.port as u16), false, ) .await @@ -807,10 +817,15 @@ impl PersistentCachePieceCollector { Error::InvalidPeer(parent.id.clone()) })?; + let scheme = scheme_for_tls( + &config.upload.client.ca_cert, + &config.upload.client.cert, + &config.upload.client.key, + ); // Create a dfdaemon client. let dfdaemon_upload_client = DfdaemonUploadClient::new( config, - format_url("http", IpAddr::from_str(&host.ip)?, host.port as u16), + format_url(scheme, IpAddr::from_str(&host.ip)?, host.port as u16), false, ) .await