Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
43805ab
io: add `tokio_util::io::simplex`
ADD-SP Aug 31, 2025
b631600
merge: sync changes from the base branch
ADD-SP Sep 1, 2025
0274aed
merge: sync changes from the base branch
ADD-SP Sep 3, 2025
5b8b019
io: unlock before waking up
ADD-SP Sep 3, 2025
afc7e07
io: make `Inner::with_capacity` more "with_capacity"
ADD-SP Sep 3, 2025
4ffa045
merge: sync changes from the base branch
ADD-SP Nov 5, 2025
585a957
do not include `tokio/rt` by default
ADD-SP Nov 5, 2025
f6fc30a
panics on zero capacity
ADD-SP Nov 5, 2025
41665dc
poll_shutdown should be called multiple times without error
ADD-SP Nov 5, 2025
6eafcd1
wake up the receiver on shutdown
ADD-SP Nov 5, 2025
95837ba
wake up the receiver after dropping the sender
ADD-SP Nov 5, 2025
d7f6d20
wake up the sender after dropping the receiver
ADD-SP Nov 5, 2025
1e30455
fix hang forever issues
ADD-SP Nov 5, 2025
53c6d81
unify the error message
ADD-SP Nov 5, 2025
6eb9659
add more tests
ADD-SP Nov 5, 2025
2225999
merge: sync changes from the base branch
ADD-SP Nov 5, 2025
45976a9
support poll_write_vectored
ADD-SP Nov 5, 2025
d35012b
fix typos
ADD-SP Nov 6, 2025
d9809a8
adopt coop for poll_write_vectored
ADD-SP Nov 6, 2025
439a4aa
merge: sync changes from the base branch
ADD-SP Nov 6, 2025
f954e93
fix rustfmt reports
ADD-SP Nov 6, 2025
34e3a42
merge: sync changes from the base branch
ADD-SP Nov 8, 2025
63176ed
merge: sync changes from the base branch
ADD-SP Nov 11, 2025
555e13c
fix coop and unblock before dropping old waker
ADD-SP Nov 20, 2025
dbd099e
merge: sync changes from the base branch
ADD-SP Nov 20, 2025
1f2e6c7
fixup! fix coop and unblock before dropping old waker
ADD-SP Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ macro_rules! cfg_rt {
}
}

macro_rules! cfg_not_rt {
($($item:item)*) => {
$(
#[cfg(not(feature = "rt"))]
$item
)*
}
}

macro_rules! cfg_time {
($($item:item)*) => {
$(
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod copy_to_bytes;
mod inspect;
mod read_buf;
mod reader_stream;
pub mod simplex;
mod sink_writer;
mod stream_reader;

Expand Down
343 changes: 343 additions & 0 deletions tokio-util/src/io/simplex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
//! Unidirectional byte-oriented channel.

use crate::util::poll_proceed;

use bytes::Buf;
use bytes::BytesMut;
use futures_core::ready;
use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind;
use std::io::IoSlice;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

type IoResult<T> = Result<T, IoError>;

const CLOSED_ERROR_MSG: &str = "simplex has been closed";

#[derive(Debug)]
struct Inner {
/// `poll_write` will return [`Poll::Pending`] if the backpressure boundary is reached
backpressure_boundary: usize,

/// either [`Sender`] or [`Receiver`] is closed
is_closed: bool,

/// Waker used to wake the [`Receiver`]
receiver_waker: Option<Waker>,

/// Waker used to wake the [`Sender`]
sender_waker: Option<Waker>,

/// Buffer used to read and write data
buf: BytesMut,
}

impl Inner {
fn with_capacity(capacity: usize) -> Self {
Self {
backpressure_boundary: capacity,
is_closed: false,
receiver_waker: None,
sender_waker: None,
buf: BytesMut::with_capacity(capacity),
}
}

fn register_receiver_waker(&mut self, waker: &Waker) -> Option<Waker> {
match self.receiver_waker.as_mut() {
Some(old) if old.will_wake(waker) => None,
_ => self.receiver_waker.replace(waker.clone()),
}
}

fn register_sender_waker(&mut self, waker: &Waker) -> Option<Waker> {
match self.sender_waker.as_mut() {
Some(old) if old.will_wake(waker) => None,
_ => self.sender_waker.replace(waker.clone()),
}
}

fn take_receiver_waker(&mut self) -> Option<Waker> {
self.receiver_waker.take()
}

fn take_sender_waker(&mut self) -> Option<Waker> {
self.sender_waker.take()
}

fn is_closed(&self) -> bool {
self.is_closed
}

fn close_receiver(&mut self) -> Option<Waker> {
self.is_closed = true;
self.take_sender_waker()
}

fn close_sender(&mut self) -> Option<Waker> {
self.is_closed = true;
self.take_receiver_waker()
}
}

/// Receiver of the simplex channel.
///
/// You can still read the remaining data from the buffer
/// even if the write half has been dropped.
/// See [`Sender::poll_shutdown`] and [`Sender::drop`] for more details.
#[derive(Debug)]
pub struct Receiver {
inner: Arc<Mutex<Inner>>,
}

impl Drop for Receiver {
/// This also wakes up the [`Sender`].
fn drop(&mut self) {
let maybe_waker = {
let mut inner = self.inner.lock().unwrap();
inner.close_receiver()
};

if let Some(waker) = maybe_waker {
waker.wake();
}
}
}

impl AsyncRead for Receiver {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
let coop = ready!(poll_proceed(cx));

let mut inner = self.inner.lock().unwrap();

let to_read = buf.remaining().min(inner.buf.remaining());
if to_read == 0 {
if inner.is_closed() || buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let old_waker = inner.register_receiver_waker(cx.waker());
let maybe_waker = inner.take_sender_waker();

// unlock before waking up and dropping old waker
drop(inner);
drop(old_waker);
if let Some(waker) = maybe_waker {
waker.wake();
}
return Poll::Pending;
}

// this is to avoid starving other tasks
coop.made_progress();

buf.put_slice(&inner.buf[..to_read]);
inner.buf.advance(to_read);

let waker = inner.take_sender_waker();
drop(inner); // unlock before waking up
if let Some(waker) = waker {
waker.wake();
}

Poll::Ready(Ok(()))
}
}

/// Sender of the simplex channel.
///
/// ## Shutdown
///
/// See [`Sender::poll_shutdown`].
#[derive(Debug)]
pub struct Sender {
inner: Arc<Mutex<Inner>>,
}

impl Drop for Sender {
/// This also wakes up the [`Receiver`].
fn drop(&mut self) {
let maybe_waker = {
let mut inner = self.inner.lock().unwrap();
inner.close_sender()
};

if let Some(waker) = maybe_waker {
waker.wake();
}
}
}

impl AsyncWrite for Sender {
/// # Errors
///
/// This method will return [`IoErrorKind::BrokenPipe`]
/// if the channel has been closed.
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
let coop = ready!(poll_proceed(cx));

let mut inner = self.inner.lock().unwrap();

if inner.is_closed() {
return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)));
}

