Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ impl Inner {
if self.actions.may_have_forgotten_stream(peer, id) {
tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);

let sz = frame.payload().len();
let sz = frame.flow_controlled_len();
// This should have been enforced at the codec::FramedRead layer, so
// this is just a sanity check.
assert!(sz <= super::MAX_WINDOW_SIZE as usize);
Expand All @@ -581,7 +581,7 @@ impl Inner {
let send_buffer = &mut *send_buffer;

self.counts.transition(stream, |counts, stream| {
let sz = frame.payload().len();
let sz = frame.flow_controlled_len();
let res = actions.recv.recv_data(frame, stream);

// Any stream error after receiving a DATA frame means
Expand Down
142 changes: 142 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,148 @@ async fn stream_error_release_connection_capacity() {
join(srv, client).await;
}

// Regression test for TODO
#[tokio::test]
async fn padded_data_stream_error_releases_connection_capacity() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

// Padded EOS frame: 1 byte pad_len + 8 bytes data + 1 byte padding.
// flow_controlled_len = 10, payload (data only) = 8.
let mut padded_eos = vec![0u8; 10];
padded_eos[0] = 1;

let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
// Wrong content-length triggers a stream error on the padded EOS frame.
srv.send_frame(
frames::headers(1)
.response(200)
.field("content-length", &*(16_384 * 3).to_string()),
)
.await;
srv.send_frame(frames::data(1, vec![0; 16_384])).await;
srv.send_frame(frames::data(1, vec![0; 16_384])).await;
srv.send_frame(frames::data(1, &padded_eos[..]).padded().eos())
.await;
srv.recv_frame(frames::reset(1).protocol_error()).await;
// Released capacity must include the padded frame's full
// flow_controlled_len (10), not just its payload (8).
srv.recv_frame(frames::window_update(0, 16_384 * 2 + 10))
.await;
};

let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();

let req = async {
let resp = client
.send_request(request, true)
.unwrap()
.0
.await
.expect("response");
assert_eq!(resp.status(), StatusCode::OK);
let mut body = resp.into_parts().1;
let mut cap = body.flow_control().clone();
let to_release = 16_384 * 2;
let mut should_recv_bytes = to_release;
let mut should_recv_frames = 2usize;

let err = body
.try_for_each(|bytes| async move {
should_recv_bytes -= bytes.len();
should_recv_frames -= 1;
if should_recv_bytes == 0 {
assert_eq!(should_recv_frames, 0);
}
Ok(())
})
.await
.expect_err("body");
assert_eq!(
err.to_string(),
"stream error detected: unspecific protocol error detected"
);
cap.release_capacity(to_release).expect("release_capacity");
};
conn.drive(req).await;
conn.await.expect("client");
};

join(srv, client).await;
}

// Regression test for TODO
#[tokio::test]
async fn padded_data_on_forgotten_stream_releases_connection_capacity() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

// Padded frame: 1 byte pad_len + 16378 bytes data + 5 bytes padding.
// flow_controlled_len = 16384, payload (data only) = 16378.
let mut padded = vec![0u8; 16_384];
padded[0] = 5;

let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.send_frame(frames::data(1, vec![0; 16_384])).await;
srv.recv_frame(frames::reset(1).cancel()).await;
// Wait for the reset to expire so the stream is forgotten.
idle_ms(50).await;
srv.ping_pong([1; 8]).await;
// Stream 1 has been evicted. Send a padded DATA frame for it.
srv.send_frame(frames::data(1, &padded[..]).padded().eos())
.await;
// Released capacity must cover both frames using their full
// flow_controlled_len. There used to be a bug where the padded
// frame would release only 16378 (payload) instead of 16384.
srv.recv_frame(frames::window_update(0, 16_384 * 2)).await;
srv.recv_frame(frames::reset(1).stream_closed()).await;
};

let client = async move {
let (mut client, conn) = client::Builder::new()
.reset_stream_duration(Duration::from_millis(10))
.handshake::<_, Bytes>(io)
.await
.expect("handshake");

let req = async {
let resp = client.get("https://example.com/").await.expect("response");
assert_eq!(resp.status(), StatusCode::OK);
// This drop sends RST_STREAM(CANCEL)
drop(resp);
};

let mut conn = Box::pin(async move { conn.await.expect("client") });
conn.drive(req).await;
conn.await;
drop(client);
};

join(srv, client).await;
}

#[tokio::test]
async fn stream_close_by_data_frame_releases_capacity() {
h2_support::trace_init!();
Expand Down
Loading