libiec61850/pyiec61850/test_pyiec61850.py

83 lines
2.7 KiB
Python
Executable file

#!/usr/bin/python
import sys
import time
import threading
import traceback
import signal
import sys
sys.path.append('.')
import iec61850
def signal_handler(signal, frame):
global running
running =0
print('You pressed Ctrl+C!')
#sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
tcpPort = 8102
running = 1
class myIECServer():
def __init__(self):
self.__model = iec61850.IedModel_create("testmodel")
lDevice1 = iec61850.LogicalDevice_create("SENSORS", self.__model);
lln0 = iec61850.LogicalNode_create("LLN0", lDevice1);
ttmp1 = iec61850.LogicalNode_create("TTMP1", lDevice1);
iec61850.CDC_SAV_create("TmpSv", iec61850.toModelNode(ttmp1), 0, False)
iec61850.CDC_ASG_create("TmpSp", iec61850.toModelNode(ttmp1), 0, False)
self.__iedServer = iec61850.IedServer_create(self.__model)
iec61850.IedServer_start(self.__iedServer, tcpPort);
if not(iec61850.IedServer_isRunning(self.__iedServer)) :
print("Starting server failed! Exit.\n")
iec61850.IedServer_destroy(self.__iedServer)
sys.exit(-1)
def run(self):
global running
while running:
time.sleep(0.1)
self.stop()
def stop(self):
iec61850.IedServer_stop(self.__iedServer)
iec61850.IedServer_destroy(self.__iedServer)
iec61850.IedModel_destroy(self.__model)
def testClient():
con = iec61850.IedConnection_create()
error = iec61850.IedConnection_connect(con, "localhost", tcpPort)
if (error == iec61850.IED_ERROR_OK):
# Accessing to SAV values
theVal = "testmodelSENSORS/TTMP1.TmpSv.instMag.f"
theValType = iec61850.IEC61850_FC_MX
temperatureValue = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
assert(temperatureValue[1]==0)
newValue= temperatureValue[0]+10
err = iec61850.IedConnection_writeFloatValue(con, theVal, theValType, newValue)
assert(err==21)
# Accessing to ASG values
theVal = "testmodelSENSORS/TTMP1.TmpSp.setMag.f"
theValType = iec61850.IEC61850_FC_SP
temperatureSetpoint = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
print(temperatureSetpoint)
assert(temperatureValue[1]==0)
newValue= temperatureValue[0]+10
err = iec61850.IedConnection_writeFloatValue(con, theVal, theValType, newValue)
assert(err==0)
temperatureSetpoint = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
print(temperatureSetpoint)
assert(temperatureSetpoint[0]==newValue)
iec61850.IedConnection_close(con)
else:
print("Connection error")
sys.exit(-1)
iec61850.IedConnection_destroy(con)
print("client ok")
try:
srv=myIECServer()
srvThread = threading.Thread(target = srv.run)
srvThread.start()
testClient()
running = 0
#signal.pause()
except:
running = 0
print("Error :")
traceback.print_exc(file=sys.stdout)
sys.exit(-1)