#!/usr/bin/python3
#
# submqttm.py, SUBscribe_MQTT_Measurements:
# This script implements a Xymon client, which receives measurements through an
# MQTT broker. Abstract class XymonTest contains all methods to receive the
# measurements and send the result to the Xymon server, except for the method to
# build the actual message from the received measurements. This script needs to
# be extended with a derived class of XymonTest for each new measurement. The
# derived class must implement at least a (tailored) version of method
# BuildMessage.
#
# This script already contains one Xymon client, which takes some statistics
# gathered by the MQTT broker and reports them to the Xymon server.
#
# To do:
# - Thread Receiver: make list of topics per broker, open a session for each
# broker.
#
import datetime
import paho.mqtt.client as mqtt
import queue
import re
import signal
import socket
import syslog
import threading
import time
import watchdog
#
# Installation constants.
# =======================
#
# MQTT.
#
# Maximum time span [s] allowed for collecting all topics of one test. It is
# passed to the watchdog timer of each XymonTest instance and it is used as the
# time-out while thread Periodic waits for the messages it subscribed to.
mqtt_collect_timer = 0.5 # Max time span to receive all topics [s]
#
# Define the mapping between the received messages from the MQTT broker on one
# hand and tests within Xymon on the other hand.
#
# If the mode is 'continuous', it is assumed that for each of the topics one,
# and only one, message is sent to the broker within a short time frame,
# typically within a few milliseconds. They do not have attribute 'retain' set.
# Some time later, this process repeats, that is another full update is sent.
# This mode of operation is context free, as there is no need to remember any
# previous result.
#
# If the mode is 'periodic', it is assumed that the messages all have attribute
# 'retain' set, and that all of them are sent by the broker upon subscribing to
# those topics. Thus by subscribing for a short time when a Xymon status message
# is to be sent, and terminating the subscription once the required information
# is retrieved, there is no need to accumulate and remember the latest value for
# each topic within instances of this class.
# In case of topic tree $SYS, a periodic new subscription is used for an
# additional reason: this will minimize the number of messages sent, and thus
# the 'pollution' of the statistics caused by retrieving the statistics.
#
# Note: In the topics, wildcards '+' and '#' should not be used.
#
# Table MqttXymon holds one dictionary per Xymon test. Each entry names the
# MQTT broker and topics to read, the Xymon server and test to report to, and
# the name of the XymonTest (sub)class which builds the status message. Item
# 'Handler' is a placeholder; the main program stores the handler instance in
# it.
MqttXymon = [
    # Throughput statistics of the MQTT broker
    {
        'Mode': 'periodic',
        'MqttHost': '<your.mqtt.broker.address>',
        'Topic': (
            '$SYS/broker/bytes/received',
            '$SYS/broker/bytes/sent',
            '$SYS/broker/messages/received',
            '$SYS/broker/messages/sent',
            '$SYS/broker/messages/stored',
        ),
        'XymonClss': 'XymonTestSys',
        'XymonHost': '<your.xymon.server.address>',
        'XymonPort': 1984,
        'XymonSrc': '<mqtt.server.name>',
        'XymonTest': 'mqtt',
        'Handler': None,
    },
]
#
# Global variables.
# =================
#
# Host name of the MQTT broker in use. It stays None until thread Receiver
# adopts the broker of the first 'continuous' test it finds in table MqttXymon.
mqtt_broker= None # Source of MQTT messages
# Routing table, built by the main program: it maps a topic onto the list of
# XymonTest instances which need the messages of that topic.
dispatch= {} # Destinations of MQTT messages
# Queue of tuples (client,message,time of receipt). It is filled by the
# on_message call-backs and emptied by thread Receiver.
mqttq = queue.Queue() # First buffer of MQTT messages
#
# Call-back function definitions.
# ===============================
#
# Call-back function on_connect is invoked when the broker responds to the
# connection request. If the connection succeeded, the value of return code rc
# will be zero. If the connection failed, the program will be stopped.
#
def on_connect( client, userdata, flags, rc ):
    """Handle the broker's response to a connection request.

    A return code rc of zero means the connection is established and nothing
    needs to be done. Any other value requests termination of this program
    and closes the session of the supplied client.
    """
    if rc == 0:
        return                            # Connected, nothing to do
    MainThread.set()                      # Stop this program
    client.disconnect()                   # Clean up
