Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions dragonfly-client-util/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use local_ip_address::{local_ip, local_ipv6};
use std::net::IpAddr;
use std::path::PathBuf;

#[cfg(target_os = "linux")]
use std::{io, mem, os::unix::io::RawFd};
Expand All @@ -36,6 +37,22 @@ pub fn format_url(scheme: &str, ip: IpAddr, port: u16) -> String {
format!("{}://{}", scheme, format_socket_addr(ip, port))
}

/// scheme_for_tls returns the gRPC dial scheme: `https` when client TLS is fully
/// configured (ca_cert, cert, and key all set, the same gate as
/// load_client_tls_config), otherwise `http`. tonic derives transport security
/// from the URL scheme alone, so it must match whether a TLS config is attached.
pub fn scheme_for_tls(
ca_cert: &Option<PathBuf>,
cert: &Option<PathBuf>,
key: &Option<PathBuf>,
) -> &'static str {
if ca_cert.is_some() && cert.is_some() && key.is_some() {
"https"
} else {
"http"
}
}

/// Get the local IP address of the machine.
///
/// Attempts to retrieve the local IPv4 address first. If unavailable or if the
Expand Down Expand Up @@ -153,4 +170,18 @@ mod tests {
let ip = preferred_local_ip();
assert!(ip.is_some());
}

#[test]
fn test_scheme_for_tls() {
let ca = Some(PathBuf::from("/etc/tls/ca.pem"));
let cert = Some(PathBuf::from("/etc/tls/cert.pem"));
let key = Some(PathBuf::from("/etc/tls/key.pem"));

assert_eq!(scheme_for_tls(&ca, &cert, &key), "https");

assert_eq!(scheme_for_tls(&None, &cert, &key), "http");
assert_eq!(scheme_for_tls(&ca, &None, &key), "http");
assert_eq!(scheme_for_tls(&ca, &cert, &None), "http");
assert_eq!(scheme_for_tls(&None, &None, &None), "http");
}
}
9 changes: 7 additions & 2 deletions dragonfly-client/src/dynconfig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use dragonfly_api::manager::v2::{
};
use dragonfly_client_config::{dfdaemon::Config, CARGO_PKG_VERSION, GIT_COMMIT_SHORT_HASH};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::net::format_url;
use dragonfly_client_util::net::{format_url, scheme_for_tls};
use dragonfly_client_util::shutdown;
use regex::Regex;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -310,8 +310,13 @@ impl Dynconfig {
}
}

let scheme = scheme_for_tls(
&self.config.scheduler.ca_cert,
&self.config.scheduler.cert,
&self.config.scheduler.key,
);
let addr = format_url(
"http",
scheme,
IpAddr::from_str(&scheduler.ip)?,
scheduler.port as u16,
);
Expand Down
149 changes: 57 additions & 92 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use dragonfly_api::scheduler::v2::{
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::net::scheme_for_tls;
use hashring::HashRing;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
Expand All @@ -44,7 +45,6 @@ use tokio::task::JoinSet;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::{debug, error, info, instrument, Instrument};
use url::Url;

use super::interceptor::InjectTracingInterceptor;

Expand Down Expand Up @@ -178,25 +178,14 @@ impl SchedulerClient {

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
let config = self.config.clone();
async fn announce_host(
config: Arc<Config>,
addr: SocketAddr,
request: tonic::Request<AnnounceHostRequest>,
) -> Result<()> {
debug!("announce host to {}", addr);

// Connect to the scheduler.
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr.to_string(), err);
})
.or_err(ErrorType::ConnectError)?;

let channel = connect_to_scheduler(config, addr).await?;
let mut client =
SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
Expand All @@ -205,7 +194,8 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span());
join_set
.spawn(announce_host(config, *available_scheduler_addr, request).in_current_span());
}

while let Some(message) = join_set
Expand All @@ -232,25 +222,14 @@ impl SchedulerClient {

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
let config = self.config.clone();
async fn announce_host(
config: Arc<Config>,
addr: SocketAddr,
request: tonic::Request<AnnounceHostRequest>,
) -> Result<()> {
info!("announce host to {:?}", addr);

// Connect to the scheduler.
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr.to_string(), err);
})
.or_err(ErrorType::ConnectError)?;

let channel = connect_to_scheduler(config, addr).await?;
let mut client =
SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
Expand All @@ -259,7 +238,8 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span());
join_set
.spawn(announce_host(config, *available_scheduler_addr, request).in_current_span());
}

while let Some(message) = join_set
Expand Down Expand Up @@ -291,25 +271,14 @@ impl SchedulerClient {

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
let config = self.config.clone();
async fn delete_host(
config: Arc<Config>,
addr: SocketAddr,
request: tonic::Request<DeleteHostRequest>,
) -> Result<()> {
info!("delete host from {}", addr);

// Connect to the scheduler.
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr.to_string(), err);
})
.or_err(ErrorType::ConnectError)?;

