Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/config/capabilities_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type CapabilitiesWorkflowRegistry interface {
RelayID() types.RelayID
SyncStrategy() string
MaxConcurrency() int
MaxActivationRetries() int
WorkflowStorage() WorkflowStorage
ModuleCache() ModuleCache
AdditionalSources() []AdditionalWorkflowSource
Expand Down
2 changes: 2 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ MaxConfigSize = '50.00kb' # Default
SyncStrategy = 'event' # Default
# MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer.
MaxConcurrency = 12 # Default
# MaxActivationRetries is the number of failed load/initialization attempts before the syncer stops retrying an active workflow. 0 disables the limit.
MaxActivationRetries = 100 # Default

[Capabilities.WorkflowRegistry.WorkflowStorage]
# URL is the location for the workflow storage service to be communicated with.
Expand Down
5 changes: 5 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2493,6 +2493,7 @@ type WorkflowRegistry struct {
MaxConfigSize *utils.FileSize
SyncStrategy *string
MaxConcurrency *int
MaxActivationRetries *int
WorkflowStorage WorkflowStorage
ModuleCache ModuleCache
AdditionalSourcesConfig []AdditionalWorkflowSource `toml:"AdditionalSources"`
Expand Down Expand Up @@ -2535,6 +2536,10 @@ func (r *WorkflowRegistry) setFrom(f *WorkflowRegistry) {
r.MaxConcurrency = f.MaxConcurrency
}

if f.MaxActivationRetries != nil {
r.MaxActivationRetries = f.MaxActivationRetries
}

r.WorkflowStorage.setFrom(&f.WorkflowStorage)
r.ModuleCache.setFrom(&f.ModuleCache)

Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ require (
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 // indirect
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 // indirect
github.com/smartcontractkit/chainlink-sui v0.0.0 // indirect
github.com/smartcontractkit/chainlink-testing-framework/framework/components/chiprouter v1.0.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions core/services/chainlink/config_capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (c *capabilitiesWorkflowRegistry) MaxConcurrency() int {
return *c.c.MaxConcurrency
}

func (c *capabilitiesWorkflowRegistry) MaxActivationRetries() int {
return *c.c.MaxActivationRetries
}

func (c *capabilitiesWorkflowRegistry) WorkflowStorage() config.WorkflowStorage {
return &workflowStorage{
c: c.c.WorkflowStorage,
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func TestConfig_Marshal(t *testing.T) {
MaxConfigSize: ptr(utils.FileSize(50 * utils.KB)),
SyncStrategy: ptr("event"),
MaxConcurrency: ptr(12),
MaxActivationRetries: new(100),
WorkflowStorage: toml.WorkflowStorage{
ArtifactStorageHost: ptr(""),
URL: ptr(""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb'
MaxConfigSize = '50.00kb'
SyncStrategy = 'event'
MaxConcurrency = 12
MaxActivationRetries = 100
AdditionalSources = []

[Capabilities.WorkflowRegistry.WorkflowStorage]
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb'
MaxConfigSize = '50.00kb'
SyncStrategy = 'event'
MaxConcurrency = 12
MaxActivationRetries = 100

[Capabilities.WorkflowRegistry.WorkflowStorage]
ArtifactStorageHost = ''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb'
MaxConfigSize = '50.00kb'
SyncStrategy = 'event'
MaxConcurrency = 12
MaxActivationRetries = 100
AdditionalSources = []

[Capabilities.WorkflowRegistry.WorkflowStorage]
Expand Down
1 change: 1 addition & 0 deletions core/services/cre/cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ func newWorkflowRegistrySyncerV2(
syncerV2.WithAdditionalSources(addSourceConfigs),
syncerV2.WithShardOrchestratorClient(shardOrchestratorClient),
syncerV2.WithMaxConcurrency(wfReg.MaxConcurrency()),
syncerV2.WithMaxActivationRetries(wfReg.MaxActivationRetries()),
}
if cfg.Sharding().ShardingEnabled() {
registryOpts = append(registryOpts,
Expand Down
1 change: 1 addition & 0 deletions core/services/cre/cre_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (w wfRegTestStub) MaxConfigSize() utils.FileSize { return 0 }
func (w wfRegTestStub) RelayID() commontypes.RelayID { return commontypes.RelayID{} }
func (w wfRegTestStub) SyncStrategy() string { return "" }
func (w wfRegTestStub) MaxConcurrency() int { return 0 }
func (w wfRegTestStub) MaxActivationRetries() int { return 0 }
func (w wfRegTestStub) WorkflowStorage() config.WorkflowStorage { return wfRegStorageStub{} }
func (w wfRegTestStub) ModuleCache() config.ModuleCache { return wfRegModuleCacheStub{} }
func (w wfRegTestStub) AdditionalSources() []config.AdditionalWorkflowSource {
Expand Down
29 changes: 29 additions & 0 deletions core/services/workflows/events/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,32 @@
return multiErr
}

func EmitWorkflowActivationAbandonedV2(
ctx context.Context,
labels map[string]string,
binaryURL string,
configURL string,
reason eventsv2.ActivationAbandonReason,
activationErr error,
retryCount int32,
) error {
var errorMessage string
if activationErr != nil {
errorMessage = activationErr.Error()
}

event := &eventsv2.WorkflowActivationAbandoned{
CreInfo: buildCREMetadataV2(labels),
Workflow: buildWorkflowV2(labels, binaryURL, configURL),
Timestamp: uint64(time.Now().Unix()), //nolint:gosec // G115: unix timestamp is non-negative
ErrorMessage: errorMessage,
Reason: reason,
RetryCount: retryCount,
}

return emitProtoMessage(ctx, event)
}

func EmitExecutionStartedEvent(
ctx context.Context,
labels map[string]string,
Expand Down Expand Up @@ -482,7 +508,7 @@
// V2 event types
case *eventsv2.WorkflowExecutionStarted:
schema = SchemaWorkflowStartedV2
entity = "workflows.v2." + WorkflowExecutionStarted

Check warning on line 511 in core/services/workflows/events/emit.go

View check run for this annotation

CL-sonarqube-production / SonarQube Code Analysis

Define a constant instead of duplicating this literal "workflows.v2." 12 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.main.prod.cldev.sh/project/issues?id=smartcontractkit_chainlink&pullRequest=22853&issues=068f2908-0ce6-49e5-88ca-62eee0523509&open=068f2908-0ce6-49e5-88ca-62eee0523509
case *eventsv2.WorkflowExecutionFinished:
schema = SchemaWorkflowFinishedV2
entity = "workflows.v2." + WorkflowExecutionFinished
Expand All @@ -504,6 +530,9 @@
case *eventsv2.WorkflowActivated:
schema = SchemaWorkflowActivatedV2
entity = "workflows.v2." + WorkflowActivated
case *eventsv2.WorkflowActivationAbandoned:
schema = SchemaWorkflowActivationAbandonedV2
entity = "workflows.v2." + WorkflowActivationAbandoned
case *eventsv2.WorkflowPaused:
schema = SchemaWorkflowPausedV2
entity = "workflows.v2." + WorkflowPaused
Expand Down
25 changes: 14 additions & 11 deletions core/services/workflows/events/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const (
WorkflowStatusChanged string = "WorkflowStatusChanged"
// WorkflowActivated represents a workflow activated event
WorkflowActivated string = "WorkflowActivated"
// WorkflowActivationAbandoned represents a terminal activation failure (no further retries)
WorkflowActivationAbandoned string = "WorkflowActivationAbandoned"
// WorkflowPaused represents a workflow paused event
WorkflowPaused string = "WorkflowPaused"
// WorkflowDeleted represents a workflow deleted event
Expand Down Expand Up @@ -43,17 +45,18 @@ const (
SchemaUserLogs string = "/cre-events-user-logs/v1"

// V2 schema constants
SchemaWorkflowStartedV2 string = "/cre-events-workflow-started/v2"
SchemaWorkflowFinishedV2 string = "/cre-events-workflow-finished/v2"
SchemaWorkflowExecutionProfileV2 string = "/cre-events-workflow-execution-profile/v2"
SchemaCapabilityStartedV2 string = "/cre-events-capability-started/v2"
SchemaCapabilityFinishedV2 string = "/cre-events-capability-finished/v2"
SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2"
SchemaUserLogsV2 string = "/cre-events-user-logs/v2"
SchemaUserMetricV2 string = "/cre-events-user-metric/v2"
SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2"
SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2"
SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2"
SchemaWorkflowStartedV2 string = "/cre-events-workflow-started/v2"
SchemaWorkflowFinishedV2 string = "/cre-events-workflow-finished/v2"
SchemaWorkflowExecutionProfileV2 string = "/cre-events-workflow-execution-profile/v2"
SchemaCapabilityStartedV2 string = "/cre-events-capability-started/v2"
SchemaCapabilityFinishedV2 string = "/cre-events-capability-finished/v2"
SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2"
SchemaUserLogsV2 string = "/cre-events-user-logs/v2"
SchemaUserMetricV2 string = "/cre-events-user-metric/v2"
SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2"
SchemaWorkflowActivationAbandonedV2 string = "/cre-events-workflow-activation-abandoned/v2"
SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2"
SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2"

MeteringReportSchema string = "/workflows/v1/metering.proto"
MeteringReportDomain string = "platform"
Expand Down
106 changes: 106 additions & 0 deletions core/services/workflows/syncer/v2/activation_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package v2

import (
"errors"
"math"
"strings"

eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
)

// ActivationRetryPolicy controls how failed workflow activations are retried.
type ActivationRetryPolicy int

const (
// ActivationRetryable retries with exponential backoff up to MaxActivationRetries
// (~10h at default 100). Applies to all errors except permanent user/config failures.
ActivationRetryable ActivationRetryPolicy = iota
// ActivationNonRetryable drops immediately without further activation retries
// (permanent user/config errors).
ActivationNonRetryable
)

func activationRetryCountAsInt32(retryCount int) int32 {
if retryCount < 0 {
return 0
}
if retryCount > math.MaxInt32 {
return math.MaxInt32
}
return int32(retryCount)
}

func activationAbandonReasonMetricLabel(reason eventsv2.ActivationAbandonReason) string {
switch reason {
case eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_NON_RETRYABLE:
return "non_retryable"
case eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_RETRY_LIMIT_EXCEEDED:
return "retry_limit_exceeded"
default:
return "unknown"
}
}

type activationPolicyError struct {
err error
policy ActivationRetryPolicy
}

func (e *activationPolicyError) Error() string { return e.err.Error() }

func (e *activationPolicyError) Unwrap() error { return e.err }

func nonRetryable(err error) error {
if err == nil {
return nil
}
return &activationPolicyError{err: err, policy: ActivationNonRetryable}
}

func classifyActivationError(err error) ActivationRetryPolicy {
var policyErr *activationPolicyError
if errors.As(err, &policyErr) {
return policyErr.policy
}

if errors.Is(err, types.ErrGlobalWorkflowCountLimitReached) ||
errors.Is(err, types.ErrPerOwnerWorkflowCountLimitReached) {
return ActivationNonRetryable
}

if isPermanentEngineInitError(err) {
return ActivationNonRetryable
}

return ActivationRetryable
}

func activationRetryPolicyForEvent(eventName WorkflowRegistryEventName, handleErr error) ActivationRetryPolicy {
if eventName != WorkflowActivated {
return ActivationRetryable
}
return classifyActivationError(handleErr)
}

// isPermanentEngineInitError identifies engine init failures caused by user/config
// mistakes. Prefer sentinel errors at the source; extend this set as errors are catalogued.
func isPermanentEngineInitError(err error) bool {
for err != nil {
msg := strings.ToLower(err.Error())
switch {
case strings.Contains(msg, "workflowid mismatch"),
strings.Contains(msg, "invalid workflow id"),
strings.Contains(msg, "invalid workflow name"),
strings.Contains(msg, "failed to decode workflow spec binary"),
strings.Contains(msg, "failed to decode owner"),
strings.Contains(msg, "invalid cron schedule"),
strings.Contains(msg, "cron schedule must specify"),
strings.Contains(msg, "interval exceeded"):
return true
}
err = errors.Unwrap(err)
}
return false
}
90 changes: 90 additions & 0 deletions core/services/workflows/syncer/v2/activation_errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package v2

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
)

func Test_classifyActivationError(t *testing.T) {
t.Parallel()

tests := []struct {
name string
err error
want ActivationRetryPolicy
}{
{
name: "artifact fetch is retryable",
err: &types.ArtifactFetchError{ArtifactType: "binary", URL: "http://example", Err: errors.New("503")},
want: ActivationRetryable,
},
{
name: "wrapped artifact fetch is retryable",
err: fmt.Errorf("create spec: %w", &types.ArtifactFetchError{ArtifactType: "config", URL: "http://example", Err: errors.New("timeout")}),
want: ActivationRetryable,
},
{
name: "global limit is non-retryable",
err: types.ErrGlobalWorkflowCountLimitReached,
want: ActivationNonRetryable,
},
{
name: "per owner limit is non-retryable",
err: types.ErrPerOwnerWorkflowCountLimitReached,
want: ActivationNonRetryable,
},
{
name: "explicit non-retryable wrapper",
err: nonRetryable(errors.New("workflowID mismatch")),
want: ActivationNonRetryable,
},
{
name: "workflow id mismatch message is non-retryable",
err: fmt.Errorf("engine initialization failed: %w", errors.New("workflowID mismatch: abc != def")),
want: ActivationNonRetryable,
},
{
name: "invalid cron schedule is non-retryable",
err: fmt.Errorf("engine initialization failed: %w", errors.New("invalid cron schedule 'bad'")),
want: ActivationNonRetryable,
},
{
name: "interval exceeded is non-retryable",
err: fmt.Errorf("engine initialization failed: %w", errors.New("cron trigger interval exceeded")),
want: ActivationNonRetryable,
},
{
name: "unknown error is retryable",
err: errors.New("unexpected engine failure"),
want: ActivationRetryable,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, tt.want, classifyActivationError(tt.err))
})
}
}

func Test_activationRetryPolicyForEvent(t *testing.T) {
t.Parallel()

t.Run("non-activation events are retryable", func(t *testing.T) {
t.Parallel()
policy := activationRetryPolicyForEvent(WorkflowDeleted, &types.ArtifactFetchError{})
require.Equal(t, ActivationRetryable, policy)
})

t.Run("activation artifact fetch is retryable", func(t *testing.T) {
t.Parallel()
policy := activationRetryPolicyForEvent(WorkflowActivated, &types.ArtifactFetchError{})
require.Equal(t, ActivationRetryable, policy)
})
}
Loading
Loading