1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

python: Rework Python Sample and Format interfaces

Signed-off-by: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
This commit is contained in:
Philipp Jungkamp 2023-09-19 12:04:53 +02:00
parent 967815f7a6
commit b1eb40a8b3
11 changed files with 813 additions and 274 deletions

View file

@ -1,102 +1,85 @@
"""
Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import time import time
from villas.node.node import Node as VILLASnode from villas.node.node import Node as VILLASnode
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
# SPDX-License-Identifier: Apache-2.0
# This could be moved to the DPsim Python code later # This could be moved to the DPsim Python code later
def get_dpsim_shmem_interface_signals(): def get_dpsim_shmem_interface_signals():
""" It would be nice if the DPsim Shmem interface could """It would be nice if the DPsim Shmem interface could
build-up a list of actual signal descriptions build-up a list of actual signal descriptions
(names, units, etc..) which attributes are exported. (names, units, etc..) which attributes are exported.
This would eliviate the user from manually configuring This would eliviate the user from manually configuring
signal mappings """ signal mappings"""
signals = [] signals = []
for i in range(0, 30): for i in range(0, 30):
signals.append({ signals.append(
'name': f'signal_{i}', {
'type': 'float', "name": f"signal_{i}",
'unit': 'volts' "type": "float",
}) "unit": "volts",
}
)
return signals return signals
def get_dpsim_shmem_interface_config(): def get_dpsim_shmem_interface_config():
return { return {
'type': 'shmem', "type": "shmem",
'in': { "in": {
'name': '/dpsim1-villas', "name": "/dpsim1-villas",
'hooks': [ "hooks": [{"type": "stats"}],
{ "signals": get_dpsim_shmem_interface_signals(),
'type': 'stats'
}
],
'signals': get_dpsim_shmem_interface_signals()
}, },
'out': { "out": {"name": "/villas-dpsim1"},
'name': '/villas-dpsim1'
}
} }
def get_villas_config(): def get_villas_config():
return { return {
'nodes': { "nodes": {
'broker1': { "broker1": {
'type': 'mqtt', "type": "mqtt",
'format': 'json', "format": "json",
'host': '172.17.0.1', "host": "172.17.0.1",
'in': { "in": {"subscribe": "/powerflow-dpsim"},
'subscribe': '/powerflow-dpsim'}, "out": {"publish": "/dpsim-powerflow"},
'out': {
'publish': '/dpsim-powerflow'
}
}, },
'dpsim1': get_dpsim_shmem_interface_config(), "dpsim1": get_dpsim_shmem_interface_config(),
}, },
'paths': [ "paths": [
{ {
'in': 'dpsim1', "in": "dpsim1",
'out': 'broker1', "out": "broker1",
"hooks": [{"type": "limit_rate", "rate": 50}],
'hooks': [
{
'type': 'limit_rate',
'rate': 50
}
]
} }
] ],
} }
def main(): def main():
node = VILLASnode(config=get_villas_config())
node = VILLASnode(
config=get_villas_config()
)
node.start() # VILLASnode starts running in the background from here.. node.start() # VILLASnode starts running in the background from here..
# Some infos from the running VILLASnode instance queried via its REST API # Some infos from the running VILLASnode instance queried via its REST API
print('VILLASnode running?: ', node.is_running()) print("VILLASnode running?: ", node.is_running())
print('VILLASnode status: ', node.status) print("VILLASnode status: ", node.status)
print('VILLASnode nodes: ', node.nodes) print("VILLASnode nodes: ", node.nodes)
print('VILLASnode paths: ', node.paths) print("VILLASnode paths: ", node.paths)
print('VILLASnode config: ', node.active_config) print("VILLASnode config: ", node.active_config)
print('VILLASnode version: ', node.get_version()) print("VILLASnode version: ", node.get_version())
# Load a new config into the running # Load a new config into the running
# VILLASnode instance (old config will be replaced) # VILLASnode instance (old config will be replaced)
new_config = node.active_config new_config = node.active_config
new_config['paths'].append({ new_config["paths"].append({"out": "dpsim1", "in": "broker1"})
'out': 'dpsim1',
'in': 'broker1'
})
node.load_config(new_config) node.load_config(new_config)
@ -105,5 +88,5 @@ def main():
node.stop() node.stop()
if __name__ == 'main': if __name__ == "main":
main() main()

43
python/pyproject.toml Normal file
View file

@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
# SPDX-License-Identifier: Apache-2.0
[build-system]
requires = ['setuptools>=61.0']
build-backend = 'setuptools.build_meta'
[project]
name = 'villas-python'
version = '0.10.3'
description = 'Python support for the VILLASnode simulation-data gateway'
readme = 'README.md'
requires-python = '>=3.10'
keywords = ['simulation', 'power', 'system', 'real-time', 'villas']
license.text = 'Apache-2.0'
classifiers = [
'Development Status :: 4 - Beta',
'Topic :: Scientific/Engineering',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3',
]
dependencies = ['linuxfd', 'requests']
optional-dependencies.dev = [
'black',
'flake8',
'mypy',
'pytest',
'types-requests',
]
[project.urls]
GitHub = 'https://github.com/VILLASframework/node'
Project = 'https://www.fein-aachen.org/en/projects/villas-node'
Homepage = 'https://villas.fein-aachen.org'
[[project.authors]]
name = 'Steffen Vogel'
email = 'post@steffenvogel.de'
[[project.authors]]
name = 'Philipp Jungkamp'
email = 'Philipp.Jungkamp@opal-rt.com'

View file

@ -1,42 +1,8 @@
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University """
# SPDX-License-Identifier: Apache-2.0 SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
from setuptools import setup, find_namespace_packages from setuptools import setup
from glob import glob
import sys
with open('README.md') as f: setup()
long_description = f.read()
setup(
name='villas-node',
version='0.10.3',
author='Steffen Vogel',
author_email='acs-software@eonerc.rwth-aachen.de',
description='Python-support for VILLASnode simulation-data gateway',
license='Apache-2.0',
keywords='simulation power system real-time villas',
url='https://git.rwth-aachen.de/acs/public/villas/VILLASnode',
packages=find_namespace_packages(include=['villas.*']),
long_description=long_description,
long_description_content_type='text/markdown',
classifiers=[
'Development Status :: 4 - Beta',
'Topic :: Scientific/Engineering',
'License :: OSI Approved :: '
'License :: OSI Approved :: Apache Software License',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3'
],
install_requires=[
'requests'
] + [
'linuxfd'
] if sys.platform == 'linux' else [],
setup_requires=[
'm2r'
],
scripts=glob('bin/*')
)

