Zum Inhalt

Beispiel-Projekte

In diesem Kapitel werden anhand der IoT-Szenarien und des IoT-Architekturmodells Übungs- und Umsetzungsbeispiele behandelt.

architect_language

Sprachen für eine IoT-Plattform

Auf den rotumrandeten Bereich liegt der Fokus!

In der Regel werden die Projekte mit Hilfe einer Simulation umgesetzt. Diese kann in allen ebenen der IoT-Architektur eingreifen:

sim_Konzept_allg.jpg

Allgemeines Konzept zur Einbindung von Simulationen entlang der IoT-Plattform-Architektur

Signale generieren mit Python

Erster Gehversuch mit VS Code und Python, um eine Datenquelle zur Verfügung zu haben.

Sinus-Generator
# Bibliotheken importieren
import numpy as np
import time

# Parameter initialisieren
n = 10
x = np.zeros(n)

# Berechnungen
for i in range(n):
    x[i] = np.sin(i/1.23)


# Ausgabe
print(fErgebniss: {x}mm)

Weitere Aufgaben:

  • Rechteck-Generator
  • PT1-Prozessverhalten (ideal)
    • Stochastische Störungen
    • Drift
  • Random Walk

Skripte

Datei Link
squareSignal.py File
sinusSignal.py File
RandomWalk.py File

mqtt + node-RED

Mit mqtt sollen hier Daten aus der Ebene (1) in die Ebene "Smart Services" (8) übertragen werden. Die Ereignisorientierung von mqtt kann untersucht werden.

mqtt_py_tool_NR

mqtt + Python + Node-Red

Software-Voraussetzungen

Info
Software Link
mqtt-Broker "Mosquitto" Installation
mqtt-Explorer Installation
Python Installation
VS Code Installation
Node-Red Installation

Aufgaben

  1. Publisher mit Python (mqpub.py)
  2. mqtt-Explorer -> msg-Object untersuchen
  3. Subscriber mit Python (mqsub.py)
  4. Node-Flow entwickeln

Skripte

Multi-Topics

PY-sim_mqtt_mdb_NR/sensor_multiple_2.py
    '''
    Einfacher Publisher
    =============================
    client.publish (sequentielles Publish) und
    publish.multiple(msgs,...)
    veröffentlichen jeweils "Einzel"-Topic, die vom Subscriper sequentiell empfangen werden

    => Dieses Skript setzt "subscriber_2.py" voraus

    '''

    import time

    import paho.mqtt.client as mqtt
    import paho.mqtt.publish as publish
    import numpy as np

    # --------------------------------------------------------------
    # Parameterblock
    mq_host = "127.0.0.1"        # lokaler PC
    # mq_host = "172.18.45.150"    # Labor-Server
    mq_topic = "test_172"   # letzte IP-Adr-Nr

    # --------------------------------------------------------------

    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))


    client = mqtt.Client()
    client.on_connect = on_connect

    client.connect(mq_host, 1883, 60)

    client.loop_start()

    t = 0.0
    while True:
        time.sleep(2)
        t = t + 0.01
    #     client.publish(f"{mq_topic}/temperature", t)
    #     client.publish(f"{mq_topic}/sinus", np.sin(t))
        msgs = [(f"{mq_topic}/cos",np.cos(t)),
                (f"{mq_topic}/cos2",np.cos(t)*2),
                (f"{mq_topic}/ts",time.time())]
        publish.multiple(msgs, hostname=mq_host)
        print(msgs)
Download: File

