1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

python: Add digest-hook fifo format parsing

Signed-off-by: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
This commit is contained in:
Philipp Jungkamp 2023-09-26 17:28:02 +02:00 committed by Steffen Vogel
parent b90c5d9c77
commit c3d991b23e
6 changed files with 237 additions and 129 deletions

View file

@ -0,0 +1,117 @@
"""
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import hashlib
from dataclasses import dataclass
from typing import Iterable
from villas.node.sample import Sample, Timestamp
@dataclass
class Frame(list[Sample]):
"""
A frame VILLASnode of sample indicated by the new_frame flag.
"""
def __init__(self, it: Iterable[Sample]):
super().__init__(it)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
def _raw_digest(self, algorithm) -> bytes:
hash = hashlib.new(algorithm)
for sample in self:
hash.update(sample._as_digest_bytes())
return hash.digest()
def digest(self, algorithm: str) -> "Digest":
"""
A digest for a frame of samples that is comparable to the digest hook.
"""
return Digest.fromframe(self, algorithm)
def group(samples: list[Sample]) -> list["Frame"]:
"""
Group samples into Frames according to their new_frame flag.
"""
samples.sort()
if not samples:
return []
frames = []
current_frame = Frame([samples[0]])
for sample in samples[1:]:
if sample.new_frame:
frames.append(current_frame)
current_frame = Frame([sample])
else:
current_frame.append(sample)
frames.append(current_frame)
return frames
@dataclass
class Digest:
first: tuple[Timestamp, int]
last: tuple[Timestamp, int]
algorithm: str
bytes: bytes
@staticmethod
def fromframe(frame: Frame, algorithm: str) -> "Digest":
first = frame[0]
last = frame[-1]
def isnone(x):
return x is None
if any(map(isnone, [first.ts_origin, last.ts_origin])):
raise ValueError("Missing origin timestamp for digest.")
if any(map(isnone, [first.sequence, last.sequence])):
raise ValueError("Missing sequence number for digest.")
return Digest(
first=(first.ts_origin, first.sequence), # type: ignore[arg-type]
last=(last.ts_origin, last.sequence), # type: ignore[arg-type]
algorithm=algorithm,
bytes=frame._raw_digest(algorithm),
)
@staticmethod
def _parse_timestamp(s: str) -> tuple[Timestamp, int]:
ts, seq = s.split("-")
sec, nsec = ts.split(".")
return Timestamp(seconds=int(sec), nanoseconds=int(nsec)), int(seq)
@staticmethod
def parse(s: str) -> "Digest":
first, last, algorithm, digest = s.split(" ")
return Digest(
first=Digest._parse_timestamp(first),
last=Digest._parse_timestamp(last),
algorithm=algorithm,
bytes=bytes.fromhex(digest),
)
@staticmethod
def _dump_timestamp(timestamp: Timestamp, sequence: int) -> str:
return f"{timestamp.seconds}.{timestamp.nanoseconds}-{sequence}"
def dump(self) -> str:
first = Digest._dump_timestamp(*self.first)
last = Digest._dump_timestamp(*self.last)
algorithm = self.algorithm
digest = self.bytes.hex().upper()
return f"{first} {last} {algorithm} {digest}"

View file

@ -9,7 +9,7 @@ from dataclasses import dataclass, field
from itertools import groupby from itertools import groupby
from typing import Iterable from typing import Iterable
from villas.node.sample import Sample, Timestamp, Signal from villas.node.sample import Sample, Signal, Timestamp
class SignalList(list[type]): class SignalList(list[type]):

View file

@ -5,13 +5,11 @@ SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Syst
SPDX-License-Identifier: Apache-2.0 SPDX-License-Identifier: Apache-2.0
""" # noqa: E501 """ # noqa: E501
import hashlib
from ctypes import c_double, c_float, sizeof from ctypes import c_double, c_float, sizeof
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from functools import total_ordering from functools import total_ordering
from sys import byteorder as native from sys import byteorder as native
from typing import Iterable
assert sizeof(c_float) == 4 assert sizeof(c_float) == 4
assert sizeof(c_double) == 8 assert sizeof(c_double) == 8
@ -133,50 +131,3 @@ class Sample:
] ]
+ list(map(signal_to_bytes, self.data)) + list(map(signal_to_bytes, self.data))
) )
@dataclass
class Frame(list[Sample]):
"""
A frame VILLASnode of sample indicated by the new_frame flag.
"""
def __init__(self, it: Iterable[Sample]):
super().__init__(it)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
def digest(self, algorithm: str) -> bytes:
"""
A digest for a frame of samples that is comparable to the digest hook.
"""
hash = hashlib.new(algorithm)
for sample in self:
hash.update(sample._as_digest_bytes())
return hash.digest()
def group(samples: list[Sample]) -> list["Frame"]:
"""
Group samples into Frames according to their new_frame flag.
"""
samples.sort()
if not samples:
return []
frames = []
current_frame = Frame([samples[0]])
for sample in samples[1:]:
if sample.new_frame:
frames.append(current_frame)
current_frame = Frame([sample])
else:
current_frame.append(sample)
frames.append(current_frame)
return frames

