Zum Inhalt

Beispiel-Projekte

In diesem Kapitel werden anhand der IoT-Szenarien und des IoT-Architekturmodells Übungs- und Umsetzungsbeispiele behandelt.

architect_language

Sprachen für eine IoT-Plattform

Auf den rotumrandeten Bereich liegt der Fokus!

In der Regel werden die Projekte mit Hilfe einer Simulation umgesetzt. Diese kann in allen ebenen der IoT-Architektur eingreifen:

sim_Konzept_allg.jpg

Allgemeines Konzept zur Einbindung von Simulationen entlang der IoT-Plattform-Architektur

Signale generieren mit Python

Erster Gehversuch mit VS Code und Python, um eine Datenquelle zur Verfügung zu haben.

Sinus-Generator
# Bibliotheken importieren
import numpy as np
import time

# Parameter initialisieren
n = 10
x = np.zeros(n)

# Berechnungen
for i in range(n):
    x[i] = np.sin(i/1.23)


# Ausgabe
print(fErgebniss: {x}mm)

Weitere Aufgaben:

  • Rechteck-Generator
  • PT1-Prozessverhalten (ideal)
    • Stochastische Störungen
    • Drift
  • Random Walk

Skripte

Datei Link
squareSignal.py File
sinusSignal.py File
RandomWalk.py File

mqtt-nodeRED

Mit mqtt sollen hier Daten aus der Ebene (1) in die Ebene "Smart Services" (8) übertragen werden. Die Ereignisorientierung von mqtt kann untersucht werden.

mqtt_py_tool_NR

mqtt + Python + NodeRed

Software-Voraussetzungen

Info
Software Link
mqtt-Broker "Mosquitto" Installation
mqtt-Explorer Installation
Python Installation
VS Code Installation
NodeRed Installation

Aufgaben

  1. Publisher mit Python (mqpub.py)
  2. mqtt-Explorer -> msg-Object untersuchen
  3. Subscriber mit Python (mqsub.py)
  4. Node-Flow entwickeln

Skripte

Multi-Topics

PY-sim_mqtt_mdb_NR/sensor_multiple_2.py
'''
Einfacher Publisher
=============================
client.publish (sequentielles Publish) und
publish.multiple(msgs,...)
veröffentlichen jeweils "Einzel"-Topic, die vom Subscriper sequentiell empfangen werden

=> Dieses Skript setzt "subscriber_2.py" voraus

'''

import time

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import numpy as np

# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1"        # lokaler PC
# mq_host = "172.18.45.150"    # Labor-Server
mq_topic = "test_172"   # letzte IP-Adr-Nr

# --------------------------------------------------------------

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


client = mqtt.Client()
client.on_connect = on_connect

client.connect(mq_host, 1883, 60)

client.loop_start()

t = 0.0
while True:
    time.sleep(2)
    t = t + 0.01
#     client.publish(f"{mq_topic}/temperature", t)
#     client.publish(f"{mq_topic}/sinus", np.sin(t))
    msgs = [(f"{mq_topic}/cos",np.cos(t)),
            (f"{mq_topic}/cos2",np.cos(t)*2),
            (f"{mq_topic}/ts",time.time())]
    publish.multiple(msgs, hostname=mq_host)
    print(msgs)
Download: File

PY-sim_mqtt_mdb_NR/subscriber_2.py
'''
Einfacher Abonnent
======================
mit jedem abonnierten Topic wird die on_message-funktion aufgerufen 
(globaler Zähler cnt)

Dieses Skript setzt den Publisher "sensor_multiple_2.py" voraus

https://pypi.org/project/paho-mqtt/#description
'''
import paho.mqtt.client as mqtt

# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1"        # lokaler PC
# mq_host = "172.18.45.150"    # Labor-Server
mq_topic = "test_172"   # letzte IP-Adr-Nr

# --------------------------------------------------------------


cnt = 0

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    client.subscribe(mq_topic+"/#")

def on_message(client, userdata, msg):
    global cnt
    cnt +=1
    print(cnt, '     ',msg.topic + " " + str(msg.payload))


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(mq_host, 1883, 60)

client.loop_forever()
Download: File

json-Objects