PY-sim_mqtt_mdb_NR/subscriber_2.py
    '''
    Einfacher Abonnent
    ======================
    mit jedem abonnierten Topic wird die on_message-funktion aufgerufen 
    (globaler Zähler cnt)

    Dieses Skript setzt den Publisher "sensor_multiple_2.py" voraus

    https://pypi.org/project/paho-mqtt/#description
    '''
    import paho.mqtt.client as mqtt

    # --------------------------------------------------------------
    # Parameterblock
    mq_host = "127.0.0.1"        # lokaler PC
    # mq_host = "172.18.45.150"    # Labor-Server
    mq_topic = "test_172"   # letzte IP-Adr-Nr

    # --------------------------------------------------------------


    cnt = 0

    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))

        client.subscribe(mq_topic+"/#")

    def on_message(client, userdata, msg):
        global cnt
        cnt +=1
        print(cnt, '     ',msg.topic + " " + str(msg.payload))


    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(mq_host, 1883, 60)

    client.loop_forever()
Download: File

json-Objects

PY-sim_mqtt_mdb_NR/sensor_json_3.py
    '''
    JSON-Publisher
    =============================
    client.publish (sequentielles Publish) und
    publish.multiple(msgs,...)
    veröffentlicht ein "Sammel"-Topic mit client.publish(), das vom Subscriper 
    als JSON-Objekt empfangen wird.

    => Dieses Skript setzt "subscriber_3_json.py"  oder
       "subscriber_3_json_mDB_2.py" voraus

    '''


    import time

    import paho.mqtt.client as mqtt
    import paho.mqtt.publish as publish
    import numpy as np
    import json

    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))

    # Parameterblock
    mq_host = "127.0.0.1"        # lokaler PC
    # mq_host = "172.18.45.150"    # Labor-Server
    mq_topic = "test_172"   # letzte IP-Adr-Nr

    client = mqtt.Client()
    client.on_connect = on_connect

    client.connect(mq_host, 1883, 60)

    client.loop_start()

    t = 0.0
    while True:
        time.sleep(1)
        t = t + 0.01
        # python-dict beschreibt den Daten-Block (payload)
        PL = {"cos":np.cos(t),
            "cos2":np.cos(t)*2,
            "ts":time.time()
            }
        # Umwandeln in ein json-Objekt
        jPL = json.dumps(PL)
        # versenden (publish)
        client.publish(topic=mq_topic, payload = jPL)
        print(jPL)
Download: File

PY-sim_mqtt_mdb_NR/subscriber_3_json.py
    #!/usr/bin/env python

    '''
    https://pypi.org/project/paho-mqtt/#description
    '''
    import paho.mqtt.client as mqtt
    import json

    cnt = 0

    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))

        client.subscribe("sen/#")
        client.subscribe("test_172")

    def on_message(client, userdata, msg):
        global cnt
        cnt +=1
        # print(cnt, '     ',msg.topic + " " + str(msg.payload))
        data = json.loads(msg.payload)
        # print(cnt, '     ',data)
        print(cnt, '     ',data["ts"])



    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect("localhost", 1883, 60)

    client.loop_forever()
Download: File

subscriber_3_json.py

mqtt + Node-RED + MongoDB

Durch die Schicht (3) Datenhaltung lassen sich auch Historie-Daten für Analysezwecke nutzen.

mqtt_py_mDB_NR

mqtt + Python + MongoDB + Node-Red

Software-Voraussetzungen

Info
Software Link
Mongo-DB (inkl. Compass) Installation

Aufgaben

  1. mq_mdb_client.py -> PY-sim_mqtt_mdb_NR/subscriber_3_json_mDB_2.py anpassen und starten
  2. sensor_json_3.py -> PY-sim_mqtt_mdb_NR/sensor_json_3_2.py anpassen und separat starten (Konsole, ...)
  3. mdb-Explorer auf mDB setzen
  4. Node-Red-Flows mit mDB-Verbindungen entwickeln

Skripte

pub / sub

