diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index fc9e7c7..a55a5f9 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -2,7 +2,7 @@ use crate::dataset_controller::ingest::ingest; use crate::dataset_controller::ingest_generic::{IngestMessage, NewChunk}; use crate::dataset_controller::write_controller::WriteController; use crate::types::{DBRef, DatasetKind, RetentionStrategy}; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; @@ -15,8 +15,7 @@ use std::time::Duration; use tokio::select; use tokio::task::JoinHandle; use tokio::time::Instant; -use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; - +use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; pub struct DatasetController { dataset_id: DatasetId, @@ -26,10 +25,9 @@ pub struct DatasetController { finalized_head_receiver: tokio::sync::watch::Receiver>, compaction_enabled_sender: tokio::sync::watch::Sender, task: JoinHandle<()>, - compaction_task: JoinHandle<()> + compaction_task: JoinHandle<()>, } - impl Drop for DatasetController { fn drop(&mut self) { self.task.abort(); @@ -37,7 +35,6 @@ impl Drop for DatasetController { } } - impl DatasetController { #[instrument(name = "dataset", skip_all, fields(dataset_id = %dataset_id))] pub fn new( @@ -45,19 +42,23 @@ impl DatasetController { dataset_id: DatasetId, dataset_kind: DatasetKind, retention: RetentionStrategy, - data_sources: Vec - ) -> anyhow::Result - { + data_sources: Vec, + ) -> anyhow::Result { let mut write = WriteController::new(db.clone(), dataset_id, dataset_kind)?; - if let RetentionStrategy::FromBlock { number, parent_hash } = &retention { + if let RetentionStrategy::FromBlock { + number, + parent_hash, + } = &retention + { write.init_retention(*number, parent_hash.clone())?; } let (retention_sender, retention_recv) = tokio::sync::watch::channel(retention); let (head_sender, head_receiver) = tokio::sync::watch::channel(None); let (finalized_head_sender, finalized_head_receiver) = tokio::sync::watch::channel(None); - let (compaction_enabled_sender, compaction_enabled_receiver) = tokio::sync::watch::channel(false); + let (compaction_enabled_sender, compaction_enabled_receiver) = + tokio::sync::watch::channel(false); let _ = head_sender.send(write.head().cloned()); let _ = finalized_head_sender.send(write.finalized_head().cloned()); @@ -69,16 +70,14 @@ impl DatasetController { data_sources, retention_recv, head_sender, - finalized_head_sender + finalized_head_sender, }; let task = tokio::spawn(ctl.run(write).in_current_span()); - let compaction_task = tokio::spawn(compaction_loop( - db, - dataset_id, - compaction_enabled_receiver - ).in_current_span()); + let compaction_task = tokio::spawn( + compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span(), + ); Ok(Self { dataset_id, @@ -88,7 +87,7 @@ impl DatasetController { finalized_head_receiver, compaction_enabled_sender, task, - compaction_task + compaction_task, }) } @@ -129,7 +128,7 @@ impl DatasetController { loop { if let Some(block) = recv.borrow_and_update().as_ref() { if block.number >= block_number { - return block.number + return block.number; } } recv.changed().await.unwrap() @@ -141,7 +140,7 @@ impl DatasetController { loop { if let Some(block) = recv.borrow_and_update().as_ref() { if block.number >= block_number { - return block.number + return block.number; } } recv.changed().await.unwrap() @@ -149,7 +148,6 @@ impl DatasetController { } } - struct WriteCtx { db: DBRef, write: WriteController, @@ -157,28 +155,31 @@ struct WriteCtx { finalized_head_sender: tokio::sync::watch::Sender>, } - impl WriteCtx { fn handle_ingest_msg(&mut self, msg: IngestMessage, head: Option) -> anyhow::Result<()> { match msg { IngestMessage::FinalizedHead(head) => { self.write.finalize(&head)?; self.notify_finalized_head(); - }, + } IngestMessage::NewChunk(new_chunk) => { let ctx = format!("failed to write new chunk {}", new_chunk); self.write_new_chunk(new_chunk).context(ctx)?; self.notify_head(); self.notify_finalized_head(); if let Some(n) = head { - if self.write.first_chunk_head().map_or(false, |h| self.write.next_block() - h.number > n) { + if self + .write + .first_chunk_head() + .map_or(false, |h| self.write.next_block() - h.number > n) + { self.retain(self.write.next_block() - n, None)?; } } - }, + } IngestMessage::Fork { prev_blocks, - rollback_sender + rollback_sender, } => { self.write.compute_rollback(&prev_blocks).map(|rollback| { let _ = rollback_sender.send(rollback); @@ -205,10 +206,7 @@ impl WriteCtx { prepared.read(&mut builder, 0, prepared.num_rows())?; - tables.insert( - name.to_string(), - builder.finish()? - ); + tables.insert(name.to_string(), builder.finish()?); } let chunk = Chunk::V1 { @@ -218,13 +216,18 @@ impl WriteCtx { last_block_hash: new_chunk.last_block_hash, first_block_time: new_chunk.first_block_time, last_block_time: new_chunk.last_block_time, - tables + tables, }; - self.write.new_chunk(new_chunk.finalized_head.as_ref(), &chunk) + self.write + .new_chunk(new_chunk.finalized_head.as_ref(), &chunk) } - fn retain(&mut self, from_block: BlockNumber, parent_hash: Option) -> anyhow::Result<()> { + fn retain( + &mut self, + from_block: BlockNumber, + parent_hash: Option, + ) -> anyhow::Result<()> { self.write.retain(from_block, parent_hash)?; self.notify_finalized_head(); self.notify_head(); @@ -236,16 +239,18 @@ impl WriteCtx { } fn notify_finalized_head(&self) { - send_if_new(&self.finalized_head_sender, self.write.finalized_head().cloned()) + send_if_new( + &self.finalized_head_sender, + self.write.finalized_head().cloned(), + ) } fn starts_at(&self, block_number: BlockNumber, parent_hash: &Option) -> bool { - self.write.start_block() == block_number && - self.write.start_block_parent_hash() == parent_hash.as_ref().map(String::as_str) + self.write.start_block() == block_number + && self.write.start_block_parent_hash() == parent_hash.as_ref().map(String::as_str) } } - fn send_if_new(sender: &tokio::sync::watch::Sender, value: T) { sender.send_if_modified(|current| { if current == &value { @@ -257,40 +262,36 @@ fn send_if_new(sender: &tokio::sync::watch::Sender, value: T) { }); } - enum State { Idle, Init { - head: Option + head: Option, }, HeadProbe { future: BoxFuture<'static, BlockNumber>, - head: u64 + head: u64, }, Ingest { handle: IngestHandle, - head: Option + head: Option, }, IngestPause { until: Instant, - head: Option - } + head: Option, + }, } - struct IngestHandle { msg_recv: tokio::sync::mpsc::Receiver, - task: JoinHandle> + task: JoinHandle>, } - impl Drop for IngestHandle { fn drop(&mut self) { self.task.abort() } } - struct Ctl { db: DBRef, dataset_id: DatasetId, @@ -298,10 +299,9 @@ struct Ctl { data_sources: Vec, retention_recv: tokio::sync::watch::Receiver, head_sender: tokio::sync::watch::Sender>, - finalized_head_sender: tokio::sync::watch::Sender> + finalized_head_sender: tokio::sync::watch::Sender>, } - macro_rules! warn_on_tx_restart { ($body:expr) => {{ let before = sqd_storage::db::get_local_tx_restarts(); @@ -315,7 +315,6 @@ macro_rules! warn_on_tx_restart { }}; } - macro_rules! blocking_write { ($write:ident, $body:expr) => {{ let span = tracing::Span::current(); @@ -323,13 +322,14 @@ macro_rules! blocking_write { let _enter = span.enter(); let result = warn_on_tx_restart!($body); (result, $write) - }).await.context("write panicked")?; + }) + .await + .context("write panicked")?; $write = res.1; res.0 }}; } - impl Ctl { async fn run(mut self, write: WriteController) { let mut maybe_write = Some(write); @@ -348,20 +348,25 @@ impl Ctl { let mut write = self.new_write_ctx(maybe_write).await?; macro_rules! blocking { - ($body:expr) => { blocking_write!(write, $body) } + ($body:expr) => { + blocking_write!(write, $body) + }; } // need this variable to please the compiler let retention = self.retention_recv.borrow_and_update().clone(); let mut state = match retention { - RetentionStrategy::FromBlock { number, parent_hash } => { + RetentionStrategy::FromBlock { + number, + parent_hash, + } => { if !write.starts_at(number, &parent_hash) { blocking! { write.retain(number, parent_hash) }?; } State::Init { head: None } - }, + } RetentionStrategy::Head(n) => State::Init { head: Some(n) }, RetentionStrategy::None => { if write.write.head().is_some() { @@ -381,15 +386,15 @@ impl Ctl { state = if let Some(n) = head { State::HeadProbe { future: fetch_chain_top(self.data_sources.clone()).boxed(), - head: *n + head: *n, } } else { State::Ingest { handle: self.spawn_ingest(&write), - head: None + head: None, } } - }, + } State::HeadProbe { future, head } => { select! { biased; @@ -412,7 +417,7 @@ impl Ctl { } } } - }, + } State::Ingest { handle, head } => { select! { biased; @@ -447,7 +452,7 @@ impl Ctl { } } } - }, + } State::IngestPause { until, head } => { select! { biased; @@ -459,7 +464,7 @@ impl Ctl { state = State::Init { head: *head } } } - }, + } State::Idle => { self.retention_recv.changed().await?; write = self.handle_retention_change(&mut state, write).await? @@ -471,32 +476,29 @@ impl Ctl { async fn handle_retention_change( &mut self, state: &mut State, - mut write: WriteCtx - ) -> anyhow::Result - { + mut write: WriteCtx, + ) -> anyhow::Result { // need this variable to please the compiler let retention = self.retention_recv.borrow_and_update().clone(); match retention { - RetentionStrategy::FromBlock { number, parent_hash } => { - let will_erase_head = write.write.head().map_or(false, |h| h.number < number); + RetentionStrategy::FromBlock { + number, + parent_hash, + } => { + let will_erase_head = write.write.head().map_or(false, |h| h.number < number) || // FromBlock is greater than current head, so everything is cleared + write.write.start_block() > number; // FromBlock is less than current front, dropping everything by design blocking_write!(write, write.retain(number, parent_hash))?; match state { - State::Ingest { .. } if !will_erase_head => {}, - _ => *state = State::Init { head: None } - } - }, - RetentionStrategy::Head(n) => { - match state { - State::HeadProbe { head, .. } => { - *head = n - }, - State::Ingest { head, .. } if head.is_some() => { - *head = Some(n) - }, - _ => *state = State::Init { head: Some(n) } + State::Ingest { .. } if !will_erase_head => {} // Keep ingesting, head is valid + _ => *state = State::Init { head: None }, // New ingest needed } + } + RetentionStrategy::Head(n) => match state { + State::HeadProbe { head, .. } => *head = n, + State::Ingest { head, .. } if head.is_some() => *head = Some(n), + _ => *state = State::Init { head: Some(n) }, }, - RetentionStrategy::None => *state = State::Idle + RetentionStrategy::None => *state = State::Idle, } Ok(write) } @@ -512,17 +514,18 @@ impl Ctl { self.data_sources.clone(), self.dataset_kind, write.write.next_block(), - write.write.head_hash() - ).instrument(ingest_span) + write.write.head_hash(), + ) + .instrument(ingest_span), ); - IngestHandle { - msg_recv, - task - } + IngestHandle { msg_recv, task } } - async fn new_write_ctx(&self, maybe_write: Option) -> anyhow::Result { + async fn new_write_ctx( + &self, + maybe_write: Option, + ) -> anyhow::Result { let db = self.db.clone(); let dataset_id = self.dataset_id; let dataset_kind = self.dataset_kind; @@ -534,19 +537,20 @@ impl Ctl { tokio::task::spawn_blocking(move || { let _entered = span.enter(); WriteController::new(db, dataset_id, dataset_kind) - }).await.context("write init task panicked")?? + }) + .await + .context("write init task panicked")?? }; Ok(WriteCtx { db: self.db.clone(), write, head_sender: self.head_sender.clone(), - finalized_head_sender: self.finalized_head_sender.clone() + finalized_head_sender: self.finalized_head_sender.clone(), }) } } - async fn fetch_chain_top(clients: Vec) -> BlockNumber { let mut calls: FuturesUnordered<_> = (0..clients.len()) .map(|i| call_client(&clients, i, false)) @@ -601,25 +605,26 @@ async fn fetch_chain_top(clients: Vec) -> BlockNumber { async fn call_client( clients: &[ReqwestDataClient], idx: usize, - backoff: bool - ) -> (anyhow::Result, usize) - { + backoff: bool, + ) -> (anyhow::Result, usize) { if backoff { tokio::time::sleep(Duration::from_secs(5)).await; } - clients[idx].get_head().map(move |res| { - let res = res.map(|maybe_head| maybe_head.map_or(0, |h| h.number)); - (res, idx) - }).await + clients[idx] + .get_head() + .map(move |res| { + let res = res.map(|maybe_head| maybe_head.map_or(0, |h| h.number)); + (res, idx) + }) + .await } } - #[instrument(name = "compaction", skip_all)] async fn compaction_loop( db: DBRef, dataset_id: DatasetId, - mut enabled: tokio::sync::watch::Receiver + mut enabled: tokio::sync::watch::Receiver, ) { let mut skips = 0; let skip_pause = [1, 2, 3, 4, 5, 10, 20, 30, 60]; @@ -633,9 +638,11 @@ async fn compaction_loop( warn_on_tx_restart! { db.perform_dataset_compaction(dataset_id, None, None, None) } - }).await { + }) + .await + { Ok(res) => res, - Err(err) => Err(anyhow!("failed to await compaction task - {}", err)) + Err(err) => Err(anyhow!("failed to await compaction task - {}", err)), }; match result { @@ -649,17 +656,17 @@ async fn compaction_loop( merged_chunks.len(), ); skips = 0; - }, + } Ok(CompactionStatus::NotingToCompact) => { debug!("nothing to compact"); skips += 1; let pause = skip_pause[std::cmp::min(skips, skip_pause.len() - 1)]; tokio::time::sleep(Duration::from_secs(pause)).await; - }, + } Ok(CompactionStatus::Canceled) => { skips = 0; info!("data changed during compaction"); - }, + } Err(err) => { skips = 0; error!( @@ -671,8 +678,8 @@ async fn compaction_loop( } } else { if enabled.changed().await.is_err() { - return + return; } } } -} \ No newline at end of file +}