Skip to content

QUIC compatibility #2025

Open
Open
@maxime-bruno

Description

@maxime-bruno

Hello,

I'm currently conducting a study to figure out why QUIC outperforms TCP in some scenarios.
In addition to a theoretical study, I wanted an empirical one.

Quiche boasts about its high configurability. That's why I chose it.

After getting several inconsistent results (#1988 among others), I tried Quinn (another widely used QUIC library).
To my great surprise, the results were different: with Quinn we obtain the expected, standard results.

I then decided to check whether the two implementations could communicate with each other. That is when I found out that:

  • A quinn server and a quiche client can communicate
  • A quiche server and a quinn client suffer some issues

So here is my question: Does Quiche implement an RFC-compliant version of QUIC, or is the issue coming from Quinn, or from my code?

For information, here is my code:

The part to create a server

use std::collections::HashSet;
use std::fmt::Display;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock as TokioRwLock;
use tokio::net::UdpSocket;

/// QUIC server endpoint built on a single bound UDP socket.
pub struct Server<'a> {
    /// The bound UDP socket; a clone of this `Arc` is handed to every accepted connection.
    udp_socket: Arc<UdpSocket>,
    /// `(local, peer)` address pairs that already have a connection; `accept` skips these.
    paths: Arc<TokioRwLock<HashSet<Path>>>,
    /// Configuration passed by reference to every accepted connection.
    config: Config<'a>,
}

/// A network path, stored as a `(local address, peer address)` pair.
type Path = (SocketAddr, SocketAddr);

impl Server<'_> {
    /// Binds a UDP socket on `host:port` and returns a server with no known paths yet.
    ///
    /// # Errors
    /// Returns an error if the socket cannot be bound.
    pub async fn start(host: impl Display, port: u16, config: Config<'_>) -> error::Result<Server> {
        let known_paths = HashSet::new();
        let bind_host = host.to_string();
        let socket = tokio::net::UdpSocket::bind((bind_host.as_str(), port)).await?;

        Ok(Server {
            udp_socket: Arc::new(socket),
            paths: Arc::new(TokioRwLock::new(known_paths)),
            config,
        })
    }

    /// Waits for a datagram from a peer that has no connection yet and accepts it.
    ///
    /// The socket is only peeked here: the datagram stays queued so the new
    /// connection's own tasks can read it. While the pending datagram belongs to an
    /// already-known path, the loop sleeps one `TICK` and peeks again.
    ///
    /// # Errors
    /// Propagates socket errors and any error from `Connection::accept`.
    pub async fn accept(&mut self) -> error::Result<Connection> {
        let mut ticker = tokio::time::interval(TICK);
        let local = self.udp_socket.local_addr()?;

        let peer = loop {
            let candidate = self.udp_socket.peek_sender().await?;
            let already_known = self.paths.read().await.contains(&(local, candidate));
            if !already_known {
                break candidate;
            }
            log::trace!("Connection already exists");
            ticker.tick().await;
        };

        let socket = Arc::clone(&self.udp_socket);
        let paths = Arc::clone(&self.paths);
        Connection::accept(&self.config, socket, paths, (local, peer)).await
    }

    /// Returns the address the server socket is bound to.
    ///
    /// # Errors
    /// Returns an error if the socket address cannot be queried.
    pub fn local_addr(&self) -> error::Result<SocketAddr> {
        let bound = self.udp_socket.local_addr()?;
        Ok(bound)
    }
}

The part to handle the connection:

use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock};

use futures::executor::block_on;
use lazy_static::lazy_static;
use quiche::ConnectionId;
use rand::prelude::IteratorRandom;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::UdpSocket;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};

/// Period (10 µs) of the `tokio::time::interval` tickers used by the polling loops in this file.
static TICK: std::time::Duration = std::time::Duration::from_micros(10);

