Skip to content

Commit 595865d

Browse files
committed
add more tests - change overwrite to use send
1 parent 4b33065 commit 595865d

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

tests/tchannels_ring.nim

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ suite "Ring Buffer Channel Tests":
88
test "Non-blocking ring buffer behavior":
99

1010
proc fillBuffer(n: int): seq[int] =
11-
var chan = newChan[int](BufferSize, overwrite = true)
11+
var chan = newChan[int](BufferSize)
1212
# Fill the buffer
1313
for i in 0..<BufferSize+n:
14-
chan.send(i)
14+
chan.send(i, overwrite = true)
1515

1616
# Receive values - should get BufferSize as first value
1717
var values: seq[int]
@@ -35,9 +35,9 @@ suite "Ring Buffer Channel Tests":
3535
check fillBuffer(8) == @[8, 9, 10]
3636

3737
test "Non-blocking ring buffer behavior with size 1":
38-
var chan = newChan[int](1, overwrite = true)
39-
chan.send(1)
40-
chan.send(2)
38+
var chan = newChan[int](1)
39+
chan.send(1, overwrite = true)
40+
chan.send(2, overwrite = true)
4141
var x: int
4242
check chan.tryRecv(x)
4343
check x == 2

threading/channels.nim

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ runnableExamples("--threads:on --gc:orc"):
101101
assert messages.len >= 2
102102

103103
block example_non_blocking_overwrite:
104-
var chanRingBuffer = newChan[string](elements = 1, overwrite = true)
104+
var chanRingBuffer = newChan[string](elements = 1)
105105
chanRingBuffer.send("Hello")
106-
chanRingBuffer.send("World")
106+
chanRingBuffer.send("World", overwrite = true)
107107
var msg = ""
108108
assert chanRingBuffer.tryRecv(msg)
109109
assert msg == "World"
@@ -121,7 +121,6 @@ type
121121
ChannelObj = object
122122
lock: Lock
123123
spaceAvailableCV, dataAvailableCV: Cond
124-
overwrite: bool
125124
slots: int ## Number of item slots in the buffer
126125
head: Atomic[int] ## Write/enqueue/send index
127126
tail: Atomic[int] ## Read/dequeue/receive index
@@ -192,25 +191,30 @@ proc freeChannel(chan: ChannelRaw) =
192191
# MPMC Channels (Multi-Producer Multi-Consumer)
193192
# ------------------------------------------------------------------------------
194193

195-
template incrementReadIndex(chan: ChannelRaw) =
194+
template incrWriteIndex(chan: ChannelRaw) =
195+
atomicInc(chan.head)
196+
if chan.getHead() == 2 * chan.slots:
197+
chan.setHead(0)
198+
199+
template incrReadIndex(chan: ChannelRaw) =
196200
atomicInc(chan.tail)
197201
if chan.getTail() == 2 * chan.slots:
198202
chan.setTail(0)
199203

200-
proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool): bool =
204+
proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, overwrite: bool): bool =
201205
assert not chan.isNil
202206
assert not data.isNil
203207

204208
when not blocking:
205-
if chan.isFull() and not chan.overwrite: return false
209+
if chan.isFull() and not overwrite: return false
206210

207211
acquire(chan.lock)
208212

209213
# check for when another thread was faster to fill
210214
when blocking:
211215
if chan.isFull():
212-
if chan.overwrite:
213-
incrementReadIndex(chan)
216+
if overwrite:
217+
incrReadIndex(chan)
214218
else:
215219
while chan.isFull():
216220
wait(chan.spaceAvailableCV, chan.lock)
@@ -228,9 +232,8 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bo
228232
chan.getHead() - chan.slots
229233

230234
copyMem(chan.buffer[writeIdx * size].addr, data, size)
231-
atomicInc(chan.head)
232-
if chan.getHead() == 2 * chan.slots:
233-
chan.setHead(0)
235+
236+
incrWriteIndex(chan)
234237

235238
signal(chan.dataAvailableCV)
236239
release(chan.lock)
@@ -264,7 +267,7 @@ proc channelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static
264267

265268
copyMem(data, chan.buffer[readIdx * size].addr, size)
266269

267-
incrementReadIndex(chan)
270+
incrReadIndex(chan)
268271

269272
signal(chan.spaceAvailableCV)
270273
release(chan.lock)
@@ -364,7 +367,7 @@ proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} =
364367
## Returns `false` and does not change `dist` if no message was received.
365368
channelReceive(c.d, dst.addr, sizeof(T), false)
366369

367-
proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
370+
proc send*[T](c: Chan[T], src: sink Isolated[T], overwrite = false) {.inline.} =
368371
## Sends the message `src` to the channel `c`.
369372
## This blocks the sending thread until `src` was successfully sent.
370373
##
@@ -374,13 +377,13 @@ proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
374377
## messages from the channel are removed.
375378
when defined(gcOrc) and defined(nimSafeOrcSend):
376379
GC_runOrc()
377-
discard channelSend(c.d, src.addr, sizeof(T), true)
380+
discard channelSend(c.d, src.addr, sizeof(T), true, overwrite)
378381
wasMoved(src)
379382

380-
template send*[T](c: Chan[T]; src: T) =
383+
template send*[T](c: Chan[T]; src: T, overwrite = false) =
381384
## Helper template for `send`.
382385
mixin isolate
383-
send(c, isolate(src))
386+
send(c, isolate(src), overwrite)
384387

385388
proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
386389
## Receives a message from the channel `c` and fill `dst` with its value.
@@ -405,11 +408,10 @@ proc peek*[T](c: Chan[T]): int {.inline.} =
405408
## Returns an estimation of the current number of messages held by the channel.
406409
numItems(c.d)
407410

408-
proc newChan*[T](elements: Positive = 30, overwrite = false): Chan[T] =
411+
proc newChan*[T](elements: Positive = 30): Chan[T] =
409412
## An initialization procedure, necessary for acquiring resources and
410413
## initializing internal state of the channel.
411414
##
412415
## `elements` is the capacity of the channel and thus how many messages it can hold
413416
## before it refuses to accept any further messages.
414417
result = Chan[T](d: allocChannel(sizeof(T), elements))
415-
result.d.overwrite = overwrite

0 commit comments

Comments
 (0)