
test: Coyote systematic concurrency tests (18 bugs across Core, Kafka, MQTT) #263

Open

msallin wants to merge 10 commits into BEagle1984:master from msallin:test/coyote-concurrency-audit

Conversation


@msallin msallin commented Apr 12, 2026

Summary

Adds a Silverback.Tests.Concurrency project that uses Microsoft Coyote to systematically explore task interleavings in Silverback's concurrency-sensitive code. Coyote binary-rewrites Silverback.Core, Silverback.Integration, Silverback.Integration.Kafka, and Silverback.Integration.MQTT, then runs each test for ~100 systematically scheduled iterations to find races deterministically.

No production fixes. All bugs are documented as failing tests.

Running locally requires:

```shell
dotnet tool install --global Microsoft.Coyote.CLI --version 1.7.11
dotnet test tests/Silverback.Tests.Concurrency
```

The MSBuild target sets DOTNET_ROLL_FORWARD=Major because the CLI ships as net8.0.

Test results

Core / Integration

| Finding | Test | Status | Bug |
| --- | --- | --- | --- |
| — | ConcurrentAbort_ShouldReachAbortedStateExactlyOnce | Passing | Abort idempotency is correct |
| — | ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce | Passing | Error-abort idempotency is correct |
| AbortIfIncompleteAsync leak | AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted | Failing | Semaphore leak in AbortIfIncompleteAsync |
| AbortIfIncompleteAsync leak | ConcurrentAbortMix_ShouldConvergeToSingleAbortedState | Failing | Same leak, mixed abort paths |
| AbortIfIncompleteAsync leak | SequenceBaseSemaphoreLeakTests (plain xunit) | Failing | Deterministic reproducer, no Coyote needed |
| H1 | SequenceStore_EnumerateWhileAdding_ShouldNotThrow | Failing | Dictionary enumerator version check |
| C2 | Abort_RacingCompleteAsync_ShouldNotReturnAsSilentNoOp | Failing | Abort throws after Complete finishes |
| #1 | LazyStream_GetOrCreateRacingCancel_ShouldNotDoubleSetTcs | Failing | TCS double-set (SetResult + SetCanceled race) |
| #2 | ConsumerChannel_StartReadingDisposingCts_ShouldNotThrow | Failing | ObjectDisposedException on cached token |
| #4 | InMemoryKafkaOffsetStore_EnumerateReturnedCollection | Failing | Live dict.Values escapes lock scope |
| #5 | ParallelSelect_SelectorThrows_ShouldNotLeakSemaphoreToken | Failing | Semaphore leaked when selector throws |
| H4 | CreateStream_RacingPushAsync | Skipped | Coyote rewriter crashes on async enumerator |
| C1 | CompleteCoreAsync_RacingAbortAsync | Skipped | Needs Coyote harness tuning |

Kafka

| Finding | Test | Status | Bug |
| --- | --- | --- | --- |
| C3 | SemaphoreCurrentCountRelease_ShouldNotExceedMaxDoP | Failing | Peak active 3 > maxDoP 2 |
| H5 | CommitCoreAsync_RevokeRacingCommit_ShouldNotStoreOffset | Failing | Offset stored for revoked partition |
| H6 | OnPartitionsRevoked_CommitBeforeStopCompletes | Failing | Messages consumed after commit |
| H7 | ConsumeLoopHandlerStart_ConcurrentCalls | Passing | Non-volatile bool; not observable on x64 |
| M2 | OffsetsTracker_ConcurrentTrackAndRead | Failing | Reader sees partial dual-dict state |

MQTT

| Finding | Test | Status | Bug |
| --- | --- | --- | --- |
| MC1 | NextChannelIndex_ConcurrentIncrements_ShouldDistributeEvenly | Failing | Non-atomic increment loses messages |
| MC2 | PublishQueueChannel_RecreationDuringDrain_ShouldNotLoseMessages | Failing | Channel recreated while drainer holds old ref |
| MC3 | PendingReconnect_ConcurrentTryConnect_ShouldReconnectExactlyOnce | Passing | Non-volatile bool; not observable on x64 |
| MC4 | ChannelReset_ConcurrentWithWrite_ShouldNotLoseMessages | Passing | Not caught in 100 iterations |
| MC5 | MqttConsumedMessage_TcsReassignment_ShouldNotLoseSignal | Failing | Commit signal sent to stale TCS; channel reader blocks forever |

Totals: 15 failing, 5 passing, 2 skipped (22 Coyote tests + 1 plain xunit).

Bugs found

AbortIfIncompleteAsync semaphore leak

Loc: SequenceBase`1.cs:249-265

AbortIfIncompleteAsync acquires _completeSemaphoreSlim then early-returns on !IsPending without hitting the finally that releases it. Dispose() then deadlocks on _completeSemaphoreSlim.Wait(). Introduced in 3db53b5.

Fix: move the !IsPending check inside the try/finally so the finally always releases.
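The proposed fix can be sketched as a standalone program (names mirror SequenceBase`1 but this is an illustration, not the actual Silverback code): once the !IsPending check sits inside the try, the finally releases the semaphore on every path, so a subsequent Dispose no longer blocks.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SequenceSketch
{
    private readonly SemaphoreSlim _completeSemaphoreSlim = new(1, 1);

    public bool IsPending { get; set; } // false: the early-return path

    public async Task AbortIfIncompleteAsync()
    {
        await _completeSemaphoreSlim.WaitAsync().ConfigureAwait(false);
        try
        {
            // The check now lives inside try/finally, so the early return
            // still runs the finally and releases the semaphore.
            if (!IsPending)
                return;

            // ... abort logic ...
        }
        finally
        {
            _completeSemaphoreSlim.Release();
        }
    }

    public void Dispose() => _completeSemaphoreSlim.Wait();

    static async Task Main()
    {
        var sequence = new SequenceSketch();
        await sequence.AbortIfIncompleteAsync(); // takes the !IsPending early return
        sequence.Dispose();                      // acquires the released semaphore
        Console.WriteLine("no deadlock");
    }
}
```

With the buggy layout (check before the try), the same Main hangs forever in Dispose.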

H1 — SequenceStore unsynchronized Dictionary

Loc: SequenceStore.cs:19

Plain Dictionary with zero locking. Cross-thread callers race on Add/Remove/Enumerate.

Fix: replace Dictionary with ConcurrentDictionary or wrap all access in a lock.
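A minimal sketch of the ConcurrentDictionary variant (ISequence is stubbed as string here): its enumerator is thread-safe, so racing Add against enumeration never trips a version check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var store = new ConcurrentDictionary<string, string>();

        // Writer and enumerator race freely; ConcurrentDictionary never
        // throws "collection was modified" from its enumerator.
        var writer = Task.Run(() =>
        {
            for (int i = 0; i < 1_000; i++)
                store.TryAdd($"sequence-{i}", "payload");
        });

        var reader = Task.Run(() =>
        {
            for (int i = 0; i < 100; i++)
                _ = store.Count(pair => pair.Value == "payload");
        });

        await Task.WhenAll(writer, reader);
        Console.WriteLine(store.Count);
    }
}
```

A plain Dictionary in the same harness throws InvalidOperationException from the reader almost immediately.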

C2 — MessageStreamProvider Abort/Complete race

Loc: MessageStreamProvider`1.cs:104-175

Abort() and CompleteAsync() share an early-return guard using plain bools. When Abort runs after Complete finishes, it throws InvalidOperationException.

Fix: replace the re-entry guard with a proper wait-then-observe pattern.

#1 — LazyMessageStreamEnumerable TaskCompletionSource double-set

Loc: LazyMessageStreamEnumerable`1.cs:49-59

GetOrCreateStream() calls _taskCompletionSource.SetResult(stream). Cancel() calls _taskCompletionSource.SetCanceled(). No synchronization. The second Set throws InvalidOperationException in a fire-and-forget context, where the exception is silently lost.

Fix: use TrySetResult / TrySetCanceled instead of Set, and add a lock or Interlocked flag around the create-vs-cancel decision.
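The Try* variants make the race benign, as this standalone sketch shows: the loser observes false instead of throwing, and the task keeps the winner's state.

```csharp
using System;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // With Set*, the loser of the race throws InvalidOperationException.
        // With TrySet*, the loser just observes false and can back off.
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        bool created = tcs.TrySetResult("stream"); // GetOrCreateStream path wins
        bool cancelled = tcs.TrySetCanceled();     // Cancel path loses, no throw

        Console.WriteLine($"{created} {cancelled} {tcs.Task.Status}");
    }
}
```

The Try* swap fixes the exception; the lock (or Interlocked flag) is still needed so the create-vs-cancel *decision* itself is not racy.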

#2 — ConsumerChannel CancellationTokenSource dispose-while-active

Loc: ConsumerChannel`1.cs:86-102

StartReading() disposes and recreates _readCancellationTokenSource without synchronization. Concurrent readers that cached the old token get ObjectDisposedException.

Fix: don't dispose the old CTS immediately; let it be GC'd, or swap under a lock and ensure readers always re-fetch the token.
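A sketch of the swap-under-lock approach (hypothetical names, not the actual ConsumerChannel code): the old CTS is abandoned to the GC instead of disposed, so a reader holding a stale token sees a valid, never-cancelled token rather than ObjectDisposedException.

```csharp
using System;
using System.Threading;

class ChannelSketch
{
    private readonly object _lock = new();
    private CancellationTokenSource _readCts = new();

    // Readers re-fetch the token through this property on every read
    // instead of caching it across restarts.
    public CancellationToken ReadCancellationToken
    {
        get { lock (_lock) return _readCts.Token; }
    }

    public void StartReading()
    {
        lock (_lock)
        {
            // Swap in a fresh CTS but do NOT dispose the old one here:
            // a concurrent reader may still hold its token.
            _readCts = new CancellationTokenSource();
        }
    }

    static void Main()
    {
        var channel = new ChannelSketch();
        CancellationToken cachedBeforeRestart = channel.ReadCancellationToken;

        channel.StartReading();

        // Touching the stale token no longer throws ObjectDisposedException.
        Console.WriteLine(cachedBeforeRestart.IsCancellationRequested);
        Console.WriteLine(channel.ReadCancellationToken.IsCancellationRequested);
    }
}
```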

#4 — InMemoryKafkaOffsetStore live collection escapes lock scope

Loc: InMemoryKafkaOffsetStore.cs:20-28

GetStoredOffsets() returns offsetsByTopicPartition.Values (a live Dictionary.ValueCollection view) after releasing the lock. StoreOffsetsAsync mutates the dictionary under lock, but the caller enumerates the live view outside the lock.

Fix: return a snapshot: return [.. offsetsByTopicPartition.Values]; inside the lock.
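The snapshot semantics can be sketched like this (simplified types; the real store maps topic-partitions to offset objects): the copy is taken while the lock is held, so later mutations never leak into a caller's enumeration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class OffsetStoreSketch
{
    private readonly object _lock = new();
    private readonly Dictionary<string, long> _offsetsByTopicPartition = new();

    public void StoreOffset(string topicPartition, long offset)
    {
        lock (_lock) _offsetsByTopicPartition[topicPartition] = offset;
    }

    // Returns a detached copy built under the lock, so callers can enumerate
    // it freely while writers keep mutating the dictionary.
    public IReadOnlyCollection<long> GetStoredOffsets()
    {
        lock (_lock) return _offsetsByTopicPartition.Values.ToArray();
    }

    static void Main()
    {
        var store = new OffsetStoreSketch();
        store.StoreOffset("topic[0]", 42);

        IReadOnlyCollection<long> snapshot = store.GetStoredOffsets();
        store.StoreOffset("topic[1]", 43); // mutation after the call

        Console.WriteLine(snapshot.Count); // snapshot is unaffected
    }
}
```

Returning the live `Values` view instead would make `snapshot.Count` observe the second write, and concurrent enumeration would throw.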

#5 — EnumerableSelectExtensions semaphore leak on throw

Loc: EnumerableSelectExtensions.cs:42-61

ParallelSelectAsync releases the semaphore in the try block, not the finally. When the selector throws, the catch cancels the CTS but does NOT release the semaphore. The token is permanently leaked.

Fix: move semaphore.Release() into a finally block:

```csharp
await semaphore.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
try
{
    result = await selector(value).ConfigureAwait(false);
}
finally
{
    semaphore.Release();
}
```

C3 — ConsumerChannelsManager semaphore ownership (reported starvation root cause)

Loc: ConsumerChannelsManager.cs:150-165

ReadChannelOnceAsync conditionally acquires _messagesLimiterSemaphoreSlim based on _channels.Count > MaxDegreeOfParallelism, then releases based on CurrentCount < MaxDegreeOfParallelism. After a partition revoke drops the channel count, a handler skips the acquire but still releases in its finally, over-releasing a token it never owned.

Fix: capture bool acquired locally and release iff true.
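A sketch of the ownership-tracking pattern (hypothetical handler, not the actual ConsumerChannelsManager code): the acquire and release decisions are tied to one local flag instead of two independent reads of shared state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static readonly SemaphoreSlim Limiter = new(2, 2); // maxDoP = 2

    static async Task HandleAsync(bool limitRequired)
    {
        bool acquired = false;
        try
        {
            if (limitRequired)
            {
                await Limiter.WaitAsync().ConfigureAwait(false);
                acquired = true; // remember that THIS handler owns a token
            }

            // ... process the message ...
        }
        finally
        {
            // Release iff we acquired: a handler that skipped the acquire
            // can no longer over-release a token it never owned.
            if (acquired)
                Limiter.Release();
        }
    }

    static async Task Main()
    {
        await HandleAsync(limitRequired: false); // skips acquire AND release
        await HandleAsync(limitRequired: true);
        Console.WriteLine(Limiter.CurrentCount); // count is preserved
    }
}
```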

H5 — Kafka commit TOCTOU on revoked partitions

Loc: KafkaConsumer.cs:251-279

CommitCoreAsync double-checks IsNotRevoked but _revokedPartitions is mutated concurrently by the rebalance handler.

Fix: snapshot the revoked set before the loop.
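The snapshot idea, sketched with stand-in types (the real code deals with TopicPartition objects and the rebalance handler's set): copy the revoked set once under a lock, then run the whole commit loop against the stable copy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    // Hypothetical stand-ins for the consumer's revoked-partition state.
    static readonly HashSet<int> RevokedPartitions = new() { 1 };
    static readonly object RevokeLock = new();

    static void Main()
    {
        var pendingOffsets = new Dictionary<int, long> { [0] = 10, [1] = 20, [2] = 30 };

        // Snapshot once, under the lock, before the loop: the revoked-check
        // is then stable even if the rebalance handler mutates the live set
        // mid-loop.
        HashSet<int> revokedSnapshot;
        lock (RevokeLock)
            revokedSnapshot = new HashSet<int>(RevokedPartitions);

        var committed = pendingOffsets
            .Where(pair => !revokedSnapshot.Contains(pair.Key))
            .Select(pair => pair.Key)
            .OrderBy(partition => partition)
            .ToList();

        Console.WriteLine(string.Join(",", committed)); // partition 1 skipped
    }
}
```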

H6 — OnPartitionsRevoked commits after fire-and-forget stop

Loc: KafkaConsumer.cs:152-174

StopAsync().FireAndForget() does not await; Client.Commit() runs before the consume loop actually stops.

Fix: await StopAsync() before calling Client.Commit().

M2 — OffsetsTracker dual AddOrUpdate non-atomicity

Loc: OffsetsTracker.cs:27-42

TrackOffset performs two sequential AddOrUpdate on _commitOffsets and _rollbackOffsets. A concurrent reader sees partial state.

Fix: take a lock around both updates, or merge into a single structure.
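The single-lock variant can be sketched as follows (simplified: both dictionaries store the same offset here, whereas the real tracker stores distinct commit/rollback values). The invariant being protected is that a reader never sees one dictionary updated without the other.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class TrackerSketch
{
    private readonly object _lock = new();
    private readonly Dictionary<int, long> _commitOffsets = new();
    private readonly Dictionary<int, long> _rollbackOffsets = new();

    // Both updates happen under one lock: no reader can observe the commit
    // offset without the matching rollback offset.
    public void TrackOffset(int partition, long offset)
    {
        lock (_lock)
        {
            _commitOffsets[partition] = offset;
            _rollbackOffsets[partition] = offset;
        }
    }

    public (long? Commit, long? Rollback) Read(int partition)
    {
        lock (_lock)
        {
            return (
                _commitOffsets.TryGetValue(partition, out long c) ? c : (long?)null,
                _rollbackOffsets.TryGetValue(partition, out long r) ? r : (long?)null);
        }
    }

    static async Task Main()
    {
        var tracker = new TrackerSketch();
        var writer = Task.Run(() =>
        {
            for (long i = 0; i < 1_000; i++) tracker.TrackOffset(0, i);
        });
        var reader = Task.Run(() =>
        {
            for (int i = 0; i < 1_000; i++)
            {
                var (commit, rollback) = tracker.Read(0);
                if (commit != rollback)
                    throw new InvalidOperationException("partial state observed");
            }
        });
        await Task.WhenAll(writer, reader);
        Console.WriteLine("consistent");
    }
}
```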

MC1 — MQTT _nextChannelIndex non-atomic increment

Loc: ConsumerChannelsManager.cs:84 (MQTT)

OnMessageReceivedAsync uses _channels[_nextChannelIndex++]. MQTTnet fires callbacks on arbitrary threads.

Fix: use Interlocked.Increment(ref _nextChannelIndex) % _channels.Length.
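A self-contained sketch of the atomic round-robin (channel writes replaced by per-slot counters; the uint cast is an extra guard, beyond what the one-liner above shows, to keep the modulo non-negative after int overflow):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    const int ChannelCount = 4;

    static int _nextChannelIndex = -1;
    static readonly int[] Hits = new int[ChannelCount];

    static void OnMessageReceived()
    {
        // Interlocked.Increment returns the new value atomically, so every
        // concurrent callback gets a distinct index.
        uint next = (uint)Interlocked.Increment(ref _nextChannelIndex);
        int slot = (int)(next % ChannelCount);
        Interlocked.Increment(ref Hits[slot]);
    }

    static async Task Main()
    {
        // Simulate MQTTnet firing callbacks on arbitrary ThreadPool threads.
        Task[] callbacks = Enumerable.Range(0, 400)
            .Select(_ => Task.Run(OnMessageReceived))
            .ToArray();
        await Task.WhenAll(callbacks);

        // Every channel receives exactly 100 messages; none are skipped.
        Console.WriteLine(string.Join(",", Hits));
    }
}
```

With the original `_channels[_nextChannelIndex++]`, two racing callbacks can read the same index, double-routing to one channel and starving another.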

MC2 — MQTT publish queue channel recreation race

Loc: MqttClientWrapper.cs:113-114

ConnectCoreAsync recreates _publishQueueChannel while ProcessPublishQueueAsync still drains the old one.

Fix: signal the drainer to switch channels, or use a long-lived channel.

MC5 — MQTT ConsumedApplicationMessage TCS reassignment race

Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171

After awaiting the TCS, the channel reader reassigns a new one. A concurrent CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference. The new TCS is never signaled; the channel reader blocks forever.

Fix: use a thread-safe signal mechanism (e.g. a Channel or AsyncAutoResetEvent) instead of reassigning a TCS field.
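A sketch of the Channel-based alternative (simplified to a bool-ack channel; the real code would carry richer commit/rollback information): because signals are queued rather than delivered through a reassignable field, a writer can never hit a stale reference and a signal can never be lost.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // One long-lived ack channel replaces the reassigned TCS field.
        Channel<bool> acks = Channel.CreateUnbounded<bool>();

        // Channel reader: awaits one ack per consumed message. There is no
        // per-message TCS to reassign, so no signal can target a stale object.
        var consumer = Task.Run(async () =>
        {
            for (int i = 0; i < 2; i++)
            {
                bool committed = await acks.Reader.ReadAsync();
                Console.WriteLine($"message {i} committed={committed}");
            }
        });

        // Commit path: signals through the channel; even if it races the
        // reader, the ack is buffered rather than dropped.
        await acks.Writer.WriteAsync(true);
        await acks.Writer.WriteAsync(true);

        await consumer;
    }
}
```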

Test plan

  • dotnet build Silverback.sln -- no errors
  • dotnet test tests/Silverback.Tests.Concurrency -- 15 failing, 5 passing, 2 skipped
  • dotnet test tests/Silverback.Integration.Tests --filter SequenceBaseSemaphoreLeakTests -- 1 failing
  • Review each bug description and decide fix priority

msallin added 7 commits April 12, 2026 15:57
Scaffolds a Silverback.Tests.Concurrency xunit project wired to Microsoft
Coyote 1.7.11 for systematic interleaving exploration.

Infrastructure:
- Silverback.Tests.Concurrency.csproj with post-build MSBuild target that
  invokes `coyote rewrite` (DOTNET_ROLL_FORWARD=Major for net10 compat)
- rewrite.coyote.json rewrites Core, Integration, Integration.Kafka, and
  the test assembly itself
- CoyoteTestRunner shared helper wrapping TestingEngine.Create + report
- TestSequence minimal RawSequence subclass with enforceTimeout:false and
  reflection hooks for driving SequenceBase private members
- InternalsVisibleTo grants on Silverback.Core and Silverback.Integration
- Coyote package versions in Directory.Packages.props
- Project entry in Silverback.sln
Coyote tests for SequenceBase<TEnvelope> completion/abort synchronization:

- ConcurrentAbort_ShouldReachAbortedStateExactlyOnce (passing)
- AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted (failing)
- ConcurrentAbortMix_ShouldConvergeToSingleAbortedState (failing)
- ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce (passing)

Two tests fail due to a semaphore leak in AbortIfIncompleteAsync: the
!IsPending early-return exits without hitting the finally that releases
_completeSemaphoreSlim; Dispose then deadlocks on _completeSemaphoreSlim.Wait().

Also adds a plain (non-Coyote) reproducer in Silverback.Integration.Tests.
SequenceStore is a plain Dictionary<string, ISequence> with zero
synchronization. Racing AddAsync against GetEnumerator throws
InvalidOperationException from the dictionary's version check.
Abort() and CompleteAsync() share an early-return re-entry guard that
either silently no-ops or throws InvalidOperationException when they race.
H4 (CreateStream vs PushAsync on unsynchronized _lazyStreams) is skipped
pending Coyote rewriter fix for the async enumerator path.
AddCoreAsync calls private CompleteCoreAsync while holding only
_addSemaphoreSlim, bypassing the _completeSemaphoreSlim guard that the
public CompleteAsync wrapper uses. Test is skipped pending Coyote harness
tuning for the potential-deadlock heuristic.
ConsumerChannelsManager.ReadChannelOnceAsync releases
_messagesLimiterSemaphoreSlim based on CurrentCount instead of tracking
ownership. After a partition revoke, a handler skips the acquire but still
releases in its finally, over-releasing a token it never owned. Coyote
catches peak active handlers = 3 > maxDoP = 2 on the first iteration.
H5: CommitCoreAsync TOCTOU — stores offset for a revoked partition.
H6: OnPartitionsRevoked commits before fire-and-forget stop completes.
H7: ConsumeLoopHandler.Start non-volatile IsConsuming race (passing on x64).
M2: OffsetsTracker dual AddOrUpdate observed as partial state by reader.

Four MQTT-specific Coyote tests targeting concurrency bugs found in the
MQTT integration layer. Two fail on master:

MC1: _nextChannelIndex++ non-atomic increment (FAILING)
  ConsumerChannelsManager.OnMessageReceivedAsync (line 84) uses a plain
  int increment without synchronization. MQTTnet fires callbacks on
  arbitrary ThreadPool threads; two concurrent callbacks read the same
  index, route to the same channel, and skip a channel entirely.

MC2: _publishQueueChannel recreation race (FAILING)
  MqttClientWrapper.ConnectCoreAsync (lines 113-114) check-then-creates
  a new channel while ProcessPublishQueueAsync still drains the old one.
  Messages written to the new channel are never read by the drainer.

MC3: _pendingReconnect TOCTOU on non-volatile bools (passing on x64)
  Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true
  and both trigger reconnect. Like Kafka H7, requires weak memory ordering.

MC4: channel.Reset() racing with OnMessageReceivedAsync (passing)
  Coyote did not find a losing interleaving in 100 iterations.

Also adds Silverback.Integration.MQTT project reference and the MQTT
assembly to rewrite.coyote.json.
@msallin msallin changed the title test: Coyote systematic concurrency tests test: Coyote systematic concurrency tests (Core, Kafka, MQTT) Apr 12, 2026
Five new concurrency tests targeting findings from the deep library audit.
All five fail on master:

BEagle1984#1 LazyMessageStreamEnumerable TCS double-set (CRITICAL)
  GetOrCreateStream() and Cancel() both call Set on the same
  TaskCompletionSource without synchronization. The second Set throws
  InvalidOperationException which is lost in a fire-and-forget context.
  Loc: LazyMessageStreamEnumerable`1.cs:49-59

BEagle1984#2 ConsumerChannel CTS dispose-while-active (CRITICAL)
  StartReading() disposes and recreates _readCancellationTokenSource
  without synchronization. Concurrent readers that cached the old token
  get ObjectDisposedException.
  Loc: ConsumerChannel`1.cs:86-102

BEagle1984#4 InMemoryKafkaOffsetStore live collection escapes lock (HIGH)
  GetStoredOffsets() returns Dictionary.Values (a live view) after
  releasing the lock. StoreOffsetsAsync mutates the dict under lock,
  but the caller enumerates the live Values outside the lock.
  Loc: InMemoryKafkaOffsetStore.cs:20-28

BEagle1984#5 EnumerableSelectExtensions semaphore leak on throw (HIGH)
  ParallelSelectAsync releases the semaphore in the try block, not the
  finally. When the selector throws, the token is permanently leaked,
  reducing effective parallelism to zero over time.
  Loc: EnumerableSelectExtensions.cs:42-61

MC5 MQTT ConsumedApplicationMessage TCS reassignment race (HIGH)
  After awaiting the TCS, the channel reader reassigns it. A concurrent
  CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference.
  The new TCS is never signaled; the channel reader blocks forever.
  Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171
@msallin msallin changed the title test: Coyote systematic concurrency tests (Core, Kafka, MQTT) test: Coyote systematic concurrency tests (15 bugs across Core, Kafka, MQTT) Apr 12, 2026
The default 100-iteration Coyote run passes for H7, MC3, and MC4 because
the race window is between two synchronous statements with no scheduling
point for Coyote to interleave. The aggressive variants insert
Task.Yield() at the race point (standard Coyote technique) and run at
1000 iterations with systematic fuzzing enabled.

H7 aggressive: ConsumeLoopHandler.Start double-start (FAILING)
  Two concurrent Start() calls both read IsConsuming=false and both
  start a consume loop. loopsStarted=2, expected 1.

MC3 aggressive: MQTT _pendingReconnect double-reconnect (FAILING)
  Two concurrent TryConnectAsync calls both read
  _mqttClientWasConnected=true and both trigger reconnect.
  reconnectCount=2, expected 1.

MC4 aggressive: MQTT channel.Reset message loss (FAILING)
  Concurrent OnMessageReceivedAsync callbacks each create their own
  channel via Reset; messages written to replaced channels are orphaned.
  Assertion now checks messages readable from the FINAL channel, not
  TryWrite success count. Only 1-3 of 4 messages survive.

Also adds CoyoteTestRunner.RunAggressive() with systematic fuzzing,
potential-deadlock tolerance, and 30s deadlock timeout.
@msallin msallin changed the title test: Coyote systematic concurrency tests (15 bugs across Core, Kafka, MQTT) test: Coyote systematic concurrency tests (18 bugs across Core, Kafka, MQTT) Apr 12, 2026
