Zum Inhalt

Software-Umsetzung

SimPro besteht aus dem Generator der Zeitreihe, der in eine Anwendungsschale eingebunden wird. Der Generator verarbeitet bei seiner Klasseninstanziierung das Konfigurations-Dict aus der toml-Datei.

Instanziierung

Der Generator wird in der Datei simLib_01.py bereitgestellt.

simLib_01.py
1
2
3
4
5
6
7
from benedict import benedict  # zum vereinfachten Handling von dict
import simLib_01 as simLib

# Einlesen der toml-Konfigurationsdatei
d = benedict.from_toml('prz_2plt.toml', keyattr_dynamic=True)

bProz = simLib.BatchProz(cfg=d,T=1)

Die class BatchProz() erwartet die Parameter:

  • T : Abtastzeit in Sekunden
  • cfg : Dict

Interpolation der Zeitreihen

Der Aufruf von genTS() berechnet einen Zeitschritt. Beim nächsten Aufruf wird automatisch der Zeitschritt weitergetaktet, was die Verwendung stark vereinfacht, aber eben auch beachtet werden muss.

Rückgabewerte

  • t0 Zeitwert im Batch [s]
  • tG Zeitwert seit Simulationsstart [s]
  • cntG Batch-Zyklen-Zähler seit Simulationsstart
  • y Liste der Simulationswerte

Test-Umgebung PLOT

Für die Entwicklung eines Simulationsmodells ist es hilfreich, für eine vorgegebene Anzahl von Zeitschritten eine grafische Ausgabe zu erzeugen. Mit dem Skript przSIM_plt_0x.py steht eine sehr einfache Anwendung von SimPro zur Verfügung.

Im wesentlichen werden die N=... Werte in einer Schleife mit genTS() berechnet und in ein numpy-Array abgelegt.

Ausschnitt: przSIM_plt_.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Generierung der Zeitreihen -----------------------------
for k in range(N):
    t0, tG, cntG, y = bProz.genTS() 
    tpl[k]=tG
    cntGpl[k] = (1 if cntG>cnt_km1 else 0)  # Peek-Signal für Zyklusende/-anfang
    cnt_km1=cntG
    # Datenmatrix belegen
    ypl = np.hstack((ypl,y))

m = len(bProz.xLst)

# Plot-Ausgabe --------------------------------------------
fig, axs = plt.subplots(m, 1, layout='constrained')

y = ypl.reshape(N,m).T
for i, b in zip(range(m), bProz.xLst):
    axs[i].plot(tpl, y[i])
    axs[i].plot(tpl, cntGpl*y[i].max())
    axs[i].set_xlabel('t')
    axs[i].set_ylabel(b.name)
    axs[i].grid(True)

plt.show()

Der Zugriff auf die Klassenvariable xLst ermöglicht den Zugriff auf Eigenschaften der einzelnen Prozesszustandsgrößen, wie sie in der toml-Datei vorgegeben wurden: - name: Name der Größe (Bezeichner im Sinne der Eindeutigung einer Variablen) - meta.unit: Einheit - meta.min: Minimum - meta.max: Maximum

Realtime-Umsetzung

Die Simulation soll als "Pseudo"-Prozess bei der Software-Entwicklung kontinuierlich Daten liefern. Diese Daten werden entweder in Datenbanken gespeichert oder Dashboard-Apps zur Verfügung gestellt. Daher ist ein Zeitstempel eine wichtige Information. In Python gibt es verschiedene Möglichkeiten, einen zeitzyklischen Rechenprozess zu realisieren. Der einfachste Weg mit Boardmitteln nutzt die Funktion time.sleep():

time.sleep()
1
2
3
4
5
6
import time
from datetime import datetime

while True:
    print(datetime.now())
    time.sleep(2)   # 2 Sekunden Pause...

Allerdings hat diese Lösung den Nachteil, dass die Abtastzeit rechenzeitabhängig schwankt. Daher empfiehlt sich eine spezielle thread-orientierte Lösung einzusetzen: APscheduler siehe pypi

