Difference between revisions of "Cfengine3 Messaging"
 (→Publish/Subscribe)  | 
				 (→Publish/Subscribe)  | 
				||
| Line 40: | Line 40: | ||
== Asynchronous Request/Response ==  | 
  == Asynchronous Request/Response ==  | 
||
== Publish/Subscribe ==  | 
  == Publish/Subscribe ==  | 
||
| + | Server publishes  | 
||
<pre>  | 
  <pre>  | 
||
| + | ctx = zmq.Context()  | 
||
| − | socket = context.socket(zmq.PUB)  | 
  ||
| + | publisher = ctx.socket(zmq.PUB)  | 
||
| − | socket.bind("tcp://10.68.71.184:5556")  | 
  ||
| + | publisher.bind("tcp://10.68.71.184:5556")  | 
||
| − | #  Wait for next request from client  | 
  ||
| − | message = socket.recv()  | 
  ||
| − | print("Received request: %s" % message)  | 
  ||
| + | time.sleep(5)  | 
||
| − | #  Send reply back to client  | 
  ||
| + | |||
| − | socket.send(b"World")  | 
  ||
| + | sequence = 0  | 
||
| + | id = 0  | 
||
| + | data =1000  | 
||
| + | |||
| + | while sequence < 20:  | 
||
| + |   id += 1;  | 
||
| + |   data += 1000;  | 
||
| + |   publisher.send("%i %i %i" % (sequence, id, data));  | 
||
| + |   sequence += 1  | 
||
| + | </pre>  | 
||
| + | Client subscriber receives  | 
||
| + | <pre>  | 
||
| + | context = zmq.Context()  | 
||
| + | socket = context.socket(zmq.SUB)  | 
||
| + | |||
| + | print("Collecting updates from server...")  | 
||
| + | socket.connect("tcp://10.68.71.184:5556")  | 
||
| + | |||
| + | socket.setsockopt(zmq.SUBSCRIBE, b'')  | 
||
| + | |||
| + | for update_nbr in range(5):  | 
||
| + |     string = socket.recv_string()  | 
||
| + |     print string  | 
||
| + | </pre>  | 
||
| + | Output  | 
||
| + | <pre>  | 
||
| + | 0 1 2000  | 
||
| + | 1 2 3000  | 
||
| + | 2 3 4000  | 
||
| + | 3 4 5000  | 
||
| + | 4 5 6000  | 
||
</pre>  | 
  </pre>  | 
||
Revision as of 22:30, 6 December 2015
> what would be killer is a combination of the latest cfengine > (with promise theory) and 0mq.
http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html
Contents
0mq
ZeroMQ comes with 5 basic patterns
- Synchronous Request/Response
 - Asynchronous Request/Response
 - Publish/Subscribe
 - Push/Pull
 - Exclusive Pair
 
Synchronous Request/Response
Client Requests
socket = context.socket(zmq.REQ)
socket.connect("tcp://wbhs-pkg.webhuis.nl:8080")
#  Do 10 requests, waiting each time for a response
print("Sending request %s ..." % request)
socket.send(b"Hello")
#  Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
Server Replies
socket = context.socket(zmq.REP)
socket.bind("tcp://10.68.71.184:8080")
#  Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
#  Send reply back to client
socket.send(b"World")
Asynchronous Request/Response
Publish/Subscribe
Server publishes
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://10.68.71.184:5556")
time.sleep(5)
sequence = 0
id = 0
data =1000
while sequence < 20:
  id += 1;
  data += 1000;
  publisher.send("%i %i %i" % (sequence, id, data));
  sequence += 1
Client subscriber receives
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from server...")
socket.connect("tcp://10.68.71.184:5556")
socket.setsockopt(zmq.SUBSCRIBE, b'')
for update_nbr in range(5):
    string = socket.recv_string()
    print string
Output
0 1 2000 1 2 3000 2 3 4000 3 4 5000 4 5 6000