From 4be79f6ca80d88d5b6f67294888c74468a21408b Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 16 Jun 2026 13:59:04 +0200 Subject: [PATCH 1/8] cre-4929: retry limit and auto drop of failing workflows --- core/config/capabilities_config.go | 1 + core/config/docs/core.toml | 2 + core/config/toml/types.go | 5 ++ .../services/chainlink/config_capabilities.go | 4 ++ core/services/chainlink/config_test.go | 1 + .../testdata/config-empty-effective.toml | 1 + .../chainlink/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + core/services/cre/cre.go | 1 + core/services/cre/cre_test.go | 1 + core/services/workflows/syncer/v2/handler.go | 4 -- core/services/workflows/syncer/v2/metrics.go | 13 +++++ core/services/workflows/syncer/v2/retries.go | 40 +++++++++++++ .../workflows/syncer/v2/retries_test.go | 47 +++++++++++++++ .../workflows/syncer/v2/workflow_registry.go | 57 +++++++++++++++++-- .../syncer/v2/workflow_registry_test.go | 43 ++++++++++++++ .../testdata/config-empty-effective.toml | 1 + core/web/resolver/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + docs/CONFIG.md | 2 + .../scripts/config/merge_raw_configs.txtar | 1 + testdata/scripts/node/validate/default.txtar | 1 + .../node/validate/defaults-override.txtar | 1 + .../disk-based-logging-disabled.txtar | 1 + .../validate/disk-based-logging-no-dir.txtar | 1 + .../node/validate/disk-based-logging.txtar | 1 + .../node/validate/fallback-override.txtar | 1 + .../node/validate/invalid-ocr-p2p.txtar | 1 + testdata/scripts/node/validate/invalid.txtar | 1 + testdata/scripts/node/validate/valid.txtar | 1 + testdata/scripts/node/validate/warnings.txtar | 1 + 31 files changed, 229 insertions(+), 9 deletions(-) create mode 100644 core/services/workflows/syncer/v2/retries_test.go diff --git a/core/config/capabilities_config.go b/core/config/capabilities_config.go index 0e1b96f5d87..28484207079 100644 --- a/core/config/capabilities_config.go +++ b/core/config/capabilities_config.go @@ -35,6 +35,7 @@ type CapabilitiesWorkflowRegistry interface { RelayID() types.RelayID SyncStrategy() string MaxConcurrency() int + MaxActivationRetries() int WorkflowStorage() WorkflowStorage ModuleCache() ModuleCache AdditionalSources() []AdditionalWorkflowSource diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index f37e63fd41a..3b8bbb472cb 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -523,6 +523,8 @@ MaxConfigSize = '50.00kb' # Default SyncStrategy = 'event' # Default # MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer. MaxConcurrency = 12 # Default +# MaxActivationRetries is the number of failed load/initialization attempts before the syncer stops retrying an active workflow. 0 disables the limit. +MaxActivationRetries = 10 # Default [Capabilities.WorkflowRegistry.WorkflowStorage] # URL is the location for the workflow storage service to be communicated with. diff --git a/core/config/toml/types.go b/core/config/toml/types.go index adc5c681064..d6dee270e04 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -2390,6 +2390,7 @@ type WorkflowRegistry struct { MaxConfigSize *utils.FileSize SyncStrategy *string MaxConcurrency *int + MaxActivationRetries *int WorkflowStorage WorkflowStorage ModuleCache ModuleCache AdditionalSourcesConfig []AdditionalWorkflowSource `toml:"AdditionalSources"` @@ -2432,6 +2433,10 @@ func (r *WorkflowRegistry) setFrom(f *WorkflowRegistry) { r.MaxConcurrency = f.MaxConcurrency } + if f.MaxActivationRetries != nil { + r.MaxActivationRetries = f.MaxActivationRetries + } + r.WorkflowStorage.setFrom(&f.WorkflowStorage) r.ModuleCache.setFrom(&f.ModuleCache) diff --git a/core/services/chainlink/config_capabilities.go b/core/services/chainlink/config_capabilities.go index 1be43a4a31a..fc97681998d 100644 --- a/core/services/chainlink/config_capabilities.go +++ b/core/services/chainlink/config_capabilities.go @@ -234,6 +234,10 @@ func (c *capabilitiesWorkflowRegistry) MaxConcurrency() int { return *c.c.MaxConcurrency } +func (c *capabilitiesWorkflowRegistry) MaxActivationRetries() int { + return *c.c.MaxActivationRetries +} + func (c *capabilitiesWorkflowRegistry) WorkflowStorage() config.WorkflowStorage { return &workflowStorage{ c: c.c.WorkflowStorage, diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 127261bc388..5634eee05de 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -461,6 +461,7 @@ func TestConfig_Marshal(t *testing.T) { MaxConfigSize: ptr(utils.FileSize(50 * utils.KB)), SyncStrategy: ptr("event"), MaxConcurrency: ptr(12), + MaxActivationRetries: ptr(10), WorkflowStorage: toml.WorkflowStorage{ ArtifactStorageHost: ptr(""), URL: ptr(""), diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index f6eb5121505..c5e221a63d6 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 782db0949d7..9425ee03b65 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -323,6 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default [Capabilities.WorkflowRegistry.WorkflowStorage] ArtifactStorageHost = '' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 86121aaa1b8..6f64485a72f 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index bf3d74cc9cb..41e9d928d97 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -1099,6 +1099,7 @@ func newWorkflowRegistrySyncerV2( syncerV2.WithAdditionalSources(addSourceConfigs), syncerV2.WithShardOrchestratorClient(shardOrchestratorClient), syncerV2.WithMaxConcurrency(wfReg.MaxConcurrency()), + syncerV2.WithMaxActivationRetries(wfReg.MaxActivationRetries()), } if cfg.Sharding().ShardingEnabled() { registryOpts = append(registryOpts, diff --git a/core/services/cre/cre_test.go b/core/services/cre/cre_test.go index 6bb54396896..3e4bdd3a182 100644 --- a/core/services/cre/cre_test.go +++ b/core/services/cre/cre_test.go @@ -50,6 +50,7 @@ func (w wfRegTestStub) MaxConfigSize() utils.FileSize { return 0 } func (w wfRegTestStub) RelayID() commontypes.RelayID { return commontypes.RelayID{} } func (w wfRegTestStub) SyncStrategy() string { return "" } func (w wfRegTestStub) MaxConcurrency() int { return 0 } +func (w wfRegTestStub) MaxActivationRetries() int { return 0 } func (w wfRegTestStub) WorkflowStorage() config.WorkflowStorage { return wfRegStorageStub{} } func (w wfRegTestStub) ModuleCache() config.ModuleCache { return wfRegModuleCacheStub{} } func (w wfRegTestStub) AdditionalSources() []config.AdditionalWorkflowSource { diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 8d38502915b..c073e60b811 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -990,10 +990,6 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp case initErr := <-initDone: if initErr != nil { // Engine initialization failed (e.g., trigger subscription failed) - // TODO (cre-1482) add logic to mark a deployment as failed to avoid churn. - // Currently, failed deployments will be retried on each poll cycle (with exponential backoff). - // If the failure is due to user error (e.g., invalid trigger config), this causes unnecessary retries. - // Consider marking the workflow spec as "failed" in the database and requiring workflow redeployment. if closeErr := engine.Close(); closeErr != nil { h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID) } diff --git a/core/services/workflows/syncer/v2/metrics.go b/core/services/workflows/syncer/v2/metrics.go index 441164dff27..988e8bf0cdd 100644 --- a/core/services/workflows/syncer/v2/metrics.go +++ b/core/services/workflows/syncer/v2/metrics.go @@ -32,6 +32,7 @@ type metrics struct { reconcileEventsDispatched metric.Int64Histogram // events dispatched per source per tick reconcileDuration metric.Int64Histogram // wall-clock ms for parallel event processing reconcileEventsBackoff metric.Int64Counter // events skipped due to backoff + activationDropped metric.Int64Counter // activations dropped after max load/init retries // On-disk WASM cache write (sync); duration tails indicate IO contention vs typical skew. moduleStoreDuration metric.Int64Histogram @@ -106,6 +107,12 @@ func (m *metrics) recordReconcileBackoff(ctx context.Context, source string, cou )) } +func (m *metrics) incrementActivationDropped(ctx context.Context, source string) { + m.activationDropped.Add(ctx, 1, metric.WithAttributes( + attribute.String("source", source), + )) +} + func (m *metrics) recordModuleStore(ctx context.Context, d time.Duration, success bool) { if m == nil { return @@ -313,6 +320,11 @@ func newMetrics() (*metrics, error) { return nil, err } + activationDropped, err := beholder.GetMeter().Int64Counter("platform_workflow_registry_syncer_activation_dropped_total") + if err != nil { + return nil, err + } + moduleStoreDuration, err := beholder.GetMeter().Int64Histogram("platform_workflow_registry_syncer_module_store_duration_ms") if err != nil { return nil, err @@ -335,6 +347,7 @@ func newMetrics() (*metrics, error) { reconcileEventsDispatched: reconcileEventsDispatched, reconcileDuration: reconcileDuration, reconcileEventsBackoff: reconcileEventsBackoff, + activationDropped: activationDropped, moduleStoreDuration: moduleStoreDuration, }, nil } diff --git a/core/services/workflows/syncer/v2/retries.go b/core/services/workflows/syncer/v2/retries.go index 8c91144afe4..05b21fafdad 100644 --- a/core/services/workflows/syncer/v2/retries.go +++ b/core/services/workflows/syncer/v2/retries.go @@ -2,6 +2,7 @@ package v2 import ( "math" + "sync" "time" "github.com/jonboulle/clockwork" @@ -22,6 +23,45 @@ func (r *reconciliationEvent) updateNextRetryFor(clock clockwork.Clock, retryInt r.nextRetryAt = clock.Now().Add(time.Duration(nextRetry)) } +func activationRetriesExhausted(retryCount, maxActivationRetries int) bool { + return maxActivationRetries > 0 && retryCount >= maxActivationRetries +} + +type droppedActivations struct { + mu sync.RWMutex + bySource map[string]map[string]string // source -> workflowID -> signature +} + +func newDroppedActivations() *droppedActivations { + return &droppedActivations{ + bySource: make(map[string]map[string]string), + } +} + +func (d *droppedActivations) isDropped(source, workflowID, signature string) bool { + d.mu.RLock() + defer d.mu.RUnlock() + sig, ok := d.bySource[source][workflowID] + return ok && sig == signature +} + +func (d *droppedActivations) drop(source, workflowID, signature string) { + d.mu.Lock() + defer d.mu.Unlock() + if d.bySource[source] == nil { + d.bySource[source] = make(map[string]string) + } + d.bySource[source][workflowID] = signature +} + +func (d *droppedActivations) clear(source, workflowID string) { + d.mu.Lock() + defer d.mu.Unlock() + if m := d.bySource[source]; m != nil { + delete(m, workflowID) + } +} + type reconcileReport struct { // events is a map of event type to the number of events of that type NumEventsByType map[string]int diff --git a/core/services/workflows/syncer/v2/retries_test.go b/core/services/workflows/syncer/v2/retries_test.go new file mode 100644 index 00000000000..645a9cf1f3f --- /dev/null +++ b/core/services/workflows/syncer/v2/retries_test.go @@ -0,0 +1,47 @@ +package v2 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_activationRetriesExhausted(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + retryCount int + maxActivationRetries int + want bool + }{ + {name: "disabled", retryCount: 100, maxActivationRetries: 0, want: false}, + {name: "below limit", retryCount: 2, maxActivationRetries: 3, want: false}, + {name: "at limit", retryCount: 3, maxActivationRetries: 3, want: true}, + {name: "above limit", retryCount: 5, maxActivationRetries: 3, want: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, activationRetriesExhausted(tt.retryCount, tt.maxActivationRetries)) + }) + } +} + +func Test_droppedActivations(t *testing.T) { + t.Parallel() + + dropped := newDroppedActivations() + const source = "ContractWorkflowSource" + const workflowID = "abc" + const signature = "WorkflowActivated-abc-1" + + require.False(t, dropped.isDropped(source, workflowID, signature)) + dropped.drop(source, workflowID, signature) + require.True(t, dropped.isDropped(source, workflowID, signature)) + require.False(t, dropped.isDropped(source, workflowID, "other-signature")) + + dropped.clear(source, workflowID) + require.False(t, dropped.isDropped(source, workflowID, signature)) +} diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index 3e89653a350..94d07b42d70 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -36,6 +36,7 @@ var ( defaultRetryInterval = 12 * time.Second defaultMaxRetryInterval = 5 * time.Minute defaultMaxConcurrency = 12 + defaultMaxActivationRetries = 10 WorkflowRegistryContractName = "WorkflowRegistry" GetWorkflowsByDONMethodName = "getWorkflowListByDON" @@ -102,11 +103,13 @@ type workflowRegistry struct { engineRegistry *EngineRegistry - retryInterval time.Duration - maxRetryInterval time.Duration - maxConcurrency int - clock clockwork.Clock - syncTickInterval time.Duration + retryInterval time.Duration + maxRetryInterval time.Duration + maxActivationRetries int + maxConcurrency int + clock clockwork.Clock + droppedActivations *droppedActivations + syncTickInterval time.Duration hooks Hooks @@ -170,6 +173,14 @@ func WithMaxConcurrency(maxConcurrency int) func(*workflowRegistry) { } } +func WithMaxActivationRetries(maxActivationRetries int) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + if maxActivationRetries >= 0 { + wr.maxActivationRetries = maxActivationRetries + } + } +} + // AdditionalSourceConfig holds configuration for an additional workflow source. type AdditionalSourceConfig struct { URL string @@ -322,8 +333,10 @@ func NewWorkflowRegistry( engineRegistry: engineRegistry, retryInterval: defaultRetryInterval, maxRetryInterval: defaultMaxRetryInterval, + maxActivationRetries: defaultMaxActivationRetries, maxConcurrency: defaultMaxConcurrency, clock: clockwork.NewRealClock(), + droppedActivations: newDroppedActivations(), syncTickInterval: defaultTickInterval, hooks: Hooks{ OnStartFailure: func(_ error) {}, @@ -484,6 +497,12 @@ func (w *workflowRegistry) generateReconciliationEvents( case false: signature := fmt.Sprintf("%s-%s-%s", WorkflowActivated, id, toSpecStatus(wfMeta.Status)) + if w.droppedActivations.isDropped(sourceName, id, signature) { + workflowsSeen[id] = true + delete(pendingEvents, id) + continue + } + if _, ok := pendingEvents[id]; ok && pendingEvents[id].signature == signature { events = append(events, pendingEvents[id]) delete(pendingEvents, id) @@ -839,6 +858,19 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) reconcileReport.NumEventsByType[string(event.Name)]++ mu.Unlock() + if activationRetriesExhausted(event.retryCount, w.maxActivationRetries) { + w.droppedActivations.drop(sourceIdentifier, event.id, event.signature) + w.metrics.incrementActivationDropped(ctx, sourceName) + w.lggr.Errorw("dropping workflow activation after max retries", + "retryCount", event.retryCount, + "maxActivationRetries", w.maxActivationRetries, + "type", event.Name, + "id", event.id, + "workflowInfo", event.Info, + ) + continue + } + if event.retryCount > 0 && !w.clock.Now().After(event.nextRetryAt) { backoffCount++ mu.Lock() @@ -866,12 +898,27 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) handleErr := w.handleWithMetrics(ctx, evt.Event) if handleErr != nil { evt.updateNextRetryFor(w.clock, w.retryInterval, w.maxRetryInterval) + if activationRetriesExhausted(evt.retryCount, w.maxActivationRetries) { + w.droppedActivations.drop(sourceIdentifier, evt.id, evt.signature) + w.metrics.incrementActivationDropped(ctx, sourceName) + w.lggr.Errorw("dropping workflow activation after max retries", + "err", handleErr, + "retryCount", evt.retryCount, + "maxActivationRetries", w.maxActivationRetries, + "type", evt.Name, + "id", evt.id, + "workflowInfo", evt.Info, + ) + return + } mu.Lock() pendingEventsBySource[sourceIdentifier][evt.id] = evt reconcileReport.Backoffs[evt.id] = evt.nextRetryAt mu.Unlock() w.lggr.Errorw("failed to handle event, backing off...", "err", handleErr, "type", evt.Name, "nextRetryAt", evt.nextRetryAt, "retryCount", evt.retryCount, "workflowInfo", evt.Info) + return } + w.droppedActivations.clear(sourceIdentifier, evt.id) }(event) } wg.Wait() diff --git a/core/services/workflows/syncer/v2/workflow_registry_test.go b/core/services/workflows/syncer/v2/workflow_registry_test.go index ae3b7126fee..6c2b9e7b207 100644 --- a/core/services/workflows/syncer/v2/workflow_registry_test.go +++ b/core/services/workflows/syncer/v2/workflow_registry_test.go @@ -558,6 +558,49 @@ func Test_generateReconciliationEventsV2(t *testing.T) { require.Equal(t, nextRetryAt, events[0].nextRetryAt) }) + t.Run("dropped activation is not re-enqueued", func(t *testing.T) { + t.Parallel() + lggr := logger.TestLogger(t) + ctx := t.Context() + workflowDonNotifier := capabilities.NewDonNotifier() + er := NewEngineRegistry() + wr, err := NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (types.ContractReader, error) { + return nil, nil + }, + "", + "test-chain-selector", + Config{ + QueryCount: 20, + SyncStrategy: SyncStrategyReconciliation, + }, + &eventHandler{}, + workflowDonNotifier, + er, + ) + require.NoError(t, err) + + wfID := wfTypes.WorkflowID([32]byte{1}) + owner := []byte{} + wfName := "wf name 1" + metadata := []WorkflowMetadataView{ + { + WorkflowID: wfID, + Owner: owner, + Status: WorkflowStatusActive, + WorkflowName: wfName, + }, + } + source := "TestSource" + signature := fmt.Sprintf("%s-%s-%s", WorkflowActivated, wfID.Hex(), toSpecStatus(WorkflowStatusActive)) + wr.droppedActivations.drop(source, wfID.Hex(), signature) + + events, err := wr.generateReconciliationEvents(ctx, map[string]*reconciliationEvent{}, metadata, &types.Head{Height: "123"}, source) + require.NoError(t, err) + require.Empty(t, events) + }) + t.Run("a paused workflow clears a pending activated event", func(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index f6eb5121505..c5e221a63d6 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index dd7fb035052..d90717a4f98 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -323,6 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 68a0130ae84..d963f890d39 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -313,6 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 28a34a14dbe..0ea73dca8f6 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1408,6 +1408,7 @@ MaxEncryptedSecretsSize = '26.40kb' # Default MaxConfigSize = '50.00kb' # Default SyncStrategy = 'event' # Default MaxConcurrency = 12 # Default +MaxActivationRetries = 10 # Default ``` @@ -1463,6 +1464,7 @@ Options are: event which watches for contract events or reconciliation which dif ### MaxConcurrency ```toml MaxConcurrency = 12 # Default +MaxActivationRetries = 10 # Default ``` MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer. diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index 06ac519d21d..52cf62a0d86 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -460,6 +460,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 87476ac18f8..8c8e3f1ad4e 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -325,6 +325,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index a0bd0f22ca8..b2ffd574441 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -386,6 +386,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index b514320e81e..62408eb0dfd 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -369,6 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 09d630f0410..3a23a7ec44d 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -369,6 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index efa55d6c4a3..e0b5e7eb022 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -369,6 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar index 4f5406cf44d..c0fa4bea016 100644 --- a/testdata/scripts/node/validate/fallback-override.txtar +++ b/testdata/scripts/node/validate/fallback-override.txtar @@ -471,6 +471,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 63ac3270c61..c2700ea3dc5 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -354,6 +354,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 2ea583aa574..1c050e2ba98 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -365,6 +365,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index d8a68cdc8a4..cacdaf4310c 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -366,6 +366,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index cc7c437f38a..cad5d6db3a1 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -348,6 +348,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 +MaxActivationRetries = 10 # Default AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] From 6acf762e69126906a0c62af2ef387c37c72f4dfa Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 16 Jun 2026 14:14:00 +0200 Subject: [PATCH 2/8] cre-4929: minor improvement --- core/services/chainlink/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 5634eee05de..a274e72cf4a 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -461,7 +461,7 @@ func TestConfig_Marshal(t *testing.T) { MaxConfigSize: ptr(utils.FileSize(50 * utils.KB)), SyncStrategy: ptr("event"), MaxConcurrency: ptr(12), - MaxActivationRetries: ptr(10), + MaxActivationRetries: new(10), WorkflowStorage: toml.WorkflowStorage{ ArtifactStorageHost: ptr(""), URL: ptr(""), From cd1b520883a5f08d73ed32323ce4b9df0f3cee58 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 16 Jun 2026 16:14:34 +0200 Subject: [PATCH 3/8] cre-4929: regen txtar --- .../chainlink/testdata/config-empty-effective.toml | 2 +- core/services/chainlink/testdata/config-full.toml | 2 +- .../chainlink/testdata/config-multi-chain-effective.toml | 2 +- core/web/resolver/testdata/config-empty-effective.toml | 2 +- core/web/resolver/testdata/config-full.toml | 2 +- .../resolver/testdata/config-multi-chain-effective.toml | 2 +- docs/CONFIG.md | 7 ++++++- testdata/scripts/config/merge_raw_configs.txtar | 2 +- testdata/scripts/node/validate/default.txtar | 2 +- testdata/scripts/node/validate/defaults-override.txtar | 2 +- .../node/validate/disk-based-logging-disabled.txtar | 2 +- .../scripts/node/validate/disk-based-logging-no-dir.txtar | 2 +- testdata/scripts/node/validate/disk-based-logging.txtar | 2 +- testdata/scripts/node/validate/fallback-override.txtar | 2 +- testdata/scripts/node/validate/invalid-ocr-p2p.txtar | 2 +- testdata/scripts/node/validate/invalid.txtar | 2 +- testdata/scripts/node/validate/valid.txtar | 2 +- testdata/scripts/node/validate/warnings.txtar | 2 +- 18 files changed, 23 insertions(+), 18 deletions(-) diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index c5e221a63d6..b5bdb7e37ae 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 9425ee03b65..284328904e5 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -323,7 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 [Capabilities.WorkflowRegistry.WorkflowStorage] ArtifactStorageHost = '' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 6f64485a72f..319dc0802bf 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index c5e221a63d6..b5bdb7e37ae 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index d90717a4f98..e80094b2d8e 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -323,7 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index d963f890d39..f69e29c9f55 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 0ea73dca8f6..5dc3bdd3cd9 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1464,10 +1464,15 @@ Options are: event which watches for contract events or reconciliation which dif ### MaxConcurrency ```toml MaxConcurrency = 12 # Default -MaxActivationRetries = 10 # Default ``` MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer. +### MaxActivationRetries +```toml +MaxActivationRetries = 10 # Default +``` +MaxActivationRetries is the number of failed load/initialization attempts before the syncer stops retrying an active workflow. 0 disables the limit. + ## Capabilities.WorkflowRegistry.WorkflowStorage ```toml [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index 52cf62a0d86..3812a81ebd4 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -460,7 +460,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 8c8e3f1ad4e..07a3adda69a 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -325,7 +325,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index b2ffd574441..085dd7893b1 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -386,7 +386,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 62408eb0dfd..6ecc75aafa6 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 3a23a7ec44d..dd7db5e22a4 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index e0b5e7eb022..d5af737deb7 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar index c0fa4bea016..0bf283bced8 100644 --- a/testdata/scripts/node/validate/fallback-override.txtar +++ b/testdata/scripts/node/validate/fallback-override.txtar @@ -471,7 +471,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index c2700ea3dc5..9bccac2c942 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -354,7 +354,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 1c050e2ba98..6c7e67f9f0f 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -365,7 +365,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index cacdaf4310c..e1bc4af1b04 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -366,7 +366,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index cad5d6db3a1..5d28ab49868 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -348,7 +348,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 # Default +MaxActivationRetries = 10 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] From 8abe150640ab05c30a82f243070c8894b00cd0d4 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Fri, 19 Jun 2026 15:31:28 +0200 Subject: [PATCH 4/8] cre-4929: logic improvement --- core/config/docs/core.toml | 2 +- core/services/chainlink/config_test.go | 2 +- .../testdata/config-empty-effective.toml | 2 +- .../chainlink/testdata/config-full.toml | 2 +- .../config-multi-chain-effective.toml | 2 +- core/services/workflows/events/emit.go | 29 ++++++ core/services/workflows/events/types.go | 5 +- .../workflows/syncer/v2/activation_errors.go | 87 ++++++++++++++++++ .../syncer/v2/activation_errors_test.go | 90 +++++++++++++++++++ core/services/workflows/syncer/v2/handler.go | 57 ++++++++++-- core/services/workflows/syncer/v2/helpers.go | 4 + core/services/workflows/syncer/v2/metrics.go | 17 +++- core/services/workflows/syncer/v2/retries.go | 6 +- .../workflows/syncer/v2/retries_test.go | 17 ++++ .../workflows/syncer/v2/workflow_registry.go | 63 ++++++++----- .../testdata/config-empty-effective.toml | 2 +- core/web/resolver/testdata/config-full.toml | 2 +- .../config-multi-chain-effective.toml | 2 +- docs/CONFIG.md | 4 +- go.mod | 2 + go.sum | 2 - .../scripts/config/merge_raw_configs.txtar | 2 +- testdata/scripts/node/validate/default.txtar | 2 +- .../node/validate/defaults-override.txtar | 2 +- .../disk-based-logging-disabled.txtar | 2 +- .../validate/disk-based-logging-no-dir.txtar | 2 +- .../node/validate/disk-based-logging.txtar | 2 +- .../node/validate/fallback-override.txtar | 2 +- .../node/validate/invalid-ocr-p2p.txtar | 2 +- testdata/scripts/node/validate/invalid.txtar | 2 +- testdata/scripts/node/validate/valid.txtar | 2 +- testdata/scripts/node/validate/warnings.txtar | 2 +- 32 files changed, 365 insertions(+), 56 deletions(-) create mode 100644 core/services/workflows/syncer/v2/activation_errors.go create mode 100644 core/services/workflows/syncer/v2/activation_errors_test.go diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 3b8bbb472cb..815267539c9 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -524,7 +524,7 @@ SyncStrategy = 'event' # Default # MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer. MaxConcurrency = 12 # Default # MaxActivationRetries is the number of failed load/initialization attempts before the syncer stops retrying an active workflow. 0 disables the limit. -MaxActivationRetries = 10 # Default +MaxActivationRetries = 100 # Default [Capabilities.WorkflowRegistry.WorkflowStorage] # URL is the location for the workflow storage service to be communicated with. diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index a274e72cf4a..d41508b2f6b 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -461,7 +461,7 @@ func TestConfig_Marshal(t *testing.T) { MaxConfigSize: ptr(utils.FileSize(50 * utils.KB)), SyncStrategy: ptr("event"), MaxConcurrency: ptr(12), - MaxActivationRetries: new(10), + MaxActivationRetries: new(100), WorkflowStorage: toml.WorkflowStorage{ ArtifactStorageHost: ptr(""), URL: ptr(""), diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index b5bdb7e37ae..192c1cb1a68 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 284328904e5..40a18faa419 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -323,7 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 [Capabilities.WorkflowRegistry.WorkflowStorage] ArtifactStorageHost = '' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 319dc0802bf..9f9b895ed68 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/services/workflows/events/emit.go b/core/services/workflows/events/emit.go index 0c041d68a1f..18ef7b41205 100644 --- a/core/services/workflows/events/emit.go +++ b/core/services/workflows/events/emit.go @@ -101,6 +101,32 @@ func EmitWorkflowStatusChangedEventV2( return multiErr } +func EmitWorkflowActivationAbandonedV2( + ctx context.Context, + labels map[string]string, + binaryURL string, + configURL string, + reason string, + activationErr error, + retryCount int32, +) error { + var errorMessage string + if activationErr != nil { + errorMessage = activationErr.Error() + } + + event := &eventsv2.WorkflowActivationAbandoned{ + CreInfo: buildCREMetadataV2(labels), + Workflow: buildWorkflowV2(labels, binaryURL, configURL), + Timestamp: time.Now().Format(time.RFC3339), + ErrorMessage: errorMessage, + Reason: reason, + RetryCount: retryCount, + } + + return emitProtoMessage(ctx, event) +} + func EmitExecutionStartedEvent( ctx context.Context, labels map[string]string, @@ -504,6 +530,9 @@ func emitProtoMessage(ctx context.Context, msg proto.Message) error { case *eventsv2.WorkflowActivated: schema = SchemaWorkflowActivatedV2 entity = "workflows.v2." + WorkflowActivated + case *eventsv2.WorkflowActivationAbandoned: + schema = SchemaWorkflowActivationAbandonedV2 + entity = "workflows.v2." + WorkflowActivationAbandoned case *eventsv2.WorkflowPaused: schema = SchemaWorkflowPausedV2 entity = "workflows.v2." + WorkflowPaused diff --git a/core/services/workflows/events/types.go b/core/services/workflows/events/types.go index 11459cc24a5..7d966c12366 100644 --- a/core/services/workflows/events/types.go +++ b/core/services/workflows/events/types.go @@ -6,6 +6,8 @@ const ( WorkflowStatusChanged string = "WorkflowStatusChanged" // WorkflowActivated represents a workflow activated event WorkflowActivated string = "WorkflowActivated" + // WorkflowActivationAbandoned represents a terminal activation failure (no further retries) + WorkflowActivationAbandoned string = "WorkflowActivationAbandoned" // WorkflowPaused represents a workflow paused event WorkflowPaused string = "WorkflowPaused" // WorkflowDeleted represents a workflow deleted event @@ -51,7 +53,8 @@ const ( SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2" SchemaUserLogsV2 string = "/cre-events-user-logs/v2" SchemaUserMetricV2 string = "/cre-events-user-metric/v2" - SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2" + SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2" + SchemaWorkflowActivationAbandonedV2 string = "/cre-events-workflow-activation-abandoned/v2" SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2" SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2" diff --git a/core/services/workflows/syncer/v2/activation_errors.go b/core/services/workflows/syncer/v2/activation_errors.go new file mode 100644 index 00000000000..939d033b4fa --- /dev/null +++ b/core/services/workflows/syncer/v2/activation_errors.go @@ -0,0 +1,87 @@ +package v2 + +import ( + "errors" + "strings" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" +) + +// ActivationRetryPolicy controls how failed workflow activations are retried. +type ActivationRetryPolicy int + +const ( + // ActivationRetryable retries with exponential backoff up to MaxActivationRetries + // (~10h at default 100). Applies to all errors except permanent user/config failures. + ActivationRetryable ActivationRetryPolicy = iota + // ActivationNonRetryable drops immediately without further activation retries + // (permanent user/config errors). + ActivationNonRetryable +) + +const ( + activationAbandonReasonNonRetryable = "non_retryable" + activationAbandonReasonRetryLimitExceeded = "retry_limit_exceeded" +) + +type activationPolicyError struct { + err error + policy ActivationRetryPolicy +} + +func (e *activationPolicyError) Error() string { return e.err.Error() } + +func (e *activationPolicyError) Unwrap() error { return e.err } + +func nonRetryable(err error) error { + if err == nil { + return nil + } + return &activationPolicyError{err: err, policy: ActivationNonRetryable} +} + +func classifyActivationError(err error) ActivationRetryPolicy { + var policyErr *activationPolicyError + if errors.As(err, &policyErr) { + return policyErr.policy + } + + if errors.Is(err, types.ErrGlobalWorkflowCountLimitReached) || + errors.Is(err, types.ErrPerOwnerWorkflowCountLimitReached) { + return ActivationNonRetryable + } + + if isPermanentEngineInitError(err) { + return ActivationNonRetryable + } + + return ActivationRetryable +} + +func activationRetryPolicyForEvent(eventName WorkflowRegistryEventName, handleErr error) ActivationRetryPolicy { + if eventName != WorkflowActivated { + return ActivationRetryable + } + return classifyActivationError(handleErr) +} + +// isPermanentEngineInitError identifies engine init failures caused by user/config +// mistakes. Prefer sentinel errors at the source; extend this set as errors are catalogued. +func isPermanentEngineInitError(err error) bool { + for err != nil { + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "workflowid mismatch"), + strings.Contains(msg, "invalid workflow id"), + strings.Contains(msg, "invalid workflow name"), + strings.Contains(msg, "failed to decode workflow spec binary"), + strings.Contains(msg, "failed to decode owner"), + strings.Contains(msg, "invalid cron schedule"), + strings.Contains(msg, "cron schedule must specify"), + strings.Contains(msg, "interval exceeded"): + return true + } + err = errors.Unwrap(err) + } + return false +} diff --git a/core/services/workflows/syncer/v2/activation_errors_test.go b/core/services/workflows/syncer/v2/activation_errors_test.go new file mode 100644 index 00000000000..37b256d20a5 --- /dev/null +++ b/core/services/workflows/syncer/v2/activation_errors_test.go @@ -0,0 +1,90 @@ +package v2 + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" +) + +func Test_classifyActivationError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + want ActivationRetryPolicy + }{ + { + name: "artifact fetch is retryable", + err: &types.ArtifactFetchError{ArtifactType: "binary", URL: "http://example", Err: errors.New("503")}, + want: ActivationRetryable, + }, + { + name: "wrapped artifact fetch is retryable", + err: fmt.Errorf("create spec: %w", &types.ArtifactFetchError{ArtifactType: "config", URL: "http://example", Err: errors.New("timeout")}), + want: ActivationRetryable, + }, + { + name: "global limit is non-retryable", + err: types.ErrGlobalWorkflowCountLimitReached, + want: ActivationNonRetryable, + }, + { + name: "per owner limit is non-retryable", + err: types.ErrPerOwnerWorkflowCountLimitReached, + want: ActivationNonRetryable, + }, + { + name: "explicit non-retryable wrapper", + err: nonRetryable(errors.New("workflowID mismatch")), + want: ActivationNonRetryable, + }, + { + name: "workflow id mismatch message is non-retryable", + err: fmt.Errorf("engine initialization failed: %w", errors.New("workflowID mismatch: abc != def")), + want: ActivationNonRetryable, + }, + { + name: "invalid cron schedule is non-retryable", + err: fmt.Errorf("engine initialization failed: %w", errors.New("invalid cron schedule 'bad'")), + want: ActivationNonRetryable, + }, + { + name: "interval exceeded is non-retryable", + err: fmt.Errorf("engine initialization failed: %w", errors.New("cron trigger interval exceeded")), + want: ActivationNonRetryable, + }, + { + name: "unknown error is retryable", + err: errors.New("unexpected engine failure"), + want: ActivationRetryable, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, classifyActivationError(tt.err)) + }) + } +} + +func Test_activationRetryPolicyForEvent(t *testing.T) { + t.Parallel() + + t.Run("non-activation events are retryable", func(t *testing.T) { + t.Parallel() + policy := activationRetryPolicyForEvent(WorkflowDeleted, &types.ArtifactFetchError{}) + require.Equal(t, ActivationRetryable, policy) + }) + + t.Run("activation artifact fetch is retryable", func(t *testing.T) { + t.Parallel() + policy := activationRetryPolicyForEvent(WorkflowActivated, &types.ArtifactFetchError{}) + require.Equal(t, ActivationRetryable, policy) + }) +} diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index c073e60b811..93f6160e1ff 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -930,7 +930,7 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp decodedBinary, err := hex.DecodeString(spec.Workflow) if err != nil { - return fmt.Errorf("failed to decode workflow spec binary: %w", err) + return nonRetryable(fmt.Errorf("failed to decode workflow spec binary: %w", err)) } // Free the hex-encoded binary string as it is not needed beyond this decode spec.Workflow = "" @@ -942,7 +942,7 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp // Workflow ID should match what is generated from the stored artifacts ownerBytes, err := hex.DecodeString(spec.WorkflowOwner) if err != nil { - return fmt.Errorf("failed to decode owner: %w", err) + return nonRetryable(fmt.Errorf("failed to decode owner: %w", err)) } configBytes := []byte(spec.Config) hash, err := pkgworkflows.GenerateWorkflowID(ownerBytes, spec.WorkflowName, decodedBinary, configBytes, secretsURL) @@ -951,16 +951,16 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp } wid, err := types.WorkflowIDFromHex(spec.WorkflowID) if err != nil { - return fmt.Errorf("invalid workflow id: %w", err) + return nonRetryable(fmt.Errorf("invalid workflow id: %w", err)) } if !types.WorkflowID(hash).Equal(wid) { - return fmt.Errorf("workflowID mismatch: %x != %x", hash, wid) + return nonRetryable(fmt.Errorf("workflowID mismatch: %x != %x", hash, wid)) } // Start a new WorkflowEngine instance, and add it to local engine registry workflowName, err := types.NewWorkflowName(spec.WorkflowName) if err != nil { - return fmt.Errorf("invalid workflow name: %w", err) + return nonRetryable(fmt.Errorf("invalid workflow name: %w", err)) } // Create a channel to receive the initialization result. @@ -1114,6 +1114,53 @@ func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log } } +func (h *eventHandler) EmitActivationAbandoned( + ctx context.Context, + event Event, + reason string, + activationErr error, + retryCount int, +) error { + if event.Name != WorkflowActivated || h == nil || h.emitter == nil { + return nil + } + + payload, ok := event.Data.(WorkflowActivatedEvent) + if !ok { + return newHandlerTypeError(event.Data) + } + + wfID := payload.WorkflowID.Hex() + wfOwner := hex.EncodeToString(payload.WorkflowOwner) + orgID, ferr := h.fetchOrganizationID(ctx, wfOwner) + if ferr != nil { + h.lggr.Warnw("Failed to get organization from linking service", "workflowOwner", wfOwner, "error", ferr) + } + + h.emitterMu.RLock() + cma := h.emitter.With( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, wfOwner, + platform.KeyWorkflowTag, payload.WorkflowTag, + platform.KeyOrganizationID, orgID, + platform.WorkflowRegistryAddress, h.workflowRegistryAddress, + platform.WorkflowRegistryChainSelector, h.workflowRegistryChainSelector, + platform.KeyWorkflowSource, payload.Source, + ) + h.emitterMu.RUnlock() + + return events.EmitWorkflowActivationAbandonedV2( + ctx, + cma.Labels(), + payload.BinaryURL, + payload.ConfigURL, + reason, + customerFacingError(activationErr), + int32(retryCount), + ) +} + func (h *eventHandler) ensureCapRegistryReady(ctx context.Context) error { // Check every 500ms until the capabilities registry is ready. retryInterval := time.Millisecond * time.Duration(500) diff --git a/core/services/workflows/syncer/v2/helpers.go b/core/services/workflows/syncer/v2/helpers.go index eb1f8f29c44..e87fd02f5a3 100644 --- a/core/services/workflows/syncer/v2/helpers.go +++ b/core/services/workflows/syncer/v2/helpers.go @@ -73,6 +73,10 @@ func (m *testEvtHandler) Handle(ctx context.Context, event Event) error { return nil } +func (m *testEvtHandler) EmitActivationAbandoned(context.Context, Event, string, error, int) error { + return nil +} + func (m *testEvtHandler) ClearEvents() { m.mux.Lock() defer m.mux.Unlock() diff --git a/core/services/workflows/syncer/v2/metrics.go b/core/services/workflows/syncer/v2/metrics.go index 988e8bf0cdd..71479d73ffa 100644 --- a/core/services/workflows/syncer/v2/metrics.go +++ b/core/services/workflows/syncer/v2/metrics.go @@ -33,6 +33,7 @@ type metrics struct { reconcileDuration metric.Int64Histogram // wall-clock ms for parallel event processing reconcileEventsBackoff metric.Int64Counter // events skipped due to backoff activationDropped metric.Int64Counter // activations dropped after max load/init retries + activationAbandoned metric.Int64Counter // activations abandoned with reason label // On-disk WASM cache write (sync); duration tails indicate IO contention vs typical skew. moduleStoreDuration metric.Int64Histogram @@ -107,9 +108,17 @@ func (m *metrics) recordReconcileBackoff(ctx context.Context, source string, cou )) } -func (m *metrics) incrementActivationDropped(ctx context.Context, source string) { +func (m *metrics) incrementActivationDropped(ctx context.Context, source, reason string) { m.activationDropped.Add(ctx, 1, metric.WithAttributes( attribute.String("source", source), + attribute.String("reason", reason), + )) +} + +func (m *metrics) incrementActivationAbandoned(ctx context.Context, source, reason string) { + m.activationAbandoned.Add(ctx, 1, metric.WithAttributes( + attribute.String("source", source), + attribute.String("reason", reason), )) } @@ -325,6 +334,11 @@ func newMetrics() (*metrics, error) { return nil, err } + activationAbandoned, err := beholder.GetMeter().Int64Counter("platform_workflow_registry_syncer_activation_abandoned_total") + if err != nil { + return nil, err + } + moduleStoreDuration, err := beholder.GetMeter().Int64Histogram("platform_workflow_registry_syncer_module_store_duration_ms") if err != nil { return nil, err @@ -348,6 +362,7 @@ func newMetrics() (*metrics, error) { reconcileDuration: reconcileDuration, reconcileEventsBackoff: reconcileEventsBackoff, activationDropped: activationDropped, + activationAbandoned: activationAbandoned, moduleStoreDuration: moduleStoreDuration, }, nil } diff --git a/core/services/workflows/syncer/v2/retries.go b/core/services/workflows/syncer/v2/retries.go index 05b21fafdad..be7ae6ffabd 100644 --- a/core/services/workflows/syncer/v2/retries.go +++ b/core/services/workflows/syncer/v2/retries.go @@ -16,8 +16,10 @@ type reconciliationEvent struct { retryCount int } -func (r *reconciliationEvent) updateNextRetryFor(clock clockwork.Clock, retryInterval time.Duration, maxRetryInterval time.Duration) { - r.retryCount++ +func (r *reconciliationEvent) scheduleRetry(clock clockwork.Clock, retryInterval time.Duration, maxRetryInterval time.Duration, countTowardCap bool) { + if countTowardCap { + r.retryCount++ + } nextRetry := math.Pow(2, float64(r.retryCount)) * float64(retryInterval) nextRetry = math.Min(float64(maxRetryInterval), nextRetry) r.nextRetryAt = clock.Now().Add(time.Duration(nextRetry)) diff --git a/core/services/workflows/syncer/v2/retries_test.go b/core/services/workflows/syncer/v2/retries_test.go index 645a9cf1f3f..4b6dd098a5b 100644 --- a/core/services/workflows/syncer/v2/retries_test.go +++ b/core/services/workflows/syncer/v2/retries_test.go @@ -2,7 +2,9 @@ package v2 import ( "testing" + "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -29,6 +31,21 @@ func Test_activationRetriesExhausted(t *testing.T) { } } +func Test_scheduleRetry(t *testing.T) { + t.Parallel() + + clock := clockwork.NewFakeClockAt(time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)) + evt := &reconciliationEvent{retryCount: 0} + + evt.scheduleRetry(clock, 12*time.Second, 5*time.Minute, false) + require.Equal(t, 0, evt.retryCount) + require.Equal(t, clock.Now().Add(12*time.Second), evt.nextRetryAt) + + evt.scheduleRetry(clock, 12*time.Second, 5*time.Minute, true) + require.Equal(t, 1, evt.retryCount) + require.Equal(t, clock.Now().Add(24*time.Second), evt.nextRetryAt) +} + func Test_droppedActivations(t *testing.T) { t.Parallel() diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index 94d07b42d70..7ab0848d68d 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -36,7 +36,7 @@ var ( defaultRetryInterval = 12 * time.Second defaultMaxRetryInterval = 5 * time.Minute defaultMaxConcurrency = 12 - defaultMaxActivationRetries = 10 + defaultMaxActivationRetries = 100 WorkflowRegistryContractName = "WorkflowRegistry" GetWorkflowsByDONMethodName = "getWorkflowListByDON" @@ -135,6 +135,7 @@ type evtHandler interface { Start(context.Context) error Handle(ctx context.Context, event Event) error + EmitActivationAbandoned(ctx context.Context, event Event, reason string, activationErr error, retryCount int) error } type donNotifier interface { @@ -456,6 +457,32 @@ func (w *workflowRegistry) handleWithMetrics(ctx context.Context, event Event) e return err } +// abandonActivation records an in-memory drop for a failed activation. Drop state is cleared on +// node restart, giving the workflow a fresh retry budget. +func (w *workflowRegistry) abandonActivation( + ctx context.Context, + sourceIdentifier, sourceName string, + evt *reconciliationEvent, + reason string, + activationErr error, +) { + w.droppedActivations.drop(sourceIdentifier, evt.id, evt.signature) + w.metrics.incrementActivationDropped(ctx, sourceName, reason) + w.metrics.incrementActivationAbandoned(ctx, sourceName, reason) + w.lggr.Errorw("abandoning workflow activation", + "reason", reason, + "retryCount", evt.retryCount, + "maxActivationRetries", w.maxActivationRetries, + "type", evt.Name, + "id", evt.id, + "workflowInfo", evt.Info, + "err", activationErr, + ) + if err := w.handler.EmitActivationAbandoned(ctx, evt.Event, reason, activationErr, evt.retryCount); err != nil { + w.lggr.Errorw("failed to emit activation abandoned event", "err", err) + } +} + // toLocalHead converts a chainlink-common Head to our local Head struct func toLocalHead(head *types.Head) Head { return Head{ @@ -858,16 +885,8 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) reconcileReport.NumEventsByType[string(event.Name)]++ mu.Unlock() - if activationRetriesExhausted(event.retryCount, w.maxActivationRetries) { - w.droppedActivations.drop(sourceIdentifier, event.id, event.signature) - w.metrics.incrementActivationDropped(ctx, sourceName) - w.lggr.Errorw("dropping workflow activation after max retries", - "retryCount", event.retryCount, - "maxActivationRetries", w.maxActivationRetries, - "type", event.Name, - "id", event.id, - "workflowInfo", event.Info, - ) + if event.Name == WorkflowActivated && activationRetriesExhausted(event.retryCount, w.maxActivationRetries) { + w.abandonActivation(ctx, sourceIdentifier, sourceName, event, activationAbandonReasonRetryLimitExceeded, nil) continue } @@ -897,25 +916,21 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) }() handleErr := w.handleWithMetrics(ctx, evt.Event) if handleErr != nil { - evt.updateNextRetryFor(w.clock, w.retryInterval, w.maxRetryInterval) - if activationRetriesExhausted(evt.retryCount, w.maxActivationRetries) { - w.droppedActivations.drop(sourceIdentifier, evt.id, evt.signature) - w.metrics.incrementActivationDropped(ctx, sourceName) - w.lggr.Errorw("dropping workflow activation after max retries", - "err", handleErr, - "retryCount", evt.retryCount, - "maxActivationRetries", w.maxActivationRetries, - "type", evt.Name, - "id", evt.id, - "workflowInfo", evt.Info, - ) + policy := activationRetryPolicyForEvent(evt.Name, handleErr) + if policy == ActivationNonRetryable && evt.Name == WorkflowActivated { + w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, activationAbandonReasonNonRetryable, handleErr) + return + } + evt.scheduleRetry(w.clock, w.retryInterval, w.maxRetryInterval, true) + if evt.Name == WorkflowActivated && activationRetriesExhausted(evt.retryCount, w.maxActivationRetries) { + w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, activationAbandonReasonRetryLimitExceeded, handleErr) return } mu.Lock() pendingEventsBySource[sourceIdentifier][evt.id] = evt reconcileReport.Backoffs[evt.id] = evt.nextRetryAt mu.Unlock() - w.lggr.Errorw("failed to handle event, backing off...", "err", handleErr, "type", evt.Name, "nextRetryAt", evt.nextRetryAt, "retryCount", evt.retryCount, "workflowInfo", evt.Info) + w.lggr.Errorw("failed to handle event, backing off...", "err", handleErr, "type", evt.Name, "nextRetryAt", evt.nextRetryAt, "retryCount", evt.retryCount, "retryPolicy", policy, "workflowInfo", evt.Info) return } w.droppedActivations.clear(sourceIdentifier, evt.id) diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index b5bdb7e37ae..192c1cb1a68 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index e80094b2d8e..a2d790918a2 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -323,7 +323,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index f69e29c9f55..fc6c90f6d98 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -313,7 +313,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 5dc3bdd3cd9..b4be96962a8 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1408,7 +1408,7 @@ MaxEncryptedSecretsSize = '26.40kb' # Default MaxConfigSize = '50.00kb' # Default SyncStrategy = 'event' # Default MaxConcurrency = 12 # Default -MaxActivationRetries = 10 # Default +MaxActivationRetries = 100 # Default ``` @@ -1469,7 +1469,7 @@ MaxConcurrency controls the maximum number of concurrent event handlers in the w ### MaxActivationRetries ```toml -MaxActivationRetries = 10 # Default +MaxActivationRetries = 100 # Default ``` MaxActivationRetries is the number of failed load/initialization attempts before the syncer stops retrying an active workflow. 0 disables the limit. diff --git a/go.mod b/go.mod index e62d7f28a02..c2019e38fb8 100644 --- a/go.mod +++ b/go.mod @@ -428,6 +428,8 @@ require ( replace github.com/fbsobreira/gotron-sdk => github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.5-0.20260218133534-cbd44da2856b +replace github.com/smartcontractkit/chainlink-protos/workflows/go => ../chainlink-protos/workflows/go + tool github.com/smartcontractkit/chainlink-common/pkg/loop/cmd/loopinstall tool github.com/smartcontractkit/chainlink-common/script/cmd/dependabot diff --git a/go.sum b/go.sum index 490d39e54c4..19131858a6a 100644 --- a/go.sum +++ b/go.sum @@ -1219,8 +1219,6 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-sui v0.0.0-20260527160341-aa3adc0abf67 h1:NNvPOgvf5vbOYVLxLST+5E88iPOAnpmzZGPihEx8DFc= github.com/smartcontractkit/chainlink-sui v0.0.0-20260527160341-aa3adc0abf67/go.mod h1:k1HSbHyPaQWPOj6lXDIAe04EuwbC5ge1nK+cpG2E8hE= github.com/smartcontractkit/chainlink-ton v1.0.5-0.20260608211110-ed43ab034a6f h1:HXbJWGqJX14BmsVlSww6vcuOcIPbsP+vUe3dWtwA740= diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index 3812a81ebd4..79c34fe3af2 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -460,7 +460,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 07a3adda69a..c06a3dd36a1 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -325,7 +325,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index 085dd7893b1..1b0f97d9947 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -386,7 +386,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 6ecc75aafa6..16e068157fe 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index dd7db5e22a4..84984987c68 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index d5af737deb7..572cf7e2e05 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -369,7 +369,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar index 0bf283bced8..c071691ce00 100644 --- a/testdata/scripts/node/validate/fallback-override.txtar +++ b/testdata/scripts/node/validate/fallback-override.txtar @@ -471,7 +471,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 9bccac2c942..1732ca239ee 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -354,7 +354,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 6c7e67f9f0f..cb734c4911f 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -365,7 +365,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index e1bc4af1b04..f1f1aaf2575 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -366,7 +366,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index 5d28ab49868..9ad8fd6361c 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -348,7 +348,7 @@ MaxEncryptedSecretsSize = '26.40kb' MaxConfigSize = '50.00kb' SyncStrategy = 'event' MaxConcurrency = 12 -MaxActivationRetries = 10 +MaxActivationRetries = 100 AdditionalSources = [] [Capabilities.WorkflowRegistry.WorkflowStorage] From c98e43d81376ce41e291b49d7728afef4cb4ae48 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Mon, 29 Jun 2026 15:30:50 +0200 Subject: [PATCH 5/8] cre-4929: improved protos --- core/services/workflows/events/emit.go | 4 ++-- .../workflows/syncer/v2/activation_errors.go | 16 ++++++++++++---- core/services/workflows/syncer/v2/handler.go | 3 ++- core/services/workflows/syncer/v2/helpers.go | 3 ++- .../workflows/syncer/v2/workflow_registry.go | 18 ++++++++++-------- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/core/services/workflows/events/emit.go b/core/services/workflows/events/emit.go index 18ef7b41205..a391499faa1 100644 --- a/core/services/workflows/events/emit.go +++ b/core/services/workflows/events/emit.go @@ -106,7 +106,7 @@ func EmitWorkflowActivationAbandonedV2( labels map[string]string, binaryURL string, configURL string, - reason string, + reason eventsv2.ActivationAbandonReason, activationErr error, retryCount int32, ) error { @@ -118,7 +118,7 @@ func EmitWorkflowActivationAbandonedV2( event := &eventsv2.WorkflowActivationAbandoned{ CreInfo: buildCREMetadataV2(labels), Workflow: buildWorkflowV2(labels, binaryURL, configURL), - Timestamp: time.Now().Format(time.RFC3339), + Timestamp: uint64(time.Now().Unix()), ErrorMessage: errorMessage, Reason: reason, RetryCount: retryCount, diff --git a/core/services/workflows/syncer/v2/activation_errors.go b/core/services/workflows/syncer/v2/activation_errors.go index 939d033b4fa..dbb5f106872 100644 --- a/core/services/workflows/syncer/v2/activation_errors.go +++ b/core/services/workflows/syncer/v2/activation_errors.go @@ -4,6 +4,8 @@ import ( "errors" "strings" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" ) @@ -19,10 +21,16 @@ const ( ActivationNonRetryable ) -const ( - activationAbandonReasonNonRetryable = "non_retryable" - activationAbandonReasonRetryLimitExceeded = "retry_limit_exceeded" -) +func activationAbandonReasonMetricLabel(reason eventsv2.ActivationAbandonReason) string { + switch reason { + case eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_NON_RETRYABLE: + return "non_retryable" + case eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_RETRY_LIMIT_EXCEEDED: + return "retry_limit_exceeded" + default: + return "unknown" + } +} type activationPolicyError struct { err error diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 93f6160e1ff..1dd063de140 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -28,6 +28,7 @@ import ( generichost "github.com/smartcontractkit/chainlink-common/pkg/workflows/host" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink-common/keystore/corekeys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/platform" @@ -1117,7 +1118,7 @@ func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log func (h *eventHandler) EmitActivationAbandoned( ctx context.Context, event Event, - reason string, + reason eventsv2.ActivationAbandonReason, activationErr error, retryCount int, ) error { diff --git a/core/services/workflows/syncer/v2/helpers.go b/core/services/workflows/syncer/v2/helpers.go index e87fd02f5a3..6ad671e6ea6 100644 --- a/core/services/workflows/syncer/v2/helpers.go +++ b/core/services/workflows/syncer/v2/helpers.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" @@ -73,7 +74,7 @@ func (m *testEvtHandler) Handle(ctx context.Context, event Event) error { return nil } -func (m *testEvtHandler) EmitActivationAbandoned(context.Context, Event, string, error, int) error { +func (m *testEvtHandler) EmitActivationAbandoned(context.Context, Event, eventsv2.ActivationAbandonReason, error, int) error { return nil } diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index 7ab0848d68d..ad873c1397d 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -23,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2" "github.com/smartcontractkit/chainlink-evm/pkg/config" "github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator" @@ -135,7 +136,7 @@ type evtHandler interface { Start(context.Context) error Handle(ctx context.Context, event Event) error - EmitActivationAbandoned(ctx context.Context, event Event, reason string, activationErr error, retryCount int) error + EmitActivationAbandoned(ctx context.Context, event Event, reason eventsv2.ActivationAbandonReason, activationErr error, retryCount int) error } type donNotifier interface { @@ -463,14 +464,15 @@ func (w *workflowRegistry) abandonActivation( ctx context.Context, sourceIdentifier, sourceName string, evt *reconciliationEvent, - reason string, + reason eventsv2.ActivationAbandonReason, activationErr error, ) { w.droppedActivations.drop(sourceIdentifier, evt.id, evt.signature) - w.metrics.incrementActivationDropped(ctx, sourceName, reason) - w.metrics.incrementActivationAbandoned(ctx, sourceName, reason) + reasonLabel := activationAbandonReasonMetricLabel(reason) + w.metrics.incrementActivationDropped(ctx, sourceName, reasonLabel) + w.metrics.incrementActivationAbandoned(ctx, sourceName, reasonLabel) w.lggr.Errorw("abandoning workflow activation", - "reason", reason, + "reason", reasonLabel, "retryCount", evt.retryCount, "maxActivationRetries", w.maxActivationRetries, "type", evt.Name, @@ -886,7 +888,7 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) mu.Unlock() if event.Name == WorkflowActivated && activationRetriesExhausted(event.retryCount, w.maxActivationRetries) { - w.abandonActivation(ctx, sourceIdentifier, sourceName, event, activationAbandonReasonRetryLimitExceeded, nil) + w.abandonActivation(ctx, sourceIdentifier, sourceName, event, eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_RETRY_LIMIT_EXCEEDED, nil) continue } @@ -918,12 +920,12 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context) if handleErr != nil { policy := activationRetryPolicyForEvent(evt.Name, handleErr) if policy == ActivationNonRetryable && evt.Name == WorkflowActivated { - w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, activationAbandonReasonNonRetryable, handleErr) + w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_NON_RETRYABLE, handleErr) return } evt.scheduleRetry(w.clock, w.retryInterval, w.maxRetryInterval, true) if evt.Name == WorkflowActivated && activationRetriesExhausted(evt.retryCount, w.maxActivationRetries) { - w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, activationAbandonReasonRetryLimitExceeded, handleErr) + w.abandonActivation(ctx, sourceIdentifier, sourceName, evt, eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_RETRY_LIMIT_EXCEEDED, handleErr) return } mu.Lock() From 0e5aeae58fdd707fa106636e62f7464444ac273c Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 30 Jun 2026 10:42:53 +0200 Subject: [PATCH 6/8] cre-4929: protos bump --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 ++-- deployment/go.mod | 2 +- deployment/go.sum | 4 ++-- go.mod | 4 +--- go.sum | 4 ++-- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 ++-- system-tests/lib/go.mod | 2 +- system-tests/lib/go.sum | 4 ++-- system-tests/tests/go.mod | 2 +- system-tests/tests/go.sum | 4 ++-- 14 files changed, 21 insertions(+), 23 deletions(-) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 144980ef2e4..3a7f094f24c 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -513,7 +513,7 @@ require ( github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 // indirect github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 // indirect github.com/smartcontractkit/chainlink-sui v0.0.0 // indirect github.com/smartcontractkit/chainlink-testing-framework/framework/components/chiprouter v1.0.4 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 8850cd9dfb2..da85c393633 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1640,8 +1640,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 h1:/xvuNFI7DwOoTQnmAdYPDdY+sConn3RgZ2rMy/8AXlo= diff --git a/deployment/go.mod b/deployment/go.mod index 72edd881998..2b3b85ea497 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -446,7 +446,7 @@ require ( github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 // indirect github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2 // indirect github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.5 // indirect github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20260408092456-3c6369888d4a // indirect diff --git a/deployment/go.sum b/deployment/go.sum index 6f5c32648c7..83ea61aab9d 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1443,8 +1443,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 h1:/xvuNFI7DwOoTQnmAdYPDdY+sConn3RgZ2rMy/8AXlo= diff --git a/go.mod b/go.mod index 1034bd557a8..539525d8707 100644 --- a/go.mod +++ b/go.mod @@ -104,7 +104,7 @@ require ( github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.1-0.20260528221400-84746b70eeeb github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260331131315-f08a616d8dcd github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 github.com/smartcontractkit/chainlink-sui v0.0.0-20260624134342-6bfb9c92859d github.com/smartcontractkit/chainlink-ton v1.0.5-0.20260608211110-ed43ab034a6f github.com/smartcontractkit/cre-sdk-go v1.9.0-capdev.1.0.20260625132924-dcceeb57cf3c @@ -430,8 +430,6 @@ require ( replace github.com/fbsobreira/gotron-sdk => github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.5-0.20260218133534-cbd44da2856b -replace github.com/smartcontractkit/chainlink-protos/workflows/go => ../chainlink-protos/workflows/go - tool github.com/smartcontractkit/chainlink-common/pkg/loop/cmd/loopinstall tool github.com/smartcontractkit/chainlink-common/script/cmd/dependabot diff --git a/go.sum b/go.sum index 4958dbeba10..dd308078b65 100644 --- a/go.sum +++ b/go.sum @@ -1216,8 +1216,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-sui v0.0.0-20260624134342-6bfb9c92859d h1:dqRACdfg0eyzTkYpQEJzbnlgaBZLui11sVAPRZb0oqU= github.com/smartcontractkit/chainlink-sui v0.0.0-20260624134342-6bfb9c92859d/go.mod h1:oYGkFuyHUnK/ScIQHsfdQRrKTLrJPlOVUx6FYYcuUOw= github.com/smartcontractkit/chainlink-ton v1.0.5-0.20260608211110-ed43ab034a6f h1:HXbJWGqJX14BmsVlSww6vcuOcIPbsP+vUe3dWtwA740= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 15c11ccdd62..4d1fde0bac2 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -427,7 +427,7 @@ require ( github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 // indirect github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 // indirect github.com/smartcontractkit/chainlink-testing-framework/framework v0.16.5 // indirect github.com/smartcontractkit/chainlink-ton v1.0.5-0.20260608211110-ed43ab034a6f // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 7b05fbda303..8eb16adf56d 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1430,8 +1430,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-sui v0.0.0-20260624134342-6bfb9c92859d h1:dqRACdfg0eyzTkYpQEJzbnlgaBZLui11sVAPRZb0oqU= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 4aee7caa7cc..cec770b402b 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -508,7 +508,7 @@ require ( github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 // indirect github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 // indirect github.com/smartcontractkit/chainlink-sui v0.0.0 // indirect github.com/smartcontractkit/chainlink-sui/deployment v0.0.0-20260624134342-6bfb9c92859d // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 4b301f9d117..26c04f15b0a 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1692,8 +1692,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-sui v0.0.0-20260624134342-6bfb9c92859d h1:dqRACdfg0eyzTkYpQEJzbnlgaBZLui11sVAPRZb0oqU= diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index d4645e0afc7..b3c0d3ac1ed 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -45,7 +45,7 @@ require ( github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260623200841-e0322b819f62 github.com/smartcontractkit/chainlink-protos/job-distributor v0.19.0 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20260512230622-65f10f4cd305 - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 github.com/smartcontractkit/chainlink-testing-framework/framework v0.16.5 github.com/smartcontractkit/chainlink-testing-framework/framework/components/chiprouter v1.0.4 github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose v0.1.23 diff --git a/system-tests/lib/go.sum b/system-tests/lib/go.sum index 40734717b2e..7e0fca27150 100644 --- a/system-tests/lib/go.sum +++ b/system-tests/lib/go.sum @@ -1605,8 +1605,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 h1:/xvuNFI7DwOoTQnmAdYPDdY+sConn3RgZ2rMy/8AXlo= diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index 018e403a740..54d0050850f 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -69,7 +69,7 @@ require ( github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20260521215851-3fdbb363496f github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260623200841-e0322b819f62 github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260331131315-f08a616d8dcd - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 github.com/smartcontractkit/chainlink-testing-framework/framework v0.16.5 github.com/smartcontractkit/chainlink-testing-framework/framework/components/chiprouter v1.0.4 github.com/smartcontractkit/chainlink-testing-framework/framework/components/fake v0.15.0 diff --git a/system-tests/tests/go.sum b/system-tests/tests/go.sum index 5570c3d829a..807026f4a98 100644 --- a/system-tests/tests/go.sum +++ b/system-tests/tests/go.sum @@ -1619,8 +1619,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/svr v1.2.0 h1:7jjgqRgORQS/ikL3z0ZgJy95pzjhR9LuU1TVWg4BZ78= github.com/smartcontractkit/chainlink-protos/svr v1.2.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb h1:mlN8zK1UzDIBYtKSILQ4gci9MFwo42QFtGV1tWddMyk= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528221400-84746b70eeeb/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930 h1:x2nm4nDoC//WGQRPrInDmBH2/lTN1qAI/IGDQ3gAi7A= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260630073003-fb8da7229930/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1 h1:e4vdi3czYy+eK2j/eO5r3ceMxxkx4Qq5IsiAeSAQ9uc= github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260605202330-b5a89c32fdc1/go.mod h1:wi1QdXqhSJnADt9YRaRtEWomqknLcrdkTS0JotupuOQ= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 h1:/xvuNFI7DwOoTQnmAdYPDdY+sConn3RgZ2rMy/8AXlo= From ca1408b88f4a9eafe45c27fa943028ebf34f6c47 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 30 Jun 2026 10:54:35 +0200 Subject: [PATCH 7/8] cre-4929: minor improvement --- core/services/workflows/events/types.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/services/workflows/events/types.go b/core/services/workflows/events/types.go index 90b158b0c7b..bdb383a4191 100644 --- a/core/services/workflows/events/types.go +++ b/core/services/workflows/events/types.go @@ -53,8 +53,9 @@ const ( SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2" SchemaUserLogsV2 string = "/cre-events-user-logs/v2" SchemaUserMetricV2 string = "/cre-events-user-metric/v2" - SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2" - SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2" + SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2" + SchemaWorkflowActivationAbandonedV2 string = "/cre-events-workflow-activation-abandoned/v2" + SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2" SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2" MeteringReportSchema string = "/workflows/v1/metering.proto" From dfafc90e15106df54f58c41cba5a463197670950 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 30 Jun 2026 14:19:58 +0200 Subject: [PATCH 8/8] cre-4929: minor improvements --- core/services/workflows/events/emit.go | 2 +- core/services/workflows/events/types.go | 18 +++++++++--------- .../workflows/syncer/v2/activation_errors.go | 11 +++++++++++ core/services/workflows/syncer/v2/handler.go | 10 ++++------ core/services/workflows/syncer/v2/helpers.go | 4 ++-- .../workflows/syncer/v2/workflow_registry.go | 6 +++--- 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/core/services/workflows/events/emit.go b/core/services/workflows/events/emit.go index a391499faa1..1c57d67ae20 100644 --- a/core/services/workflows/events/emit.go +++ b/core/services/workflows/events/emit.go @@ -118,7 +118,7 @@ func EmitWorkflowActivationAbandonedV2( event := &eventsv2.WorkflowActivationAbandoned{ CreInfo: buildCREMetadataV2(labels), Workflow: buildWorkflowV2(labels, binaryURL, configURL), - Timestamp: uint64(time.Now().Unix()), + Timestamp: uint64(time.Now().Unix()), //nolint:gosec // G115: unix timestamp is non-negative ErrorMessage: errorMessage, Reason: reason, RetryCount: retryCount, diff --git a/core/services/workflows/events/types.go b/core/services/workflows/events/types.go index bdb383a4191..7082ac03c40 100644 --- a/core/services/workflows/events/types.go +++ b/core/services/workflows/events/types.go @@ -45,18 +45,18 @@ const ( SchemaUserLogs string = "/cre-events-user-logs/v1" // V2 schema constants - SchemaWorkflowStartedV2 string = "/cre-events-workflow-started/v2" - SchemaWorkflowFinishedV2 string = "/cre-events-workflow-finished/v2" - SchemaWorkflowExecutionProfileV2 string = "/cre-events-workflow-execution-profile/v2" - SchemaCapabilityStartedV2 string = "/cre-events-capability-started/v2" - SchemaCapabilityFinishedV2 string = "/cre-events-capability-finished/v2" - SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2" - SchemaUserLogsV2 string = "/cre-events-user-logs/v2" - SchemaUserMetricV2 string = "/cre-events-user-metric/v2" + SchemaWorkflowStartedV2 string = "/cre-events-workflow-started/v2" + SchemaWorkflowFinishedV2 string = "/cre-events-workflow-finished/v2" + SchemaWorkflowExecutionProfileV2 string = "/cre-events-workflow-execution-profile/v2" + SchemaCapabilityStartedV2 string = "/cre-events-capability-started/v2" + SchemaCapabilityFinishedV2 string = "/cre-events-capability-finished/v2" + SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2" + SchemaUserLogsV2 string = "/cre-events-user-logs/v2" + SchemaUserMetricV2 string = "/cre-events-user-metric/v2" SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2" SchemaWorkflowActivationAbandonedV2 string = "/cre-events-workflow-activation-abandoned/v2" SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2" - SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2" + SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2" MeteringReportSchema string = "/workflows/v1/metering.proto" MeteringReportDomain string = "platform" diff --git a/core/services/workflows/syncer/v2/activation_errors.go b/core/services/workflows/syncer/v2/activation_errors.go index dbb5f106872..daecb0244ea 100644 --- a/core/services/workflows/syncer/v2/activation_errors.go +++ b/core/services/workflows/syncer/v2/activation_errors.go @@ -2,6 +2,7 @@ package v2 import ( "errors" + "math" "strings" eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" @@ -21,6 +22,16 @@ const ( ActivationNonRetryable ) +func activationRetryCountAsInt32(retryCount int) int32 { + if retryCount < 0 { + return 0 + } + if retryCount > math.MaxInt32 { + return math.MaxInt32 + } + return int32(retryCount) +} + func activationAbandonReasonMetricLabel(reason eventsv2.ActivationAbandonReason) string { switch reason { case eventsv2.ActivationAbandonReason_ACTIVATION_ABANDON_REASON_NON_RETRYABLE: diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 852b279d0b0..6d662d6cdc3 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -15,8 +15,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/confidentialrelay" - + "github.com/smartcontractkit/chainlink-common/keystore/corekeys/workflowkey" "github.com/smartcontractkit/chainlink-common/pkg/contexts" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -29,10 +28,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime" generichost "github.com/smartcontractkit/chainlink-common/pkg/workflows/host" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" - eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" - "github.com/smartcontractkit/chainlink-common/keystore/corekeys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/confidentialrelay" "github.com/smartcontractkit/chainlink/v2/core/platform" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator" @@ -1125,7 +1123,7 @@ func (h *eventHandler) EmitActivationAbandoned( event Event, reason eventsv2.ActivationAbandonReason, activationErr error, - retryCount int, + retryCount int32, ) error { if event.Name != WorkflowActivated || h == nil || h.emitter == nil { return nil @@ -1163,7 +1161,7 @@ func (h *eventHandler) EmitActivationAbandoned( payload.ConfigURL, reason, customerFacingError(activationErr), - int32(retryCount), + retryCount, ) } diff --git a/core/services/workflows/syncer/v2/helpers.go b/core/services/workflows/syncer/v2/helpers.go index 6ad671e6ea6..e8be21d6d7f 100644 --- a/core/services/workflows/syncer/v2/helpers.go +++ b/core/services/workflows/syncer/v2/helpers.go @@ -9,8 +9,8 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" ) @@ -74,7 +74,7 @@ func (m *testEvtHandler) Handle(ctx context.Context, event Event) error { return nil } -func (m *testEvtHandler) EmitActivationAbandoned(context.Context, Event, eventsv2.ActivationAbandonReason, error, int) error { +func (m *testEvtHandler) EmitActivationAbandoned(context.Context, Event, eventsv2.ActivationAbandonReason, error, int32) error { return nil } diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index ad873c1397d..deb591b6f8c 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -23,9 +23,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2" "github.com/smartcontractkit/chainlink-evm/pkg/config" + eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" "github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/versioning" ) @@ -136,7 +136,7 @@ type evtHandler interface { Start(context.Context) error Handle(ctx context.Context, event Event) error - EmitActivationAbandoned(ctx context.Context, event Event, reason eventsv2.ActivationAbandonReason, activationErr error, retryCount int) error + EmitActivationAbandoned(ctx context.Context, event Event, reason eventsv2.ActivationAbandonReason, activationErr error, retryCount int32) error } type donNotifier interface { @@ -480,7 +480,7 @@ func (w *workflowRegistry) abandonActivation( "workflowInfo", evt.Info, "err", activationErr, ) - if err := w.handler.EmitActivationAbandoned(ctx, evt.Event, reason, activationErr, evt.retryCount); err != nil { + if err := w.handler.EmitActivationAbandoned(ctx, evt.Event, reason, activationErr, activationRetryCountAsInt32(evt.retryCount)); err != nil { w.lggr.Errorw("failed to emit activation abandoned event", "err", err) } }