Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions crates/control-plane-api/README.md
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# control-plane-api

## Development

> **NOTE:** All commands below should be run from inside the Lima VM.

### Applying Changes

Restart the agent API service to pick up changes:

```bash
systemctl --user restart flow-control-agent.service
```

### Updating the GraphQL Schema

The auto-generated GraphQL schema is checked into the repo. After making changes to the GraphQL API, regenerate it with:

```bash
cargo build -p flow-client --features generate
```

### Updating sqlx query cache

After adding / modifying SQL queries, regenerate the checked-in sqlx query cache so that offline compilation works:

```bash
cargo sqlx prepare --workspace
```

### Updating test snapshots

Tests use `insta` for snapshot testing. After making changes that affect test output, review and accept updated snapshots with:

```bash
cargo insta review -p control-plane-api
```
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ impl AlertSubscriptionsMutation {
email, prefix,
)));
}

// let mut resolved_alert_type = alert_types.as_deref().unwrap_or(DEFAULT_ALERT_TYPES);
// for system_alert in AlertType::all()
// .into_iter()
// .filter(|ty| ty.is_system_alert())
// {
// if !resolved_alert_type.contains(system_alert) {
// resolved_alert_type.push(system_alert);
// }
// }

let updated = create_alert_subscription(
prefix.as_str(),
email.as_str(),
Expand Down Expand Up @@ -121,6 +132,16 @@ impl AlertSubscriptionsMutation {
)));
};

// let mut resolved_alert_type = alert_types.as_deref().unwrap_or(&existing.alert_types);
// for system_alert in AlertType::all()
// .into_iter()
// .filter(|ty| ty.is_system_alert())
// {
// if !resolved_alert_type.contains(system_alert) {
// resolved_alert_type.push(system_alert);
// }
// }

let new_detail = detail.as_deref().or(existing.detail.as_deref());

