-
Notifications
You must be signed in to change notification settings - Fork 94
read entries using AIO #286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 10 commits
b0fa034
147130d
93282f9
169b6fe
ad3cbcf
dfe2340
aadefae
7a6745a
1a41d8c
f3a89c0
1010535
1bc8888
5337a74
0b9988d
198ea08
a732d52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -308,8 +308,36 @@ where | |
| vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?); | ||
| } | ||
| ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); | ||
|
|
||
| return Ok(ents_idx.len()); | ||
| } | ||
| Ok(0) | ||
| } | ||
|
|
||
| pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>( | ||
| &self, | ||
| region_id: u64, | ||
| begin: u64, | ||
| end: u64, | ||
| max_size: Option<usize>, | ||
| vec: &mut Vec<M::Entry>, | ||
| ) -> Result<usize> { | ||
|
||
| let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); | ||
| if let Some(memtable) = self.memtables.get(region_id) { | ||
| let length = (end - begin) as usize; | ||
| let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length); | ||
| memtable | ||
| .read() | ||
| .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; | ||
|
|
||
| let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); | ||
| parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec); | ||
|
|
||
| ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); | ||
|
|
||
| return Ok(ents_idx.len()); | ||
| } | ||
|
|
||
| Ok(0) | ||
| } | ||
|
|
||
|
|
@@ -544,7 +572,36 @@ impl BlockCache { | |
| thread_local! { | ||
| static BLOCK_CACHE: BlockCache = BlockCache::new(); | ||
| } | ||
|
|
||
| pub(crate) fn parse_entries_from_bytes<M: MessageExt>( | ||
| bytes: Vec<Vec<u8>>, | ||
| ents_idx: &mut [EntryIndex], | ||
| vec: &mut Vec<M::Entry>, | ||
| ) { | ||
| let mut decode_buf = vec![]; | ||
| let mut seq: i32 = -1; | ||
| for (t, idx) in ents_idx.iter().enumerate() { | ||
| decode_buf = | ||
| match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { | ||
| true => { | ||
| seq += 1; | ||
| bytes[seq as usize].to_vec() | ||
| } | ||
| false => decode_buf, | ||
| }; | ||
| vec.push( | ||
| parse_from_bytes( | ||
| &LogBatch::decode_entries_block( | ||
| &decode_buf, | ||
| idx.entries.unwrap(), | ||
| idx.compression_type, | ||
| ) | ||
| .unwrap() | ||
| [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], | ||
| ) | ||
| .unwrap(), | ||
| ); | ||
| } | ||
| } | ||
| pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry> | ||
| where | ||
| M: MessageExt, | ||
|
|
@@ -683,6 +740,41 @@ mod tests { | |
| reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); | ||
| } | ||
| } | ||
| fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>( | ||
|
||
| &self, | ||
| rid: u64, | ||
| start: u64, | ||
| end: u64, | ||
| reader: FR, | ||
| ) { | ||
| let mut entries = Vec::new(); | ||
| self.fetch_entries_to_aio::<Entry>( | ||
| rid, | ||
| self.first_index(rid).unwrap(), | ||
| self.last_index(rid).unwrap() + 1, | ||
| None, | ||
| &mut entries, | ||
| ) | ||
| .unwrap(); | ||
| assert_eq!(entries.len(), (end - start) as usize); | ||
| assert_eq!(entries.first().unwrap().index, start); | ||
| assert_eq!( | ||
| entries.last().unwrap().index, | ||
| self.decode_last_index(rid).unwrap() | ||
| ); | ||
| assert_eq!(entries.last().unwrap().index + 1, end); | ||
| for e in entries.iter() { | ||
| let entry_index = self | ||
| .memtables | ||
| .get(rid) | ||
| .unwrap() | ||
| .read() | ||
| .get_entry(e.index) | ||
| .unwrap(); | ||
| assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e); | ||
| reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); | ||
| } | ||
| } | ||
|
|
||
| fn file_count(&self, queue: Option<LogQueue>) -> usize { | ||
| if let Some(queue) = queue { | ||
|
|
@@ -745,6 +837,14 @@ mod tests { | |
| assert_eq!(d, &data); | ||
| }); | ||
| } | ||
| for i in 10..20 { | ||
| let rid = i; | ||
| let index = i; | ||
| engine.scan_entries_aio(rid, index, index + 2, |_, q, d| { | ||
| assert_eq!(q, LogQueue::Append); | ||
| assert_eq!(d, &data); | ||
| }); | ||
| } | ||
|
|
||
| // Recover the engine. | ||
| let engine = engine.reopen(); | ||
|
|
@@ -1994,6 +2094,21 @@ mod tests { | |
| type Handle = <ObfuscatedFileSystem as FileSystem>::Handle; | ||
| type Reader = <ObfuscatedFileSystem as FileSystem>::Reader; | ||
| type Writer = <ObfuscatedFileSystem as FileSystem>::Writer; | ||
| type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext; | ||
|
|
||
| fn async_read( | ||
| &self, | ||
| ctx: &mut Self::AsyncIoContext, | ||
| handle: Arc<Self::Handle>, | ||
| buf: Vec<u8>, | ||
| block: &mut FileBlockHandle, | ||
| ) -> std::io::Result<()> { | ||
| self.inner.async_read(ctx, handle, buf, block) | ||
| } | ||
|
|
||
| fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> { | ||
| self.inner.async_finish(ctx) | ||
| } | ||
|
|
||
| fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> { | ||
| let handle = self.inner.create(&path)?; | ||
|
|
@@ -2056,6 +2171,10 @@ mod tests { | |
| fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> { | ||
| self.inner.new_writer(h) | ||
| } | ||
|
|
||
| fn new_async_io_context(&self, block_sum: usize) -> std::io::Result<Self::AsyncIoContext> { | ||
| self.inner.new_async_io_context(block_sum) | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,27 @@ | ||
| // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. | ||
|
|
||
| use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; | ||
| use std::ffi::c_void; | ||
| use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; | ||
| use std::os::unix::io::RawFd; | ||
| use std::path::Path; | ||
| use std::sync::Arc; | ||
| use std::{mem, ptr}; | ||
|
|
||
| use fail::fail_point; | ||
| use libc::{aio_return, aiocb, off_t}; | ||
| use log::error; | ||
| use nix::errno::Errno; | ||
| use nix::fcntl::{self, OFlag}; | ||
| use nix::sys::signal::{SigEvent, SigevNotify}; | ||
| use nix::sys::stat::Mode; | ||
| use nix::sys::uio::{pread, pwrite}; | ||
| use nix::unistd::{close, ftruncate, lseek, Whence}; | ||
| use nix::NixPath; | ||
|
|
||
| use crate::env::{FileSystem, Handle, WriteExt}; | ||
| use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; | ||
| use crate::pipe_log::FileBlockHandle; | ||
|
|
||
| const MAX_ASYNC_READ_TRY_TIME: usize = 10; | ||
|
|
||
| fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { | ||
| let kind = std::io::Error::from(e).kind(); | ||
|
|
@@ -97,6 +104,19 @@ impl LogFd { | |
| Ok(readed) | ||
| } | ||
|
|
||
| pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) { | ||
| unsafe { | ||
| aior.aio_fildes = self.0; | ||
| aior.aio_reqprio = 0; | ||
| aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); | ||
| aior.aio_nbytes = len; | ||
| aior.aio_buf = pbuf as *mut c_void; | ||
| aior.aio_lio_opcode = libc::LIO_READ; | ||
| aior.aio_offset = offset as off_t; | ||
| libc::aio_read(aior); | ||
| } | ||
| } | ||
|
|
||
| /// Writes some bytes to this file starting at `offset`. Returns how many | ||
| /// bytes were written. | ||
| pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult<usize> { | ||
|
|
@@ -257,12 +277,113 @@ impl WriteExt for LogFile { | |
| } | ||
| } | ||
|
|
||
| pub struct AioContext { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use this name outside this file. Just like |
||
| inner: Option<Arc<LogFd>>, | ||
| index: usize, | ||
| aio_vec: Vec<aiocb>, | ||
| pub(crate) buf_vec: Vec<Vec<u8>>, | ||
| } | ||
| impl AioContext { | ||
| pub fn new(block_sum: usize) -> Self { | ||
| let mut aio_vec = vec![]; | ||
| let buf_vec = vec![]; | ||
| unsafe { | ||
| for _ in 0..block_sum { | ||
| aio_vec.push(mem::zeroed::<libc::aiocb>()); | ||
| } | ||
|
||
| } | ||
| Self { | ||
| inner: None, | ||
| index: 0, | ||
| aio_vec, | ||
| buf_vec, | ||
| } | ||
| } | ||
|
|
||
| pub fn set_fd(&mut self, fd: Arc<LogFd>) { | ||
|
||
| self.inner = Some(fd); | ||
| } | ||
| } | ||
|
|
||
| impl AsyncContext for AioContext { | ||
|
||
| fn wait(&mut self) -> IoResult<usize> { | ||
| let mut total = 0; | ||
| for seq in 0..self.aio_vec.len() { | ||
| match self.single_wait(seq) { | ||
| Ok(_) => total += 1, | ||
| Err(e) => return Err(e), | ||
| } | ||
| } | ||
| Ok(total as usize) | ||
| } | ||
|
|
||
| fn data(&self, seq: usize) -> &[u8] { | ||
| &self.buf_vec[seq] | ||
| } | ||
|
|
||
| fn single_wait(&mut self, seq: usize) -> IoResult<usize> { | ||
| let buf_len = self.buf_vec[seq].len(); | ||
|
|
||
| unsafe { | ||
| for _ in 0..MAX_ASYNC_READ_TRY_TIME { | ||
| libc::aio_suspend( | ||
| vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, | ||
| 1_i32, | ||
| ptr::null::<libc::timespec>(), | ||
| ); | ||
| if buf_len == aio_return(&mut self.aio_vec[seq]) as usize { | ||
| return Ok(buf_len); | ||
| } | ||
| } | ||
| } | ||
| Err(Error::new(ErrorKind::Other, "Async IO panic.")) | ||
| } | ||
|
|
||
| fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> { | ||
| let seq = self.index; | ||
| self.index += 1; | ||
| self.buf_vec.push(buf); | ||
|
|
||
| self.inner.as_ref().unwrap().read_aio( | ||
| &mut self.aio_vec[seq], | ||
| self.buf_vec[seq].len(), | ||
| self.buf_vec[seq].as_mut_ptr(), | ||
| offset, | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| pub struct DefaultFileSystem; | ||
|
|
||
| impl FileSystem for DefaultFileSystem { | ||
| type Handle = LogFd; | ||
| type Reader = LogFile; | ||
| type Writer = LogFile; | ||
| type AsyncIoContext = AioContext; | ||
|
|
||
| fn async_read( | ||
| &self, | ||
| ctx: &mut Self::AsyncIoContext, | ||
| handle: Arc<Self::Handle>, | ||
| buf: Vec<u8>, | ||
| block: &mut FileBlockHandle, | ||
| ) -> IoResult<()> { | ||
| ctx.set_fd(handle); | ||
| ctx.submit_read_req(buf, block.offset) | ||
| } | ||
|
|
||
| fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> { | ||
| let mut res = vec![]; | ||
| for seq in 0..ctx.index { | ||
| match ctx.single_wait(seq) { | ||
| Ok(_) => res.push(ctx.data(seq).to_vec()), | ||
| Err(e) => return Err(e), | ||
| } | ||
| } | ||
| Ok(res) | ||
| } | ||
|
|
||
| fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> { | ||
| LogFd::create(path.as_ref()) | ||
|
|
@@ -288,4 +409,8 @@ impl FileSystem for DefaultFileSystem { | |
| fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> { | ||
| Ok(LogFile::new(handle)) | ||
| } | ||
|
|
||
| fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> { | ||
| Ok(AioContext::new(block_sum)) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,14 +7,26 @@ use std::sync::Arc; | |
| mod default; | ||
| mod obfuscated; | ||
|
|
||
| pub use default::AioContext; | ||
|
||
| pub use default::DefaultFileSystem; | ||
| pub use obfuscated::ObfuscatedFileSystem; | ||
|
|
||
| use crate::pipe_log::FileBlockHandle; | ||
| /// FileSystem | ||
| pub trait FileSystem: Send + Sync { | ||
| type Handle: Send + Sync + Handle; | ||
| type Reader: Seek + Read + Send; | ||
| type Writer: Seek + Write + Send + WriteExt; | ||
| type AsyncIoContext: AsyncContext; | ||
|
|
||
| fn async_read( | ||
|
||
| &self, | ||
| ctx: &mut Self::AsyncIoContext, | ||
| handle: Arc<Self::Handle>, | ||
| buf: Vec<u8>, | ||
| block: &mut FileBlockHandle, | ||
| ) -> Result<()>; | ||
| fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>; | ||
|
|
||
| fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>; | ||
|
|
||
|
|
@@ -55,6 +67,8 @@ pub trait FileSystem: Send + Sync { | |
| fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>; | ||
|
|
||
| fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>; | ||
|
|
||
| fn new_async_io_context(&self, block_sum: usize) -> Result<Self::AsyncIoContext>; | ||
| } | ||
|
|
||
| pub trait Handle { | ||
|
|
@@ -71,3 +85,11 @@ pub trait WriteExt { | |
| fn truncate(&mut self, offset: usize) -> Result<()>; | ||
| fn allocate(&mut self, offset: usize, size: usize) -> Result<()>; | ||
| } | ||
|
|
||
| pub trait AsyncContext { | ||
| fn wait(&mut self) -> Result<usize>; | ||
| fn data(&self, seq: usize) -> &[u8]; | ||
| fn single_wait(&mut self, seq: usize) -> Result<usize>; | ||
|
|
||
| fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> Result<()>; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.