Die nachfolgendne Lösungen basieren auf dem Release 3.10. Eine minimale Anwendung zeigt das nachfolgende Listing:

apscheduler
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import os
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler
# -----------------

def test_job():
    print(datetime.now())
# ------------------------------------------------------------------

sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
sched.add_job(test_job, 'interval', seconds=2, id='run_testjob')

print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

try:
    sched.start()
except (KeyboardInterrupt, SystemExit):
    pass

Info

In der Version 4 wird es eine Reihe von Veränderungen und Erweiterungen geben, die in das Projekt eingearbeitet werden.

Kommunikationsschnittstellen

Hier sollen die für die IoT-Anwendungen im industriellen Umfeld relevanten Kommunikationsschnittstellen realisiert werden. Auf eine ausführliche Einführung in die Kommunikationskonzepte wird hier nicht eingegangen.

MQTT

MQTT setzt einen Broker voraus. Dieser wickelt den gesamten Datenverkehr ab. MQTT basiert auf dem Publish-Subscriber-Konzept. Diese Realisierung basiert auf dem paho-mqtt Paket für Python, das eine sehr einfache und effiziente Implementierung ermöglicht. Als Broker hat sich der Mosquito Broker bewert. Als Tool Zur Beobachtung des MQTT-Datenverkehrs auf dem Broker setzen wir den MQTT-Explorer ein.

Das Beispielskript published das gesamte json-Objekt. Das hat den Vorteil, dass dieses json-Objekt mit einem Timestamp versehen als Paket bzw. Message über den Broker von einem Client subscribt werden kann. So kann das json-Objekt direkt mit seiner Struktur in einer dokumentenorientierte Datenbank abgelegt werden kann.

Simulationsfunktion und Publisher
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import paho.mqtt.client as mqtt
...
mq_topic = "test_172"
...
def simProz():
    t0, tG, cntG, y0 = bProz.genTS() 
    ts = datetime.now()     # timestamp
    print( f'ts= {ts},    y= {y0}')            

    PL = {"ts": str(ts),   # utf-Zeit
            "cnt": cntG,
        }
    for i, p in zip(range(bProz.m-1), bProz.xLst):
        PL.update({p.name : y0[i]})
    # Umwandeln in EIN json-Objekt
    jPL = json.dumps(PL)
    # versenden (publish)
    mq_client.publish(topic=mq_topic, payload = jPL )

Mit dem MQTT-Explorer läßt sich der Datenverkehr im Broker gut beobachten:

MQTT-Explorer

OPC-UA

Mit der industriellen Kommunikation OPC-UA steht eine leistungsfähige Kommunikationsmechanismus zur Verfügung. Als Demo wird hier ein sehr einfacher OPC-UA Server realisiert (Ohne method, events, ...), der auf dem FreeOpcUa basiert. Die Bibliothek asyncua stellt sowohl asynchrone als auch synchrone Kommunikationsmechanismen zur Verfügung. Die Implementierung lässt sich genau wie die MQTT-Variante umsetzen. Der Echtzeit-Job übergibt die Simulationsdaten an den OpcUa-Server mv.write_value(y0[i]). Die Anfragen der externen Clients bearbeitet der OpcUa-Server dann vollständig selbständig.

Initialisierung der OpcUa-Kommunikation
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
...
from asyncua.sync import Server, ua
...
# setup Server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4842/freeopcua/server/")

# setup namespace
uri = "http://examples.freeopcua.github.io"
self.idx = server.register_namespace(uri)
# address space
self.simProzObj = server.nodes.objects.add_object(self.idx, "simProz")
self.uaVar_ts  = self.simProzObj.add_variable(ua.NodeId.from_string(f'ns={self.idx};s=ts'),'ts','')
self.uaVar_cnt = self.simProzObj.add_variable(ua.NodeId.from_string(f'ns={self.idx};s=cnt'),'cnt',0)            
self.uaVarLst = []
for p in self.bProz.xLst:
    s = f'ns={self.idx};s={p.name}'
    self.uaVarLst.append(self.simProzObj.add_variable(ua.NodeId.from_string(s),p.name,0.0))   