let free = inner
.backpressure_boundary
.checked_sub(inner.buf.len())
.expect("backpressure boundary overflow");
let to_write = buf.len().min(free);
if to_write == 0 {
if buf.is_empty() {
return Poll::Ready(Ok(0));
}

let old_waker = inner.register_sender_waker(cx.waker());
let waker = inner.take_receiver_waker();

// unlock before waking up and dropping old waker
drop(inner);
drop(old_waker);
if let Some(waker) = waker {
waker.wake();
}

return Poll::Pending;
}

// this is to avoid starving other tasks
coop.made_progress();

inner.buf.extend_from_slice(&buf[..to_write]);

let waker = inner.take_receiver_waker();
drop(inner); // unlock before waking up
if let Some(waker) = waker {
waker.wake();
}

Poll::Ready(Ok(to_write))
}

/// # Errors
///
/// This method will return [`IoErrorKind::BrokenPipe`]
/// if the channel has been closed.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
let inner = self.inner.lock().unwrap();
if inner.is_closed() {
Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)))
} else {
Poll::Ready(Ok(()))
}
}

/// After returns [`Poll::Ready`], all the following call to
/// [`Sender::poll_write`] and [`Sender::poll_flush`]
/// will return error.
///
/// The [`Receiver`] can still be used to read remaining data
/// until all bytes have been consumed.
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
let maybe_waker = {
let mut inner = self.inner.lock().unwrap();
inner.close_sender()
};

if let Some(waker) = maybe_waker {
waker.wake();
}

Poll::Ready(Ok(()))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, IoError>> {
let coop = ready!(poll_proceed(cx));

let mut inner = self.inner.lock().unwrap();
if inner.is_closed() {
return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)));
}

let free = inner
.backpressure_boundary
.checked_sub(inner.buf.len())
.expect("backpressure boundary overflow");
if free == 0 {
let old_waker = inner.register_sender_waker(cx.waker());
let maybe_waker = inner.take_receiver_waker();

// unlock before waking up and dropping old waker
drop(inner);
drop(old_waker);
if let Some(waker) = maybe_waker {
waker.wake();
}

return Poll::Pending;
}

// this is to avoid starving other tasks
coop.made_progress();

let mut rem = free;
for buf in bufs {
if rem == 0 {
break;
}

let to_write = buf.len().min(rem);
if to_write == 0 {
assert_ne!(rem, 0);
assert_eq!(buf.len(), 0);
continue;
}

inner.buf.extend_from_slice(&buf[..to_write]);
rem -= to_write;
}

let waker = inner.take_receiver_waker();
drop(inner); // unlock before waking up
if let Some(waker) = waker {
waker.wake();
}

Poll::Ready(Ok(free - rem))
}
}

/// Create a simplex channel.
///
/// The `capacity` parameter specifies the maximum number of bytes that can be
/// stored in the channel without making the [`Sender::poll_write`]
/// return [`Poll::Pending`].
///
/// # Panics
///
/// This function will panic if `capacity` is zero.
pub fn new(capacity: usize) -> (Sender, Receiver) {
assert_ne!(capacity, 0, "capacity must be greater than zero");

let inner = Arc::new(Mutex::new(Inner::with_capacity(capacity)));
let tx = Sender {
inner: Arc::clone(&inner),
};
let rx = Receiver { inner };
(tx, rx)
}
23 changes: 23 additions & 0 deletions tokio-util/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,26 @@ pub(crate) use maybe_dangling::MaybeDangling;
#[cfg(any(feature = "io", feature = "codec"))]
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub use poll_buf::{poll_read_buf, poll_write_buf};

cfg_rt! {
#[cfg_attr(not(feature = "io"), allow(unused))]
pub(crate) use tokio::task::coop::poll_proceed;
}

cfg_not_rt! {
#[cfg_attr(not(feature = "io"), allow(unused))]
use std::task::{Context, Poll};

#[cfg_attr(not(feature = "io"), allow(unused))]
pub(crate) struct RestoreOnPending;

#[cfg_attr(not(feature = "io"), allow(unused))]
impl RestoreOnPending {
pub(crate) fn made_progress(&self) {}
}

#[cfg_attr(not(feature = "io"), allow(unused))]
pub(crate) fn poll_proceed(_cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
Poll::Ready(RestoreOnPending)
}
}
Loading