
test: E2E stress tests for Kafka (24 partitions) and MQTT (rapid production)#266

Closed
msallin wants to merge 11 commits into BEagle1984:master from msallin:test/e2e-stress

Conversation

msallin (Collaborator) commented Apr 12, 2026

Summary

Two E2E stress tests that use the mocked broker infrastructure to hammer the full Silverback pipeline under high concurrency. These are regular xunit tests (no Coyote) that live in the existing E2E project and run the real consumer pipeline end-to-end.

Designed to surface the concurrency bugs found in PR #263 under realistic load conditions. Expected to fail on the current codebase.

Run with:
dotnet test tests/Silverback.Integration.Tests.E2E -c Debug --filter "Type=Stress"

Tests

KafkaBatchStressTests (60s timeout)

  • 24 partitions, batch size 50, 5040 messages
  • Sync IMessageStreamEnumerable subscriber (24 threads blocked in MoveNext via SafeWait)
  • Messages distributed via SetKafkaKey across all partitions
  • Asserts: all 5040 consumed, no duplicates, all offsets committed

Bugs this surfaces under load:

  • C3: ConsumerChannelsManager semaphore leak after partition count fluctuation
  • H1: SequenceStore unsynchronized Dictionary under 24 concurrent readers/writers
  • AbortIfIncompleteAsync: semaphore leak on incomplete batch abort
  • Thread starvation: 24 sync subscribers exhaust the ThreadPool

MqttConcurrencyStressTests (60s timeout)

  • 1200 messages produced concurrently via Task.WhenAll
  • Sync subscriber with Thread.SpinWait(100) to maximize callback overlap
  • Asserts: all 1200 received, no duplicates, no missing indices
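The production side of this test is the standard concurrent fan-out pattern. A minimal sketch, with a hypothetical in-memory `ProduceAsync` standing in for the actual Silverback publisher call against the mocked broker:

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Sketch of the concurrent-production pattern; ProduceAsync is a
// stand-in for publishing through the mocked MQTT broker.
const int Total = 1200;
var received = new ConcurrentBag<int>();

async Task ProduceAsync(int index)
{
    await Task.Yield();     // hop to the ThreadPool to maximize callback overlap
    received.Add(index);    // stand-in for delivery to the subscriber
}

await Task.WhenAll(Enumerable.Range(0, Total).Select(ProduceAsync));

// The assertions mirror the test: no loss, no duplicates, no gaps.
Debug.Assert(received.Count == Total);
Debug.Assert(received.Distinct().Count() == Total);
Debug.Assert(received.Min() == 0 && received.Max() == Total - 1);
```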

Bugs this surfaces under load:

  • MC1: _nextChannelIndex non-atomic increment in OnMessageReceivedAsync
  • MC2: publish queue channel recreation race under rapid production

Test plan

  • dotnet build tests/Silverback.Integration.Tests.E2E -- no errors
  • dotnet test --filter "Type=Stress" -- expected: both tests fail (timeout or assertion)
  • dotnet test --filter "Type=E2E" -- existing E2E tests still pass (stress tests are isolated by Trait)

msallin added 11 commits April 12, 2026 15:57
Scaffolds a Silverback.Tests.Concurrency xunit project wired to Microsoft
Coyote 1.7.11 for systematic interleaving exploration.

Infrastructure:
- Silverback.Tests.Concurrency.csproj with post-build MSBuild target that
  invokes `coyote rewrite` (DOTNET_ROLL_FORWARD=Major for net10 compat)
- rewrite.coyote.json rewrites Core, Integration, Integration.Kafka, and
  the test assembly itself
- CoyoteTestRunner shared helper wrapping TestingEngine.Create + report
- TestSequence minimal RawSequence subclass with enforceTimeout:false and
  reflection hooks for driving SequenceBase private members
- InternalsVisibleTo grants on Silverback.Core and Silverback.Integration
- Coyote package versions in Directory.Packages.props
- Project entry in Silverback.sln

Coyote tests for SequenceBase<TEnvelope> completion/abort synchronization:

- ConcurrentAbort_ShouldReachAbortedStateExactlyOnce (passing)
- AbortThenAbortIfIncomplete_ShouldBothReturnAndRemainAborted (failing)
- ConcurrentAbortMix_ShouldConvergeToSingleAbortedState (failing)
- ConcurrentErrorAborts_ShouldSetAbortExceptionExactlyOnce (passing)

Two tests fail due to a semaphore leak in AbortIfIncompleteAsync: the
!IsPending early-return exits without hitting the finally that releases
_completeSemaphoreSlim; Dispose then deadlocks on _completeSemaphoreSlim.Wait().
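The shape of the leak, as described above, looks roughly like this (a simplified sketch using names from the description, not the actual SequenceBase source):

```csharp
using System.Threading;
using System.Threading.Tasks;

// Simplified sketch of the reported bug: the !IsPending early return
// exits after acquiring the semaphore but before the finally that
// releases it, so a later Dispose blocks on Wait() forever.
public abstract class SequenceSketch
{
    private readonly SemaphoreSlim _completeSemaphoreSlim = new(1, 1);

    protected abstract bool IsPending { get; }

    public async Task AbortIfIncompleteAsync()
    {
        await _completeSemaphoreSlim.WaitAsync();

        if (!IsPending)
            return;                      // BUG: the token acquired above is never released

        try
        {
            // ... abort the incomplete sequence ...
        }
        finally
        {
            _completeSemaphoreSlim.Release();
        }
    }

    public void Dispose() =>
        _completeSemaphoreSlim.Wait();   // deadlocks after the early return
}
```

The fix sketch is to move the guard inside the try so every exit path hits the finally, or to release explicitly on the early-return path.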

Also adds a plain (non-Coyote) reproducer in Silverback.Integration.Tests.

SequenceStore is a plain Dictionary<string, ISequence> with zero
synchronization. Racing AddAsync against GetEnumerator throws
InvalidOperationException from the dictionary's version check.
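The reproducer shape is straightforward to sketch (simplified types; unsynchronized concurrent writes can also corrupt the dictionary outright, not just throw):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// A plain Dictionary enumerated on one thread while another thread adds
// entries trips the dictionary's internal version check.
var store = new Dictionary<string, int>();

var writer = Task.Run(() =>
{
    for (int i = 0; i < 100_000; i++)
        store[$"sequence-{i}"] = i;                 // unsynchronized AddAsync stand-in
});

try
{
    while (!writer.IsCompleted)
        foreach (var pair in store) { _ = pair; }   // unsynchronized GetEnumerator
}
catch (InvalidOperationException)
{
    // "Collection was modified; enumeration operation may not execute."
}

await writer;
// Fix sketch: ConcurrentDictionary<string, ISequence>, or one lock
// shared by both AddAsync and GetEnumerator.
```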

Abort() and CompleteAsync() share an early-return re-entry guard that
either silently no-ops or throws InvalidOperationException when they race.

H4 (CreateStream vs PushAsync on unsynchronized _lazyStreams) is skipped
pending Coyote rewriter fix for the async enumerator path.

AddCoreAsync calls private CompleteCoreAsync while holding only
_addSemaphoreSlim, bypassing the _completeSemaphoreSlim guard that the
public CompleteAsync wrapper uses. Test is skipped pending Coyote harness
tuning for the potential-deadlock heuristic.

ConsumerChannelsManager.ReadChannelOnceAsync releases
_messagesLimiterSemaphoreSlim based on CurrentCount instead of tracking
ownership. After a partition revoke, a handler skips the acquire but still
releases in its finally, over-releasing a token it never owned. Coyote
catches peak active handlers = 3 > maxDoP = 2 on the first iteration.
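An ownership-tracked release avoids this class of bug: each handler remembers whether it actually acquired a token instead of consulting CurrentCount at release time. A sketch under the names used in the description (not the actual source):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Each handler records ownership at acquire time, so a handler that
// skipped the acquire (e.g. after a partition revoke) never releases
// a token it does not own.
public sealed class LimiterSketch
{
    private readonly SemaphoreSlim _messagesLimiterSemaphoreSlim = new(2, 2);

    public async Task HandleAsync(bool partitionRevoked, Func<Task> handler)
    {
        bool acquired = false;
        try
        {
            if (!partitionRevoked)
            {
                await _messagesLimiterSemaphoreSlim.WaitAsync();
                acquired = true;                           // record ownership here
            }

            await handler();
        }
        finally
        {
            if (acquired)
                _messagesLimiterSemaphoreSlim.Release();   // release only what we own
        }
    }
}
```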

H5: CommitCoreAsync TOCTOU — stores offset for a revoked partition.
H6: OnPartitionsRevoked commits before fire-and-forget stop completes.
H7: ConsumeLoopHandler.Start non-volatile IsConsuming race (passing on x64).
M2: OffsetsTracker dual AddOrUpdate observed as partial state by reader.

Four MQTT-specific Coyote tests targeting concurrency bugs found in the
MQTT integration layer. Two fail on master:

MC1: _nextChannelIndex++ non-atomic increment (FAILING)
  ConsumerChannelsManager.OnMessageReceivedAsync (line 84) uses a plain
  int increment without synchronization. MQTTnet fires callbacks on
  arbitrary ThreadPool threads; two concurrent callbacks read the same
  index, route to the same channel, and skip a channel entirely.
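The lost-update race and the atomic alternative can be sketched as follows (illustrative class, not the actual ConsumerChannelsManager source):

```csharp
using System.Threading;

// Round-robin channel selection: the plain increment is a classic
// read-modify-write race across MQTTnet callback threads; Interlocked
// makes it atomic.
public sealed class RoundRobinSketch
{
    private int _buggyIndex;
    private int _safeIndex = -1;

    public int NextBuggy(int channelsCount) =>
        _buggyIndex++ % channelsCount;                 // two racing callers can read the same value

    public int NextSafe(int channelsCount)
    {
        int index = Interlocked.Increment(ref _safeIndex);
        return (index & int.MaxValue) % channelsCount; // mask keeps the index non-negative on overflow
    }
}
```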

MC2: _publishQueueChannel recreation race (FAILING)
  MqttClientWrapper.ConnectCoreAsync (lines 113-114) does an unsynchronized
  check-then-create of a new channel while ProcessPublishQueueAsync still
  drains the old one. Messages written to the new channel are never read
  by the drainer.

MC3: _pendingReconnect TOCTOU on non-volatile bools (passing on x64)
  Two concurrent TryConnectAsync calls both read _mqttClientWasConnected=true
  and both trigger reconnect. Like Kafka H7, requires weak memory ordering.

MC4: channel.Reset() racing with OnMessageReceivedAsync (passing)
  Coyote did not find a losing interleaving in 100 iterations.

Also adds Silverback.Integration.MQTT project reference and the MQTT
assembly to rewrite.coyote.json.

Five new concurrency tests targeting findings from the deep library audit.
All five fail on master:

BEagle1984#1 LazyMessageStreamEnumerable TCS double-set (CRITICAL)
  GetOrCreateStream() and Cancel() both call Set on the same
  TaskCompletionSource without synchronization. The second Set throws
  InvalidOperationException which is lost in a fire-and-forget context.
  Loc: LazyMessageStreamEnumerable`1.cs:49-59
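The standard remedy for this kind of double-completion race is the Try* family on TaskCompletionSource, which lets exactly one completer win without throwing. A minimal sketch (generic payload; the real code completes a message stream):

```csharp
using System.Threading.Tasks;

// Two racing completers on one TaskCompletionSource: with TrySetResult /
// TrySetCanceled the loser simply gets false back instead of throwing
// InvalidOperationException into a fire-and-forget context.
var tcs = new TaskCompletionSource<object>(
    TaskCreationOptions.RunContinuationsAsynchronously);

// GetOrCreateStream path:
bool setWon = tcs.TrySetResult(new object());

// Cancel path, potentially racing with the above:
bool cancelWon = tcs.TrySetCanceled();

// At most one of the two returns true; neither throws the way
// SetResult / SetCanceled would on an already-completed TCS.
```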

BEagle1984#2 ConsumerChannel CTS dispose-while-active (CRITICAL)
  StartReading() disposes and recreates _readCancellationTokenSource
  without synchronization. Concurrent readers that cached the old token
  get ObjectDisposedException.
  Loc: ConsumerChannel`1.cs:86-102

BEagle1984#4 InMemoryKafkaOffsetStore live collection escapes lock (HIGH)
  GetStoredOffsets() returns Dictionary.Values (a live view) after
  releasing the lock. StoreOffsetsAsync mutates the dict under lock,
  but the caller enumerates the live Values outside the lock.
  Loc: InMemoryKafkaOffsetStore.cs:20-28
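The usual fix is to materialize a snapshot while still holding the lock, so callers never enumerate the live Values view. A sketch with simplified types (the real store holds Kafka offsets):

```csharp
using System.Collections.Generic;
using System.Linq;

// Snapshot-under-lock: GetStoredOffsets returns a copy, so a concurrent
// StoreOffset mutating the dictionary cannot invalidate the caller's
// enumeration.
public sealed class OffsetStoreSketch
{
    private readonly object _lock = new();
    private readonly Dictionary<string, long> _offsets = new();

    public void StoreOffset(string topicPartition, long offset)
    {
        lock (_lock)
        {
            _offsets[topicPartition] = offset;
        }
    }

    public IReadOnlyCollection<long> GetStoredOffsets()
    {
        lock (_lock)
        {
            return _offsets.Values.ToList();   // copy, not the live Values view
        }
    }
}
```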

BEagle1984#5 EnumerableSelectExtensions semaphore leak on throw (HIGH)
  ParallelSelectAsync releases the semaphore in the try block, not the
  finally. When the selector throws, the token is permanently leaked,
  reducing effective parallelism to zero over time.
  Loc: EnumerableSelectExtensions.cs:42-61
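The fix shape is to move the Release into a finally so a throwing selector cannot leak the token (hypothetical helper, not the actual EnumerableSelectExtensions source):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Throttled invocation with a leak-free release: the finally runs on
// success AND on throw, so the semaphore's capacity is preserved even
// when the selector faults.
public static class ParallelSelectSketch
{
    public static async Task RunThrottledAsync<T>(
        T item,
        Func<T, Task> selector,
        SemaphoreSlim throttle)
    {
        await throttle.WaitAsync();
        try
        {
            await selector(item);   // may throw
        }
        finally
        {
            throttle.Release();     // never skipped, unlike a release inside the try
        }
    }
}
```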

MC5 MQTT ConsumedApplicationMessage TCS reassignment race (HIGH)
  After awaiting the TCS, the channel reader reassigns it. A concurrent
  CommitCoreAsync/RollbackCoreAsync calls SetResult on the OLD reference.
  The new TCS is never signaled; the channel reader blocks forever.
  Loc: ConsumerChannelsManager.cs:73 + MqttConsumer.cs:164,171

The default 100-iteration Coyote run passes for H7, MC3, and MC4 because
the race window is between two synchronous statements with no scheduling
point for Coyote to interleave. The aggressive variants insert
Task.Yield() at the race point (standard Coyote technique) and run at
1000 iterations with systematic fuzzing enabled.
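The technique can be sketched like this (names and the injected yield are test-harness stand-ins, not the production code):

```csharp
using System.Threading;
using System.Threading.Tasks;

// An awaited yield between the check and the act gives Coyote a
// scheduling point to interleave a second Start() inside the
// otherwise-synchronous race window.
public sealed class ConsumeLoopSketch
{
    private bool _isConsuming;
    private int _loopsStarted;

    public async Task StartAsync()
    {
        if (!_isConsuming)                           // check
        {
            await Task.Yield();                      // injected scheduling point (test build only)
            _isConsuming = true;                     // act: a racing caller can slip in above
            Interlocked.Increment(ref _loopsStarted);
        }
    }

    public int LoopsStarted => _loopsStarted;        // the harness asserts this == 1
}
```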

H7 aggressive: ConsumeLoopHandler.Start double-start (FAILING)
  Two concurrent Start() calls both read IsConsuming=false and both
  start a consume loop. loopsStarted=2, expected 1.

MC3 aggressive: MQTT _pendingReconnect double-reconnect (FAILING)
  Two concurrent TryConnectAsync calls both read
  _mqttClientWasConnected=true and both trigger reconnect.
  reconnectCount=2, expected 1.

MC4 aggressive: MQTT channel.Reset message loss (FAILING)
  Concurrent OnMessageReceivedAsync callbacks each create their own
  channel via Reset; messages written to replaced channels are orphaned.
  Assertion now checks messages readable from the FINAL channel, not
  TryWrite success count. Only 1-3 of 4 messages survive.

Also adds CoyoteTestRunner.RunAggressive() with systematic fuzzing,
potential-deadlock tolerance, and 30s deadlock timeout.

…production)

Two stress tests using the mocked broker infrastructure that hammer the
full Silverback pipeline under high concurrency. These are regular xunit
tests (no Coyote), designed to surface the concurrency bugs we found in
pattern-level tests under realistic end-to-end load.

KafkaBatchStressTests: 24 partitions, batch size 50, 5040 messages,
sync IMessageStreamEnumerable subscriber. Surfaces C3 (semaphore leak
after partition count fluctuation), H1 (SequenceStore dictionary race
under 24 concurrent readers/writers), and AbortIfIncompleteAsync
(semaphore leak on incomplete batch abort). 60-second timeout; expected
to either timeout (starvation/deadlock) or fail on assertion (message
loss or duplicate).

MqttConcurrencyStressTests: 1200 messages produced concurrently via
Task.WhenAll, sync subscriber with SpinWait to maximize callback overlap.
Surfaces MC1 (_nextChannelIndex non-atomic increment losing message
routing). 60-second timeout.

Run with: dotnet test tests/Silverback.Integration.Tests.E2E -c Debug --filter "Type=Stress"

@msallin msallin closed this Apr 12, 2026
