diff --git a/python/examples/Shmem_CIGRE_MV.py b/python/examples/Shmem_CIGRE_MV.py index 1d1640729..fc65c551b 100644 --- a/python/examples/Shmem_CIGRE_MV.py +++ b/python/examples/Shmem_CIGRE_MV.py @@ -1,102 +1,85 @@ +""" +Author: Steffen Vogel +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 import time from villas.node.node import Node as VILLASnode -# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University -# SPDX-License-Identifier: Apache-2.0 - # This could be moved to the DPsim Python code later def get_dpsim_shmem_interface_signals(): - """ It would be nice if the DPsim Shmem interface could - build-up a list of actual signal descriptions - (names, units, etc..) which attributes are exported. - This would eliviate the user from manually configuring - signal mappings """ + """It would be nice if the DPsim Shmem interface could + build-up a list of actual signal descriptions + (names, units, etc..) which attributes are exported. + This would eliviate the user from manually configuring + signal mappings""" signals = [] for i in range(0, 30): - signals.append({ - 'name': f'signal_{i}', - 'type': 'float', - 'unit': 'volts' - }) + signals.append( + { + "name": f"signal_{i}", + "type": "float", + "unit": "volts", + } + ) return signals def get_dpsim_shmem_interface_config(): return { - 'type': 'shmem', - 'in': { - 'name': '/dpsim1-villas', - 'hooks': [ - { - 'type': 'stats' - } - ], - 'signals': get_dpsim_shmem_interface_signals() + "type": "shmem", + "in": { + "name": "/dpsim1-villas", + "hooks": [{"type": "stats"}], + "signals": get_dpsim_shmem_interface_signals(), }, - 'out': { - 'name': '/villas-dpsim1' - } + "out": {"name": "/villas-dpsim1"}, } def get_villas_config(): return { - 'nodes': { - 'broker1': { - 'type': 'mqtt', - 'format': 'json', - 'host': '172.17.0.1', - 'in': { - 'subscribe': '/powerflow-dpsim'}, - 'out': { - 'publish': '/dpsim-powerflow' - } + "nodes": { + "broker1": { + "type": "mqtt", + "format": "json", + "host": "172.17.0.1", + "in": {"subscribe": "/powerflow-dpsim"}, + "out": {"publish": "/dpsim-powerflow"}, }, - 'dpsim1': get_dpsim_shmem_interface_config(), + "dpsim1": get_dpsim_shmem_interface_config(), }, - 'paths': [ + "paths": [ { - 'in': 'dpsim1', - 'out': 'broker1', - - 'hooks': [ - { - 'type': 'limit_rate', - 'rate': 50 - } - ] + "in": "dpsim1", + "out": "broker1", + "hooks": [{"type": "limit_rate", "rate": 50}], } - ] + ], } def main(): - - node = VILLASnode( - config=get_villas_config() - ) + node = VILLASnode(config=get_villas_config()) node.start() # VILLASnode starts running in the background from here.. # Some infos from the running VILLASnode instance queried via its REST API - print('VILLASnode running?: ', node.is_running()) - print('VILLASnode status: ', node.status) - print('VILLASnode nodes: ', node.nodes) - print('VILLASnode paths: ', node.paths) - print('VILLASnode config: ', node.active_config) - print('VILLASnode version: ', node.get_version()) + print("VILLASnode running?: ", node.is_running()) + print("VILLASnode status: ", node.status) + print("VILLASnode nodes: ", node.nodes) + print("VILLASnode paths: ", node.paths) + print("VILLASnode config: ", node.active_config) + print("VILLASnode version: ", node.get_version()) # Load a new config into the running # VILLASnode instance (old config will be replaced) new_config = node.active_config - new_config['paths'].append({ - 'out': 'dpsim1', - 'in': 'broker1' - }) + new_config["paths"].append({"out": "dpsim1", "in": "broker1"}) node.load_config(new_config) @@ -105,5 +88,5 @@ def main(): node.stop() -if __name__ == 'main': +if __name__ == "main": main() diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 000000000..65f4edc34 --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +# SPDX-License-Identifier: Apache-2.0 + +[build-system] +requires = ['setuptools>=61.0'] +build-backend = 'setuptools.build_meta' + +[project] +name = 'villas-python' +version = '0.10.3' +description = 'Python support for the VILLASnode simulation-data gateway' +readme = 'README.md' +requires-python = '>=3.10' +keywords = ['simulation', 'power', 'system', 'real-time', 'villas'] +license.text = 'Apache-2.0' +classifiers = [ + 'Development Status :: 4 - Beta', + 'Topic :: Scientific/Engineering', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 3', +] +dependencies = ['linuxfd', 'requests'] +optional-dependencies.dev = [ + 'black', + 'flake8', + 'mypy', + 'pytest', + 'types-requests', +] + +[project.urls] +GitHub = 'https://github.com/VILLASframework/node' +Project = 'https://www.fein-aachen.org/en/projects/villas-node' +Homepage = 'https://villas.fein-aachen.org' + +[[project.authors]] +name = 'Steffen Vogel' +email = 'post@steffenvogel.de' + +[[project.authors]] +name = 'Philipp Jungkamp' +email = 'Philipp.Jungkamp@opal-rt.com' diff --git a/python/setup.py b/python/setup.py index d8092dbe6..2d19d0d65 100644 --- a/python/setup.py +++ b/python/setup.py @@ -1,42 +1,8 @@ -# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University -# SPDX-License-Identifier: Apache-2.0 +""" +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 -from setuptools import setup, find_namespace_packages -from glob import glob -import sys +from setuptools import setup -with open('README.md') as f: - long_description = f.read() - -setup( - name='villas-node', - version='0.10.3', - author='Steffen Vogel', - author_email='acs-software@eonerc.rwth-aachen.de', - description='Python-support for VILLASnode simulation-data gateway', - license='Apache-2.0', - keywords='simulation power system real-time villas', - url='https://git.rwth-aachen.de/acs/public/villas/VILLASnode', - packages=find_namespace_packages(include=['villas.*']), - long_description=long_description, - long_description_content_type='text/markdown', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Topic :: Scientific/Engineering', - 'License :: OSI Approved :: ' - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: MacOS :: MacOS X', - 'Operating System :: Microsoft :: Windows', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 3' - ], - install_requires=[ - 'requests' - ] + [ - 'linuxfd' - ] if sys.platform == 'linux' else [], - setup_requires=[ - 'm2r' - ], - scripts=glob('bin/*') -) +setup() diff --git a/python/villas/__init__.py b/python/villas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/villas/node/__init__.py b/python/villas/node/__init__.py index e69de29bb..6b25e1978 100644 --- a/python/villas/node/__init__.py +++ b/python/villas/node/__init__.py @@ -0,0 +1,9 @@ +""" +Author: Steffen Vogel +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 + +from .node import Node + +__all__ = ["Node"] diff --git a/python/villas/node/communicate.py b/python/villas/node/communicate.py index d78ed5741..f01a05146 100644 --- a/python/villas/node/communicate.py +++ b/python/villas/node/communicate.py @@ -1,86 +1,83 @@ -# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University -# SPDX-License-Identifier: Apache-2.0 +""" +Author: Steffen Vogel +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 -import time import logging import sys -import linuxfd -from villas.node.sample import Sample, Timestamp from threading import Thread +from typing import Callable + +import linuxfd # type: ignore[import] +from villas.node.formats import VillasHuman +from villas.node.sample import Sample logger = logging.getLogger(__name__) +RecvCallback = Callable[[Sample], None] +SendCallback = Callable[[int], Sample] + + class RecvThread(Thread): - - def __init__(self, cb): + def __init__(self, cb: RecvCallback): super().__init__() - self.cb = cb self.daemon = True + self.format = VillasHuman() def run(self): for line in sys.stdin: - if line.startswith('#'): - continue + logger.debug(f"RecvThread: {line}") - logger.debug("RecvThread: {}".format(line)) - - sample = Sample.parse(line) - - self.cb(sample.values) + if (sample := self.format.load_sample(line)) is not None: + self.cb(sample) class SendThread(Thread): - - def __init__(self, cb, rate=None): + def __init__(self, cb: SendCallback, rate: float): super().__init__() - self.cb = cb - self.rate = rate self.daemon = True - + self.format = VillasHuman() + self.rate = rate self.sequence = 0 def run(self): - - if self.rate: - tfd = linuxfd.timerfd() - tfd.settime(1.0, 1.0 / self.rate) - else: - tfd = None + tfd = linuxfd.timerfd() + tfd.settime(1.0, 1.0 / self.rate) while True: - if tfd: - tfd.read() + tfd.read() - values = self.cb() - ts = Timestamp.now(None, self.sequence) + sample = self.cb(self.sequence) + if sample is None: + continue - sample = Sample(ts, values) - - sys.stdout.write(str(sample) + '\n') + sample = self.format.dump_sample(sample) + sys.stdout.write(sample) sys.stdout.flush() - self.sequence += 1 -def communicate(rate, recv_cb=None, send_cb=None, wait=True): - - if recv_cb: +def communicate( + rate: float, + recv_cb: RecvCallback | None = None, + send_cb: SendCallback | None = None, + wait: bool = True, +): + if recv_cb is not None: rt = RecvThread(recv_cb) rt.start() - if send_cb: + if send_cb is not None: st = SendThread(send_cb, rate) st.start() if wait: try: - while True: - time.sleep(1) + rt.join() + st.join() except KeyboardInterrupt: - logger.info('Received Ctrl+C. Stopping send/recv threads') - - # Threads are daemon threads - # and therefore killed with program termination + logger.info("Received Ctrl+C. Stopping send/recv threads") diff --git a/python/villas/node/formats.py b/python/villas/node/formats.py new file mode 100644 index 000000000..eefb3d4ce --- /dev/null +++ b/python/villas/node/formats.py @@ -0,0 +1,241 @@ +""" +Author: Philipp Jungkamp +SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 + +import re +from dataclasses import dataclass, field +from itertools import groupby +from typing import Iterable + +from villas.node.sample import Sample, Timestamp, Signal + + +class SignalList(list[type]): + types = { + "b": bool, + "i": int, + "f": float, + "c": complex, + } + + _type_to_char = {t: c for c, t in types.items()} + + def __init__(self, fmt: str | Sample | Iterable[type] = "64f"): + if isinstance(fmt, Sample): + super().__init__(map(type, fmt.data) if fmt.data else []) + return + elif not isinstance(fmt, str): + super().__init__(fmt) + return + + super().__init__() + regex = f"([{''.join(self.__class__.types.keys())}])" + fields = iter(re.split(regex, fmt)) + while (count_str := next(fields, None)) is not None: + if count_str: + count = int(count_str) + else: + count = 1 + + try: + ty_str = next(fields) + except StopIteration: + if count_str: + raise ValueError("Expected type specifier.") + else: + break + + try: + ty = self.__class__.types[ty_str] + except KeyError: + raise ValueError(f"Unknown type {ty_str}") + + self.extend([ty] * count) + + def __str__(self): + fmt = "" + + for ty, run in groupby(self): + run_length = sum(1 for _ in run) + c = self.__class__._type_to_char[ty] + if run_length > 1: + fmt += f"{run_length}" + fmt += f"{c}" + + return fmt + + def __repr__(self): + return f"{self.__class__.__name__}('{self.__str__()}')" + + +@dataclass(kw_only=True) +class Format: + """ + The base for VILLASnode formats in Python. + """ + + ts_origin: bool = True + ts_received: bool = True + sequence: bool = True + data: bool = True + + def _strip_sample(self, sample: Sample) -> Sample: + if not self.ts_origin: + sample.ts_origin = None + + if not self.ts_received: + sample.ts_received = None + + if not self.sequence: + sample.sequence = None + + if not self.data: + sample.data = [] + + return sample + + +@dataclass +class VillasHuman(Format): + """ + The villas.human format in Python. + """ + + signal_list: SignalList = field(default_factory=SignalList) + separator: str = "\t" + delimiter: str = "\n" + + def load(self, file) -> list[Sample]: + """ + Load samples from a text mode file object. + """ + + return self.loads(file.read()) + + def loads(self, s: str) -> list[Sample]: + """ + Load samples from a string. + """ + + s.strip(self.separator + self.delimiter) + sample_strs = s.split(sep=self.delimiter) + samples = (self.load_sample(sample) for sample in sample_strs) + return [s for s in samples if s is not None] + + def dump(self, samples: Iterable[Sample], file): + """ + Dump samples to a text mode file object. + """ + + return file.write(self.dumps(samples)) + + def dumps(self, samples: Iterable[Sample]) -> str: + """ + Dump samples to a string. + """ + + sample_strs = (self.dump_sample(sample) for sample in iter(samples)) + return "".join(sample_strs) + + def load_sample(self, sample: str) -> Sample | None: + """ + Load a single sample from a string. + """ + + sample = sample.strip(self.delimiter) + + if sample.startswith("#"): + return None + + fields = sample.split(sep=self.separator) + + if not fields[0]: + return None + + m = re.match( + r"(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?" + r"(?:e[+-]?\d+)?)?(?:\((\d+)\))?(F)?", + fields[0], + ) + + if m is None: + raise ValueError(f"Invalid header: {fields[0]}") + + ts_seconds = int(m.group(1)) + ts_nanoseconds = int(m.group(2)) if m.group(2) else 0 + ts_offset = float(m.group(3)) if m.group(3) else None + sequence = int(m.group(4)) if m.group(4) else None + new_frame = bool(m.group(5)) + + ts_origin = Timestamp(ts_seconds, ts_nanoseconds) + if ts_offset is not None: + ts_received_raw = ts_origin.timestamp() + ts_offset + ts_received = Timestamp.fromtimestamp(ts_received_raw) + else: + ts_received = None + + data: list[Signal] = [] + for ty, value in zip(self.signal_list, fields[1:]): + if ty is bool: + data.append(bool(int(value))) + elif ty is int: + data.append(int(value)) + elif ty is float: + data.append(float(value)) + elif ty is complex: + data.append(self._unpack_complex(value)) + + return self._strip_sample( + Sample( + ts_origin=ts_origin, + ts_received=ts_received, + sequence=sequence, + new_frame=new_frame, + data=data, + ) + ) + + def dump_sample(self, smp: Sample) -> str: + """ + Dump a single sample to a string. + """ + + smp = self._strip_sample(smp) + + s = "" + if smp.ts_origin is not None: + s += f"{smp.ts_origin.seconds}" + if smp.ts_origin.nanoseconds != 0: + s += f".{smp.ts_origin.nanoseconds:09}" + if smp.ts_received is not None: + off = smp.ts_received.timestamp() - smp.ts_origin.timestamp() + s += f"+{off}" + if smp.sequence is not None: + s += f"({smp.sequence})" + if smp.new_frame: + s += "F" + + for ty, value in zip(self.signal_list, smp.data): + s += self.separator + assert ty == type(value) + match value: + case bool(): + s += str(int(value)) + case int(): + s += str(value) + case float(): + s += str(value) + case complex(): + s += self._pack_complex(value) + + s += self.delimiter + + return s + + def _unpack_complex(self, s: str) -> complex: + return complex(s.lower().replace("i", "j")) + + def _pack_complex(self, z: complex) -> str: + return f"{z.real}+{z.imag}i" diff --git a/python/villas/node/node.py b/python/villas/node/node.py index f88fef939..2ecce0ad3 100644 --- a/python/villas/node/node.py +++ b/python/villas/node/node.py @@ -1,34 +1,42 @@ -# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University -# SPDX-License-Identifier: Apache-2.0 +""" +Author: Steffen Vogel +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 -import json -import os -import tempfile -import subprocess -import logging -import signal -import requests import datetime +import json +import logging +import os +import signal +import subprocess +from tempfile import NamedTemporaryFile -LOGGER = logging.getLogger('villas.node') +import requests + +LOGGER = logging.getLogger("villas.node") class Node(object): + api_version = "v2" - api_version = 'v2' - - def __init__(self, api_url=None, - log_filename=None, - config_filename=None, config={}, - executable='villas-node', **kwargs): - + def __init__( + self, + api_url=None, + log_filename=None, + config_filename=None, + config={}, + executable="villas-node", + **kwargs, + ): self.api_url = api_url self.log_filename = log_filename self.executable = executable if config_filename and config: - raise RuntimeError('Can\'t provide config_filename and ' - 'config at the same time!') + raise RuntimeError( + "Can't provide config_filename and " "config at the same time!" + ) if config_filename: with open(config_filename) as f: @@ -38,15 +46,14 @@ class Node(object): # Try to deduct api_url from config if self.api_url is None: - port = config.get('http', {}).get('port') + port = config.get("http", {}).get("port") if port is None: port = 80 if os.getuid() == 0 else 8080 - self.api_url = f'http://localhost:{port}' + self.api_url = f"http://localhost:{port}" def start(self): - self.config_file = tempfile.NamedTemporaryFile(mode='w+', - suffix='.json') + self.config_file = NamedTemporaryFile(mode="w+", suffix=".json") json.dump(self.config, self.config_file) @@ -54,16 +61,20 @@ class Node(object): if self.log_filename is None: now = datetime.datetime.now() - self.log_filename = now.strftime( - 'villas-node_%Y-%m-%d_%H-%M-%S.log') + fmt = "villas-node_%Y-%m-%d_%H-%M-%S.log" + self.log_filename = now.strftime(fmt) - self.log = open(self.log_filename, 'w+') + self.log = open(self.log_filename, "w+") - LOGGER.info("Starting VILLASnode instance with config: %s", - self.config_file.name) + LOGGER.info( + f"Starting VILLASnode instance with config: {self.config_file.name}" # noqa: E501 + ) - self.child = subprocess.Popen([self.executable, self.config_file.name], - stdout=self.log, stderr=self.log) + self.child = subprocess.Popen( + [self.executable, self.config_file.name], + stdout=self.log, + stderr=self.log, + ) def pause(self): LOGGER.info("Pausing VILLASnode instance") @@ -82,61 +93,58 @@ class Node(object): def restart(self): LOGGER.info("Restarting VILLASnode instance") - self.request('restart') + self.request("restart") @property def active_config(self): - return self.request('config') + return self.request("config") @property def nodes(self): - return self.request('nodes') + return self.request("nodes") @property def paths(self): - return self.request('paths') + return self.request("paths") @property def status(self): - return self.request('status') + return self.request("status") def load_config(self, i): if type(i) is dict: cfg = i elif type(i) is str: cfg = json.loads(i) - elif hasattr(i, 'read'): # file-like? + elif hasattr(i, "read"): # file-like? cfg = json.load(i) else: raise TypeError() - req = { - 'config': cfg - } + req = {"config": cfg} - self.request('restart', method='POST', json=req) + self.request("restart", method="POST", json=req) - def request(self, action, method='GET', **args): + def request(self, action, method="GET", **args): + if "timeout" not in args: + args["timeout"] = 1 - if 'timeout' not in args: - args['timeout'] = 1 - - r = requests.request(method, - f'{self.api_url}/api/{self.api_version}/{action}', - **args) + r = requests.request( + method, f"{self.api_url}/api/{self.api_version}/{action}", **args + ) r.raise_for_status() return r.json() def get_local_version(self): - ver = subprocess.check_output([self.executable, '-V']) + ver = subprocess.check_output([self.executable, "-V"]) - return ver.decode('ascii').rstrip() + return ver.decode("ascii").rstrip() def get_version(self): - resp = self.request('status') + resp = self.request("status") - return resp['version'] + return resp["version"] def is_running(self): if self.child is None: diff --git a/python/villas/node/sample.py b/python/villas/node/sample.py index 2bbb2f910..1c90b625f 100644 --- a/python/villas/node/sample.py +++ b/python/villas/node/sample.py @@ -1,110 +1,181 @@ -# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University -# SPDX-License-Identifier: Apache-2.0 +""" +Author: Steffen Vogel +Author: Philipp Jungkamp +SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 -import re +import hashlib +from ctypes import c_double, c_float, sizeof +from dataclasses import dataclass, field from datetime import datetime from functools import total_ordering +from sys import byteorder as native +from typing import Iterable + +assert sizeof(c_float) == 4 +assert sizeof(c_double) == 8 + + +Signal = bool | int | float | complex @total_ordering +@dataclass class Timestamp: - """Parsing the VILLASnode human-readable timestamp format""" + """ + A VILLASnode timestamp. Based on the C struct timespec. + """ - def __init__(self, seconds=None, nanoseconds=None, - offset=None, sequence=None): - self.seconds = seconds - self.nanoseconds = nanoseconds - self.offset = offset - self.sequence = sequence + seconds: int + nanoseconds: int = 0 + + def _as_digest_bytes(self): + sec = self.seconds.to_bytes(8, "little") + nsec = self.nanoseconds.to_bytes(8, "little") + return bytes().join([sec, nsec]) @classmethod - def now(cls, offset=None, sequence=None): - n = datetime.utcnow() - - secs = int(n.timestamp()) - nsecs = 1000 * n.microsecond - - return Timestamp(seconds=secs, nanoseconds=nsecs, - offset=offset, sequence=sequence) + def fromdatetime(cls, ts: datetime) -> "Timestamp": + secs = int(ts.timestamp()) + nsecs = int(1000 * ts.microsecond) + return cls(seconds=secs, nanoseconds=nsecs) @classmethod - def parse(cls, ts): - m = re.match(r'(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?' - r'(?:e[+-]?\d+)?)?(?:\((\d+)\))?', ts) + def fromtimestamp(cls, ts: float) -> "Timestamp": + secs = int(ts) + nsecs = int(1e9 * (ts - float(secs))) + return cls(seconds=secs, nanoseconds=nsecs) - seconds = int(m.group(1)) # Mandatory - nanoseconds = int(m.group(2)) if m.group(2) else None - offset = float(m.group(3)) if m.group(3) else None - sequence = int(m.group(4)) if m.group(4) else None + def timestamp(self) -> float: + return float(self) - return Timestamp(seconds, nanoseconds, offset, sequence) - - def __str__(self): - str = "%u" % (self.seconds) - - if self.nanoseconds is not None: - str += ".%09u" % self.nanoseconds - if self.offset is not None: - str += "+%u" % self.offset - if self.sequence is not None: - str += "(%u)" % self.sequence - - return str + def datetime(self) -> datetime: + return datetime.fromtimestamp(self.timestamp()) def __float__(self): - sum = float(self.seconds) + return float(self.seconds) + float(self.nanoseconds) * 1e-9 - if self.nanoseconds is not None: - sum += self.nanoseconds * 1e-9 - if self.offset is not None: - sum += self.offset + def _as_ordered_tuple(self): + return ( + self.seconds, + self.nanoseconds, + ) - return sum + def __eq__(self, other: object): + if not isinstance(other, Timestamp): + return False - def __eq__(self, other): - return float(self) == float(other) + return self._as_ordered_tuple() == other._as_ordered_tuple() - def __lt__(self, other): - return float(self) < float(other) + def __lt__(self, other: "Timestamp"): + return self._as_ordered_tuple() < other._as_ordered_tuple() @total_ordering +@dataclass(kw_only=True) class Sample: - """Parsing a VILLASnode sample from a file (not a UDP package!!)""" + """ + A VILLASnode sample. + """ - def __init__(self, ts, values): - self.ts = ts - self.values = values + ts_origin: Timestamp | None = None + ts_received: Timestamp | None = None + sequence: int | None = None + new_frame: bool = False + data: list[Signal] = field(default_factory=list) - @classmethod - def parse(cls, line): - csv = line.split() + def _as_ordered_tuple(self): + return ( + self.ts_origin is not None, + self.ts_origin if self.ts_origin is not None else Timestamp(0), + self.ts_received is not None, + self.ts_received if self.ts_received is not None else Timestamp(0), + self.sequence is not None, + self.sequence if self.sequence is not None else 0, + not self.new_frame, + self.data, + ) - ts = Timestamp.parse(csv[0]) - vs = [] + def __eq__(self, other: object): + if not isinstance(other, Sample): + return False - for value in csv[1:]: - try: - v = float(value) - except ValueError: - value = value.lower() - try: - v = complex(value) - except Exception: - if value.endswith('i'): - v = complex(value.replace('i', 'j')) - else: - raise ValueError() + return self._as_ordered_tuple() == other._as_ordered_tuple() - vs.append(v) + def __lt__(self, other: "Timestamp"): + return self._as_ordered_tuple() < other._as_ordered_tuple() - return Sample(ts, vs) + def _as_digest_bytes(self): + def signal_to_bytes(signal): + match signal: + case bool(): + return signal.to_bytes(1, "little") + case int(): + return signal.to_bytes(8, "little") + case float(): + i = int.from_bytes(bytes(c_double(signal)), native) + return i.to_bytes(8, "little") + case complex(): + f_real = signal.real + f_imag = signal.imag + i_real = int.from_bytes(bytes(c_float(f_real)), native) + i_imag = int.from_bytes(bytes(c_float(f_imag)), native) + real = i_real.to_bytes(4, "little") + imag = i_imag.to_bytes(4, "little") + return bytes().join([real, imag]) - def __str__(self): - return '%s\t%s' % (self.ts, "\t".join(map(str, self.values))) + return bytes().join( + [ + self.ts_origin._as_digest_bytes(), + self.sequence.to_bytes(8, "little"), + ] + + list(map(signal_to_bytes, self.data)) + ) - def __eq__(self, other): - return self.ts == other.ts - def __lt__(self, other): - return self.ts < other.ts +@dataclass +class Frame(list[Sample]): + """ + A frame VILLASnode of sample indicated by the new_frame flag. + """ + + def __init__(self, it: Iterable[Sample]): + super().__init__(it) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def digest(self, algorithm: str) -> bytes: + """ + A digest for a frame of samples that is comparable to the digest hook. + """ + + hash = hashlib.new(algorithm) + + for sample in self: + hash.update(sample._as_digest_bytes()) + + return hash.digest() + + def group(samples: list[Sample]) -> list["Frame"]: + """ + Group samples into Frames according to their new_frame flag. + """ + + samples.sort() + + if not samples: + return [] + + frames = [] + current_frame = Frame([samples[0]]) + for sample in samples[1:]: + if sample.new_frame: + frames.append(current_frame) + current_frame = Frame([sample]) + else: + current_frame.append(sample) + frames.append(current_frame) + + return frames diff --git a/python/villas/node/test_formats.py b/python/villas/node/test_formats.py new file mode 100644 index 000000000..2b2e9fb5c --- /dev/null +++ b/python/villas/node/test_formats.py @@ -0,0 +1,51 @@ +""" +Author: Philipp Jungkamp +SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 +from villas.node.sample import Sample, Timestamp +from villas.node.formats import SignalList, VillasHuman +from cmath import sqrt + + +def test_signal_list_repr(): + signal_list = SignalList("21fb2ic") + assert signal_list == eval(repr(signal_list)) + + +def test_signal_list(): + signal_list = SignalList("1fb2ic") + assert signal_list == SignalList([float, bool, int, int, complex]) + + +def test_villas_human_repr(): + villas_human = VillasHuman(ts_received=False) + assert villas_human == eval(repr(villas_human)) + + +def test_villas_human(): + smp1 = Sample( + ts_origin=Timestamp(123456780), + ts_received=Timestamp(123456781), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp2 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=5, + new_frame=False, + data=[1.0, 2.0, 3.0, False, 42, sqrt(complex(-1))], + ) + + villas_human = VillasHuman(signal_list=SignalList(smp1)) + smp1_str = "123456780+1.0(4)F\t1.0\t2.0\t3.0\t1\t42\t0.0+1.0i\n" + smp2_str = "123456789+1.0(5)\t1.0\t2.0\t3.0\t0\t42\t0.0+1.0i\n" + assert villas_human.dump_sample(smp1) == smp1_str + assert villas_human.dump_sample(smp2) == smp2_str + assert villas_human.dumps([smp1, smp2]) == smp1_str + smp2_str + assert villas_human.load_sample(smp1_str) == smp1 + assert villas_human.load_sample(smp2_str) == smp2 + assert villas_human.loads(smp1_str + smp2_str) == [smp1, smp2] diff --git a/python/villas/node/test_sample.py b/python/villas/node/test_sample.py new file mode 100644 index 000000000..4a723eb44 --- /dev/null +++ b/python/villas/node/test_sample.py @@ -0,0 +1,170 @@ +""" +Author: Philipp Jungkamp +SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +SPDX-License-Identifier: Apache-2.0 +""" # noqa: E501 +from villas.node.sample import Sample, Timestamp, Frame +from datetime import datetime +from cmath import sqrt + + +def test_timestamp_repr(): + ts = Timestamp(123456789, 123456789) + assert ts == eval(repr(ts)) + + +def test_timestamp_conversion(): + ts = Timestamp(123456789, 123456789) + + fl = 123456789.123456789 + fl_ts = Timestamp(123456789, 123456791) + assert ts.timestamp() == fl + assert fl_ts == Timestamp.fromtimestamp(fl) + + dt = datetime(1973, 11, 29, 22, 33, 9, 123457) + dt_ts = Timestamp(123456789, 123457000) + assert ts.datetime() == dt + assert dt_ts == Timestamp.fromdatetime(dt) + + +def test_timestamp_ordering(): + ts1 = Timestamp(123456789) + ts2 = Timestamp(123456789, 0) + ts3 = Timestamp(123456789, 123456789) + assert ts1 == ts2 + assert ts2 < ts3 + + +def test_timestamp_as_digest_bytes(): + ts = Timestamp(123456789, 123456789) + digest_bytes = bytes.fromhex("15cd5b070000000015cd5b0700000000") + assert ts._as_digest_bytes() == digest_bytes + + +def test_sample_repr(): + smp = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + assert smp == eval(repr(smp)) + + +def test_sample_ordering(): + smp1 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp2 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp3 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456791), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + assert smp1 == smp2 + assert smp2 < smp3 + + +def test_sample_as_digest_bytes(): + smp = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + digest_bytes_hex = "15cd5b070000000000000000000000000400000000000000000000000000f03f00000000000000400000000000000840012a00000000000000000000000000803f" # noqa: E501 + digest_bytes = bytes.fromhex(digest_bytes_hex) + assert smp._as_digest_bytes() == digest_bytes, smp._as_digest_bytes().hex() + + +def test_frame_repr(): + smp1 = Sample( + ts_origin=Timestamp(123456780), + ts_received=Timestamp(123456781), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp2 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=5, + new_frame=False, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + frame = Frame([smp1, smp2]) + assert frame == eval(repr(frame)) + + +def test_frame_group(): + smp1 = Sample( + ts_origin=Timestamp(123456780), + ts_received=Timestamp(123456781), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp2 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=5, + new_frame=False, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp3 = Sample( + ts_origin=Timestamp(123456791), + ts_received=Timestamp(123456793), + sequence=6, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + frames = list(Frame.group([smp1, smp2, smp3])) + assert len(frames) == 2 + assert list(map(len, frames)) == [2, 1] + assert frames == [[smp1, smp2], [smp3]] + + +def test_frame_digest(): + smp1 = Sample( + ts_origin=Timestamp(123456780), + ts_received=Timestamp(123456781), + sequence=4, + new_frame=True, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + smp2 = Sample( + ts_origin=Timestamp(123456789), + ts_received=Timestamp(123456790), + sequence=5, + new_frame=False, + data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))], + ) + + digest = bytes.fromhex( + "a573e3b0953a1e4f69addf631d6229bb714d263b4f362f0847e96c3838c83217" + ) # noqa: E501 + assert Frame([smp1, smp2]).digest("sha256") == digest