Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion h3-datagram/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
quic_traits::DatagramConnectionExt,
};

impl<B, C> HandleDatagramsExt<C, B> for Connection<C, B>
impl<C, B> HandleDatagramsExt<C, B> for Connection<C, B>
where
B: Buf,
C: quic::Connection<B> + DatagramConnectionExt<B>,
Expand Down
4 changes: 2 additions & 2 deletions h3-datagram/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::{
quic_traits::DatagramConnectionExt,
};

impl<B, C> HandleDatagramsExt<C, B> for Connection<C, B>
impl<C, B> HandleDatagramsExt<C, B> for Connection<C, B>
where
B: Buf,
C: quic::Connection<B> + DatagramConnectionExt<B>,
B: Buf,
{
/// Get the datagram sender
fn get_datagram_sender(
Expand Down
40 changes: 29 additions & 11 deletions h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
session_id: SessionId,
/// The underlying HTTP/3 connection
server_conn: Mutex<Connection<C, B>>,
connect_stream: RequestStream<C::BidiStream, B>,
connect_stream: RequestStream<C::BidiStream, B, <C::BidiStream as quic::RecvStream>::Buf>,
opener: Mutex<C::OpenStreams>,
/// Shared State
///
Expand Down Expand Up @@ -80,7 +80,7 @@ where
/// TODO: is the API or the user responsible for validating the CONNECT request?
pub async fn accept(
request: Request<()>,
mut stream: RequestStream<C::BidiStream, B>,
mut stream: RequestStream<C::BidiStream, B, <C::BidiStream as quic::RecvStream>::Buf>,
mut conn: Connection<C, B>,
) -> Result<Self, StreamError> {
let shared = conn.inner.shared.clone();
Expand Down Expand Up @@ -250,13 +250,17 @@ where

/// Streams are opened, but the initial webtransport header has not been sent
type PendingStreams<C, B> = (
BidiStream<<C as quic::OpenStreams<B>>::BidiStream, B>,
BidiStream<
<C as quic::OpenStreams<B>>::BidiStream,
B,
<<C as OpenStreams<B>>::BidiStream as quic::RecvStream>::Buf,
>,
WriteBuf<&'static [u8]>,
);

/// Streams are opened, but the initial webtransport header has not been sent
type PendingUniStreams<C, B> = (
SendStream<<C as quic::OpenStreams<B>>::SendStream, B>,
type PendingUniStreams<C, B, R> = (
SendStream<<C as quic::OpenStreams<B>>::SendStream, B, R>,
WriteBuf<&'static [u8]>,
);

Expand Down Expand Up @@ -288,7 +292,8 @@ where
B: Buf,
C::BidiStream: SendStreamUnframed<B>,
{
type Output = Result<BidiStream<C::BidiStream, B>, StreamError>;
type Output =
Result<BidiStream<C::BidiStream, B, <C::BidiStream as quic::RecvStream>::Buf>, StreamError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut p = self.project();
Expand Down Expand Up @@ -322,7 +327,7 @@ pin_project! {
/// Opens a unidirectional stream
pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
opener: &'a Mutex<C::OpenStreams>,
stream: Option<PendingUniStreams<C, B>>,
stream: Option<PendingUniStreams<C, B, <C::BidiStream as quic::RecvStream>::Buf>>,
// Future for opening a uni stream
session_id: SessionId,
stream_handler: WTransportStreamHandler
Expand All @@ -335,7 +340,8 @@ where
B: Buf,
C::SendStream: SendStreamUnframed<B>,
{
type Output = Result<SendStream<C::SendStream, B>, StreamError>;
type Output =
Result<SendStream<C::SendStream, B, <C::BidiStream as quic::RecvStream>::Buf>, StreamError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut p = self.project();
Expand Down Expand Up @@ -372,11 +378,17 @@ where
#[allow(clippy::large_enum_variant)]
pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
/// An incoming bidirectional stream
BidiStream(SessionId, BidiStream<C::BidiStream, B>),
BidiStream(
SessionId,
BidiStream<C::BidiStream, B, <C::BidiStream as quic::RecvStream>::Buf>,
),
/// An incoming HTTP/3 request, passed through a webtransport session.
///
/// This makes it possible to respond to multiple CONNECT requests
Request(Request<()>, RequestStream<C::BidiStream, B>),
Request(
Request<()>,
RequestStream<C::BidiStream, B, <C::BidiStream as quic::RecvStream>::Buf>,
),
}

/// Future for [`WebTransportSession::accept_uni`]
Expand All @@ -393,7 +405,13 @@ where
C: quic::Connection<B>,
B: Buf,
{
type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, ConnectionError>;
type Output = Result<
Option<(
SessionId,
RecvStream<C::RecvStream, B, <C::RecvStream as quic::RecvStream>::Buf>,
)>,
ConnectionError,
>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut conn = self.conn.lock().unwrap();
Expand Down
90 changes: 46 additions & 44 deletions h3-webtransport/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::task::Poll;

use bytes::{Buf, Bytes};
use bytes::Buf;
use h3::{
quic::{self, StreamErrorIncoming},
stream::BufRecvStream,
Expand All @@ -10,25 +10,25 @@ use tokio::io::ReadBuf;

pin_project! {
/// WebTransport receive stream
pub struct RecvStream<S,B> {
pub struct RecvStream<S, B, R> {
#[pin]
stream: BufRecvStream<S, B>,
stream: BufRecvStream<S, B, R>,
}
}

impl<S, B> RecvStream<S, B> {
impl<S, B, R> RecvStream<S, B, R> {
#[allow(missing_docs)]
pub fn new(stream: BufRecvStream<S, B>) -> Self {
pub fn new(stream: BufRecvStream<S, B, R>) -> Self {
Self { stream }
}
}

impl<S, B> quic::RecvStream for RecvStream<S, B>
impl<S, B, R> quic::RecvStream for RecvStream<S, B, R>
where
S: quic::RecvStream,
B: Buf,
S: quic::RecvStream<Buf = R>,
R: Buf,
{
type Buf = Bytes;
type Buf = R;

fn poll_data(
&mut self,
Expand All @@ -46,9 +46,9 @@ where
}
}

impl<S, B> futures_util::io::AsyncRead for RecvStream<S, B>
impl<S, B, R> futures_util::io::AsyncRead for RecvStream<S, B, R>
where
BufRecvStream<S, B>: futures_util::io::AsyncRead,
BufRecvStream<S, B, R>: futures_util::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
Expand All @@ -60,9 +60,9 @@ where
}
}

impl<S, B> tokio::io::AsyncRead for RecvStream<S, B>
impl<S, B, R> tokio::io::AsyncRead for RecvStream<S, B, R>
where
BufRecvStream<S, B>: tokio::io::AsyncRead,
BufRecvStream<S, B, R>: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
Expand All @@ -76,28 +76,28 @@ where

pin_project! {
/// WebTransport send stream
pub struct SendStream<S,B> {
pub struct SendStream<S, B, R> {
#[pin]
stream: BufRecvStream<S ,B>,
stream: BufRecvStream<S, B, R>
}
}

impl<S, B> std::fmt::Debug for SendStream<S, B> {
impl<S, B, R> std::fmt::Debug for SendStream<S, B, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendStream")
.field("stream", &self.stream)
.finish()
}
}

impl<S, B> SendStream<S, B> {
impl<S, B, R> SendStream<S, B, R> {
#[allow(missing_docs)]
pub(crate) fn new(stream: BufRecvStream<S, B>) -> Self {
pub(crate) fn new(stream: BufRecvStream<S, B, R>) -> Self {
Self { stream }
}
}

impl<S, B> quic::SendStreamUnframed<B> for SendStream<S, B>
impl<S, B, R> quic::SendStreamUnframed<B> for SendStream<S, B, R>
where
S: quic::SendStreamUnframed<B>,
B: Buf,
Expand All @@ -111,7 +111,7 @@ where
}
}

impl<S, B> quic::SendStream<B> for SendStream<S, B>
impl<S, B, R> quic::SendStream<B> for SendStream<S, B, R>
where
S: quic::SendStream<B>,
B: Buf,
Expand Down Expand Up @@ -146,9 +146,9 @@ where
}
}

impl<S, B> futures_util::io::AsyncWrite for SendStream<S, B>
impl<S, B, R> futures_util::io::AsyncWrite for SendStream<S, B, R>
where
BufRecvStream<S, B>: futures_util::io::AsyncWrite,
BufRecvStream<S, B, R>: futures_util::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -176,9 +176,9 @@ where
}
}

impl<S, B> tokio::io::AsyncWrite for SendStream<S, B>
impl<S, B, R> tokio::io::AsyncWrite for SendStream<S, B, R>
where
BufRecvStream<S, B>: tokio::io::AsyncWrite,
BufRecvStream<S, B, R>: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -211,19 +211,19 @@ pin_project! {
///
/// Can be split into a [`RecvStream`] and [`SendStream`] if the underlying QUIC implementation
/// supports it.
pub struct BidiStream<S, B> {
pub struct BidiStream<S, B, R> {
#[pin]
stream: BufRecvStream<S, B>,
stream: BufRecvStream<S, B, R>,
}
}

impl<S, B> BidiStream<S, B> {
pub(crate) fn new(stream: BufRecvStream<S, B>) -> Self {
impl<S, B, R> BidiStream<S, B, R> {
pub(crate) fn new(stream: BufRecvStream<S, B, R>) -> Self {
Self { stream }
}
}

impl<S, B> quic::SendStream<B> for BidiStream<S, B>
impl<S, B, R> quic::SendStream<B> for BidiStream<S, B, R>
where
S: quic::SendStream<B>,
B: Buf,
Expand Down Expand Up @@ -258,10 +258,11 @@ where
}
}

impl<S, B> quic::SendStreamUnframed<B> for BidiStream<S, B>
impl<S, B, R> quic::SendStreamUnframed<B> for BidiStream<S, B, R>
where
S: quic::SendStreamUnframed<B>,
B: Buf,
R: Buf,
{
fn poll_send<D: Buf>(
&mut self,
Expand All @@ -272,8 +273,8 @@ where
}
}

impl<S: quic::RecvStream, B> quic::RecvStream for BidiStream<S, B> {
type Buf = Bytes;
impl<S: quic::RecvStream<Buf = R>, B, R: Buf> quic::RecvStream for BidiStream<S, B, R> {
type Buf = R;

fn poll_data(
&mut self,
Expand All @@ -291,24 +292,25 @@ impl<S: quic::RecvStream, B> quic::RecvStream for BidiStream<S, B> {
}
}

impl<S, B> quic::BidiStream<B> for BidiStream<S, B>
impl<S, B, R> quic::BidiStream<B> for BidiStream<S, B, R>
where
S: quic::BidiStream<B>,
S: quic::BidiStream<B, RecvStream: quic::RecvStream<Buf = R>> + quic::RecvStream<Buf = R>,
B: Buf,
R: Buf,
{
type SendStream = SendStream<S::SendStream, B>;
type SendStream = SendStream<S::SendStream, B, R>;

type RecvStream = RecvStream<S::RecvStream, B>;
type RecvStream = RecvStream<S::RecvStream, B, R>;

fn split(self) -> (Self::SendStream, Self::RecvStream) {
let (send, recv) = self.stream.split();
(SendStream::new(send), RecvStream::new(recv))
}
}

impl<S, B> futures_util::io::AsyncRead for BidiStream<S, B>
impl<S, B, R> futures_util::io::AsyncRead for BidiStream<S, B, R>
where
BufRecvStream<S, B>: futures_util::io::AsyncRead,
BufRecvStream<S, B, R>: futures_util::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
Expand All @@ -320,9 +322,9 @@ where
}
}

impl<S, B> futures_util::io::AsyncWrite for BidiStream<S, B>
impl<S, B, R> futures_util::io::AsyncWrite for BidiStream<S, B, R>
where
BufRecvStream<S, B>: futures_util::io::AsyncWrite,
BufRecvStream<S, B, R>: futures_util::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -350,9 +352,9 @@ where
}
}

impl<S, B> tokio::io::AsyncRead for BidiStream<S, B>
impl<S, B, R> tokio::io::AsyncRead for BidiStream<S, B, R>
where
BufRecvStream<S, B>: tokio::io::AsyncRead,
BufRecvStream<S, B, R>: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
Expand All @@ -364,9 +366,9 @@ where
}
}

impl<S, B> tokio::io::AsyncWrite for BidiStream<S, B>
impl<S, B, R> tokio::io::AsyncWrite for BidiStream<S, B, R>
where
BufRecvStream<S, B>: tokio::io::AsyncWrite,
BufRecvStream<S, B, R>: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
Expand Down
Loading
Loading