// Process-wide set of the source connection IDs handed out by
// `Connection::generate_scid`, used to avoid issuing the same ID twice.
// NOTE(review): nothing visible here ever removes an entry, so the set only grows.
lazy_static! {
    static ref SCID: TokioMutex<HashSet<ConnectionId<'static>>> = TokioMutex::new(HashSet::new());
}

/// A QUIC connection whose packet I/O, timers and path events are driven by
/// background tokio tasks; exposed to callers through `AsyncRead`/`AsyncWrite`.
pub struct Connection {
    /// The quiche connection state, shared with the background tasks.
    quic_socket: Arc<StdMutex<quiche::Connection>>,
    /// UDP forwarding task pairs; the initial entry is keyed by the local address.
    udp_handles: Arc<TokioRwLock<HashMap<SocketAddr, UdpHandle>>>,
    /// Task running `timeout_handler`; aborted when the connection is dropped.
    timeout_handle: tokio::task::JoinHandle<error::Result<()>>,
    /// Task running `event_handler`; aborted when the connection is dropped.
    event_handle: tokio::task::JoinHandle<error::Result<()>>,
    /// `(local, peer)` paths belonging to this connection.
    paths: Arc<TokioRwLock<HashSet<Path>>>,
    /// Waker registered by `poll_read` when no stream is readable.
    read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    /// Waker registered by `poll_write` when no stream is writable.
    write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
}

/// Pair of background tasks moving datagrams between one UDP socket and quiche.
struct UdpHandle {
    /// Task reading datagrams from the UDP socket and feeding them to quiche.
    forward_to_quic: tokio::task::JoinHandle<error::Result<()>>,
    /// Task pulling outgoing packets from quiche and sending them on the UDP socket.
    forward_from_quic: tokio::task::JoinHandle<error::Result<()>>,
}

impl Connection {
    /// Opens a client connection to `host:port` and waits for the handshake to finish.
    ///
    /// `host` is used both to resolve the peer address and as the TLS server name.
    /// When `config.log_keys` is set and `SSLKEYLOGFILE` is defined, TLS secrets are
    /// appended to that file (appending, not truncating, so that several connections
    /// can log to the same file without erasing each other's keys).
    ///
    /// # Errors
    /// Propagates socket, configuration, quiche and key-log file errors.
    pub async fn connect<H: Display + Send>(
        host: H,
        port: u16,
        config: &Config<'_>,
    ) -> error::Result<Self> {
        // Format the host once; it is needed for both the UDP connect and the SNI.
        let host = host.to_string();

        // NOTE(review): binding to 0.0.0.0 restricts the client to IPv4 peers.
        let udp_socket = Arc::new(tokio::net::UdpSocket::bind(("0.0.0.0", 0)).await?);
        udp_socket.connect((host.as_str(), port)).await?;

        let peer = udp_socket.peer_addr()?;
        let local = udp_socket.local_addr()?;

        let mut quiche_config = config.try_into()?;

        let scid = Self::generate_scid().await;

        let mut quic_socket = quiche::connect(
            Some(host.as_str()),
            &scid,
            local,
            peer,
            &mut quiche_config,
        )?;

        if config.log_keys {
            if let Some(file) = std::env::var_os("SSLKEYLOGFILE") {
                let keylog = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(file)?;
                quic_socket.set_keylog(Box::new(keylog));
            } else {
                log::warn! {"Key logging is enabled but SSLKEYLOGFILE is not set"}
            }
        }

        // `4 * n - 2` is passed on as the stream to prioritise once established.
        // NOTE(review): this underflows when `config.max_streams` is 0.
        Self::start_connection(
            udp_socket,
            quic_socket,
            Arc::new(TokioRwLock::new(HashSet::new())),
            (local, peer),
            config.max_streams * 4 - 2,
        )
        .await
    }

