1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00
VILLASnode/villas-digest.py
2023-11-30 10:53:04 +01:00

160 lines
4 KiB
Python
Executable file

#!/usr/bin/env python
import hashlib
import json
import sys
from ctypes import c_double, c_float, sizeof
from dataclasses import dataclass
from enum import Enum, auto
assert sizeof(c_float) == 4
assert sizeof(c_double) == 8
@dataclass
class Timestamp:
sec: int
nsec: int
def as_bytes(self) -> bytes:
sec = self.sec.to_bytes(8, "little")
nsec = self.nsec.to_bytes(8, "little")
return bytes().join([sec, nsec])
@dataclass
class Sequence:
number: int
def as_bytes(self) -> bytes:
return self.number.to_bytes(8, "little")
class SignalType(Enum):
Boolean = auto()
Integer = auto()
Float = auto()
Complex = auto()
Invalid = auto()
SignalData = bool | int | float | complex
@dataclass
class Signal:
data: SignalData
@property
def type(self) -> SignalType:
match self.data:
case bool(_):
return SignalType.Boolean
case int(_):
return SignalType.Integer
case float(_):
return SignalType.Float
case complex(_):
return SignalType.Complex
def as_bytes(self, byteorder: str) -> bytes:
match self.data:
case bool(b):
return b.to_bytes(1, "little")
case int(i):
return i.to_bytes(8, "little")
case float(f):
i = int.from_bytes(bytes(c_double(f)), sys.byteorder)
return i.to_bytes(8, "little")
case complex(z):
i_real = int.from_bytes(bytes(c_float(z.real)), sys.byteorder)
i_imag = int.from_bytes(bytes(c_float(z.imag)), sys.byteorder)
real = i_real.to_bytes(4, "little")
imag = i_imag.to_bytes(4, "little")
return bytes().join([real, imag])
@dataclass
class Sample:
timestamp: Timestamp
sequence: Sequence
signals: list[Signal]
new_frame: bool
def parse(sample: dict) -> "Sample":
ts = sample["ts"]["origin"]
timestamp = Timestamp(sec=ts[0], nsec=ts[1])
sequence = Sequence(sample["sequence"])
signals = [Signal(s) for s in sample["data"]]
new_frame = "flags" in sample and "new_frame" in sample["flags"]
return Sample(timestamp, sequence, signals, new_frame)
def as_bytes(self) -> bytes:
return bytes().join(
[
self.timestamp.as_bytes(),
self.sequence.as_bytes(),
*(signal.as_bytes() for signal in self.signals),
]
)
@dataclass
class Frame:
samples: list[Sample]
def digest(self, algorithm: str) -> bytes:
hash = hashlib.new(algorithm)
for sample in self.samples:
b = sample.as_bytes()
hash.update(b)
return hash.digest()
def group(samples: list[Sample]) -> list["Frame"]:
samples.sort()
if not samples:
return []
frames = []
current_frame = Frame([samples[0]])
for sample in samples[1:]:
if sample.new_frame:
frames.append(current_frame)
current_frame = Frame([sample])
else:
current_frame.samples.append(sample)
frames.append(current_frame)
return frames
def main():
with open("signals.json", "r") as file:
samples = [Sample.parse(json.loads(s)) for s in file.readlines()]
frames = Frame.split(samples)
algorithm = "sha256"
print("BEGIN FRAMES")
for frame in frames:
first = frame.samples[0]
last = frame.samples[-1]
first_ts = first.timestamp
last_ts = last.timestamp
print("\t", end="")
print(
f"{first_ts.sec}.{first_ts.nsec:09}-{first.sequence.number}",
f"{last_ts.sec}.{last_ts.nsec:09}-{first.sequence.number}",
algorithm,
f"{frame.digest(algorithm).hex().upper()}",
)
print("END FRAMES")
if __name__ == "__main__":
main()