monitors:submqttm

submqttm

Author Wim Nelis
Compatibility Xymon 4.2
Requirements Python3
Download None
Last Update 2020-11-07

Script submqttm.py (short for SUBscribe_to_MQTT_Measurements) is a skeleton for one or more Xymon tests, which receive their measurements via MQTT. An abstract class XymonTest is defined, which handles the receipt of the topics, the detection that all required information is received, and the transmission of the Xymon message to the Xymon server. Per Xymon test a derived class needs to be defined, containing (at least) method BuildMessage, which transforms the measurements received from MQTT into a message to be sent to Xymon. The script contains a receiver thread which subscribes to all required topics, and which calls method XymonTest.Update to handle them. List MqttXymon near the start of the script contains the mapping between MQTT topics and Xymon tests.

As distributed, the script contains a single Xymon test, which shows the incoming (publish) and outgoing (subscribe) message rate as well as the incoming and outgoing octet rate of the MQTT broker.

This script was developed to handle multiple small, low power sensors based on an ESP8266 micro-controller. As multiple destinations for the collected data are expected, and as MQTT provides for a simple transfer and distribution mechanism, these micro-controllers publish their measurements using MQTT.

Client side

The source modules submqttm.py and watchdog.py shown in the next chapter are to be installed in directory /usr/lib/xymon/client/ext.

Script submqttm.py needs to be running continuously, as the topics can arrive at any time. Thus xymonlaunch is not used. In stead, a service is defined which will start script submqttm.py at system start.

Service submqttm is defined by creating a file named /etc/systemd/system as super user. An example of the content of this file is shown below.

[Unit]
Description=Xymon client with MQTT input
 
[Service]
Type=simple
User=xymon
Group=xymon
ExecStart=/usr/lib/xymon/client/ext/submqttm.py
WorkingDirectory=/usr/lib/xymon/client/ext
StandardOutput=null
StandardError=null
Restart=on-abort

[Install]
WantedBy=multi-user.target

Once file submqttm.service is created, the following two commands are needed to activate the script:

sudo systemctl start submqttm
sudo systemctl enable submqttm

Note that though the script is running continuously, the tests with mode 'periodic' are run once every 5 minutes.

Server side

For the initial client, which monitors the throughput of one MQTT broker, two extensions of the xymon server configuration are needed. First, xymonserver.cfg should be extended. This is achieved by adding file mqtt-thr.cfg in directory /usr/lib/xymon/server/etc/xymonserver.d. It' contents is:

TEST2RRD+=",mqtt=devmon"
GRAPHS_mqtt="mqtttm,mqttto"

Secondly, graphs.cfg should be extended with the definitions of the graphs. In directory /usr/lib/xymon/server/etc/graphs.d (also) a file named mqtt-thr.cfg is created containing the following definitions:

[mqtttm]
        TITLE , MQTT message rate
        YAXIS Throughput [p/s]
        -l 0
        DEF:mr=mqtt.broker.message.rrd:Rcv:AVERAGE
        DEF:mx=mqtt.broker.message.rrd:Xmt:AVERAGE
        LINE1:mr#0000FF:ingress
        GPRINT:mr:MIN:Min \: %5.1lf %sp/s
        GPRINT:mr:MAX:Max \: %5.1lf %sp/s
        GPRINT:mr:AVERAGE:Avg \: %5.1lf %sp/s
        GPRINT:mr:LAST:Cur \: %5.1lf %sp/s\n
        LINE1:mx#00FF00:egress
        GPRINT:mx:MIN: Min \: %5.1lf %sp/s
        GPRINT:mx:MAX:Max \: %5.1lf %sp/s
        GPRINT:mx:AVERAGE:Avg \: %5.1lf %sp/s
        GPRINT:mx:LAST:Cur \: %5.1lf %sp/s\n

[mqttto]
        TITLE , MQTT bit rate
        YAXIS Throughput [b/s]
        -l 0
        DEF:or=mqtt.broker.octet.rrd:Rcv:AVERAGE
        DEF:ox=mqtt.broker.octet.rrd:Xmt:AVERAGE
        CDEF:br=or,8,*
        CDEF:bx=ox,8,*
        LINE1:br#0000FF:ingress
        GPRINT:br:MIN:Min \: %5.1lf %sb/s
        GPRINT:br:MAX:Max \: %5.1lf %sb/s
        GPRINT:br:AVERAGE:Avg \: %5.1lf %sb/s
        GPRINT:br:LAST:Cur \: %5.1lf %sb/s\n
        LINE1:bx#00FF00:egress
        GPRINT:bx:MIN: Min \: %5.1lf %sb/s
        GPRINT:bx:MAX:Max \: %5.1lf %sb/s
        GPRINT:bx:AVERAGE:Avg \: %5.1lf %sb/s
        GPRINT:bx:LAST:Cur \: %5.1lf %sb/s\n

