Cfengine3 Messaging
> what would be killer is a combination of the latest cfengine (with promise theory) and 0mq.
Source: http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html
This part of the Webhuis CFEngine initiative is research and therefore subject to change. :-)
The objective is to incorporate messaging into CFEngine, so that nodes (through cf-agent, that is) can send messages to whatever listeners are out there. The listeners can be cf-messages daemons or other nodes configured to process messages delivered by CFEngine nodes.
CFEngine will need to implement a new promise type: messages:.
Parts of the work will be transferred to, and discussions will take place on:
https://github.com/Webhuis/Data
So let's go and deliver that kill!
0mq
The search for a suitable messaging mechanism eventually led to 0mq, which is just a library, not an application. Could it be any more lightweight?
For reasons of simplicity, the code examples so far are in Python. CFEngine depends on nothing but libc, so the end result of CFEngine messaging has to be developed in C.
Basic patterns
ZeroMQ comes with five basic patterns:
- Synchronous Request/Response
- Asynchronous Request/Response
- Publish/Subscribe
- Push/Pull
- Exclusive Pair
Nodes carry different kinds of information:
- Common information
- Static information
- Dynamic information
- State information
In the analysis, the use cases for each type of message have to be mapped onto the basic patterns. The listeners mirror the nodes' mappings.
Synchronous Request/Response
Example Code
Client Requests
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://wbhs-pkg.webhuis.nl:8080")

    # Do 10 requests, waiting each time for a response
    for request in range(10):
        print("Sending request %s ..." % request)
        socket.send(b"Hello")

        # Get the reply.
        message = socket.recv()
        print("Received reply %s [ %s ]" % (request, message))
Server Replies
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://10.68.71.184:8080")

    while True:
        # Wait for next request from client
        message = socket.recv()
        print("Received request: %s" % message)

        # Send reply back to client
        socket.send(b"World")
Asynchronous Request/Response
Example Code
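A possible starting point for this pattern, as a minimal sketch only: it assumes the asynchronous variant is built on ZeroMQ's DEALER and ROUTER sockets, which may not be the design CFEngine messaging ends up with. The client sends all its requests before reading any reply, which a REQ socket would not allow. The function name run_async_req_rep, the message texts and the inproc endpoint are hypothetical; both ends run in one process purely to keep the sketch self-contained.

```python
import threading
import zmq


def run_async_req_rep(n_requests=5, endpoint="inproc://async-demo"):
    """Send n_requests without waiting, then collect the replies."""
    context = zmq.Context()
    bound = threading.Event()     # set once the server has bound
    finished = threading.Event()  # set once the client has every reply

    def serve():
        router = context.socket(zmq.ROUTER)
        router.bind(endpoint)
        bound.set()
        for _ in range(n_requests):
            # ROUTER prepends the sender's identity frame to each message
            identity, payload = router.recv_multipart()
            # The identity frame routes the reply back to that sender
            router.send_multipart([identity, b"ack " + payload])
        finished.wait()
        router.close()

    server = threading.Thread(target=serve)
    server.start()
    bound.wait()

    dealer = context.socket(zmq.DEALER)
    dealer.connect(endpoint)

    # Fire off every request first; DEALER does not enforce send/recv lockstep
    for request in range(n_requests):
        dealer.send(b"request %d" % request)

    # Only now read the replies
    replies = [dealer.recv() for _ in range(n_requests)]

    finished.set()
    server.join()
    dealer.close()
    context.term()
    return replies


if __name__ == "__main__":
    for reply in run_async_req_rep():
        print(reply.decode())
```

Between nodes the endpoint would be a tcp:// address like in the other examples, with the ROUTER side bound on the listener.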
Publish/Subscribe
Server publishes
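    import time
    import zmq

    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://10.68.71.184:5556")

    # Give subscribers time to connect before publishing
    time.sleep(5)

    sequence = 0
    id = 0
    data = 1000
    while sequence < 20:
        id += 1
        data += 1000
        # send_string encodes the text; plain send() needs bytes on Python 3
        publisher.send_string("%i %i %i" % (sequence, id, data))
        sequence += 1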
Client subscriber receives
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    print("Collecting updates from server...")
    socket.connect("tcp://10.68.71.184:5556")

    # An empty prefix subscribes to every message
    socket.setsockopt(zmq.SUBSCRIBE, b'')

    for update_nbr in range(5):
        string = socket.recv_string()
        print(string)
Output
    0 1 2000
    1 2 3000
    2 3 4000
    3 4 5000
    4 5 6000
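The subscriber above passes an empty prefix and therefore receives every message. The sketch below shows prefix filtering instead, assuming each message starts with a topic word such as the kind of information it carries. The topic names "state" and "common", the function name publish_filtered and the inproc endpoint are hypothetical and not part of the examples above. It also swaps PUB for an XPUB socket, which lets the publisher wait for the subscription to arrive rather than sleeping for a fixed time.

```python
import zmq


def publish_filtered(messages, topic=b"state", endpoint="inproc://pubsub-demo"):
    """Publish messages; return the ones a subscriber to `topic` receives."""
    context = zmq.Context()
    publisher = context.socket(zmq.XPUB)
    subscriber = context.socket(zmq.SUB)
    try:
        # Receive timeouts make a failure raise zmq.Again instead of hanging
        publisher.setsockopt(zmq.RCVTIMEO, 2000)
        subscriber.setsockopt(zmq.RCVTIMEO, 500)

        publisher.bind(endpoint)
        subscriber.setsockopt(zmq.SUBSCRIBE, topic)
        subscriber.connect(endpoint)

        # XPUB hands each new subscription to the application as a
        # one-byte marker (1 = subscribe) followed by the prefix.
        subscription = publisher.recv()
        if subscription != b"\x01" + topic:
            raise RuntimeError("unexpected subscription %r" % subscription)

        for message in messages:
            publisher.send(message)

        # Drain the subscriber until nothing arrives within the timeout
        received = []
        while True:
            try:
                received.append(subscriber.recv())
            except zmq.Again:
                break
        return received
    finally:
        subscriber.close(linger=0)
        publisher.close(linger=0)
        context.term()


if __name__ == "__main__":
    sample = [b"state 0 1 2000", b"common 1 2 3000", b"state 2 3 4000"]
    for message in publish_filtered(sample):
        print(message.decode())
```

Only messages whose first bytes match the subscribed prefix reach the subscriber, so a listener interested in one kind of information would not have to discard the rest itself.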
Push/Pull
Example Code
- Author: Lev Givon <lev(at)columbia(dot)edu>
    # The ventilator
    # Binds PUSH socket to tcp://10.68.71.184:5557
    # Sends batch of tasks to workers via that socket

    import zmq
    import random
    import time

    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://10.68.71.184:5557")

    # Socket with direct access to the sink: used to synchronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://10.68.71.184:5558")

    try:
        raw_input
    except NameError:
        # Python 3
        raw_input = input

    print("Press Enter when the workers are ready: ")
    _ = raw_input()
    print("Sending tasks to workers...")

    # The first message is "0" and signals start of batch
    sink.send(b'0')

    # Initialize random number generator
    random.seed()

    # Send 100 tasks
    total_msec = 0
    for task_nbr in range(100):

        # Random workload from 1 to 100 msecs
        workload = random.randint(1, 100)
        total_msec += workload

        sender.send_string(u'%i' % workload)

    print("Total expected cost: %s msec" % total_msec)

    # Give 0MQ time to deliver
    time.sleep(1)

    # Worker
    # Connects PULL socket to tcp://10.68.71.184:5557
    # Collects workloads from ventilator via that socket
    # Connects PUSH socket to tcp://10.68.71.184:5558
    # Sends results to sink via that socket

    import sys
    import time
    import zmq

    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://10.68.71.184:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://10.68.71.184:5558")

    # Process tasks forever
    while True:
        s = receiver.recv()

        # Simple progress indicator for the viewer
        sys.stdout.write('.')
        sys.stdout.flush()

        # Do the work
        time.sleep(int(s)*0.001)

        # Send results to sink
        sender.send(b'')

    # Sink
    # Binds PULL socket to tcp://10.68.71.184:5558
    # Collects results from worker via that socket

    import sys
    import time
    import zmq

    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://10.68.71.184:5558")

    # Wait for start of batch
    s = receiver.recv()

    # Start our clock now
    tstart = time.time()

    # Process 100 confirmations
    total_msec = 0
    for task_nbr in range(100):
        s = receiver.recv()
        if task_nbr % 10 == 0:
            sys.stdout.write(':')
        else:
            sys.stdout.write('.')
        sys.stdout.flush()

    # Calculate and report duration of batch
    tend = time.time()
    print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
Exclusive Pair
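A minimal sketch of this pattern, assuming it is used the way PAIR sockets usually are: one socket on each side of a single connection, with either side free to send at any time. Here the two ends are threads in one process connected over inproc. The function name run_pair, the endpoint and the message texts ("ready", "collect state") are hypothetical and only illustrate the two-way exchange.

```python
import threading
import zmq


def run_pair(endpoint="inproc://pair-demo"):
    """Exchange messages both ways over one exclusive PAIR connection."""
    context = zmq.Context()
    finished = threading.Event()  # set once the parent has read everything

    parent = context.socket(zmq.PAIR)
    parent.bind(endpoint)

    def child_task():
        child = context.socket(zmq.PAIR)
        child.connect(endpoint)
        # Either side may send first; here the child announces itself
        child.send(b"ready")
        command = child.recv()
        child.send(b"done: " + command)
        finished.wait()
        child.close()

    thread = threading.Thread(target=child_task)
    thread.start()

    log = [parent.recv()]          # the child's announcement
    parent.send(b"collect state")  # a command sent back over the same socket
    log.append(parent.recv())      # the child's result

    finished.set()
    thread.join()
    parent.close()
    context.term()
    return log


if __name__ == "__main__":
    for entry in run_pair():
        print(entry.decode())
```

Unlike the other patterns, a PAIR socket talks to exactly one peer, so this sketch would not scale to several nodes reporting to one listener.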