test: Coyote systematic concurrency tests (18 bugs across Core, Kafka, MQTT)#263
Open
msallin wants to merge 10 commits into BEagle1984:master
Conversation
Scaffolds a Silverback.Tests.Concurrency xunit project wired to Microsoft Coyote 1.7.11 for systematic interleaving exploration.

Infrastructure:
- Silverback.Tests.Concurrency.csproj with a post-build MSBuild target that invokes `coyote rewrite` (DOTNET_ROLL_FORWARD=Major for net10 compatibility)
- rewrite.coyote.json rewrites Core, Integration, Integration.Kafka, and the test assembly itself
- CoyoteTestRunner: shared helper wrapping TestingEngine.Create plus reporting
- TestSequence: minimal RawSequence subclass with enforceTimeout:false and reflection hooks for driving SequenceBase private members
- InternalsVisibleTo grants on Silverback.Core and Silverback.Integration
- Coyote package versions in Directory.Packages.props
- Project entry in Silverback.sln
Coyote tests for SequenceBase&lt;TEnvelope&gt; completion/abort synchronization:
- ConcurrentAbort_ShouldReachAbortedStateExactlyOnce (passing)
- AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted (failing)
- ConcurrentAbortMix_ShouldConvergeToSingleAbortedState (failing)
- ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce (passing)

Two tests fail due to a semaphore leak in AbortIfIncompleteAsync: the !IsPending early return exits without hitting the finally that releases _completeSemaphoreSlim; Dispose then deadlocks on _completeSemaphoreSlim.Wait(). Also adds a plain (non-Coyote) reproducer in Silverback.Integration.Tests.
SequenceStore is a plain Dictionary<string, ISequence> with zero synchronization. Racing AddAsync against GetEnumerator throws InvalidOperationException from the dictionary's version check.
Abort() and CompleteAsync() share an early-return re-entry guard that either silently no-ops or throws InvalidOperationException when they race. H4 (CreateStream vs PushAsync on unsynchronized _lazyStreams) is skipped pending a Coyote rewriter fix for the async enumerator path.
AddCoreAsync calls private CompleteCoreAsync while holding only _addSemaphoreSlim, bypassing the _completeSemaphoreSlim guard that the public CompleteAsync wrapper uses. Test is skipped pending Coyote harness tuning for the potential-deadlock heuristic.
ConsumerChannelsManager.ReadChannelOnceAsync releases _messagesLimiterSemaphoreSlim based on CurrentCount instead of tracking ownership. After a partition revoke, a handler skips the acquire but still releases in its finally, over-releasing a token it never owned. Coyote catches peak active handlers = 3 > maxDoP = 2 on the first iteration.
- H5: CommitCoreAsync TOCTOU — stores an offset for a revoked partition.
- H6: OnPartitionsRevoked commits before the fire-and-forget stop completes.
- H7: ConsumeLoopHandler.Start non-volatile IsConsuming race (passing on x64).
- M2: OffsetsTracker dual AddOrUpdate observed as partial state by a reader.
Four MQTT-specific Coyote tests targeting concurrency bugs found in the MQTT integration layer. Two fail on master:

- MC1: _nextChannelIndex++ non-atomic increment (FAILING). ConsumerChannelsManager.OnMessageReceivedAsync (line 84) uses a plain int increment without synchronization. MQTTnet fires callbacks on arbitrary ThreadPool threads; two concurrent callbacks read the same index, route to the same channel, and skip a channel entirely.
- MC2: _publishQueueChannel recreation race (FAILING). MqttClientWrapper.ConnectCoreAsync (lines 113-114) check-then-creates a new channel while ProcessPublishQueueAsync still drains the old one. Messages written to the new channel are never read by the drainer.
- MC3: _pendingReconnect TOCTOU on non-volatile bools (passing on x64). Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true and both trigger reconnect. Like Kafka H7, this requires weak memory ordering.
- MC4: channel.Reset() racing with OnMessageReceivedAsync (passing). Coyote did not find a losing interleaving in 100 iterations.

Also adds the Silverback.Integration.MQTT project reference and the MQTT assembly to rewrite.coyote.json.
Five new concurrency tests targeting findings from the deep library audit. All five fail on master:

- BEagle1984#1 LazyMessageStreamEnumerable TCS double-set (CRITICAL). GetOrCreateStream() and Cancel() both call Set on the same TaskCompletionSource without synchronization. The second Set throws InvalidOperationException, which is lost in a fire-and-forget context. Loc: LazyMessageStreamEnumerable`1.cs:49-59
- BEagle1984#2 ConsumerChannel CTS dispose-while-active (CRITICAL). StartReading() disposes and recreates _readCancellationTokenSource without synchronization. Concurrent readers that cached the old token get ObjectDisposedException. Loc: ConsumerChannel`1.cs:86-102
- BEagle1984#4 InMemoryKafkaOffsetStore live collection escapes lock (HIGH). GetStoredOffsets() returns Dictionary.Values (a live view) after releasing the lock. StoreOffsetsAsync mutates the dictionary under the lock, but the caller enumerates the live Values outside it. Loc: InMemoryKafkaOffsetStore.cs:20-28
- BEagle1984#5 EnumerableSelectExtensions semaphore leak on throw (HIGH). ParallelSelectAsync releases the semaphore in the try block, not the finally. When the selector throws, the token is permanently leaked, reducing effective parallelism to zero over time. Loc: EnumerableSelectExtensions.cs:42-61
- MC5 MQTT ConsumedApplicationMessage TCS reassignment race (HIGH). After awaiting the TCS, the channel reader reassigns it. A concurrent CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference. The new TCS is never signaled; the channel reader blocks forever. Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171
The default 100-iteration Coyote run passes for H7, MC3, and MC4 because the race window sits between two synchronous statements with no scheduling point for Coyote to interleave. The aggressive variants insert Task.Yield() at the race point (a standard Coyote technique) and run at 1000 iterations with systematic fuzzing enabled.

- H7 aggressive: ConsumeLoopHandler.Start double-start (FAILING). Two concurrent Start() calls both read IsConsuming=false and both start a consume loop. loopsStarted=2, expected 1.
- MC3 aggressive: MQTT _pendingReconnect double-reconnect (FAILING). Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true and both trigger reconnect. reconnectCount=2, expected 1.
- MC4 aggressive: MQTT channel.Reset message loss (FAILING). Concurrent OnMessageReceivedAsync callbacks each create their own channel via Reset; messages written to replaced channels are orphaned. The assertion now checks messages readable from the FINAL channel, not the TryWrite success count. Only 1-3 of 4 messages survive.

Also adds CoyoteTestRunner.RunAggressive() with systematic fuzzing, potential-deadlock tolerance, and a 30 s deadlock timeout.
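The Task.Yield() injection can be sketched as follows. This is a hypothetical stand-in for ConsumeLoopHandler.Start, not the real code; the sequential calls shown here stay deterministic, while Coyote's controlled scheduler explores the concurrent interleavings in which both calls pass the check before either sets the flag.

```csharp
using System;
using System.Threading.Tasks;

bool isConsuming = false;
int loopsStarted = 0;

async Task StartAsync()
{
    if (isConsuming)
        return;

    // Injected scheduling point: gives Coyote an opportunity to
    // interleave a second Start between the check and the set,
    // exposing the double-start.
    await Task.Yield();

    isConsuming = true;
    loopsStarted++;
}

await StartAsync();
await StartAsync(); // awaited sequentially: sees isConsuming == true and no-ops

if (loopsStarted != 1)
    throw new Exception($"expected 1 loop, started {loopsStarted}");
```

Under Coyote, two truly concurrent StartAsync calls can both observe isConsuming == false at the yield point, which is exactly the loopsStarted=2 failure the aggressive H7 variant reports.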
Summary
Adds a Silverback.Tests.Concurrency project that uses Microsoft Coyote to systematically explore task interleavings in Silverback's concurrency-sensitive code. Coyote binary-rewrites Silverback.Core, Silverback.Integration, Silverback.Integration.Kafka, and Silverback.Integration.MQTT, then runs each test across ~100 scheduling strategies to find races deterministically. No production fixes: all bugs are documented as failing tests.
Running locally requires the Coyote CLI (the post-build target invokes `coyote rewrite`). The MSBuild target sets DOTNET_ROLL_FORWARD=Major because the CLI ships as net8.0.

Test results
Core / Integration
- ConcurrentAbort_ShouldReachAbortedStateExactlyOnce
- ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce
- AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted
- ConcurrentAbortMix_ShouldConvergeToSingleAbortedState
- SequenceBaseSemaphoreLeakTests (plain xunit)
- SequenceStore_EnumerateWhileAdding_ShouldNotThrow
- Abort_RacingCompleteAsync_ShouldNotReturnAsSilentNoOp
- LazyStream_GetOrCreateRacingCancel_ShouldNotDoubleSetTcs
- ConsumerChannel_StartReadingDisposingCts_ShouldNotThrow
- InMemoryKafkaOffsetStore_EnumerateReturnedCollection
- ParallelSelect_SelectorThrows_ShouldNotLeakSemaphoreToken
- CreateStream_RacingPushAsync
- CompleteCoreAsync_RacingAbortAsync

Kafka
- SemaphoreCurrentCountRelease_ShouldNotExceedMaxDoP
- CommitCoreAsync_RevokeRacingCommit_ShouldNotStoreOffset
- OnPartitionsRevoked_CommitBeforeStopCompletes
- ConsumeLoopHandlerStart_ConcurrentCalls
- OffsetsTracker_ConcurrentTrackAndRead

MQTT
- NextChannelIndex_ConcurrentIncrements_ShouldDistributeEvenly
- PublishQueueChannel_RecreationDuringDrain_ShouldNotLoseMessages
- PendingReconnect_ConcurrentTryConnect_ShouldReconnectExactlyOnce
- ChannelReset_ConcurrentWithWrite_ShouldNotLoseMessages
- MqttConsumedMessage_TcsReassignment_ShouldNotLoseSignal

Totals: 15 failing, 5 passing, 2 skipped (22 Coyote tests + 1 plain xunit).
Bugs found
AbortIfIncompleteAsync semaphore leak
Loc: SequenceBase`1.cs:249-265
AbortIfIncompleteAsync acquires _completeSemaphoreSlim then early-returns on !IsPending without hitting the finally that releases it. Dispose() then deadlocks on _completeSemaphoreSlim.Wait(). Introduced in 3db53b5.
Fix: move the !IsPending check inside the try/finally so the finally always releases.
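A standalone sketch of the corrected shape (names mirror SequenceBase, but this is an illustration, not the actual class):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var completeSemaphoreSlim = new SemaphoreSlim(1, 1);
bool isPending = false; // sequence already completed

async Task AbortIfIncompleteAsync()
{
    await completeSemaphoreSlim.WaitAsync();

    try
    {
        // The check now lives inside the try, so the finally releases
        // the semaphore on every exit path, including the early return.
        if (!isPending)
            return;

        // ...abort logic would run here...
    }
    finally
    {
        completeSemaphoreSlim.Release();
    }
}

await AbortIfIncompleteAsync();
await AbortIfIncompleteAsync();

// Dispose-time Wait() no longer deadlocks: the token always came back.
bool acquired = completeSemaphoreSlim.Wait(0);
if (!acquired)
    throw new Exception("semaphore leaked");
```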
H1 — SequenceStore unsynchronized Dictionary
Loc: SequenceStore.cs:19
Plain Dictionary with zero locking. Cross-thread callers race on Add/Remove/Enumerate.
Fix: replace Dictionary with ConcurrentDictionary or wrap all access in a lock.
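A minimal illustration of the ConcurrentDictionary variant (object stands in for ISequence; this is not the actual store):

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

var store = new ConcurrentDictionary<string, object>();

var writer = Task.Run(() =>
{
    for (int i = 0; i < 1_000; i++)
        store.TryAdd($"sequence-{i}", new object());
});

var reader = Task.Run(() =>
{
    // ConcurrentDictionary enumeration is thread-safe: it never throws
    // the version-check InvalidOperationException that a plain
    // Dictionary does when mutated mid-enumeration.
    for (int i = 0; i < 100; i++)
    {
        foreach (var _ in store)
        {
        }
    }
});

await Task.WhenAll(writer, reader);
```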
C2 — MessageStreamProvider Abort/Complete race
Loc: MessageStreamProvider`1.cs:104-175
Abort() and CompleteAsync() share an early-return guard using plain bools. When Abort runs after Complete finishes, it throws InvalidOperationException.
Fix: replace the re-entry guard with a proper wait-then-observe pattern.
#1 — LazyMessageStreamEnumerable TaskCompletionSource double-set
Loc: LazyMessageStreamEnumerable`1.cs:49-59
GetOrCreateStream() calls _taskCompletionSource.SetResult(stream). Cancel() calls _taskCompletionSource.SetCanceled(). No synchronization. The second Set throws InvalidOperationException in a fire-and-forget context, silently lost.
Fix: use TrySetResult / TrySetCanceled instead of Set, and add a lock or Interlocked flag around the create-vs-cancel decision.
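A minimal sketch of the Try* transition: the losing side gets false back instead of an exception being thrown into a fire-and-forget context.

```csharp
using System;
using System.Threading.Tasks;

var tcs = new TaskCompletionSource<string>();

// Create-vs-cancel race: with TrySetResult/TrySetCanceled the losing
// transition simply returns false instead of throwing
// InvalidOperationException.
bool resultWon = tcs.TrySetResult("stream");
bool cancelWon = tcs.TrySetCanceled();

// Exactly one transition wins; the TCS state is set exactly once.
if (resultWon == cancelWon)
    throw new Exception("expected exactly one winning transition");
```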
#2 — ConsumerChannel CancellationTokenSource dispose-while-active
Loc: ConsumerChannel`1.cs:86-102
StartReading() disposes and recreates _readCancellationTokenSource without synchronization. Concurrent readers that cached the old token get ObjectDisposedException.
Fix: don't dispose the old CTS immediately; let it be GC'd, or swap under a lock and ensure readers always re-fetch the token.
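One safe shape, sketched with hypothetical names: swap the source under a lock, cancel (but don't dispose) the old one, and have readers re-fetch the current token instead of caching it.

```csharp
using System;
using System.Threading;

object ctsLock = new();
var readCts = new CancellationTokenSource();

// Readers re-fetch the current token on every read instead of caching it.
CancellationToken ReadToken()
{
    lock (ctsLock)
        return readCts.Token;
}

void StartReading()
{
    CancellationTokenSource old;

    lock (ctsLock)
    {
        old = readCts;
        readCts = new CancellationTokenSource();
    }

    // Cancel but do NOT dispose the old source: in-flight readers still
    // holding the old token observe cancellation instead of
    // ObjectDisposedException; the old CTS is left for the GC.
    old.Cancel();
}

var oldToken = ReadToken();
StartReading();

if (!oldToken.IsCancellationRequested)
    throw new Exception("old token should be cancelled");
if (ReadToken().IsCancellationRequested)
    throw new Exception("new token should be live");
```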
#4 — InMemoryKafkaOffsetStore live collection escapes lock scope
Loc: InMemoryKafkaOffsetStore.cs:20-28
GetStoredOffsets() returns offsetsByTopicPartition.Values (a live Dictionary.ValueCollection view) after releasing the lock. StoreOffsetsAsync mutates the dictionary under lock, but the caller enumerates the live view outside the lock.
Fix: return a snapshot inside the lock: `return [.. offsetsByTopicPartition.Values];`

#5 — EnumerableSelectExtensions semaphore leak on throw
Loc: EnumerableSelectExtensions.cs:42-61
ParallelSelectAsync releases the semaphore in the try block, not the finally. When the selector throws, the catch cancels the CTS but does NOT release the semaphore. The token is permanently leaked.
Fix: move semaphore.Release() into a finally block.
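A sketch of the corrected shape, using a hypothetical helper in place of the real ParallelSelectAsync per-item body:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var semaphore = new SemaphoreSlim(2, 2);

// The release now lives in a finally, so a throwing selector can no
// longer leak the token and shrink the effective parallelism.
async Task RunWithSlotAsync(Func<Task> selector)
{
    await semaphore.WaitAsync();

    try
    {
        await selector();
    }
    finally
    {
        semaphore.Release(); // runs on success AND on throw
    }
}

try
{
    await RunWithSlotAsync(() => throw new InvalidOperationException("selector failed"));
}
catch (InvalidOperationException)
{
    // expected: the selector threw
}

if (semaphore.CurrentCount != 2)
    throw new Exception("semaphore token leaked");
```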
C3 — ConsumerChannelsManager semaphore ownership (reported starvation root cause)
Loc: ConsumerChannelsManager.cs:150-165
Conditionally acquires _messagesLimiterSemaphoreSlim based on _channels.Count > MaxDegreeOfParallelism, then releases based on CurrentCount < MaxDegreeOfParallelism. After a partition revoke drops channel count, a handler skips the acquire but releases in its finally. Over-releases a token it never owned.
Fix: capture bool acquired locally and release iff true.
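Sketched with hypothetical locals standing in for the ConsumerChannelsManager fields:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var messagesLimiter = new SemaphoreSlim(2, 2);
int maxDegreeOfParallelism = 2;

async Task HandleAsync(int channelCount, Func<Task> handler)
{
    bool acquired = false;

    try
    {
        if (channelCount > maxDegreeOfParallelism)
        {
            await messagesLimiter.WaitAsync();
            acquired = true; // this handler now owns exactly one token
        }

        await handler();
    }
    finally
    {
        // Release iff acquired: the CurrentCount-based check could hand
        // back a token this handler never owned.
        if (acquired)
            messagesLimiter.Release();
    }
}

// channelCount <= maxDoP: no acquire, and crucially no release either.
await HandleAsync(channelCount: 1, handler: () => Task.CompletedTask);

if (messagesLimiter.CurrentCount != 2)
    throw new Exception("over-release detected");
```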
H5 — Kafka commit TOCTOU on revoked partitions
Loc: KafkaConsumer.cs:251-279
CommitCoreAsync double-checks IsNotRevoked but _revokedPartitions is mutated concurrently by the rebalance handler.
Fix: snapshot the revoked set before the loop.
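The snapshot approach, sketched with simplified types (int stands in for TopicPartition):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var revokedPartitions = new HashSet<int> { 1 }; // mutated by the rebalance handler
var pendingOffsets = new[] { 0, 1, 2 };

// Snapshot the revoked set once, before the loop: a concurrent rebalance
// mutating the live set can no longer change the answer between the
// check and the store.
var revokedSnapshot = new HashSet<int>(revokedPartitions);

var stored = pendingOffsets
    .Where(partition => !revokedSnapshot.Contains(partition))
    .ToList();

if (stored.Count != 2 || stored.Contains(1))
    throw new Exception("offset stored for a revoked partition");
```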
H6 — OnPartitionsRevoked commits after fire-and-forget stop
Loc: KafkaConsumer.cs:152-174
StopAsync().FireAndForget() does not await; Client.Commit() runs before the consume loop actually stops.
Fix: await StopAsync() before calling Client.Commit().
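The ordering fix, sketched with stand-in methods:

```csharp
using System;
using System.Threading.Tasks;

bool loopStopped = false;

async Task StopAsync()
{
    await Task.Delay(10); // simulates the consume loop winding down
    loopStopped = true;
}

void Commit()
{
    if (!loopStopped)
        throw new InvalidOperationException("commit raced the consume loop");
}

// The fix: await the stop instead of FireAndForget(), so the commit can
// only observe a fully stopped consume loop.
await StopAsync();
Commit();
```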
M2 — OffsetsTracker dual AddOrUpdate non-atomicity
Loc: OffsetsTracker.cs:27-42
TrackOffset performs two sequential AddOrUpdate on _commitOffsets and _rollbackOffsets. A concurrent reader sees partial state.
Fix: take a lock around both updates, or merge into a single structure.
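A sketch of the locked variant with simplified types (not the actual OffsetsTracker):

```csharp
using System;
using System.Collections.Generic;

object offsetsLock = new();
var commitOffsets = new Dictionary<int, long>();
var rollbackOffsets = new Dictionary<int, long>();

void TrackOffset(int partition, long offset)
{
    lock (offsetsLock)
    {
        // Both maps are updated inside one critical section, so a
        // reader can never observe one updated and the other stale.
        commitOffsets[partition] = offset;
        rollbackOffsets[partition] = offset;
    }
}

(long? Commit, long? Rollback) Read(int partition)
{
    lock (offsetsLock)
    {
        return (
            commitOffsets.TryGetValue(partition, out var c) ? (long?)c : null,
            rollbackOffsets.TryGetValue(partition, out var r) ? (long?)r : null);
    }
}

TrackOffset(0, 42);
var (commit, rollback) = Read(0);

if (commit != 42 || rollback != 42)
    throw new Exception("partial state observed");
```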
MC1 — MQTT _nextChannelIndex non-atomic increment
Loc: ConsumerChannelsManager.cs:84 (MQTT)
OnMessageReceivedAsync uses _channels[_nextChannelIndex++]. MQTTnet fires callbacks on arbitrary threads.
Fix: use Interlocked.Increment(ref _nextChannelIndex) % _channels.Length.
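A sketch of the atomic round-robin; the uint cast is an extra guard (beyond the one-liner above) that keeps the index non-negative if the counter ever wraps past int.MaxValue:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int channelCount = 4;
int nextChannelIndex = -1;

// Interlocked.Increment makes the read-modify-write atomic, so two
// concurrent MQTTnet callbacks can never observe the same index or
// skip a channel.
int NextIndex() =>
    (int)((uint)Interlocked.Increment(ref nextChannelIndex) % (uint)channelCount);

// Each caller gets a unique counter value, so 400 calls land exactly
// 100 times on each of the 4 channels regardless of interleaving.
var counts = new int[channelCount];
Parallel.For(0, 400, _ => Interlocked.Increment(ref counts[NextIndex()]));

foreach (var count in counts)
{
    if (count != 100)
        throw new Exception("uneven or duplicated routing");
}
```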
MC2 — MQTT publish queue channel recreation race
Loc: MqttClientWrapper.cs:113-114
ConnectCoreAsync recreates _publishQueueChannel while ProcessPublishQueueAsync still drains the old one.
Fix: signal the drainer to switch channels, or use a long-lived channel.
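One possible shape for the long-lived option, sketched with hypothetical names: create the channel once and reuse it across reconnects, so the drainer and the writers always share the same instance.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Created once, never swapped: reconnects can no longer orphan messages
// in a channel the drainer has stopped reading.
var publishQueueChannel = Channel.CreateUnbounded<string>();

bool Enqueue(string message) =>
    publishQueueChannel.Writer.TryWrite(message);

async Task<int> DrainAsync()
{
    int drained = 0;

    // The drainer reads from the same long-lived instance the writers
    // use, even across connect/disconnect cycles.
    while (publishQueueChannel.Reader.TryRead(out var message))
    {
        await PublishAsync(message);
        drained++;
    }

    return drained;
}

static Task PublishAsync(string message) => Task.CompletedTask;

Enqueue("m1");
Enqueue("m2");

if (await DrainAsync() != 2)
    throw new Exception("messages lost");
```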
MC5 — MQTT ConsumedApplicationMessage TCS reassignment race
Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171
After awaiting the TCS, the channel reader reassigns a new one. A concurrent CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference. The new TCS is never signaled; the channel reader blocks forever.
Fix: use a thread-safe signal mechanism (e.g. a Channel or AsyncAutoResetEvent) instead of reassigning a TCS field.
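A minimal sketch of a channel-based rendezvous (hypothetical shape, not the actual Silverback types):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// One long-lived channel replaces the reassigned TaskCompletionSource
// field: commit/rollback writers and the message-loop reader rendezvous
// on the same object, so no signal can land on a stale reference.
var acks = Channel.CreateUnbounded<bool>();

void SignalCommit() => acks.Writer.TryWrite(true);
void SignalRollback() => acks.Writer.TryWrite(false);

async Task<bool> WaitForAckAsync() => await acks.Reader.ReadAsync();

SignalCommit();
SignalRollback();

if (!await WaitForAckAsync())
    throw new Exception("expected commit ack first");
if (await WaitForAckAsync())
    throw new Exception("expected rollback ack second");
```

Unlike the TCS, the channel buffers signals, so a commit that races ahead of the reader is delivered rather than lost.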
Test plan
- `dotnet build Silverback.sln` -- no errors
- `dotnet test tests/Silverback.Tests.Concurrency` -- 15 failing, 5 passing, 2 skipped
- `dotnet test tests/Silverback.Integration.Tests --filter SequenceBaseSemaphoreLeakTests` -- 1 failing