    /// Creates the server side of a connection for the path `my_path` (`(local, peer)`)
    /// and waits for the handshake to finish.
    ///
    /// When `config.log_keys` is set and `SSLKEYLOGFILE` is defined, TLS secrets are
    /// appended to that file. Appending matters on a server: truncating the file for
    /// each accepted connection would discard the keys of every earlier connection.
    ///
    /// # Errors
    /// Propagates configuration, quiche and key-log file errors.
    async fn accept(
        config: &Config<'_>,
        udp_socket: Arc<UdpSocket>,
        udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
        my_path: Path,
    ) -> error::Result<Self> {
        let mut quiche_config = config.try_into()?;

        let scid = Self::generate_scid().await;

        // No original destination connection ID is supplied (`None`).
        let mut quic_socket =
            quiche::accept(&scid, None, my_path.0, my_path.1, &mut quiche_config)?;

        if config.log_keys {
            if let Some(file) = std::env::var_os("SSLKEYLOGFILE") {
                let keylog = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(file)?;
                quic_socket.set_keylog(Box::new(keylog));
            } else {
                log::warn! {"Key logging is enabled but SSLKEYLOGFILE is not set"}
            }
        }

        // `4 * n - 1` is passed on as the stream to prioritise once established.
        // NOTE(review): this underflows when `config.max_streams` is 0.
        Self::start_connection(
            udp_socket,
            quic_socket,
            udp_paths,
            my_path,
            config.max_streams * 4 - 1,
        )
        .await
    }

    /// Draws a random 20-byte source connection ID that has not been issued before
    /// by this process, and records it in the global `SCID` set.
    async fn generate_scid<'a>() -> ConnectionId<'a> {
        let draw = || ConnectionId::from_vec(rand::random_iter().take(20).collect());

        let mut candidate = draw();
        let mut issued = SCID.lock().await;
        // `insert` returns `false` when the ID is already present: redraw until it is new.
        while !issued.insert(candidate.clone()) {
            candidate = draw();
        }
        candidate
    }

    /// Spawns the background tasks for a freshly created quiche connection and
    /// waits until the handshake completes.
    ///
    /// `my_path` is registered both in the shared `udp_paths` set and in a new
    /// per-connection set. `max_streams` is passed as the first argument of
    /// `stream_priority` once the connection is established (the callers compute it
    /// as `4 * n - 2` or `4 * n - 1`).
    ///
    /// # Errors
    /// Propagates mutex-poisoning and `stream_priority` errors.
    ///
    /// # Panics
    /// Panics if quiche reports a local error during establishment, if a UDP
    /// forwarding task stops, or if the connection closes before being established.
    async fn start_connection(
        udp_socket: Arc<UdpSocket>,
        quic_socket: quiche::Connection,
        udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
        my_path: Path,
        max_streams: u64,
    ) -> error::Result<Self> {
        let quic_socket = Arc::new(StdMutex::new(quic_socket));
        // Publish the path so other users of `udp_paths` see it as taken.
        udp_paths.write().await.insert(my_path);
        let my_paths = Arc::new(TokioRwLock::new(HashSet::from([my_path])));

        let write_waker = Arc::new(StdRwLock::new(None));
        let read_waker = Arc::new(StdRwLock::new(None));

        // Starts the two datagram-forwarding tasks for this socket.
        let udp_handle = UdpHandle::new(
            Arc::clone(&udp_socket),
            Arc::clone(&quic_socket),
            Arc::clone(&my_paths),
            Arc::clone(&read_waker),
            Arc::clone(&write_waker),
        );
        // The handle is keyed by the local address of the path.
        let udp_handles = Arc::new(TokioRwLock::new(HashMap::from([(my_path.0, udp_handle)])));

        log::debug!("UDP Handle created for {my_path:?}");

        let timeout_handle = tokio::task::spawn(Self::timeout_handler(Arc::clone(&quic_socket)));

        log::debug!("Timeout Handle created for {my_path:?}");

        let event_handle = tokio::task::spawn(Self::event_handler(
            Arc::clone(&quic_socket),
            Arc::clone(&udp_handles),
            Arc::clone(&udp_paths),
            Arc::clone(&my_paths),
        ));

        log::debug!("Event Handle created for {my_path:?}");

        // Poll every TICK until the handshake finishes or the connection closes.
        let mut interval = tokio::time::interval(TICK);
        while !quic_socket.lock()?.is_established() && !quic_socket.lock()?.is_closed() {
            if let Some(err) = quic_socket.lock()?.local_error() {
                log::error!("Error during connection establishment: {err:?}");
                panic!();
            }
            // A finished forwarding task means no more packets can flow.
            for (path, handle) in udp_handles.read().await.iter() {
                if !handle.is_alive() {
                    log::error!("Connection to {path:?} is not alive");
                    panic!();
                }
            }
            log::trace!("Waiting for connection establishment on {my_path:?}");
            interval.tick().await;
        }

        // NOTE(review): this is logged even when the loop ended because the
        // connection closed; the closed case is only detected just below.
        log::debug!("Connection established on {my_path:?}");

        if quic_socket.lock()?.is_closed() {
            panic!("Connection closed before opening")
        }

        quic_socket
            .lock()?
            .stream_priority(max_streams, 127, false)?;

        log::debug!("Stream opened on {my_path:?}");

        Ok(Connection {
            quic_socket,
            udp_handles,
            timeout_handle,
            event_handle,
            paths: my_paths,
            read_waker,
            write_waker,
        })
    }

