Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@ impl Inner {
id,
self.actions.recv.max_stream_id()
);

// We still need to account for connection-level flow control.
let sz = frame.flow_controlled_len();
assert!(sz <= super::MAX_WINDOW_SIZE as usize);
let sz = sz as WindowSize;
self.actions.recv.ignore_data(sz)?;

return Ok(());
}

Expand All @@ -566,8 +573,8 @@ impl Inner {
// this is just a sanity check.
assert!(sz <= super::MAX_WINDOW_SIZE as usize);
let sz = sz as WindowSize;

self.actions.recv.ignore_data(sz)?;

return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
}

Expand Down
110 changes: 110 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2237,3 +2237,113 @@ async fn too_many_window_update_resets_causes_go_away() {

join(srv, client).await;
}

#[tokio::test]
async fn goaway_ignores_data_but_returns_connection_capacity() {
h2_support::trace_init!();

for padded in [false, true] {
let (io, mut client) = mock::new();

// Test both with and without padding
let data = if padded {
let mut frame = vec![0u8; 16_384];
frame[0] = 5; // 5 bytes of padding
frame
} else {
vec![0u8; 16_384]
};

let client = async move {
let _ = client.assert_server_handshake().await;

client
.send_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.await;

// Receive GOAWAY(MAX) + PING from graceful shutdown.
client.recv_frame(frames::go_away(2147483647)).await;
client.recv_frame(frames::ping(frame::Ping::SHUTDOWN)).await;

client
.recv_frame(frames::headers(1).response(200).eos())
.await;

// Stream 3 arrives "in flight" before client processes GOAWAY.
client
.send_frame(frames::headers(3).request("POST", "https://example.com/"))
.await;

// Complete the graceful shutdown handshake.
client
.send_frame(frames::ping(frame::Ping::SHUTDOWN).pong())
.await;

// Final GOAWAY(3): streams 1 and 3 are accepted, everything above is rejected.
client.recv_frame(frames::go_away(3)).await;

// Stream 5 is above last_stream_id=3; DATA will be ignored,
// but connection window must still be replenished.
client
.send_frame(frames::headers(5).request("POST", "https://example.com/"))
.await;
client
.send_frame(if padded {
frames::data(5, &data[..]).padded()
} else {
frames::data(5, &data[..])
})
.await;
client
.send_frame(if padded {
frames::data(5, &data[..]).padded()
} else {
frames::data(5, &data[..])
})
.await;

client
.recv_frame(frames::window_update(0, 16_384 * 2))
.await;

client.send_frame(frames::data(3, "").eos()).await;

client
.recv_frame(frames::headers(3).response(200).eos())
.await;

client.recv_eof().await;
};

let srv = async move {
let mut srv = server::handshake(io).await.expect("handshake");

let (_req, mut stream) = srv.next().await.unwrap().unwrap();
srv.graceful_shutdown();
let rsp = http::Response::builder().status(200).body(()).unwrap();
stream.send_response(rsp, true).unwrap();

let (req, mut stream) = srv.next().await.unwrap().unwrap();
let body = req.into_parts().1;

let body = async move {
let buf = util::concat(body).await.unwrap();
assert!(buf.is_empty());
let rsp = http::Response::builder().status(200).body(()).unwrap();
stream.send_response(rsp, true).unwrap();
};

let mut srv = Box::pin(async move {
assert!(srv.next().await.is_none(), "unexpected stream after GOAWAY");
});
srv.drive(body).await;
srv.await;
};

join(client, srv).await;
}
}
Loading