View file

View file

@ -0,0 +1,9 @@
"""
Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
from .node import Node
__all__ = ["Node"]

View file

@ -1,86 +1,83 @@
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University """
# SPDX-License-Identifier: Apache-2.0 Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import time
import logging import logging
import sys import sys
import linuxfd
from villas.node.sample import Sample, Timestamp
from threading import Thread from threading import Thread
from typing import Callable
import linuxfd # type: ignore[import]
from villas.node.formats import VillasHuman
from villas.node.sample import Sample
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
RecvCallback = Callable[[Sample], None]
SendCallback = Callable[[int], Sample]
class RecvThread(Thread): class RecvThread(Thread):
def __init__(self, cb: RecvCallback):
def __init__(self, cb):
super().__init__() super().__init__()
self.cb = cb self.cb = cb
self.daemon = True self.daemon = True
self.format = VillasHuman()
def run(self): def run(self):
for line in sys.stdin: for line in sys.stdin:
if line.startswith('#'): logger.debug(f"RecvThread: {line}")
continue
logger.debug("RecvThread: {}".format(line)) if (sample := self.format.load_sample(line)) is not None:
self.cb(sample)
sample = Sample.parse(line)
self.cb(sample.values)
class SendThread(Thread): class SendThread(Thread):
def __init__(self, cb: SendCallback, rate: float):
def __init__(self, cb, rate=None):
super().__init__() super().__init__()
self.cb = cb self.cb = cb
self.rate = rate
self.daemon = True self.daemon = True
self.format = VillasHuman()
self.rate = rate
self.sequence = 0 self.sequence = 0
def run(self): def run(self):
tfd = linuxfd.timerfd()
if self.rate: tfd.settime(1.0, 1.0 / self.rate)
tfd = linuxfd.timerfd()
tfd.settime(1.0, 1.0 / self.rate)
else:
tfd = None
while True: while True:
if tfd: tfd.read()
tfd.read()
values = self.cb() sample = self.cb(self.sequence)
ts = Timestamp.now(None, self.sequence) if sample is None:
continue
sample = Sample(ts, values) sample = self.format.dump_sample(sample)
sys.stdout.write(sample)
sys.stdout.write(str(sample) + '\n')
sys.stdout.flush() sys.stdout.flush()
self.sequence += 1 self.sequence += 1
def communicate(rate, recv_cb=None, send_cb=None, wait=True): def communicate(
rate: float,
if recv_cb: recv_cb: RecvCallback | None = None,
send_cb: SendCallback | None = None,
wait: bool = True,
):
if recv_cb is not None:
rt = RecvThread(recv_cb) rt = RecvThread(recv_cb)
rt.start() rt.start()
if send_cb: if send_cb is not None:
st = SendThread(send_cb, rate) st = SendThread(send_cb, rate)
st.start() st.start()
if wait: if wait:
try: try:
while True: rt.join()
time.sleep(1) st.join()
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('Received Ctrl+C. Stopping send/recv threads') logger.info("Received Ctrl+C. Stopping send/recv threads")
# Threads are daemon threads
# and therefore killed with program termination