    /// Background loop that fires quiche's timers.
    ///
    /// When quiche reports a pending timeout, sleeps for that duration and then calls
    /// `on_timeout`; otherwise waits one `TICK` before asking again. Only returns on
    /// a mutex-poisoning error.
    async fn timeout_handler(quic_socket: Arc<StdMutex<quiche::Connection>>) -> error::Result<()> {
        let mut ticker = tokio::time::interval(TICK);
        loop {
            // The guard is a temporary of this statement, so it is released before any await.
            let pending = quic_socket.lock()?.timeout();
            if let Some(delay) = pending {
                tokio::time::sleep(delay).await;
                quic_socket.lock()?.on_timeout();
            } else {
                ticker.tick().await;
            }
        }
    }

    /// Background loop that drains quiche's path events.
    ///
    /// Path management is not implemented yet, so every event is only logged. The
    /// events must not trigger a panic here: the connection mutex is held while they
    /// are read, and panicking with the guard alive would poison the mutex and make
    /// every other task's `lock()?` fail, killing the whole connection.
    ///
    /// The `_udp_handles`, `_udp_paths` and `_my_paths` arguments are currently unused.
    ///
    /// # Errors
    /// Only returns on a mutex-poisoning error.
    async fn event_handler(
        quic_socket: Arc<StdMutex<quiche::Connection>>,
        _udp_handles: Arc<TokioRwLock<HashMap<SocketAddr, UdpHandle>>>,
        _udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
        _my_paths: Arc<TokioRwLock<HashSet<Path>>>,
    ) -> error::Result<()> {
        let mut interval = tokio::time::interval(TICK);
        loop {
            {
                // Scoped so the std mutex guard is released before awaiting.
                let mut quic_socket = quic_socket.lock()?;
                while let Some(event) = quic_socket.path_event_next() {
                    // TODO: react to path events (new / validated / closed paths).
                    log::error!("Unhandled event: {event:?}");
                }
            }
            interval.tick().await;
        }
    }

    /// Synchronous liveness check: `true` when every UDP handle is still running.
    ///
    /// Blocks the current thread (via `block_on`) until the handle map can be read,
    /// and logs the first handle found dead.
    pub fn is_alive_blocking(&self) -> bool {
        let handles = block_on(self.udp_handles.read());
        let first_dead = handles.iter().find(|(_, handle)| !handle.is_alive());
        match first_dead {
            Some((ip, _)) => {
                log::error!("Connection from {ip:?} is not alive");
                false
            }
            None => true,
        }
    }