PY-sim_mqtt_mdb_NR/sensor_json_3.py
'''
JSON-Publisher
=============================
client.publish (sequentielles Publish) und
publish.multiple(msgs,...)
veröffentlicht ein "Sammel"-Topic mit client.publish(), das vom Subscriper 
als JSON-Objekt empfangen wird.

=> Dieses Skript setzt "subscriber_3_json.py"  oder
   "subscriber_3_json_mDB_2.py" voraus

'''


import time

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import numpy as np
import json

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

# Parameterblock
mq_host = "127.0.0.1"        # lokaler PC
# mq_host = "172.18.45.150"    # Labor-Server
mq_topic = "test_172"   # letzte IP-Adr-Nr

client = mqtt.Client()
client.on_connect = on_connect

client.connect(mq_host, 1883, 60)

client.loop_start()

t = 0.0
while True:
    time.sleep(1)
    t = t + 0.01
    # python-dict beschreibt den Daten-Block (payload)
    PL = {"cos":np.cos(t),
        "cos2":np.cos(t)*2,
        "ts":time.time()
        }
    # Umwandeln in ein json-Objekt
    jPL = json.dumps(PL)
    # versenden (publish)
    client.publish(topic=mq_topic, payload = jPL)
    print(jPL)
Download: File

PY-sim_mqtt_mdb_NR/subscriber_3_json.py
#!/usr/bin/env python

'''
https://pypi.org/project/paho-mqtt/#description
'''
import paho.mqtt.client as mqtt
import json

cnt = 0

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    client.subscribe("sen/#")
    client.subscribe("test_172")

def on_message(client, userdata, msg):
    global cnt
    cnt +=1
    # print(cnt, '     ',msg.topic + " " + str(msg.payload))
    data = json.loads(msg.payload)
    # print(cnt, '     ',data)
    print(cnt, '     ',data["ts"])



client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost", 1883, 60)

client.loop_forever()
Download: File

subscriber_3_json.py

mqtt-NodeRED-MongoDB

Durch die Schicht (3) Datenhaltung lassen sich auch Historie-Daten für Analysezwecke nutzen.

mqtt_py_mDB_NR

mqtt + Python + MongoDB + NodeRed

Software-Voraussetzungen

Info
Software Link
Mongo-DB (inkl. Compass) Installation

Aufgaben

  1. mq_mdb_client.py -> PY-sim_mqtt_mdb_NR/subscriber_3_json_mDB_2.py anpassen und starten
  2. sensor_json_3.py -> PY-sim_mqtt_mdb_NR/sensor_json_3_2.py anpassen und separat starten (Konsole, ...)
  3. mdb-Explorer auf mDB setzen
  4. NodeRed-Flows mit mDB-Verbindungen entwickeln

Skripte

pub / sub

PY-sim_mqtt_mdb_NR\sensor_json_3_2.py
'''
IoT-Schicht 1
- Publishd mqtt-basierte Daten (simulierter Prozess)

letzte Änderung: 27.4.2023

Doku:
https://pypi.org/project/paho-mqtt/#description
'''

import time

import paho.mqtt.client as mqtt
import numpy as np
import json
import datetime
# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1"        # lokaler PC
# mq_host = "172.18.45.150"    # Labor-Server
mq_topic = "test_172"   # letzte IP-Adr-Nr

# --------------------------------------------------------------


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


# __main__ ======================================================
client = mqtt.Client()
client.on_connect = on_connect

client.connect(mq_host, 1883, 60)

client.loop_start()

# Generieren von simulierten Daten -----------
cnt = 0
while True:
    time.sleep(1)
    cnt += 10
    ts = datetime.datetime.now()
    # python-dict beschreibt den Daten-Block (payload)
    PL = {"t": time.time(),     # unix-Zeit
        "ts": str(ts),          # utf-Zeit
        "cnt": cnt,
        "v1": np.random.normal(), 
        "v2":np.cos(cnt/10)*2, 
        "v3": np.cos(cnt/20)*200
        }
    # Umwandeln in ein json-Objekt
    jPL = json.dumps(PL)
    # versenden (publish)
    client.publish(topic=mq_topic, payload = jPL )
    print(PL)
Download: File