View file

@ -0,0 +1,241 @@
"""
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import re
from dataclasses import dataclass, field
from itertools import groupby
from typing import Iterable
from villas.node.sample import Sample, Timestamp, Signal
class SignalList(list[type]):
types = {
"b": bool,
"i": int,
"f": float,
"c": complex,
}
_type_to_char = {t: c for c, t in types.items()}
def __init__(self, fmt: str | Sample | Iterable[type] = "64f"):
if isinstance(fmt, Sample):
super().__init__(map(type, fmt.data) if fmt.data else [])
return
elif not isinstance(fmt, str):
super().__init__(fmt)
return
super().__init__()
regex = f"([{''.join(self.__class__.types.keys())}])"
fields = iter(re.split(regex, fmt))
while (count_str := next(fields, None)) is not None:
if count_str:
count = int(count_str)
else:
count = 1
try:
ty_str = next(fields)
except StopIteration:
if count_str:
raise ValueError("Expected type specifier.")
else:
break
try:
ty = self.__class__.types[ty_str]
except KeyError:
raise ValueError(f"Unknown type {ty_str}")
self.extend([ty] * count)
def __str__(self):
fmt = ""
for ty, run in groupby(self):
run_length = sum(1 for _ in run)
c = self.__class__._type_to_char[ty]
if run_length > 1:
fmt += f"{run_length}"
fmt += f"{c}"
return fmt
def __repr__(self):
return f"{self.__class__.__name__}('{self.__str__()}')"
@dataclass(kw_only=True)
class Format:
"""
The base for VILLASnode formats in Python.
"""
ts_origin: bool = True
ts_received: bool = True
sequence: bool = True
data: bool = True
def _strip_sample(self, sample: Sample) -> Sample:
if not self.ts_origin:
sample.ts_origin = None
if not self.ts_received:
sample.ts_received = None
if not self.sequence:
sample.sequence = None
if not self.data:
sample.data = []
return sample
@dataclass
class VillasHuman(Format):
"""
The villas.human format in Python.
"""
signal_list: SignalList = field(default_factory=SignalList)
separator: str = "\t"
delimiter: str = "\n"
def load(self, file) -> list[Sample]:
"""
Load samples from a text mode file object.
"""
return self.loads(file.read())
def loads(self, s: str) -> list[Sample]:
"""
Load samples from a string.
"""
s.strip(self.separator + self.delimiter)
sample_strs = s.split(sep=self.delimiter)
samples = (self.load_sample(sample) for sample in sample_strs)
return [s for s in samples if s is not None]
def dump(self, samples: Iterable[Sample], file):
"""
Dump samples to a text mode file object.
"""
return file.write(self.dumps(samples))
def dumps(self, samples: Iterable[Sample]) -> str:
"""
Dump samples to a string.
"""
sample_strs = (self.dump_sample(sample) for sample in iter(samples))
return "".join(sample_strs)
def load_sample(self, sample: str) -> Sample | None:
"""
Load a single sample from a string.
"""
sample = sample.strip(self.delimiter)
if sample.startswith("#"):
return None
fields = sample.split(sep=self.separator)
if not fields[0]:
return None
m = re.match(
r"(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?"
r"(?:e[+-]?\d+)?)?(?:\((\d+)\))?(F)?",
fields[0],
)
if m is None:
raise ValueError(f"Invalid header: {fields[0]}")
ts_seconds = int(m.group(1))
ts_nanoseconds = int(m.group(2)) if m.group(2) else 0
ts_offset = float(m.group(3)) if m.group(3) else None
sequence = int(m.group(4)) if m.group(4) else None
new_frame = bool(m.group(5))
ts_origin = Timestamp(ts_seconds, ts_nanoseconds)
if ts_offset is not None:
ts_received_raw = ts_origin.timestamp() + ts_offset
ts_received = Timestamp.fromtimestamp(ts_received_raw)
else:
ts_received = None
data: list[Signal] = []
for ty, value in zip(self.signal_list, fields[1:]):
if ty is bool:
data.append(bool(int(value)))
elif ty is int:
data.append(int(value))
elif ty is float:
data.append(float(value))
elif ty is complex:
data.append(self._unpack_complex(value))
return self._strip_sample(
Sample(
ts_origin=ts_origin,
ts_received=ts_received,
sequence=sequence,
new_frame=new_frame,
data=data,
)
)
def dump_sample(self, smp: Sample) -> str:
"""
Dump a single sample to a string.
"""
smp = self._strip_sample(smp)
s = ""
if smp.ts_origin is not None:
s += f"{smp.ts_origin.seconds}"
if smp.ts_origin.nanoseconds != 0:
s += f".{smp.ts_origin.nanoseconds:09}"
if smp.ts_received is not None:
off = smp.ts_received.timestamp() - smp.ts_origin.timestamp()
s += f"+{off}"
if smp.sequence is not None:
s += f"({smp.sequence})"
if smp.new_frame:
s += "F"
for ty, value in zip(self.signal_list, smp.data):
s += self.separator
assert ty == type(value)
match value:
case bool():
s += str(int(value))
case int():
s += str(value)
case float():
s += str(value)
case complex():
s += self._pack_complex(value)
s += self.delimiter
return s
def _unpack_complex(self, s: str) -> complex:
return complex(s.lower().replace("i", "j"))
def _pack_complex(self, z: complex) -> str:
return f"{z.real}+{z.imag}i"

View file

@ -1,34 +1,42 @@
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University """
# SPDX-License-Identifier: Apache-2.0 Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import json
import os
import tempfile
import subprocess
import logging
import signal
import requests
import datetime import datetime
import json
import logging
import os
import signal
import subprocess
from tempfile import NamedTemporaryFile
LOGGER = logging.getLogger('villas.node') import requests
LOGGER = logging.getLogger("villas.node")
class Node(object): class Node(object):
api_version = "v2"
api_version = 'v2' def __init__(
self,
def __init__(self, api_url=None, api_url=None,
log_filename=None, log_filename=None,
config_filename=None, config={}, config_filename=None,
executable='villas-node', **kwargs): config={},
executable="villas-node",
**kwargs,
):
self.api_url = api_url self.api_url = api_url
self.log_filename = log_filename self.log_filename = log_filename
self.executable = executable self.executable = executable
if config_filename and config: if config_filename and config:
raise RuntimeError('Can\'t provide config_filename and ' raise RuntimeError(
'config at the same time!') "Can't provide config_filename and " "config at the same time!"
)
if config_filename: if config_filename:
with open(config_filename) as f: with open(config_filename) as f:
@ -38,15 +46,14 @@ class Node(object):
# Try to deduct api_url from config # Try to deduct api_url from config
if self.api_url is None: if self.api_url is None:
port = config.get('http', {}).get('port') port = config.get("http", {}).get("port")
if port is None: if port is None:
port = 80 if os.getuid() == 0 else 8080 port = 80 if os.getuid() == 0 else 8080
self.api_url = f'http://localhost:{port}' self.api_url = f"http://localhost:{port}"
def start(self): def start(self):
self.config_file = tempfile.NamedTemporaryFile(mode='w+', self.config_file = NamedTemporaryFile(mode="w+", suffix=".json")
suffix='.json')
json.dump(self.config, self.config_file) json.dump(self.config, self.config_file)
@ -54,16 +61,20 @@ class Node(object):
if self.log_filename is None: if self.log_filename is None:
now = datetime.datetime.now() now = datetime.datetime.now()
self.log_filename = now.strftime( fmt = "villas-node_%Y-%m-%d_%H-%M-%S.log"
'villas-node_%Y-%m-%d_%H-%M-%S.log') self.log_filename = now.strftime(fmt)
self.log = open(self.log_filename, 'w+') self.log = open(self.log_filename, "w+")
LOGGER.info("Starting VILLASnode instance with config: %s", LOGGER.info(
self.config_file.name) f"Starting VILLASnode instance with config: {self.config_file.name}" # noqa: E501
)
self.child = subprocess.Popen([self.executable, self.config_file.name], self.child = subprocess.Popen(
stdout=self.log, stderr=self.log) [self.executable, self.config_file.name],
stdout=self.log,
stderr=self.log,
)
def pause(self): def pause(self):
LOGGER.info("Pausing VILLASnode instance") LOGGER.info("Pausing VILLASnode instance")
@ -82,61 +93,58 @@ class Node(object):
def restart(self): def restart(self):
LOGGER.info("Restarting VILLASnode instance") LOGGER.info("Restarting VILLASnode instance")
self.request('restart') self.request("restart")
@property @property
def active_config(self): def active_config(self):
return self.request('config') return self.request("config")
@property @property
def nodes(self): def nodes(self):
return self.request('nodes') return self.request("nodes")
@property @property
def paths(self): def paths(self):
return self.request('paths') return self.request("paths")
@property @property
def status(self): def status(self):
return self.request('status') return self.request("status")
def load_config(self, i): def load_config(self, i):
if type(i) is dict: if type(i) is dict:
cfg = i cfg = i
elif type(i) is str: elif type(i) is str:
cfg = json.loads(i) cfg = json.loads(i)
elif hasattr(i, 'read'): # file-like? elif hasattr(i, "read"): # file-like?
cfg = json.load(i) cfg = json.load(i)
else: else:
raise TypeError() raise TypeError()
req = { req = {"config": cfg}
'config': cfg
}
self.request('restart', method='POST', json=req) self.request("restart", method="POST", json=req)
def request(self, action, method='GET', **args): def request(self, action, method="GET", **args):
if "timeout" not in args:
args["timeout"] = 1
if 'timeout' not in args: r = requests.request(
args['timeout'] = 1 method, f"{self.api_url}/api/{self.api_version}/{action}", **args
)
r = requests.request(method,
f'{self.api_url}/api/{self.api_version}/{action}',
**args)
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def get_local_version(self): def get_local_version(self):
ver = subprocess.check_output([self.executable, '-V']) ver = subprocess.check_output([self.executable, "-V"])
return ver.decode('ascii').rstrip() return ver.decode("ascii").rstrip()
def get_version(self): def get_version(self):
resp = self.request('status') resp = self.request("status")
return resp['version'] return resp["version"]
def is_running(self): def is_running(self):
if self.child is None: if self.child is None:

View file

@ -1,110 +1,181 @@
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University """
# SPDX-License-Identifier: Apache-2.0 Author: Steffen Vogel <post@steffenvogel.de>
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import re import hashlib
from ctypes import c_double, c_float, sizeof
from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from functools import total_ordering from functools import total_ordering
from sys import byteorder as native
from typing import Iterable
assert sizeof(c_float) == 4
assert sizeof(c_double) == 8
Signal = bool | int | float | complex
@total_ordering @total_ordering
@dataclass
class Timestamp: class Timestamp:
"""Parsing the VILLASnode human-readable timestamp format""" """
A VILLASnode timestamp. Based on the C struct timespec.
"""
def __init__(self, seconds=None, nanoseconds=None, seconds: int
offset=None, sequence=None): nanoseconds: int = 0
self.seconds = seconds
self.nanoseconds = nanoseconds def _as_digest_bytes(self):
self.offset = offset sec = self.seconds.to_bytes(8, "little")
self.sequence = sequence nsec = self.nanoseconds.to_bytes(8, "little")
return bytes().join([sec, nsec])
@classmethod @classmethod
def now(cls, offset=None, sequence=None): def fromdatetime(cls, ts: datetime) -> "Timestamp":
n = datetime.utcnow() secs = int(ts.timestamp())
nsecs = int(1000 * ts.microsecond)
secs = int(n.timestamp()) return cls(seconds=secs, nanoseconds=nsecs)
nsecs = 1000 * n.microsecond
return Timestamp(seconds=secs, nanoseconds=nsecs,
offset=offset, sequence=sequence)
@classmethod @classmethod
def parse(cls, ts): def fromtimestamp(cls, ts: float) -> "Timestamp":
m = re.match(r'(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?' secs = int(ts)
r'(?:e[+-]?\d+)?)?(?:\((\d+)\))?', ts) nsecs = int(1e9 * (ts - float(secs)))
return cls(seconds=secs, nanoseconds=nsecs)
seconds = int(m.group(1)) # Mandatory def timestamp(self) -> float:
nanoseconds = int(m.group(2)) if m.group(2) else None return float(self)
offset = float(m.group(3)) if m.group(3) else None
sequence = int(m.group(4)) if m.group(4) else None
return Timestamp(seconds, nanoseconds, offset, sequence) def datetime(self) -> datetime:
return datetime.fromtimestamp(self.timestamp())
def __str__(self):
str = "%u" % (self.seconds)
if self.nanoseconds is not None:
str += ".%09u" % self.nanoseconds
if self.offset is not None:
str += "+%u" % self.offset
if self.sequence is not None:
str += "(%u)" % self.sequence
return str
def __float__(self): def __float__(self):
sum = float(self.seconds) return float(self.seconds) + float(self.nanoseconds) * 1e-9
if self.nanoseconds is not None: def _as_ordered_tuple(self):
sum += self.nanoseconds * 1e-9 return (
if self.offset is not None: self.seconds,
sum += self.offset self.nanoseconds,
)
return sum def __eq__(self, other: object):
if not isinstance(other, Timestamp):
return False
def __eq__(self, other): return self._as_ordered_tuple() == other._as_ordered_tuple()
return float(self) == float(other)
def __lt__(self, other): def __lt__(self, other: "Timestamp"):
return float(self) < float(other) return self._as_ordered_tuple() < other._as_ordered_tuple()
@total_ordering @total_ordering
@dataclass(kw_only=True)
class Sample: class Sample:
"""Parsing a VILLASnode sample from a file (not a UDP package!!)""" """
A VILLASnode sample.
"""
def __init__(self, ts, values): ts_origin: Timestamp | None = None
self.ts = ts ts_received: Timestamp | None = None
self.values = values sequence: int | None = None
new_frame: bool = False
data: list[Signal] = field(default_factory=list)
@classmethod def _as_ordered_tuple(self):
def parse(cls, line): return (
csv = line.split() self.ts_origin is not None,
self.ts_origin if self.ts_origin is not None else Timestamp(0),
self.ts_received is not None,
self.ts_received if self.ts_received is not None else Timestamp(0),
self.sequence is not None,
self.sequence if self.sequence is not None else 0,
not self.new_frame,
self.data,
)
ts = Timestamp.parse(csv[0]) def __eq__(self, other: object):
vs = [] if not isinstance(other, Sample):
return False
for value in csv[1:]: return self._as_ordered_tuple() == other._as_ordered_tuple()
try:
v = float(value)
except ValueError:
value = value.lower()
try:
v = complex(value)
except Exception:
if value.endswith('i'):
v = complex(value.replace('i', 'j'))
else:
raise ValueError()
vs.append(v) def __lt__(self, other: "Timestamp"):
return self._as_ordered_tuple() < other._as_ordered_tuple()
return Sample(ts, vs) def _as_digest_bytes(self):
def signal_to_bytes(signal):
match signal:
case bool():
return signal.to_bytes(1, "little")
case int():
return signal.to_bytes(8, "little")
case float():
i = int.from_bytes(bytes(c_double(signal)), native)
return i.to_bytes(8, "little")
case complex():
f_real = signal.real
f_imag = signal.imag
i_real = int.from_bytes(bytes(c_float(f_real)), native)
i_imag = int.from_bytes(bytes(c_float(f_imag)), native)
real = i_real.to_bytes(4, "little")
imag = i_imag.to_bytes(4, "little")
return bytes().join([real, imag])
def __str__(self): return bytes().join(
return '%s\t%s' % (self.ts, "\t".join(map(str, self.values))) [
self.ts_origin._as_digest_bytes(),
self.sequence.to_bytes(8, "little"),
]
+ list(map(signal_to_bytes, self.data))
)
def __eq__(self, other):
return self.ts == other.ts
def __lt__(self, other): @dataclass
return self.ts < other.ts class Frame(list[Sample]):
"""
A frame VILLASnode of sample indicated by the new_frame flag.
"""
def __init__(self, it: Iterable[Sample]):
super().__init__(it)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
def digest(self, algorithm: str) -> bytes:
"""
A digest for a frame of samples that is comparable to the digest hook.
"""
hash = hashlib.new(algorithm)
for sample in self:
hash.update(sample._as_digest_bytes())
return hash.digest()
def group(samples: list[Sample]) -> list["Frame"]:
"""
Group samples into Frames according to their new_frame flag.
"""
samples.sort()
if not samples:
return []
frames = []
current_frame = Frame([samples[0]])
for sample in samples[1:]:
if sample.new_frame:
frames.append(current_frame)
current_frame = Frame([sample])
else:
current_frame.append(sample)
frames.append(current_frame)
return frames

View file

@ -0,0 +1,51 @@
"""
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
from villas.node.sample import Sample, Timestamp
from villas.node.formats import SignalList, VillasHuman
from cmath import sqrt
def test_signal_list_repr():
signal_list = SignalList("21fb2ic")
assert signal_list == eval(repr(signal_list))
def test_signal_list():
signal_list = SignalList("1fb2ic")
assert signal_list == SignalList([float, bool, int, int, complex])
def test_villas_human_repr():
villas_human = VillasHuman(ts_received=False)
assert villas_human == eval(repr(villas_human))
def test_villas_human():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, False, 42, sqrt(complex(-1))],
)
villas_human = VillasHuman(signal_list=SignalList(smp1))
smp1_str = "123456780+1.0(4)F\t1.0\t2.0\t3.0\t1\t42\t0.0+1.0i\n"
smp2_str = "123456789+1.0(5)\t1.0\t2.0\t3.0\t0\t42\t0.0+1.0i\n"
assert villas_human.dump_sample(smp1) == smp1_str
assert villas_human.dump_sample(smp2) == smp2_str
assert villas_human.dumps([smp1, smp2]) == smp1_str + smp2_str
assert villas_human.load_sample(smp1_str) == smp1
assert villas_human.load_sample(smp2_str) == smp2
assert villas_human.loads(smp1_str + smp2_str) == [smp1, smp2]