# start Server             
server.start()
...

Simulationsdaten an den OpcUa-Server:

Daten an OpcUa-Server
1
2
3
4
5
6
7
...
self.uaVar_ts.write_value(ts.isoformat())
self.uaVar_cnt.write_value(cntG)

for i, mv in zip(range(self.bProz.m-1),self.uaVarLst):
    mv.write_value(y0[i])
...

Die Funktion des OpcUa-Servers lässt sich mit dem Tool UA Expert interaktiv visualisieren, wie der Screenshot zeigt:

UaExpert

RESTapi

Die RESTapi oder RESTful API (Application Programming Interface) oder Web-API, unterliegt den Beschränkungen der REST-Architektur und ermöglicht Interaktionen mit RESTful Webservices. In diesem Beispiel werden 2 Endpoints realisiert. Der erste Endpoint meta wird die Metadaten der Simulationsstruktur an den Client ausliefern. Der Endpoint data wird die Value's zu den einzelnen Signalen mit den Timestamp zusammen ausliefern. Die Realisierung hier wird mit dem Framework Fast API umgesetzt. Als Serverdienst wird uvicorn eingesetzt.

Sim-Server

Die Simulation wird wie bisher auch als Echtzeitprozess gestartet und hält die Simulationsergebnisse als Properties.

SimProz
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
...
class SimProz():

    def __init__(self,bProz) -> None:
        self.PL = {}
        self.bProz = bProz

    def sim(self):
        t0, tG, cntG, y0 = self.bProz.genTS()
        ts = datetime.now()
        print(f'ts= {ts},  cnt={cntG}  y= {y0}')

        self.PL =    {"ts": str(ts),   # utf-Zeit
                "cnt": cntG,
                }
        for i, p in zip(range(self.bProz.m-1), self.bProz.xLst):
            self.PL.update({p.name: y0[i]})
...

Die beiden Endpoints werden mit den Decorators wie folgt definiert:

Endpoints
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@app.get("/data")
def read_data():
    return json.dumps(simProz.PL)

# ------------------------------------------------------------------
@app.get("/meta")
def read_meta():
    print(cfg_sim.bat)
    meta = []
    for b in cfg_sim.bat:
        meta.append({'name':b.name, 'meta':b.meta})
    return json.dumps(meta)
...

Die Serverdienste:

Server
1
2
3
4
5
6
...
sched = BackgroundScheduler({'apscheduler.timezone': 'UTC'})
sched.add_job(simProz.sim, 'interval', seconds=cfg_sim.T, id='runJob1')
sched.start()

uvicorn.run(app, host=cfg.RESTapi.ip, port=cfg.RESTapi.port)

Client-Beispiel: Swagger-UI

Fast API bringt einen interaktiven Browser-Client Swagger-UI zum Erkunden und Testen der RestAPI-Schnittstelle mit. Diese Anwendung kann mit {IP-Adresse}:{Port}/docs (Endpoint docs) im Browser aufgerufen werden:

UaExpert

Client-Beispiel:

Ein GET-Request lässt sich in einem Python-Skript sehr einfache umsetzen. Dazu steht die Bibliothek requests zur Verfügung. Und es braucht lediglich die vollständige URL mit Endpoint angegeben werden und.

GET-Requests
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import json 
import requests

api_url = "http://127.0.0.1:8008/meta"
meta_json = json.loads(requests.get(api_url).json())
print(json.dumps(meta_json, indent=4))

api_url = "http://127.0.0.1:8008/data"
data_json = json.loads(requests.get(api_url).json())
print(json.dumps(data_json, indent=4))

Die Antworten:

meta_json
[
    {
        "name": "var1",
        "meta": {
            "unit": "\u00b0C",
            "min": 0,
            "max": 60
        }
    },
    {
        "name": "var2",
        "meta": {
            "unit": "A",
            "min": 0,
            "max": 120
        }
    }
]
data_json
{
    "ts": "2024-03-01 23:47:08.308112",
    "cnt": 8,
    "var1": 46.46386301671156,
    "var2": 92.5501867734563
}

ZMQ

Ein schlichtes und leistungsfähiges Kommunikationsmechanismus, der Websockets aufsetzt, ist ZeroMQ. 0MQ kann sowohl Client-Server- als auch Publish-Subscribe-Mechanismen anbieten. Hier wird um den SimPro-Kern eine 0MQ-Server realisiert:

SimPro + 0MQ-Server
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import simLib_01 as simLib
import zmq
...

# 0MQ-Server Definition
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
...

class SimProz():

    def __init__(self,bProz) -> None:
    ...

    def sim(self):
    ...

    def serv(self):
        message = socket.recv()
        print(f"Received request: {message}")
        socket.send_json(self.PL)

# Konfiguration und Start der Simulation
...

Datenbanken

Für das Loggen von Zeitreihen werden hier die zwei gängigsten NoSQL-Datenbanken genutzt.

MongoDB

Die MongoDB ist eine dokumentenorientierte Datenbank, die frei verfügbar für Windows und Linux ist. Für Python wird das Paket mongodb genutzt.

Für die datenbankgestützte Entwicklung kann es sehr hilfreich sein, dass die Zeitreihe der Simulationsdaten nicht nur in einem festen Zeitraster (Abtastzeit T ).

MongoDB - Code-Sequenz
SimPro + MongoDB-Client
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import simLib_01 as simLib
from pymongo import MongoClient
...

class SimProz():

    def __init__(self,bProz) -> None:
        self.PL = {'ts':datetime.now(), 'cnt':0}
        self.bProz = bProz

    def sim(self):
        ...

    def init_mdb(self,ip,port,db,co):
        client = MongoClient(ip, port)
        mdb = client[db]
        self.col = mdb[co]
        self.col.drop()  # Alte Collection LÖSCHEN !!!

        try:
            mdb.create_collection(
                co,
                timeseries={
                    "timeField": "ts",
                    "metaField": "loc",
                    "granularity": "seconds"
                },
                expireAfterSeconds = cfg.expire # Ablauf der Gültigkeit von Datensätzen (damit die DB nicht unendlich wächst..)  -> CFG
            )
        # except mdbErrors.OperationFailure:  # If the collection doesn't exist
        except:  # If the collection doesn't exist
            print("This collection doesn't exist")        

    def write_mdb(self, ts):
        self.sim()
        self.PL["ts"] = ts
        result = self.col.insert_one(self.PL)

    def write_mdb_now(self):
        self.sim()
        result = self.col.insert_one(self.PL)

    ...

# Init Sim
simProz.init_mdb(cfg.mdb)

# ------------------------------------------------------------------
# Vorlauf der Simulation == Historie
tsE = datetime.now()  # now
tsA = tsE - timedelta(weeks=cfg.historie.weeks, days=cfg.historie.days , 
    minutes=cfg.historie.minutes, hours=cfg.historie.hours, 
    seconds=cfg.historie.seconds)  # Vorlauf der Simulation

ts = tsA
while ts <= tsE: 
    simProz.write_mdb(ts)
    ts = ts + timedelta(seconds=cfg_sim.T)
    print(ts)

# Start der zyklischen Simulation
sched = BlockingScheduler({'apscheduler.timezone': 'UTC'})
sched.add_job(simProz.write_mdb_now, 'interval', seconds=cfg_sim.T, id='run_mdbwrite')
...

Bei der Definition des Collection als werden wesentliche Eigenschaften festgelegt.

Timeseries
timeseries={
            "timeField": "ts",
            "metaField": "loc",
            "granularity": "seconds"
        },
        expireAfterSeconds = cfg.expire  # Ablauf der Gültigkeit von Datensätzen

InfluxDB

Die InfluxDB ist eine reine Zeitreihen-Datenbank und läuft auf allen System - auch auf einen RaspberryPi!

