Skip to content

Commit a537431

Browse files
authored
Merge pull request #34 from Clonkk/fix_async_bug
Fix async (see #33)
2 parents 4a19a4e + ed7cbb9 commit a537431

File tree

5 files changed

+53
-15
lines changed

5 files changed

+53
-15
lines changed

examples/ex10_async_pushpull.nim

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import ../zmq
66
const N_TASK = 5
77

88
proc pusher(nTask: int): Future[void] {.async.} =
9-
var pusher = listen("tcp://*:5571", PUSH)
9+
var pusher = listen("tcp://127.0.0.1:6309", PUSH)
1010
defer: pusher.close()
1111

1212
for i in 1..nTask:
@@ -17,7 +17,7 @@ proc pusher(nTask: int): Future[void] {.async.} =
1717
await pusher.sendAsync(task)
1818

1919
proc puller(id: int): Future[void] {.async.} =
20-
const connStr = "tcp://localhost:5571"
20+
const connStr = "tcp://localhost:6309"
2121

2222
echo fmt"puller {id}: connecting to {connStr}"
2323
var puller = connect(connStr, PULL)
@@ -31,11 +31,10 @@ proc puller(id: int): Future[void] {.async.} =
3131

3232
when isMainModule:
3333
echo "ex10_async_pushpull.nim"
34-
asyncCheck pusher(N_TASK)
35-
36-
for i in 1..1:
37-
asyncCheck puller(i)
38-
34+
var p = pusher(N_TASK)
35+
asyncCheck puller(1)
36+
waitFor p
37+
# Gives time to the asyncdispatch loop to execute
3938
while hasPendingOperations():
40-
poll()
39+
drain()
4140

tests/config.nims

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
switch("path", "$projectDir/..")
2+
switch("threads", "on")

tests/tzmq.nim

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,45 @@ proc asyncpoll() =
234234
check(rec_count == (i div 2))
235235
check(rec_count == snd_count)
236236

237+
proc async_pub_sub() =
238+
const N_MSGS = 10
239+
240+
proc publisher {.async.} =
241+
var publisher = zmq.listen("tcp://127.0.0.1:5571", PUB)
242+
defer: publisher.close()
243+
sleep(150) # Account for slow joiner pattern
244+
245+
var n = 0
246+
while n < N_MSGS:
247+
publisher.send("topic", SNDMORE)
248+
publisher.send("test " & $n)
249+
await sleepAsync(100)
250+
inc n
251+
252+
proc subscriber : Future[int] {.async.} =
253+
var subscriber = zmq.connect("tcp://127.0.0.1:5571", SUB)
254+
defer: subscriber.close()
255+
sleep(150) # Account for slow joiner pattern
256+
subscriber.setsockopt(SUBSCRIBE, "")
257+
var count = 0
258+
while count < N_MSGS:
259+
var msg = await subscriber.receiveAsync()
260+
# echo msg
261+
inc(count)
262+
result = count
263+
264+
let p = publisher()
265+
let s = subscriber()
266+
waitFor p
267+
let count = waitFor s
268+
test "async pub_sub":
269+
check count == N_mSGS
270+
237271
when isMainModule:
238272
reqrep()
239273
pubsub()
240274
inproc_sharectx()
241275
routerdealer()
242276
pairpair()
277+
async_pub_sub()
243278
asyncpoll()

zmq/asynczmq.nim

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,22 +70,24 @@ proc receiveAsync*(conn: ZConnection): Future[string] =
7070
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
7171
let fut = newFuture[string]("receiveAsync")
7272
result = fut
73+
let sock = conn.socket
7374

7475
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
76+
# the cb should work on the low level socket and not the ZConnection object
7577
result = true
7678

7779
# ignore if already finished
7880
if fut.finished: return
7981

8082
try:
81-
let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
83+
let status = getsockopt[cint](sock, ZSockOptions.EVENTS)
8284
if (status and ZMQ_POLLIN) == 0:
8385
# waiting for messages
8486
addRead(fd, cb)
8587
else:
8688
# ready to read
8789
unregister(fd)
88-
fut.complete conn.receive(DONTWAIT)
90+
fut.complete sock.receive(DONTWAIT)
8991
except:
9092
unregister(fd)
9193
fut.fail getCurrentException()
@@ -103,6 +105,7 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
103105
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
104106
let fut = newFuture[void]("sendAsync")
105107
result = fut
108+
let sock = conn.socket
106109

107110
let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
108111
if (status and ZMQ_POLLOUT) == 0:
@@ -114,12 +117,12 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
114117
if fut.finished: return
115118

116119
try:
117-
let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
120+
let status = getsockopt[cint](sock, ZSockOptions.EVENTS)
118121
if (status and ZMQ_POLLOUT) == 0:
119122
# waiting for messages
120123
addWrite(fd, cb)
121124
else:
122-
conn.send(msg, flags)
125+
sock.send(msg, flags)
123126
unregister(fd)
124127
fut.complete()
125128
except:

zmq/connections.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import ./bindings
2-
import std/strformat
2+
import std/[strformat]
33

44
# Unofficial easier-for-Nim API
55

@@ -129,7 +129,7 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T
129129
Destructor
130130
]#
131131
when defined(gcDestructors):
132-
proc close*(c: var ZConnection, linger: int = 0)
132+
proc close*(c: var ZConnection, linger: int = 500)
133133
proc `=destroy`(x: var ZConnection) =
134134
if x.alive:
135135
raise newException(ZmqError, &"Connection from/to {x.sockaddr} was destroyed but not closed.")
@@ -235,7 +235,7 @@ proc listen*(address: string, mode: ZSocketType): ZConnection =
235235
result = listen(address, mode, ctx)
236236
result.ownctx = true
237237

238-
proc close*(c: var ZConnection, linger: int = 0) =
238+
proc close*(c: var ZConnection, linger: int = 500) =
239239
## Closes the ``ZConnection``.
240240
## Set socket linger to ``linger`` to drop buffered message and avoid blocking, then close the socket.
241241
##

0 commit comments

Comments
 (0)