Skip to content

fix: handle messages concurrently to prevent deadlock (issue #176)#764

Open
MichielDean wants to merge 6 commits into
modelcontextprotocol:mainfrom
MichielDean:fix/concurrent-message-handling
Open

fix: handle messages concurrently to prevent deadlock (issue #176)#764
MichielDean wants to merge 6 commits into
modelcontextprotocol:mainfrom
MichielDean:fix/concurrent-message-handling

Conversation

@MichielDean
Copy link
Copy Markdown

Summary

Fixes #176 — messages are not handled asynchronously, causing deadlock when a request handler needs to send its own request to the peer.

When a server request handler needs to send a request back to the client (e.g., sampling, elicitation, roots), the handler would block the message receive loop. This prevented all subsequent messages from being processed, including the response the handler was waiting for — causing deadlock.

Root Cause

The Protocol.onMessage callback in connect() called onRequest() and onNotification() as suspend functions directly. Since the transport receive loop processes messages sequentially, any blocking handler blocked ALL incoming messages — requests, responses, and notifications alike.

Fix

Added ProtocolOptions.concurrentMessageHandling (default: false) to opt into concurrent handling:

  • When true: Request and notification handlers are launched in separate coroutines using a SupervisorJob scope, unblocking the message receive loop. Responses are still processed synchronously (they're just callback lookups, not suspend functions). This prevents deadlock because the receive loop can continue processing responses while a handler is running.
  • When false (default): Handlers run synchronously in the callback, preserving backward compatibility with HTTP-based transports (SSE, Streamable HTTP) that depend on responses being sent within the same HTTP request context.

Changes

  • ProtocolOptions: added concurrentMessageHandling: Boolean = false
  • Protocol: added handlerScope: CoroutineScope? field, created on connect() when concurrent handling is enabled, cancelled on doClose()
  • Protocol.connect(): when concurrentMessageHandling is true, requests and notifications are dispatched to handlerScope.launch {}; responses are still processed synchronously since they only look up a callback
  • ProtocolTest: updated should propagate CancellationExceptionshould suppress CancellationException since exceptions from launched coroutines don't propagate to the caller (caught by SupervisorJob)
  • New ConcurrencyTest integration test that verifies concurrent tool calls complete in parallel, not serially

Testing

  • All existing tests pass (core, server, integration)
  • New ConcurrencyTest verifies parallel handling
  • StreamableHttpServerTransportTest passes (uses default concurrentMessageHandling = false)
  • ktlint and API compatibility checks pass

Migration Guide

Users experiencing deadlock when server handlers issue requests (sampling, elicitation, roots) should set concurrentMessageHandling = true on their ServerOptions or ClientOptions:

val serverOptions = ServerOptions(
    capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)),
)
serverOptions.concurrentMessageHandling = true

val server = Server(
    serverInfo = Implementation("my-server", "1.0"),
    options = serverOptions,
)

For channel-based transports (Stdio, WebSocket, Channel), this should be enabled by default in a future version once transport-specific defaults are implemented.

…ocol

- Dispatch each incoming message to a separate coroutine via
  CoroutineScope(SupervisorJob() + Dispatchers.Default) in
  Protocol.connect(), preventing deadlock when request handlers
  call session.request() before responding (issue modelcontextprotocol#176)
- Add ProtocolOptions.dispatcher for test injection
- Cancel messageScope in Protocol.doClose()
- Update existing CancellationException test to account for
  concurrent dispatch (CancellationException no longer propagates
  to caller; it cancels the individual coroutine)
- Add unit tests for concurrent dispatch, SupervisorJob resilience,
  and scope cleanup on close
- Add integration test using InMemoryTransport.createLinkedPair()
  verifying deadlock prevention end-to-end
Copilot AI review requested due to automatic review settings May 13, 2026 17:33
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds opt-in concurrent request/notification handling in the shared protocol layer to avoid receive-loop deadlocks when handlers wait on peer responses.

Changes:

  • Adds concurrentMessageHandling to ProtocolOptions.
  • Launches request/notification handlers in a supervisor scope when enabled.
  • Adds/updates tests around cancellation and concurrent tool calls.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt Adds concurrent handler scope and dispatch behavior.
kotlin-sdk-core/api/kotlin-sdk-core.api Updates public API surface for the new option.
kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt Updates cancellation behavior expectation.
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt Adds integration coverage for concurrent tool handling.
Comments suppressed due to low confidence (1)

integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt:149

  • This assertion does not reliably validate concurrent handling. With one slow request followed by an immediate fast request, serial processing still finishes in roughly slowToolDelay plus the 50 ms offset, which is below slowToolDelay * 2; additionally, the stdlib assert may be disabled on JVM test runs. Use a test assertion such as assertTrue and verify that the fast result completes before the slow result (or measure the fast call duration against the remaining slow delay).
        val elapsed = System.currentTimeMillis() - startTime
        // If concurrent: elapsed ≈ slowToolDelay + overhead
        // If serial: elapsed ≈ slowToolDelay * 2
        assert(elapsed < slowToolDelay * 2) {
            "Fast tool was blocked by slow tool. Total duration ${elapsed}ms should be < ${slowToolDelay * 2}ms."

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@MichielDean MichielDean force-pushed the fix/concurrent-message-handling branch from 922214f to a4a00d7 Compare May 13, 2026 17:43
…ne responses, remove dispatcher param

- Replace always-on dispatcher param with concurrentMessageHandling mutable
  property on ProtocolOptions (backward compatible, preserves binary API)
- Handle responses inline (not launched) since they never need concurrency
- Launch requests/notifications in handlerScope only when
  concurrentMessageHandling=true; fall back to synchronous handling
- Revert CancellationException test to synchronous propagation (default=false)
- Remove unit tests that require concurrent dispatch (they tested dispatcher
  behavior, not the opt-in flag)
- Add ChannelTransport-based integration test using real Client/Server that
  verifies concurrent tool calls are actually parallel
- Remove ProtocolOptions.dispatcher from Module.md docs
- Fix SSE/StreamableHTTP test failures caused by always-on concurrent dispatch
Copilot AI review requested due to automatic review settings May 14, 2026 01:27
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

… Dispatchers import, improve concurrency assertion

- Replace System.currentTimeMillis() with TimeSource.Monotonic.markNow()
  for multiplatform compatibility (copilot review)
- Remove unused Dispatchers import from Protocol.kt (copilot review)
- Add fastToolDelay so both tools have delays, making serial vs concurrent
  timing distinguishable (copilot review)
- Use Duration type for delay thresholds instead of Long
@MichielDean
Copy link
Copy Markdown
Author

All Copilot review feedback has been addressed in the latest push (abddfb0):

  1. Explicit true check: Using options?.concurrentMessageHandling == true instead of null-coalescing
  2. Mutable property instead of constructor param: concurrentMessageHandling is now a var in the class body, preserving JVM binary compatibility
  3. try/finally cleanup: Already had try/finally — transports closed and scope cancelled in finally block
  4. CancellationException test: Reverted to synchronous expectation since default is false
  5. System.currentTimeMillis not multiplatform: Replaced with TimeSource.Monotonic.markNow()/elapsedNow()
  6. Unused Dispatchers import: Removed, using fully qualified kotlinx.coroutines.Dispatchers.Default
  7. Test assertion: Both tools now have delays, threshold is slowToolDelay + fastToolDelay to distinguish serial from concurrent timing

All tests pass, ktlint clean, API dump updated.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

- Move ConcurrencyTest from commonTest to jvmTest since it uses
  runBlocking which is not available on JS/Wasm targets
- Change @Property concurrentMessageHandling KDoc tag to plain text
  in connect() documentation since it refers to ProtocolOptions, not
  a property of connect()

Addresses remaining Copilot review comments on PR 764.
@MichielDean
Copy link
Copy Markdown
Author

Addressed all Copilot review feedback:

  1. options?.concurrentMessageHandling != false (null treated as true) — Fixed in 70d0f25. Changed to explicit opt-in: options?.concurrentMessageHandling == true. Protocol(null) now defaults to synchronous handling, which preserves backward compatibility.

  2. Binary compat of ProtocolOptions constructor — Fixed in 70d0f25. concurrentMessageHandling is now a var property in the class body (not a constructor parameter), preserving the existing constructor signatures. API check passes.

  3. Resource leak in ConcurrencyTest — Fixed in 70d0f25. Added try/finally block that closes both transports and cancels the CoroutineScope to prevent leaked coroutines.

  4. CancellationException test with Protocol(null) — Fixed in 70d0f25. The test now uses TestProtocol() with no explicit options, which defaults to concurrentMessageHandling = false (synchronous mode). In synchronous mode, CancellationException correctly propagates from the handler, which is the expected behavior. The test was reverted to its original assertion.

  5. System.currentTimeMillis() not available in commonTest — Fixed in abddfb0. Replaced with TimeSource.Monotonic.markNow() and kotlin.time.Duration for multiplatform compatibility.

  6. Unused Dispatchers import — Fixed in abddfb0. Removed unused import; the code now uses the fully qualified name inline.

  7. Concurrency assertion doesn't distinguish serial from concurrent — Fixed in abddfb0. Both tools now have delays (500ms slow, 50ms fast). Serial time ≈ 550ms, concurrent time ≈ max(500, 50) + overhead. The assertion checks elapsed < slow + fast threshold.

  8. runBlocking not available on JS/Wasm — Fixed in 990061c. Moved ConcurrencyTest from commonTest to jvmTest source set where runBlocking is available.

  9. Flaky assertion threshold — Fixed in abddfb0. The threshold is now slowToolDelay + fastToolDelay (550ms), which is the minimum serial time. With concurrent handling, total time is ~500ms (max of both delays), giving ~50ms of headroom for scheduling overhead. This is tighter but still meaningful — a truly serial run would take 550ms+.

  10. @property KDoc tag on connect() — Fixed in 990061c. @property concurrentMessageHandling was misplaced in the connect() method KDoc. Replaced with plain text referencing ProtocolOptions.concurrentMessageHandling.

@MichielDean MichielDean requested a review from Copilot May 14, 2026 01:51
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment on lines +153 to +154
val serialThreshold = slowToolDelay + fastToolDelay
if (elapsed >= serialThreshold) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 3f4c373. The timing-based assertion has been replaced with deterministic synchronization:

  • The slow handler blocks on a CompletableDeferred<Unit> latch, not a wall-clock delay
  • The test asserts the fast handler completes while the slow handler is still suspended
  • This is impossible under serial dispatch (both would be blocked behind the slow handler), making the test a true concurrency regression test with zero timing thresholds
  • Also fixed a JUnit discoverability issue where assertNotNull(CallToolResult?) returned a value, causing JUnit to skip the method entirely

Replace timing-based assertion with deterministic synchronization:
the slow handler blocks on a CompletableDeferred latch, and the test
asserts the fast handler completes while the slow one is still running.
This removes wall-clock timing thresholds that caused flakiness on
loaded CI runners.

Also fixes JUnit discoverability: the test method now returns Unit
(explicit trailing Unit) since assertNotNull(CallToolResult?) returns
a value, causing JUnit to skip the method.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

messages are not handled asynchronously

2 participants