====== submqttm ======

^ Author | [[ wim.nelis@ziggo.nl | Wim Nelis ]] |
^ Compatibility | Xymon 4.2 |
^ Requirements | Python3 |
^ Download | None |
^ Last Update | 2020-11-07 |

===== Description =====

Script submqttm.py (short for SUBscribe_to_MQTT_Measurements) is a skeleton for one or more Xymon tests that receive their measurements via MQTT. An abstract class XymonTest is defined, which handles the receipt of the topics, detects when all required information has been received, and sends the Xymon message to the Xymon server. For each Xymon test a derived class needs to be defined, containing (at least) method BuildMessage, which transforms the measurements received from MQTT into a message to be sent to Xymon. The script contains a receiver thread which subscribes to all required topics and calls method XymonTest.Update to handle them. List MqttXymon near the start of the script contains the mapping between MQTT topics and Xymon tests.

As distributed, the script contains a single Xymon test, which shows the incoming (publish) and outgoing (subscribe) message rate as well as the incoming and outgoing octet rate of the MQTT broker.

This script was developed to handle multiple small, low-power sensors based on an ESP8266 micro-controller. As multiple destinations for the collected data are expected, and as MQTT provides a simple transfer and distribution mechanism, these micro-controllers publish their measurements using MQTT.

===== Installation =====

=== Client side ===

The source modules submqttm.py and watchdog.py shown in chapter Source below are to be installed in directory /usr/lib/xymon/client/ext. Script submqttm.py needs to run continuously, as the topics can arrive at any time. Thus xymonlaunch is //not// used. Instead, a service is defined which starts script submqttm.py at system start. Service submqttm is defined by creating, as super user, a file named /etc/systemd/system/submqttm.service.
An example of the content of this file is shown below.

  [Unit]
  Description=Xymon client with MQTT input
  [Service]
  Type=simple
  User=xymon
  Group=xymon
  ExecStart=/usr/lib/xymon/client/ext/submqttm.py
  WorkingDirectory=/usr/lib/xymon/client/ext
  StandardOutput=null
  StandardError=null
  Restart=on-abort
  [Install]
  WantedBy=multi-user.target

Once file submqttm.service is created, the following two commands are needed to activate the script:

  sudo systemctl start submqttm
  sudo systemctl enable submqttm

Note that although the script runs continuously, the tests with mode 'periodic' are run once every 5 minutes.

=== Server side ===

For the initial client, which monitors the throughput of one MQTT broker, two extensions of the Xymon server configuration are needed. First, xymonserver.cfg should be extended. This is achieved by adding file mqtt-thr.cfg in directory /usr/lib/xymon/server/etc/xymonserver.d. Its contents are:

  TEST2RRD+=",mqtt=devmon"
  GRAPHS_mqtt="mqtttm,mqttto"

Secondly, graphs.cfg should be extended with the definitions of the graphs.
In directory /usr/lib/xymon/server/etc/graphs.d (also) a file named mqtt-thr.cfg is created containing the following definitions:

  [mqtttm]
    TITLE , MQTT message rate
    YAXIS Throughput [p/s]
    -l 0
    DEF:mr=mqtt.broker.message.rrd:Rcv:AVERAGE
    DEF:mx=mqtt.broker.message.rrd:Xmt:AVERAGE
    LINE1:mr#0000FF:ingress
    GPRINT:mr:MIN:Min \: %5.1lf %sp/s
    GPRINT:mr:MAX:Max \: %5.1lf %sp/s
    GPRINT:mr:AVERAGE:Avg \: %5.1lf %sp/s
    GPRINT:mr:LAST:Cur \: %5.1lf %sp/s\n
    LINE1:mx#00FF00:egress
    GPRINT:mx:MIN: Min \: %5.1lf %sp/s
    GPRINT:mx:MAX:Max \: %5.1lf %sp/s
    GPRINT:mx:AVERAGE:Avg \: %5.1lf %sp/s
    GPRINT:mx:LAST:Cur \: %5.1lf %sp/s\n
  [mqttto]
    TITLE , MQTT bit rate
    YAXIS Throughput [b/s]
    -l 0
    DEF:or=mqtt.broker.octet.rrd:Rcv:AVERAGE
    DEF:ox=mqtt.broker.octet.rrd:Xmt:AVERAGE
    CDEF:br=or,8,*
    CDEF:bx=ox,8,*
    LINE1:br#0000FF:ingress
    GPRINT:br:MIN:Min \: %5.1lf %sb/s
    GPRINT:br:MAX:Max \: %5.1lf %sb/s
    GPRINT:br:AVERAGE:Avg \: %5.1lf %sb/s
    GPRINT:br:LAST:Cur \: %5.1lf %sb/s\n
    LINE1:bx#00FF00:egress
    GPRINT:bx:MIN: Min \: %5.1lf %sb/s
    GPRINT:bx:MAX:Max \: %5.1lf %sb/s
    GPRINT:bx:AVERAGE:Avg \: %5.1lf %sb/s
    GPRINT:bx:LAST:Cur \: %5.1lf %sb/s\n

A small note: typically I'll end each TITLE line with two spaces. This results in a bigger separation between the title and the date range, which improves the readability of the top line in the graph. Those two trailing spaces are missing in the above definition of the graphs.

===== Source =====

==== submqttm.py ====

  #!/usr/bin/python3
  #
  # submqttm.py, SUBscribe_MQTT_Measurements:
  # This script implements a Xymon client, which receives measurements through an
  # MQTT broker. Abstract class XymonTest contains all methods to receive the
  # measurements and send the result to the Xymon server, except for the method to
  # build the actual message from the received measurements. This script needs to
  # be extended with a derived class of XymonTest for each new measurement. The
  # derived class must implement at least a (tailored) version of method
  # BuildMessage.
  #
  # This script already contains one Xymon client, which takes some statistics
  # gathered by the MQTT broker and reports them to the Xymon server.
  #
  # To do:
  # - Thread Receiver: make list of topics per broker, open a session for each
  #   broker.
  #
  import datetime
  import paho.mqtt.client as mqtt
  import queue
  import re
  import signal
  import socket
  import syslog
  import threading
  import time
  import watchdog
  #
  # Installation constants.
  # =======================
  #
  # MQTT.
  #
  mqtt_collect_timer = 0.5            # Max time span to receive all topics [s]
  #
  # Define the mapping between the received messages from the MQTT broker on one
  # hand and tests within Xymon on the other hand.
  #
  # If the mode is 'continuous', it is assumed that for each of the topics one,
  # and only one, message is sent to the broker within a short time frame,
  # typically within a few milliseconds. They do not have attribute 'retain' set.
  # Some time later, this process repeats, that is another full update is sent.
  # This mode of operation is context free, as there is no need to remember any
  # previous result.
  #
  # If the mode is 'periodic', it is assumed that the messages all have attribute
  # 'retain' set, and that all of them are sent by the broker upon subscribing to
  # those topics. Thus by subscribing for a short time when a Xymon status message
  # is to be sent, and terminating the subscription once the required information
  # is retrieved, there is no need to accumulate and remember the latest value for
  # each topic within instances of this class.
  # In case of topic tree $SYS, a periodic new subscription is used for an
  # additional reason: this will minimize the number of messages sent, and thus
  # the 'pollution' of the statistics caused by retrieving the statistics.
  #
  # Note: In the topics, wildcards '+' and '#' should not be used.
  #
  MqttXymon= [
  # Throughput statistics of the MQTT broker
    { 'Mode'     : 'periodic',
      'MqttHost' : '',
      'Topic'    : ( '$SYS/broker/bytes/received'   ,
                     '$SYS/broker/bytes/sent'       ,
                     '$SYS/broker/messages/received',
                     '$SYS/broker/messages/sent'    ,
                     '$SYS/broker/messages/stored'  ),
      'XymonClss': 'XymonTestSys',
      'XymonHost': '',
      'XymonPort': 1984,
      'XymonSrc' : '',
      'XymonTest': 'mqtt',
      'Handler'  : None },
  ]
  #
  # Global variables.
  # =================
  #
  mqtt_broker= None                   # Source of MQTT messages
  dispatch= {}                        # Destinations of MQTT messages
  mqttq   = queue.Queue()             # First buffer of MQTT messages
  #
  # Call-back function definitions.
  # ===============================
  #
  # Call-back function on_connect is invoked if the connection status to the
  # broker is established. If successful, the value of return code rc will be
  # zero. If the connection failed, the program will be stopped.
  #
  def on_connect( client, userdata, flags, rc ):
    if rc != 0:
      MainThread.set()                # Stop this program
      client.disconnect()             # Clean up
  #
  # Call-back function on_disconnect is invoked if the session with the broker is
  # disconnected. If this event is unexpected, a request to terminate this program
  # is posted.
  #
  def on_disconnect( client, userdata, rc ):
    if rc != 0:                       # Exclude response on disconnect()
      MainThread.set()                # Stop this program
  #
  # Call-back function on_message is invoked if a message from a subscription
  # session with the broker is received. The message is moved to the queue of
  # thread Receiver.
  #
  def on_message( client, userdata, message ):
    now= time.time()                  # Time of receipt of message
    mqttq.put( (client,message,now) )
  #
  # Generic class definitions.
  # ==========================
  #
  # Abstract class StoppableThread adds Event _stop_event to the thread and
  # methods to set and check this event. Moreover, methods LogMessage and Wait
  # (the latter is an extended version of time.sleep) are added, as they are
  # needed by almost all subclasses.
  #
  class StoppableThread( threading.Thread ):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""
    def __init__( self ):
      super().__init__()
      self._stop_event = threading.Event()
  #
  # Method LogMessage sends a message to the local syslog server.
  #
    def LogMessage( self, Msg ):
      syslog.openlog( 'XyMqtt', 0, syslog.LOG_LOCAL6 )
      syslog.syslog ( ' '.join( (self.name,Msg) ) )
      syslog.closelog()
    def join( self, TimeOut=None ):
      self.stop()
      super().join( TimeOut )
    def stop( self ):
      self._stop_event.set()
    def stopped( self ):
      return self._stop_event.is_set()
    def wait( self, timeout ):
      return self._stop_event.wait( timeout )
  #
  # Method Wait waits until the (Unix) time stamp reaches the next integer
  # multiple of Period seconds and then another Delay seconds. However, if the
  # thread method stop is invoked before the time has expired, this method will
  # return at that time.
  # Parameters Period and Delay are integer numbers, with Period >= 2 and
  # 0 <= Delay < Period.
  #
    def Wait( self, Period, Delay ):
      Now= time.time()
      ActTim= int( Now )
      ActTim= ( (ActTim+Period-1) // Period ) * Period
      SlpTim= int( ActTim - Now ) + Delay
      if SlpTim < 1.5: SlpTim+= Period
      self.wait( SlpTim )
  #
  # Abstract class XymonTest defines the collector for a specific set of MQTT
  # messages which are needed for one Xymon test. Once all messages are received,
  # the status message is generated and forwarded to the Xymon server.
  # A child class should (at least) override method BuildMessage with a version
  # specific for the Xymon test. Thus at least one child class is needed for each
  # test type mentioned in list MqttXymon.
  #
  class XymonTest:
    XyColour= { 'green': 0, 'yellow': 1, 'red': 2 }  # Class read-only variable
  #
  # Method __init__ presets the instance variables upon creation of an object.
  # They are partly derived from table MqttXymon.
  #
    def __init__( self, mqxy ):
    # Define the parameters related to Xymon.
      self.Host  = mqxy['XymonHost']
      self.Port  = mqxy['XymonPort']
      self.Source= mqxy['XymonSrc' ]
      self.Test  = mqxy['XymonTest']
      self.Time  = None
      self.Colour= 'green'
      self.Error = {}
      for t in ('red','yellow'):
        self.Error[t]= []
    # Define the parameters related to the receipt of measurements via MQTT.
      self.Topic = {}
      for t in mqxy['Topic']:
        self.Topic[t]= None
      self.Topics= len( self.Topic )
      self.Msgs  = 0
      self.Timer = watchdog.WatchdogTimer( mqtt_collect_timer, self._timeout )
  #
  # Private method _timeout is a call-back routine. It is invoked if the watchdog
  # timer expires. Upon expiration, the collected information is sent to Xymon,
  # although not all expected information is received.
  #
    def _timeout( self ):
      for key in self.Topic:
        if self.Topic[key] is None:
          self.Error['yellow'].append( 'Missing topic: {}'.format(key) )
  #
      self.InformXymon( self.BuildMessage() )
      self.ClearTest()
  #
  # Private method _update_colour updates the status (colour) of the xymon test.
  # The status is set to the worst of the current status and the supplied status.
  #
    def _update_colour( self, clr ):
      assert clr in self.XyColour, 'Unsupported colour: {}'.format(clr)
      if self.XyColour[clr] > self.XyColour[self.Colour]:
        self.Colour= clr
  #
  # Method BuildMessage builds the message to be sent to the Xymon server. This
  # version converts the list of error messages and warnings into a string. At
  # the same time, the test status is updated.
  #
    def BuildMessage( self ):
      XyMsg= ''
      self.Colour= 'green'
      for colour in ('red','yellow'):
        if self.Error[colour]:
          self._update_colour( colour )
          for m in self.Error[colour]:
            XyMsg+= "&{} : {}\n".format( colour, m )
      return XyMsg
  #
  # Method ClearTest presets the instance variables for the next measurement.
  #
    def ClearTest( self ):
      if self.Timer.is_alive():
        self.Timer.stop()             # Inhibit timeout
      for key in self.Topic:          # Clear the measurements
        self.Topic[key]= None
      self.Msgs= 0
  #
      for key in self.Error:          # Remove any error messages
        self.Error[key]= []
  #
  # Method InformXymon takes a formatted message for the Xymon server, prepends
  # the status message header and sends the resulting message to the Xymon
  # server. The method returns True if sending the message succeeded, otherwise
  # it will return a False value.
  #
    def InformXymon( self, msg ):
      XyPars= { 'host': self.Source, 'test': self.Test, 'colour': self.Colour,
                'message': msg }
      XyTime= int( time.time() ) if self.Time is None else int( self.Time )
      XyTime= datetime.datetime.fromtimestamp(XyTime).isoformat( sep=' ' )
      XyPars['time']= XyTime
      XyMsg = '''status {host}.{test} {colour} {time}
  {message}'''.format( **XyPars )
  #
      try:
        s= socket.socket( socket.AF_INET, socket.SOCK_STREAM )
      except socket.error as msg:
        return False
  #
  # Resolve the name of the Xymon server, unless it is already an IP address.
  #
      if not re.search( r'^[\d\.]+$', self.Host ):
        try:
          self.Host= socket.gethostbyname( self.Host )
        except socket.gaierror:
          s.close()
          return False
  #
      try:
        s.connect( (self.Host,self.Port) )
        s.sendall( XyMsg.encode() )
        s.close()
        return True
      except socket.error as msg:
        s.close()
        return False
  #
  # Method Update handles the next message received from the MQTT broker. The
  # watchdog timer is started upon receipt of the first topic message. If all the
  # expected topic messages are received, a Xymon message is generated and sent
  # to the Xymon server.
  #
    def Update( self, uts, tpc, msg ):
      assert tpc in self.Topic, 'Unknown topic "{}"'.format(tpc)
  #
      if self.Topic[tpc] is None:
        if self.Msgs == 0:
          self.Timer.reset()
          self.Time= uts
        else:
          self.Time= min( self.Time, uts )
        self.Topic[tpc]= msg
        self.Msgs+= 1
        if self.Msgs == self.Topics:
          XymMsg= self.BuildMessage()
          self.InformXymon( XymMsg )
          self.ClearTest()
  #
      else:
        if self.Topic[tpc] != msg:
          self.Error['yellow'].append( 'Topic received multiple times: topic={},'
            ' old value={}, new value={}\n'.format(tpc,self.Topic[tpc],msg) )
          self.Topic[tpc]= msg
  #
  # Xymon test class definitions.
  # =============================
  #
  # Class XymonTestSys reports some statistics of the MQTT broker to the Xymon
  # server.
  #
  class XymonTestSys( XymonTest ):
  #
  # Method BuildMessage shows the totals as received from the MQTT broker. The
  # graphs generated from these totals show both the message rate and the bit
  # rate.
  #
    def BuildMessage( self ):
      TblHdr= '<table>'
      TblRow= '<tr><td>{}, {}</td> <td>{}</td></tr>\n'
      OctRcv= self.Topic['$SYS/broker/bytes/received']
      OctXmt= self.Topic['$SYS/broker/bytes/sent']
      MsgRcv= self.Topic['$SYS/broker/messages/received']
      MsgXmt= self.Topic['$SYS/broker/messages/sent']
      MsgSav= self.Topic['$SYS/broker/messages/stored']
      XyMsg = 'MQTT broker statistics\n'
      XyMsg+= super().BuildMessage()  # Include error messages
      XyMsg+= '\n'
      XyMsg+= 'Total counts\n'
      XyMsg+= TblHdr
      XyMsg+= '<tr><th>Counter</th> <th>Total</th></tr>\n'
      XyMsg+= TblRow.format( 'Octets'  , 'received'   , OctRcv )
      XyMsg+= TblRow.format( 'Octets'  , 'transmitted', OctXmt )
      XyMsg+= TblRow.format( 'Messages', 'received'   , MsgRcv )
      XyMsg+= TblRow.format( 'Messages', 'transmitted', MsgXmt )
      XyMsg+= TblRow.format( 'Messages', 'stored'     , MsgSav )
      XyMsg+= '</table>\n'
      if OctRcv is None: OctRcv= 'U'
      if OctXmt is None: OctXmt= 'U'
      if MsgRcv is None: MsgRcv= 'U'
      if MsgXmt is None: MsgXmt= 'U'
      if MsgSav is None: MsgSav= 'U'
      XyMsg+= '\n'
      XyMsg+= '\n'
      XyMsg+= ''
      return XyMsg
  #
  # Thread definitions.
  # ===================
  #
  # Of each of the following classes at most one instance may be created.
  #
  # Class Receiver receives the MQTT messages. The primary function of this class
  # is to provide a queue to buffer the incoming messages. The secondary function
  # is to move the messages to the XymonTest instances which will actually do
  # something with the message.
  #
  class Receiver( StoppableThread ):
    def __init__( self ):
      super().__init__()
      self.name = 'Receiver'
  #
  # Method run subscribes to all topics mentioned in 'continuous' mode tests. It
  # receives the messages from the broker and uses table dispatch to send them to
  # their destinations.
  #
    def run( self ):
      global mqtt_broker
      self.LogMessage( 'Starting thread' )
  #
  # Subscribe to the topics defined in table MqttXymon. To subscribe, a list of
  # all topics of interest is needed.
  #
      sublot= []                      # To subscribe, list of topics
      for mqxy in MqttXymon:
        if mqxy['Mode'] != 'continuous': continue
        if mqtt_broker is None or mqtt_broker == mqxy['MqttHost']:
          mqtt_broker= mqxy['MqttHost']
          for t in mqxy['Topic']:
            if t not in sublot: sublot.append( t )
        else:
          self.LogMessage( 'Ignore test {}/{}'.format(mqxy['XymonSrc'],mqxy['XymonTest']) )
          self.LogMessage( ' No support yet for multiple MQTT brokers' )
  #
  # Create a connection to the MQTT broker.
  #
      if len(sublot) == 0:
        self.client= None
      else:
        self.client= mqtt.Client( 'xymon.client.a.'+socket.gethostname() )
        self.client.on_connect   = on_connect
        self.client.on_disconnect= on_disconnect
        self.client.on_message   = on_message
        self.client.connect( mqtt_broker )
  #
  # Create tuples containing (topic,qos), in order to subscribe to all topics in
  # one call to subscribe.
  #
        for i,t in enumerate(sublot):
          sublot[i]= ( t, 0 )
        self.client.subscribe( sublot )
        self.client.loop_start()
  #
  # Wait for messages to arrive. Each message is forwarded to the instances of
  # (children of) class XymonTest, which will actually do something with the
  # message.
  #
      while not self.stopped():
        (cid,msg,uts)= mqttq.get()
        if msg is None: break
  #
        topic= msg.topic
        payld= str( msg.payload.decode("utf-8") )
        if topic in dispatch:
          for xt in dispatch[topic]:
            xt.Update( uts, topic, payld )
        else:
          self.LogMessage( 'Unexpected message, topic {}'.format(topic) )
  #
  # Clean up and exit.
  #
      if self.client is not None:
        self.client.loop_stop()
        self.client.disconnect()
      self.LogMessage( 'Stopping thread' )
  #
  # Method stop stops this thread. A dummy message is pushed onto the input
  # queue, to unblock the blocking call to queue.get in method Receiver.run.
  #
    def stop( self ):
      super().stop()
      mqttq.put( (None,None,None) )
  #
  # Class Periodic periodically schedules a subscription to the topics defined in
  # entries in table MqttXymon with mode being 'periodic'. The messages in these
  # topics should all have attribute 'retained' set, thus at subscription the
  # latest status (value) of each topic is sent. Those messages will initially be
  # handled by thread Receiver, which forwards them to the appropriate instance of
  # XymonTest.
  #
  class Periodic( StoppableThread ):
    def __init__( self ):
      super().__init__()
      self.name= 'Periodic'
      self.done= threading.Event()
  #   self.done.clear()
    def _on_connect( self, client, userdata, flags, rc ):
      if rc != 0:
        self.LogMessage( 'Connection to broker failed' )
        client.disconnect()           # Clean up
    def _on_disconnect( self, client, userdata, rc ):
      if rc != 0:                     # Exclude response on disconnect()
        self.LogMessage( 'Unexpected disconnect from broker' )
    def _on_message( self, client, userdata, message ):
      if self.done.is_set(): return   # Ignore additional messages
  #
      now= time.time()                # Time of receipt of message
      self.msgs-= 1                   # Decrement expected count of messages
      if self.msgs == 0 : self.done.set()
      mqttq.put( (client,message,now) )
    def run( self ):
      self.LogMessage( 'Starting thread' )
  #
      while not self.stopped():
        for mqxy in MqttXymon:
          if mqxy['Mode'] != 'periodic': continue
          self.subscribe( mqxy )
        self.Wait( 300, 0 )
  #
      self.LogMessage( 'Stopping thread' )
    def subscribe( self, mqxy ):
      self.lot= []
      for t in mqxy['Topic']:
        self.lot.append( (t,0) )
      self.msgs= len( mqxy['Topic'] )
      self.done.clear()
  #
  # Connect to the broker named in this entry. Global mqtt_broker is only set
  # by thread Receiver if there is at least one test with mode 'continuous'.
  #
      client= mqtt.Client( 'xymon.client.b.'+socket.gethostname() )
      client.on_connect   = self._on_connect
      client.on_disconnect= self._on_disconnect
      client.on_message   = self._on_message
      client.connect( mqxy['MqttHost'] )
      client.subscribe( self.lot )
  #
      client.loop_start()             # Start thread to read messages
      rc= self.done.wait( mqtt_collect_timer )  # Wait till all messages are received
      client.loop_stop()              # Stop thread
      client.disconnect()             # Stop subscriber session
  MainThread= threading.Event()       # Set to stop this script
  #
  # Call-back HandleSignal is invoked whenever a SIGINT or SIGTERM signal arrives.
  # It reports the receipt of the signal and sets the flag to stop this script.
  #
  def HandleSignal( signum, frame ):
    MainThread.set()                  # Set flag to stop script
    syslog.openlog( 'XyMqtt', 0, syslog.LOG_LOCAL6 )
    syslog.syslog ( 'Termination signal {} received'.format(signum) )
    syslog.closelog()
  #
  # Main program.
  # =============
  #
  # Set up handling of termination signals.
  #
  signal.signal( signal.SIGINT , HandleSignal )
  signal.signal( signal.SIGTERM, HandleSignal )
  #
  # Build the instances of the classes which are to handle the messages received
  # from MQTT.
  #
  for mqxy in MqttXymon:
    aclass= mqxy['XymonClss']
    assert aclass in globals(), 'Unknown XymonTest class name: {}'.format(aclass)
    mqxy['Handler']= globals()[aclass]( mqxy )
  #
  # Build a routing table for the messages received from MQTT.
  #
  for mqxy in MqttXymon:
    for topic in mqxy['Topic']:
      if topic not in dispatch:
        dispatch[topic]= []
      dispatch[topic].append( mqxy['Handler'] )
  #
  # Start the threads.
  #
  threads= []                         # List of long lived threads
  th0= Receiver() ; threads.append(th0) ; th0.start()
  th1= Periodic() ; threads.append(th1) ; th1.start()
  #
  # Monitor the state of the threads of this script. If one thread dies or if
  # event MainThread is set, all (other) threads, including this main thread,
  # should stop (too).
  #
  while len(threads) > 0:
    try:
      all_alive= True                 # See if all threads are currently alive
      for t in threads:
        all_alive= all_alive and t.is_alive()
      if all_alive:
        MainThread.wait( 10 )         # If so, wait some time
      if not all_alive or MainThread.is_set():
        for t in reversed( threads ): # Note the order of this loop
          if t.is_alive():
            t.stop()
            t.join()
          threads.remove( t )         # Thread has stopped
    except KeyboardInterrupt:
      MainThread.set()                # Set flag to stop this script
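
As an illustration of how the script might be extended with a test of its own, the sketch below derives a class for a single temperature sensor. It is an example under stated assumptions, not part of the distributed script: the topic name, the threshold, the host names and class XymonTestTemp are hypothetical, and class XymonTestStub is only a stand-in that mimics the relevant parts of XymonTest so that the example runs on its own. In submqttm.py itself the new class would be derived from XymonTest and a matching entry would be added to list MqttXymon.

```python
# Stand-in for the parts of class XymonTest (see submqttm.py) used below, so
# that this sketch runs on its own. In the script itself, derive from XymonTest.
class XymonTestStub:
  XyColour= { 'green': 0, 'yellow': 1, 'red': 2 }

  def __init__( self, mqxy ):
    self.Colour= 'green'
    self.Error = { 'red': [], 'yellow': [] }
    self.Topic = { t: None for t in mqxy['Topic'] }

  def _update_colour( self, clr ):
    if self.XyColour[clr] > self.XyColour[self.Colour]:
      self.Colour= clr

  def BuildMessage( self ):
    XyMsg= ''
    self.Colour= 'green'
    for colour in ('red','yellow'):
      if self.Error[colour]:
        self._update_colour( colour )
        for m in self.Error[colour]:
          XyMsg+= "&{} : {}\n".format( colour, m )
    return XyMsg

# Hypothetical topic and threshold, chosen for this example only.
TEMP_TOPIC= 'sensor/livingroom/temperature'
TEMP_WARN = 30.0

class XymonTestTemp( XymonTestStub ):
  def BuildMessage( self ):
    try:
      temp= float( self.Topic[TEMP_TOPIC] )
    except (TypeError, ValueError):   # Missing or malformed payload
      temp= None
      self.Error['red'].append( 'No valid temperature received' )
    else:
      if temp > TEMP_WARN:
        self.Error['yellow'].append( 'Temperature above {} C'.format(TEMP_WARN) )
    # The errors must be registered before calling the parent method, as that
    # method derives the colour of the test from them.
    XyMsg = 'Temperature sensor\n'
    XyMsg+= super().BuildMessage()
    XyMsg+= 'Temperature : {}\n'.format( 'U' if temp is None else temp )
    return XyMsg

# Hypothetical entry for list MqttXymon; the host names are placeholders.
entry= { 'Mode': 'continuous', 'MqttHost': 'broker.example.org',
         'Topic': ( TEMP_TOPIC, ),
         'XymonClss': 'XymonTestTemp', 'XymonHost': 'xymon.example.org',
         'XymonPort': 1984, 'XymonSrc': 'livingroom', 'XymonTest': 'temp',
         'Handler': None }

test= XymonTestTemp( entry )
test.Topic[TEMP_TOPIC]= '31.5'        # Payload as stored by method Update
msg= test.BuildMessage()
print( test.Colour )
print( msg )
```

The payloads arrive as strings, so the derived class converts and validates them itself. Warnings and errors are appended to self.Error before the parent version of BuildMessage is called, because that version resets the colour to green and then raises it according to the registered messages.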

==== watchdog.py ====

Module watchdog.py and script submqttm.py are to be installed in the same directory.

  #
  # WatchdogTimer.py
  #
  # This module contains the class definition of a simple watchdog timer. The
  # timer can be started, stopped and reset. If the timer expires, the supplied
  # call back function is invoked.
  #
  # Written by W.J.M. Nelis, wim.nelis@ziggo.nl, 2019.01
  #
  # Note: a timer is a separate thread. After invoking method cancel, the timer is
  # stopped (timer.finished.is_set() == True) but the timer thread may still be
  # alive (timer.is_alive() == True). The time between invoking cancel and the
  # timer thread becoming inactive should be small. To make sure that method
  # is_alive will never return True while the timer is stopped, each call to
  # method cancel is followed by a call to method join.
  #
  # Note: Method threading.Timer.run() invokes the user call-back function upon
  # expiration of the timer. Upon invocation of the user call-back function,
  # event timer.finished is not yet set; it will be set upon return. See URL
  # https://github.com/python/cpython/blob/master/Lib/threading.py. Experience
  # shows that if a join is attempted while the timer has expired, exception
  # RuntimeError might be raised with message "cannot join current thread". Thus
  # this problem might occur in method WatchdogTimer.stop. It is locally solved
  # with two modifications: (A) by intercepting the call-back and calling
  # timer.finished.set() before the user call-back function is invoked and (B)
  # by extending method WatchdogTimer.is_alive() to return value False if the
  # timer has expired.
  # It is probably better to set event timer.finished in module threading.py
  # (see aforementioned URL) just before invoking the call-back function.
  #
  from threading import Timer
  #
  # Define a simple watchdog timer. A specific exception subclass is defined to
  # handle a time-out for which no handler is defined.
  #
  class WdtTimeoutException( Exception ):
    '''An unhandled time-out of a watchdog timer.'''
  class WatchdogTimer:
    '''A simple, stoppable and restartable watchdog timer.'''
    def __init__( self, to=None, cb=None ):
      self.timeout= to                # Time out value [s]
      self.handler= cb                # Call back function, parameter-less
      self.timer  = Timer( 0, True )  # Dummy timer object instance
  #
  # Private method and call-back function _handler handles an expiration of the
  # timer. The internal timer state is updated, and the user supplied call-back
  # function is invoked.
  #
    def _handler( self ):             # Default time-out handler
      self.timer.finished.set()       # Timer has stopped
      if self.handler is None:
        raise WdtTimeoutException
      else:
        self.handler()                # Invoke user handler
  #
  # Private method _start contains the common part of methods start and reset. It
  # stops a timer if there is an active one and starts a new timer using the
  # parameters saved in the object.
  #
    def _start( self ):
      if self.is_alive():
        self.timer.cancel()           # Stop timer
        self.timer.join()             # Wait for timer thread to finish
      self.timer= Timer( self.timeout, self._handler )
      self.timer.start()
      return True
  #
  # Method is_alive returns True if the timer is running, False otherwise.
  #
    def is_alive( self ):
      return (not self.timer.finished.is_set()) and self.timer.is_alive()
  #
  # Method reset stops the timer if it is running, and creates and starts a new
  # one using the parameters passed to the previous invocation of method start.
  # If previously no timer was defined, this method does nothing and returns
  # False, while it returns True if the timer is restarted.
  #
    def reset( self ):                # Reset a running timer
      if self.timeout is None:
        return False                  # Error: no timer defined
      return self._start()            # Stop if necessary and start a timer
  #
  # Method start starts a new timer. If there was a timer already running, it is
  # stopped without further notice.
  # The returned value is False if no timer is
  # started (because no timeout is specified), otherwise the returned value is
  # True.
  #
    def start( self, Timeout, Handler=None ):  # Start a timer
      if Timeout is None:             # Check for an illegal value
        return False
      self.timeout= Timeout           # Save parameters
      self.handler= Handler
      return self._start()            # Start a new timer
  #
  # Method stop stops the timer if it is running. It returns True if the timer is
  # stopped, False if the timer already is expired.
  #
    def stop( self ):                 # Stop a timer
      if self.is_alive():
        self.timer.cancel()
        self.timer.join()
        return True
      else:
        return False

===== Known Bugs and Issues =====

A limitation is that the MQTT sources with mode equal to 'continuous' must all specify the same MQTT broker.

===== To Do =====

Support for multiple MQTT brokers.

===== Credits =====

===== Changelog =====

  * **2020-11-07**
    * Initial release