PY-sim_mqtt_mdb_NR\sensor_json_3_2.py
    '''
    IoT-Schicht 1
    - Publishd mqtt-basierte Daten (simulierter Prozess)

    letzte Änderung: 27.4.2023

    Doku:
    https://pypi.org/project/paho-mqtt/#description
    '''

    import time

    import paho.mqtt.client as mqtt
    import numpy as np
    import json
    import datetime
    # --------------------------------------------------------------
    # Parameterblock
    mq_host = "127.0.0.1"        # lokaler PC
    # mq_host = "172.18.45.150"    # Labor-Server
    mq_topic = "test_172"   # letzte IP-Adr-Nr

    # --------------------------------------------------------------


    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))


    # __main__ ======================================================
    client = mqtt.Client()
    client.on_connect = on_connect

    client.connect(mq_host, 1883, 60)

    client.loop_start()

    # Generieren von simulierten Daten -----------
    cnt = 0
    while True:
        time.sleep(1)
        cnt += 10
        ts = datetime.datetime.now()
        # python-dict beschreibt den Daten-Block (payload)
        PL = {"t": time.time(),     # unix-Zeit
            "ts": str(ts),          # utf-Zeit
            "cnt": cnt,
            "v1": np.random.normal(), 
            "v2":np.cos(cnt/10)*2, 
            "v3": np.cos(cnt/20)*200
            }
        # Umwandeln in ein json-Objekt
        jPL = json.dumps(PL)
        # versenden (publish)
        client.publish(topic=mq_topic, payload = jPL )
        print(PL)
Download: File

PY-sim_mqtt_mdb_NR\subscriber_3_json_mDB_2.py
    '''
    IoT-Schicht 2-3
    - subribed mqtt-basierte json-Daten
    - schreibt die Daten in die MongoDB

    => setzt das Skript "sensor_json_3.py" voraus (schreibt als payload ein json-Objekt).


    letzte Änderung: 27.4.2023

    Doku:
    https://pypi.org/project/paho-mqtt/#description
    https://pymongo.readthedocs.io/en/stable/
    '''
    import paho.mqtt.client as mqtt
    from pymongo import MongoClient
    import json
    # --------------------------------------------------------------
    # Parameterblock
    mdb_mq_host = "127.0.0.1"        # lokaler PC
    # mdb_mq_host = "172.18.45.150"    # Labor-Server
    mdb_db = "test"
    mdb_col = "M_172"       # letzte IP-Adr-Nr
    mq_topic = "test_172"   # letzte IP-Adr-Nr

    # --------------------------------------------------------------

    cnt = 0

    class mqClient():
        def __init__(self,col):
            self.col = col

        def on_connect(self,client, userdata, flags, rc):
            print("Connected with result code " + str(rc))
            client.subscribe("test_172")

        def on_message(self,client, userdata, msg):
            global cnt
            cnt +=1
            print(cnt, '     ',msg.topic + " " + str(msg.payload))
            # payload enthält ein json-Objekt mit allen gesendeten Datenwerten {key1:value1, key2:value2}
            data = json.loads(msg.payload)
            print(data)
            # Schreiben in die MongoDB
            result = self.col.insert_one(data)

    # __main__ ======================================================
    mo_client = MongoClient(mdb_mq_host, 27017)
    db = mo_client[mdb_db]
    col = db[mdb_col]
    # col.drop()      # Löschen der bereits erzeugten Collection (ggf. auskommentieren...)

    mqC = mqClient(col)

    client = mqtt.Client()
    client.on_connect = mqC.on_connect
    client.on_message = mqC.on_message

    client.connect(mdb_mq_host, 1883, 60)

    client.loop_forever()
Download: File

mqtt + py3 + MongoDB + NR

In der nächsten Stufe soll die Analytic Services dazukommen. Die Schichten 1 bis 3 wurden bereits vollständig umgesetzt. Wird nun der Fokus im Entwicklungsprozess auf die Prozess-Simulation in Anlehnung an den realen Prozess gerichtet, dann können die simulierten Prozessdaten hierzu direkt in die Datenbank geschrieben werden (Grün markierter Bereich 1,2 +3).

mqtt_py2_mDB_NR

mqtt + Python + MongoDB + pyAnalytics + Node-Red

Diese Variante zeigt das Skript

client

