Skip to content

Commit a123996

Browse files
committed
feat(demo-thread): add time wheel implementation
1 parent d99dcfc commit a123996

File tree

4 files changed

+198
-0
lines changed

4 files changed

+198
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.helltractor.demo.timewheel;
2+
3+
public class Demo {
4+
5+
public static void main(String[] args) {
6+
TimeWheel timeWheel = new TimeWheel();
7+
for (int i = 0; i < 123; i++) {
8+
final int index = i;
9+
timeWheel.addDelayTask(() -> {
10+
System.out.println(index);
11+
}, 1000L * i);
12+
}
13+
}
14+
15+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.helltractor.demo.timewheel;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
/**
8+
* A multi-producer single-consumer task queue for scheduling tasks in a time wheel.
9+
*/
10+
public class MpscTaskQueue {
11+
12+
private final AtomicReference<TimeWheel.DelayTask> head = new AtomicReference<>(null);
13+
14+
public void pushTask(TimeWheel.DelayTask delayTask) {
15+
while (true) {
16+
TimeWheel.DelayTask currentHead = head.get();
17+
delayTask.next = currentHead;
18+
if (head.compareAndSet(currentHead, delayTask)) {
19+
return;
20+
}
21+
}
22+
}
23+
24+
public List<Runnable> removeAndReturnTasks(long tickTime) {
25+
List<Runnable> result = new ArrayList<>();
26+
TimeWheel.DelayTask prev = null;
27+
TimeWheel.DelayTask current = head.get();
28+
29+
while (current != null) {
30+
if (current.deadline > tickTime) {
31+
prev = current;
32+
current = current.next;
33+
continue;
34+
}
35+
TimeWheel.DelayTask next = current.next;
36+
if (prev != null) {
37+
prev.next = next;
38+
current.next = null;
39+
current = next;
40+
continue;
41+
}
42+
if (head.compareAndSet(current, next)) {
43+
result.add(current.task);
44+
current.next = null;
45+
current = next;
46+
continue;
47+
}
48+
current = head.get();
49+
}
50+
51+
return result;
52+
}
53+
54+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.helltractor.demo.timewheel;
2+
3+
import java.util.List;
4+
import java.util.concurrent.CountDownLatch;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.concurrent.locks.LockSupport;
9+
10+
/**
11+
* A simple time wheel implementation for scheduling delayed tasks.
12+
* Use MPSC task queues for each slot to allow concurrent task additions.
13+
*/
14+
public class TimeWheel {
15+
16+
private final AtomicBoolean started;
17+
private final CountDownLatch startTimeLatch;
18+
private final Ticker ticker;
19+
private final ExecutorService executor;
20+
private final MpscTaskQueue[] wheel;
21+
22+
// avoid multi-thread read/write issues
23+
private volatile long startTime;
24+
25+
public TimeWheel() {
26+
this.started = new AtomicBoolean(false);
27+
this.startTimeLatch = new CountDownLatch(1);
28+
this.ticker = new Ticker();
29+
this.executor = Executors.newFixedThreadPool(8);
30+
this.wheel = new MpscTaskQueue[10];
31+
for (int i = 0; i < wheel.length; i++) {
32+
wheel[i] = new MpscTaskQueue();
33+
}
34+
}
35+
36+
public void start() {
37+
if (started.compareAndSet(false, true)) {
38+
ticker.start();
39+
}
40+
try {
41+
startTimeLatch.await();
42+
} catch (InterruptedException e) {
43+
Thread.currentThread().interrupt();
44+
}
45+
}
46+
47+
public void stop() {
48+
if (started.compareAndSet(true, false)) {
49+
LockSupport.unpark(ticker);
50+
}
51+
}
52+
53+
public void addDelayTask(Runnable task, long delayMillis) {
54+
start();
55+
DelayTask delayTask = new DelayTask(task, delayMillis);
56+
int index = Math.toIntExact((delayTask.deadline - startTime) / 100 % wheel.length);
57+
MpscTaskQueue slot = wheel[index];
58+
slot.pushTask(delayTask);
59+
}
60+
61+
static class DelayTask {
62+
final Runnable task;
63+
final long deadline;
64+
DelayTask next;
65+
66+
public DelayTask(Runnable task, long delayTime) {
67+
this.task = task;
68+
this.deadline = System.currentTimeMillis() + delayTime;
69+
}
70+
}
71+
72+
class Ticker extends Thread {
73+
74+
int tickCount = 0;
75+
76+
@Override
77+
public void run() {
78+
startTime = System.currentTimeMillis();
79+
startTimeLatch.countDown();
80+
while (started.get()) {
81+
long tickTime = startTime + (tickCount + 1) * 100L;
82+
while (System.currentTimeMillis() <= tickTime) {
83+
LockSupport.parkUntil(tickTime);
84+
// check if stopped
85+
if (!started.get()) {
86+
return;
87+
}
88+
}
89+
int index = tickCount % wheel.length;
90+
MpscTaskQueue queue = wheel[index];
91+
List<Runnable> tasks = queue.removeAndReturnTasks(tickTime);
92+
tasks.forEach(executor::execute);
93+
tickCount++;
94+
}
95+
96+
}
97+
}
98+
99+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.helltractor.demo.timewheel;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
9+
class TimerWheelTest {
10+
11+
@Test
12+
void testThreadSafe() throws InterruptedException {
13+
TimeWheel timerWheel = new TimeWheel();
14+
CountDownLatch countDownLatch = new CountDownLatch(10000);
15+
AtomicInteger count = new AtomicInteger();
16+
for (int i = 0; i < 10; i++) {
17+
new Thread(() -> {
18+
for (int i1 = 0; i1 < 1000; i1++) {
19+
timerWheel.addDelayTask(() -> {
20+
System.out.println(count.incrementAndGet());
21+
countDownLatch.countDown();
22+
}, 100);
23+
}
24+
System.out.println("添加结束");
25+
}).start();
26+
}
27+
countDownLatch.await();
28+
}
29+
30+
}

0 commit comments

Comments
 (0)