test: E2E stress tests for Kafka (24 partitions) and MQTT (rapid production) #266
Closed
msallin wants to merge 11 commits into BEagle1984:master from
Scaffolds a Silverback.Tests.Concurrency xunit project wired to Microsoft Coyote 1.7.11 for systematic interleaving exploration.

Infrastructure:
- Silverback.Tests.Concurrency.csproj with a post-build MSBuild target that invokes `coyote rewrite` (DOTNET_ROLL_FORWARD=Major for net10 compatibility)
- rewrite.coyote.json rewrites Core, Integration, Integration.Kafka, and the test assembly itself
- CoyoteTestRunner: shared helper wrapping TestingEngine.Create plus reporting
- TestSequence: minimal RawSequence subclass with enforceTimeout:false and reflection hooks for driving SequenceBase private members
- InternalsVisibleTo grants on Silverback.Core and Silverback.Integration
- Coyote package versions in Directory.Packages.props
- Project entry in Silverback.sln
Coyote tests for SequenceBase<TEnvelope> completion/abort synchronization:
- ConcurrentAbort_ShouldReachAbortedStateExactlyOnce (passing)
- AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted (failing)
- ConcurrentAbortMix_ShouldConvergeToSingleAbortedState (failing)
- ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce (passing)

The two failures are caused by a semaphore leak in AbortIfIncompleteAsync: the !IsPending early return exits without reaching the finally block that releases _completeSemaphoreSlim, so Dispose subsequently deadlocks on _completeSemaphoreSlim.Wait(). A plain (non-Coyote) reproducer is also added in Silverback.Integration.Tests.
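The leak pattern can be sketched in a few lines. This is a language-agnostic Python sketch with illustrative names (the real code is C#, in SequenceBase with _completeSemaphoreSlim); the buggy shape returns before entering the try/finally that gives the token back:

```python
import threading

class SequenceSketch:
    """Sketch of the AbortIfIncompleteAsync leak; names are illustrative."""

    def __init__(self, early_return_inside_try):
        self._complete_semaphore = threading.Semaphore(1)
        self._is_pending = False  # sequence already completed/aborted
        self._safe = early_return_inside_try

    def abort_if_incomplete(self):
        self._complete_semaphore.acquire()
        if not self._safe and not self._is_pending:
            return  # BUG: exits without reaching the finally below
        try:
            if not self._is_pending:
                return  # early return, but finally still releases
            # ... abort work ...
        finally:
            self._complete_semaphore.release()

def token_available(seq):
    # Non-blocking probe: is the semaphore token still available?
    if seq._complete_semaphore.acquire(blocking=False):
        seq._complete_semaphore.release()
        return True
    return False

buggy = SequenceSketch(early_return_inside_try=False)
fixed = SequenceSketch(early_return_inside_try=True)
buggy.abort_if_incomplete()
fixed.abort_if_incomplete()
print(token_available(buggy))  # False: leaked token, Dispose's Wait() would hang
print(token_available(fixed))  # True: early return still runs the finally
```

The fix is purely structural: move the early return inside the guarded region so the finally runs on every exit path.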
SequenceStore is a plain Dictionary<string, ISequence> with zero synchronization. Racing AddAsync against GetEnumerator throws InvalidOperationException from the dictionary's version check.
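The failure mode is the classic fail-fast enumeration race. Python's dict behaves like .NET's Dictionary version check here (RuntimeError instead of InvalidOperationException), so the race can be shown deterministically:

```python
# A reader enumerating the store while a writer adds an entry, as
# AddAsync vs GetEnumerator would race in the unsynchronized SequenceStore.
store = {"seq-1": "sequence"}
race_error = None
try:
    for _ in store:              # reader holding the enumerator...
        store["seq-2"] = "new"   # ...while a concurrent add mutates the map
except RuntimeError as e:
    race_error = e
print("enumeration failed:", race_error)
```

The usual remedies are a lock around both add and enumeration, or a concurrency-safe map (ConcurrentDictionary in .NET).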
Abort() and CompleteAsync() share an early-return re-entry guard that either silently no-ops or throws InvalidOperationException when they race. H4 (CreateStream vs PushAsync on unsynchronized _lazyStreams) is skipped pending Coyote rewriter fix for the async enumerator path.
AddCoreAsync calls private CompleteCoreAsync while holding only _addSemaphoreSlim, bypassing the _completeSemaphoreSlim guard that the public CompleteAsync wrapper uses. Test is skipped pending Coyote harness tuning for the potential-deadlock heuristic.
ConsumerChannelsManager.ReadChannelOnceAsync releases _messagesLimiterSemaphoreSlim based on CurrentCount instead of tracking ownership. After a partition revoke, a handler skips the acquire but still releases in its finally, over-releasing a token it never owned. Coyote catches peak active handlers = 3 > maxDoP = 2 on the first iteration.
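The ownership bug can be sketched as follows, assuming illustrative names: whether to release is decided from shared state instead of "did this handler acquire?", and a bounded semaphore (standing in for the maxDoP limiter) detects the extra release:

```python
import threading

limiter = threading.BoundedSemaphore(2)  # maxDegreeOfParallelism = 2

def handle(acquired):
    # `acquired` stands for the pre-revoke condition that decides whether
    # this handler takes a token; the finally releases unconditionally.
    if acquired:
        limiter.acquire()
    try:
        pass  # process the message
    finally:
        limiter.release()  # BUG when the acquire was skipped after a revoke

handle(acquired=True)            # balanced acquire/release: fine
over_release = None
try:
    handle(acquired=False)       # skipped the acquire, still releases
except ValueError as e:
    over_release = e             # BoundedSemaphore catches the extra token
print("over-release:", over_release)
```

The fix shape is to record ownership in a local flag at acquire time and release only when that flag is set, never based on CurrentCount.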
- H5: CommitCoreAsync TOCTOU: stores an offset for a partition that was just revoked.
- H6: OnPartitionsRevoked commits before the fire-and-forget stop completes.
- H7: ConsumeLoopHandler.Start race on the non-volatile IsConsuming flag (passing on x64).
- M2: OffsetsTracker's dual AddOrUpdate observed as partial state by a reader.
Four MQTT-specific Coyote tests targeting concurrency bugs found in the MQTT integration layer. Two fail on master:

- MC1: _nextChannelIndex++ non-atomic increment (FAILING). ConsumerChannelsManager.OnMessageReceivedAsync (line 84) uses a plain int increment without synchronization. MQTTnet fires callbacks on arbitrary ThreadPool threads; two concurrent callbacks read the same index, route to the same channel, and skip a channel entirely.
- MC2: _publishQueueChannel recreation race (FAILING). MqttClientWrapper.ConnectCoreAsync (lines 113-114) checks, then creates a new channel while ProcessPublishQueueAsync is still draining the old one. Messages written to the new channel are never read by the drainer.
- MC3: _pendingReconnect TOCTOU on non-volatile bools (passing on x64). Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true and both trigger a reconnect. Like Kafka H7, this requires weak memory ordering.
- MC4: channel.Reset() racing with OnMessageReceivedAsync (passing). Coyote did not find a losing interleaving in 100 iterations.

Also adds the Silverback.Integration.MQTT project reference and the MQTT assembly to rewrite.coyote.json.
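MC1's lost update can be replayed deterministically by splitting `_nextChannelIndex++` into its read and write halves, interleaved the way two preempted callbacks would be. A Python sketch (names are illustrative, not Silverback's):

```python
import itertools

CHANNELS = 3
next_index = 0

def pick_channel():
    read = next_index                  # load half of the increment
    return read % CHANNELS, read + 1   # chosen channel, value to store

ch_a, write_a = pick_channel()   # callback A reads 0
ch_b, write_b = pick_channel()   # callback B also reads 0: A hasn't stored yet
next_index = write_a             # A stores 1
next_index = write_b             # B stores 1 again: one increment lost
print(ch_a == ch_b)              # True: both routed to channel 0, one skipped

# Fix shape: an atomic fetch-and-add (Interlocked.Increment in C#);
# itertools.count stands in for it here.
counter = itertools.count()
fixed_a, fixed_b = next(counter) % CHANNELS, next(counter) % CHANNELS
print(fixed_a, fixed_b)          # 0 1: distinct channels
```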
Five new concurrency tests targeting findings from the deep library audit. All five fail on master:

- BEagle1984#1 LazyMessageStreamEnumerable TCS double-set (CRITICAL). GetOrCreateStream() and Cancel() both call Set on the same TaskCompletionSource without synchronization. The second Set throws InvalidOperationException, which is lost in a fire-and-forget context. Loc: LazyMessageStreamEnumerable`1.cs:49-59
- BEagle1984#2 ConsumerChannel CTS dispose-while-active (CRITICAL). StartReading() disposes and recreates _readCancellationTokenSource without synchronization. Concurrent readers that cached the old token get ObjectDisposedException. Loc: ConsumerChannel`1.cs:86-102
- BEagle1984#4 InMemoryKafkaOffsetStore live collection escapes the lock (HIGH). GetStoredOffsets() returns Dictionary.Values (a live view) after releasing the lock. StoreOffsetsAsync mutates the dictionary under the lock, but the caller enumerates the live Values outside it. Loc: InMemoryKafkaOffsetStore.cs:20-28
- BEagle1984#5 EnumerableSelectExtensions semaphore leak on throw (HIGH). ParallelSelectAsync releases the semaphore in the try block rather than in the finally. When the selector throws, the token is permanently leaked, reducing effective parallelism to zero over time. Loc: EnumerableSelectExtensions.cs:42-61
- MC5 MQTT ConsumedApplicationMessage TCS reassignment race (HIGH). After awaiting the TCS, the channel reader reassigns it. A concurrent CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference, so the new TCS is never signaled and the channel reader blocks forever. Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171
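The TCS double-set failure mode in #1 can be shown with Python's Future, which mirrors TaskCompletionSource here: a second set_result throws, just as the second TCS.Set does (this is a sketch, not the Silverback code):

```python
from concurrent.futures import Future, InvalidStateError

tcs = Future()
tcs.set_result("stream created")     # GetOrCreateStream() wins the race
second_set_error = None
try:
    tcs.set_result("cancelled")      # Cancel() hits the same TCS
except InvalidStateError as e:
    second_set_error = e             # in fire-and-forget code this is lost
print("second Set threw:", type(second_set_error).__name__)

def try_set_result(fut, value):
    # Fix shape: TrySetResult semantics, losing the race quietly.
    try:
        fut.set_result(value)
        return True
    except InvalidStateError:
        return False

print(try_set_result(tcs, "cancelled"))  # False
```

Switching to TrySetResult (or serializing the two setters) turns the lost exception into a benign lost race.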
The default 100-iteration Coyote run passes for H7, MC3, and MC4 because the race window sits between two synchronous statements with no scheduling point for Coyote to interleave. The aggressive variants insert Task.Yield() at the race point (a standard Coyote technique) and run 1000 iterations with systematic fuzzing enabled.

- H7 aggressive: ConsumeLoopHandler.Start double-start (FAILING). Two concurrent Start() calls both read IsConsuming=false and both start a consume loop. loopsStarted=2, expected 1.
- MC3 aggressive: MQTT _pendingReconnect double-reconnect (FAILING). Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true and both trigger a reconnect. reconnectCount=2, expected 1.
- MC4 aggressive: MQTT channel.Reset message loss (FAILING). Concurrent OnMessageReceivedAsync callbacks each create their own channel via Reset; messages written to replaced channels are orphaned. The assertion now checks messages readable from the FINAL channel, not the TryWrite success count. Only 1-3 of 4 messages survive.

Also adds CoyoteTestRunner.RunAggressive() with systematic fuzzing, potential-deadlock tolerance, and a 30s deadlock timeout.
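The Task.Yield() trick can be sketched with asyncio: an await at the race point hands control to the scheduler (here the event loop; in the tests, Coyote's systematic scheduler), which is enough to let two Start() calls interleave deterministically. Names are illustrative:

```python
import asyncio

is_consuming = False
loops_started = 0

async def start():
    global is_consuming, loops_started
    if not is_consuming:           # both tasks read False...
        await asyncio.sleep(0)     # ...yield at the race point (the Task.Yield() analogue)
        is_consuming = True
        loops_started += 1         # ...so both "start" a consume loop

async def main():
    await asyncio.gather(start(), start())

asyncio.run(main())
print(loops_started)  # 2, expected 1: the H7 double-start shape
```

Without the yield, the check and the flag write run back to back on one thread of control and the scheduler never gets a chance to expose the window, which is why the plain 100-iteration runs pass.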
test: E2E stress tests for Kafka (24 partitions) and MQTT (rapid production)

Two stress tests using the mocked broker infrastructure that hammer the full Silverback pipeline under high concurrency. These are regular xunit tests (no Coyote), designed to surface the concurrency bugs found in the pattern-level tests under realistic end-to-end load.

- KafkaBatchStressTests: 24 partitions, batch size 50, 5040 messages, sync IMessageStreamEnumerable subscriber. Surfaces C3 (semaphore leak after partition count fluctuation), H1 (SequenceStore dictionary race under 24 concurrent readers/writers), and the AbortIfIncompleteAsync semaphore leak on incomplete batch abort. 60-second timeout; expected to either time out (starvation/deadlock) or fail on an assertion (message loss or duplication).
- MqttConcurrencyStressTests: 1200 messages produced concurrently via Task.WhenAll, sync subscriber with SpinWait to maximize callback overlap. Surfaces MC1 (_nextChannelIndex non-atomic increment losing message routing). 60-second timeout.

Run with:
dotnet test tests/Silverback.Integration.Tests.E2E -c Debug --filter "Type=Stress"
Summary
Two E2E stress tests using the mocked broker infrastructure that hammer the full Silverback pipeline under high concurrency. These are regular xunit tests (no Coyote) that live in the existing E2E project and run the real consumer pipeline end-to-end.
Designed to surface the concurrency bugs found in PR #263 under realistic load conditions. Expected to fail on the current codebase.
Run with:
dotnet test tests/Silverback.Integration.Tests.E2E -c Debug --filter "Type=Stress"
Tests
KafkaBatchStressTests (60s timeout)
Bugs this surfaces under load:
- C3: semaphore leak after partition count fluctuation
- H1: SequenceStore dictionary race under 24 concurrent readers/writers
- AbortIfIncompleteAsync semaphore leak on incomplete batch abort
MqttConcurrencyStressTests (60s timeout)
Bugs this surfaces under load:
- MC1: _nextChannelIndex non-atomic increment losing message routing
Test plan