Difference between revisions of "Cfengine3 Messaging"
(→Publish/Subscribe) |
(→Publish/Subscribe) |
||
Line 40: | Line 40: | ||
== Asynchronous Request/Response == |
== Asynchronous Request/Response == |
||
== Publish/Subscribe == |
== Publish/Subscribe == |
||
+ | Server publishes |
||
<pre> |
<pre> |
||
+ | ctx = zmq.Context() |
||
− | socket = context.socket(zmq.PUB) |
||
+ | publisher = ctx.socket(zmq.PUB) |
||
− | socket.bind("tcp://10.68.71.184:5556") |
||
+ | publisher.bind("tcp://10.68.71.184:5556") |
||
− | # Wait for next request from client |
||
− | message = socket.recv() |
||
− | print("Received request: %s" % message) |
||
+ | time.sleep(5) |
||
− | # Send reply back to client |
||
+ | |||
− | socket.send(b"World") |
||
+ | sequence = 0 |
||
+ | id = 0 |
||
+ | data =1000 |
||
+ | |||
+ | while sequence < 20: |
||
+ | id += 1; |
||
+ | data += 1000; |
||
+ | publisher.send("%i %i %i" % (sequence, id, data)); |
||
+ | sequence += 1 |
||
+ | </pre> |
||
+ | Client subscriber receives |
||
+ | <pre> |
||
+ | context = zmq.Context() |
||
+ | socket = context.socket(zmq.SUB) |
||
+ | |||
+ | print("Collecting updates from server...") |
||
+ | socket.connect("tcp://10.68.71.184:5556") |
||
+ | |||
+ | socket.setsockopt(zmq.SUBSCRIBE, b'') |
||
+ | |||
+ | for update_nbr in range(5): |
||
+ | string = socket.recv_string() |
||
+ | print string |
||
+ | </pre> |
||
+ | Output |
||
+ | <pre> |
||
+ | 0 1 2000 |
||
+ | 1 2 3000 |
||
+ | 2 3 4000 |
||
+ | 3 4 5000 |
||
+ | 4 5 6000 |
||
</pre> |
</pre> |
||
Revision as of 22:30, 6 December 2015
> what would be killer is a combination of the latest cfengine > (with promise theory) and 0mq.
http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html
Contents
0mq
ZeroMQ comes with 5 basic patterns
- Synchronous Request/Response
- Asynchronous Request/Response
- Publish/Subscribe
- Push/Pull
- Exclusive Pair
Synchronous Request/Response
Client Requests
socket = context.socket(zmq.REQ) socket.connect("tcp://wbhs-pkg.webhuis.nl:8080") # Do 10 requests, waiting each time for a response print("Sending request %s ..." % request) socket.send(b"Hello") # Get the reply. message = socket.recv() print("Received reply %s [ %s ]" % (request, message))
Server Replies
socket = context.socket(zmq.REP) socket.bind("tcp://10.68.71.184:8080") # Wait for next request from client message = socket.recv() print("Received request: %s" % message) # Send reply back to client socket.send(b"World")
Asynchronous Request/Response
Publish/Subscribe
Server publishes
ctx = zmq.Context() publisher = ctx.socket(zmq.PUB) publisher.bind("tcp://10.68.71.184:5556") time.sleep(5) sequence = 0 id = 0 data =1000 while sequence < 20: id += 1; data += 1000; publisher.send("%i %i %i" % (sequence, id, data)); sequence += 1
Client subscriber receives
context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from server...") socket.connect("tcp://10.68.71.184:5556") socket.setsockopt(zmq.SUBSCRIBE, b'') for update_nbr in range(5): string = socket.recv_string() print string
Output
0 1 2000 1 2 3000 2 3 4000 3 4 5000 4 5 6000