    pub async fn is_alive(&self) -> Result<(), Vec<error::Error>> {
        let mut res = false;
        for (ip, handle) in self.udp_handles.read().await.iter() {
            if !handle.is_alive() {
                log::error!("Connection from {ip:?} is not alive");
                res = true;
            }
        }
        if res {
            let mut handles = self.udp_handles.write().await;
            let mut errors = vec![];
            for ip in handles.keys().cloned().collect::<Vec<SocketAddr>>() {
                if !handles.get(&ip).expect("We are using a existing key").is_alive() {
                    log::error!("Connection from {ip:?} is not alive");
                    let handle = handles.remove(&ip).expect("We are using a existing key");
                    errors.extend(handle.errors().await);
                }
            }
            Err(errors)
        } else {
            Ok(())
        }
    }

    /// Returns the peer address of one of this connection's paths.
    ///
    /// # Panics
    /// Panics if the path set is empty.
    pub async fn peer_addr(&self) -> SocketAddr {
        let paths = self.paths.read().await;
        let (_local, peer) = paths.iter().next().unwrap();
        *peer
    }
}

impl UdpHandle {
    /// Spawns the two forwarding tasks for `udp_socket`: first the quiche-to-UDP
    /// sender, then the UDP-to-quiche receiver.
    pub fn new(
        udp_socket: Arc<UdpSocket>,
        quic_socket: Arc<StdMutex<quiche::Connection>>,
        paths: Arc<TokioRwLock<HashSet<Path>>>,
        read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
        write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    ) -> Self {
        // The sender task gets clones; the receiver task then takes the originals.
        let outbound = Self::forward_from_quic(
            Arc::clone(&udp_socket),
            Arc::clone(&quic_socket),
            Arc::clone(&write_waker),
        );
        let forward_from_quic = tokio::task::spawn(outbound);

        let inbound =
            Self::forward_to_quic(udp_socket, paths, quic_socket, read_waker, write_waker);
        let forward_to_quic = tokio::task::spawn(inbound);

        Self {
            forward_from_quic,
            forward_to_quic,
        }
    }

    /// `true` while neither forwarding task has finished.
    pub fn is_alive(&self) -> bool {
        let any_finished =
            self.forward_from_quic.is_finished() || self.forward_to_quic.is_finished();
        !any_finished
    }

    pub async fn errors(self) -> Vec<error::Error> {
        
        let tmp = std::mem::ManuallyDrop::new(self);
        let forward_from_quic = unsafe { std::ptr::read(&tmp.forward_from_quic) };
        let forward_to_quic = unsafe { std::ptr::read(&tmp.forward_to_quic) };
        let mut errors = vec![];
        if !forward_from_quic.is_finished() {
            forward_to_quic.abort();
        }
            errors.push(match forward_from_quic.await {
                Ok(Err(err)) => err,
                Err(err) => err.into(),
                Ok(Ok(_)) => unreachable!("This task is an infinite loop, do it exit should be an error"),
            });
        if !forward_to_quic.is_finished() {
            forward_to_quic.abort();
        }
            errors.push(match forward_to_quic.await {
                Ok(Err(err)) => err,
                Err(err) => err.into(),
                Ok(Ok(_)) => unreachable!("This task is an infinite loop, do it exit should be an error"),
            });
        errors
    }

    /// Background loop sending the packets quiche produces for this socket's local address.
    ///
    /// Each packet is sent once its scheduled time (`info.at`) is reached, after which
    /// the write waker (if any) is woken. When quiche has nothing to send, the loop
    /// waits one `TICK`.
    ///
    /// # Errors
    /// Returns on socket errors, lock poisoning, or any quiche error other than `Done`.
    ///
    /// # Panics
    /// Panics if quiche reports a source address different from the socket's.
    async fn forward_from_quic(
        udp_socket: Arc<UdpSocket>,
        quic_socket: Arc<StdMutex<quiche::Connection>>,
        write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    ) -> error::Result<()> {
        let mut ticker = tokio::time::interval(TICK);
        let mut datagram = [0; 65_536];
        loop {
            // The mutex guard is a temporary of this statement and is released before any await.
            let outcome = quic_socket.lock()?.send_on_path(
                &mut datagram,
                Some(udp_socket.local_addr()?),
                None,
            );

            let (len, info) = match outcome {
                Ok(packet) => packet,
                Err(quiche::Error::Done) => {
                    ticker.tick().await;
                    continue;
                }
                Err(err) => return Err(err.into()),
            };

            log::debug!(
                "Sending a message for quic on path {:?}",
                (info.from, info.to)
            );
            tokio::time::sleep_until(info.at.into()).await;
            assert_eq!(info.from, udp_socket.local_addr()?);
            udp_socket.send_to(&datagram[..len], info.to).await?;
            if let Some(waker) = write_waker.read()?.as_ref() {
                waker.wake_by_ref();
            }
        }
    }