View file

@ -0,0 +1,170 @@
"""
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
from villas.node.sample import Sample, Timestamp, Frame
from datetime import datetime
from cmath import sqrt
def test_timestamp_repr():
ts = Timestamp(123456789, 123456789)
assert ts == eval(repr(ts))
def test_timestamp_conversion():
ts = Timestamp(123456789, 123456789)
fl = 123456789.123456789
fl_ts = Timestamp(123456789, 123456791)
assert ts.timestamp() == fl
assert fl_ts == Timestamp.fromtimestamp(fl)
dt = datetime(1973, 11, 29, 22, 33, 9, 123457)
dt_ts = Timestamp(123456789, 123457000)
assert ts.datetime() == dt
assert dt_ts == Timestamp.fromdatetime(dt)
def test_timestamp_ordering():
ts1 = Timestamp(123456789)
ts2 = Timestamp(123456789, 0)
ts3 = Timestamp(123456789, 123456789)
assert ts1 == ts2
assert ts2 < ts3
def test_timestamp_as_digest_bytes():
ts = Timestamp(123456789, 123456789)
digest_bytes = bytes.fromhex("15cd5b070000000015cd5b0700000000")
assert ts._as_digest_bytes() == digest_bytes
def test_sample_repr():
smp = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
assert smp == eval(repr(smp))
def test_sample_ordering():
smp1 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp3 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456791),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
assert smp1 == smp2
assert smp2 < smp3
def test_sample_as_digest_bytes():
smp = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
digest_bytes_hex = "15cd5b070000000000000000000000000400000000000000000000000000f03f00000000000000400000000000000840012a00000000000000000000000000803f" # noqa: E501
digest_bytes = bytes.fromhex(digest_bytes_hex)
assert smp._as_digest_bytes() == digest_bytes, smp._as_digest_bytes().hex()
def test_frame_repr():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frame = Frame([smp1, smp2])
assert frame == eval(repr(frame))
def test_frame_group():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp3 = Sample(
ts_origin=Timestamp(123456791),
ts_received=Timestamp(123456793),
sequence=6,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frames = list(Frame.group([smp1, smp2, smp3]))
assert len(frames) == 2
assert list(map(len, frames)) == [2, 1]
assert frames == [[smp1, smp2], [smp3]]
def test_frame_digest():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
digest = bytes.fromhex(
"a573e3b0953a1e4f69addf631d6229bb714d263b4f362f0847e96c3838c83217"
) # noqa: E501
assert Frame([smp1, smp2]).digest("sha256") == digest