PY-sim_mqtt_mdb_NR\subscriber_3_json_mDB_2.py
'''
IoT-Schicht 2-3
- subribed mqtt-basierte json-Daten
- schreibt die Daten in die MongoDB

=> setzt das Skript "sensor_json_3.py" voraus (schreibt als payload ein json-Objekt).


letzte Änderung: 27.4.2023

Doku:
https://pypi.org/project/paho-mqtt/#description
https://pymongo.readthedocs.io/en/stable/
'''
import paho.mqtt.client as mqtt
from pymongo import MongoClient
import json
# --------------------------------------------------------------
# Parameterblock
mdb_mq_host = "127.0.0.1"        # lokaler PC
# mdb_mq_host = "172.18.45.150"    # Labor-Server
mdb_db = "test"
mdb_col = "M_172"       # letzte IP-Adr-Nr
mq_topic = "test_172"   # letzte IP-Adr-Nr

# --------------------------------------------------------------

cnt = 0

class mqClient():
    def __init__(self,col):
        self.col = col

    def on_connect(self,client, userdata, flags, rc):
        print("Connected with result code " + str(rc))
        client.subscribe("test_172")

    def on_message(self,client, userdata, msg):
        global cnt
        cnt +=1
        print(cnt, '     ',msg.topic + " " + str(msg.payload))
        # payload enthält ein json-Objekt mit allen gesendeten Datenwerten {key1:value1, key2:value2}
        data = json.loads(msg.payload)
        print(data)
        # Schreiben in die MongoDB
        result = self.col.insert_one(data)

# __main__ ======================================================
mo_client = MongoClient(mdb_mq_host, 27017)
db = mo_client[mdb_db]
col = db[mdb_col]
# col.drop()      # Löschen der bereits erzeugten Collection (ggf. auskommentieren...)

mqC = mqClient(col)

client = mqtt.Client()
client.on_connect = mqC.on_connect
client.on_message = mqC.on_message

client.connect(mdb_mq_host, 1883, 60)

client.loop_forever()
Download: File

NodeRed

Für den Zugriff auf die MongoDB in NodeRed muss das Node-Paket "node-red-contrib-mongodb4" installiert sein.

mongodb_conf

Konfiguration der MongoDB-Schnittstelle

Die eigentliche Suche (query) wird mit dem Node-Parameterdialog (2-4) eingestellt:

mongodb_par

Konfiguration des MongoDB-Node

Als Input auf den MongoDB-Node liefert der Function-Node die eigentliche Such-Anfrage (hier: limit = 2 !!):

fcn_mdb_query

Function-Node

Javascript
// find query argument
const query = {};
// find option argument
const options = {
    sort: { ts: -1 },
    limit: 2
};
// payload for mongodb4 node
msg.payload = [query, options];
return msg;

Damit liefert die Abfrage 2 "Documents" zurück:

nr_mdb_result

Result der Abfrage

flows_contribMDB.json

