diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 7fd9537d..fce6bfd1 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -727,8 +727,9 @@ impl Inner { let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); + let peer = counts.peer(); self.store.for_each(|stream| { - if stream.id > last_stream_id { + if stream.id > last_stream_id && peer.is_local_init(stream.id) { counts.transition(stream, |counts, stream| { actions.recv.handle_error(&err, &mut *stream); actions.send.handle_error(send_buffer, stream, counts); diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 72bdca7f..42a35ae6 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -757,6 +757,69 @@ async fn goaway_even_if_client_sent_goaway() { join(client, srv).await; } +#[tokio::test] +async fn client_goaway_does_not_kill_remote_initiated_streams() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + // Client sends a request on stream 1 + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + // Receive response headers (no END_STREAM) + client.recv_frame(frames::headers(1).response(200)).await; + // Client sends GOAWAY(0) + client.send_frame(frames::go_away(0)).await; + // Server should still be able to send the response body + client + .recv_frame(frames::data(1, "the response body").eos()) + .await; + // Server sends its own GOAWAY and closes + client.recv_frame(frames::go_away(1)).await; + client.recv_eof().await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + assert_eq!(req.method(), &http::Method::GET); + + // Send response headers without END_STREAM + let rsp = http::Response::builder().status(200).body(()).unwrap(); + let mut tx = stream.send_response(rsp, false).unwrap(); + + // Drive the connection while sending the body. + // The yields ensure the connection processes the client's GOAWAY + // before we attempt to send data. + let send_body = async { + // First yield: connection flushes headers. Client receives them + // and sends GOAWAY(0). + tokio::task::yield_now().await; + // Second yield: connection reads and processes GOAWAY(0). + // Before the fix, stream 1 was killed here. + tokio::task::yield_now().await; + // Send response body. Before the fix, this failed because + // stream 1 was incorrectly closed by recv_go_away. + tx.send_data("the response body".into(), true).unwrap(); + }; + + let mut srv = Box::pin(async move { + assert!(srv.next().await.is_none(), "unexpected request"); + }); + srv.drive(send_body).await; + srv.await; + }; + + join(client, srv).await; +} + #[tokio::test] async fn sends_reset_cancel_when_res_body_is_dropped() { h2_support::trace_init!();