A small note: typically I'll end each TITLE-line with two spaces. This results in a bigger separation between the title and the date-range, which improves the readability of the top line in the graph. Those two trailing spaces are missing in the above definition of the graphs.

Show Code ⇲

Hide Code ⇱

#!/usr/bin/python3
#
# submqttm.py, SUBscribe_MQTT_Measurements:
# This script implements a Xymon client, which receives measurements through an
# MQTT broker. Abstract class XymonTest contains all methods to receive the
# measurements and send the result to the Xymon server, except for the method to
# build the actual message from the received measurements. This script needs to
# be extended with a derived class of XymonTest for each new measurement. The
# derived class must implement at least a (tailored) version of method
# BuildMessage.
#
# This script already contains one Xymon client, which takes some statistics
# gathered by the MQTT broker and reports them to the Xymon server.
#
# To do:
# - Thread Receiver: make list of topics per broker, open a session for each
#   broker.
#
import datetime
import paho.mqtt.client as mqtt
import queue
import re
import signal
import socket
import syslog
import threading
import time
import watchdog

#
# Installation constants.
# =======================
#
# MQTT.
#
mqtt_collect_timer = 0.5                # Max time span to receive all topics [s]

#
# Define the mapping between the received messages from the MQTT broker on one
# hand and tests within Xymon on the other hand.
#
# If the mode is 'continuous', it is assumed that for each of the topics one,
# and only one, message is sent to the broker within a short time frame,
# typically within a few milliseconds. They do not have attribute 'retain' set.
# Some time later, this process repeats, that is another full update is sent.
# This mode of operation is context free, as there is no need to remember any
# previous result.
#
# If the mode is 'periodic', it is assumed that the messages all have attribute
# 'retain' set, and that all of them are sent by the broker upon subscribing to
# those topics. Thus by subscribing for a short time when a Xymon status message
# is to be sent, and terminating the subscription once the required information
# is retrieved, there is no need to accumulate and remember the latest value for
# each topic within instances of this class.
# In case of topic tree $SYS, a periodic new subscription is used for an
# additional reason: this will minimize the number of messages sent, and thus
# the 'pollution' of the statistics caused by retrieving the statistics.
#
# Note: In the topics, wildcards '+' and '#' should not be used.
#
MqttXymon= [
 # Throughput statistics of the MQTT broker
  { 'Mode'     : 'periodic',
    'MqttHost' : '<your.mqtt.broker.address>',
    'Topic'    : ( '$SYS/broker/bytes/received'   ,
                   '$SYS/broker/bytes/sent'       ,
                   '$SYS/broker/messages/received',
                   '$SYS/broker/messages/sent'    ,
                   '$SYS/broker/messages/stored'
                 ),
    'XymonClss': 'XymonTestSys',
    'XymonHost': '<your.xymon.server.address>',
    'XymonPort': 1984,
    'XymonSrc' : '<mqtt.server.name>',
    'XymonTest': 'mqtt',
    'Handler'  : None
  },
]


#
# Global variables.
# =================
#
mqtt_broker= None                       # Source of MQTT messages
dispatch= {}                            # Destinations of MQTT messages
mqttq   = queue.Queue()                 # First buffer of MQTT messages


#
# Call-back function definitions.
# ===============================
#
# Call-back function on_connect is invoked if the connection status to the
# broker is established. If successful, the value of return code rc will be
# zero. If the connection failed. the program will be stopped.
#
def on_connect( client, userdata, flags, rc ):
  if rc != 0:
    MainThread.set()                    # Stop this program
    client.disconnect()                 # Clean up

#
# Call-back function on_disconnect is invoked if the session with the broker is
# disconnected. If this event is unexpected, a request to terminate this program
# is posted.
#
def on_disconnect( client, userdata, rc ):
  if rc != 0:                           # Exclude response on disconnect()
    MainThread.set()                    # Stop this program