[
{
    "id": "36ac58d44b0a6fbb",
    "type": "tab",
    "label": "Flow 2",
    "disabled": false,
    "info": "",
    "env": []
},
{
    "id": "f4334596e259510e",
    "type": "inject",
    "z": "36ac58d44b0a6fbb",
    "name": "",
    "props": [
        {
            "p": "payload"
        },
        {
            "p": "topic",
            "vt": "str"
        }
    ],
    "repeat": "",
    "crontab": "",
    "once": false,
    "onceDelay": 0.1,
    "topic": "",
    "payload": "",
    "payloadType": "date",
    "x": 180,
    "y": 180,
    "wires": [
        [
            "248a30f8569a9831"
        ]
    ]
},
{
    "id": "248a30f8569a9831",
    "type": "function",
    "z": "36ac58d44b0a6fbb",
    "name": "mDB-Query",
    "func": "// find query argument\nconst query = {};\n// find option argument\nconst options = {\n    sort: { ts: -1 },\n    limit: 2\n};\n// payload for mongodb4 node\nmsg.payload = [query, options];\nreturn msg;\n",
    "outputs": 1,
    "timeout": 0,
    "noerr": 0,
    "initialize": "",
    "finalize": "",
    "libs": [],
    "x": 350,
    "y": 180,
    "wires": [
        [
            "9f451ce54d300234"
        ]
    ]
},
{
    "id": "9f451ce54d300234",
    "type": "mongodb4",
    "z": "36ac58d44b0a6fbb",
    "clientNode": "ceb5e2fe4fbc5cac",
    "mode": "collection",
    "collection": "M3",
    "operation": "find",
    "output": "forEach",
    "maxTimeMS": "0",
    "handleDocId": false,
    "name": "find",
    "x": 510,
    "y": 180,
    "wires": [
        [
            "b455d65d1ab4bd1d",
            "7fda93745a6cc2dc"
        ]
    ]
},
{
    "id": "b455d65d1ab4bd1d",
    "type": "debug",
    "z": "36ac58d44b0a6fbb",
    "name": "debug 10",
    "active": true,
    "tosidebar": true,
    "console": false,
    "tostatus": false,
    "complete": "true",
    "targetType": "full",
    "statusVal": "",
    "statusType": "auto",
    "x": 700,
    "y": 180,
    "wires": []
},
{
    "id": "7fda93745a6cc2dc",
    "type": "function",
    "z": "36ac58d44b0a6fbb",
    "name": "function 4",
    "func": "msg.payload = msg.payload.y;\nreturn msg;",
    "outputs": 1,
    "timeout": 0,
    "noerr": 0,
    "initialize": "",
    "finalize": "",
    "libs": [],
    "x": 660,
    "y": 300,
    "wires": [
        [
            "32d6ada36d21c1f6"
        ]
    ]
},
{
    "id": "32d6ada36d21c1f6",
    "type": "debug",
    "z": "36ac58d44b0a6fbb",
    "name": "debug 11",
    "active": false,
    "tosidebar": true,
    "console": false,
    "tostatus": false,
    "complete": "true",
    "targetType": "full",
    "statusVal": "",
    "statusType": "auto",
    "x": 820,
    "y": 300,
    "wires": []
},
{
    "id": "ceb5e2fe4fbc5cac",
    "type": "mongodb4-client",
    "name": "Test_127",
    "protocol": "mongodb",
    "hostname": "127.0.0.1",
    "port": "27017",
    "dbName": "IoT",
    "appName": "",
    "authSource": "",
    "authMechanism": "DEFAULT",
    "tls": false,
    "tlsCAFile": "",
    "tlsCertificateKeyFile": "",
    "tlsInsecure": false,
    "connectTimeoutMS": "",
    "socketTimeoutMS": "",
    "minPoolSize": "",
    "maxPoolSize": "",
    "maxIdleTimeMS": "",
    "uri": "",
    "advanced": "",
    "uriTabActive": "tab-uri-simple"
}
]
Download: File

mqtt-py3-MongoDB-NR

In der nächsten Stufe soll die Analytic Services dazukommen. Die Schichten 1 bis 3 wurden bereits vollständig umgesetzt. Wird nun der Fokus im Entwicklungsprozess auf die Prozess-Simulation in Anlehnung an den realen Prozess gerichtet, dann können die simulierten Prozessdaten hierzu direkt in die Datenbank geschrieben werden (Grün markierter Bereich 1,2 +3).

mqtt_py2_mDB_NR

mqtt + Python + MongoDB + pyAnalytics + NodeRed

Diese Variante zeigt das Skript

client

