Difference between revisions of "Cfengine3 Messaging"
(→Publish/Subscribe) |
(→Publish/Subscribe) |
||
| Line 40: | Line 40: | ||
== Asynchronous Request/Response == |
== Asynchronous Request/Response == |
||
== Publish/Subscribe == |
== Publish/Subscribe == |
||
| + | Server publishes |
||
<pre> |
<pre> |
||
| + | ctx = zmq.Context() |
||
| − | socket = context.socket(zmq.PUB) |
||
| + | publisher = ctx.socket(zmq.PUB) |
||
| − | socket.bind("tcp://10.68.71.184:5556") |
||
| + | publisher.bind("tcp://10.68.71.184:5556") |
||
| − | # Wait for next request from client |
||
| − | message = socket.recv() |
||
| − | print("Received request: %s" % message) |
||
| + | time.sleep(5) |
||
| − | # Send reply back to client |
||
| + | |||
| − | socket.send(b"World") |
||
| + | sequence = 0 |
||
| + | id = 0 |
||
| + | data =1000 |
||
| + | |||
| + | while sequence < 20: |
||
| + | id += 1; |
||
| + | data += 1000; |
||
| + | publisher.send("%i %i %i" % (sequence, id, data)); |
||
| + | sequence += 1 |
||
| + | </pre> |
||
| + | Client subscriber receives |
||
| + | <pre> |
||
| + | context = zmq.Context() |
||
| + | socket = context.socket(zmq.SUB) |
||
| + | |||
| + | print("Collecting updates from server...") |
||
| + | socket.connect("tcp://10.68.71.184:5556") |
||
| + | |||
| + | socket.setsockopt(zmq.SUBSCRIBE, b'') |
||
| + | |||
| + | for update_nbr in range(5): |
||
| + | string = socket.recv_string() |
||
| + | print string |
||
| + | </pre> |
||
| + | Output |
||
| + | <pre> |
||
| + | 0 1 2000 |
||
| + | 1 2 3000 |
||
| + | 2 3 4000 |
||
| + | 3 4 5000 |
||
| + | 4 5 6000 |
||
</pre> |
</pre> |
||
Revision as of 22:30, 6 December 2015
> what would be killer is a combination of the latest cfengine > (with promise theory) and 0mq.
http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html
Contents
0mq
ZeroMQ comes with 5 basic patterns
- Synchronous Request/Response
- Asynchronous Request/Response
- Publish/Subscribe
- Push/Pull
- Exclusive Pair
Synchronous Request/Response
Client Requests
socket = context.socket(zmq.REQ)
socket.connect("tcp://wbhs-pkg.webhuis.nl:8080")
# Do 10 requests, waiting each time for a response
print("Sending request %s ..." % request)
socket.send(b"Hello")
# Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
Server Replies
socket = context.socket(zmq.REP)
socket.bind("tcp://10.68.71.184:8080")
# Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
# Send reply back to client
socket.send(b"World")
Asynchronous Request/Response
Publish/Subscribe
Server publishes
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://10.68.71.184:5556")
time.sleep(5)
sequence = 0
id = 0
data =1000
while sequence < 20:
id += 1;
data += 1000;
publisher.send("%i %i %i" % (sequence, id, data));
sequence += 1
Client subscriber receives
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from server...")
socket.connect("tcp://10.68.71.184:5556")
socket.setsockopt(zmq.SUBSCRIBE, b'')
for update_nbr in range(5):
string = socket.recv_string()
print string
Output
0 1 2000 1 2 3000 2 3 4000 3 4 5000 4 5 6000