let channel = connect_to_scheduler(config, addr).await?;
let mut client =
SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
Expand All @@ -318,7 +287,8 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(delete_host(*available_scheduler_addr, request).in_current_span());
join_set
.spawn(delete_host(config, *available_scheduler_addr, request).in_current_span());
}

while let Some(message) = join_set
Expand Down Expand Up @@ -595,52 +565,7 @@ impl SchedulerClient {
drop(addrs);
info!("picked {:?}", addr);

let addr = format!("http://{}", addr);
let domain_name = Url::parse(addr.as_str())?
.host_str()
.ok_or(Error::InvalidParameter)
.inspect_err(|_err| {
error!("invalid address: {}", addr);
})?
.to_string();

let channel = match self
.config
.scheduler
.load_client_tls_config(domain_name.as_str())
.await?
{
Some(client_tls_config) => Channel::from_shared(addr.clone())
.map_err(|_| Error::InvalidURI(addr.clone()))?
.tls_config(client_tls_config)?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr.to_string(), err);
})
.or_err(ErrorType::ConnectError)?,
None => Channel::from_shared(addr.clone())
.map_err(|_| Error::InvalidURI(addr.clone()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr.to_string(), err);
})
.or_err(ErrorType::ConnectError)?,
};

let channel = connect_to_scheduler(self.config.clone(), addr.addr).await?;
Ok(
SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
Expand Down Expand Up @@ -751,3 +676,43 @@ impl SchedulerClient {
request
}
}

/// connect_to_scheduler opens a gRPC channel to a scheduler address, using
/// mTLS when scheduler TLS credentials are configured. The URL scheme reflects
/// the TLS config because tonic's `Channel` derives transport behavior from
/// the scheme; an `http://` URL with `tls_config(...)` still dials cleartext.
async fn connect_to_scheduler(config: Arc<Config>, addr: SocketAddr) -> Result<Channel> {
let scheme = scheme_for_tls(
&config.scheduler.ca_cert,
&config.scheduler.cert,
&config.scheduler.key,
);
let domain_name = addr.ip().to_string();
let addr = format!("{}://{}", scheme, addr);

let mut endpoint = Channel::from_shared(addr.clone())
.map_err(|_| Error::InvalidURI(addr.clone()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT);

if let Some(client_tls_config) = config
.scheduler
.load_client_tls_config(domain_name.as_str())
.await?
{
endpoint = endpoint.tls_config(client_tls_config)?;
}

endpoint
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr, err);
})
.or_err(ErrorType::ConnectError)
.map_err(Into::into)
}
23 changes: 19 additions & 4 deletions dragonfly-client/src/resource/parent_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use dragonfly_api::dfdaemon::v2::SyncHostRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_util::id_generator::IDGenerator;
use dragonfly_client_util::net::format_url;
use dragonfly_client_util::net::{format_url, scheme_for_tls};
use dragonfly_client_util::shutdown::{self, Shutdown};
use rand::distr::weighted::WeightedIndex;
use rand::distr::Distribution;
Expand Down Expand Up @@ -219,10 +219,15 @@ impl ParentSelector {
continue;
}

let scheme = scheme_for_tls(
&self.config.upload.client.ca_cert,
&self.config.upload.client.cert,
&self.config.upload.client.key,
);
let dfdaemon_upload_client = match DfdaemonUploadClient::new(
self.config.clone(),
format_url(
"http",
scheme,
IpAddr::from_str(&parent_host.ip)?,
parent_host.port as u16,
),
Expand Down Expand Up @@ -541,10 +546,15 @@ impl PersistentParentSelector {
continue;
}

let scheme = scheme_for_tls(
&self.config.upload.client.ca_cert,
&self.config.upload.client.cert,
&self.config.upload.client.key,
);
let dfdaemon_upload_client = match DfdaemonUploadClient::new(
self.config.clone(),
format_url(
"http",
scheme,
IpAddr::from_str(&parent_host.ip)?,
parent_host.port as u16,
),
Expand Down Expand Up @@ -866,10 +876,15 @@ impl PersistentCacheParentSelector {
continue;
}

let scheme = scheme_for_tls(
&self.config.upload.client.ca_cert,
&self.config.upload.client.cert,
&self.config.upload.client.key,
);
let dfdaemon_upload_client = match DfdaemonUploadClient::new(
self.config.clone(),
format_url(
"http",
scheme,
IpAddr::from_str(&parent_host.ip)?,
parent_host.port as u16,
),
Expand Down
Loading