Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ rust-version = "1.91.0"
version = "0.1.0"

[workspace.dependencies]
anyhow = "1.0.98"
arc-swap = "1.7"
async-compression = { version = "0.4", default-features = false }
async-stream = "0.3"
Expand Down
1 change: 1 addition & 0 deletions subprojects/crates/binary-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ harmonia-utils-io.workspace = true
harmonia-utils-signature.workspace = true

[dev-dependencies]
eyre = "0.6"
hydra-tracing.workspace = true
tempfile = "3.23.0"
2 changes: 1 addition & 1 deletion subprojects/crates/binary-cache/examples/download_file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use binary_cache::S3BinaryCacheClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let _tracing_guard = hydra_tracing::init()?;
let client = S3BinaryCacheClient::new(
"s3://store?region=unknown&endpoint=http://localhost:9000&scheme=http&write-nar-listing=1&ls-compression=br&log-compression=br&profile=local_nix_store".parse()?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use binary_cache::S3BinaryCacheClient;
use harmonia_store_path::StorePath;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let _tracing_guard = hydra_tracing::init()?;
let nix_config = daemon_client_utils::parse_nix_remote().unwrap();
let store = harmonia_store_remote::ConnectionPool::new(
Expand Down
4 changes: 2 additions & 2 deletions subprojects/crates/binary-cache/examples/simple_presigned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use binary_cache::{PresignedUploadClient, S3BinaryCacheClient, path_to_narinfo};
use harmonia_store_path::StorePath;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let now = std::time::Instant::now();

let _tracing_guard = hydra_tracing::init()?;
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
client
.upload_narinfo_after_presigned_upload(&store, narinfo)
.await?;
Ok::<(), Box<dyn std::error::Error>>(())
Ok::<(), eyre::Report>(())
}
})
.buffered(10);
Expand Down
2 changes: 1 addition & 1 deletion subprojects/crates/binary-cache/examples/upload_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use binary_cache::S3BinaryCacheClient;
use harmonia_store_path::StorePath;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let now = std::time::Instant::now();