PY-simProz_mDB_NR\sim__mdb_prozSig_rt_01.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
    # -*- coding: utf-8 -*-
    """
    @author: jbechtloff
    Vorgabe: tsA, tsE, Timedelta

    Zusammenführen von  sim_sig_design_03.py &
                        sim_mdb_struc_03.py
    Generieren + Realtime                    
    """

    from pymongo import MongoClient
    import numpy as np
    import pandas as pd
    from PT1 import PT1

    import os
    from apscheduler.schedulers.blocking import BlockingScheduler


    class clProzess:
        def __init__(self,  T, T1, Kdrift, Tdrift, Kw, col):
            self.pt1a = PT1(T,T1)
            self.pt1b = PT1(T,T1)
            self.T = T
            self.Kdrift = Kdrift
            self.Tdrift = Tdrift
            self.Kw = Kw
            self.col = col
            self.cnt = 0

        def simProz(self):
            k = self.cnt
            u_1 = np.sin(4*k/100*np.pi)
            if u_1 <0: u_1=0
            if u_1 > 0.5: u_1 = 0.5
            u_1 = u_1*2.0

            # Amplituden-Drift-Simu: 1.0 < u2 < 1.1 (MUL)
            a_Drift = 1.0 + self.Kdrift * (np.sin(4*k*self.T/self.Tdrift*np.pi)+1.0)/2.0
            errFl = 0
            if a_Drift > 1.0+19*self.Kdrift/20: errFl = 1.0  # noqa: E701
            u_2 = self.pt1b.PT1rek(self.pt1a.PT1rek(u_1*a_Drift))  # PT2-Verschleiß
            w = np.random.randn()*self.Kw

            self.cnt +=1
            return(u_1, u_2, a_Drift, w, errFl)

        def write_mdb(self, ts):
            [u_1, u_2, a_Drift, wi, err_Flg] = self.simProz()

            data = {
                'ts': ts,
                'cnt': self.cnt,
                'u': u_2 + wi,
                'err': err_Flg
            }
            result = self.col.insert_one(data)
            print(self.cnt)

        def write_mdb_now(self):
            self.write_mdb(pd.Timestamp.now())


    if __name__ == "__main__":
        client = MongoClient('localhost', 27017)
        db = client['IoT']
        col = db["m"]
        col.drop()  # Alte Collection LÖSCHEN !!!

        Prozess = clProzess(T=1,T1=4,
                        Kdrift=0.1, 
                        Tdrift=2000, 
                        Kw=0.01,
                        col = col)



        Tscal = {"s":1,"m":60,"h":60*60, "d":60*60*24}
        dT = 1
        dT_unit = "s"

        sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
        sched.add_job(Prozess.write_mdb_now,'interval',seconds=Tscal[dT_unit]*dT,id='runJob1') 

        # tsA = pd.Timestamp(2021,11, 20,  0,0,0)
        # tsE = pd.Timestamp(2021,11, 21,  10,0,0)
        # tsDeltaAE = tsE - tsA

        tsE = pd.Timestamp.now()  # now
        tsA = tsE - pd.Timedelta(1,"h")


        ts = tsA
        while ts <= tsE: 
            Prozess.write_mdb(ts)
            ts = ts + pd.Timedelta(dT,dT_unit)


        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

        try:
            sched.start()
        except (KeyboardInterrupt, SystemExit):
            pass                
Download: File

In diesem Python-Skript werden mehrere simulationsspezifische Teillösungen verwendet:

  • Zeitzyklische (periodische Task --> APScheduler) Kapitel
  • Simulationsmodell Kapitel

Steuerung der Simulation

Dieser Simulation basiert auf der Idee, dass die Simulation in zwei zeitlich aufeinander folgende Phasen aufgeteilt wird:

  1. Berechnung und Speicherung von Prozessdaten "as quick as possible": Dieser Vorlauf generiert historische Datensätze mit tsA = tsE - pd.Timedelta(1,"h") (Zeile 90)(hier 1 Stunde)
  2. Ist der Zeitpunkt now()erreicht, läuft die Simulation in Echtzeit mit der Abtastzeit ts = ts + pd.Timedelta(dT,T_unit) (Zeile 97) weiter, bis das Skript mit Ctrl+C abgebrochen wird.

sim_2Phasen

Zwei Phasen der Simulation

Ausschnitt: Phase 1+2
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
tsE = pd.Timestamp.now()  # now
tsA = tsE - pd.Timedelta(1,"h")
# tsDeltaAE = tsE - tsA

ts = tsA
# Phase 1 ---------------------------------------------
while ts <= tsE: 
    Prozess.write_mdb(ts) # (1)
    ts = ts + pd.Timedelta(dT,T_unit)


print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

# Phase 2 ---------------------------------------------
try:
    sched.start() # (2)
except (KeyboardInterrupt, SystemExit):
    pass                

  1. Tranfer in MongoDB mit berechnetem Timestamp ts
  2. Tranfer in MongoDB mit "echtem" Timestamp pd.Timestamp.now() in function write_mdb_now()

Info

Folgende Anpassungen müssen vorgenommen werden:

  1. MongoDB: IP & Port (client = MongoClient('localhost', 27017))
  2. MongoDB: Datenbank (db = client['IoT'])
  3. MongoDB: Collection (col = db["m"])
  4. Simulationsvorlauf: tsA = tsE - pd.Timedelta(1,"h")