Für Python wird das Paket influx-client genutzt. Es kann mit der InfluxDB >= Version 2 arbeiten ( Tutorial )

InfluxDB - Code-Sequenz
SimPro + InfluxDB-Client
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import simLib_01 as simLib
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions, BucketRetentionRules
from influxdb_client.client.write_api import SYNCHRONOUS
...

class SimProz():

    def __init__(self,bProz) -> None:
        ...

    def sim(self):
        ...
        self.ts = datetime.now(timezone.utc) # influxDB !!!
        self.PL =   {"cnt": cntG}
        for i, p in zip(range(self.bProz.m-1), self.bProz.xLst):
            self.PL.update({p.name: y0[i]})

    def init_idb(self,cfg):
        client = InfluxDBClient(url=f'http://{cfg.ip}:{cfg.port}', token=cfg.token, org=cfg.org)

        self.wapi = client.write_api(write_options=WriteOptions(batch_size=1))
        self.buc = cfg.buc
        self.mes = cfg.mes
        buckets_api = client.buckets_api()
        # wenn die Datenbank noch vorhanden ist, dann wird sie erzeugt:
        if buckets_api.find_bucket_by_name(self.buc) == None:
            print(f"Bucket {self.buc} nicht da")
            retention_rules = BucketRetentionRules(type="expire", every_seconds=cfg.expire)
            created_bucket = buckets_api.create_bucket(bucket_name=buc,
                                                        retention_rules=retention_rules,
                                                        org=org)

    def write_idb(self, ts):
        self.sim()
        dat = {
            "measurement": self.mes,
            "tags": {},
            "time": ts,
            "fields": self.PL
        }
        self.wapi.write(bucket=self.buc, record=dat, write_precision='ms')

    def write_idb_now(self):
        self.sim()
        dat = {
            "measurement": self.mes,
            "tags": {"loc": "host3"},
            "time": self.ts,
            "fields": self.PL
        }
        self.wapi.write(bucket=self.buc, record=dat, write_precision='ms')

    ...
...

Requirements

Für die Nutzung der zahlreichen Beispiele müssen eine Reihe von Bibliotheken installiert werden. Am einfachsten gelingt dies mit dem pip-Befehl:

Cmd

py -m pip install -r req.txt

pip install -r req.txt

Requirements req.txt

numpy
APScheduler
pandas
influx-client
pymongo
paho-mqtt
asyncua
asyncio
fastapi
uvicorn
requests

Installation als Linux-Dienst

Für automatische Ausführung eines Python-Skripts muss ein Dienst == serice eomngerichtet werden Link.

sim.service
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
[Unit]
Description=My-Service
After=network.target

[Service]
ExecStart=/usr/bin/python3 -u main.py
WorkingDirectory=/home/pi/py/sim
StandardOutput=inherit
StandardError=inherit
Restart=always
User=pi
root=false

[Install]
WantedBy=multi-user.target

Wird das py-Skript mit einer virtuellen Python-Umgebung gestartet, dann ist z.B. zu ändern:

.venv
6
ExecStart=/home/pi/py/sim/.venv/bin/python3 -u main.py

Wenn z.B. der Port eines Webservers auf Port 80 laufen soll, dann muss das Python-Skript mit root-Rechten ausgeführt werden:

root
12
root=true

Folgende Arbeitsschritte sind für die Installation und der Steuerung des Dienstes erforderlich:

Service an den richtigen Ort kopieren (/etc/systemd/systemm )

kopieren
sudo cp /home/pi/py/sim/sim.service /etc/systemd/system/sim.service

Service starten
sudo systemctl start sim.service
Service stoppen
sudo systemctl stop sim.service
Service fest einrichten
sudo systemctl enable sim.service
Service wieder starten
sudo systemctl restart sim.service

Status abfragen

sudo systemctl status sim.service

Befehlsübersicht
man systemctl
FH-SWF / Prof. Dr. J. Bechtloff, 2024