Beispiel-Projekte
In diesem Kapitel werden anhand der IoT-Szenarien und des IoT-Architekturmodells Übungs- und Umsetzungsbeispiele behandelt.
Sprachen für eine IoT-Plattform
Auf den rotumrandeten Bereich liegt der Fokus!
In der Regel werden die Projekte mit Hilfe einer Simulation umgesetzt. Diese kann in allen ebenen der IoT-Architektur eingreifen:
Allgemeines Konzept zur Einbindung von Simulationen entlang der IoT-Plattform-Architektur
Signale generieren mit Python
Erster Gehversuch mit VS Code und Python, um eine Datenquelle zur Verfügung zu haben.
# Bibliotheken importieren
import numpy as np
import time
…
# Parameter initialisieren
n = 10
x = np.zeros(n)
# Berechnungen
for i in range(n):
x[i] = np.sin(i/1.23)
# Ausgabe
print(f’Ergebniss: {x}mm’)
Weitere Aufgaben:
- Rechteck-Generator
- PT1-Prozessverhalten (ideal)
-
- Stochastische Störungen
-
- Drift
- Random Walk
Skripte
Datei | Link |
---|---|
squareSignal.py | File |
sinusSignal.py | File |
RandomWalk.py | File |
mqtt + node-RED
Mit mqtt sollen hier Daten aus der Ebene (1) in die Ebene "Smart Services" (8) übertragen werden. Die Ereignisorientierung von mqtt kann untersucht werden.
mqtt + Python + Node-Red
Software-Voraussetzungen
Info
Software | Link |
---|---|
mqtt-Broker "Mosquitto" | Installation |
mqtt-Explorer | Installation |
Python | Installation |
VS Code | Installation |
Node-Red | Installation |
Aufgaben
- Publisher mit Python (
mqpub.py
) - mqtt-Explorer ->
msg-Object
untersuchen - Subscriber mit Python (
mqsub.py
) - Node-Flow entwickeln
Skripte
Multi-Topics
'''
Einfacher Publisher
=============================
client.publish (sequentielles Publish) und
publish.multiple(msgs,...)
veröffentlichen jeweils "Einzel"-Topic, die vom Subscriper sequentiell empfangen werden
=> Dieses Skript setzt "subscriber_2.py" voraus
'''
import time
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import numpy as np
# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1" # lokaler PC
# mq_host = "172.18.45.150" # Labor-Server
mq_topic = "test_172" # letzte IP-Adr-Nr
# --------------------------------------------------------------
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client = mqtt.Client()
client.on_connect = on_connect
client.connect(mq_host, 1883, 60)
client.loop_start()
t = 0.0
while True:
time.sleep(2)
t = t + 0.01
# client.publish(f"{mq_topic}/temperature", t)
# client.publish(f"{mq_topic}/sinus", np.sin(t))
msgs = [(f"{mq_topic}/cos",np.cos(t)),
(f"{mq_topic}/cos2",np.cos(t)*2),
(f"{mq_topic}/ts",time.time())]
publish.multiple(msgs, hostname=mq_host)
print(msgs)
'''
Einfacher Abonnent
======================
mit jedem abonnierten Topic wird die on_message-funktion aufgerufen
(globaler Zähler cnt)
Dieses Skript setzt den Publisher "sensor_multiple_2.py" voraus
https://pypi.org/project/paho-mqtt/#description
'''
import paho.mqtt.client as mqtt
# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1" # lokaler PC
# mq_host = "172.18.45.150" # Labor-Server
mq_topic = "test_172" # letzte IP-Adr-Nr
# --------------------------------------------------------------
cnt = 0
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(mq_topic+"/#")
def on_message(client, userdata, msg):
global cnt
cnt +=1
print(cnt, ' ',msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(mq_host, 1883, 60)
client.loop_forever()
json-Objects
'''
JSON-Publisher
=============================
client.publish (sequentielles Publish) und
publish.multiple(msgs,...)
veröffentlicht ein "Sammel"-Topic mit client.publish(), das vom Subscriper
als JSON-Objekt empfangen wird.
=> Dieses Skript setzt "subscriber_3_json.py" oder
"subscriber_3_json_mDB_2.py" voraus
'''
import time
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import numpy as np
import json
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# Parameterblock
mq_host = "127.0.0.1" # lokaler PC
# mq_host = "172.18.45.150" # Labor-Server
mq_topic = "test_172" # letzte IP-Adr-Nr
client = mqtt.Client()
client.on_connect = on_connect
client.connect(mq_host, 1883, 60)
client.loop_start()
t = 0.0
while True:
time.sleep(1)
t = t + 0.01
# python-dict beschreibt den Daten-Block (payload)
PL = {"cos":np.cos(t),
"cos2":np.cos(t)*2,
"ts":time.time()
}
# Umwandeln in ein json-Objekt
jPL = json.dumps(PL)
# versenden (publish)
client.publish(topic=mq_topic, payload = jPL)
print(jPL)
#!/usr/bin/env python
'''
https://pypi.org/project/paho-mqtt/#description
'''
import paho.mqtt.client as mqtt
import json
cnt = 0
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("sen/#")
client.subscribe("test_172")
def on_message(client, userdata, msg):
global cnt
cnt +=1
# print(cnt, ' ',msg.topic + " " + str(msg.payload))
data = json.loads(msg.payload)
# print(cnt, ' ',data)
print(cnt, ' ',data["ts"])
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
subscriber_3_json.py
mqtt + Node-RED + MongoDB
Durch die Schicht (3) Datenhaltung lassen sich auch Historie-Daten für Analysezwecke nutzen.
mqtt + Python + MongoDB + Node-Red
Software-Voraussetzungen
Info
Software | Link |
---|---|
Mongo-DB (inkl. Compass) | Installation |
Aufgaben
mq_mdb_client.py
->PY-sim_mqtt_mdb_NR/subscriber_3_json_mDB_2.py
anpassen und startensensor_json_3.py
->PY-sim_mqtt_mdb_NR/sensor_json_3_2.py
anpassen und separat starten (Konsole, ...)- mdb-Explorer auf mDB setzen
- Node-Red-Flows mit mDB-Verbindungen entwickeln
Skripte
pub / sub
'''
IoT-Schicht 1
- Publishd mqtt-basierte Daten (simulierter Prozess)
letzte Änderung: 27.4.2023
Doku:
https://pypi.org/project/paho-mqtt/#description
'''
import time
import paho.mqtt.client as mqtt
import numpy as np
import json
import datetime
# --------------------------------------------------------------
# Parameterblock
mq_host = "127.0.0.1" # lokaler PC
# mq_host = "172.18.45.150" # Labor-Server
mq_topic = "test_172" # letzte IP-Adr-Nr
# --------------------------------------------------------------
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# __main__ ======================================================
client = mqtt.Client()
client.on_connect = on_connect
client.connect(mq_host, 1883, 60)
client.loop_start()
# Generieren von simulierten Daten -----------
cnt = 0
while True:
time.sleep(1)
cnt += 10
ts = datetime.datetime.now()
# python-dict beschreibt den Daten-Block (payload)
PL = {"t": time.time(), # unix-Zeit
"ts": str(ts), # utf-Zeit
"cnt": cnt,
"v1": np.random.normal(),
"v2":np.cos(cnt/10)*2,
"v3": np.cos(cnt/20)*200
}
# Umwandeln in ein json-Objekt
jPL = json.dumps(PL)
# versenden (publish)
client.publish(topic=mq_topic, payload = jPL )
print(PL)
'''
IoT-Schicht 2-3
- subribed mqtt-basierte json-Daten
- schreibt die Daten in die MongoDB
=> setzt das Skript "sensor_json_3.py" voraus (schreibt als payload ein json-Objekt).
letzte Änderung: 27.4.2023
Doku:
https://pypi.org/project/paho-mqtt/#description
https://pymongo.readthedocs.io/en/stable/
'''
import paho.mqtt.client as mqtt
from pymongo import MongoClient
import json
# --------------------------------------------------------------
# Parameterblock
mdb_mq_host = "127.0.0.1" # lokaler PC
# mdb_mq_host = "172.18.45.150" # Labor-Server
mdb_db = "test"
mdb_col = "M_172" # letzte IP-Adr-Nr
mq_topic = "test_172" # letzte IP-Adr-Nr
# --------------------------------------------------------------
cnt = 0
class mqClient():
def __init__(self,col):
self.col = col
def on_connect(self,client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("test_172")
def on_message(self,client, userdata, msg):
global cnt
cnt +=1
print(cnt, ' ',msg.topic + " " + str(msg.payload))
# payload enthält ein json-Objekt mit allen gesendeten Datenwerten {key1:value1, key2:value2}
data = json.loads(msg.payload)
print(data)
# Schreiben in die MongoDB
result = self.col.insert_one(data)
# __main__ ======================================================
mo_client = MongoClient(mdb_mq_host, 27017)
db = mo_client[mdb_db]
col = db[mdb_col]
# col.drop() # Löschen der bereits erzeugten Collection (ggf. auskommentieren...)
mqC = mqClient(col)
client = mqtt.Client()
client.on_connect = mqC.on_connect
client.on_message = mqC.on_message
client.connect(mdb_mq_host, 1883, 60)
client.loop_forever()
mqtt + py3 + MongoDB + NR
In der nächsten Stufe soll die Analytic Services dazukommen. Die Schichten 1 bis 3 wurden bereits vollständig umgesetzt. Wird nun der Fokus im Entwicklungsprozess auf die Prozess-Simulation in Anlehnung an den realen Prozess gerichtet, dann können die simulierten Prozessdaten hierzu direkt in die Datenbank geschrieben werden (Grün markierter Bereich 1,2 +3).
mqtt + Python + MongoDB + pyAnalytics + Node-Red
Diese Variante zeigt das Skript
client
# -*- coding: utf-8 -*-
"""
@author: jbechtloff
Vorgabe: tsA, tsE, Timedelta
Zusammenführen von sim_sig_design_03.py &
sim_mdb_struc_03.py
Generieren + Realtime
"""
from pymongo import MongoClient
import numpy as np
import pandas as pd
from PT1 import PT1
import time
import os
from apscheduler.schedulers.blocking import BlockingScheduler
class clProzess:
def __init__(self, T, T1, Kdrift, Tdrift, Kw, col):
self.pt1a = PT1(T,T1)
self.pt1b = PT1(T,T1)
self.T = T
self.Kdrift = Kdrift
self.Tdrift = Tdrift
self.Kw = Kw
self.col = col
self.cnt = 0
def simProz(self):
k = self.cnt
u_1 = np.sin(4*k/100*np.pi)
if u_1 <0: u_1=0
if u_1 > 0.5: u_1 = 0.5
u_1 = u_1*2.0
# Amplituden-Drift-Simu: 1.0 < u2 < 1.1 (MUL)
a_Drift = 1.0 + self.Kdrift * (np.sin(4*k*self.T/self.Tdrift*np.pi)+1.0)/2.0
errFl = 0
if a_Drift > 1.0+19*self.Kdrift/20: errFl = 1.0
u_2 = self.pt1b.PT1rek(self.pt1a.PT1rek(u_1*a_Drift)) # PT2-Verschleiß
w = np.random.randn()*self.Kw
self.cnt +=1
return(u_1, u_2, a_Drift, w, errFl)
def write_mdb(self, ts):
[u_1, u_2, a_Drift, wi, err_Flg] = self.simProz()
data = {
'ts': ts,
'cnt': self.cnt,
'u': u_2 + wi,
'err': err_Flg
}
result = self.col.insert_one(data)
print("%s" % (self.cnt))
def write_mdb_now(self):
self.write_mdb(pd.Timestamp.now())
if __name__ == "__main__":
client = MongoClient('localhost', 27017)
db = client['IoT']
col = db["m"]
col.drop() # Alte Collection LÖSCHEN !!!
Prozess = clProzess(T=1,T1=4,
Kdrift=0.1,
Tdrift=2000,
Kw=0.01,
col = col)
Tscal = {"s":1,"m":60,"h":60*60, "d":60*60*24}
dT = 1
T_unit = "s"
sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
sched.add_job(Prozess.write_mdb_now,'interval',seconds=Tscal[T_unit]*dT,id='runJob1')
# tsA = pd.Timestamp(2021,11, 20, 0,0,0)
# tsE = pd.Timestamp(2021,11, 21, 10,0,0)
tsE = pd.Timestamp.now() # now
tsA = tsE - pd.Timedelta(1,"h")
# tsDeltaAE = tsE - tsA
ts = tsA
while ts <= tsE:
Prozess.write_mdb(ts)
ts = ts + pd.Timedelta(dT,T_unit)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
sched.start()
except (KeyboardInterrupt, SystemExit):
pass
In diesem Python-Skript werden mehrere simulationsspezifische Teillösungen verwendet:
Info
Folgende Anpassungen müssen vorgenommen werden:
- MongoDB: IP & Port (
client = MongoClient('localhost', 27017)
) - MongoDB: Datenbank (
db = client['IoT']
) - MongoDB: Collection (
col = db["m"]
) - Simulationsvorlauf:
tsA = tsE - pd.Timedelta(1,"h")