-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
fs: support io_uring with tokio::fs::read
#7696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
001517e
6e24938
d544c91
a5dbf4e
b7ccee7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,135 @@ | ||||||||
| use crate::fs::OpenOptions; | ||||||||
| use crate::runtime::driver::op::Op; | ||||||||
|
|
||||||||
| use std::io; | ||||||||
| use std::io::ErrorKind; | ||||||||
| use std::os::fd::OwnedFd; | ||||||||
| use std::path::Path; | ||||||||
|
|
||||||||
| // This algorithm is inspired by the Rust std lib, version 1.90.0: | ||||||||
| // https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409 | ||||||||
| const PROBE_SIZE: usize = 32; | ||||||||
| const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; | ||||||||
|
|
||||||||
| // Maximum number of bytes requested from a single io_uring read submission. | ||||||||
| // NOTE: must not exceed u32::MAX, because read lengths are cast to u32. | ||||||||
| // Currently 64 MiB. | ||||||||
| const MAX_READ_SIZE: usize = 64 * 1024 * 1024; | ||||||||
|
|
||||||||
/// Reads the entire file at `path` into a freshly allocated `Vec<u8>`.
///
/// The file is opened read-only, its metadata length (when it can be
/// obtained) is used to pre-reserve the buffer, and the actual reading is
/// delegated to `read_to_end_uring`.
///
/// # Errors
///
/// Returns an error if opening the file fails, if reserving the initial
/// capacity fails, or if `read_to_end_uring` fails.
///
/// # Panics
///
/// Panics if converting the opened file with `try_into_std` fails.
| pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> { | ||||||||
| let file = OpenOptions::new().read(true).open(path).await?; | ||||||||
|
|
||||||||
| // TODO: use io uring in the future to obtain metadata | ||||||||
// A metadata failure is not fatal: `.ok()` simply leaves the hint empty.
// NOTE(review): `m.len() as usize` truncates where usize is narrower than
// u64; it only affects the capacity hint here - confirm that is intended.
| let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok(); | ||||||||
|
|
||||||||
| let fd: OwnedFd = file | ||||||||
| .try_into_std() | ||||||||
| .expect("unexpected in-flight operation detected") | ||||||||
| .into(); | ||||||||
|
|
||||||||
| let mut buf = Vec::new(); | ||||||||
|
|
||||||||
// Reserve fallibly so that an oversized hint yields an error, not an abort.
| if let Some(size_hint) = size_hint { | ||||||||
| buf.try_reserve(size_hint)?; | ||||||||
| } | ||||||||
|
|
||||||||
| read_to_end_uring(fd, buf).await | ||||||||
| } | ||||||||
|
|
||||||||
/// Reads from `fd`, starting at offset 0, until a read reports zero bytes,
/// appending everything to `buf` and returning the filled buffer.
///
/// `fd` and `buf` are moved into every read and handed back by it, which is
/// why both are reassigned after each call.
///
/// # Errors
///
/// Returns an error if growing the buffer fails or if a read fails.
| async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> { | ||||||||
| let mut offset = 0; | ||||||||
| let start_cap = buf.capacity(); | ||||||||
|
|
||||||||
| loop { | ||||||||
// Probe only while the buffer is full and still has its starting capacity.
// The length must exceed PROBE_SIZE because `small_probe_read` subtracts
// PROBE_SIZE from it.
| if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() > PROBE_SIZE { | ||||||||
| // The buffer might be an exact fit. Let's read into a probe buffer | ||||||||
| // and see if it returns `Ok(0)`. If so, we've avoided an | ||||||||
| // unnecessary increasing of the capacity. But if not, append the | ||||||||
| // probe buffer to the primary buffer and let its capacity grow. | ||||||||
| let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, &mut offset).await?; | ||||||||
|
|
||||||||
| if size_read == 0 { | ||||||||
| return Ok(r_buf); | ||||||||
| } | ||||||||
|
|
||||||||
| buf = r_buf; | ||||||||
| fd = r_fd; | ||||||||
| } | ||||||||
|
|
||||||||
| // buf is full, need more capacity | ||||||||
| if buf.len() == buf.capacity() { | ||||||||
| buf.try_reserve(PROBE_SIZE)?; | ||||||||
| } | ||||||||
|
|
||||||||
| // prepare the spare capacity to be read into | ||||||||
| let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE); | ||||||||
|
|
||||||||
| // buf_len cannot be greater than u32::MAX because MAX_READ_SIZE | ||||||||
| // is less than u32::MAX | ||||||||
| let read_len = buf_len as u32; | ||||||||
|
|
||||||||
| // read into spare capacity | ||||||||
| let (size_read, r_fd, r_buf) = op_read(fd, buf, &mut offset, read_len).await?; | ||||||||
|
|
||||||||
// A zero size means the last read returned no bytes: treat it as end of file.
| if size_read == 0 { | ||||||||
| return Ok(r_buf); | ||||||||
| } | ||||||||
|
|
||||||||
| fd = r_fd; | ||||||||
| buf = r_buf; | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
/// Issues a read of PROBE_SIZE bytes without growing `buf` first.
///
/// The last PROBE_SIZE bytes of `buf` are saved in a stack array and
/// truncated away, so the read lands in the capacity they occupied. The
/// saved bytes are then spliced back in at their original position, ahead of
/// whatever the read appended.
///
/// The caller must pass a buffer with `buf.len() >= PROBE_SIZE`; otherwise
/// the subtraction `buf.len() - PROBE_SIZE` underflows.
///
/// Returns the size reported by `op_read`, together with the file descriptor
/// and the buffer.
///
/// # Errors
///
/// Returns an error if the read fails or if reserving room for the saved
/// bytes fails.
| async fn small_probe_read( | ||||||||
| fd: OwnedFd, | ||||||||
| mut buf: Vec<u8>, | ||||||||
| offset: &mut u64, | ||||||||
| ) -> io::Result<(u32, OwnedFd, Vec<u8>)> { | ||||||||
| let read_len = PROBE_SIZE_U32; | ||||||||
|
|
||||||||
| let mut temp_arr = [0; PROBE_SIZE]; | ||||||||
| let back_bytes_len = buf.len() - PROBE_SIZE; | ||||||||
|
|
||||||||
// Save the tail bytes that the probe read is about to overwrite.
| temp_arr.copy_from_slice(&buf[back_bytes_len..]); | ||||||||
|
|
||||||||
| // We're decreasing the length of the buffer and len is greater | ||||||||
| // than PROBE_SIZE. So we can read into the discarded length | ||||||||
| buf.truncate(back_bytes_len); | ||||||||
|
|
||||||||
| let (size_read, r_fd, mut r_buf) = op_read(fd, buf, offset, read_len).await?; | ||||||||
|
|
||||||||
// Reserve up front so that an allocation failure surfaces as an error here
// rather than as a panic inside `splice`.
| r_buf.try_reserve(PROBE_SIZE)?; | ||||||||
| r_buf.splice(back_bytes_len..back_bytes_len, temp_arr); | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can handle allocation failure by error rather than panic by allocating before calling
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Darksonn Will this splice write in the reserved area? Because we can only index in the length and not capacity
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. The length of
ADD-SP marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
|
|
||||||||
| Ok((size_read, r_fd, r_buf)) | ||||||||
| } | ||||||||
|
|
||||||||
/// Reads up to `read_len` bytes from `fd` at `*offset` into the spare
/// capacity of `buf`, resubmitting until either `read_len` bytes have been
/// read in total or a read returns zero bytes.
///
/// `*offset` is advanced by the size of every successful read. A read that
/// fails with `ErrorKind::Interrupted` is retried with the same arguments.
///
/// Returns the size of the LAST completed read (not the accumulated total)
/// together with the file descriptor and the buffer. A returned size of zero
/// therefore means the final read produced no bytes.
///
/// # Errors
///
/// Returns any read error other than `ErrorKind::Interrupted`; the file
/// descriptor and the buffer are not handed back in that case.
| async fn op_read( | ||||||||
| mut fd: OwnedFd, | ||||||||
| mut buf: Vec<u8>, | ||||||||
| offset: &mut u64, | ||||||||
| mut read_len: u32, | ||||||||
| ) -> io::Result<(u32, OwnedFd, Vec<u8>)> { | ||||||||
| loop { | ||||||||
| let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await; | ||||||||
|
|
||||||||
| match res { | ||||||||
// Interrupted: take the fd and buffer back and submit the same read again.
| Err(e) if e.kind() == ErrorKind::Interrupted => { | ||||||||
| buf = r_buf; | ||||||||
| fd = r_fd; | ||||||||
| } | ||||||||
| Err(e) => return Err(e), | ||||||||
| Ok(size_read) => { | ||||||||
| *offset += size_read as u64; | ||||||||
| read_len -= size_read; | ||||||||
|
|
||||||||
// Stop once the request is fully satisfied or the read returned no bytes.
| if read_len == 0 || size_read == 0 { | ||||||||
| return Ok((size_read, r_fd, r_buf)); | ||||||||
| } | ||||||||
|
|
||||||||
| buf = r_buf; | ||||||||
| fd = r_fd; | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| pub(crate) mod open; | ||
| pub(crate) mod read; | ||
| pub(crate) mod utils; | ||
| pub(crate) mod write; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; | ||
|
|
||
| use io_uring::{opcode, types}; | ||
| use std::io::{self, Error}; | ||
| use std::os::fd::{AsRawFd, OwnedFd}; | ||
|
|
||
/// Data carried by an io_uring read operation: the file descriptor being read
/// and the buffer being read into. Both are handed back to the caller in the
/// `Completable::Output` of this type.
| #[derive(Debug)] | ||
| pub(crate) struct Read { | ||
| fd: OwnedFd, | ||
| buf: Vec<u8>, | ||
| } | ||
|
|
||
/// Turns a finished read into `(result, fd, buf)`, so the caller gets the
/// file descriptor and the buffer back regardless of the outcome.
| impl Completable for Read { | ||
| type Output = (io::Result<u32>, OwnedFd, Vec<u8>); | ||
|
|
||
/// On success, extends the length of the buffer by the number of bytes
/// reported in the completion, then returns the result with `fd` and `buf`.
/// On failure the buffer length is left unchanged.
| fn complete(self, cqe: CqeResult) -> Self::Output { | ||
| let mut buf = self.buf; | ||
|
|
||
| if let Ok(len) = cqe.result { | ||
| let new_len = buf.len() + len as usize; | ||
| // SAFETY: the kernel reported `len` bytes written into the spare capacity that starts at the old length; `Op::read` asserts that the spare capacity covers the requested length. | ||
| unsafe { buf.set_len(new_len) }; | ||
| } | ||
|
|
||
| (cqe.result, self.fd, buf) | ||
| } | ||
|
|
||
/// Returns `err` together with the untouched file descriptor and buffer.
| fn complete_with_error(self, err: Error) -> Self::Output { | ||
| (Err(err), self.fd, self.buf) | ||
| } | ||
| } | ||
|
|
||
/// Wraps the read state in `CancelData::Read` when the operation is cancelled.
// NOTE(review): presumably this keeps the fd and the buffer alive while the
// kernel may still be using them - confirm against `CancelData`.
| impl Cancellable for Read { | ||
| fn cancel(self) -> CancelData { | ||
| CancelData::Read(self) | ||
| } | ||
| } | ||
|
|
||
| impl Op<Read> { | ||
| // Submits a request to read up to `len` bytes from `fd` at `offset` into the | ||
| // uninitialized spare capacity of `buf`. The read only targets the spare | ||
Daksh14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // capacity, so the bytes already in `buf` are not overwritten. | ||
|
|
||
| // Precondition: the spare capacity of `buf` must be at least `len` bytes; | ||
| // this is checked by the assert below, which panics otherwise. | ||
| // | ||
| // If the kernel were allowed to write more than the spare capacity, extending | ||
| // the length of `buf` by the reported size on completion would be unsound. | ||
| pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self { | ||
Daksh14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Read into the spare capacity only, leaving the initialized part untouched. | ||
| assert!(buf.spare_capacity_mut().len() >= len as usize); | ||
| let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); | ||
|
|
||
| let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) | ||
| .offset(offset) | ||
| .build(); | ||
|
|
||
| // SAFETY: `fd` and `buf` are moved into the `Read` passed to `Op::new`, so the parameters stay valid for the entire duration of the operation | ||
| unsafe { Op::new(read_op, Read { fd, buf }) } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why `cfg_io_uring! {` is not used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@martin-augment It's following the code-block macro currently used in
`tokio/src/fs/open_options.rs`. I can change those in a separate "refactor" PR: https://github.com/tokio-rs/tokio/blob/master/tokio/src/fs/open_options.rs#L592