View file

@ -0,0 +1,111 @@
"""
Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
from cmath import sqrt
from villas.node.digest import Digest, Frame
from villas.node.sample import Sample, Timestamp
def test_frame_repr():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frame = Frame([smp1, smp2])
assert frame == eval(repr(frame))
def test_frame_group():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp3 = Sample(
ts_origin=Timestamp(123456791),
ts_received=Timestamp(123456793),
sequence=6,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frames = list(Frame.group([smp1, smp2, smp3]))
assert len(frames) == 2
assert list(map(len, frames)) == [2, 1]
assert frames == [[smp1, smp2], [smp3]]
def test_digest_fromframe():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
algorithm = "sha256"
digest_hex = (
"a573e3b0953a1e4f69addf631d6229bb714d263b4f362f0847e96c3838c83217" # noqa: E501
)
digest = Digest(
first=(Timestamp(123456780), 4),
last=(Timestamp(123456789), 5),
algorithm=algorithm,
bytes=bytes.fromhex(digest_hex),
)
assert Frame([smp1, smp2]).digest(algorithm) == digest
def test_digest():
digest_str = "1695904705.856457323-0 1695904709.956060462-41 sha256 8E49482DDDAFF6E3B7411D7D20CA338002126FE109EC1BA5932C02FC5E7EFD23" # noqa: E501
digest = Digest(
first=(Timestamp(1695904705, 856457323), 0),
last=(Timestamp(1695904709, 956060462), 41),
algorithm="sha256",
bytes=bytes.fromhex(
"8E49482DDDAFF6E3B7411D7D20CA338002126FE109EC1BA5932C02FC5E7EFD23"
), # noqa: E501
)
assert Digest.parse(digest_str) == digest
assert digest_str == digest.dump()

View file

@ -3,10 +3,12 @@ Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0 SPDX-License-Identifier: Apache-2.0
""" # noqa: E501 """ # noqa: E501
from villas.node.sample import Sample, Timestamp
from villas.node.formats import SignalList, VillasHuman
from cmath import sqrt from cmath import sqrt
from villas.node.formats import SignalList, VillasHuman
from villas.node.sample import Sample, Timestamp
def test_signal_list_repr(): def test_signal_list_repr():
signal_list = SignalList("21fb2ic") signal_list = SignalList("21fb2ic")

View file

@ -3,9 +3,11 @@ Author: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
SPDX-License-Identifier: Apache-2.0 SPDX-License-Identifier: Apache-2.0
""" # noqa: E501 """ # noqa: E501
from villas.node.sample import Sample, Timestamp, Frame
from datetime import datetime, timezone
from cmath import sqrt from cmath import sqrt
from datetime import datetime, timezone
from villas.node.sample import Sample, Timestamp
def test_timestamp_repr(): def test_timestamp_repr():
@ -93,78 +95,3 @@ def test_sample_as_digest_bytes():
digest_bytes_hex = "15cd5b070000000000000000000000000400000000000000000000000000f03f00000000000000400000000000000840012a00000000000000000000000000803f" # noqa: E501 digest_bytes_hex = "15cd5b070000000000000000000000000400000000000000000000000000f03f00000000000000400000000000000840012a00000000000000000000000000803f" # noqa: E501
digest_bytes = bytes.fromhex(digest_bytes_hex) digest_bytes = bytes.fromhex(digest_bytes_hex)
assert smp._as_digest_bytes() == digest_bytes, smp._as_digest_bytes().hex() assert smp._as_digest_bytes() == digest_bytes, smp._as_digest_bytes().hex()
def test_frame_repr():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frame = Frame([smp1, smp2])
assert frame == eval(repr(frame))
def test_frame_group():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp3 = Sample(
ts_origin=Timestamp(123456791),
ts_received=Timestamp(123456793),
sequence=6,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
frames = list(Frame.group([smp1, smp2, smp3]))
assert len(frames) == 2
assert list(map(len, frames)) == [2, 1]
assert frames == [[smp1, smp2], [smp3]]
def test_frame_digest():
smp1 = Sample(
ts_origin=Timestamp(123456780),
ts_received=Timestamp(123456781),
sequence=4,
new_frame=True,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
smp2 = Sample(
ts_origin=Timestamp(123456789),
ts_received=Timestamp(123456790),
sequence=5,
new_frame=False,
data=[1.0, 2.0, 3.0, True, 42, sqrt(complex(-1))],
)
digest = bytes.fromhex(
"a573e3b0953a1e4f69addf631d6229bb714d263b4f362f0847e96c3838c83217"
) # noqa: E501
assert Frame([smp1, smp2]).digest("sha256") == digest