Difference between revisions of "Cfengine3 Messaging"

From PostgreSQL_wiki
Jump to: navigation, search
(Exclusive Pair)
(Push/Pull)
Line 99: Line 99:
   
 
== Push/Pull ==
 
== Push/Pull ==
  +
# Author: Lev Givon <lev(at)columbia(dot)edu>
  +
  +
  +
<pre>
  +
# The ventilator
  +
# Binds PUSH socket to tcp://localhost:5557
  +
# Sends batch of tasks to workers via that socket
  +
  +
import zmq
  +
import random
  +
import time
  +
  +
context = zmq.Context()
  +
  +
# Socket to send messages on
  +
sender = context.socket(zmq.PUSH)
  +
sender.bind("tcp://10.68.71.184:5557")
  +
  +
# Socket with direct access to the sink: used to syncronize start of batch
  +
sink = context.socket(zmq.PUSH)
  +
sink.connect("tcp://10.68.71.184:5558")
  +
  +
try:
  +
raw_input
  +
except NameError:
  +
# Python 3
  +
raw_input = input
  +
  +
print("Press Enter when the workers are ready: ")
  +
_ = raw_input()
  +
print("Sending tasks to workers...")
  +
  +
# The first message is "0" and signals start of batch
  +
sink.send(b'0')
  +
  +
# Initialize random number generator
  +
random.seed()
  +
  +
# Send 100 tasks
  +
total_msec = 0
  +
for task_nbr in range(100):
  +
  +
# Random workload from 1 to 100 msecs
  +
workload = random.randint(1, 100)
  +
total_msec += workload
  +
  +
sender.send_string(u'%i' % workload)
  +
  +
print("Total expected cost: %s msec" % total_msec)
  +
  +
# Give 0MQ time to deliver
  +
time.sleep(1)
  +
  +
</pre>
  +
<pre>
  +
# Worker
  +
# Connects PULL socket to tcp://10.68.71.184:5557
  +
# Collects workloads from ventilator via that socket
  +
# Connects PUSH socket to tcp://10.68.71.184:5558
  +
# Sends results to sink via that socket
  +
  +
import sys
  +
import time
  +
import zmq
  +
  +
  +
context = zmq.Context()
  +
  +
# Socket to receive messages on
  +
receiver = context.socket(zmq.PULL)
  +
receiver.connect("tcp://10.68.71.184:5557")
  +
  +
# Socket to send messages to
  +
sender = context.socket(zmq.PUSH)
  +
sender.connect("tcp://10.68.71.184:5558")
  +
  +
# Process tasks forever
  +
while True:
  +
s = receiver.recv()
  +
  +
# Simple progress indicator for the viewer
  +
sys.stdout.write('.')
  +
sys.stdout.flush()
  +
  +
# Do the work
  +
time.sleep(int(s)*0.001)
  +
  +
# Send results to sink
  +
sender.send(b'')
  +
  +
</pre>
  +
<pre>
  +
# Sink
  +
# Binds PULL socket to tcp://10.68.71.184:5558
  +
# Collects results from worker via that socket
  +
  +
import sys
  +
import time
  +
import zmq
  +
  +
  +
context = zmq.Context()
  +
  +
# Socket to receive messages on
  +
receiver = context.socket(zmq.PULL)
  +
receiver.bind("tcp://10.68.71.184:5558")
  +
  +
# Wait for start of batch
  +
s = receiver.recv()
  +
  +
# Start our clock now
  +
tstart = time.time()
  +
  +
# Process 100 confirmations
  +
total_msec = 0
  +
for task_nbr in range(100):
  +
s = receiver.recv()
  +
if task_nbr % 10 == 0:
  +
sys.stdout.write(':')
  +
else:
  +
sys.stdout.write('.')
  +
sys.stdout.flush()
  +
  +
# Calculate and report duration of batch
  +
tend = time.time()
  +
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
  +
</pre>
  +
 
== Exclusive Pair ==
 
== Exclusive Pair ==
 
<hr>
 
<hr>

Revision as of 13:21, 8 December 2015

> what would be killer is a combination of the latest cfengine
> (with promise theory) and 0mq.

Source: http://lists.zeromq.org/pipermail/zeromq-dev/2011-February/009491.html

This part of the Webhuis CFEngine initiative is research and therefore subject to change. :-)
It is the objective to have messaging incorporated in CFEngine and thus enabling nodes, by cf-agent that is, to send out messages to whatever listeners out there. The listeners can be cf-messages daemons or other nodes configured to process messages delivered by CFEngine Nodes.
CFEngine will need to implement a new promise type: messages:.
Parts of the works is to be transferred to and discussions will take place on:

https://github.com/Webhuis/Data

So let's go and deliver that kill!

0mq

The search for a suitable messaging mechanism eventually led to 0mq, just a library not an application. Could it be less in weight?
For reasons of symplicity the code examples, so far are in Python. CFEngine is independent of anything but libc and thus the end results of CFEngine messaging have to be developed in C.

Basic patterns

ZeroMQ comes with five basic patterns

  • Synchronous Request/Response
  • Asynchronous Request/Response
  • Publish/Subscribe
  • Push/Pull
  • Exclusive Pair

Nodes carry different kinds of information

  • Common information
  • Static information
  • Dynamic Information
  • State Information

In the analysis use cases of types of messages has to be mapped on the basic patterns. The listeners mirror the nodes mappings.

Synchronous Request/Response

Client Requests

socket = context.socket(zmq.REQ)
socket.connect("tcp://wbhs-pkg.webhuis.nl:8080")

#  Do 10 requests, waiting each time for a response
print("Sending request %s ..." % request)
socket.send(b"Hello")

#  Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))

Server Replies

socket = context.socket(zmq.REP)
socket.bind("tcp://10.68.71.184:8080")

#  Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)

#  Send reply back to client
socket.send(b"World")

Asynchronous Request/Response

Publish/Subscribe

Server publishes

ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)

publisher.bind("tcp://10.68.71.184:5556")

time.sleep(5)

sequence = 0
id = 0
data =1000

while sequence < 20:
  id += 1;
  data += 1000;
  publisher.send("%i %i %i" % (sequence, id, data));
  sequence += 1

Client subscriber receives

context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from server...")
socket.connect("tcp://10.68.71.184:5556")

socket.setsockopt(zmq.SUBSCRIBE, b'')

for update_nbr in range(5):
    string = socket.recv_string()
    print string

Output

0 1 2000
1 2 3000
2 3 4000
3 4 5000
4 5 6000

Push/Pull

  1. Author: Lev Givon <lev(at)columbia(dot)edu>


# The ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket

import zmq
import random
import time

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://10.68.71.184:5557")

# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://10.68.71.184:5558")

try:
    raw_input
except NameError:
    # Python 3
    raw_input = input

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")

# The first message is "0" and signals start of batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):

    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload

    sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)

# Worker
# Connects PULL socket to tcp://10.68.71.184:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://10.68.71.184:5558
# Sends results to sink via that socket

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://10.68.71.184:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://10.68.71.184:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send(b'')

# Sink
# Binds PULL socket to tcp://10.68.71.184:5558
# Collects results from worker via that socket

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://10.68.71.184:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))

Exclusive Pair


Return to: Cfengine