fix: handle messages concurrently to prevent deadlock (issue #176)#764
fix: handle messages concurrently to prevent deadlock (issue #176)#764MichielDean wants to merge 6 commits into
Conversation
…ocol - Dispatch each incoming message to a separate coroutine via CoroutineScope(SupervisorJob() + Dispatchers.Default) in Protocol.connect(), preventing deadlock when request handlers call session.request() before responding (issue modelcontextprotocol#176) - Add ProtocolOptions.dispatcher for test injection - Cancel messageScope in Protocol.doClose() - Update existing CancellationException test to account for concurrent dispatch (CancellationException no longer propagates to caller; it cancels the individual coroutine) - Add unit tests for concurrent dispatch, SupervisorJob resilience, and scope cleanup on close - Add integration test using InMemoryTransport.createLinkedPair() verifying deadlock prevention end-to-end
There was a problem hiding this comment.
Pull request overview
This PR adds opt-in concurrent request/notification handling in the shared protocol layer to avoid receive-loop deadlocks when handlers wait on peer responses.
Changes:
- Adds
concurrentMessageHandlingtoProtocolOptions. - Launches request/notification handlers in a supervisor scope when enabled.
- Adds/updates tests around cancellation and concurrent tool calls.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt |
Adds concurrent handler scope and dispatch behavior. |
kotlin-sdk-core/api/kotlin-sdk-core.api |
Updates public API surface for the new option. |
kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt |
Updates cancellation behavior expectation. |
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt |
Adds integration coverage for concurrent tool handling. |
Comments suppressed due to low confidence (1)
integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt:149
- This assertion does not reliably validate concurrent handling. With one slow request followed by an immediate fast request, serial processing still finishes in roughly
slowToolDelayplus the 50 ms offset, which is belowslowToolDelay * 2; additionally, the stdlibassertmay be disabled on JVM test runs. Use a test assertion such asassertTrueand verify that the fast result completes before the slow result (or measure the fast call duration against the remaining slow delay).
val elapsed = System.currentTimeMillis() - startTime
// If concurrent: elapsed ≈ slowToolDelay + overhead
// If serial: elapsed ≈ slowToolDelay * 2
assert(elapsed < slowToolDelay * 2) {
"Fast tool was blocked by slow tool. Total duration ${elapsed}ms should be < ${slowToolDelay * 2}ms."
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…rent message dispatch in Module.md
922214f to
a4a00d7
Compare
…ne responses, remove dispatcher param - Replace always-on dispatcher param with concurrentMessageHandling mutable property on ProtocolOptions (backward compatible, preserves binary API) - Handle responses inline (not launched) since they never need concurrency - Launch requests/notifications in handlerScope only when concurrentMessageHandling=true; fall back to synchronous handling - Revert CancellationException test to synchronous propagation (default=false) - Remove unit tests that require concurrent dispatch (they tested dispatcher behavior, not the opt-in flag) - Add ChannelTransport-based integration test using real Client/Server that verifies concurrent tool calls are actually parallel - Remove ProtocolOptions.dispatcher from Module.md docs - Fix SSE/StreamableHTTP test failures caused by always-on concurrent dispatch
… Dispatchers import, improve concurrency assertion - Replace System.currentTimeMillis() with TimeSource.Monotonic.markNow() for multiplatform compatibility (copilot review) - Remove unused Dispatchers import from Protocol.kt (copilot review) - Add fastToolDelay so both tools have delays, making serial vs concurrent timing distinguishable (copilot review) - Use Duration type for delay thresholds instead of Long
|
All Copilot review feedback has been addressed in the latest push (abddfb0):
All tests pass, ktlint clean, API dump updated. |
- Move ConcurrencyTest from commonTest to jvmTest since it uses runBlocking which is not available on JS/Wasm targets - Change @Property concurrentMessageHandling KDoc tag to plain text in connect() documentation since it refers to ProtocolOptions, not a property of connect() Addresses remaining Copilot review comments on PR 764.
|
Addressed all Copilot review feedback:
|
| val serialThreshold = slowToolDelay + fastToolDelay | ||
| if (elapsed >= serialThreshold) { |
There was a problem hiding this comment.
Fixed in 3f4c373. The timing-based assertion has been replaced with deterministic synchronization:
- The slow handler blocks on a
CompletableDeferred<Unit>latch, not a wall-clock delay - The test asserts the fast handler completes while the slow handler is still suspended
- This is impossible under serial dispatch (both would be blocked behind the slow handler), making the test a true concurrency regression test with zero timing thresholds
- Also fixed a JUnit discoverability issue where
assertNotNull(CallToolResult?)returned a value, causing JUnit to skip the method entirely
Replace timing-based assertion with deterministic synchronization: the slow handler blocks on a CompletableDeferred latch, and the test asserts the fast handler completes while the slow one is still running. This removes wall-clock timing thresholds that caused flakiness on loaded CI runners. Also fixes JUnit discoverability: the test method now returns Unit (explicit trailing Unit) since assertNotNull(CallToolResult?) returns a value, causing JUnit to skip the method.
Summary
Fixes #176 — messages are not handled asynchronously, causing deadlock when a request handler needs to send its own request to the peer.
When a server request handler needs to send a request back to the client (e.g., sampling, elicitation, roots), the handler would block the message receive loop. This prevented all subsequent messages from being processed, including the response the handler was waiting for — causing deadlock.
Root Cause
The
Protocol.onMessagecallback inconnect()calledonRequest()andonNotification()as suspend functions directly. Since the transport receive loop processes messages sequentially, any blocking handler blocked ALL incoming messages — requests, responses, and notifications alike.Fix
Added
ProtocolOptions.concurrentMessageHandling(default:false) to opt into concurrent handling:true: Request and notification handlers are launched in separate coroutines using aSupervisorJobscope, unblocking the message receive loop. Responses are still processed synchronously (they're just callback lookups, not suspend functions). This prevents deadlock because the receive loop can continue processing responses while a handler is running.false(default): Handlers run synchronously in the callback, preserving backward compatibility with HTTP-based transports (SSE, Streamable HTTP) that depend on responses being sent within the same HTTP request context.Changes
ProtocolOptions: addedconcurrentMessageHandling: Boolean = falseProtocol: addedhandlerScope: CoroutineScope?field, created onconnect()when concurrent handling is enabled, cancelled ondoClose()Protocol.connect(): whenconcurrentMessageHandlingis true, requests and notifications are dispatched tohandlerScope.launch {}; responses are still processed synchronously since they only look up a callbackProtocolTest: updatedshould propagate CancellationException→should suppress CancellationExceptionsince exceptions from launched coroutines don't propagate to the caller (caught bySupervisorJob)ConcurrencyTestintegration test that verifies concurrent tool calls complete in parallel, not seriallyTesting
ConcurrencyTestverifies parallel handlingStreamableHttpServerTransportTestpasses (uses defaultconcurrentMessageHandling = false)Migration Guide
Users experiencing deadlock when server handlers issue requests (sampling, elicitation, roots) should set
concurrentMessageHandling = trueon theirServerOptionsorClientOptions:For channel-based transports (Stdio, WebSocket, Channel), this should be enabled by default in a future version once transport-specific defaults are implemented.