Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,10 @@ impl Catalog {
CatalogError,
> {
let updates = self.storage().await.sync_to_current_updates().await?;
let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?;
let (builtin_table_updates, catalog_updates) = self
.state
.apply_updates(updates, &mut state::LocalExpressionCache::Closed)
.await;
Ok((builtin_table_updates, catalog_updates))
}
}
Expand Down
192 changes: 74 additions & 118 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,9 @@ impl CatalogState {
/// Update in-memory catalog state from a list of updates made to the durable catalog state.
///
/// Returns builtin table updates corresponding to the changes to catalog state.
///
/// This is meant specifically for bootstrapping because it batches and applies builtin view
/// additions separately from other update types.
#[must_use]
#[instrument]
pub(crate) async fn apply_updates_for_bootstrap(
pub(crate) async fn apply_updates(
&mut self,
updates: Vec<StateUpdate>,
local_expression_cache: &mut LocalExpressionCache,
Expand All @@ -109,21 +106,31 @@ impl CatalogState {
) {
let mut builtin_table_updates = Vec::with_capacity(updates.len());
let mut catalog_updates = Vec::with_capacity(updates.len());
let updates = sort_updates(updates);

// First, consolidate updates. The code that applies parsed state
// updates _requires_ that the given updates are consolidated. There
// must be at most one addition and/or one retraction for a given item,
// as identified by that item's ID type.
let updates = Self::consolidate_updates(updates);

// Apply updates in groups, according to their timestamps.
let mut groups: Vec<Vec<_>> = Vec::new();
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
groups.push(updates.collect());
// Bring the updates into the pseudo-topological order that we need
// for updating our in-memory state and generating builtin table
// updates.
let updates = sort_updates(updates.collect());
groups.push(updates);
}

for updates in groups {
let mut apply_state = BootstrapApplyState::Updates(Vec::new());
let mut apply_state = ApplyState::Updates(Vec::new());
let mut retractions = InProgressRetractions::default();

for update in updates {
let next_apply_state = BootstrapApplyState::new(update);
let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
.step(
next_apply_state,
ApplyState::new(update),
self,
&mut retractions,
local_expression_cache,
Expand All @@ -145,47 +152,6 @@ impl CatalogState {
(builtin_table_updates, catalog_updates)
}

/// Update in-memory catalog state from a list of updates made to the durable catalog state.
///
/// Returns builtin table updates corresponding to the changes to catalog state,
/// together with the parsed state updates collected from every applied group.
///
/// The updates are consolidated, sorted, and then applied one timestamp group
/// at a time through `apply_updates_inner`. Every group starts with a fresh
/// `InProgressRetractions` and is applied with `LocalExpressionCache::Closed`.
///
/// # Errors
///
/// Propagates the first `CatalogError` returned by `apply_updates_inner`;
/// processing stops there and later timestamp groups are not applied.
#[instrument]
pub(crate) fn apply_updates(
&mut self,
updates: Vec<StateUpdate>,
) -> Result<
(
Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
Vec<ParsedStateUpdate>,
),
CatalogError,
> {
// Pre-size both outputs using the number of incoming updates.
let mut builtin_table_updates = Vec::with_capacity(updates.len());
let mut catalog_updates = Vec::with_capacity(updates.len());

// First, consolidate updates. The code that applies parsed state
// updates _requires_ that the given updates are consolidated. There
// must be at most one addition and/or one retraction for a given item,
// as identified by that item's ID type.
let updates = Self::consolidate_updates(updates);

// Then bring it into the pseudo-topological order that we need for
// updating our in-memory state and generating builtin table updates.
let updates = sort_updates(updates);

// Walk the sorted updates in runs keyed by `ts`.
// NOTE(review): `chunk_by` is presumably itertools' adaptor, which groups
// only *consecutive* elements with equal keys — this relies on
// `sort_updates` having ordered the updates by timestamp; confirm.
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
// Retraction tracking is scoped to a single timestamp group.
let mut retractions = InProgressRetractions::default();
let (builtin_table_update, catalog_updates_op) = self.apply_updates_inner(
updates.collect(),
&mut retractions,
&mut LocalExpressionCache::Closed,
)?;
builtin_table_updates.extend(builtin_table_update);
catalog_updates.extend(catalog_updates_op);
}

Ok((builtin_table_updates, catalog_updates))
}

/// It can happen that the sequencing logic creates "fluctuating" updates
/// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
/// for a table, there will be a retraction of the original table state,
Expand All @@ -203,7 +169,6 @@ impl CatalogState {

updates
.into_iter()
.filter(|(_kind, _ts, diff)| *diff != 0.into())
.map(|(kind, ts, diff)| StateUpdate {
kind,
ts,
Expand Down Expand Up @@ -1956,21 +1921,13 @@ impl CatalogState {
}
}

/// Sort [`StateUpdate`]s in timestamp then dependency order
///
/// The updates are first ordered by `ts`. Each run of updates sharing a
/// timestamp is then handed to `sort_updates_inner`, and the per-timestamp
/// results are concatenated in timestamp order.
fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
// The output holds exactly as many updates as the input.
let mut sorted_updates = Vec::with_capacity(updates.len());

// `sort_by_key` is a stable sort, so updates with equal timestamps keep
// their relative input order before being passed to `sort_updates_inner`.
updates.sort_by_key(|update| update.ts);
// NOTE(review): `chunk_by` is presumably itertools' adaptor that groups
// consecutive equal keys; the sort above makes each timestamp one run.
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
let sorted_ts_updates = sort_updates_inner(updates.collect());
sorted_updates.extend(sorted_ts_updates);
}

sorted_updates
}

/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
/// Sort [`StateUpdate`]s in dependency order.
///
/// # Panics
///
/// This function assumes that all provided `updates` have the same timestamp
/// and will panic otherwise.
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
fn push_update<T>(
update: T,
diff: StateDiff,
Expand Down Expand Up @@ -2387,46 +2344,54 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
.collect()
}

/// Most updates are applied one at a time, but during bootstrap, certain types are applied
/// separately in a batch for performance reasons. A constraint is that updates must be applied in
/// order. This process is modeled as a state machine that batches then applies groups of updates.
enum BootstrapApplyState {
/// Groups of updates of certain types are applied in batches to improve
/// performance. A constraint is that updates must be applied in order. This
/// process is modeled as a state machine that batches then applies groups of
/// updates.
enum ApplyState {
/// Additions of builtin views.
BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
/// Item updates that aren't builtin view additions.
///
/// This contains all updates whose application requires calling
/// `parse_item` and thus toggling the `enable_for_item_parsing` feature
/// flags.
Items(Vec<StateUpdate>),
/// All other updates.
Updates(Vec<StateUpdate>),
}

impl BootstrapApplyState {
fn new(update: StateUpdate) -> BootstrapApplyState {
match update {
StateUpdate {
kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
diff: StateDiff::Addition,
..
} if matches!(
system_object_mapping.description.object_type,
CatalogItemType::View
) =>
impl ApplyState {
fn new(update: StateUpdate) -> Self {
use StateUpdateKind::*;
match &update.kind {
SystemObjectMapping(som)
if som.description.object_type == CatalogItemType::View
&& update.diff == StateDiff::Addition =>
{
let view_addition = lookup_builtin_view_addition(system_object_mapping);
BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
}
StateUpdate {
kind: StateUpdateKind::IntrospectionSourceIndex(_),
..
}
| StateUpdate {
kind: StateUpdateKind::SystemObjectMapping(_),
..
}
| StateUpdate {
kind: StateUpdateKind::Item(_),
..
} => BootstrapApplyState::Items(vec![update]),
update => BootstrapApplyState::Updates(vec![update]),
let view_addition = lookup_builtin_view_addition(som.clone());
Self::BuiltinViewAdditions(vec![view_addition])
}

IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
Self::Items(vec![update])
}

Role(_)
| RoleAuth(_)
| Database(_)
| Schema(_)
| DefaultPrivilege(_)
| SystemPrivilege(_)
| SystemConfiguration(_)
| Cluster(_)
| NetworkPolicy(_)
| ClusterReplica(_)
| SourceReferences(_)
| Comment(_)
| AuditLog(_)
| StorageCollectionMetadata(_)
| UnfinalizedShard(_) => Self::Updates(vec![update]),
}
}

Expand All @@ -2445,7 +2410,7 @@ impl BootstrapApplyState {
Vec<ParsedStateUpdate>,
) {
match self {
BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
Self::BuiltinViewAdditions(builtin_view_additions) => {
let restore = state.system_configuration.clone();
state.system_configuration.enable_for_item_parsing();
let builtin_table_updates = CatalogState::parse_builtin_views(
Expand All @@ -2458,60 +2423,51 @@ impl BootstrapApplyState {
state.system_configuration = restore;
(builtin_table_updates, Vec::new())
}
BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
state
.apply_updates_inner(updates, retractions, local_expression_cache)
.expect("corrupt catalog")
}),
BootstrapApplyState::Updates(updates) => state
Self::Updates(updates) => state
.apply_updates_inner(updates, retractions, local_expression_cache)
.expect("corrupt catalog"),
}
}

async fn step(
self,
next: BootstrapApplyState,
next: Self,
state: &mut CatalogState,
retractions: &mut InProgressRetractions,
local_expression_cache: &mut LocalExpressionCache,
) -> (
BootstrapApplyState,
Self,
(
Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
Vec<ParsedStateUpdate>,
),
) {
match (self, next) {
(
BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
Self::BuiltinViewAdditions(mut builtin_view_additions),
Self::BuiltinViewAdditions(next_builtin_view_additions),
) => {
// Continue batching builtin view additions.
builtin_view_additions.extend(next_builtin_view_additions);
(
BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
Self::BuiltinViewAdditions(builtin_view_additions),
(Vec::new(), Vec::new()),
)
}
(BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
(Self::Items(mut updates), Self::Items(next_updates)) => {
// Continue batching item updates.
updates.extend(next_updates);
(
BootstrapApplyState::Items(updates),
(Vec::new(), Vec::new()),
)
(Self::Items(updates), (Vec::new(), Vec::new()))
}
(
BootstrapApplyState::Updates(mut updates),
BootstrapApplyState::Updates(next_updates),
) => {
(Self::Updates(mut updates), Self::Updates(next_updates)) => {
// Continue batching updates.
updates.extend(next_updates);
(
BootstrapApplyState::Updates(updates),
(Vec::new(), Vec::new()),
)
(Self::Updates(updates), (Vec::new(), Vec::new()))
}
(apply_state, next_apply_state) => {
// Apply the current batch and start batching new apply state.
Expand Down
10 changes: 4 additions & 6 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ pub(crate) async fn migrate(
.expect("known parameter");
}

let (mut ast_builtin_table_updates, mut ast_catalog_updates) = state
.apply_updates_for_bootstrap(item_updates, local_expr_cache)
.await;
let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
state.apply_updates(item_updates, local_expr_cache).await;

info!("migrating from catalog version {:?}", catalog_version);

Expand Down Expand Up @@ -240,9 +239,8 @@ pub(crate) async fn migrate(
// input and stages arbitrary transformations to the catalog on `tx`.

let op_item_updates = tx.get_and_commit_op_updates();
let (item_builtin_table_updates, item_catalog_updates) = state
.apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
.await;
let (item_builtin_table_updates, item_catalog_updates) =
state.apply_updates(op_item_updates, local_expr_cache).await;

ast_builtin_table_updates.extend(item_builtin_table_updates);
ast_catalog_updates.extend(item_catalog_updates);
Expand Down
14 changes: 8 additions & 6 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl Catalog {
}

let (builtin_table_update, _catalog_updates) = state
.apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
.apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
.await;
builtin_table_updates.extend(builtin_table_update);

Expand Down Expand Up @@ -416,7 +416,7 @@ impl Catalog {
// and return and use the updates from here. But that's at the very
// least future work.
let (builtin_table_update, _catalog_updates) = state
.apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
.apply_updates(system_item_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(builtin_table_update);

Expand Down Expand Up @@ -467,7 +467,7 @@ impl Catalog {
)
} else {
state
.apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
.apply_updates(item_updates, &mut local_expr_cache)
.await
};
builtin_table_updates.extend(builtin_table_update);
Expand All @@ -481,7 +481,7 @@ impl Catalog {
})
.collect();
let (builtin_table_update, _catalog_updates) = state
.apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
.apply_updates(post_item_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(builtin_table_update);

Expand Down Expand Up @@ -511,7 +511,7 @@ impl Catalog {
// and return and use the updates from here. But that's at the very
// least future work.
let (table_updates, _catalog_updates) = state
.apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
.apply_updates(state_updates, &mut local_expr_cache)
.await;
builtin_table_updates.extend(table_updates);
let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
Expand Down Expand Up @@ -656,7 +656,9 @@ impl Catalog {
.map_err(mz_catalog::durable::DurableCatalogError::from)?;

let updates = txn.get_and_commit_op_updates();
let (builtin_updates, catalog_updates) = state.apply_updates(updates)?;
let (builtin_updates, catalog_updates) = state
.apply_updates(updates, &mut LocalExpressionCache::Closed)
.await;
assert!(
builtin_updates.is_empty(),
"storage is not allowed to generate catalog changes that would cause changes to builtin tables"
Expand Down
Loading