    /// Background loop feeding incoming datagrams to quiche.
    ///
    /// The socket is peeked first: a datagram is only consumed when its
    /// `(local, sender)` pair belongs to `paths`; otherwise it is left queued and the
    /// loop waits one `TICK`. After a datagram has been processed, both the read and
    /// the write wakers (if any) are woken.
    ///
    /// # Errors
    /// Returns on socket errors, lock poisoning, or any error from `quiche` `recv`.
    ///
    /// # Panics
    /// Panics if the received datagram's sender differs from the peeked one.
    async fn forward_to_quic(
        udp_socket: Arc<UdpSocket>,
        paths: Arc<TokioRwLock<HashSet<Path>>>,
        quic_socket: Arc<StdMutex<quiche::Connection>>,
        read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
        write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    ) -> error::Result<()> {
        let mut interval = tokio::time::interval(TICK);
        let mut buffer = [0; 65_536];
        loop {
            log::trace!(
                "Trying to get message for quic on paths: {:?}",
                paths.read().await.iter()
            );
            let sender = udp_socket.peek_sender().await?;
            let local = udp_socket.local_addr()?;
            if paths.read().await.contains(&(local, sender)) {
                log::debug!("Received message for quic on paths: {:?}", (local, sender));
                let (len, addr) = udp_socket.recv_from(&mut buffer).await?;
                assert_eq!(addr, sender);
                let recv_info = quiche::RecvInfo {
                    from: addr,
                    to: local,
                };
                let mut start = 0;
                while start < len {
                    // `recv` reports how many bytes of the slice it consumed, i.e. a
                    // count relative to `start`, so the offset must be advanced by it.
                    let consumed = quic_socket
                        .lock()?
                        .recv(&mut buffer[start..len], recv_info)?;
                    if consumed == 0 {
                        // No progress: stop instead of spinning on the same bytes.
                        break;
                    }
                    start += consumed;
                }
                if let Some(waker) = read_waker.read()?.as_ref() {
                    waker.wake_by_ref()
                }
                if let Some(waker) = write_waker.read()?.as_ref() {
                    waker.wake_by_ref()
                }
            } else {
                interval.tick().await;
            }
        }
    }
}

impl AsyncRead for Connection {
    /// Reads from one randomly chosen readable stream into `buf`.
    ///
    /// Returns an error if a forwarding task has died. When no stream is readable,
    /// returns `Ready(Ok(()))` without data if the connection is closed, otherwise
    /// stores the caller's waker and returns `Pending`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        if !self.is_alive_blocking() {
            return Poll::Ready(Err(std::io::Error::other("The connection is corrupted")));
        }
        let shared = Arc::clone(&self.quic_socket);
        let mut conn = shared.lock().map_err(std::io::Error::from_poison_error)?;

        let mut rng = rand::rng();
        let Some(stream_id) = conn.readable().choose(&mut rng) else {
            if conn.is_closed() {
                return Poll::Ready(Ok(()));
            }

            // Nothing to read yet: remember who to wake, reusing the stored waker
            // when it already targets the current task.
            let mut slot = self
                .read_waker
                .write()
                .map_err(std::io::Error::from_poison_error)?;
            match slot.as_mut() {
                Some(stored) if stored.will_wake(cx.waker()) => {}
                Some(stored) => stored.clone_from(cx.waker()),
                None => *slot = Some(cx.waker().clone()),
            }
            return Poll::Pending;
        };

        // Data is available, so any previously stored waker is no longer needed.
        self.read_waker
            .write()
            .map_err(std::io::Error::from_poison_error)?
            .take();
        let (len, _done) = conn
            .stream_recv(stream_id, buf.initialize_unfilled())
            .map_err(crate::Error::QuicheError)?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}