let updated = update_alert_subscription(
Expand Down
36 changes: 36 additions & 0 deletions crates/control-plane-api/src/server/public/graphql/alert_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use models::status::AlertType;

#[derive(Debug, Default)]
pub struct AlertTypesQuery;

/// Describes an alert type with user-facing metadata.
#[derive(Debug, Clone, async_graphql::SimpleObject)]
pub struct AlertTypeInfo {
/// The alert type identifier.
alert_type: AlertType,
/// A user-facing description of what this alert type means.
description: String,
/// A short, user-facing alert type name.
display_name: String,
/// An indication of whether the alert type is subscribed to by default.
is_default: bool,
/// An indication of whether the alert type is considered to be a system alert.
is_system_alert: bool,
}

#[async_graphql::Object]
impl AlertTypesQuery {
/// Returns all possible alert types with their user-facing metadata.
async fn alert_types(&self) -> Vec<AlertTypeInfo> {
AlertType::all()
.iter()
.map(|at| AlertTypeInfo {
alert_type: *at,
description: at.description().to_string(),
display_name: at.display_name().to_string(),
is_default: at.is_default(),
is_system_alert: at.is_system_alert(),
})
.collect()
}
}
2 changes: 2 additions & 0 deletions crates/control-plane-api/src/server/public/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl connection::CursorType for TimestampCursor {
}

mod alert_subscriptions;
mod alert_types;
mod alerts;
mod authorized_prefixes;
mod data_planes;
Expand Down Expand Up @@ -50,6 +51,7 @@ pub struct PgDataLoader(pub sqlx::PgPool);
pub struct QueryRoot(
live_spec_refs::LiveSpecsQuery,
alerts::AlertsQuery,
alert_types::AlertTypesQuery,
prefixes::PrefixesQuery,
alert_subscriptions::AlertSubscriptionsQuery,
storage_mappings::StorageMappingsQuery,
Expand Down
90 changes: 78 additions & 12 deletions crates/flow-client/control-plane-api.graphql
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
"""
Status of the abandonment evaluation for a task.
"""
type AbandonStatus {
"""
When this spec was last checked for abandonment
"""
lastEvaluated: DateTime
}

"""
Status of the task shards running in the data-plane. This records information about
the activations of builds in the data-plane, including any subsequent re-activations
Expand Down Expand Up @@ -163,6 +173,28 @@ enum AlertType {
"""
shard_failed
"""
Warning that a task has been unable to run for an extended period. It will
be automatically disabled unless the issue is addressed or a new version
of the spec is published.
"""
task_chronically_failing
"""
The task was automatically disabled because its shards have been
failing continuously for an extended period without any user intervention.
"""
task_auto_disabled_failing
"""
Warning that a task has not processed any data for an extended period
and has not been modified recently. It will be automatically disabled
unless a new version of the spec is published.
"""
task_idle
"""
The task was automatically disabled because it had not processed any
data for an extended period and had not been modified recently.
"""
task_auto_disabled_idle
"""
Triggers when an automated background process needs to publish a spec,
but is unable to because of publication errors. Background publications
are peformed on all specs for a variety of reasons. For example,
Expand All @@ -173,6 +205,32 @@ enum AlertType {
background_publication_failed
}

"""
Describes an alert type with user-facing metadata.
"""
type AlertTypeInfo {
"""
The alert type identifier.
"""
alertType: AlertType!
"""
A user-facing description of what this alert type means.
"""
description: String!
"""
A short, user-facing alert type name.
"""
display_name: String!
"""
An indication of whether the alert type is subscribed to by default.
"""
is_default: Boolean!
"""
An indication of whether the alert type is considered to be a system alert.
"""
is_system_alert: Boolean!
}

input AlertsBy {
"""
Show alerts for the given catalog namespace prefix.
Expand Down Expand Up @@ -354,6 +412,10 @@ type Controller {
"""
configUpdate: PendingConfigUpdateStatus
alerts: JSONObject
"""
Present for captures, collections, and materializations.
"""
abandon: AbandonStatus
updatedAt: DateTime!
}

Expand Down Expand Up @@ -769,32 +831,32 @@ type LockFailure {
type MutationRoot {
"""
Create a storage mapping for the given catalog prefix.

This validates that the user has admin access to the catalog prefix,
runs health checks to verify that data planes can access the storage buckets,
and then saves the storage mapping to the database.

All health checks must pass before the storage mapping is created.
"""
createStorageMapping(catalogPrefix: Prefix!, detail: String, spec: JSON!): CreateStorageMappingResult!
"""
Update an existing storage mapping for the given catalog prefix.

This validates that the user has admin access to the catalog prefix,
runs health checks to verify that data planes can access the storage buckets,
and then updates the storage mapping in the database.

Health checks for newly added stores or data planes must pass before the
storage mapping is updated. Health check failures for existing stores/data planes
are allowed (they were already validated when created).
"""
updateStorageMapping(catalogPrefix: Prefix!, detail: String, spec: JSON!): UpdateStorageMappingResult!
"""
Check storage health for a given catalog prefix and storage definition.

This validates the inputs, verifies that the user has admin access to the catalog prefix,
and runs health checks to verify that data planes can access the storage buckets.

Unlike create/update mutations, this does not modify any data and always returns
health check results (both successes and failures) rather than erroring on failures.
"""
Expand All @@ -815,7 +877,7 @@ type MutationRoot {
deleteAlertSubscription(prefix: Prefix!, email: String!): AlertSubscription!
"""
Create an invite link that grants access to a catalog prefix.

The caller must have admin capability on the catalog prefix.
Share the returned token with the intended recipient out-of-band.
"""
Expand All @@ -827,7 +889,7 @@ type MutationRoot {
redeemInviteLink(token: UUID!): RedeemInviteLinkResult!
"""
Delete an invite link, revoking it so it can no longer be redeemed.

The caller must have admin capability on the invite link's catalog prefix.
"""
deleteInviteLink(token: UUID!): Boolean!
Expand Down Expand Up @@ -997,7 +1059,7 @@ type QueryRoot {
"""
Returns a paginated list of live specs under the given prefix and
matching the given type.

Note that the `user_capability` that's returned as part of the reference
represents the user's capability to the whole prefix, and it is possible
that there are more specific grants for a broader capability. In other
Expand All @@ -1010,28 +1072,32 @@ type QueryRoot {
prefixes.
"""
alerts(by: AlertsBy!, before: String, last: Int, after: String, first: Int): AlertConnection!
"""
Returns all possible alert types with their user-facing metadata.
"""
alertTypes: [AlertTypeInfo!]!
prefixes(by: PrefixesBy!, after: String, first: Int): PrefixRefConnection!
"""
Returns a complete list of alert subscriptions.
"""
alertSubscriptions(by: AlertSubscriptionsBy!): [AlertSubscription!]!
"""
Returns storage mappings accessible to the current user.

Requires at least read capability to the queried prefixes.
Results are paginated and sorted by catalog_prefix.
"""
storageMappings(by: StorageMappingsBy!, after: String, before: String, first: Int, last: Int): StorageMappingConnection!
"""
Returns data planes accessible to the current user.

Results are paginated and sorted by data_plane_name.
Only data planes the user has at least read capability to are returned.
"""
dataPlanes(after: String, before: String, first: Int, last: Int): DataPlaneConnection!
"""
List invite links the caller has admin access to.

Returns invite links under all prefixes where the caller has admin
capability, optionally narrowed by a prefix filter.
"""
Expand Down
Loading
Loading