Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,10 @@ impl Prioritize {
return;
}

// If the stream has requested capacity, then it must be in the
// streaming state (more data could be sent) or there is buffered data
// waiting to be sent.
debug_assert!(
stream.state.is_send_streaming() || stream.buffered_send_data > 0,
"state={:?}",
stream.state
);
// The stream may have been reset or closed since capacity was requested.
if !stream.state.is_send_streaming() && stream.buffered_send_data == 0 {
return;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think, worth a trace, or is it not interesting enough?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here, but I lean towards not adding a trace.

  1. The existing guard for the same condition at another location does not log
  2. I'm not sure what the actionable item would be for someone that sees the log

}

// The amount of currently available capacity on the connection
let conn_available = self.flow.available().as_size();
Expand Down
74 changes: 74 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,80 @@ async fn reset_stream_waiting_for_capacity() {
join(srv, client).await;
}

// Regression test for https://github.com/hyperium/h2/pull/893
//
// Reserving capacity on a stream and then abandoning that stream (either via
// an explicit RST_STREAM or by dropping the handle) must return the reserved
// connection-level flow-control capacity. If the capacity leaked, the second
// request below could never send a full connection window of data.
#[tokio::test]
async fn reserve_capacity_then_cancel_does_not_leak() {
// Exercise both ways of abandoning the first stream:
//   true  -> client sends an explicit RST_STREAM(CANCEL)
//   false -> client awaits the response, then drops the stream handle
for explicit_reset in [true, false] {
let (io, mut srv) = mock::new();

// Mock server: accept stream 1, then count DATA bytes (stream 3) until
// end-of-stream. Receiving the full 65535 bytes proves no connection
// capacity was left reserved by the abandoned stream.
let srv = async move {
let _ = srv.assert_client_handshake().await;
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
.await;
if !explicit_reset {
// Respond on stream 1 so the client can await the response
// before dropping its stream handle.
srv.send_frame(frames::headers(1).response(200)).await;
}
let mut data_bytes = 0;
loop {
let frame = srv.next().await.unwrap().unwrap();
match frame {
// Ignore the RST_STREAM for stream 1 and the HEADERS that
// open stream 3; only DATA frames are counted.
h2::frame::Frame::Reset(_) | h2::frame::Frame::Headers(_) => {}
h2::frame::Frame::Data(d) => {
data_bytes += d.payload().len();
if d.is_end_stream() {
break;
}
}
other => panic!("unexpected: {:?}", other),
}
}
// The second stream must deliver the entire default initial
// connection window (65535 bytes).
assert_eq!(data_bytes, 65535);
srv.send_frame(frames::headers(3).response(200).eos()).await;
};

let client = async move {
let (mut client, mut conn) = client::handshake(io).await.expect("handshake");
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (response, mut stream) = client.send_request(request, false).unwrap();
// Reserve capacity, then abandon the stream without ever sending the
// data — this is the condition that previously leaked capacity.
stream.reserve_capacity(10);

if explicit_reset {
// Path 1: explicit cancel. The pending response future resolves
// with the CANCEL reason.
stream.send_reset(Reason::CANCEL);
drop(stream);
let err = response.await.unwrap_err();
assert_eq!(err.reason(), Some(Reason::CANCEL));
} else {
// Path 2: implicit abandonment — drive the response to completion,
// then drop the send-side handle with capacity still reserved.
let resp = conn.drive(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
drop(stream);
drop(resp);
}

// Open a second stream and send a full window of data. If capacity
// leaked, this would stall.
let request2 = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (response2, mut stream2) = client.send_request(request2, false).unwrap();
stream2.send_data(vec![0; 65535].into(), true).unwrap();
// Drive the connection and the second response concurrently; the
// test deadlocks here if the leaked capacity bug regresses.
join(async move { conn.await.expect("conn") }, async move {
let resp = response2.await.expect("resp");
assert_eq!(resp.status(), StatusCode::OK);
drop(client);
})
.await;
};

join(srv, client).await;
}
}

#[tokio::test]
async fn data_padding() {
h2_support::trace_init!();
Expand Down
Loading