Skip to content

Commit 4a19a4e

Browse files
authored
Merge pull request #32 from Clonkk/feat_async_poller
Feat async poller
2 parents d9c13f7 + 0fb8a19 commit 4a19a4e

File tree

5 files changed

+126
-14
lines changed

5 files changed

+126
-14
lines changed

tests/tzmq.nim

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import ../zmq
22
import std/[unittest, os]
3+
import std/[asyncdispatch, asyncfutures]
34

45
proc reqrep() =
56
test "reqrep":
@@ -52,7 +53,7 @@ proc pubsub() =
5253
block alltopic:
5354
let topic = broadcast.receive()
5455
let msg = broadcast.receive()
55-
check topic == topic1
56+
check topic == topic1
5657
check msg == "content1"
5758
block s1:
5859
let topic = sub1.receive()
@@ -66,7 +67,7 @@ proc pubsub() =
6667
block alltopic:
6768
let topic = broadcast.receive()
6869
let msg = broadcast.receive()
69-
check topic == topic2
70+
check topic == topic2
7071
check msg == "content2"
7172
block s2:
7273
let topic = sub2.receive()
@@ -80,7 +81,7 @@ proc pubsub() =
8081
block alltopic:
8182
let topic = broadcast.receive()
8283
let msg = broadcast.receive()
83-
check topic == ""
84+
check topic == ""
8485
check msg == "content3"
8586

8687
proc routerdealer() =
@@ -181,10 +182,62 @@ proc pairpair() =
181182
for p in pairs.mitems:
182183
p.close()
183184

185+
proc asyncDummy(i: int) {.async.} =
186+
# echo "asyncDummy=", i
187+
asyncCheck sleepAsync(2500)
188+
189+
proc asyncpoll() =
190+
test "asyncZPoller":
191+
const zaddr = "tcp://127.0.0.1:15571"
192+
var pusher = listen(zaddr, PUSH)
193+
var puller = connect(zaddr, PULL)
194+
var poller: AsyncZPoller
195+
# Register the callback
196+
poller.register(
197+
puller,
198+
ZMQ_POLLIN,
199+
proc(x: ZSocket) =
200+
let msg = x.receive()
201+
# debugecho "==> Received msg=", msg
202+
sleep(300) # Do Stuff
203+
)
204+
205+
let N = 10
206+
var snd_count = 0
207+
# A client send some message
208+
for i in 0..<N:
209+
if (i mod 2) == 0:
210+
# Can periodically send stuff
211+
pusher.send($i)
212+
inc(snd_count)
213+
var
214+
i = 0
215+
rec_count = 0
216+
217+
while i < N:
218+
# I don't recommend a high timeout because it's going to poll for the duration if there is no message in queue
219+
var fut = poller.pollAsync(1)
220+
# Can do Asyncstuff here
221+
asyncCheck asyncDummy(i)
222+
fut.addCallback proc(x: Future[int]) =
223+
if x.read() > 0:
224+
inc(rec_count)
225+
inc(i)
226+
227+
# No longer polling but some callback may not have finished
228+
while hasPendingOperations():
229+
drain()
230+
231+
pusher.close()
232+
puller.close()
233+
check(i == N)
234+
check(rec_count == (i div 2))
235+
check(rec_count == snd_count)
236+
184237
when isMainModule:
185238
reqrep()
186239
pubsub()
187240
inproc_sharectx()
188241
routerdealer()
189242
pairpair()
190-
243+
asyncpoll()

zmq.nimble

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Package
22

3-
version = "1.2.1"
3+
version = "1.3.0"
44
author = "Andreas Rumpf"
55
description = "ZeroMQ wrapper"
66
license = "MIT"
@@ -14,7 +14,7 @@ task buildexamples, "Compile all examples":
1414
echo fstr
1515
if fstr.endsWith(".nim") and fstr.startsWith("./ex"):
1616
echo "running ", fstr
17-
selfExec("cpp -d:release " & fstr)
17+
selfExec("cpp --mm:orc -d:release " & fstr)
1818

1919
task gendoc, "Generate documentation":
2020
exec("nimble doc --project zmq.nim --out:docs/")