#
# Call-back function on_message is invoked if a message from a subscription
# session with the broker is received. The message is moved to the queue of
# thread Receiver.
#
def on_message( client, userdata, message ):
  now= time.time()                      # Time of receipt of message
  mqttq.put( (client,message,now) )


#
# Generic class definitions.
# ==========================
#
# Abstract class StoppableThread adds Event _stop_event to the thread and
# methods to set and check this event. Moreover, methods LogMessage and Wait
# (the latter is an extended version of time.sleep) are added, as they are
# needed by almost all subclasses.
#
class StoppableThread( threading.Thread ):
  """Thread class with a stop() method. The thread itself has to check regularly
  for the stopped() condition."""

  def __init__( self ):
    super().__init__()
    self._stop_event = threading.Event()

 #
 # Method LogMessage sends a message to the local syslog server.
 #
  def LogMessage( self, Msg ):
    syslog.openlog( 'XyMqtt', 0, syslog.LOG_LOCAL6 )
    syslog.syslog ( ' '.join( (self.name,Msg) ) )
    syslog.closelog()

  def join( self, TimeOut=None ):
    self.stop()
    super().join( TimeOut )

  def stop( self ):
    self._stop_event.set()

  def stopped( self ):
    return self._stop_event.is_set()

  def wait( self, timeout ):
    return self._stop_event.wait( timeout )

 #
 # Method Wait waits until the (Unix) time stamp reaches the next integer
 # multiple of Period seconds and then another Delay seconds. However, if the
 # thread method stop is invoked before the time has expired, this method will
 # return at that time.
 # Parameters Period and Delay are integer numbers, with Period >= 2 and
 # 0 <= Delay < Period.
 #
  def Wait( self, Period, Delay ):
    Now= time.time()
    ActTim= int( Now )
    ActTim= ( (ActTim+Period-1) // Period ) * Period
    SlpTim= int( ActTim - Now ) + Delay
    if SlpTim < 1.5:  SlpTim+= Period
    self.wait( SlpTim )


#
# Abstract class XymonTest defines the collector for a specific set of MQTT
# messages which are needed for one Xymon test. Once all messages are received,
# the status message is generated and forwarded to the Xymon server.
# A child class should (at least) override method BuildMessage with a version
# specific for the Xymon test. Thus at least one child class is needed for each
# test type mentioned in list MqttXymon.
#
class XymonTest:
  XyColour= { 'green': 0, 'yellow': 1, 'red': 2 }       # Class read-only variable

 #
 # Method __init__ presets the instance variables upon creation of an object.
 # They are partly derived from table MqttXymon.
 #
  def __init__( self, mqxy ):
  # Define the parameters related to Xymon.
    self.Host  = mqxy['XymonHost']
    self.Port  = mqxy['XymonPort']
    self.Source= mqxy['XymonSrc' ]
    self.Test  = mqxy['XymonTest']
    self.Time  = None
    self.Colour= 'green'
    self.Error = {}
    for t in ('red','yellow'):
      self.Error[t]= []
  # Define the parameters related to the receipt of measurements via MQTT.
    self.Topic = {}
    for t in mqxy['Topic']:
      self.Topic[t]= None
    self.Topics= len( self.Topic )
    self.Msgs  = 0
    self.Timer = watchdog.WatchdogTimer( mqtt_collect_timer, self._timeout )

 #
 # Private method _timeout is a call-back routine. It is invoked if the watchdog
 # timer expires. Upon expiration, the collected information is sent to Xymon,
 # although not all expected information is received.
 #
  def _timeout( self ):
    for key in self.Topic:
      if self.Topic[key] is None:
        self.Error['yellow'].append( 'Missing topic: {}'.format(key) )
  #
    self.InformXymon( self.BuildMessage() )
    self.ClearTest()

 #
 # Private method _update_colour updates the status (colour) of the xymon test.
 # The status is set to the worst of the current status and the supplied status.
 #
  def _update_colour( self, clr ):
    assert clr in self.XyColour, 'Unsupported colour: {}'.format(clr)
    if self.XyColour[clr] > self.XyColour[self.Colour]:
      self.Colour= clr

 #
 # Method BuildMessage builds the message to be sent to the Xymon server. This
 # version converts the list of error messages and warnings into a string. At
 # the same time, the test status is updated.
 #
  def BuildMessage( self ):
    XyMsg= ''
    self.Colour= 'green'
    for colour in ('red','yellow'):
      if self.Error[colour]:
        self._update_colour( colour )
        for m in self.Error[colour]:
          XyMsg+= "&{} : {}\n".format( colour, m )
    return XyMsg

 #
 # Method ClearTest presets the instance variables for the next measurement.
 #
  def ClearTest( self ):
    if self.Timer.is_alive():
      self.Timer.stop()                 # Inhibit timeout
    for key in self.Topic:              # Clear the measurements
      self.Topic[key]= None
    self.Msgs= 0
 #
    for key in self.Error:              # Remove any error messages
      self.Error[key]= []

 #
 # Method InformXymon takes a formatted message for the Xymon server, prepends
 # the status message header and sends the resulting message to the Xymon
 # server. The method returns True if sending the message succeeded, otherwise
 # it will return a False value.
 #
  def InformXymon( self, msg ):
    XyPars= { 'host': self.Source, 'test': self.Test, 'colour': self.Colour,
              'message': msg }
    XyTime= int( time.time() )  if self.Time is None else int( self.Time )
    XyTime= datetime.datetime.fromtimestamp(XyTime).isoformat( sep=' ' )
    XyPars['time']= XyTime
    XyMsg = '''status {host}.{test} {colour} {time}
{message}'''.format( **XyPars )
  #
    try:
      s= socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    except socket.error as msg:
      return False
  #
    if not re.search( '^[\d\.]+$', self.Host ):
      try:
        self.Host= socket.gethostbyname( XyServ )
      except socket.gaierror:
        return False
  #
    try:
      s.connect( (self.Host,self.Port) )
      s.sendall( XyMsg.encode() )
      s.close()
      return True
    except socket.error as msg:
      s.close()
      return False

 #
 # Method Update handles the next message received from the MQTT broker. The
 # watchdog timer is started upon receipt of the first topic message. If all the
 # expected topic messages are received, a Xymon message is generated and send
 # to the Xymon server.
 #
  def Update( self, uts, tpc, msg ):
    assert tpc in self.Topic, 'Unknown topic "{}"'.format(tpc)
  #
    if self.Topic[tpc] is None:
      if self.Msgs == 0:
        self.Timer.reset()
        self.Time= uts
      else:
        self.Time= min( self.Time, uts )
      self.Topic[tpc]= msg
      self.Msgs+= 1
      if self.Msgs == self.Topics:
        XymMsg= self.BuildMessage()
        self.InformXymon( XymMsg )
        self.ClearTest()
  #
    else:
      if self.Topic[tpc] != msg:
        self.Error['yellow'].extend( 'Topic received multiple times: topic={},'
          ' old value={}, new value={}\n'.format(tpc,self.Topic[tpc],msg) )
        self.Topic[tpc]= msg


#
# Xymon test class definitions.
# =============================
#
# Class XymonTestSys reports some statistics of the MQTT broker to the Xymon
# server.
#
class XymonTestSys( XymonTest ):

 #
 # Method BuildMessage shows the totals as received from the MQTT broker. The
 # graphs generated from these totals show both the message rate and the bit
 # rate.
 #
  def BuildMessage( self ):
    TblHdr= '<table border=1 cellpadding=5>'
    TblRow= ' <tr> <td>{}, {}</td> <td align="right">{}</td> </tr>\n'

    OctRcv= self.Topic['$SYS/broker/bytes/received']
    OctXmt= self.Topic['$SYS/broker/bytes/sent']
    MsgRcv= self.Topic['$SYS/broker/messages/received']
    MsgXmt= self.Topic['$SYS/broker/messages/sent']
    MsgSav= self.Topic['$SYS/broker/messages/stored']

    XyMsg = '<b>MQTT broker statistics</b>\n'
    XyMsg+= super().BuildMessage()      # Include error messages
    XyMsg+= '\n'
    XyMsg+= '<b>Total counts</b>\n'
    XyMsg+= TblHdr
    XyMsg+= ' <tr> <th>Counter</th> <th>Total</th> </tr>\n'
    XyMsg+= TblRow.format( 'Octets'  , 'received'   , OctRcv )
    XyMsg+= TblRow.format( 'Octets'  , 'transmitted', OctXmt )
    XyMsg+= TblRow.format( 'Messages', 'received'   , MsgRcv )
    XyMsg+= TblRow.format( 'Messages', 'transmitted', MsgXmt )
    XyMsg+= TblRow.format( 'Messages', 'stored'     , MsgSav )
    XyMsg+= '</table>\n'

    if OctRcv is None:  Octrcv= 'U'
    if OctXmt is None:  OctXmt= 'U'
    if MsgRcv is None:  Msgrcv= 'U'
    if MsgXmt is None:  MsgXmt= 'U'
    if MsgSav is None:  MsgSav= 'U'

    XyMsg+= '<!-- linecount=1 -->\n'
    XyMsg+= '<!--DEVMON RRD: mqtt.broker 0 0\n'
    XyMsg+= 'DS:Rcv:DERIVE:600:0:U DS:Xmt:DERIVE:600:0:U\n'
    XyMsg+= 'octet {}:{}\n'.format( OctRcv, OctXmt )
    XyMsg+= '-->\n'
    XyMsg+= '<!--DEVMON RRD: mqtt.broker 0 0\n'
    XyMsg+= 'DS:Rcv:DERIVE:600:0:U DS:Xmt:DERIVE:600:0:U DS:Sav:GAUGE:600:0:U\n'
    XyMsg+= 'message {}:{}:{}\n'.format( MsgRcv, MsgXmt, MsgSav )
    XyMsg+= '-->'

    return XyMsg


#
# Thread definitions.
# ===================
#
# Of each of the following classes at most one instance may be created.
#
# Class Receiver receives the MQTT messages. The primary function of this class
# is to provide a queue to buffer the incoming messages. The secondary function
# is to move the messages to the XymonTest instances which will actually do
# something with the message.
#
class Receiver( StoppableThread ):

  def __init__( self ):
    super().__init__()
    self.name = 'Receiver'

 #
 # Method run subscribes to all topics mentioned in 'continuous' mode tests. It
 # receives the messages from the broker and uses table dispatch to send them to
 # their destinations.
 #
  def run( self ):
    global mqtt_broker
    self.LogMessage( 'Starting thread' )
  #
  # Subscribe to the topics defined in table MqttXymon. To subscribe, a list of
  # all topics of interest is needed.
  #
    sublot= []                          # To subscribe, list of topics
    for mqxy in MqttXymon:
      if mqxy['Mode'] != 'continuous':  continue
      if mqtt_broker is None  or  mqtt_broker == mqxy['MqttHost']:
        mqtt_broker= mqxy['MqttHost']
        for t in mqxy['Topic']:
          if t not in sublot:
            sublot.append( t )
      else:
        self.LogMessage( 'Ignore test {}/{}'.format(mqxy['XymonSrc'],mqxy['XymonTest']) )
        self.LogMessage( '  No support yet for multiple MQTT brokers' )
  #
  # Create a connection to the MQTT broker.
  #
    if len(sublot) == 0:
      self.client= None
    else:
      self.client= mqtt.Client( 'xymon.client.a.'+socket.gethostname() )
      self.client.on_connect   = on_connect
      self.client.on_disconnect= on_disconnect
      self.client.on_message   = on_message
      self.client.connect( mqtt_broker )
  #
  # Create tuples containing (topic,qos), in order to subscribe to all topics in
  # one call to subscribe.
  #
      for i,t in enumerate(sublot):
        sublot[i]= ( t, 0 )
      self.client.subscribe( sublot )
      self.client.loop_start()
  #
  # Wait for messages to arrive. Each message is forwarded to the instances of
  # (children of) class XymonTest, which will actually do something with the
  # message.
  #
    while not self.stopped():
      (cid,msg,uts)= mqttq.get()
      if msg is None:  break
  #
      topic= msg.topic
      payld= str( msg.payload.decode("utf-8") )
      if topic in dispatch:
        for xt in dispatch[topic]:
          xt.Update( uts, topic, payld )
      else:
        self.LogMessage( 'Unexpected message, topic {}'.format(topic) )
  #
  # Clean up and exit.
  #
    if self.client is not None:
      self.client.loop_stop()
      self.client.disconnect()
    self.LogMessage( 'Stopping thread' )

 #
 # Method stop stops this thread. A dummy message is pushed onto the input
 # queue, to unblock the blocking call to queue.get in method Receiver.run.
 #
  def stop( self ):
    super().stop()
    mqttq.put( (None,None,None) )


#
# Class Periodic periodically schedules a subscription to the topics defined in
# entries in table MqttXymon with mode being 'periodic'. The messages in these
# topics should all have attribute 'retained' set, thus at subscription the
# latest status (value) of each topic is sent. Those messages will initially be
# handled by thread Receiver, which forwards them to the appropriate instance of
# XymonTest.
#
class Periodic( StoppableThread ):

  def __init__( self ):
    super().__init__()
    self.name= 'Periodic'
    self.done= threading.Event()
#   self.done.clear()

  def _on_connect( self, client, userdata, flags, rc ):
    if rc != 0:
      self.LogMessage( 'Connection to broker failed' )
      client.disconnect()               # Clean up

  def _on_disconnect( self, client, userdata, rc ):
    if rc != 0:                         # Exclude response on disconnect()
      self.LogMessage( 'Unexpected disconnect from broker' )

  def _on_message( self, client, userdata, message ):
    if self.done.is_set():  return      # Ignore additional messages
  #
    now= time.time()                    # Time of receipt of message
    self.msgs-= 1                       # Decrement expected count of messages
    if self.msgs == 0 :  self.done.set()
    mqttq.put( (client,message,now) )

  def run( self ):
    self.LogMessage( 'Starting thread' )
  #
    while not self.stopped():
      for mqxy in MqttXymon:
        if mqxy['Mode'] != 'periodic':  continue
        self.subscribe( mqxy )
      self.Wait( 300, 0 )
  #
    self.LogMessage( 'Stopping thread' )

  def subscribe( self, mqxy ):
    self.lot= []
    for t in mqxy['Topic']:
      self.lot.append( (t,0) )
    self.msgs= len( mqxy['Topic'] )
    self.done.clear()
  #
    client= mqtt.Client( 'xymon.client.b.'+socket.gethostname() )
    client.on_connect   = self._on_connect
    client.on_disconnect= self._on_disconnect
    client.on_message   = self._on_message
    client.connect( mqtt_broker )
    client.subscribe( self.lot )
  #
    client.loop_start()                 # Start thread to read messages
    rc= self.done.wait( mqtt_collect_timer ) # Wait till all messages are received
    client.loop_stop()                  # Stop thread
    client.disconnect()                 # Stop subscriber session


MainThread= threading.Event()           # Set to stop this script

#
# Call-back HandleSignal is invoked whenever a SIGINT or SIGTERM signal arrives.
# It reports the receipt of the signal and sets the flag to stop this script.
#
def HandleSignal( signum, frame ):
  MainThread.set()                      # Set flag to stop script
  syslog.openlog( 'XyMqtt', 0, syslog.LOG_LOCAL6 )
  syslog.syslog ( 'Termination signal {} received'.format(signum) )
  syslog.closelog()

#
# Main program.
# =============
#
# Set up handling of termination signals.
#
signal.signal( signal.SIGINT , HandleSignal )
signal.signal( signal.SIGTERM, HandleSignal )

#
# Build the instances of the classes which are to handle the messages received
# from MQTT.
#
for mqxy in MqttXymon:
  aclass= mqxy['XymonClss']
  assert aclass in globals(), 'Unknown XymonTest class name: {}'.format(aclass)
  mqxy['Handler']= globals()[aclass]( mqxy )

#
# Build a routing table for the messages received from MQTT.
#
for mqxy in MqttXymon:
  for topic in mqxy['Topic']:
    if topic not in dispatch:
      dispatch[topic]= []
    dispatch[topic].append( mqxy['Handler'] )

#
# Start the threads.
#
threads= []                             # List of long lived threads
th0= Receiver() ;  threads.append(th0) ;  th0.start()
th1= Periodic() ;  threads.append(th1) ;  th1.start()

#
# Monitor the state of the threads of this script. If one thread dies or if
# event MainThread is set, all (other) threads, including this main thread,
# should stop (too).
#
while len(threads) > 0:
  try:
    all_alive= True                     # See if all threads are currently alive
    for t in threads:
      all_alive= all_alive and t.is_alive()
    if all_alive:
      MainThread.wait( 10 )             # If so, wait some time

    if not all_alive  or  MainThread.is_set():
      for t in reversed( threads ):     # Note the order of this loop
        if t.is_alive():
          t.stop()
        t.join()
        threads.remove( t )             # Thread has stopped

  except KeyboardInterrupt:
    MainThread.set()                    # Set flag to stop this script

Module watchdog.py and script submqttm.py are to be installed in the same directory.

Show Code ⇲

Hide Code ⇱

#
# WatchdogTimer.py
#
# This module contains the class definition of a simple watchdog timer. The
# timer can be started, stopped and reset. If the timer expires, the supplied
# call back function is invoked.
#
# Written by W.J.M. Nelis, wim.nelis@ziggo.nl, 2019.01
#
# Note: a timer is a separate thread. After invoking method cancel, the timer is
#   stopped (timer.finished.is_set() == True) but the timer thread may still be
#   alive (timer.is_alive() == True). The time between invoking cancel and the
#   timer thread becoming inactive should be small. To make sure that method
#   is_alive will never return True while the timer is stopped, each call to
#   method cancel is followed by a call to method join.
#
# Note: Method Threading.Timer.run() invokes the user call-back function upon
#   expiration of the timer. Upon invocation of the user call-back function,
#   event timer.finished in not yet set, it will be set upon return. See URL
#   https://github.com/python/cpython/blob/master/Lib/threading.py. Experience
#   shows that if a join is attempted while the timer has expired, exception
#   RunTimeError might be raised with message "cannot join current thread". Thus
#   this problem might occur in method WatchdogTimer.stop. It is locally solved
#   with two modifications: (A) by intercepting the call-back and call
#   timer.finished.set() before the user call-back function is invoked and (B)
#   by extending method WatchdogTimer.is_alive() to return value False if the
#   timer has expired.
#   It is probably better to set event timer.finished in module threading.py
#   (see aforementioned URL) just before invoking the call-back function.
#
from threading import Timer

#
# Define a simple watchdog timer. A specific exception subclass is defined to
# handle a time-out for which no handler is defined.
#
class WdtTimeoutException( Exception ):
  '''An unhandled time-out of a watchdog timer.'''

class WatchdogTimer:
  '''A simple, stoppable and restartable watchdog timer.'''
  def __init__( self, to=None, cb=None ):
    self.timeout= to                    # Time out value [s]
    self.handler= cb                    # Call back function, parameter-less
    self.timer  = Timer( 0, True )      # Dummy timer object instance

 #
 # Private method and call-back function _handler handles an expiration of the
 # timer. The internal timer state is updated, and the user supplied call-back
 # function is invoked.
 #
  def _handler( self ):                 # Default time-out handler
    self.timer.finished.set()           # Timer has stopped
    if self.handler is None:
      raise WdtTimeoutException
    else:
      self.handler()                    # Invoke user handler

 #
 # Private method _start contains the common part of methods start and reset. It
 # stops a timer if there is an active one and starts a new timer using the
 # parameters saved in the object.
 #
  def _start( self ):
    if self.is_alive():
      self.timer.cancel()               # Stop timer
      self.timer.join()                 # Wait for timer thread to finish

    self.timer= Timer( self.timeout, self._handler )
    self.timer.start()
    return True

 #
 # Method is_alive returns True if the timer is running, False otherwise.
 #
  def is_alive( self ):
    return (not self.timer.finished.is_set())  and  self.timer.is_alive()

 #
 # Method reset stops the timer if it is running, and creates and starts a new
 # one using the parameters passed to the previous invocation of method start.
 # If previously no timer was defined, this method does nothing and returns
 # False, while it returns True if the timer is restarted.
 #
  def reset( self ):                    # Reset a running timer
    if self.timeout is None:
      return False                      # Error: no timer defined
    return self._start()                # Stop if necessary and start a timer

 #
 # Method start starts a new timer. If there was a timer already running, it is
 # stopped without further notice. The returned value is False if no timer is
 # started (because no timeout is specified), otherwise the returned value is
 # True.
 #
  def start( self, Timeout, Handler=None ):     # Start a timer
    if Timeout is None:                 # Check for an illegal value
      return False
    self.timeout= Timeout               # Save parameters
    self.handler= Handler
    return self._start()                # Start a new timer

 #
 # Method stop stops the timer if it is running. It returns True if the timer is
 # stopped, False if the timer already is expired.
 #
  def stop( self ):                     # Stop a timer
    if self.is_alive():
      self.timer.cancel()
      self.timer.join()
      return True
    else:
      return False

A limitation is that the MQTT sources with mode equal to 'continuous' all should specify the same MQTT broker.

Support for multiple MQTT brokers.

  • 2020-11-07
    • Initial release
  • monitors/submqttm.txt
  • Last modified: 2020/11/13 12:48
  • by wnelis