#
# Call-back function on_disconnect is invoked if the session with the broker is
# disconnected. If this event is unexpected, a request to terminate this program
# is posted.
#
def on_disconnect( client, userdata, rc ):
    """Handle the end of the session with the broker.

    A return code rc of zero is the response to a call of disconnect() and is
    ignored. Any other value is an unexpected disconnect, in which case
    termination of this program is requested.
    """
    unexpected = rc != 0
    if unexpected:
        MainThread.set()                  # Stop this program
#
# Call-back function on_message is invoked if a message from a subscription
# session with the broker is received. The message is moved to the queue of
# thread Receiver.
#
def on_message( client, userdata, message ):
    """Move a message received from the broker to the queue of thread Receiver.

    The queued item is the tuple (client, message, time of receipt), with the
    time of receipt being the Unix time stamp taken in this call-back.
    """
    mqttq.put( (client, message, time.time()) )
#
# Generic class definitions.
# ==========================
#
# Abstract class StoppableThread adds Event _stop_event to the thread and
# methods to set and check this event. Moreover, methods LogMessage and Wait
# (the latter is an extended version of time.sleep) are added, as they are
# needed by almost all subclasses.
#
class StoppableThread( threading.Thread ):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition.

    Besides the stop event, this class offers methods LogMessage and Wait
    (an extended version of time.sleep), as nearly all subclasses need them.
    """

    def __init__( self ):
        super().__init__()
        self._stop_event = threading.Event()  # Set to request the thread to stop

    def LogMessage( self, Msg ):
        """Send message Msg, prefixed by the thread name, to the local syslog
        server."""
        syslog.openlog( 'XyMqtt', 0, syslog.LOG_LOCAL6 )
        syslog.syslog ( ' '.join( (self.name,Msg) ) )
        syslog.closelog()

    def join( self, TimeOut=None ):
        """Request the thread to stop and wait at most TimeOut seconds (None
        means no limit) for it to finish."""
        self.stop()
        super().join( TimeOut )

    def stop( self ):
        """Set the stop event; the thread is expected to notice and to exit."""
        self._stop_event.set()

    def stopped( self ):
        """Return True if the thread has been requested to stop."""
        return self._stop_event.is_set()

    def wait( self, timeout ):
        """Sleep at most timeout seconds. Return True as soon as the stop event
        is set, False if the time expired first."""
        return self._stop_event.wait( timeout )

    @staticmethod
    def _SleepTime( Now, Period, Delay ):
        """Return the number of seconds between Unix time stamp Now and the
        next integer multiple of Period seconds plus Delay seconds.

        The fraction of a second in Now is taken into account, so the moment
        of waking up coincides with the boundary rather than preceding it by
        up to one second. If less than 1.5 s remain, the boundary is
        considered to be reached and one more Period is added.
        """
        ActTim= ( (int(Now)+Period-1) // Period ) * Period
        SlpTim= ActTim - Now + Delay
        if SlpTim < 1.5: SlpTim+= Period
        return SlpTim

    def Wait( self, Period, Delay ):
        """Wait until the (Unix) time stamp reaches the next integer multiple
        of Period seconds and then another Delay seconds. However, if method
        stop is invoked before the time has expired, this method returns at
        that time.

        Parameters Period and Delay are integer numbers, with Period >= 2 and
        0 <= Delay < Period.
        """
        self.wait( self._SleepTime(time.time(),Period,Delay) )
#
# Abstract class XymonTest defines the collector for a specific set of MQTT
# messages which are needed for one Xymon test. Once all messages are received,
# the status message is generated and forwarded to the Xymon server.
# A child class should (at least) override method BuildMessage with a version
# specific for the Xymon test. Thus at least one child class is needed for each
# test type mentioned in list MqttXymon.
#
class XymonTest:
    """Collector for the set of MQTT messages needed for one Xymon test.

    Once a message is received for each topic, or once the watchdog timer
    calls back, the status message is built and sent to the Xymon server. A
    child class should (at least) override method BuildMessage with a version
    specific for the Xymon test.
    """
    XyColour= { 'green': 0, 'yellow': 1, 'red': 2 }   # Class read-only variable

    def __init__( self, mqxy ):
        """Preset the instance variables from mqxy, an entry of table
        MqttXymon."""
        # Define the parameters related to Xymon.
        self.Host  = mqxy['XymonHost']
        self.Port  = mqxy['XymonPort']
        self.Source= mqxy['XymonSrc' ]
        self.Test  = mqxy['XymonTest']
        self.Time  = None                   # Time stamp of oldest message in use
        self.Colour= 'green'
        self.Error = { 'red': [], 'yellow': [] }  # Messages per severity
        # Define the parameters related to the receipt of measurements via MQTT.
        self.Topic = { t: None for t in mqxy['Topic'] }  # Latest value per topic
        self.Topics= len( self.Topic )      # Number of messages expected
        self.Msgs  = 0                      # Number of messages received
        self.Timer = watchdog.WatchdogTimer( mqtt_collect_timer, self._timeout )

    def _timeout( self ):
        """Call-back routine of the watchdog timer.

        The collected information is sent to Xymon although not all expected
        information is received. Each missing topic is reported as a warning.
        """
        for key,value in self.Topic.items():
            if value is None:
                self.Error['yellow'].append( 'Missing topic: {}'.format(key) )
        self.InformXymon( self.BuildMessage() )
        self.ClearTest()

    def _update_colour( self, clr ):
        """Set the status (colour) of the test to the worst of the current
        status and status clr."""
        assert clr in self.XyColour, 'Unsupported colour: {}'.format(clr)
        if self.XyColour[clr] > self.XyColour[self.Colour]:
            self.Colour= clr

    def BuildMessage( self ):
        """Build the message to be sent to the Xymon server.

        This version converts the lists of error messages and warnings into a
        string, one line per message, and sets the test status accordingly.
        Returns the resulting string, which is empty if there are no messages.
        """
        Lines= []
        self.Colour= 'green'
        for colour in ('red','yellow'):
            if self.Error[colour]:
                self._update_colour( colour )
                for m in self.Error[colour]:
                    Lines.append( "&{} : {}\n".format(colour,m) )
        return ''.join( Lines )

    def ClearTest( self ):
        """Preset the instance variables for the next measurement."""
        if self.Timer.is_alive():
            self.Timer.stop()               # Inhibit timeout
        for key in self.Topic:              # Clear the measurements
            self.Topic[key]= None
        self.Msgs= 0
        for key in self.Error:              # Remove any error messages
            self.Error[key]= []

    def InformXymon( self, msg ):
        """Prepend the status message header to msg and send the result to the
        Xymon server.

        Returns True if sending the message succeeded. Returns False if the
        name of the server cannot be resolved or if the connection fails.
        """
        XyTime= int( time.time() ) if self.Time is None else int( self.Time )
        XyTime= datetime.datetime.fromtimestamp(XyTime).isoformat( sep=' ' )
        XyMsg = 'status {host}.{test} {colour} {time}\n{message}'.format(
                  host=self.Source, test=self.Test, colour=self.Colour,
                  time=XyTime, message=msg )
        #
        # Resolve a host name once; the resulting address is kept for later
        # calls. This is done before a socket is created, so a failure cannot
        # leave an open socket behind.
        #
        if not re.search( r'^[\d\.]+$', self.Host ):
            try:
                self.Host= socket.gethostbyname( self.Host )
            except socket.gaierror:
                return False
        #
        try:
            with socket.socket( socket.AF_INET, socket.SOCK_STREAM ) as s:
                s.connect( (self.Host,self.Port) )
                s.sendall( XyMsg.encode() )
        except socket.error:
            return False
        return True

    def Update( self, uts, tpc, msg ):
        """Handle the next message received from the MQTT broker.

        Parameter uts is the time of receipt, tpc the topic and msg the
        payload. The watchdog timer is started upon receipt of the first
        message. If all the expected messages are received, a Xymon message is
        generated and sent to the Xymon server. A topic which is received
        again with another value before the set is complete, causes a warning.
        """
        assert tpc in self.Topic, 'Unknown topic "{}"'.format(tpc)
        #
        if self.Topic[tpc] is None:
            if self.Msgs == 0:
                self.Timer.reset()
                self.Time= uts
            else:
                self.Time= min( self.Time, uts )
            self.Topic[tpc]= msg
            self.Msgs+= 1
            if self.Msgs == self.Topics:
                self.InformXymon( self.BuildMessage() )
                self.ClearTest()
        elif self.Topic[tpc] != msg:
            # One warning per event. BuildMessage terminates each line, thus the
            # text itself carries no newline.
            self.Error['yellow'].append( 'Topic received multiple times: topic={},'
              ' old value={}, new value={}'.format(tpc,self.Topic[tpc],msg) )
            self.Topic[tpc]= msg
#
# Xymon test class definitions.
# =============================
#
# Class XymonTestSys reports some statistics of the MQTT broker to the Xymon
# server.
#
class XymonTestSys( XymonTest ):
    """Report some statistics of the MQTT broker to the Xymon server."""

    def BuildMessage( self ):
        """Build the status message from the totals received from the broker.

        The message contains the errors and warnings reported by the parent
        class, a table with the totals and two blocks of data in DEVMON RRD
        format. A total which is not received is passed as 'U' in the RRD
        blocks. Returns the message as a string.
        """
        TblHdr= '<table border=1 cellpadding=5>'
        TblRow= ' <tr> <td>{}, {}</td> <td align="right">{}</td> </tr>\n'
        OctRcv= self.Topic['$SYS/broker/bytes/received']
        OctXmt= self.Topic['$SYS/broker/bytes/sent']
        MsgRcv= self.Topic['$SYS/broker/messages/received']
        MsgXmt= self.Topic['$SYS/broker/messages/sent']
        MsgSav= self.Topic['$SYS/broker/messages/stored']
        XyMsg = '<b>MQTT broker statistics</b>\n'
        XyMsg+= super().BuildMessage()      # Include error messages
        XyMsg+= '\n'
        XyMsg+= '<b>Total counts</b>\n'
        XyMsg+= TblHdr
        XyMsg+= ' <tr> <th>Counter</th> <th>Total</th> </tr>\n'
        XyMsg+= TblRow.format( 'Octets'  , 'received'   , OctRcv )
        XyMsg+= TblRow.format( 'Octets'  , 'transmitted', OctXmt )
        XyMsg+= TblRow.format( 'Messages', 'received'   , MsgRcv )
        XyMsg+= TblRow.format( 'Messages', 'transmitted', MsgXmt )
        XyMsg+= TblRow.format( 'Messages', 'stored'     , MsgSav )
        XyMsg+= '</table>\n'
        #
        # In the RRD blocks, a missing value has to be passed as 'U'. The
        # substitution is applied to all five totals after the table is built.
        #
        OctRcv, OctXmt, MsgRcv, MsgXmt, MsgSav= (
          'U' if v is None else v
          for v in (OctRcv,OctXmt,MsgRcv,MsgXmt,MsgSav) )
        XyMsg+= '<!-- linecount=1 -->\n'
        XyMsg+= '<!--DEVMON RRD: mqtt.broker 0 0\n'
        XyMsg+= 'DS:Rcv:DERIVE:600:0:U DS:Xmt:DERIVE:600:0:U\n'
        XyMsg+= 'octet {}:{}\n'.format( OctRcv, OctXmt )
        XyMsg+= '-->\n'
        XyMsg+= '<!--DEVMON RRD: mqtt.broker 0 0\n'
        XyMsg+= 'DS:Rcv:DERIVE:600:0:U DS:Xmt:DERIVE:600:0:U DS:Sav:GAUGE:600:0:U\n'
        XyMsg+= 'message {}:{}:{}\n'.format( MsgRcv, MsgXmt, MsgSav )
        XyMsg+= '-->'
        return XyMsg
#
# Thread definitions.
# ===================
#
# Of each of the following classes at most one instance may be created.
#
# Class Receiver receives the MQTT messages. The primary function of this class
# is to provide a queue to buffer the incoming messages. The secondary function
# is to move the messages to the XymonTest instances which will actually do
# something with the message.
#
class Receiver( StoppableThread ):
    """Thread which buffers the incoming MQTT messages and hands them over to
    the XymonTest instances which will actually do something with them."""

    def __init__( self ):
        super().__init__()
        self.name = 'Receiver'

    def run( self ):
        """Subscribe to all topics of the 'continuous' mode tests and forward
        each message taken from queue mqttq to its destinations, which are
        found in table dispatch. The loop ends on a stop request."""
        global mqtt_broker
        self.LogMessage( 'Starting thread' )
        #
        # Collect the topics of interest from table MqttXymon, each topic once.
        # The broker of the first test found becomes the broker in use; tests
        # on another broker are skipped.
        #
        topics = []
        for entry in MqttXymon:
            if entry['Mode'] != 'continuous':
                continue
            host = entry['MqttHost']
            if mqtt_broker is not None and mqtt_broker != host:
                self.LogMessage(
                    'Ignore test {}/{}'.format(entry['XymonSrc'], entry['XymonTest']))
                self.LogMessage(' No support yet for multiple MQTT brokers')
                continue
            mqtt_broker = host
            for name in entry['Topic']:
                if name not in topics:
                    topics.append(name)
        #
        # Create a connection to the MQTT broker, but only if there is
        # something to subscribe to. All topics are subscribed to in one call,
        # using a list of tuples (topic,qos).
        #
        if not topics:
            self.client = None
        else:
            self.client = mqtt.Client('xymon.client.a.' + socket.gethostname())
            self.client.on_connect = on_connect
            self.client.on_disconnect = on_disconnect
            self.client.on_message = on_message
            self.client.connect(mqtt_broker)
            self.client.subscribe([(name, 0) for name in topics])
            self.client.loop_start()
        #
        # Wait for messages to arrive. An item without a message is the dummy
        # which method stop uses to unblock this loop.
        #
        while not self.stopped():
            _, msg, uts = mqttq.get()
            if msg is None:
                break
            topic = msg.topic
            payld = str(msg.payload.decode("utf-8"))
            if topic not in dispatch:
                self.LogMessage('Unexpected message, topic {}'.format(topic))
                continue
            for handler in dispatch[topic]:
                handler.Update(uts, topic, payld)
        #
        # Clean up and exit.
        #
        if self.client is not None:
            self.client.loop_stop()
            self.client.disconnect()
        self.LogMessage( 'Stopping thread' )

    def stop( self ):
        """Stop this thread. A dummy message is pushed onto the input queue,
        to unblock the blocking call to queue.get in method run."""
        super().stop()
        mqttq.put( (None, None, None) )
#
# Class Periodic periodically schedules a subscription to the topics defined in
# entries in table MqttXymon with mode being 'periodic'. The messages in these
# topics should all have attribute 'retain' set, thus upon subscription the
# latest status (value) of each topic is sent. Those messages will initially be
# handled by thread Receiver, which forwards them to the appropriate instance of
# XymonTest.
#
class Periodic( StoppableThread ):
    """Thread which periodically subscribes for a short time to the topics of
    the 'periodic' mode tests in table MqttXymon.

    The messages received are put in queue mqttq, from which thread Receiver
    forwards them to the appropriate instance of XymonTest.
    """

    def __init__( self ):
        super().__init__()
        self.name= 'Periodic'
        self.done= threading.Event()        # Set if all messages are received
        self.msgs= 0                        # Number of messages still expected
        self.lot = []                       # List of tuples (topic,qos)

    def _on_connect( self, client, userdata, flags, rc ):
        """Call-back: report a failed connection and close the session."""
        if rc != 0:
            self.LogMessage( 'Connection to broker failed' )
            client.disconnect()             # Clean up

    def _on_disconnect( self, client, userdata, rc ):
        """Call-back: report an unexpected end of the session."""
        if rc != 0:                         # Exclude response on disconnect()
            self.LogMessage( 'Unexpected disconnect from broker' )

    def _on_message( self, client, userdata, message ):
        """Call-back: queue a received message for thread Receiver and flag
        the receipt of the last expected message."""
        if self.done.is_set(): return       # Ignore additional messages
        #
        now= time.time()                    # Time of receipt of message
        self.msgs-= 1                       # Decrement expected count of messages
        if self.msgs == 0 : self.done.set()
        mqttq.put( (client,message,now) )

    def run( self ):
        """Subscribe for each 'periodic' test in turn, then wait for the next
        multiple of 300 s, until this thread is requested to stop."""
        self.LogMessage( 'Starting thread' )
        #
        while not self.stopped():
            for mqxy in MqttXymon:
                if mqxy['Mode'] != 'periodic': continue
                self.subscribe( mqxy )
            self.Wait( 300, 0 )
        #
        self.LogMessage( 'Stopping thread' )

    def subscribe( self, mqxy ):
        """Open a session with the broker of table entry mqxy, subscribe to
        its topics and wait at most mqtt_collect_timer seconds for one message
        per topic. The session is always closed afterwards.

        An exception raised while connecting is not handled here.
        """
        self.lot = [ (t,0) for t in mqxy['Topic'] ]
        self.msgs= len( self.lot )
        self.done.clear()
        #
        client= mqtt.Client( 'xymon.client.b.'+socket.gethostname() )
        client.on_connect   = self._on_connect
        client.on_disconnect= self._on_disconnect
        client.on_message   = self._on_message
        #
        # Use the broker of this table entry. Global mqtt_broker is set by
        # thread Receiver for 'continuous' tests only, so it may still be None.
        #
        client.connect( mqxy['MqttHost'] )
        client.subscribe( self.lot )
        #
        client.loop_start()                 # Start thread to read messages
        try:
            self.done.wait( mqtt_collect_timer )  # Wait till all messages are received
        finally:
            client.loop_stop()              # Stop thread
            client.disconnect()             # Stop subscriber session
# Event MainThread is set by the MQTT call-back functions and by the signal
# handler; the main program checks it and, once it is set, stops all threads.
MainThread= threading.Event() # Set to stop this script
#
# Call-back HandleSignal is invoked whenever a SIGINT or SIGTERM signal arrives.
# It reports the receipt of the signal and sets the flag to stop this script.
#
def HandleSignal( signum, frame ):
    """Signal handler: request termination of this script and report the
    number of the received signal to the local syslog server."""
    MainThread.set()                      # Set flag to stop script
    report = 'Termination signal {} received'.format(signum)
    syslog.openlog('XyMqtt', 0, syslog.LOG_LOCAL6)
    syslog.syslog(report)
    syslog.closelog()
#
# Main program.
# =============
#
# Set up handling of termination signals.
#
signal.signal(signal.SIGINT, HandleSignal)
signal.signal(signal.SIGTERM, HandleSignal)
#
# Create, for each entry in table MqttXymon, the instance of the class which is
# to handle the messages received from MQTT. The class is looked up by name.
#
for mqxy in MqttXymon:
    aclass = mqxy['XymonClss']
    assert aclass in globals(), 'Unknown XymonTest class name: {}'.format(aclass)
    mqxy['Handler'] = globals()[aclass](mqxy)
#
# Build the routing table: each topic maps onto the list of handlers which
# need the messages of that topic.
#
for mqxy in MqttXymon:
    for topic in mqxy['Topic']:
        dispatch.setdefault(topic, []).append(mqxy['Handler'])
#
# Start the threads.
#
threads = []                            # List of long lived threads
th0 = Receiver()
threads.append(th0)
th0.start()
th1 = Periodic()
threads.append(th1)
th1.start()
#
# Monitor the state of the threads of this script. If one thread dies or if
# event MainThread is set, all (other) threads, including this main thread,
# should stop (too).
#
while threads:
    try:
        all_alive = all(t.is_alive() for t in threads)
        if all_alive:
            MainThread.wait(10)         # All is well, wait some time
        if MainThread.is_set() or not all_alive:
            for t in reversed(threads): # Note the order of this loop
                if t.is_alive():
                    t.stop()
                    t.join()
                threads.remove(t)       # Thread has stopped
    except KeyboardInterrupt:
        MainThread.set()                # Set flag to stop this script