Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/api/status_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ const (
AdapterConditionUnknown AdapterConditionStatus = "Unknown"
)

// Condition type constants
const (
ConditionTypeAvailable = "Available"
ConditionTypeApplied = "Applied"
ConditionTypeHealth = "Health"
ConditionTypeReady = "Ready"
)

// ResourceCondition represents a condition of a resource
// Domain equivalent of openapi.ResourceCondition
// JSON tags match database JSONB structure
Expand Down
45 changes: 30 additions & 15 deletions pkg/services/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"strings"
"time"

Expand All @@ -30,7 +31,8 @@ type ClusterService interface {
UpdateClusterStatusFromAdapters(ctx context.Context, clusterID string) (*api.Cluster, *errors.ServiceError)

// ProcessAdapterStatus handles the business logic for adapter status:
// - If Available condition is "Unknown": returns (nil, nil) indicating no-op
// - First report: accepts Unknown Available condition, skips aggregation
// - Subsequent reports: rejects Unknown Available condition
// - Otherwise: upserts the status and triggers aggregation
ProcessAdapterStatus(
ctx context.Context, clusterID string, adapterStatus *api.AdapterStatus,
Expand Down Expand Up @@ -237,9 +239,12 @@ func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
}

// ProcessAdapterStatus handles the business logic for adapter status:
// - If Available is "Unknown" and a status already exists: returns (nil, nil) as no-op
// - If Available is "Unknown" and no status exists (first report): upserts the status
// - Otherwise: upserts the status and triggers aggregation
// - Validates that all mandatory conditions (Available, Applied, Health) are present
// - Rejects duplicate condition types
// - For first status report: accepts Unknown Available condition to avoid data loss
// - For subsequent reports: rejects Unknown Available condition to preserve existing valid state
// - Uses complete replacement semantics: each update replaces all conditions for this adapter
// - Returns (nil, nil) for discarded updates
func (s *sqlClusterService) ProcessAdapterStatus(
ctx context.Context, clusterID string, adapterStatus *api.AdapterStatus,
) (*api.AdapterStatus, *errors.ServiceError) {
Expand All @@ -264,35 +269,45 @@ func (s *sqlClusterService) ProcessAdapterStatus(
}
}

// Find the "Available" condition
hasAvailableCondition := false
// Validate mandatory conditions and check for duplicates
if errorType, conditionName := ValidateMandatoryConditions(conditions); errorType != "" {
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: %s condition %s",
adapterStatus.Adapter, errorType, conditionName))
return nil, nil
}

// Check Available condition for Unknown status
triggerAggregation := false
for _, cond := range conditions {
if cond.Type != "Available" {
if cond.Type != api.ConditionTypeAvailable {
continue
}

hasAvailableCondition = true
triggerAggregation = true
if cond.Status == api.AdapterConditionUnknown {
if existingStatus != nil {
// Available condition is "Unknown" and a status already exists, return nil to indicate no-op
// Non-first report && Available=Unknown → reject
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: subsequent Unknown Available",
adapterStatus.Adapter))
return nil, nil
}
// First report from this adapter: allow storing even with Available=Unknown
// but skip aggregation since Unknown should not affect cluster-level conditions
hasAvailableCondition = false
triggerAggregation = false
}
break
}

// Upsert the adapter status
// Upsert the adapter status (complete replacement)
upsertedStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
if err != nil {
return nil, handleCreateError("AdapterStatus", err)
}

// Only trigger aggregation when the adapter reported an Available condition.
// If the adapter status doesn't include Available (e.g. it only reports Ready/Progressing),
// saving it should not overwrite the cluster's synthetic Available/Ready conditions.
if hasAvailableCondition {
// Only trigger aggregation when triggerAggregation is true
if triggerAggregation {
if _, aggregateErr := s.UpdateClusterStatusFromAdapters(
ctx, clusterID,
); aggregateErr != nil {
Expand Down
Loading