diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index fce6bfd1..72f2d11e 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -555,6 +555,13 @@ impl Inner { id, self.actions.recv.max_stream_id() ); + + // We still need to account for connection-level flow control. + let sz = frame.flow_controlled_len(); + assert!(sz <= super::MAX_WINDOW_SIZE as usize); + let sz = sz as WindowSize; + self.actions.recv.ignore_data(sz)?; + return Ok(()); } @@ -566,8 +573,8 @@ impl Inner { // this is just a sanity check. assert!(sz <= super::MAX_WINDOW_SIZE as usize); let sz = sz as WindowSize; - self.actions.recv.ignore_data(sz)?; + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 57453e78..bb04cc9b 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -2237,3 +2237,113 @@ async fn too_many_window_update_resets_causes_go_away() { join(srv, client).await; } + +#[tokio::test] +async fn goaway_ignores_data_but_returns_connection_capacity() { + h2_support::trace_init!(); + + for padded in [false, true] { + let (io, mut client) = mock::new(); + + // Test both with and without padding + let data = if padded { + let mut frame = vec![0u8; 16_384]; + frame[0] = 5; // 5 bytes of padding + frame + } else { + vec![0u8; 16_384] + }; + + let client = async move { + let _ = client.assert_server_handshake().await; + + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Receive GOAWAY(MAX) + PING from graceful shutdown. + client.recv_frame(frames::go_away(2147483647)).await; + client.recv_frame(frames::ping(frame::Ping::SHUTDOWN)).await; + + client + .recv_frame(frames::headers(1).response(200).eos()) + .await; + + // Stream 3 arrives "in flight" before client processes GOAWAY. + client + .send_frame(frames::headers(3).request("POST", "https://example.com/")) + .await; + + // Complete the graceful shutdown handshake. + client + .send_frame(frames::ping(frame::Ping::SHUTDOWN).pong()) + .await; + + // Final GOAWAY(3): streams 1 and 3 are accepted, everything above is rejected. + client.recv_frame(frames::go_away(3)).await; + + // Stream 5 is above last_stream_id=3; DATA will be ignored, + // but connection window must still be replenished. + client + .send_frame(frames::headers(5).request("POST", "https://example.com/")) + .await; + client + .send_frame(if padded { + frames::data(5, &data[..]).padded() + } else { + frames::data(5, &data[..]) + }) + .await; + client + .send_frame(if padded { + frames::data(5, &data[..]).padded() + } else { + frames::data(5, &data[..]) + }) + .await; + + client + .recv_frame(frames::window_update(0, 16_384 * 2)) + .await; + + client.send_frame(frames::data(3, "").eos()).await; + + client + .recv_frame(frames::headers(3).response(200).eos()) + .await; + + client.recv_eof().await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + + let (_req, mut stream) = srv.next().await.unwrap().unwrap(); + srv.graceful_shutdown(); + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + let body = req.into_parts().1; + + let body = async move { + let buf = util::concat(body).await.unwrap(); + assert!(buf.is_empty()); + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + }; + + let mut srv = Box::pin(async move { + assert!(srv.next().await.is_none(), "unexpected stream after GOAWAY"); + }); + srv.drive(body).await; + srv.await; + }; + + join(client, srv).await; + } +}