Description
Hello,
I'm currently conducting a study to figure out why QUIC outperforms TCP in some scenarios.
In addition to a theoretical study, I wanted an empirical one.
Quiche boasts about its high configurability. That's why I chose it.
After getting several inconsistent results (#1988 among others), I tried Quinn (another widely used QUIC library).
To my great surprise, the results were different: with Quinn, we get the standard, expected results.
I then checked whether the two implementations could communicate with each other. That is when I found out that:
- A quinn server and a quiche client can communicate
- A quiche server and a quinn client suffer some issues
So here is my question: does Quiche implement an RFC-compliant version of QUIC, or does the issue come from Quinn, or from my code?
For information, here is my code:
The part that creates a server:
use std::collections::HashSet;
use std::fmt::Display;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock as TokioRwLock;
use tokio::net::UdpSocket;
/// Server-side endpoint: owns the UDP socket and hands out one `Connection` per new peer.
pub struct Server<'a> {
// UDP socket shared (through `Arc`) with every connection accepted on it.
udp_socket: Arc<UdpSocket>,
// (local, peer) pairs that already have a connection; `accept` skips senders found here.
paths: Arc<TokioRwLock<HashSet<Path>>>,
// Configuration passed to `Connection::accept` for each new connection.
config: Config<'a>,
}
/// A network path, stored as a `(local address, peer address)` pair.
type Path = (SocketAddr, SocketAddr);
impl Server<'_> {
    /// Binds a UDP socket on `host:port` and returns a server with an empty path set.
    ///
    /// # Errors
    /// Returns an error when the socket cannot be bound.
    pub async fn start(host: impl Display, port: u16, config: Config<'_>) -> error::Result<Server> {
        let paths = Arc::new(TokioRwLock::new(HashSet::new()));
        let address = host.to_string();
        let socket = tokio::net::UdpSocket::bind((address.as_str(), port)).await?;
        Ok(Server {
            udp_socket: Arc::new(socket),
            paths,
            config,
        })
    }

    /// Waits for a datagram from a peer that has no connection yet and accepts it.
    ///
    /// The socket is only peeked here; while the pending datagram belongs to a
    /// known path the loop sleeps for one `TICK` and looks again.
    ///
    /// # Errors
    /// Returns an error when the socket fails or when `Connection::accept` fails.
    pub async fn accept(&mut self) -> error::Result<Connection> {
        let mut interval = tokio::time::interval(TICK);
        let local = self.udp_socket.local_addr()?;
        let peer = loop {
            let candidate = self.udp_socket.peek_sender().await?;
            let already_known = self.paths.read().await.contains(&(local, candidate));
            if !already_known {
                break candidate;
            }
            log::trace!("Connection already exists");
            interval.tick().await;
        };
        Connection::accept(
            &self.config,
            Arc::clone(&self.udp_socket),
            Arc::clone(&self.paths),
            (local, peer),
        )
        .await
    }

    /// Returns the address the UDP socket is bound to.
    pub fn local_addr(&self) -> error::Result<SocketAddr> {
        self.udp_socket.local_addr().map_err(Into::into)
    }
}
The part to handle the connection:
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock};
use futures::executor::block_on;
use lazy_static::lazy_static;
use quiche::ConnectionId;
use rand::prelude::IteratorRandom;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::UdpSocket;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
// Period of the `tokio::time::interval` timers used by the polling loops in this file.
static TICK: std::time::Duration = std::time::Duration::from_micros(10);
// Process-wide registry of the source connection IDs already handed out by
// `Connection::generate_scid`, so that no ID is issued twice.
// NOTE(review): nothing in this file removes entries, so the set only grows.
lazy_static! {
static ref SCID: TokioMutex<HashSet<ConnectionId<'static>>> = TokioMutex::new(HashSet::new());
}
/// One QUIC connection together with the background tasks that drive it.
pub struct Connection {
// quiche connection state, shared with the background tasks.
quic_socket: Arc<StdMutex<quiche::Connection>>,
// Forwarding task pairs, keyed by the local socket address they serve.
udp_handles: Arc<TokioRwLock<HashMap<SocketAddr, UdpHandle>>>,
// Task running `timeout_handler`; aborted on drop.
timeout_handle: tokio::task::JoinHandle<error::Result<()>>,
// Task running `event_handler`; aborted on drop.
event_handle: tokio::task::JoinHandle<error::Result<()>>,
// Paths owned by this connection (initialised with the path it was created on).
paths: Arc<TokioRwLock<HashSet<Path>>>,
// Waker stored by `poll_read` when it returns `Pending`; woken by the receive task.
read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
// Waker stored by `poll_write` when it returns `Pending`; woken by both forwarding tasks.
write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
}
/// The pair of tasks moving datagrams between one UDP socket and the quiche connection.
struct UdpHandle {
// Task running `forward_to_quic`: UDP socket -> quiche.
forward_to_quic: tokio::task::JoinHandle<error::Result<()>>,
// Task running `forward_from_quic`: quiche -> UDP socket.
forward_from_quic: tokio::task::JoinHandle<error::Result<()>>,
}
impl Connection {
/// Opens a client connection to `host:port` and waits for the handshake to finish.
///
/// A UDP socket is bound on an ephemeral port, connected to the peer, and a
/// quiche client connection is created with `host` as the server name.
///
/// When `config.log_keys` is set and `SSLKEYLOGFILE` is defined, TLS secrets
/// are appended to that file. The file is opened in append mode so that
/// several connections can share it without erasing each other's secrets.
///
/// # Errors
/// Returns an error when the socket cannot be set up, the configuration cannot
/// be converted, the key log file cannot be opened, or the connection cannot
/// be established.
pub async fn connect<H: Display + Send>(
    host: H,
    port: u16,
    config: &Config<'_>,
) -> error::Result<Self> {
    // Format the host once: it is both the UDP destination and the TLS server name.
    let server_name = host.to_string();
    let udp_socket = Arc::new(tokio::net::UdpSocket::bind(("0.0.0.0", 0)).await?);
    udp_socket.connect((server_name.as_str(), port)).await?;
    let peer = udp_socket.peer_addr()?;
    let local = udp_socket.local_addr()?;
    let mut quiche_config = config.try_into()?;
    let scid = Self::generate_scid().await;
    let mut quic_socket = quiche::connect(
        Some(server_name.as_str()),
        &scid,
        local,
        peer,
        &mut quiche_config,
    )?;
    if config.log_keys {
        if let Some(file) = std::env::var_os("SSLKEYLOGFILE") {
            // Append instead of truncating: `File::create` would wipe the
            // secrets written by any earlier connection of this process.
            let keylog = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(file)?;
            quic_socket.set_keylog(Box::new(keylog));
        } else {
            log::warn! {"Key logging is enabled but SSLKEYLOGFILE is not set"}
        }
    }
    Self::start_connection(
        udp_socket,
        quic_socket,
        Arc::new(TokioRwLock::new(HashSet::new())),
        (local, peer),
        // ID of the last client-initiated unidirectional stream allowed by
        // `max_streams` (RFC 9000 stream numbering: 2, 6, 10, ...).
        config.max_streams * 4 - 2,
    )
    .await
}
/// Creates the server side of a connection for the peer identified by `my_path`.
///
/// `my_path` is `(local address, peer address)`. `udp_paths` is the set shared
/// with the caller in which the path gets registered by `start_connection`.
///
/// When `config.log_keys` is set and `SSLKEYLOGFILE` is defined, TLS secrets
/// are appended to that file. Appending matters here: every accepted
/// connection opens the file, and truncating it would discard the secrets of
/// all previously accepted connections.
///
/// # Errors
/// Returns an error when the configuration cannot be converted, the key log
/// file cannot be opened, or the connection cannot be established.
async fn accept(
    config: &Config<'_>,
    udp_socket: Arc<UdpSocket>,
    udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
    my_path: Path,
) -> error::Result<Self> {
    let mut quiche_config = config.try_into()?;
    let scid = Self::generate_scid().await;
    let mut quic_socket =
        quiche::accept(&scid, None, my_path.0, my_path.1, &mut quiche_config)?;
    if config.log_keys {
        if let Some(file) = std::env::var_os("SSLKEYLOGFILE") {
            // Append instead of truncating so that connections share the log.
            let keylog = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(file)?;
            quic_socket.set_keylog(Box::new(keylog));
        } else {
            log::warn! {"Key logging is enabled but SSLKEYLOGFILE is not set"}
        }
    }
    Self::start_connection(
        udp_socket,
        quic_socket,
        udp_paths,
        my_path,
        // ID of the last server-initiated unidirectional stream allowed by
        // `max_streams` (RFC 9000 stream numbering: 3, 7, 11, ...).
        config.max_streams * 4 - 1,
    )
    .await
}
/// Returns a random 20-byte connection ID that has not been handed out before.
///
/// The ID is recorded in the global `SCID` registry before it is returned.
async fn generate_scid<'a>() -> ConnectionId<'a> {
    let mut first = Some(ConnectionId::from_vec(
        rand::random_iter().take(20).collect(),
    ));
    let mut registry = SCID.lock().await;
    loop {
        // The first candidate is drawn before taking the lock, later ones under it.
        let candidate = match first.take() {
            Some(id) => id,
            None => ConnectionId::from_vec(rand::random_iter().take(20).collect()),
        };
        if !registry.contains(&candidate) {
            registry.insert(candidate.clone());
            return candidate;
        }
    }
}
async fn start_connection(
udp_socket: Arc<UdpSocket>,
quic_socket: quiche::Connection,
udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
my_path: Path,
max_streams: u64,
) -> error::Result<Self> {
let quic_socket = Arc::new(StdMutex::new(quic_socket));
udp_paths.write().await.insert(my_path);
let my_paths = Arc::new(TokioRwLock::new(HashSet::from([my_path])));
let write_waker = Arc::new(StdRwLock::new(None));
let read_waker = Arc::new(StdRwLock::new(None));
let udp_handle = UdpHandle::new(
Arc::clone(&udp_socket),
Arc::clone(&quic_socket),
Arc::clone(&my_paths),
Arc::clone(&read_waker),
Arc::clone(&write_waker),
);
let udp_handles = Arc::new(TokioRwLock::new(HashMap::from([(my_path.0, udp_handle)])));
log::debug!("UDP Handle created for {my_path:?}");
let timeout_handle = tokio::task::spawn(Self::timeout_handler(Arc::clone(&quic_socket)));
log::debug!("Timeout Handle created for {my_path:?}");
let event_handle = tokio::task::spawn(Self::event_handler(
Arc::clone(&quic_socket),
Arc::clone(&udp_handles),
Arc::clone(&udp_paths),
Arc::clone(&my_paths),
));
log::debug!("Event Handle created for {my_path:?}");
let mut interval = tokio::time::interval(TICK);
while !quic_socket.lock()?.is_established() && !quic_socket.lock()?.is_closed() {
if let Some(err) = quic_socket.lock()?.local_error() {
log::error!("Error during connection establishment: {err:?}");
panic!();
}
for (path, handle) in udp_handles.read().await.iter() {
if !handle.is_alive() {
log::error!("Connection to {path:?} is not alive");
panic!();
}
}
log::trace!("Waiting for connection establishment on {my_path:?}");
interval.tick().await;
}
log::debug!("Connection established on {my_path:?}");
if quic_socket.lock()?.is_closed() {
panic!("Connection closed before opening")
}
quic_socket
.lock()?
.stream_priority(max_streams, 127, false)?;
log::debug!("Stream opened on {my_path:?}");
Ok(Connection {
quic_socket,
udp_handles,
timeout_handle,
event_handle,
paths: my_paths,
read_waker,
write_waker,
})
}
/// Background loop that fires quiche's timeout callback.
///
/// Each round reads the current timeout. If there is one, the task sleeps for
/// that duration and then calls `on_timeout`; otherwise it waits one `TICK`.
/// The mutex is never held across an `.await`.
///
/// NOTE(review): the timeout is not re-read while sleeping, so a timer that
/// quiche shortens in the meantime presumably fires late — confirm against
/// quiche's guidance for `timeout()`.
async fn timeout_handler(quic_socket: Arc<StdMutex<quiche::Connection>>) -> error::Result<()> {
    let mut interval = tokio::time::interval(TICK);
    loop {
        let pending = quic_socket.lock()?.timeout();
        let Some(delay) = pending else {
            interval.tick().await;
            continue;
        };
        tokio::time::sleep(delay).await;
        quic_socket.lock()?.on_timeout();
    }
}
/// Background loop that drains quiche path events.
///
/// Path events are not acted upon yet; they are only logged. The previous
/// `todo!()` panicked while the connection mutex guard was held, which
/// poisoned the mutex and made every other task of the connection fail on its
/// next `lock()`. Logging keeps the connection usable when the peer triggers
/// a path event.
///
/// The `_udp_handles`, `_udp_paths` and `_my_paths` parameters are currently
/// unused.
///
/// # Errors
/// Returns an error only when the connection mutex is poisoned.
async fn event_handler(
    quic_socket: Arc<StdMutex<quiche::Connection>>,
    _udp_handles: Arc<TokioRwLock<HashMap<SocketAddr, UdpHandle>>>,
    _udp_paths: Arc<TokioRwLock<HashSet<Path>>>,
    _my_paths: Arc<TokioRwLock<HashSet<Path>>>,
) -> error::Result<()> {
    let mut interval = tokio::time::interval(TICK);
    loop {
        {
            // Scope the guard so it is released before awaiting the tick.
            let mut connection = quic_socket.lock()?;
            while let Some(event) = connection.path_event_next() {
                log::error!("Unhandled event: {event:?}");
            }
        }
        interval.tick().await;
    }
}
/// Synchronous liveness check: `true` when every forwarding task pair is still running.
///
/// The read lock on the handle map is obtained with `block_on`, so this blocks
/// the calling thread until the lock is available. Only the first dead handle
/// found is logged.
pub fn is_alive_blocking(&self) -> bool {
    let handles = block_on(self.udp_handles.read());
    match handles.iter().find(|(_, handle)| !handle.is_alive()) {
        Some((ip, _)) => {
            log::error!("Connection from {ip:?} is not alive");
            false
        }
        None => true,
    }
}
/// Asynchronous liveness check that also reaps dead forwarding handles.
///
/// Returns `Ok(())` when every handle is alive. Otherwise each dead handle is
/// removed from the map and the errors reported by its tasks are returned.
pub async fn is_alive(&self) -> Result<(), Vec<error::Error>> {
    // First pass under the read lock: only detect and log.
    let mut found_dead = false;
    {
        let handles = self.udp_handles.read().await;
        for (ip, handle) in handles.iter() {
            if !handle.is_alive() {
                log::error!("Connection from {ip:?} is not alive");
                found_dead = true;
            }
        }
    }
    if !found_dead {
        return Ok(());
    }

    // Second pass under the write lock: remove dead handles and collect errors.
    let mut handles = self.udp_handles.write().await;
    let addresses: Vec<SocketAddr> = handles.keys().cloned().collect();
    let mut errors = vec![];
    for ip in addresses {
        let alive = handles
            .get(&ip)
            .expect("We are using a existing key")
            .is_alive();
        if alive {
            continue;
        }
        log::error!("Connection from {ip:?} is not alive");
        let handle = handles.remove(&ip).expect("We are using a existing key");
        errors.extend(handle.errors().await);
    }
    Err(errors)
}
/// Returns the peer address of one of this connection's paths.
///
/// # Panics
/// Panics when the path set is empty.
pub async fn peer_addr(&self) -> SocketAddr {
    let paths = self.paths.read().await;
    let (_local, peer) = paths.iter().next().unwrap();
    *peer
}
}
impl UdpHandle {
/// Spawns the two forwarding tasks for `udp_socket` and returns their handles.
///
/// The sending task (`forward_from_quic`) is spawned first, then the receiving
/// task (`forward_to_quic`).
pub fn new(
    udp_socket: Arc<UdpSocket>,
    quic_socket: Arc<StdMutex<quiche::Connection>>,
    paths: Arc<TokioRwLock<HashSet<Path>>>,
    read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
) -> Self {
    let sender = Self::forward_from_quic(
        Arc::clone(&udp_socket),
        Arc::clone(&quic_socket),
        Arc::clone(&write_waker),
    );
    let forward_from_quic = tokio::task::spawn(sender);

    // The receiving task takes over the remaining handles.
    let receiver =
        Self::forward_to_quic(udp_socket, paths, quic_socket, read_waker, write_waker);
    let forward_to_quic = tokio::task::spawn(receiver);

    Self {
        forward_from_quic,
        forward_to_quic,
    }
}
/// Returns `true` while neither forwarding task has finished.
pub fn is_alive(&self) -> bool {
    let any_finished =
        self.forward_from_quic.is_finished() || self.forward_to_quic.is_finished();
    !any_finished
}
pub async fn errors(self) -> Vec<error::Error> {
let tmp = std::mem::ManuallyDrop::new(self);
let forward_from_quic = unsafe { std::ptr::read(&tmp.forward_from_quic) };
let forward_to_quic = unsafe { std::ptr::read(&tmp.forward_to_quic) };
let mut errors = vec![];
if !forward_from_quic.is_finished() {
forward_to_quic.abort();
}
errors.push(match forward_from_quic.await {
Ok(Err(err)) => err,
Err(err) => err.into(),
Ok(Ok(_)) => unreachable!("This task is an infinite loop, do it exit should be an error"),
});
if !forward_to_quic.is_finished() {
forward_to_quic.abort();
}
errors.push(match forward_to_quic.await {
Ok(Err(err)) => err,
Err(err) => err.into(),
Ok(Ok(_)) => unreachable!("This task is an infinite loop, do it exit should be an error"),
});
errors
}
/// Background loop sending the packets produced by quiche on the UDP socket.
///
/// Each round asks quiche for the next packet on the socket's local address.
/// When there is one, the task waits until the send time reported by quiche,
/// sends the datagram and wakes the stored write waker. When quiche reports
/// `Done`, it waits one `TICK`. Any other quiche error ends the task.
async fn forward_from_quic(
    udp_socket: Arc<UdpSocket>,
    quic_socket: Arc<StdMutex<quiche::Connection>>,
    write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
) -> error::Result<()> {
    let mut interval = tokio::time::interval(TICK);
    let mut buffer = [0; 65_536];
    loop {
        // The guard lives only inside this block, never across an `.await`.
        let outcome = {
            let mut connection = quic_socket.lock()?;
            connection.send_on_path(&mut buffer, Some(udp_socket.local_addr()?), None)
        };
        let (len, info) = match outcome {
            Ok(packet) => packet,
            Err(quiche::Error::Done) => {
                interval.tick().await;
                continue;
            }
            Err(err) => return Err(err.into()),
        };
        log::debug!(
            "Sending a message for quic on path {:?}",
            (info.from, info.to)
        );
        tokio::time::sleep_until(info.at.into()).await;
        assert_eq!(info.from, udp_socket.local_addr()?);
        udp_socket.send_to(&buffer[..len], info.to).await?;
        if let Some(waker) = write_waker.read()?.as_ref() {
            waker.wake_by_ref()
        }
    }
}
/// Background loop feeding the datagrams received on the UDP socket into quiche.
///
/// The socket is shared between connections, so the sender of the pending
/// datagram is peeked first. The datagram is consumed only when
/// `(local, sender)` is one of this connection's `paths`; otherwise the task
/// waits one `TICK` and looks again. After a datagram has been processed both
/// the read and the write wakers are woken.
///
/// # Errors
/// Ends with an error when the socket fails, a lock is poisoned, or quiche
/// rejects the datagram.
async fn forward_to_quic(
    udp_socket: Arc<UdpSocket>,
    paths: Arc<TokioRwLock<HashSet<Path>>>,
    quic_socket: Arc<StdMutex<quiche::Connection>>,
    read_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
    write_waker: Arc<StdRwLock<Option<std::task::Waker>>>,
) -> error::Result<()> {
    let mut interval = tokio::time::interval(TICK);
    let mut buffer = [0; 65_536];
    loop {
        log::trace!(
            "Trying to get message for quic on paths: {:?}",
            paths.read().await.iter()
        );
        let sender = udp_socket.peek_sender().await?;
        let local = udp_socket.local_addr()?;
        if paths.read().await.contains(&(local, sender)) {
            log::debug!("Received message for quic on paths: {:?}", (local, sender));
            let (len, addr) = udp_socket.recv_from(&mut buffer).await?;
            assert_eq!(addr, sender);
            let recv_info = quiche::RecvInfo {
                from: addr,
                to: local,
            };
            let mut start = 0;
            while start < len {
                // `recv` returns the number of bytes it processed, so the
                // offset must be advanced by that amount, not replaced by it.
                start += quic_socket
                    .lock()?
                    .recv(&mut buffer[start..len], recv_info)?;
            }
            if let Some(waker) = read_waker.read()?.as_ref() {
                waker.wake_by_ref()
            }
            if let Some(waker) = write_waker.read()?.as_ref() {
                waker.wake_by_ref()
            }
        } else {
            interval.tick().await;
        }
    }
}
}
impl AsyncRead for Connection {
    /// Reads from one randomly chosen readable stream into `buf`.
    ///
    /// Returns an error when a forwarding task has stopped. When no stream is
    /// readable, a closed connection yields `Ready(Ok(()))` without filling
    /// `buf`; otherwise the task's waker is stored and `Pending` is returned.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        if !self.is_alive_blocking() {
            return Poll::Ready(Err(std::io::Error::other("The connection is corrupted")));
        }
        let shared = Arc::clone(&self.quic_socket);
        let mut connection = shared
            .lock()
            .map_err(std::io::Error::from_poison_error)?;
        let mut rng = rand::rng();

        let Some(stream_id) = connection.readable().choose(&mut rng) else {
            if connection.is_closed() {
                return Poll::Ready(Ok(()));
            }
            // Nothing to read yet: remember who to wake when data arrives.
            let mut slot = self
                .read_waker
                .write()
                .map_err(std::io::Error::from_poison_error)?;
            let registered = slot.get_or_insert_with(|| cx.waker().clone());
            if !registered.will_wake(cx.waker()) {
                registered.clone_from(cx.waker());
            }
            return Poll::Pending;
        };

        // Data is available: the stored waker is no longer needed.
        self.read_waker
            .write()
            .map_err(std::io::Error::from_poison_error)?
            .take();
        let (len, _done) = connection
            .stream_recv(stream_id, buf.initialize_unfilled())
            .map_err(crate::Error::QuicheError)?;
        buf.advance(len);
        Poll::Ready(Ok(()))
    }
}
// `AsyncWrite` adapter: every write picks one writable QUIC stream and sends `buf` on it.
impl AsyncWrite for Connection {
// Sends `buf` on a writable stream and returns the number of bytes accepted by `stream_send`.
// Returns `Pending` (after storing the task waker) when no stream can be selected.
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
// Fail fast when a forwarding task has stopped.
if !self.is_alive_blocking() {
return Poll::Ready(Err(std::io::Error::other("The connection is corrupted")));
}
let quic_socket = Arc::clone(&self.quic_socket);
let mut quic_socket = quic_socket
.lock()
.map_err(std::io::Error::from_poison_error)?;
// First choice: the first writable stream for which `stream_writable(id, buf.len())` is `Ok(true)`.
let stream_id = match quic_socket
.writable()
.filter_map(|id| {
quic_socket
.stream_writable(id, buf.len()).ok().and_then(|valid| if valid {
Some(id)
} else {
None
})
}).next()
{
Some(id) => id,
// Fallback: the writable stream reporting the largest `stream_capacity`.
None => match quic_socket
.writable()
.filter_map(|id| {
quic_socket
.stream_capacity(id)
.map(|capacity| (id, capacity)).ok()
})
.max_by_key(|(_, capacity)| *capacity)
{
Some((id, _)) => id,
None => {
// No usable stream: store (or refresh) the waker and report `Pending`.
let mut waker = self
.write_waker
.write()
.map_err(std::io::Error::from_poison_error)?;
let waker = waker.get_or_insert_with(|| cx.waker().clone());
if !waker.will_wake(cx.waker()) {
waker.clone_from(cx.waker());
}
return Poll::Pending;
}
},
};
let len = quic_socket
.stream_send(stream_id, buf, false)
.map_err(crate::Error::QuicheError)?;
// The write went through: drop any previously stored waker.
self.write_waker
.write()
.map_err(std::io::Error::from_poison_error)?
.take();
Poll::Ready(Ok(len))
}
// Performs no work and always reports success.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
// Not implemented yet: `todo!()` panics when this is called.
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
todo!()
}
}
// Aborts both forwarding tasks when the handle is dropped.
impl Drop for UdpHandle {
fn drop(&mut self) {
self.forward_from_quic.abort();
self.forward_to_quic.abort();
}
}
// Aborts the timeout and event tasks when the connection is dropped.
// NOTE(review): the path inserted into the shared set by `start_connection` is not
// removed here — confirm whether a peer is expected to be able to reconnect.
impl Drop for Connection {
fn drop(&mut self) {
self.timeout_handle.abort();
self.event_handle.abort();
}
}
The part to handle the configuration:
// Default for `Config::max_streams`.
static MAX_STREAM: u64 = 1;
// Default for `Config::max_data_stream`; also used to derive the default `Config::max_data`.
static MAX_DATA_STREAM: u64 = 1_000_000_000;
/// Connection settings, converted into a `quiche::Config` through `TryFrom`.
#[derive(Clone, Debug)]
pub struct Config<'a> {
// File passed to `load_verify_locations_from_file` when set.
pub ca: Option<std::path::PathBuf>,
// PEM file passed to `load_cert_chain_from_pem_file` when set.
pub certificate: Option<std::path::PathBuf>,
// PEM file passed to `load_priv_key_from_pem_file` when set.
pub private_key: Option<std::path::PathBuf>,
// Application protocols passed to `set_application_protos`.
pub protocols: &'a [&'a [u8]],
// Enables quiche key logging; connections then write secrets to `SSLKEYLOGFILE` if it is set.
pub log_keys: bool,
// Passed to `verify_peer`.
pub verify_peer: bool,
// Passed to `set_initial_max_streams_uni`; also used to compute the stream ID a connection uses.
pub max_streams: u64,
// Passed to `set_initial_max_stream_data_uni`.
pub max_data_stream: u64,
// Passed to `set_initial_max_data`.
pub max_data: u64,
}
impl<'a> Config<'a> {
    /// Builds a client configuration.
    ///
    /// The CA file is `ca_folder/ca.pem` and peer verification is enabled; all
    /// other fields keep their default values.
    pub fn client<'b: 'a, P: AsRef<std::path::Path>>(
        ca_folder: P,
        protocols: &'b [&'b [u8]],
    ) -> Self {
        let ca = ca_folder.as_ref().join("ca.pem");
        Self {
            ca: Some(ca),
            protocols,
            verify_peer: true,
            ..Self::default()
        }
    }

    /// Builds a server configuration.
    ///
    /// The CA file is `ca_folder/ca.pem`; the certificate and private key are
    /// `certificate_folder/{host}.pem` and `certificate_folder/{host}.key`.
    /// Key logging is enabled; all other fields keep their default values.
    pub fn server<'b: 'a>(
        host: impl Display,
        certificate_folder: impl AsRef<std::path::Path>,
        ca_folder: impl AsRef<std::path::Path>,
        protocols: &'b [&'b [u8]],
    ) -> Self {
        let ca_dir = ca_folder.as_ref();
        let certificate_dir = certificate_folder.as_ref();
        log::debug!(
            "{host} {} {}",
            ca_dir.display(),
            certificate_dir.display()
        );
        let ca = ca_dir.join("ca.pem");
        let certificate = certificate_dir.join(format!("{host}.pem"));
        let private_key = certificate_dir.join(format!("{host}.key"));
        Self {
            ca: Some(ca),
            certificate: Some(certificate),
            private_key: Some(private_key),
            protocols,
            log_keys: true,
            ..Self::default()
        }
    }
}
// Defaults: no CA, certificate or key, no application protocol, key logging and peer
// verification disabled, and limits taken from `MAX_STREAM` / `MAX_DATA_STREAM`.
impl Default for Config<'_> {
fn default() -> Self {
Self {
ca: None,
certificate: None,
private_key: None,
protocols: &[],
log_keys: false,
verify_peer: false,
max_streams: MAX_STREAM,
max_data_stream: MAX_DATA_STREAM,
// Connection-wide limit: the per-stream limit times the number of streams.
max_data: MAX_DATA_STREAM * MAX_STREAM,
}
}
}
impl TryFrom<&Config<'_>> for quiche::Config {
type Error = crate::Error;
fn try_from(old: &Config<'_>) -> Result<Self, Self::Error> {
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
if let Some(ref ca) = old.ca {
config.load_verify_locations_from_file(
ca.as_os_str()
.to_str()
.expect("Invalid certificate folder name"),
)?;
}
if let Some(ref certificate) = old.certificate {
config.load_cert_chain_from_pem_file(
certificate
.as_os_str()
.to_str()
.expect("Invalid certificate name"),
)?;
}
if let Some(ref private_key) = old.private_key {
config.load_priv_key_from_pem_file(
private_key
.as_os_str()
.to_str()
.expect("Invalid private key name"),
)?;
}
config.verify_peer(old.verify_peer);
config.set_application_protos(old.protocols)?;
if old.log_keys {
config.log_keys();
}
config.set_initial_max_streams_uni(old.max_streams);
config.set_initial_max_stream_data_uni(old.max_data_stream);
config.set_initial_max_data(old.max_data);
config.discover_pmtu(true);
Ok(config)
}
}
I hope you can help me figure out either what I did wrong or why it is not working properly.