83 lines
2.7 KiB
Python
Executable file
83 lines
2.7 KiB
Python
Executable file
#!/usr/bin/python
|
|
import sys
|
|
import time
|
|
import threading
|
|
import traceback
|
|
import signal
|
|
import sys
|
|
sys.path.append('.')
|
|
import iec61850
|
|
def signal_handler(signal, frame):
|
|
global running
|
|
running =0
|
|
print('You pressed Ctrl+C!')
|
|
#sys.exit(0)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
tcpPort = 8102
|
|
running = 1
|
|
class myIECServer():
|
|
def __init__(self):
|
|
self.__model = iec61850.IedModel_create("testmodel")
|
|
lDevice1 = iec61850.LogicalDevice_create("SENSORS", self.__model);
|
|
lln0 = iec61850.LogicalNode_create("LLN0", lDevice1);
|
|
ttmp1 = iec61850.LogicalNode_create("TTMP1", lDevice1);
|
|
iec61850.CDC_SAV_create("TmpSv", iec61850.toModelNode(ttmp1), 0, False)
|
|
iec61850.CDC_ASG_create("TmpSp", iec61850.toModelNode(ttmp1), 0, False)
|
|
self.__iedServer = iec61850.IedServer_create(self.__model)
|
|
iec61850.IedServer_start(self.__iedServer, tcpPort);
|
|
if not(iec61850.IedServer_isRunning(self.__iedServer)) :
|
|
print("Starting server failed! Exit.\n")
|
|
iec61850.IedServer_destroy(self.__iedServer)
|
|
sys.exit(-1)
|
|
def run(self):
|
|
global running
|
|
while running:
|
|
time.sleep(0.1)
|
|
self.stop()
|
|
def stop(self):
|
|
iec61850.IedServer_stop(self.__iedServer)
|
|
iec61850.IedServer_destroy(self.__iedServer)
|
|
iec61850.IedModel_destroy(self.__model)
|
|
|
|
def testClient():
|
|
con = iec61850.IedConnection_create()
|
|
error = iec61850.IedConnection_connect(con, "localhost", tcpPort)
|
|
if (error == iec61850.IED_ERROR_OK):
|
|
# Accessing to SAV values
|
|
theVal = "testmodelSENSORS/TTMP1.TmpSv.instMag.f"
|
|
theValType = iec61850.IEC61850_FC_MX
|
|
temperatureValue = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
|
|
assert(temperatureValue[1]==0)
|
|
newValue= temperatureValue[0]+10
|
|
err = iec61850.IedConnection_writeFloatValue(con, theVal, theValType, newValue)
|
|
assert(err==21)
|
|
# Accessing to ASG values
|
|
theVal = "testmodelSENSORS/TTMP1.TmpSp.setMag.f"
|
|
theValType = iec61850.IEC61850_FC_SP
|
|
temperatureSetpoint = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
|
|
print(temperatureSetpoint)
|
|
assert(temperatureValue[1]==0)
|
|
newValue= temperatureValue[0]+10
|
|
err = iec61850.IedConnection_writeFloatValue(con, theVal, theValType, newValue)
|
|
assert(err==0)
|
|
temperatureSetpoint = iec61850.IedConnection_readFloatValue(con, theVal, theValType)
|
|
print(temperatureSetpoint)
|
|
assert(temperatureSetpoint[0]==newValue)
|
|
iec61850.IedConnection_close(con)
|
|
else:
|
|
print("Connection error")
|
|
sys.exit(-1)
|
|
iec61850.IedConnection_destroy(con)
|
|
print("client ok")
|
|
try:
|
|
srv=myIECServer()
|
|
srvThread = threading.Thread(target = srv.run)
|
|
srvThread.start()
|
|
testClient()
|
|
running = 0
|
|
#signal.pause()
|
|
except:
|
|
running = 0
|
|
print("Error :")
|
|
traceback.print_exc(file=sys.stdout)
|
|
sys.exit(-1)
|