zmq/asynczmq.nim

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,65 @@
1-
import std/asyncdispatch
1+
import std/[asyncdispatch]
22
import ./connections
33
import ./bindings
4+
import ./poller
5+
6+
type
7+
AsyncZPollCB* = proc(x: ZSocket) {.gcsafe.}
8+
AsyncZPoller* = object
9+
## Experimental type to use zmq.poll() with Nim's async dispatch loop
10+
cb* : seq[AsyncZPollCB]
11+
zpoll*: ZPoller
12+
13+
iterator items*(poller: AsyncZPoller): tuple[item: ZPollItem, cb: AsyncZPollCB] =
14+
var
15+
i = 0
16+
n = poller.zpoll.len()
17+
while i < n:
18+
yield(poller.zpoll[i], poller.cb[i])
19+
inc(i)
20+
21+
proc `=destroy`*(obj: var AsyncZPoller) =
22+
if hasPendingOperations():
23+
raise newException(ZmqError, "AsyncZPoller closed with pending operation")
24+
25+
proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) =
26+
## Register ZSocket function
27+
poller.zpoll.register(sock, event)
28+
poller.cb.add(cb)
29+
30+
proc register*(poller: var AsyncZPoller, conn: ZConnection, event: int, cb: AsyncZPollCB) =
31+
## Register ZConnection
32+
poller.register(conn.socket, event, cb)
33+
34+
proc register*(poller: var AsyncZPoller, item: ZPollItem, cb: AsyncZPollCB) =
35+
## Register ZConnection
36+
poller.zpoll.items.add(item)
37+
poller.cb.add(cb)
38+
39+
proc initZPoller*(poller: sink ZPoller, cb: AsyncZPollCB) : AsyncZPoller =
40+
for p in poller.items:
41+
result.register(p, cb)
42+
43+
proc initZPoller*(args: openArray[tuple[item: ZConnection, cb: AsyncZPollCB]], event: cshort): AsyncZPoller =
44+
## Init a ZPoller with all items on the same event
45+
for arg in args:
46+
result.register(arg.item, event, arg.cb)
47+
48+
proc pollAsync*(poller: AsyncZPoller, timeout: int = 1) : Future[int] =
49+
## Experimental API. Poll all the ZConnection and execute an async CB when ``event`` occurs.
50+
result = newFuture[int]("pollAsync")
51+
var r = poller.zpoll.poll(timeout)
52+
# ZMQ can't have a timeout smaller than one
53+
if r > 0:
54+
for zpoll, cb in poller.items():
55+
if events(zpoll):
56+
proc localcb = cb(zpoll.socket)
57+
callSoon localcb
58+
if hasPendingOperations():
59+
# poll vs drain ?
60+
poll(timeout)
61+
62+
result.complete(r)
463

564
proc receiveAsync*(conn: ZConnection): Future[string] =
665
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.

zmq/bindings.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ proc sanity_check_libzmq(): void =
172172
version(actual_lib_major, actual_lib_minor, actual_lib_patch)
173173

174174
let
175-
expected_lib_version = make_dotted_version(ZMQ_VERSION_MAJOR,
176-
ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
175+
# expected_lib_version = make_dotted_version(ZMQ_VERSION_MAJOR,
176+
# ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
177177
actual_lib_version = make_dotted_version(actual_lib_major, actual_lib_minor, actual_lib_patch)
178178

179179
# This is possibly over-particular about versioning
@@ -406,7 +406,7 @@ const
406406
ZMQ_MORE* = 1
407407
ZMQ_SRCFD* = 2
408408
ZMQ_SHARED* = 3
409-
type ZMsgOptions = enum
409+
type ZMsgOptions* = enum
410410
MORE = 1
411411
SRCFD = 2
412412
SHARED = 3

zmq/connections.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,14 @@ when defined(gcDestructors):
132132
proc close*(c: var ZConnection, linger: int = 0)
133133
proc `=destroy`(x: var ZConnection) =
134134
if x.alive:
135-
raise newException(ZmqError, "Connection destroyed but not closed")
135+
raise newException(ZmqError, &"Connection from/to {x.sockaddr} was destroyed but not closed.")
136136

137137
#[
138138
Connect / Listen / Close
139139
]#
140140
proc reconnect*(conn: ZConnection) =
141141
## Reconnect a previously binded/connected address
142-
if connect(conn.socket, conn.sockaddr) != 0:
142+
if connect(conn.socket, conn.sockaddr.cstring) != 0:
143143
zmqError()
144144

145145
proc reconnect*(conn: var ZConnection, address: string) =
@@ -150,12 +150,12 @@ proc reconnect*(conn: var ZConnection, address: string) =
150150

151151
proc disconnect*(conn: ZConnection) =
152152
## Disconnect the socket
153-
if disconnect(conn.socket, conn.sockaddr) != 0:
153+
if disconnect(conn.socket, conn.sockaddr.cstring) != 0:
154154
zmqError()
155155

156156
proc unbind*(conn: ZConnection) =
157157
## Unbind the socket
158-
if unbind(conn.socket, conn.sockaddr) != 0:
158+
if unbind(conn.socket, conn.sockaddr.cstring) != 0:
159159
zmqError()
160160

161161
proc bindAddr*(conn: var ZConnection, address: string) =

0 commit comments

Comments
 (0)