Analytic-Services

Die Analytic-Services (Datenanalyse-Prozesse) wurden in den vorangegangen Beispielen in NodeRed realisiert. Um sie als SmartServices umzusetzen, sollen sie nun als eigenständige Python-Programme realisiert werden.

Aufgabe 1: Zyklische Analyse

Die erste Aufgabe besteht in der zyklischen Berechunng statistischer Kennwerte einer Zeitreihe. Der Simulationsprozess liefert das Signal \(u(t)\). Die Berechnung des Mittelwertes:

\[\overline{u} = \frac{1}{{n + 1}}\sum\limits_{j = 0}^n {{u_j}} \]

und der Standardabweichung

\[{\sigma _n} = \sqrt {\frac{1}{n}\sum\limits_{j = 0}^n {{{\left( {{u_j} - \overline {u} } \right)}^2}} } \]

lassen sich mit Hilfe der Methoden mean() und std() eines pandas-Dataframes berechnen:

#  Analyse-Operationen
uMean = u.mean()
uStd = u.std()
errCnt = err.sum()

Die Methode sum()berechnet die Summe der Fehler-Ereignisse.

Datenflow

Die Datenverarbeitung dieses Analyseprozesses läuft in drei Schritten ab:

ZyklAnalytic

Zyklische Analyse

Für den zyklischen Prozess wird hier wieder das Pakage APScheduler eingesetzt APScheduler

Konfiguration

Hier wird die Konfiguration, wie IP-Adresse, Datenbank und Collection etc. durch die toml-Datei cfg.tomlvorgenommen:

Toml

PY-simProz_mDB_NR\cfg.toml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    [mdb]  # MongoDB-Parameter
    host = "127.0.0.1"
    db = 'IoT'
    colRaw = "m"
    colErg = "a"


    [analytic]
    dT = 10
    dT_unit = "s"

    # Spec.: https://toml.io/en/
Download: File

Gesamt-Script
Python Analytic

PY-simProz_mDB_NR\analytic_mDB_01.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
    #  Analytic-mDB_test
    '''
    letzte Änderung: 07.05.2025
    '''
    from pymongo import MongoClient 
    import pandas as pd
    import datetime as dt
    import tomllib 

    import os
    from apscheduler.schedulers.blocking import BlockingScheduler

    #  ===========================================================================================

    class clAnalytic:
        def __init__(self, colRaw, colErg, DTs):
            self.colRaw = colRaw
            self.colErg = colErg
            self.DTs = DTs  # Abtastung in s
            self.cnt = 0
            # ------------------------------------------------------------------------------------

        def calcProz(self):
            # Zeitfenster lesen
            tsE = dt.datetime.now() # (1)
            tsA = tsE - pd.Timedelta(self.DTs,"s")

            print(self.cnt,'*    tsA: ',tsA, '    tsE: ',tsE)

            uMean = 0
            errCnt = 0
            df_F = pd.DataFrame(list(self.colRaw.find({"ts":{"$lte":tsE, "$gte":tsA}}).sort("_id",-1))) # (2)
            if df_F.size > 0:
                # ts = df_F.loc[:,'ts']
                u = df_F.loc[:,'u']
                err = df_F.loc[:,'err']

                #  Analyse-Operationen
                uMean = u.mean()
                uStd = u.std()
                errCnt = err.sum()
            self.cnt +=1
            return(uMean, uStd, errCnt)
            # ------------------------------------------------------------------------------------

        def write_mdb_now(self):
            [uMean, uStd, errCnt] = self.calcProz()

            data = {
                'ts': dt.datetime.now(),
                'cnt': self.cnt,
                'uMean' : float(uMean),
                'uStd' : float(uStd),
                'errCnt' : int(errCnt)
            }
            self.colErg.insert_one(data) # (3)
            # ------------------------------------------------------------------------------------


    if __name__ == "__main__":
        # Parametrierung über cfg.toml (https://toml.io/en/)
        with open("cfg.toml", "rb") as f:
            cfg = tomllib.load(f)

        client = MongoClient(cfg["mdb"]["host"], 27017)
        db = client[cfg["mdb"]["db"]]
        colRaw = db[cfg["mdb"]["colRaw"]]
        colErg = db[cfg["mdb"]["colErg"]]
        colErg.drop()  # Alte Collection LÖSCHEN !!! # (4)

        Tscal = {"s":1,"m":60,"h":60*60, "d":60*60*24}
        # Zeitintervall der Analyse ---------------------------------------
        dT = cfg["analytic"]["dT"]
        dT_unit = cfg["analytic"]["dT_unit"]
        # -----------------------------------------------------------------

        AProz = clAnalytic(colRaw = colRaw, colErg=colErg, DTs=Tscal[dT_unit]*dT)

        sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
        sched.add_job(AProz.write_mdb_now,'interval',seconds=Tscal[dT_unit]*dT,id='runAJob1') # (5)

        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

        try:
            sched.start() # (6)
        except (KeyboardInterrupt, SystemExit):
            pass                

  1. Event-Ereigniss
  2. MongoDB-Abfrage auf das Zeitfenster t_sA > t > t_sE
  3. Speichern in der MongoDB
  4. Für Entwicklungszwecke !!
  5. Erzeugen des Schedulers
  6. Starten des zyklischen Analytic-Jobs

