Skip to content

Commit

Permalink
implemented distributor, wip on processing initial packet
Browse files Browse the repository at this point in the history
  • Loading branch information
ilumary committed May 20, 2024
1 parent f8d4a5e commit 360a8d7
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 93 deletions.
219 changes: 130 additions & 89 deletions project/src/quic/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,26 @@ const SPACE_ID_DATA: usize = 0x02;
type TSDistributor = Arc<RwLock<Distributor>>;

pub struct Distributor {
//server config for rustls. Will have to be updated to allow client side endpoint
server_config: Option<Arc<rustls::ServerConfig>>,

//RFC 2104, used to generate reset tokens from connection ids
hmac_reset_key: ring::hmac::Key,

//shared channel for sending packets
sending_handle: mpsc::Sender<packet::Datagram>,

//channels for each connection
connection_send_handles: HashMap<ConnectionId, mpsc::Sender<packet::EarlyDatagram>>,
}

impl Default for Distributor {
fn default() -> Self {
//placeholder for default initialization with 0 capacity
let (tx, _) = mpsc::channel::<packet::Datagram>(0);
impl Distributor {
fn new(key: ring::hmac::Key, server_cfg: Option<Arc<rustls::ServerConfig>>) -> Self {
//placeholder for default initialization with 1 capacity
let (tx, _) = mpsc::channel::<packet::Datagram>(1);
Self {
server_config: server_cfg,
hmac_reset_key: key,
sending_handle: tx,
connection_send_handles: HashMap::new(),
}
Expand All @@ -57,16 +68,15 @@ impl Acceptor {

//move into static connection function
println!("accepting connection from {}", &remote);
header.debug_print();

let (tx, rx) = mpsc::channel::<packet::EarlyDatagram>(8);
let new_conn = Connection::early_connection((packet, remote, header), tsd)
.await
.unwrap();

{
tsd.write()
.await
.connection_send_handles
.insert(header.dcid, tx);
}
//while(conn.poll_state() != established) {
// match state
// new_conn.handle_packet() //recv().await
//}

None
}
Expand Down Expand Up @@ -144,11 +154,6 @@ pub struct Endpoint {
recv_loop_handle: Option<JoinHandle<Result<u64, quic_error::Error>>>,
send_loop_handle: Option<JoinHandle<Result<u64, quic_error::Error>>>,

//server config for rustls. Will have to be updated to allow client side endpoint
server_config: Option<Arc<rustls::ServerConfig>>,
//RFC 2104, used to generate reset tokens from connection ids
hmac_reset_key: ring::hmac::Key,

//stores connection channel sender handles
distributor: TSDistributor,
}
Expand All @@ -166,9 +171,10 @@ impl Endpoint {
),
recv_loop_handle: None,
send_loop_handle: None,
server_config,
hmac_reset_key: ring::hmac::Key::new(ring::hmac::HMAC_SHA256, &hmac_reset_key),
distributor: Arc::new(RwLock::new(Distributor::default())),
distributor: Arc::new(RwLock::new(Distributor::new(
ring::hmac::Key::new(ring::hmac::HMAC_SHA256, &hmac_reset_key),
server_config,
))),
}
}

Expand All @@ -192,8 +198,14 @@ impl Endpoint {
}
};
println!("Received {:?} bytes from {:?}", size, src_addr);
let mut octets = OctetsMut::with_slice(&mut buffer[..size]);
let partial_decode = Header::from_bytes(&mut octets, 8).unwrap();
//truncate vector to correct size
buffer.truncate(size);
let mut octets = OctetsMut::with_slice(&mut buffer);

let partial_decode: Header = match Header::from_bytes(&mut octets, 8) {
Ok(h) => h,
Err(error) => panic!("Error: {}", error),
};

if partial_decode.is_inital() {
tx_initial
Expand Down Expand Up @@ -240,48 +252,38 @@ impl Endpoint {

Acceptor { rx: rx_initial }
}
}

// handles a single incoming UDP datagram
pub fn recv(&mut self) -> Result<(), quic_error::Error> {
// TODO: handle coalescing inital, 0-rtt and handshake packets, max udp packet size is 65kb
let mut buffer = [0u8; 65535];

let (size, src_addr) = (0, "0.0.0.0:8080".parse::<SocketAddr>().unwrap());
/*match self.socket.recv_from(&mut buffer) {
Ok((size, src_addr)) => (size, src_addr),
Err(error) => {
println!("Error while receiving datagram: {:?}", error);
return Err(quic_error::Error::socket_error("receiving packet"));
}
};*/
println!("Received {:?} bytes from {:?}", size, src_addr);

let mut octet_buffer = OctetsMut::with_slice(&mut buffer);

//println!("\nRAW\n{:x?}", octet_buffer); // offset 0
pub struct Connection {
inner: Inner,
//bidi_stream_r: mpsc::Receiver<stream::data>,
//uni_stream_r: mpsc::Receiver<stream::data>,
}

//check if most significant bit is 1 or 0, if 0 => short packet => have to get cid len
//somehow
let mut head: Header = match Header::from_bytes(&mut octet_buffer, 8) {
Ok(h) => h,
Err(error) => panic!("Error: {}", error),
impl Connection {
//initializes a connection from an inital packet
async fn early_connection(
inital_datagram: packet::EarlyDatagram,
tsd: TSDistributor,
) -> Result<Self, quic_error::Error> {
let (mut buffer, src_addr, mut head) = inital_datagram;
let (hmac_reset_key, server_config) = {
let t = tsd.read().await;
(t.hmac_reset_key.clone(), t.server_config.clone())
};

//match connection if one already exists

//handle new connection

//get initial keys
let ikp = Keys::initial(Version::V1, &head.dcid.id, Side::Server);

match head.decrypt(&mut octet_buffer, &ikp.remote.header) {
Ok(_) => {
head.debug_print();
}
let header_length = match head.decrypt(&mut buffer, &ikp.remote.header) {
Ok(s) => s,
Err(error) => panic!("Error: {}", error),
}
};

println!("{:?}", head.debug_print());

let (header_raw, mut payload_cipher) = octet_buffer.split_at(octet_buffer.off()).unwrap();
let mut b = OctetsMut::with_slice(&mut buffer);
let (header_raw, mut payload_cipher) = b.split_at(header_length).unwrap();

//cut off trailing 0s from buffer, substract 1 extra beacuse packet num length of 1 is
//encoded as 0...
Expand All @@ -303,7 +305,8 @@ impl Endpoint {
};

//truncate payload to length returned by decrypting packet payload
let mut payload = OctetsMut::with_slice(payload_cipher.as_mut()[..dec_len].as_mut());
//let payload = OctetsMut::with_slice(payload_cipher.as_mut()[..dec_len].as_mut());
buffer.truncate(dec_len);

let initial_local_scid = ConnectionId::generate_with_length(head.dcid.len());
let orig_dcid = head.dcid.clone();
Expand All @@ -313,7 +316,7 @@ impl Endpoint {
.original_destination_connection_id(orig_dcid.id())
.initial_source_connection_id(initial_local_scid.id())
.stateless_reset_token(
token::StatelessResetToken::new(&self.hmac_reset_key, &initial_local_scid)
token::StatelessResetToken::new(&hmac_reset_key, &initial_local_scid)
.token
.to_vec(),
);
Expand All @@ -326,7 +329,7 @@ impl Endpoint {

let conn = RustlsConnection::Server(
rustls::quic::ServerConnection::new(
self.server_config.as_ref().unwrap().clone(),
server_config.unwrap(),
rustls::quic::Version::V1,
data.to_vec(),
)
Expand All @@ -338,26 +341,65 @@ impl Endpoint {
..PacketNumberSpace::new(true)
};

let new_ch = 1; /*self
.create_connection(
Side::Server,
head.version,
orig_dcid,
head.scid.clone().unwrap(),
initial_local_scid,
conn,
ikp,
initial_space,
src_addr,
)
.unwrap();*/

//accept new connection
Ok(())
let (transmit_q, recv_q) = mpsc::channel::<packet::EarlyDatagram>(8);

{
tsd.write()
.await
.connection_send_handles
.insert(initial_local_scid.clone(), transmit_q);
}

let inner = Inner::new(
Side::Server,
head.version,
conn,
ikp,
orig_dcid,
head.scid.clone().unwrap(),
initial_local_scid,
src_addr.parse().unwrap(),
initial_space,
);

let conn = Self { inner };

//process inital packet inside connection, all subsequent packets are sent through channel
//self.inner.accept(payload, head).await;

Ok(conn.start(recv_q).await.unwrap())
}

//starts recv_q, feeds into all other queues
async fn start(
mut self,
mut recv_q: mpsc::Receiver<packet::EarlyDatagram>,
) -> Result<Self, quic_error::Error> {
self.inner.loop_handle = Some(tokio::spawn(async move {
while let Some(msg) = recv_q.recv().await {
println!("received datagram inside connection {:?}", msg.1);

//process

//poll for answer packet
}
Ok(0)
}));
Ok(self)
}

//pub async fn accept_bidi_stream() {}

//pub async fn accept_uni_stream() {}

//pub async fn init_bidi_stream() {}

//pub async fn init_uni_stream() {}
}

pub struct Connection {
struct Inner {
loop_handle: Option<JoinHandle<Result<u64, quic_error::Error>>>,

//side
side: Side,

Expand Down Expand Up @@ -400,8 +442,8 @@ pub struct Connection {
zero_rtt_enabled: bool,
}

impl Connection {
pub fn new(
impl Inner {
fn new(
side: Side,
version: u32,
tls_session: RustlsConnection,
Expand All @@ -412,7 +454,8 @@ impl Connection {
remote_address: SocketAddr,
initial_space: PacketNumberSpace,
) -> Self {
Connection {
Self {
loop_handle: None,
side,
version,
tls_session,
Expand Down Expand Up @@ -441,7 +484,7 @@ impl Connection {
}

//fully decrypts packet header and payload, does tls stuff and passes packet to process_payload
pub fn recv(
fn recv(
&mut self,
header: &mut Header,
payload: &mut OctetsMut<'_>,
Expand All @@ -456,12 +499,10 @@ impl Connection {
}

//accepts new connection, passes payload to process_payload
pub fn accept(
&mut self,
header: &Header,
payload: &mut OctetsMut<'_>,
) -> Result<(), quic_error::Error> {
self.process_payload(header, payload);
fn accept(&mut self, header: &Header, payload_raw: &mut [u8]) -> Result<(), quic_error::Error> {
let mut payload = octets::OctetsMut::with_slice(payload_raw);

self.process_payload(header, &mut payload);

println!(
"transport parameters from peer: {:x?}",
Expand Down Expand Up @@ -816,7 +857,7 @@ impl Connection {
}
}

pub enum ConnectionState {
enum ConnectionState {
Handshake,
Connected,
Terminated,
Expand All @@ -825,7 +866,7 @@ pub enum ConnectionState {
}

//RFC 9000 section 12.3. we have 3 packet number spaces: initial, handshake & 1-RTT
pub struct PacketNumberSpace {
struct PacketNumberSpace {
keys: Option<Keys>,

outgoing_acks: Vec<u64>,
Expand All @@ -837,7 +878,7 @@ pub struct PacketNumberSpace {
}

impl PacketNumberSpace {
pub fn new(with_crypto_buffer: bool) -> Self {
fn new(with_crypto_buffer: bool) -> Self {
Self {
keys: None,
outgoing_acks: Vec::new(),
Expand All @@ -851,7 +892,7 @@ impl PacketNumberSpace {
}

#[derive(Eq, Hash, PartialEq, Clone)]
pub struct ConnectionId {
struct ConnectionId {
id: Vec<u8>,
}

Expand Down
Loading

0 comments on commit 360a8d7

Please sign in to comment.