Skip to content

Commit 928dd70

Browse files
committed
make send to two disks atomic
Signed-off-by: Connor1996 <[email protected]>
1 parent adbd517 commit 928dd70

File tree

1 file changed

+37
-22
lines changed

1 file changed

+37
-22
lines changed

src/env/double_write.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::atomic::AtomicU64;
2222
use std::sync::atomic::Ordering;
2323
use std::sync::Arc;
2424
use std::thread;
25+
use std::sync::Mutex;
2526

2627
use crate::env::default::LogFd;
2728
use crate::env::DefaultFileSystem;
@@ -92,13 +93,35 @@ fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf {
9293
}
9394
}
9495

96+
97+
// Make sure the task is sent to two disks' channel atomically, otherwise the ordering of the tasks in two disks are not same.
98+
#[derive(Clone)]
99+
struct HedgedSender(Arc<Mutex<HedgedSenderInner>>);
100+
101+
struct HedgedSenderInner {
102+
disk1: Sender<(Task, Callback<TaskRes>)>,
103+
disk2: Sender<(Task, Callback<TaskRes>)>,
104+
}
105+
106+
impl HedgedSender {
107+
fn new(disk1: Sender<(Task, Callback<TaskRes>)>, disk2: Sender<(Task, Callback<TaskRes>)>) -> Self {
108+
Self(Arc::new(Mutex::new(HedgedSenderInner { disk1, disk2 })))
109+
}
110+
111+
fn send(&self, task1: Task, task2: Task, cb1: Callback<TaskRes>, cb2: Callback<TaskRes>) {
112+
let mut inner = self.0.lock().unwrap();
113+
inner.disk1.send((task1, cb1)).unwrap();
114+
inner.disk2.send((task2, cb2)).unwrap();
115+
}
116+
}
117+
95118
pub struct HedgedFileSystem {
96119
base: Arc<DefaultFileSystem>,
97120

98121
path1: PathBuf,
99122
path2: PathBuf,
100-
disk1: Sender<(Task, Callback<TaskRes>)>,
101-
disk2: Sender<(Task, Callback<TaskRes>)>,
123+
124+
sender: HedgedSender,
102125

103126
counter1: Arc<AtomicU64>,
104127
counter2: Arc<AtomicU64>,
@@ -141,12 +164,12 @@ impl HedgedFileSystem {
141164
counter2_clone.fetch_add(1, Ordering::Relaxed);
142165
}
143166
});
167+
let sender = HedgedSender::new(tx1, tx2);
144168
Self {
145169
base,
146170
path1,
147171
path2,
148-
disk1: tx1,
149-
disk2: tx2,
172+
sender,
150173
counter1,
151174
counter2,
152175
handle1: Some(handle1),
@@ -337,8 +360,7 @@ impl HedgedFileSystem {
337360
async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult<HedgedHandle> {
338361
let (cb1, mut f1) = paired_future_callback();
339362
let (cb2, mut f2) = paired_future_callback();
340-
self.disk1.send((task1, cb1)).unwrap();
341-
self.disk2.send((task2, cb2)).unwrap();
363+
self.sender.send(task1, task2, cb1, cb2);
342364

343365
let resolve = |res: TaskRes| -> LogFd {
344366
match res {
@@ -348,18 +370,17 @@ impl HedgedFileSystem {
348370
}
349371
};
350372
select! {
351-
res1 = f1 => res1.unwrap().map(|res| HedgedHandle::new(
373+
res1 = f1 => res1.unwrap().map(|res| HedgedHandle::new(self.sender.clone(),
352374
FutureHandle::new_owned(resolve(res)), FutureHandle::new(f2) , self.counter1.clone(), self.counter2.clone())),
353-
res2 = f2 => res2.unwrap().map(|res| HedgedHandle::new(
375+
res2 = f2 => res2.unwrap().map(|res| HedgedHandle::new(self.sender.clone(),
354376
FutureHandle::new(f1), FutureHandle::new_owned(resolve(res)) , self.counter1.clone(), self.counter2.clone())),
355377
}
356378
}
357379

358380
async fn wait_one(&self, task1: Task, task2: Task) -> IoResult<()> {
359381
let (cb1, mut f1) = paired_future_callback();
360382
let (cb2, mut f2) = paired_future_callback();
361-
self.disk1.send((task1, cb1)).unwrap();
362-
self.disk2.send((task2, cb2)).unwrap();
383+
self.sender.send(task1, task2, cb1, cb2) ;
363384

364385
select! {
365386
res1 = f1 => res1.unwrap().map(|_| ()),
@@ -404,8 +425,7 @@ impl HedgedFileSystem {
404425

405426
impl Drop for HedgedFileSystem {
406427
fn drop(&mut self) {
407-
self.disk1.send((Task::Stop, Box::new(|_| {}))).unwrap();
408-
self.disk2.send((Task::Stop, Box::new(|_| {}))).unwrap();
428+
self.sender.send(Task::Stop, Task::Stop, Box::new(|_| {}), Box::new(|_| {}));
409429
self.handle1.take().unwrap().join().unwrap();
410430
self.handle2.take().unwrap().join().unwrap();
411431
}
@@ -585,8 +605,7 @@ impl FutureHandle {
585605
}
586606

587607
pub struct HedgedHandle {
588-
disk1: Sender<(Task, Callback<TaskRes>)>,
589-
disk2: Sender<(Task, Callback<TaskRes>)>,
608+
sender: HedgedSender,
590609

591610
handle1: Arc<FutureHandle>,
592611
handle2: Arc<FutureHandle>,
@@ -596,18 +615,15 @@ pub struct HedgedHandle {
596615
}
597616

598617
impl HedgedHandle {
599-
pub fn new(
618+
fn new(
619+
sender: HedgedSender,
600620
handle1: FutureHandle,
601621
handle2: FutureHandle,
602622
counter1: Arc<AtomicU64>,
603623
counter2: Arc<AtomicU64>,
604624
) -> Self {
605-
let (tx1, rx1) = unbounded::<(Task, Callback<TaskRes>)>();
606-
let (tx2, rx2) = unbounded::<(Task, Callback<TaskRes>)>();
607-
608625
Self {
609-
disk1: tx1,
610-
disk2: tx2,
626+
sender,
611627
handle1: Arc::new(handle1),
612628
handle2: Arc::new(handle2),
613629
counter1,
@@ -676,8 +692,7 @@ impl HedgedHandle {
676692
async fn wait_one(&self, task1: Task, task2: Task) -> IoResult<TaskRes> {
677693
let (cb1, mut f1) = paired_future_callback();
678694
let (cb2, mut f2) = paired_future_callback();
679-
self.disk1.send((task1, cb1)).unwrap();
680-
self.disk2.send((task2, cb2)).unwrap();
695+
self.sender.send(task1, task2, cb1, cb2) ;
681696

682697
select! {
683698
res1 = f1 => res1.unwrap(),

0 commit comments

Comments
 (0)