impl AsyncWrite for Connection {
    /// Writes `buf` to a writable stream and returns how many bytes quiche accepted.
    ///
    /// Stream choice: the first writable stream that can take all of `buf`; failing
    /// that, the writable stream with the largest capacity. When no stream is
    /// writable, stores the caller's waker and returns `Pending`. Returns an error if
    /// a forwarding task has died.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        if !self.is_alive_blocking() {
            return Poll::Ready(Err(std::io::Error::other("The connection is corrupted")));
        }
        let shared = Arc::clone(&self.quic_socket);
        let mut conn = shared.lock().map_err(std::io::Error::from_poison_error)?;

        // First choice: a stream able to accept the whole buffer.
        let fits_entirely = conn
            .writable()
            .find_map(|id| matches!(conn.stream_writable(id, buf.len()), Ok(true)).then_some(id));

        let stream_id = if let Some(id) = fits_entirely {
            id
        } else {
            // Fallback: the stream with the most room.
            let roomiest = conn
                .writable()
                .filter_map(|id| conn.stream_capacity(id).ok().map(|capacity| (id, capacity)))
                .max_by_key(|&(_, capacity)| capacity);
            let Some((id, _)) = roomiest else {
                // Nothing is writable: remember who to wake, reusing the stored waker
                // when it already targets the current task.
                let mut slot = self
                    .write_waker
                    .write()
                    .map_err(std::io::Error::from_poison_error)?;
                match slot.as_mut() {
                    Some(stored) if stored.will_wake(cx.waker()) => {}
                    Some(stored) => stored.clone_from(cx.waker()),
                    None => *slot = Some(cx.waker().clone()),
                }
                return Poll::Pending;
            };
            id
        };

        let written = conn
            .stream_send(stream_id, buf, false)
            .map_err(crate::Error::QuicheError)?;
        // The write went through, so any previously stored waker is no longer needed.
        self.write_waker
            .write()
            .map_err(std::io::Error::from_poison_error)?
            .take();
        Poll::Ready(Ok(written))
    }

    /// Always reports the flush as complete; nothing is buffered at this layer.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Not implemented yet: calling this panics.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        todo!()
    }
}

impl Drop for UdpHandle {
    /// Aborts both forwarding tasks so they do not outlive the handle.
    fn drop(&mut self) {
        for task in [&self.forward_from_quic, &self.forward_to_quic] {
            task.abort();
        }
    }
}

impl Drop for Connection {
    /// Aborts the timeout and event tasks so they do not outlive the connection.
    fn drop(&mut self) {
        for task in [&self.timeout_handle, &self.event_handle] {
            task.abort();
        }
    }
}

The part to handle the configuration:

/// Default value of `Config::max_streams`.
static MAX_STREAM: u64 = 1;
/// Default value of `Config::max_data_stream`; also used to derive the default `max_data`.
static MAX_DATA_STREAM: u64 = 1_000_000_000;

/// Connection settings, converted into a `quiche::Config` through `TryFrom`.
#[derive(Clone, Debug)]
pub struct Config<'a> {
    /// File passed to quiche as the trusted verify location, if any.
    pub ca: Option<std::path::PathBuf>,
    /// PEM certificate chain file, if any.
    pub certificate: Option<std::path::PathBuf>,
    /// PEM private key file, if any.
    pub private_key: Option<std::path::PathBuf>,
    /// Application protocols handed to `set_application_protos`.
    pub protocols: &'a [&'a [u8]],
    /// Enables TLS key logging (the log file is taken from `SSLKEYLOGFILE`).
    pub log_keys: bool,
    /// Whether the peer's certificate is verified.
    pub verify_peer: bool,
    /// Initial limit on unidirectional streams; also used to derive the stream ID
    /// prioritised when a connection is established.
    pub max_streams: u64,
    /// Initial flow-control limit for each unidirectional stream.
    pub max_data_stream: u64,
    /// Initial flow-control limit for the whole connection.
    pub max_data: u64,
}

