Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio-util: allow encoding borrowed buffers at LengthDelimitedCodec #6533

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion tokio-util/src/codec/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,14 @@ impl Encoder<Bytes> for LengthDelimitedCodec {
type Error = io::Error;

fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
<Self as Encoder<&'_ [u8]>>::encode(self, &data, dst)
}
}

impl<'a> Encoder<&'a [u8]> for LengthDelimitedCodec {
type Error = io::Error;

fn encode(&mut self, data: &'a [u8], dst: &mut BytesMut) -> Result<(), io::Error> {
let n = data.len();

if n > self.builder.max_frame_len {
Expand Down Expand Up @@ -615,7 +623,7 @@ impl Encoder<Bytes> for LengthDelimitedCodec {
}

// Write the frame to the buffer
dst.extend_from_slice(&data[..]);
dst.extend_from_slice(data);

Ok(())
}
Expand Down
157 changes: 126 additions & 31 deletions tokio-util/tests/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ macro_rules! assert_done {
}};
}

type MockFramedWrite =
tokio_util::codec::FramedWrite<Mock, tokio_util::codec::LengthDelimitedCodec>;

#[test]
fn read_empty_io_yields_nothing() {
let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
Expand Down Expand Up @@ -423,9 +426,15 @@ fn write_single_frame_length_adjusted() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
Comment on lines +429 to +432
Copy link
Member

@mox692 mox692 May 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in the original issue, this change could break the existing code like this, so I'm not sure we can make this change right now.

assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert!(io.get_ref().calls.is_empty());
});
}
Expand All @@ -436,7 +445,9 @@ fn write_nothing_yields_nothing() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io, cx
));
});
}

Expand All @@ -453,9 +464,15 @@ fn write_single_frame_one_packet() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert!(io.get_ref().calls.is_empty());
});
}
Expand All @@ -477,16 +494,28 @@ fn write_single_multi_frame_one_packet() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));

assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("123")));

assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert!(io.get_ref().calls.is_empty());
});
}
Expand All @@ -510,20 +539,38 @@ fn write_single_multi_frame_multi_packet() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("123")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert!(io.get_ref().calls.is_empty());
});
}
Expand All @@ -544,12 +591,24 @@ fn write_single_frame_would_block() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));

assert_pending!(io.as_mut().poll_flush(cx));
assert_pending!(io.as_mut().poll_flush(cx));
assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_pending!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert_pending!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert!(io.get_ref().calls.is_empty());
});
Expand All @@ -567,10 +626,16 @@ fn write_single_frame_little_endian() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));
assert!(io.get_ref().calls.is_empty());
});
}
Expand All @@ -587,10 +652,16 @@ fn write_single_frame_with_short_length_field() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert!(io.get_ref().calls.is_empty());
});
Expand All @@ -604,7 +675,10 @@ fn write_max_frame_len() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));

assert!(io.get_ref().calls.is_empty());
Expand All @@ -621,10 +695,16 @@ fn write_update_max_frame_len_at_rest() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

io.encoder_mut().set_max_frame_length(5);

Expand All @@ -646,14 +726,23 @@ fn write_update_max_frame_len_in_flight() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));

assert_pending!(io.as_mut().poll_flush(cx));
assert_pending!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

io.encoder_mut().set_max_frame_length(5);

assert_ready_ok!(io.as_mut().poll_flush(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
assert!(io.get_ref().calls.is_empty());
Expand All @@ -666,10 +755,16 @@ fn write_zero() {
pin_mut!(io);

task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ready_ok!(<MockFramedWrite as futures::Sink<Bytes>>::poll_ready(
io.as_mut(),
cx
));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));

assert_ready_err!(io.as_mut().poll_flush(cx));
assert_ready_err!(<MockFramedWrite as futures::Sink<Bytes>>::poll_flush(
io.as_mut(),
cx
));

assert!(io.get_ref().calls.is_empty());
});
Expand Down