From e9efababc041d061c52255acbbc9eff19c2e8758 Mon Sep 17 00:00:00 2001 From: Arni Dagur Date: Tue, 28 Apr 2026 00:19:13 +0100 Subject: [PATCH] fix: Avoid panic or capacity leak when a stream is cancelled after reserve_capacity --- src/proto/streams/prioritize.rs | 12 ++--- tests/h2-tests/tests/flow_control.rs | 74 ++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 75358d47..f7ec0fb1 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -442,14 +442,10 @@ impl Prioritize { return; } - // If the stream has requested capacity, then it must be in the - // streaming state (more data could be sent) or there is buffered data - // waiting to be sent. - debug_assert!( - stream.state.is_send_streaming() || stream.buffered_send_data > 0, - "state={:?}", - stream.state - ); + // The stream may have been reset or closed since capacity was requested. + if !stream.state.is_send_streaming() && stream.buffered_send_data == 0 { + return; + } // The amount of currently available capacity on the connection let conn_available = self.flow.available().as_size(); diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 57453e78..cabd8f43 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1587,6 +1587,80 @@ async fn reset_stream_waiting_for_capacity() { join(srv, client).await; } +// Regression test for https://github.com/hyperium/h2/pull/893 +#[tokio::test] +async fn reserve_capacity_then_cancel_does_not_leak() { + for explicit_reset in [true, false] { + let (io, mut srv) = mock::new(); + + let srv = async move { + let _ = srv.assert_client_handshake().await; + srv.recv_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + if !explicit_reset { + srv.send_frame(frames::headers(1).response(200)).await; + } + let mut data_bytes = 0; + loop { + let frame = srv.next().await.unwrap().unwrap(); + match frame { + h2::frame::Frame::Reset(_) | h2::frame::Frame::Headers(_) => {} + h2::frame::Frame::Data(d) => { + data_bytes += d.payload().len(); + if d.is_end_stream() { + break; + } + } + other => panic!("unexpected: {:?}", other), + } + } + assert_eq!(data_bytes, 65535); + srv.send_frame(frames::headers(3).response(200).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::handshake(io).await.expect("handshake"); + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); + stream.reserve_capacity(10); + + if explicit_reset { + stream.send_reset(Reason::CANCEL); + drop(stream); + let err = response.await.unwrap_err(); + assert_eq!(err.reason(), Some(Reason::CANCEL)); + } else { + let resp = conn.drive(response).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + drop(stream); + drop(resp); + } + + // Open a second stream and send a full window of data. If capacity + // leaked, this would stall. + let request2 = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + let (response2, mut stream2) = client.send_request(request2, false).unwrap(); + stream2.send_data(vec![0; 65535].into(), true).unwrap(); + join(async move { conn.await.expect("conn") }, async move { + let resp = response2.await.expect("resp"); + assert_eq!(resp.status(), StatusCode::OK); + drop(client); + }) + .await; + }; + + join(srv, client).await; + } +} + #[tokio::test] async fn data_padding() { h2_support::trace_init!();