diff --git a/crates/control-plane-api/README.md b/crates/control-plane-api/README.md new file mode 100644 index 00000000000..ff2e333105f --- /dev/null +++ b/crates/control-plane-api/README.md @@ -0,0 +1,37 @@ +# control-plane-api + +## Development + +> **NOTE:** All commands below should be run from inside the Lima VM. + +### Applying Changes + +Restart the agent API service to pick up changes: + +```bash +systemctl --user restart flow-control-agent.service +``` + +### Updating the GraphQL Schema + +The auto-generated GraphQL schema is checked into the repo. After making changes to the GraphQL API, regenerate it with: + +```bash +cargo build -p flow-client --features generate +``` + +### Updating sqlx query cache + +After adding / modifying SQL queries, regenerate the checked-in sqlx query cache so that offline compilation works: + +```bash +cargo sqlx prepare --workspace +``` + +### Updating test snapshots + +Tests use `insta` for snapshot testing. After making changes that affect test output, review and accept updated snapshots with: + +```bash +cargo insta review -p control-plane-api +``` diff --git a/crates/control-plane-api/src/server/public/graphql/alert_subscriptions.rs b/crates/control-plane-api/src/server/public/graphql/alert_subscriptions.rs index 96a21779d83..33c66c8b5f9 100644 --- a/crates/control-plane-api/src/server/public/graphql/alert_subscriptions.rs +++ b/crates/control-plane-api/src/server/public/graphql/alert_subscriptions.rs @@ -75,6 +75,17 @@ impl AlertSubscriptionsMutation { email, prefix, ))); } + + // let mut resolved_alert_type = alert_types.as_deref().unwrap_or(DEFAULT_ALERT_TYPES); + // for system_alert in AlertType::all() + // .into_iter() + // .filter(|ty| ty.is_system_alert()) + // { + // if !resolved_alert_type.contains(system_alert) { + // resolved_alert_type.push(system_alert); + // } + // } + let updated = create_alert_subscription( prefix.as_str(), email.as_str(), @@ -121,6 +132,16 @@ impl AlertSubscriptionsMutation { ))); }; + 
// let mut resolved_alert_type = alert_types.as_deref().unwrap_or(&existing.alert_types); + // for system_alert in AlertType::all() + // .into_iter() + // .filter(|ty| ty.is_system_alert()) + // { + // if !resolved_alert_type.contains(system_alert) { + // resolved_alert_type.push(system_alert); + // } + // } + let new_detail = detail.as_deref().or(existing.detail.as_deref()); let updated = update_alert_subscription( diff --git a/crates/control-plane-api/src/server/public/graphql/alert_types.rs b/crates/control-plane-api/src/server/public/graphql/alert_types.rs new file mode 100644 index 00000000000..d9c8bbe90d0 --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/alert_types.rs @@ -0,0 +1,36 @@ +use models::status::AlertType; + +#[derive(Debug, Default)] +pub struct AlertTypesQuery; + +/// Describes an alert type with user-facing metadata. +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct AlertTypeInfo { + /// The alert type identifier. + alert_type: AlertType, + /// A user-facing description of what this alert type means. + description: String, + /// A short, user-facing alert type name. + display_name: String, + /// An indication of whether the alert type is subscribed to by default. + is_default: bool, + /// An indication of whether the alert type is considered to be a system alert. + is_system_alert: bool, +} + +#[async_graphql::Object] +impl AlertTypesQuery { + /// Returns all possible alert types with their user-facing metadata. 
+ async fn alert_types(&self) -> Vec<AlertTypeInfo> { + AlertType::all() + .iter() + .map(|at| AlertTypeInfo { + alert_type: *at, + description: at.description().to_string(), + display_name: at.display_name().to_string(), + is_default: at.is_default(), + is_system_alert: at.is_system_alert(), + }) + .collect() + } +} diff --git a/crates/control-plane-api/src/server/public/graphql/mod.rs b/crates/control-plane-api/src/server/public/graphql/mod.rs index e2122654235..112855573ab 100644 --- a/crates/control-plane-api/src/server/public/graphql/mod.rs +++ b/crates/control-plane-api/src/server/public/graphql/mod.rs @@ -22,6 +22,7 @@ impl connection::CursorType for TimestampCursor { } mod alert_subscriptions; +mod alert_types; mod alerts; mod authorized_prefixes; mod data_planes; @@ -50,6 +51,7 @@ pub struct PgDataLoader(pub sqlx::PgPool); pub struct QueryRoot( live_spec_refs::LiveSpecsQuery, alerts::AlertsQuery, + alert_types::AlertTypesQuery, prefixes::PrefixesQuery, alert_subscriptions::AlertSubscriptionsQuery, storage_mappings::StorageMappingsQuery, diff --git a/crates/flow-client/control-plane-api.graphql b/crates/flow-client/control-plane-api.graphql index 88f65e0604e..c377bdc498d 100644 --- a/crates/flow-client/control-plane-api.graphql +++ b/crates/flow-client/control-plane-api.graphql @@ -1,3 +1,13 @@ +""" +Status of the abandonment evaluation for a task. +""" +type AbandonStatus { + """ + When this spec was last checked for abandonment + """ + lastEvaluated: DateTime +} + """ Status of the task shards running in the data-plane. This records information about the activations of builds in the data-plane, including any subsequent re-activations @@ -163,6 +173,28 @@ enum AlertType { """ shard_failed """ + Warning that a task has been unable to run for an extended period. It will + be automatically disabled unless the issue is addressed or a new version + of the spec is published.
+ """ task_chronically_failing """ The task was automatically disabled because its shards have been failing continuously for an extended period without any user intervention. """ task_auto_disabled_failing """ Warning that a task has not processed any data for an extended period and has not been modified recently. It will be automatically disabled unless a new version of the spec is published. """ task_idle """ The task was automatically disabled because it had not processed any data for an extended period and had not been modified recently. """ task_auto_disabled_idle """ Triggers when an automated background process needs to publish a spec, but is unable to because of publication errors. Background publications are peformed on all specs for a variety of reasons. For example, @@ -173,6 +205,32 @@ enum AlertType { background_publication_failed } +""" +Describes an alert type with user-facing metadata. +""" +type AlertTypeInfo { + """ + The alert type identifier. + """ + alertType: AlertType! + """ + A user-facing description of what this alert type means. + """ + description: String! + """ + A short, user-facing alert type name. + """ + displayName: String! + """ + An indication of whether the alert type is subscribed to by default. + """ + isDefault: Boolean! + """ + An indication of whether the alert type is considered to be a system alert. + """ + isSystemAlert: Boolean! +} + input AlertsBy { """ Show alerts for the given catalog namespace prefix. @@ -354,6 +412,10 @@ type Controller { """ configUpdate: PendingConfigUpdateStatus alerts: JSONObject + """ + Present for captures, collections, and materializations. + """ + abandon: AbandonStatus updatedAt: DateTime! } @@ -769,21 +831,21 @@ type LockFailure { type MutationRoot { """ Create a storage mapping for the given catalog prefix.
- + This validates that the user has admin access to the catalog prefix, runs health checks to verify that data planes can access the storage buckets, and then saves the storage mapping to the database. - + All health checks must pass before the storage mapping is created. """ createStorageMapping(catalogPrefix: Prefix!, detail: String, spec: JSON!): CreateStorageMappingResult! """ Update an existing storage mapping for the given catalog prefix. - + This validates that the user has admin access to the catalog prefix, runs health checks to verify that data planes can access the storage buckets, and then updates the storage mapping in the database. - + Health checks for newly added stores or data planes must pass before the storage mapping is updated. Health check failures for existing stores/data planes are allowed (they were already validated when created). @@ -791,10 +853,10 @@ type MutationRoot { updateStorageMapping(catalogPrefix: Prefix!, detail: String, spec: JSON!): UpdateStorageMappingResult! """ Check storage health for a given catalog prefix and storage definition. - + This validates the inputs, verifies that the user has admin access to the catalog prefix, and runs health checks to verify that data planes can access the storage buckets. - + Unlike create/update mutations, this does not modify any data and always returns health check results (both successes and failures) rather than erroring on failures. """ @@ -815,7 +877,7 @@ type MutationRoot { deleteAlertSubscription(prefix: Prefix!, email: String!): AlertSubscription! """ Create an invite link that grants access to a catalog prefix. - + The caller must have admin capability on the catalog prefix. Share the returned token with the intended recipient out-of-band. """ @@ -827,7 +889,7 @@ type MutationRoot { redeemInviteLink(token: UUID!): RedeemInviteLinkResult! """ Delete an invite link, revoking it so it can no longer be redeemed. 
- + The caller must have admin capability on the invite link's catalog prefix. """ deleteInviteLink(token: UUID!): Boolean! @@ -997,7 +1059,7 @@ type QueryRoot { """ Returns a paginated list of live specs under the given prefix and matching the given type. - + Note that the `user_capability` that's returned as part of the reference represents the user's capability to the whole prefix, and it is possible that there are more specific grants for a broader capability. In other @@ -1010,6 +1072,10 @@ type QueryRoot { prefixes. """ alerts(by: AlertsBy!, before: String, last: Int, after: String, first: Int): AlertConnection! + """ + Returns all possible alert types with their user-facing metadata. + """ + alertTypes: [AlertTypeInfo!]! prefixes(by: PrefixesBy!, after: String, first: Int): PrefixRefConnection! """ Returns a complete list of alert subscriptions. @@ -1017,21 +1083,21 @@ type QueryRoot { alertSubscriptions(by: AlertSubscriptionsBy!): [AlertSubscription!]! """ Returns storage mappings accessible to the current user. - + Requires at least read capability to the queried prefixes. Results are paginated and sorted by catalog_prefix. """ storageMappings(by: StorageMappingsBy!, after: String, before: String, first: Int, last: Int): StorageMappingConnection! """ Returns data planes accessible to the current user. - + Results are paginated and sorted by data_plane_name. Only data planes the user has at least read capability to are returned. """ dataPlanes(after: String, before: String, first: Int, last: Int): DataPlaneConnection! """ List invite links the caller has admin access to. - + Returns invite links under all prefixes where the caller has admin capability, optionally narrowed by a prefix filter. 
""" diff --git a/crates/models/src/status/alerts.rs b/crates/models/src/status/alerts.rs index cc5a02872c2..2531c81bafc 100644 --- a/crates/models/src/status/alerts.rs +++ b/crates/models/src/status/alerts.rs @@ -128,6 +128,66 @@ impl AlertType { } } + /// A short, user-facing alert type name. + pub fn display_name(&self) -> &'static str { + match self { + AlertType::AutoDiscoverFailed => "Auto-Discovery Failed", + AlertType::DataMovementStalled => "Data Movement Stalled", + AlertType::FreeTrial => "Free Trial", + AlertType::FreeTrialEnding => "Free Trial Ending", + AlertType::FreeTrialStalled => "Free Trial Stalled", + AlertType::MissingPaymentMethod => "Missing Payment Method", + AlertType::ShardFailed => "Task Failed", + AlertType::TaskChronicallyFailing => "Task Chronically Failing", + AlertType::TaskAutoDisabledFailing => "Task Auto-Disabled (Failing)", + AlertType::TaskIdle => "Task Idle", + AlertType::TaskAutoDisabledIdle => "Task Auto-Disabled (Idle)", + AlertType::BackgroundPublicationFailed => "Background Publication Failed", + } + } + + /// A user-facing description of what this alert type means. + pub fn description(&self) -> &'static str { + match self { + AlertType::AutoDiscoverFailed => { + "Triggers when a capture's automated schema discovery fails. The capture may be unable to respond to schema changes in the source system." + } + AlertType::DataMovementStalled => { + "Triggers when a task has not processed any data during its configured alert interval." + } + AlertType::FreeTrial => { + "Triggers when a free trial begins, and resolves when the trial period ends." + } + AlertType::FreeTrialEnding => { + "Triggers when a free trial is getting close to expiring." + } + AlertType::FreeTrialStalled => { + "Triggers when a free trial has expired and no payment method has been added." + } + AlertType::MissingPaymentMethod => { + "Triggers when no payment method is on file, and resolves when one is added." 
+ } + AlertType::ShardFailed => { + "Triggers when a task has experienced repeated failures. It may still make progress, but performance is degraded." + } + AlertType::TaskChronicallyFailing => { + "Triggers when a task has been unable to run for an extended period. It will be automatically disabled unless the issue is addressed." + } + AlertType::TaskAutoDisabledFailing => { + "Triggers when a task is automatically disabled after failing continuously for an extended period." + } + AlertType::TaskIdle => { + "Triggers when a task has not processed any data for an extended period and has not been modified recently. It will be automatically disabled unless a new version is published." + } + AlertType::TaskAutoDisabledIdle => { + "Triggers when a task is automatically disabled after being idle for an extended period without any spec changes." + } + AlertType::BackgroundPublicationFailed => { + "Triggers when an automated background publication fails. Affected tasks are unlikely to function until the issue is addressed." + } + } + } + pub fn from_str(name: &str) -> Option { for alert_type in AlertType::all() { if name.eq_ignore_ascii_case(alert_type.name()) { @@ -136,6 +196,42 @@ impl AlertType { } None } + + /// An indication of whether the alert type is subscribed to by default. + pub fn is_default(&self) -> bool { + match self { + AlertType::AutoDiscoverFailed => true, + AlertType::BackgroundPublicationFailed => true, + AlertType::DataMovementStalled => true, + AlertType::FreeTrial => true, + AlertType::FreeTrialEnding => true, + AlertType::FreeTrialStalled => true, + AlertType::MissingPaymentMethod => true, + AlertType::ShardFailed => true, + AlertType::TaskAutoDisabledFailing => true, + AlertType::TaskAutoDisabledIdle => true, + AlertType::TaskChronicallyFailing => true, + AlertType::TaskIdle => true, + } + } + + /// An indication of whether the alert type is considered to be a system alert. 
+ pub fn is_system_alert(&self) -> bool { + match self { + AlertType::AutoDiscoverFailed => false, + AlertType::BackgroundPublicationFailed => false, + AlertType::DataMovementStalled => false, + AlertType::FreeTrial => true, + AlertType::FreeTrialEnding => true, + AlertType::FreeTrialStalled => true, + AlertType::MissingPaymentMethod => true, + AlertType::ShardFailed => false, + AlertType::TaskAutoDisabledFailing => true, + AlertType::TaskAutoDisabledIdle => true, + AlertType::TaskChronicallyFailing => true, + AlertType::TaskIdle => true, + } + } } #[cfg(feature = "sqlx-support")]