impl<'a> Config<'a> {
    /// Client configuration: trusts `<ca_folder>/ca.pem`, verifies the peer, and
    /// keeps every other setting at its default.
    pub fn client<'b: 'a, P: AsRef<std::path::Path>>(
        ca_folder: P,
        protocols: &'b [&'b [u8]],
    ) -> Self {
        let ca_file = ca_folder.as_ref().join("ca.pem");
        Self {
            ca: Some(ca_file),
            protocols,
            verify_peer: true,
            ..Default::default()
        }
    }

    /// Server configuration: uses `<certificate_folder>/<host>.pem` and
    /// `<certificate_folder>/<host>.key` as identity, trusts `<ca_folder>/ca.pem`,
    /// enables key logging, and keeps every other setting at its default.
    pub fn server<'b: 'a>(
        host: impl Display,
        certificate_folder: impl AsRef<std::path::Path>,
        ca_folder: impl AsRef<std::path::Path>,
        protocols: &'b [&'b [u8]],
    ) -> Self {
        let ca_dir = ca_folder.as_ref();
        let cert_dir = certificate_folder.as_ref();
        log::debug!("{host} {} {}", ca_dir.display(), cert_dir.display());

        let certificate = cert_dir.join(format!("{host}.pem"));
        let private_key = cert_dir.join(format!("{host}.key"));
        Self {
            ca: Some(ca_dir.join("ca.pem")),
            certificate: Some(certificate),
            private_key: Some(private_key),
            protocols,
            log_keys: true,
            ..Default::default()
        }
    }
}

impl Default for Config<'_> {
    /// No certificates, no protocols, no key logging, no peer verification; stream
    /// and data limits taken from `MAX_STREAM` and `MAX_DATA_STREAM`.
    fn default() -> Self {
        let max_streams = MAX_STREAM;
        let max_data_stream = MAX_DATA_STREAM;
        Self {
            // The connection-wide budget covers every stream at its full limit.
            max_data: max_data_stream * max_streams,
            max_data_stream,
            max_streams,
            verify_peer: false,
            log_keys: false,
            protocols: &[],
            private_key: None,
            certificate: None,
            ca: None,
        }
    }
}

impl TryFrom<&Config<'_>> for quiche::Config {
    type Error = crate::Error;
    fn try_from(old: &Config<'_>) -> Result<Self, Self::Error> {
        let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
        if let Some(ref ca) = old.ca {
            config.load_verify_locations_from_file(
                ca.as_os_str()
                    .to_str()
                    .expect("Invalid certificate folder name"),
            )?;
        }
        if let Some(ref certificate) = old.certificate {
            config.load_cert_chain_from_pem_file(
                certificate
                    .as_os_str()
                    .to_str()
                    .expect("Invalid certificate name"),
            )?;
        }
        if let Some(ref private_key) = old.private_key {
            config.load_priv_key_from_pem_file(
                private_key
                    .as_os_str()
                    .to_str()
                    .expect("Invalid private key name"),
            )?;
        }
        config.verify_peer(old.verify_peer);

        config.set_application_protos(old.protocols)?;

        if old.log_keys {
            config.log_keys();
        }

        config.set_initial_max_streams_uni(old.max_streams);
        config.set_initial_max_stream_data_uni(old.max_data_stream);
        config.set_initial_max_data(old.max_data);

        config.discover_pmtu(true);

        Ok(config)
    }
}

I hope you can help me figure out, either what I did wrong, or why it is not working properly.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions