121 changes: 120 additions & 1 deletion src/engine.rs
@@ -308,8 +308,36 @@ where
                vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
            }
            ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

            return Ok(ents_idx.len());
        }
        Ok(0)
    }

    pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
[Member] Suggested change:
-    pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
+    pub fn fetch_entries_to_aio<M: MessageExt>(
        &self,
        region_id: u64,
        begin: u64,
        end: u64,
        max_size: Option<usize>,
        vec: &mut Vec<M::Entry>,
    ) -> Result<usize> {
[Member] I think based on the tests you can automatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something.

One issue though is that I'm not sure the aio syscall is portable enough. You might need to do some research on how to detect whether aio is available (maybe take a look at how RocksDB did it).

[Author] OK. Let me try.
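[Note] A minimal sketch of the suggested dispatch, under stated assumptions: the threshold value, the method names single_read/multi_read, and the cfg-based availability probe are all hypothetical, not code from this PR.

// Hypothetical dispatch: use POSIX AIO only for large batches, and only on
// targets where it is assumed available (RocksDB performs a similar probe).
const AIO_THRESHOLD: usize = 4; // assumed cutoff, to be tuned by benchmarks

fn read_blocks(&self, blocks: &mut [FileBlockHandle]) -> Result<Vec<Vec<u8>>> {
    let aio_supported = cfg!(any(target_os = "linux", target_os = "macos"));
    if aio_supported && blocks.len() > AIO_THRESHOLD {
        self.multi_read(blocks) // one aio_read per block, then wait on all
    } else {
        blocks.iter().map(|b| self.single_read(b)).collect()
    }
}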

        let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
        if let Some(memtable) = self.memtables.get(region_id) {
            let length = (end - begin) as usize;
            let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length);
            memtable
                .read()
                .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;

            let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap();
            parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec);

            ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

            return Ok(ents_idx.len());
        }

        Ok(0)
    }

@@ -544,7 +572,36 @@ impl BlockCache {
thread_local! {
    static BLOCK_CACHE: BlockCache = BlockCache::new();
}

pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
    bytes: Vec<Vec<u8>>,
    ents_idx: &mut [EntryIndex],
    vec: &mut Vec<M::Entry>,
) {
    let mut decode_buf = vec![];
    let mut seq: i32 = -1;
    for (t, idx) in ents_idx.iter().enumerate() {
        // `bytes` holds one buffer per file block; advance to the next buffer
        // whenever this entry belongs to a different block than the previous
        // one.
        decode_buf =
            match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() {
                true => {
                    seq += 1;
                    bytes[seq as usize].to_vec()
                }
                false => decode_buf,
            };
        vec.push(
            parse_from_bytes(
                &LogBatch::decode_entries_block(
                    &decode_buf,
                    idx.entries.unwrap(),
                    idx.compression_type,
                )
                .unwrap()
                    [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
            )
            .unwrap(),
        );
    }
}

pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
    M: MessageExt,
@@ -683,6 +740,41 @@ mod tests {
                reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
            }
        }
        fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>(
[Member] Add a newline between functions.
            &self,
            rid: u64,
            start: u64,
            end: u64,
            reader: FR,
        ) {
            let mut entries = Vec::new();
            self.fetch_entries_to_aio::<Entry>(
                rid,
                self.first_index(rid).unwrap(),
                self.last_index(rid).unwrap() + 1,
                None,
                &mut entries,
            )
            .unwrap();
            assert_eq!(entries.len(), (end - start) as usize);
            assert_eq!(entries.first().unwrap().index, start);
            assert_eq!(
                entries.last().unwrap().index,
                self.decode_last_index(rid).unwrap()
            );
            assert_eq!(entries.last().unwrap().index + 1, end);
            for e in entries.iter() {
                let entry_index = self
                    .memtables
                    .get(rid)
                    .unwrap()
                    .read()
                    .get_entry(e.index)
                    .unwrap();
                assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e);
                reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
            }
        }

        fn file_count(&self, queue: Option<LogQueue>) -> usize {
            if let Some(queue) = queue {
@@ -745,6 +837,14 @@ mod tests {
                assert_eq!(d, &data);
            });
        }
        for i in 10..20 {
            let rid = i;
            let index = i;
            engine.scan_entries_aio(rid, index, index + 2, |_, q, d| {
                assert_eq!(q, LogQueue::Append);
                assert_eq!(d, &data);
            });
        }

        // Recover the engine.
        let engine = engine.reopen();
@@ -1994,6 +2094,21 @@ mod tests {
        type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
        type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
        type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
        type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext;

        fn async_read(
            &self,
            ctx: &mut Self::AsyncIoContext,
            handle: Arc<Self::Handle>,
            buf: Vec<u8>,
            block: &mut FileBlockHandle,
        ) -> std::io::Result<()> {
            self.inner.async_read(ctx, handle, buf, block)
        }

        fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> {
            self.inner.async_finish(ctx)
        }

        fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
            let handle = self.inner.create(&path)?;
@@ -2056,6 +2171,10 @@ mod tests {
        fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
            self.inner.new_writer(h)
        }

        fn new_async_io_context(&self, block_sum: usize) -> std::io::Result<Self::AsyncIoContext> {
            self.inner.new_async_io_context(block_sum)
        }
    }

    #[test]
129 changes: 127 additions & 2 deletions src/env/default.rs
@@ -1,20 +1,27 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::ffi::c_void;
use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write};
use std::os::unix::io::RawFd;
use std::path::Path;
use std::sync::Arc;
use std::{mem, ptr};

use fail::fail_point;
use libc::{aio_return, aiocb, off_t};
use log::error;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::signal::{SigEvent, SigevNotify};
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;

use crate::env::{AsyncContext, FileSystem, Handle, WriteExt};
use crate::pipe_log::FileBlockHandle;

const MAX_ASYNC_READ_TRY_TIME: usize = 10;

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
    let kind = std::io::Error::from(e).kind();
@@ -97,6 +104,19 @@ impl LogFd {
        Ok(readed)
    }

    /// Submits an asynchronous read of `len` bytes at `offset` into `pbuf`
    /// using POSIX AIO; completion is later reaped via `aio_suspend` and
    /// `aio_return`.
    pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) {
        unsafe {
            aior.aio_fildes = self.0;
            aior.aio_reqprio = 0;
            aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent();
            aior.aio_nbytes = len;
            aior.aio_buf = pbuf as *mut c_void;
            aior.aio_lio_opcode = libc::LIO_READ;
            aior.aio_offset = offset as off_t;
            libc::aio_read(aior);
        }
    }

    /// Writes some bytes to this file starting at `offset`. Returns how many
    /// bytes were written.
    pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult<usize> {
@@ -257,12 +277,113 @@ impl WriteExt for LogFile {
    }
}

pub struct AioContext {
[Member] Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference the aio context of the base file system without needing to expose this struct.
    inner: Option<Arc<LogFd>>,
    index: usize,
    aio_vec: Vec<aiocb>,
    pub(crate) buf_vec: Vec<Vec<u8>>,
}
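
[Note] A one-line sketch of the aliasing the reviewer means, as used by a wrapper file system (hypothetical, mirroring the ObfuscatedFileSystem pattern in the tests):

// Name the base file system's context instead of exposing AioContext.
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext;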
impl AioContext {
    pub fn new(block_sum: usize) -> Self {
        let mut aio_vec = vec![];
        let buf_vec = vec![];
        unsafe {
            for _ in 0..block_sum {
                aio_vec.push(mem::zeroed::<libc::aiocb>());
            }
[Member] Why create them beforehand, instead of creating them at the same time as buf_vec?

[Author] @tabokie If a libc::aiocb is created inside each function call, the compiler will assign the same stack address to it every time, which leads to incorrect results. I'm not so familiar with Rust's memory allocation APIs. Do you have any better approach?

[Member] I don't think it's possible to have multiple live variables with the same address. It's more likely that you mistakenly freed some of them.

[Note] A sketch of the address-stability point follows new() below.
        }
        Self {
            inner: None,
            index: 0,
            aio_vec,
            buf_vec,
        }
    }
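
[Note] A hedged sketch of the address-stability requirement discussed above: the kernel dereferences the aiocb pointer until the request completes, so a control block must not move or be dropped between aio_read and aio_return. Boxing each aiocb is one way to guarantee that; this is illustrative only, not what the PR does, and the data buffer must likewise outlive the request.

// Illustrative: each control block lives at a fixed heap address even if the
// owning Vec reallocates as more requests are submitted.
struct PinnedAio {
    cbs: Vec<Box<libc::aiocb>>,
}

impl PinnedAio {
    fn submit(&mut self, fd: i32, buf: &mut [u8], offset: i64) {
        // SAFETY: an all-zero aiocb is a valid starting state for POSIX AIO.
        let mut cb: Box<libc::aiocb> = Box::new(unsafe { std::mem::zeroed() });
        cb.aio_fildes = fd;
        cb.aio_buf = buf.as_mut_ptr() as *mut libc::c_void;
        cb.aio_nbytes = buf.len();
        cb.aio_offset = offset;
        // The pointer handed to the kernel stays valid because the Box is kept
        // alive in self.cbs until completion is reaped via aio_return.
        unsafe { libc::aio_read(cb.as_mut()) };
        self.cbs.push(cb);
    }
}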

    pub fn set_fd(&mut self, fd: Arc<LogFd>) {
[Member] I understand this is to hide the LogFd. But it isn't necessary if you treat AsyncContext as a data-only struct (inline all the code into the file system, and remove the trait AsyncContext entirely), i.e.:

impl SomeFileSystem {
    fn async_read(ctx: &mut Self::Context, handle: Self::Handle, block: FileBlockHandle) {
        handle.submit_async_read(ctx.buf[i], ...)?;
    }
}
        self.inner = Some(fd);
    }
}

impl AsyncContext for AioContext {
[Member] The current implementation wouldn't work with a custom file system such as ObfuscatedFileSystem. If you add a test that reads async with the obfuscated fs, the result would be wrong.

Also, a minor detail: it's not intuitive for the async context to be used both passively and actively, i.e. fs::new_reader(ctx, handle) should not coexist with ctx::wait().

Let's do this instead:

pub trait FileSystem {
    type AsyncContext;
    fn async_read(
        &self,
        ctx: &mut Self::AsyncContext,
        handle: Arc<Self::Handle>,
        block: FileBlockHandle,
    ) -> Result<()>;
    fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>>;
}

// for obfuscated.rs
pub struct ObfuscatedContext(<DefaultFileSystem as FileSystem>::AsyncIoContext);

impl FileSystem for ObfuscatedFileSystem {
    fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>> {
        let mut base = self.0.async_finish(ctx.0)?;
        for v in &mut base {
            // do obfuscation
            for c in v {
                *c = c.wrapping_sub(1);
            }
        }
        Ok(base)
    }
}

[Author] @tabokie Is it necessary to consider situations where, within one fetch_entries_to_aio call, some handles belong to the Append queue and others to the Rewrite queue? They correspond to different file systems, so async_finish() would have to be called twice. In more complex cases they are interleaved, like Append, Rewrite, Append, Rewrite, Rewrite..., and then the design of async_finish() doesn't work.

[Member] Like this:

impl DualPipes {
    fn read_async(&self, handles: Vec<FileBlockHandle>) -> Vec<Vec<u8>> {
        let mut ctx = fs.new_context();
        for handle in handles {
            fs.read_async(&mut ctx, handle);
        }
        fs.async_finish(ctx)
    }
}

[Author] @tabokie How can I determine which fs to use? self.pipes[LogQueue::Append].file_system, self.pipes[LogQueue::Rewrite].file_system, or both?

[Member] Either one is fine. They are always the same.

    fn wait(&mut self) -> IoResult<usize> {
        let mut total = 0;
        for seq in 0..self.aio_vec.len() {
            match self.single_wait(seq) {
                Ok(_) => total += 1,
                Err(e) => return Err(e),
            }
        }
        Ok(total as usize)
    }

    fn data(&self, seq: usize) -> &[u8] {
        &self.buf_vec[seq]
    }

    fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
        let buf_len = self.buf_vec[seq].len();

        unsafe {
            for _ in 0..MAX_ASYNC_READ_TRY_TIME {
                libc::aio_suspend(
                    vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb,
                    1_i32,
                    ptr::null::<libc::timespec>(),
                );
                if buf_len == aio_return(&mut self.aio_vec[seq]) as usize {
                    return Ok(buf_len);
                }
            }
        }
        Err(Error::new(ErrorKind::Other, "Async IO panic."))
    }

    fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> {
        let seq = self.index;
        self.index += 1;
        self.buf_vec.push(buf);

        self.inner.as_ref().unwrap().read_aio(
            &mut self.aio_vec[seq],
            self.buf_vec[seq].len(),
            self.buf_vec[seq].as_mut_ptr(),
            offset,
        );

        Ok(())
    }
}

pub struct DefaultFileSystem;

impl FileSystem for DefaultFileSystem {
    type Handle = LogFd;
    type Reader = LogFile;
    type Writer = LogFile;
    type AsyncIoContext = AioContext;

    fn async_read(
        &self,
        ctx: &mut Self::AsyncIoContext,
        handle: Arc<Self::Handle>,
        buf: Vec<u8>,
        block: &mut FileBlockHandle,
    ) -> IoResult<()> {
        ctx.set_fd(handle);
        ctx.submit_read_req(buf, block.offset)
    }

    fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
        let mut res = vec![];
        for seq in 0..ctx.index {
            match ctx.single_wait(seq) {
                Ok(_) => res.push(ctx.data(seq).to_vec()),
                Err(e) => return Err(e),
            }
        }
        Ok(res)
    }

    fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
        LogFd::create(path.as_ref())
@@ -288,4 +409,8 @@ impl FileSystem for DefaultFileSystem {
    fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
        Ok(LogFile::new(handle))
    }

    fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
        Ok(AioContext::new(block_sum))
    }
}
22 changes: 22 additions & 0 deletions src/env/mod.rs
@@ -7,14 +7,26 @@ use std::sync::Arc;
mod default;
mod obfuscated;

pub use default::AioContext;
[Member @tabokie, Mar 6, 2023] Remove this, as explained in #286 (comment).

pub use default::DefaultFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

use crate::pipe_log::FileBlockHandle;

/// FileSystem
pub trait FileSystem: Send + Sync {
    type Handle: Send + Sync + Handle;
    type Reader: Seek + Read + Send;
    type Writer: Seek + Write + Send + WriteExt;
    type AsyncIoContext: AsyncContext;

    fn async_read(

[Member] Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read.

        &self,
        ctx: &mut Self::AsyncIoContext,
        handle: Arc<Self::Handle>,
        buf: Vec<u8>,
        block: &mut FileBlockHandle,
    ) -> Result<()>;

    fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>;

    fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

@@ -55,6 +67,8 @@ pub trait FileSystem: Send + Sync {
    fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;

    fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

    fn new_async_io_context(&self, block_sum: usize) -> Result<Self::AsyncIoContext>;
}

pub trait Handle {
@@ -71,3 +85,11 @@ pub trait WriteExt {
    fn truncate(&mut self, offset: usize) -> Result<()>;
    fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
}

pub trait AsyncContext {
    fn wait(&mut self) -> Result<usize>;
    fn data(&self, seq: usize) -> &[u8];
    fn single_wait(&mut self, seq: usize) -> Result<usize>;

    fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> Result<()>;
}
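
[Note] To make the intended call sequence concrete, a hedged sketch of a caller driving the API above. The helper name, the single shared handle, and sizing buffers from FileBlockHandle::len are assumptions, not code from this PR.

// Hypothetical driver: submit one read per block, then reap all of them.
fn read_blocks_async<F: FileSystem>(
    fs: &F,
    handle: Arc<F::Handle>,
    blocks: &mut [FileBlockHandle],
) -> Result<Vec<Vec<u8>>> {
    // One context sized for the whole batch.
    let mut ctx = fs.new_async_io_context(blocks.len())?;
    for block in blocks.iter_mut() {
        // The caller allocates each destination buffer; the context owns it
        // until async_finish hands back the filled buffers.
        let buf = vec![0u8; block.len];
        fs.async_read(&mut ctx, handle.clone(), buf, block)?;
    }
    // Waits for every submitted request; one result buffer per block.
    fs.async_finish(&mut ctx)
}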