Difference between revisions of "Cfengine3 Messaging"
(New page: <pre> > what would be killer is a combination of the latest cfengine > (with promise theory) and 0mq. </pre> http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html) |
Martn admin (talk | contribs) |
||
(67 intermediate revisions by one other user not shown) | |||
Line 3: | Line 3: | ||
> (with promise theory) and 0mq. |
> (with promise theory) and 0mq. |
||
</pre> |
</pre> |
||
− | + | Source: http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html<br/><br/> |
|
+ | This part of the Webhuis CFEngine initiative is research and therefore subject to change. :-)<br/> |
||
+ | It is the objective to have messaging incorporated in CFEngine and thus enabling nodes, by cf-agent that is, to send out messages to whatever listeners out there. The listeners can be cf-messages daemons or other nodes configured to process messages delivered by CFEngine Nodes.<br/> |
||
+ | CFEngine will need to implement a new promise type: <b>messages:</b>.<br/> |
||
+ | Parts of the works is to be transferred to and discussions will take place on: |
||
+ | https://github.com/Webhuis/Data |
||
+ | So let's go and deliver that kill! |
||
+ | = Nodes and messaging = |
||
+ | When adding messages as a feature to CFEngine different positions with regards to messages exist. |
||
+ | * Nodes carrying information they are willing to share |
||
+ | * Nodes willing to process the information Nodes are willing to share |
||
+ | == Nodes sharing information == |
||
+ | Information sharing nodes need a method that enables them to send the information they are willing to share. In the analysis use cases of types of messages have to be mapped on the basic patterns. The listeners mirror the nodes mappings. |
||
+ | Nodes carry different kinds of information |
||
+ | * Common information |
||
+ | * Static information |
||
+ | * Dynamic Information |
||
+ | * State Information |
||
+ | In the current design of CFEngine the cf-agent then has to be extended with a promise type: messages:<br/><br/> |
||
+ | It goes withoout saying that messaging needs authentication and autorization mechanisms. |
||
+ | === Example CFEngine message policy === |
||
+ | <pre> |
||
+ | bundle agent cf_message { |
||
+ | |||
+ | vars: |
||
+ | "role" string => "x2gvdt"; |
||
+ | "host_info" data => parsejson(' |
||
+ | { |
||
+ | "host": { "uqhost": "$(sys.uqhost)", "domain": "$(sys.domain)", "role": "$(role)" }, |
||
+ | }'); |
||
+ | |||
+ | "host_store" string => storejson(host_info); |
||
+ | |||
+ | commands: |
||
+ | |||
+ | "/root/client-synchronous-request-response.py '${host_store}'"; |
||
+ | |||
+ | reports: |
||
+ | |||
+ | "${this.bundle} host_store: ${host_store}"; |
||
+ | |||
+ | } |
||
+ | |||
+ | body common control { |
||
+ | |||
+ | bundlesequence => { cf_message }; |
||
+ | |||
+ | inputs => { "/var/cfengine/inputs/lib/3.6/stdlib.cf" }; |
||
+ | |||
+ | } |
||
+ | </pre> |
||
+ | Mind the single quotes around the parameter string being the json container, when ommitted the quotes around the keys and values will be lost. |
||
+ | |||
+ | === Message Delivery === |
||
+ | We use Python to do the messaging until an alternative in C is available (work in progress). 0mq is the first messaging system to consider, others may follow. This is the Python script the agent uses to send a message: |
||
+ | <pre> |
||
+ | #!/usr/bin/python |
||
+ | |||
+ | import sys |
||
+ | import zmq |
||
+ | |||
+ | context = zmq.Context() |
||
+ | |||
+ | # Socket to talk to server |
||
+ | print("Connecting to server...") |
||
+ | socket = context.socket(zmq.REQ) |
||
+ | socket.connect("tcp://wbhs-pkg.webhuis.nl:5555") |
||
+ | |||
+ | message = '' |
||
+ | # sys.argv[0] is the program filename, slice it off |
||
+ | for element in sys.argv[1:]: |
||
+ | message = message + element |
||
+ | |||
+ | print("Sending request %s ..." % message ) |
||
+ | socket.send( message ) |
||
+ | |||
+ | # Get the reply. |
||
+ | response = socket.recv() |
||
+ | print( response ) |
||
+ | </pre> |
||
+ | |||
+ | == Templates, Schemas or ? == |
||
+ | Messages need to be structured. |
||
+ | * Identity |
||
+ | * Version |
||
+ | * Envelope |
||
+ | * Encryption |
||
+ | * Signature |
||
+ | With CFEngine we have mechanisms in place for editing templates. |
||
+ | === Identified Schemas === |
||
+ | The following Schemas have been identified so far: |
||
+ | * Host |
||
+ | * Domain |
||
+ | * Role |
||
+ | * Network |
||
+ | * Time zone |
||
+ | * DNS |
||
+ | * LDAP |
||
+ | Schemas are templates for Data, thus enabling a mechanism to provision data to Nodes. |
||
+ | |||
+ | === messages: === |
||
+ | The messages promise type is an abstract messaging method. It enables the agent to send messages to whatever listener is around in the Universe. The data type of a message is a container, and the stucture is: |
||
+ | <pre> |
||
+ | { |
||
+ | "message_schema_id": { |
||
+ | "message_id": "$(message_id)", |
||
+ | "content": "$(content)", |
||
+ | "version": "$(version)" |
||
+ | } |
||
+ | } |
||
+ | </pre> |
||
+ | |||
+ | == Information processing Nodes == |
||
+ | Information processing Nodes receive their information through message feeds and for their part may be willing to share their information through views. Data would be an information processing Node, relying on a defined set of feeds, an information processing process that builds the Universal Truth and defined autorized views that share the information stored. |
||
+ | === cf-messaged === |
||
+ | In this proof of concept the listener simply adds every message to a file, queue.txt as were it a queue. Nodes in fect feed the listener with information and the listener knows how to process incoming messages. Feeds are input schemas. The example uses the typical Zeromq port 5555, but why would we not use a more typical CFEngine port 5309? |
||
+ | <pre> |
||
+ | import time |
||
+ | import zmq |
||
+ | |||
+ | context = zmq.Context() |
||
+ | socket = context.socket(zmq.REP) |
||
+ | socket.bind("tcp://10.68.71.184:5555") |
||
+ | |||
+ | response = "message received" |
||
+ | while True: |
||
+ | # Wait for next request from client |
||
+ | message = socket.recv() |
||
+ | print("Received request: %s " % message) |
||
+ | |||
+ | # Do some 'work' |
||
+ | time.sleep(1) |
||
+ | |||
+ | with open("queue.txt", "a") as queue: |
||
+ | queue.write(message + "\n") |
||
+ | |||
+ | # Send reply back to client |
||
+ | socket.send( response ) |
||
+ | |||
+ | </pre> |
||
+ | The output would be: |
||
+ | <pre> |
||
+ | root@wbhs-pkg:~# tail -F queue.txt |
||
+ | {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}} |
||
+ | {host:{uqhost:kvmutl0004,domain:webhuis.nl,role:kvmutl}} |
||
+ | {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}} |
||
+ | {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}} |
||
+ | </pre> |
||
+ | When a message is received the daemon places the message in a local queue for processing. The sending node receives a hard coded response "message received", but this could be any response as a result on the side of the listener. The response is useful when it triggers the node to supply more information in more messages. |
||
+ | |||
+ | = 0mq = |
||
+ | CFEngine Enterprise offers messaging to some degree. Nodes share information they hold with the Policy Hub. This information can be |
||
+ | * Patches of installed software |
||
+ | * State of the Nodes with regards to promises fulfilled |
||
+ | * Configuration information (CMDB) |
||
+ | Outside this context, however, there is no method available to the agent to share any information it is willing to share, with a monitoring system for instance.<br/> |
||
+ | There are many messaging systems around and CFEngine should be able to interface with any of them, but in the initial stages the need is for a very light weight messaging system. The search for a suitable messaging mechanism eventually led to 0mq, just a library not an application. Could it be less in weight?<br/> |
||
+ | For reasons of simplicity the code examples, so far are in Python. CFEngine is independent of anything but libc and thus the end results of CFEngine messaging have to be developed in C. |
||
+ | == Information sources == |
||
+ | * http://nichol.as/zeromq-an-introduction |
||
+ | * https://github.com/zeromq/pyzmq/issues/132 |
||
+ | * Author: Lev Givon <lev(at)columbia(dot)edu> |
||
+ | * http://blog.pythonisito.com/2012/08/distributed-systems-with-zeromq.html |
||
+ | * http://blog.scottlogic.com/2015/03/20/ZeroMQ-Quick-Intro.html |
||
+ | * http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009490.html |
||
+ | |||
+ | == Basic patterns == |
||
+ | ZeroMQ comes with five basic patterns |
||
+ | * Synchronous Request/Response |
||
+ | * Asynchronous Request/Response |
||
+ | * Publish/Subscribe |
||
+ | * Push/Pull |
||
+ | * Exclusive Pair |
||
+ | Clients and servers can take the role of server, being the one that is listening on a certain socket, IP address and port number, to incoming requests. When it comes to exchanging messages the most stable one is chosen to be the listener. |
||
+ | |||
+ | |||
+ | == Synchronous Request/Response == |
||
+ | This is the client server model and the most basic pattern. The client sends a request to the server and the server replies to the request. Other names are Request Reply |
||
+ | === Sample Message === |
||
+ | |||
+ | === Example Code === |
||
+ | Client Requests |
||
+ | <pre> |
||
+ | socket = context.socket(zmq.REQ) |
||
+ | socket.connect("tcp://wbhs-pkg.webhuis.nl:8080") |
||
+ | |||
+ | # Do 10 requests, waiting each time for a response |
||
+ | print("Sending request %s ..." % request) |
||
+ | socket.send(b"Hello") |
||
+ | |||
+ | # Get the reply. |
||
+ | message = socket.recv() |
||
+ | print("Received reply %s [ %s ]" % (request, message)) |
||
+ | </pre> |
||
+ | Server Replies |
||
+ | <pre> |
||
+ | socket = context.socket(zmq.REP) |
||
+ | socket.bind("tcp://10.68.71.184:8080") |
||
+ | |||
+ | # Wait for next request from client |
||
+ | message = socket.recv() |
||
+ | print("Received request: %s" % message) |
||
+ | |||
+ | # Send reply back to client |
||
+ | socket.send(b"World") |
||
+ | </pre> |
||
+ | |||
+ | == Asynchronous Request/Response == |
||
+ | |||
+ | === Example Code === |
||
+ | |||
+ | == Publish/Subscribe == |
||
+ | This is comparable to broadcasting. |
||
+ | === Example Code === |
||
+ | Server publishes |
||
+ | <pre> |
||
+ | ctx = zmq.Context() |
||
+ | publisher = ctx.socket(zmq.PUB) |
||
+ | |||
+ | publisher.bind("tcp://10.68.71.184:5556") |
||
+ | |||
+ | time.sleep(5) |
||
+ | |||
+ | sequence = 0 |
||
+ | id = 0 |
||
+ | data =1000 |
||
+ | |||
+ | while sequence < 20: |
||
+ | id += 1; |
||
+ | data += 1000; |
||
+ | publisher.send("%i %i %i" % (sequence, id, data)); |
||
+ | sequence += 1 |
||
+ | </pre> |
||
+ | Client subscriber receives |
||
+ | <pre> |
||
+ | context = zmq.Context() |
||
+ | socket = context.socket(zmq.SUB) |
||
+ | |||
+ | print("Collecting updates from server...") |
||
+ | socket.connect("tcp://10.68.71.184:5556") |
||
+ | |||
+ | socket.setsockopt(zmq.SUBSCRIBE, b'') |
||
+ | |||
+ | for update_nbr in range(5): |
||
+ | string = socket.recv_string() |
||
+ | print string |
||
+ | </pre> |
||
+ | Output |
||
+ | <pre> |
||
+ | 0 1 2000 |
||
+ | 1 2 3000 |
||
+ | 2 3 4000 |
||
+ | 3 4 5000 |
||
+ | 4 5 6000 |
||
+ | </pre> |
||
+ | |||
+ | == Push/Pull == |
||
+ | In the official 0mq documentation this pattern is referred to as Parallel Pipeline. |
||
+ | === Example Code === |
||
+ | |||
+ | |||
+ | |||
+ | <pre> |
||
+ | # The ventilator |
||
+ | # Binds PUSH socket to tcp://localhost:5557 |
||
+ | # Sends batch of tasks to workers via that socket |
||
+ | |||
+ | import zmq |
||
+ | import random |
||
+ | import time |
||
+ | |||
+ | context = zmq.Context() |
||
+ | |||
+ | # Socket to send messages on |
||
+ | sender = context.socket(zmq.PUSH) |
||
+ | sender.bind("tcp://10.68.71.184:5557") |
||
+ | |||
+ | # Socket with direct access to the sink: used to syncronize start of batch |
||
+ | sink = context.socket(zmq.PUSH) |
||
+ | sink.connect("tcp://10.68.71.184:5558") |
||
+ | |||
+ | try: |
||
+ | raw_input |
||
+ | except NameError: |
||
+ | # Python 3 |
||
+ | raw_input = input |
||
+ | |||
+ | print("Press Enter when the workers are ready: ") |
||
+ | _ = raw_input() |
||
+ | print("Sending tasks to workers...") |
||
+ | |||
+ | # The first message is "0" and signals start of batch |
||
+ | sink.send(b'0') |
||
+ | |||
+ | # Initialize random number generator |
||
+ | random.seed() |
||
+ | |||
+ | # Send 100 tasks |
||
+ | total_msec = 0 |
||
+ | for task_nbr in range(100): |
||
+ | |||
+ | # Random workload from 1 to 100 msecs |
||
+ | workload = random.randint(1, 100) |
||
+ | total_msec += workload |
||
+ | |||
+ | sender.send_string(u'%i' % workload) |
||
+ | |||
+ | print("Total expected cost: %s msec" % total_msec) |
||
+ | |||
+ | # Give 0MQ time to deliver |
||
+ | time.sleep(1) |
||
+ | |||
+ | </pre> |
||
+ | <pre> |
||
+ | # Worker |
||
+ | # Connects PULL socket to tcp://10.68.71.184:5557 |
||
+ | # Collects workloads from ventilator via that socket |
||
+ | # Connects PUSH socket to tcp://10.68.71.184:5558 |
||
+ | # Sends results to sink via that socket |
||
+ | |||
+ | import sys |
||
+ | import time |
||
+ | import zmq |
||
+ | |||
+ | |||
+ | context = zmq.Context() |
||
+ | |||
+ | # Socket to receive messages on |
||
+ | receiver = context.socket(zmq.PULL) |
||
+ | receiver.connect("tcp://10.68.71.184:5557") |
||
+ | |||
+ | # Socket to send messages to |
||
+ | sender = context.socket(zmq.PUSH) |
||
+ | sender.connect("tcp://10.68.71.184:5558") |
||
+ | |||
+ | # Process tasks forever |
||
+ | while True: |
||
+ | s = receiver.recv() |
||
+ | |||
+ | # Simple progress indicator for the viewer |
||
+ | sys.stdout.write('.') |
||
+ | sys.stdout.flush() |
||
+ | |||
+ | # Do the work |
||
+ | time.sleep(int(s)*0.001) |
||
+ | |||
+ | # Send results to sink |
||
+ | sender.send(b'') |
||
+ | |||
+ | </pre> |
||
+ | <pre> |
||
+ | # Sink |
||
+ | # Binds PULL socket to tcp://10.68.71.184:5558 |
||
+ | # Collects results from worker via that socket |
||
+ | |||
+ | import sys |
||
+ | import time |
||
+ | import zmq |
||
+ | |||
+ | |||
+ | context = zmq.Context() |
||
+ | |||
+ | # Socket to receive messages on |
||
+ | receiver = context.socket(zmq.PULL) |
||
+ | receiver.bind("tcp://10.68.71.184:5558") |
||
+ | |||
+ | # Wait for start of batch |
||
+ | s = receiver.recv() |
||
+ | |||
+ | # Start our clock now |
||
+ | tstart = time.time() |
||
+ | |||
+ | # Process 100 confirmations |
||
+ | total_msec = 0 |
||
+ | for task_nbr in range(100): |
||
+ | s = receiver.recv() |
||
+ | if task_nbr % 10 == 0: |
||
+ | sys.stdout.write(':') |
||
+ | else: |
||
+ | sys.stdout.write('.') |
||
+ | sys.stdout.flush() |
||
+ | |||
+ | # Calculate and report duration of batch |
||
+ | tend = time.time() |
||
+ | print("Total elapsed time: %d msec" % ((tend-tstart)*1000)) |
||
+ | </pre> |
||
+ | |||
+ | == Exclusive Pair == |
||
+ | |||
+ | === Example Code === |
||
+ | <pre> |
||
+ | exclusive-bind |
||
+ | import zmq |
||
+ | |||
+ | # ZeroMQ Context |
||
+ | context = zmq.Context() |
||
+ | |||
+ | # Define the socket using the "Context" |
||
+ | socket = context.socket(zmq.PAIR) |
||
+ | socket.bind("tcp://127.0.0.1:5696") |
||
+ | </pre> |
||
+ | <pre> |
||
+ | import zmq |
||
+ | |||
+ | # ZeroMQ Context |
||
+ | context = zmq.Context() |
||
+ | |||
+ | # Define the socket using the "Context" |
||
+ | socket = context.socket(zmq.PAIR) |
||
+ | socket.connect("tcp://127.0.0.1:5696") |
||
+ | </pre> |
||
+ | <hr> |
||
+ | Return to: [[CFEngine]] |
Latest revision as of 14:58, 1 June 2022
> what would be killer is a combination of the latest cfengine > (with promise theory) and 0mq.
Source: http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html
This part of the Webhuis CFEngine initiative is research and therefore subject to change. :-)
It is the objective to have messaging incorporated in CFEngine and thus enabling nodes, by cf-agent that is, to send out messages to whatever listeners out there. The listeners can be cf-messages daemons or other nodes configured to process messages delivered by CFEngine Nodes.
CFEngine will need to implement a new promise type: messages:.
Parts of the works is to be transferred to and discussions will take place on:
https://github.com/Webhuis/Data
So let's go and deliver that kill!
Contents
Nodes and messaging
When adding messages as a feature to CFEngine different positions with regards to messages exist.
- Nodes carrying information they are willing to share
- Nodes willing to process the information Nodes are willing to share
Nodes sharing information
Information sharing nodes need a method that enables them to send the information they are willing to share. In the analysis use cases of types of messages have to be mapped on the basic patterns. The listeners mirror the nodes mappings. Nodes carry different kinds of information
- Common information
- Static information
- Dynamic Information
- State Information
In the current design of CFEngine the cf-agent then has to be extended with a promise type: messages:
It goes withoout saying that messaging needs authentication and autorization mechanisms.
Example CFEngine message policy
bundle agent cf_message { vars: "role" string => "x2gvdt"; "host_info" data => parsejson(' { "host": { "uqhost": "$(sys.uqhost)", "domain": "$(sys.domain)", "role": "$(role)" }, }'); "host_store" string => storejson(host_info); commands: "/root/client-synchronous-request-response.py '${host_store}'"; reports: "${this.bundle} host_store: ${host_store}"; } body common control { bundlesequence => { cf_message }; inputs => { "/var/cfengine/inputs/lib/3.6/stdlib.cf" }; }
Mind the single quotes around the parameter string being the json container, when ommitted the quotes around the keys and values will be lost.
Message Delivery
We use Python to do the messaging until an alternative in C is available (work in progress). 0mq is the first messaging system to consider, others may follow. This is the Python script the agent uses to send a message:
#!/usr/bin/python import sys import zmq context = zmq.Context() # Socket to talk to server print("Connecting to server...") socket = context.socket(zmq.REQ) socket.connect("tcp://wbhs-pkg.webhuis.nl:5555") message = '' # sys.argv[0] is the program filename, slice it off for element in sys.argv[1:]: message = message + element print("Sending request %s ..." % message ) socket.send( message ) # Get the reply. response = socket.recv() print( response )
Templates, Schemas or ?
Messages need to be structured.
- Identity
- Version
- Envelope
- Encryption
- Signature
With CFEngine we have mechanisms in place for editing templates.
Identified Schemas
The following Schemas have been identified so far:
- Host
- Domain
- Role
- Network
- Time zone
- DNS
- LDAP
Schemas are templates for Data, thus enabling a mechanism to provision data to Nodes.
messages:
The messages promise type is an abstract messaging method. It enables the agent to send messages to whatever listener is around in the Universe. The data type of a message is a container, and the stucture is:
{ "message_schema_id": { "message_id": "$(message_id)", "content": "$(content)", "version": "$(version)" } }
Information processing Nodes
Information processing Nodes receive their information through message feeds and for their part may be willing to share their information through views. Data would be an information processing Node, relying on a defined set of feeds, an information processing process that builds the Universal Truth and defined autorized views that share the information stored.
cf-messaged
In this proof of concept the listener simply adds every message to a file, queue.txt as were it a queue. Nodes in fect feed the listener with information and the listener knows how to process incoming messages. Feeds are input schemas. The example uses the typical Zeromq port 5555, but why would we not use a more typical CFEngine port 5309?
import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://10.68.71.184:5555") response = "message received" while True: # Wait for next request from client message = socket.recv() print("Received request: %s " % message) # Do some 'work' time.sleep(1) with open("queue.txt", "a") as queue: queue.write(message + "\n") # Send reply back to client socket.send( response )
The output would be:
root@wbhs-pkg:~# tail -F queue.txt {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}} {host:{uqhost:kvmutl0004,domain:webhuis.nl,role:kvmutl}} {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}} {host:{uqhost:x2gvdt0001,domain:webhuis.nl,role:x2gvdt}}
When a message is received the daemon places the message in a local queue for processing. The sending node receives a hard coded response "message received", but this could be any response as a result on the side of the listener. The response is useful when it triggers the node to supply more information in more messages.
0mq
CFEngine Enterprise offers messaging to some degree. Nodes share information they hold with the Policy Hub. This information can be
- Patches of installed software
- State of the Nodes with regards to promises fulfilled
- Configuration information (CMDB)
Outside this context, however, there is no method available to the agent to share any information it is willing to share, with a monitoring system for instance.
There are many messaging systems around and CFEngine should be able to interface with any of them, but in the initial stages the need is for a very light weight messaging system. The search for a suitable messaging mechanism eventually led to 0mq, just a library not an application. Could it be less in weight?
For reasons of simplicity the code examples, so far are in Python. CFEngine is independent of anything but libc and thus the end results of CFEngine messaging have to be developed in C.
Information sources
- http://nichol.as/zeromq-an-introduction
- https://github.com/zeromq/pyzmq/issues/132
- Author: Lev Givon <lev(at)columbia(dot)edu>
- http://blog.pythonisito.com/2012/08/distributed-systems-with-zeromq.html
- http://blog.scottlogic.com/2015/03/20/ZeroMQ-Quick-Intro.html
- http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009490.html
Basic patterns
ZeroMQ comes with five basic patterns
- Synchronous Request/Response
- Asynchronous Request/Response
- Publish/Subscribe
- Push/Pull
- Exclusive Pair
Clients and servers can take the role of server, being the one that is listening on a certain socket, IP address and port number, to incoming requests. When it comes to exchanging messages the most stable one is chosen to be the listener.
Synchronous Request/Response
This is the client server model and the most basic pattern. The client sends a request to the server and the server replies to the request. Other names are Request Reply
Sample Message
Example Code
Client Requests
socket = context.socket(zmq.REQ) socket.connect("tcp://wbhs-pkg.webhuis.nl:8080") # Do 10 requests, waiting each time for a response print("Sending request %s ..." % request) socket.send(b"Hello") # Get the reply. message = socket.recv() print("Received reply %s [ %s ]" % (request, message))
Server Replies
socket = context.socket(zmq.REP) socket.bind("tcp://10.68.71.184:8080") # Wait for next request from client message = socket.recv() print("Received request: %s" % message) # Send reply back to client socket.send(b"World")
Asynchronous Request/Response
Example Code
Publish/Subscribe
This is comparable to broadcasting.
Example Code
Server publishes
ctx = zmq.Context() publisher = ctx.socket(zmq.PUB) publisher.bind("tcp://10.68.71.184:5556") time.sleep(5) sequence = 0 id = 0 data =1000 while sequence < 20: id += 1; data += 1000; publisher.send("%i %i %i" % (sequence, id, data)); sequence += 1
Client subscriber receives
context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from server...") socket.connect("tcp://10.68.71.184:5556") socket.setsockopt(zmq.SUBSCRIBE, b'') for update_nbr in range(5): string = socket.recv_string() print string
Output
0 1 2000 1 2 3000 2 3 4000 3 4 5000 4 5 6000
Push/Pull
In the official 0mq documentation this pattern is referred to as Parallel Pipeline.
Example Code
# The ventilator # Binds PUSH socket to tcp://localhost:5557 # Sends batch of tasks to workers via that socket import zmq import random import time context = zmq.Context() # Socket to send messages on sender = context.socket(zmq.PUSH) sender.bind("tcp://10.68.71.184:5557") # Socket with direct access to the sink: used to syncronize start of batch sink = context.socket(zmq.PUSH) sink.connect("tcp://10.68.71.184:5558") try: raw_input except NameError: # Python 3 raw_input = input print("Press Enter when the workers are ready: ") _ = raw_input() print("Sending tasks to workers...") # The first message is "0" and signals start of batch sink.send(b'0') # Initialize random number generator random.seed() # Send 100 tasks total_msec = 0 for task_nbr in range(100): # Random workload from 1 to 100 msecs workload = random.randint(1, 100) total_msec += workload sender.send_string(u'%i' % workload) print("Total expected cost: %s msec" % total_msec) # Give 0MQ time to deliver time.sleep(1)
# Worker # Connects PULL socket to tcp://10.68.71.184:5557 # Collects workloads from ventilator via that socket # Connects PUSH socket to tcp://10.68.71.184:5558 # Sends results to sink via that socket import sys import time import zmq context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.connect("tcp://10.68.71.184:5557") # Socket to send messages to sender = context.socket(zmq.PUSH) sender.connect("tcp://10.68.71.184:5558") # Process tasks forever while True: s = receiver.recv() # Simple progress indicator for the viewer sys.stdout.write('.') sys.stdout.flush() # Do the work time.sleep(int(s)*0.001) # Send results to sink sender.send(b'')
# Sink # Binds PULL socket to tcp://10.68.71.184:5558 # Collects results from worker via that socket import sys import time import zmq context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.bind("tcp://10.68.71.184:5558") # Wait for start of batch s = receiver.recv() # Start our clock now tstart = time.time() # Process 100 confirmations total_msec = 0 for task_nbr in range(100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') sys.stdout.flush() # Calculate and report duration of batch tend = time.time() print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
Exclusive Pair
Example Code
exclusive-bind import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" socket = context.socket(zmq.PAIR) socket.bind("tcp://127.0.0.1:5696")
import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" socket = context.socket(zmq.PAIR) socket.connect("tcp://127.0.0.1:5696")
Return to: CFEngine