Download: File

Dashboard-Service

In der Abbildung ist ein NodeRed-Visualisierug vorgesehen. Diese kann wie bereits gezeigt den Datenbankzugriff auf die MongoDB vornehmen und die Dahboard-Widgets bereitstellen. Alternativ soll hier in einem zweiten Schritt eine Lösung mittels Proxy-Server aufgezeigt werden.

Node-Red (Visu u. mDB)

Node-Red-Visu und py-Proxy

Aufteilung und Systematisierung der Frontend- und Backend-Entwicklung.

Backend: Python - FastApi

Datenbank-Zugriffe und Analytic Services lassen sich deutlich besser in Python umsetzen. Die Schnittstelle zu Node-Red kann entweder über Message-Routine via MQTT oder Endpoints einer RestApi realisiert werden. Beide Varianten lassen sich sehr einfach realisieren und gut debuggen.

RestApi mit FastApi

Rumpf-Skript:

Python Backend

py_nr/fapi_1.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
    # --------------------------------------------------------------------------------
    # RestApi-Beispiel für die Verwendung mit Node-Red "http-Node"
    # (c) 2025 Prof. Dr. J. Bechtloff
    # --------------------------------------------------------------------------------
    from fastapi import FastAPI
    import uvicorn  # (1)
    # --------------------------------------------------------------------------------
    app = FastAPI()
    # --------------------------------------------------------------------------------

    @app.get("/") # (2)
    async def root():
        return {"message": "Hello World"}

    @app.post("/sfcn") # (3)
    async def sfcn(payload:dict={}):
        print(payload)
        return {"payload": {"sin": 12.34, "cos":0.231}}

    # --------------------------------------------------------------------------------
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8001) # (4)

  1. Webserver (muss vorher installiert worden sein uvicorn)
  2. Root-Endpoint (zum Testen)
  3. Endpoint für NodeRed-http-request
  4. Webservice starten mit uvicorn

Download: File Download: File

Getestet werden der Server mit Hilfe des Tools curl(1):

  1. curl ist ein Konndozeilen-Tool, dass unterschiedlichste Protokolle beherrscht. Link
curl 127.0.0.1:8001/sfcn -X POST

In Node-Red werden der Node "http request" eingesetzt:

nr-Flow 1

Der erste Request zeigt einen einfache Get-Aufruf:

nr-Dlg_Get

Er zeigt direkt auf den root-Endpoint, der das Message-json-Objekt {"message": "Hello World"} zurückliefert.

Der zweite Request demonstriert einen Post-Request. Der NR-Client sendet hier ein json-Objekt an den Server. Dieser antwortet ebenfalls mit einem json-Objekt:

nr-Dlg_Post

Er zeigt auf den Endpoint @app.post("/sfcn"), der das Message-json-Objekt {"payload": {"sin": 12.34, "cos":0.231}} zurückliefert.

Der große Vorteil von FastApi ist die Möglichkeit, die API-Schnittstelle automatisch zu dokumentieren und testen zu können:

    127.0.0.:8001/docs

restapi-docs

Wird ein Post-Request abgesetzt,

restapi-docs_resp

Dann erhält man direkt das json-Objekt der Antwort.