let _tracing_guard = hydra_tracing::init()?;
Expand Down
2 changes: 1 addition & 1 deletion subprojects/crates/binary-cache/examples/upload_logs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use binary_cache::S3BinaryCacheClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let _tracing_guard = hydra_tracing::init()?;
let client = S3BinaryCacheClient::new(
"s3://store?region=unknown&endpoint=http://localhost:9000&scheme=http&write-nar-listing=1&ls-compression=br&log-compression=br&profile=local_nix_store".parse()?,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use binary_cache::S3BinaryCacheClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> eyre::Result<()> {
let _tracing_guard = hydra_tracing::init()?;
let nix_config = daemon_client_utils::parse_nix_remote().unwrap();
let _local = harmonia_store_remote::ConnectionPool::new(
Expand Down
6 changes: 6 additions & 0 deletions subprojects/crates/db/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ impl From<std::num::TryFromIntError> for Error {
}

pub type Result<T> = std::result::Result<T, Error>;

/// Error from parsing a database connection URL. This is a config
/// error, not a connection error — no database was contacted.
#[derive(Debug, thiserror::Error)]
#[error("bad database configuration: {0}")]
pub struct DbConfigurationError(pub Box<dyn std::error::Error + Send + Sync>);
26 changes: 20 additions & 6 deletions subprojects/crates/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod models;
use std::str::FromStr as _;

pub use connection::{Connection, Transaction};
pub use error::{DataError, Error, Result};
pub use error::{DataError, DbConfigurationError, Error, Result};
pub use harmonia_store_path::StoreDir;

#[derive(Debug, Clone)]
Expand All @@ -43,12 +43,26 @@ impl Database {
Ok(Connection::new(conn))
}

/// Re-configure the connection pool with a new URL.
///
/// This only parses and stores the new options — it does **not**
/// contact the database.
// TODO: ability to change max_connections by dropping the pool and recreating it
#[tracing::instrument(skip(self, url), err)]
pub fn reconfigure_pool(&self, url: &str) -> Result<()> {
// TODO: ability to change max_connections by dropping the pool and recreating it
self.pool
.set_connect_options(sqlx::postgres::PgConnectOptions::from_str(url)?);
Ok(())
pub fn reconfigure_pool(&self, url: &str) -> std::result::Result<(), DbConfigurationError> {
match sqlx::postgres::PgConnectOptions::from_str(url) {
Ok(options) => {
self.pool.set_connect_options(options);
Ok(())
}
Err(sqlx::Error::Configuration(e)) => Err(DbConfigurationError(e)),
Err(e) => {
// PgConnectOptions::from_str only produces Configuration
// errors. If this changes in a future sqlx version, fail
// loudly rather than silently swallowing it.
panic!("unexpected error from PgConnectOptions::from_str: {e}")
}
}
}

pub async fn listener(
Expand Down
2 changes: 1 addition & 1 deletion subprojects/crates/nix-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ mod tests {
let fs = FilesystemOperations {
real_store_dir: store_dir.to_path().to_owned(),
};
let bp = Box::pin(parse_build_product(&store_dir, &fs, line)).await;
let bp = parse_build_product(&store_dir, &fs, line).await;
assert!(bp.is_none());
}

Expand Down
1 change: 0 additions & 1 deletion subprojects/hydra-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ rust-version.workspace = true
sd-notify.workspace = true
tracing.workspace = true

anyhow.workspace = true
clap = { workspace = true, features = [ "derive" ] }
color-eyre.workspace = true
fs-err = { workspace = true, features = [ "tokio" ] }
Expand Down
8 changes: 5 additions & 3 deletions subprojects/hydra-builder/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use color_eyre::eyre;

#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
#[error("environment variable {0} not set")]
Expand All @@ -22,7 +24,7 @@ pub enum BuilderError {
ParseNixStore(String),

#[error("Loading Nix configuration")]
LoadNixConfig(#[source] anyhow::Error),
LoadNixConfig(#[source] eyre::Report),

#[error("Gateway API missing host")]
GatewayMissingHost,
Expand All @@ -43,13 +45,13 @@ pub enum BuilderError {
VersionIncompatible(String),

#[error("Reading system information")]
ReadingSystemInfo(#[source] anyhow::Error),
ReadingSystemInfo(#[source] eyre::Report),

#[error("Failed to communicate {0} times over the channel. Terminating the application.")]
RepeatedFailure(u32),

#[error("While handling request")]
HandlingRequest(#[source] anyhow::Error),
HandlingRequest(#[source] eyre::Report),

#[error("Task failed")]
Task(#[from] tokio::task::JoinError),
Expand Down
39 changes: 23 additions & 16 deletions subprojects/hydra-builder/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering;
use tonic::{Request, service::interceptor::InterceptedService, transport::Channel};

use harmonia_store_path::StorePath;

use hydra_proto::{
BuilderRequest, VersionCheckRequest, builder_request, runner_request,
runner_service_client::RunnerServiceClient,
Expand Down Expand Up @@ -195,12 +196,12 @@ async fn handle_request(
runner_request::Message::Build(m) => {
state
.schedule_build(m)
.map_err(BuilderError::HandlingRequest)?;
.map_err(|e| BuilderError::HandlingRequest(e.into()))?;
}
runner_request::Message::Abort(m) => {
state
.abort_build(&m)
.map_err(BuilderError::HandlingRequest)?;
.map_err(|e| BuilderError::HandlingRequest(e.into()))?;
}
}
Ok(())
Expand Down Expand Up @@ -244,8 +245,11 @@ pub async fn start_bidirectional_stream(
let join_msg = state
.get_join_message()
.await
.map_err(BuilderError::ReadingSystemInfo)?;
.map_err(|e| BuilderError::ReadingSystemInfo(e.into()))?;
let state2 = state.clone();
let ping_error: Arc<std::sync::Mutex<Option<BuilderError>>> =
Arc::new(std::sync::Mutex::new(None));
let ping_error2 = ping_error.clone();
let ping_stream = async_stream::stream! {
yield BuilderRequest {
message: Some(builder_request::Message::Join(join_msg))
Expand All @@ -256,16 +260,17 @@ pub async fn start_bidirectional_stream(
interval.tick().await;

let ping = match state.get_ping_message() {
Ok(v) => builder_request::Message::Ping(v),
Ok(v) => v,
Err(e) => {
tracing::error!("Failed to construct ping message: {e}");
continue
*ping_error2.lock().expect("ping error mutex poisoned") =
Some(BuilderError::ReadingSystemInfo(e.into()));
break;
},
};
tracing::debug!("sending ping: {ping:?}");

yield BuilderRequest {
message: Some(ping)
message: Some(builder_request::Message::Ping(ping))
};
}
};
Expand All @@ -283,26 +288,28 @@ pub async fn start_bidirectional_stream(
}
};

let mut consecutive_failure_count = 0;
let mut consecutive_failure_count: u32 = 0;
while let Some(item) = stream.next().await {
match item.map(|v| v.message) {
Ok(Some(v)) => {
consecutive_failure_count = 0;
if let Err(err) = handle_request(state2.clone(), v).await {
tracing::error!("Failed to correctly handle request: {err}");
}
}
Ok(None) => {
let item = match item {
Ok(v) => {
consecutive_failure_count = 0;
v
}
Err(e) => {
consecutive_failure_count += 1;
tracing::error!("stream message delivery failed: {e}");
if consecutive_failure_count == 10 {
return Err(BuilderError::RepeatedFailure(consecutive_failure_count));
}
continue;
}
};
if let Some(msg) = item.message {
handle_request(state2.clone(), msg).await?;
}
}
if let Some(e) = ping_error.lock().expect("ping error mutex poisoned").take() {
return Err(e);
}
Ok(())
}
22 changes: 17 additions & 5 deletions subprojects/hydra-builder/src/nix_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

use std::collections::HashMap;

/// Errors from loading nix configuration.
#[derive(Debug, thiserror::Error)]
pub enum NixConfigError {
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error("nix show-config failed: {0}")]
Command(String),
}

/// Cached nix configuration values.
#[derive(Debug, Clone)]
pub struct NixConfig {
Expand All @@ -10,7 +21,7 @@ pub struct NixConfig {

impl NixConfig {
/// Read nix configuration by running `nix show-config --json`.
pub fn load() -> anyhow::Result<Self> {
pub fn load() -> Result<Self, NixConfigError> {
let output = std::process::Command::new("nix")
.args([
"--extra-experimental-features",
Expand All @@ -20,10 +31,11 @@ impl NixConfig {
])
.output()?;
if !output.status.success() {
anyhow::bail!(
"nix show-config failed: {}",
str::from_utf8(&output.stderr).unwrap_or("Invalid UTF-8")
);
return Err(NixConfigError::Command(
str::from_utf8(&output.stderr)
.unwrap_or("Invalid UTF-8")
.to_owned(),
));
}
let values: HashMap<String, serde_json::Value> = serde_json::from_slice(&output.stdout)?;
Ok(Self { values })
Expand Down
Loading
Loading