Implementierung eines MQTT-Clients für reaktive Systeme
MQTT-Reactive ist ein MQTT v3.1.1-Client, der von der MQTT-C-Bibliothek von LiamBindle abgeleitet ist. Das Ziel von MQTT-Reactive ist die Bereitstellung eines portablen und blockierungsfreien MQTT-Clients in C geschrieben, um in reaktiven eingebetteten Systemen eingesetzt zu werden. Dieser Artikel erklärt zunächst, was ein reaktives System ist. Anschließend wird beschrieben, wie eine geeignete Softwarestruktur für diese Art von System entworfen wird. Schließlich zeigt der Artikel, wie Sie die MQTT-Reactive-Bibliothek in einem reaktiven System verwenden, indem Sie eine Zustandsmaschine und das ereignisgesteuerte Paradigma verwenden. Dazu verwendet der Artikel ein reales IoT-Gerät als anschauliches Beispiel, an dem der Artikel dessen Softwarestruktur und zustandsbasiertes Verhalten anhand von UML-Diagrammen wie Zustandsmaschine, Interaktion und Struktur erläutert. Der Artikel enthält auch Richtlinien zur Implementierung des MQTT-Reactive-Clients des IoT-Geräts in der Sprache C.
Viele eingebettete Systeme sind reaktiv, d. h. sie reagieren auf interne oder externe Ereignisse. Sobald diese Reaktionen abgeschlossen sind, geht die Software zurück, um auf das nächste Ereignis zu warten. Aus diesem Grund werden ereignisgesteuerte Systeme alternativ als reaktive Systeme bezeichnet.
Ereignisgesteuerte Programmierung oder reaktive Programmierung ist eines der am besten geeigneten Programmierparadigmen, um eine flexible, vorhersehbare und wartbare Software für reaktive Systeme zu erreichen. In diesem Paradigma wird der Ablauf des Programms durch Ereignisse bestimmt. Häufig besteht die Struktur der reaktiven Software aus mehreren gleichzeitig laufenden Einheiten, sogenannten aktiven Objekten, die verschiedene Arten von Ereignissen abwarten und verarbeiten. Jedes aktive Objekt besitzt einen Kontroll-Thread und eine Ereigniswarteschlange, über die es seine eingehenden Ereignisse verarbeitet. In reaktiven Systemen haben die aktiven Objekte typischerweise ein zustandsbasiertes Verhalten, das in einem Zustandsdiagramm definiert ist.
Um zu untersuchen, wie die MQTT-Reactive-Bibliothek in einem reaktiven System mit mehreren und gleichzeitigen Aufgaben verwendet wird und sowohl eine Zustandsmaschine als auch das ereignisgesteuerte Paradigma verwendet werden, verwenden wir ein IoT-Gerät als Beispiel.
Die Idee, das MQTT-Protokoll zu verwenden, entstand während der Entwicklung eines IoT-Geräts für ein Eisenbahnunternehmen. Dieses Gerät war ein eindeutig reaktives System, das in der Lage war:
- Erkennen und speichern von Änderungen mehrerer digitaler Eingänge
- Erfassen, filtern und speichern Sie mehrere analoge Signale
- Gespeicherte Informationen regelmäßig an einen Remote-Server senden
- Informationen über das MQTT-Protokoll über das GSM-Netzwerk senden und empfangen
MQTT wurde gewählt, weil es ein leichtgewichtiges Messaging-Protokoll auf Publisher-Abonnenten-Basis ist, das häufig in IoT- und Netzwerkanwendungen verwendet wird, bei denen Verbindungen mit hoher Latenz und niedriger Datenrate erwartet werden, wie z. B. in GSM-Netzwerken.
Die MQTT-Fähigkeit für das erwähnte IoT-Gerät wurde durch die Verwendung einer modifizierten Version von LiamBindles MQTT-C erreicht. Da die Software dieses Geräts als reaktive Software konzipiert war, musste MQTT-C modifiziert werden, um es durch den Austausch asynchroner Ereignisse mit dem Rest des Systems zu kommunizieren. Diese Ereignisse wurden verwendet, um Datenverkehr über das Netzwerk zu empfangen und zu senden sowie um sensible Informationen mit einem Server zu verbinden und zu veröffentlichen. Die resultierende Softwarebibliothek wurde MQTT-Reactive genannt.
State-Maschine
MQTT-Reactive wurde durch eine Zustandsmaschine verwendet, wie in Abbildung 1 gezeigt, die das grundlegende Verhalten eines MQTT-Reactive-Clients modelliert. Es war ein aktives Objekt namens MqttMgr (MQTT Manager). Die State-Machine-Aktionen in Abbildung 1 zeigen, wie die MQTT-Reactive-Bibliothek von einem State-Machine verwendet werden könnte. Obwohl in Abbildung 1 die Sprache C als Aktionssprache verwendet wurde, kann jede beliebige Computer- oder formale Sprache verwendet werden.
Klicken für größeres Bild
Abbildung 1. Zustandsmaschine eines MQTT-reaktiven Clients (Quelle:VortexMakes)
Die Zustandsmaschine in Abbildung 1 startet im Zustand WaitingForNetConnection. Nachdem eine Netzwerkverbindung zu einem Server hergestellt wurde, empfängt WaitingForNetConnection das Activate-Ereignis, und der Zustandsautomat geht dann in den WaitingForSync-Zustand über. Nur in diesem Zustand kann die Zustandsmaschine MQTT-Nachrichten bereitstellen, die an den Broker wie CONNECT oder PUBLISH über die Connect- bzw. Publish-Ereignisse gesendet werden. Der Sync-Zustand verwendet den speziellen Mechanismus einer UML zum Zurückstellen des Publish-Ereignisses, das durch das defer-Schlüsselwort angegeben wird, das im internen Bereich des Sync-Zustands enthalten ist. Wenn das Veröffentlichungsereignis eintritt, während die Synchronisierung der aktuelle Status ist, wird es für die zukünftige Verarbeitung gespeichert (zurückgestellt), bis der SM in einen Zustand eintritt, in dem sich das Veröffentlichungsereignis nicht in seiner Liste verzögerter Ereignisse befindet, z. B. WaitingForSync oder WaitingForNetConnection. Beim Eintritt in solche Zustände ruft die Zustandsmaschine automatisch jedes gespeicherte Veröffentlichungsereignis zurück und konsumiert oder verwirft dieses Ereignis dann entsprechend dem Übergangszielzustand.
Alle Millisekunden von SyncTime geht die Zustandsmaschine in den zusammengesetzten Sync-Zustand über, der das eigentliche Senden und Empfangen von Datenverkehr aus dem Netzwerk durchführt, indem Empfangs- und Sendeereignisse an den Netzwerkmanager gesendet werden. Es ist eine gleichzeitige Einheit, die sich mit Netzwerkproblemen befasst.
Obwohl der eingeführte MqttMgr nur die CONNECT- und PUBLISH-Pakete unterstützt, könnte er das SUBSCRIBE-Paket mit relativ einfachen Änderungen unterstützen.
Die Zustandsmaschinenaktionen greifen auf die Parameter des konsumierten Ereignisses zu, indem sie das Schlüsselwort params verwenden. Im folgenden Übergang trägt das Connect-Ereignis beispielsweise zwei Parameter, clientId und keepAlive, deren Werte verwendet werden, um die Attribute des entsprechenden MqttMgr-Objekts zu aktualisieren:
Connect(clientId, keepAlive)/ me->clientId =params->clientId; me->keepAlive =params->keepAlive; me->operRes =mqtt_connect(&me->client, me->clientId, NULL, NULL, 0, NULL, NULL, 0, me->keepAlive);
In diesem Beispiel ist das Ereignis Connect(clientId, keepAlive) der Auslöser der Transition und der Aufruf mqtt_connect() ist Teil der Aktion, die als Ergebnis ausgeführt wird. Mit anderen Worten, wenn das MqttMgr-Objekt ein Connect(clientId, keepAlive)-Ereignis mit den Parametern 'publishing_client' und '400' empfängt, Connect(“publishing_client”, 400), werden die clientId- und keepAlive-Attribute des MqttMgr mit den Werten ' Publishing_Client' und '400' folglich.
Um Ereignisse zu erstellen und zu senden, verwenden die Aktionen der Zustandsmaschine das GEN()-Makro. Die folgende Anweisung sendet beispielsweise ein Receive-Ereignis an das Collector-Objekt, das durch seinen Collector-Zeiger als Attribut eines MqttMgr-Objekts referenziert wird:
GEN(me->itsCollector, Receive());
Das erste Argument der GEN()-Anweisung ist das Objekt, das das Ereignis empfängt, während das zweite Argument das gesendete Ereignis ist, einschließlich der Ereignisargumente (sofern vorhanden). Die Argumente müssen mit den Ereignisparametern übereinstimmen. Die folgende Anweisung generiert beispielsweise ein ConnRefused(code)-Ereignis und sendet es an das Collector-Objekt, wobei der vom Broker zurückgegebene Code als Ereignisparameter übergeben wird:
GEN(me->itsCollector, ConRefused(code));
Die Idee, das Schlüsselwort params für den Zugriff auf die Parameter des konsumierten Ereignisses und das Makro GEN() zum Generieren von Ereignissen aus Aktionen zu verwenden, wurde zu reinen Veranschaulichungszwecken vom Codegenerator von Rational Rhapsody Developer übernommen.
Die Default-Aktion der State Machine in Abbildung 1 setzt den Callback, der von MQTT-Reactive aufgerufen wird, wenn eine Verbindungsannahme vom Broker eingeht. Dieser Rückruf sollte im MqttMgr-Code implementiert werden. Dieser Callback muss entweder ConnAccepted- oder ConnRefused(code)-Ereignisse generieren, um an das Collector-Objekt gesendet zu werden, wie unten gezeigt.
statisch ungültig connack_response_callback (Aufzählung MQTTConnackReturnCode return_code){ /*...*/ wenn (return_code ==MQTT_CONNACK_ACCEPTED) { GEN(me->itsCollector, ConnAccepted()); } sonst { GEN(me->itsCollector, ConnRefused(return_code)); }}
Modellimplementierung
Das Modell in Abbildung 1 könnte in C oder C++ implementiert werden, indem Sie entweder Ihr bevorzugtes Softwaretool oder einfach Ihre eigene Zustandsautomatenimplementierung verwenden. Dafür stehen im Internet verschiedene Tools zur Verfügung, wie unter anderem das RKH-Framework, das QP-Framework, das Yakindu Statechart Tool oder der Rational Rhapsody Developer. Alle unterstützen Statecharts und C/C++-Sprachen. Darüber hinaus enthalten einige von ihnen ein Tool zum Zeichnen eines Statechart-Diagramms und zum Generieren von Code daraus.
Diese Zustandsmaschine wurde von einem aktiven Objekt namens MqttMgr (MQTT Manager) ausgeführt, das eine strikte Kapselung des MQTT-Reactive-Codes vorsah und die einzige Instanz war, die eine MQTT-Reactive-Funktion aufrufen oder auf MQTT-Reactive-Daten zugreifen durfte. Die anderen gleichzeitigen Entitäten im System sowie alle ISRs konnten MQTT-Reactive nur indirekt durch den Austausch von Ereignissen mit MqttMgr nutzen. Die Verwendung dieses Mechanismus zum Synchronisieren gleichzeitiger Entitäten und zum Austausch von Daten zwischen ihnen vermeidet den Umgang mit den Gefahren herkömmlicher Blockierungsmechanismen wie Semaphoren, Mutex, Verzögerungen oder Ereignis-Flags. Diese Mechanismen können unerwartete Fehlfunktionen verursachen, die schwer und mühsam zu diagnostizieren und zu beheben sind.
Das aktive Objekt MqttMgr kapselt seine Attribute als einen Satz von Datenelementen. Ein Datenelement bezeichnet eine Variable mit einem Namen und einem Typ, wobei der Typ eigentlich ein Datentyp ist. Ein Datenelement für das MqttMgr-Objekt wird einem Mitglied der Objektstruktur zugeordnet. Name und Typ des Mitglieds sind mit denen der Objektdaten identisch. Beispielsweise wird das Clientattribut des MqttMgr-Objekttyps nach Wert als Datenelement in die MqttMgr-Struktur eingebettet:
strukturieren MqttMgr { /* ... */ strukturieren mqtt_client Kunde; /* Attribut Client */ LocalRecvAll localRecv; /* Attribut localRecv */ };
Auf die Daten des MqttMgr-Objekts wird direkt zugegriffen und diese geändert, ohne Zugriffs- oder Mutatoroperationen zu verwenden. Auf client und localRecv wird beispielsweise über den me-Zeiger zugegriffen, der auf eine Instanz von MqttMgr zeigt.
mqtt_recvMsgError(&me->client, &me->localRecv);
Der MqttMgr hat die in Tabelle 1 gezeigte Liste von Attributen.
Tabelle 1. MqttMgr-Attribute
Die Struktur in Abbildung 2 ist nützlich, um die Beziehungen zwischen den betroffenen Akteuren zu berücksichtigen. Dies sind:das Collector-Objekt, das Informationen an den Broker senden möchte; das NetMgr-Objekt, das sich mit dem Netzwerk befasst; und das MqttMgr-Objekt.
Abbildung 2. Entwurf der IoT-Systemstruktur (Quelle:VortexMakes)
Das Sequenzdiagramm in Abbildung 3 zeigt, wie das MqttMgr-Objekt mit dem Rest des Systems interagiert, wenn eine Sitzung mit einem MQTT-Server geöffnet werden muss. In diesem Diagramm werden der MqttMgr-Zustand und die ausgetauschten asynchronen Nachrichten zwischen Collector-, MqttMgr- und NetMgr-Akteuren dargestellt.
Abbildung 3. Verbindung zu einem MQTT-Broker (Quelle:VortexMakes)
Nachdem eine Netzwerkverbindung durch das NetMgr-Objekt zu einem Broker hergestellt wurde, muss das erste Paket, das von MqttMgr an einen MQTT-Server gesendet wird, ein CONNECT-Paket sein. Somit sendet der Collector-Akteur ein Connect(clientId, keepAlive)-Ereignis an den MqttMgr-Akteur. Dieses Ereignis muss die Client-ID und das Keep-Alive-Zeitintervall enthalten. Wenn der Server die Verbindungsanforderung akzeptiert, sendet der MqttMgr-Akteur ein ConnAccepted-Ereignis an den Collector-Akteur, um diese Situation zu benachrichtigen. Von da an kann der Collector-Akteur Informationsnachrichten an diesen Broker veröffentlichen.
Wenn der Server die Verbindungsanforderung ablehnt, sendet der MqttMgr-Akteur ein ConnRefused-Ereignis an den Collector-Akteur. Dieses Ereignis enthält einen Code, der die Ablehnungsursache wie in Abbildung 4 gezeigt mitteilt. Siehe MQTT v3.1.1 Abschnitt 3.2.2.3.
Abbildung 4. Broker lehnt eine Verbindungsanfrage ab (Quelle:VortexMakes)
Abbildung 5 zeigt den Interaktionsfluss, wenn eine Nachricht veröffentlicht wird. Dazu sendet der Collector-Akteur ein Publish(data, size, topic, qos)-Ereignis, das die zu veröffentlichenden Informationen (data), die Länge der Informationen in Bytes (size), den Themennamen, zu dem die Informationen werden veröffentlicht (Thema) und der Grad der Sicherheit, diese Nachricht zu übermitteln (qos). Im zuvor erwähnten IoT-Gerät wurden die veröffentlichten Informationen mithilfe der JSON-Spezifikation formatiert. Es ist ein offenes Standardformat, das Datenobjekte mit Attribut-Wert-Paaren in menschenlesbarem Text enthält. Dieses Format wurde mit jWrite erreicht, einer einfachen und leichtgewichtigen Bibliothek, die in C geschrieben wurde.
Abbildung 5. Veröffentlichen von Daten an einen Broker (Quelle:VortexMakes)
Abbildung 6 zeigt ein Szenario, in dem der Empfang und das Senden von MQTT-Nachrichten an das Netzwerk fehlschlägt. Wenn der Netzwerkmanager keinen Datenverkehr vom Netzwerk empfangen kann, sendet er ein ReceiveFail an den MqttMgr-Aktor. Wenn der Netzwerkmanager keine Daten an das Netzwerk senden kann, sendet er ein SendFail an den MqttMgr-Akteur.
Abbildung 6. Fehler im Netzwerk (Quelle:VortexMakes)
Tabelle 2 fasst die beteiligten Ereignisse in den gezeigten Szenarien zusammen.
Tabelle 2. Ereignisse
Schlussfolgerung
Durch die Vermeidung der Gefahren herkömmlicher Blockierungsmechanismen – wie Semaphoren, Mutex, Verzögerungen oder Ereignis-Flags – ermöglichen die MQTT-Reactive-Bibliothek, die Zustandsmaschine und die in diesem Artikel vorgeschlagene Softwarearchitektur reaktiven eingebetteten Systemen die Implementierung eines MQTT-Clients in einem neuartigen Weg. Dies wird durch die Kapselung von MQTT-reaktivem Code in einer Parallelitätseinheit namens aktives Objekt . erreicht , deren zustandsbasiertes Verhalten in der vorgeschlagenen Zustandsmaschine definiert ist. Dieses aktive Objekt kommuniziert mit dem Rest des Systems durch den Austausch asynchroner Ereignisse, die verwendet werden:nicht nur zum Empfangen und Senden von Datenverkehr über das Netzwerk, sondern auch zum Verbinden und Veröffentlichen von Informationen mit einem Server für Internet-of-Things-Anwendungen.
Eingebettet
- Eine Taxonomie für das IIoT
- Aufbau flexibler Fertigungssysteme für Industrie 4.0
- Ein Überblick über die IC-Technologie für Mikrocontroller und eingebettete Systeme
- Würth Elektronik eiSos präsentiert neue Komponenten für smarte Systeme
- Entwicklung von Motorsteuerungen für Robotersysteme
- Syslogic:robuste Computer und HMI-Systeme für Baumaschinen
- Kontron und SYSGO:SAFe-VX-Rechenplattform für sicherheitskritische Systeme
- Gewünschte Zustandskonfiguration für Stromkreise
- Unternehmen setzen Fristen für intelligente Systeme
- Top 10 Arbeitsabläufe für Hersteller