PY-simProz_mDB_NR\sim__mdb_prozSig_rt_01.py
    # -*- coding: utf-8 -*-
    """
    @author: jbechtloff
    Vorgabe: tsA, tsE, Timedelta

    Zusammenführen von  sim_sig_design_03.py &
                        sim_mdb_struc_03.py
    Generieren + Realtime                    
    """

    from pymongo import MongoClient
    import numpy as np
    import pandas as pd
    from PT1 import PT1
    import time

    import os
    from apscheduler.schedulers.blocking import BlockingScheduler


    class clProzess:
        def __init__(self,  T, T1, Kdrift, Tdrift, Kw, col):
            self.pt1a = PT1(T,T1)
            self.pt1b = PT1(T,T1)
            self.T = T
            self.Kdrift = Kdrift
            self.Tdrift = Tdrift
            self.Kw = Kw
            self.col = col
            self.cnt = 0

        def simProz(self):
            k = self.cnt
            u_1 = np.sin(4*k/100*np.pi)
            if u_1 <0: u_1=0
            if u_1 > 0.5: u_1 = 0.5
            u_1 = u_1*2.0

            # Amplituden-Drift-Simu: 1.0 < u2 < 1.1 (MUL)
            a_Drift = 1.0 + self.Kdrift * (np.sin(4*k*self.T/self.Tdrift*np.pi)+1.0)/2.0
            errFl = 0
            if a_Drift > 1.0+19*self.Kdrift/20: errFl = 1.0
            u_2 = self.pt1b.PT1rek(self.pt1a.PT1rek(u_1*a_Drift))  # PT2-Verschleiß
            w = np.random.randn()*self.Kw

            self.cnt +=1
            return(u_1, u_2, a_Drift, w, errFl)

        def write_mdb(self, ts):
            [u_1, u_2, a_Drift, wi, err_Flg] = self.simProz()

            data = {
                'ts': ts,
                'cnt': self.cnt,
                'u': u_2 + wi,
                'err': err_Flg
            }
            result = self.col.insert_one(data)
            print("%s" % (self.cnt))

        def write_mdb_now(self):
            self.write_mdb(pd.Timestamp.now())


    if __name__ == "__main__":
        client = MongoClient('localhost', 27017)
        db = client['IoT']
        col = db["m"]
        col.drop()  # Alte Collection LÖSCHEN !!!

        Prozess = clProzess(T=1,T1=4,
                        Kdrift=0.1, 
                        Tdrift=2000, 
                        Kw=0.01,
                        col = col)



        Tscal = {"s":1,"m":60,"h":60*60, "d":60*60*24}
        dT = 1
        T_unit = "s"

        sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
        sched.add_job(Prozess.write_mdb_now,'interval',seconds=Tscal[T_unit]*dT,id='runJob1') 

        # tsA = pd.Timestamp(2021,11, 20,  0,0,0)
        # tsE = pd.Timestamp(2021,11, 21,  10,0,0)

        tsE = pd.Timestamp.now()  # now
        tsA = tsE - pd.Timedelta(1,"h")
        # tsDeltaAE = tsE - tsA


        ts = tsA
        while ts <= tsE: 
            Prozess.write_mdb(ts)
            ts = ts + pd.Timedelta(dT,T_unit)


        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

        try:
            sched.start()
        except (KeyboardInterrupt, SystemExit):
            pass                
Download: File

In diesem Python-Skript werden mehrere simulationsspezifische Teillösungen verwendet:

  • Zeitzyklische (periodische Task --> APScheduler) Kapitel
  • Simulationsmodell Kapitel

Info

Folgende Anpassungen müssen vorgenommen werden:

  1. MongoDB: IP & Port (client = MongoClient('localhost', 27017))
  2. MongoDB: Datenbank (db = client['IoT'])
  3. MongoDB: Collection (col = db["m"])
  4. Simulationsvorlauf: tsA = tsE - pd.Timedelta(1,"h")