1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-16 00:00:02 +01:00
VILLASnode/python/villas/node/sample.py
2021-03-19 06:37:44 -04:00

187 lines
5.1 KiB
Python

import re
import villas.node.sample.villas_pb2
from datetime import datetime
from functools import total_ordering
@total_ordering
class Timestamp:
"""Parsing the VILLASnode human-readable timestamp format"""
def __init__(self, seconds=None, nanoseconds=None,
offset=None, sequence=None):
self.seconds = seconds
self.nanoseconds = nanoseconds
self.offset = offset
self.sequence = sequence
@classmethod
def now(cls, offset=None, sequence=None):
n = datetime.utcnow()
secs = int(n.timestamp())
nsecs = 1000 * n.microsecond
return Timestamp(seconds=secs, nanoseconds=nsecs,
offset=offset, sequence=sequence)
@classmethod
def parse(cls, ts):
m = re.match(r'(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?'
r'(?:e[+-]?\d+)?)?(?:\((\d+)\))?', ts)
seconds = int(m.group(1)) # Mandatory
nanoseconds = int(m.group(2)) if m.group(2) else None
offset = float(m.group(3)) if m.group(3) else None
sequence = int(m.group(4)) if m.group(4) else None
return Timestamp(seconds, nanoseconds, offset, sequence)
def __str__(self):
str = "%u" % (self.seconds)
if self.nanoseconds is not None:
str += ".%09u" % self.nanoseconds
if self.offset is not None:
str += "+%u" % self.offset
if self.sequence is not None:
str += "(%u)" % self.sequence
return str
def __float__(self):
sum = float(self.seconds)
if self.nanoseconds is not None:
sum += self.nanoseconds * 1e-9
if self.offset is not None:
sum += self.offset
return sum
def __eq__(self, other):
return float(self) == float(other)
def __lt__(self, other):
return float(self) < float(other)
@total_ordering
class Sample:
"""Parsing a VILLASnode sample from a file (not a UDP package!!)"""
def __init__(self, ts, values):
self.ts = ts
self.values = values
@classmethod
def parse(cls, line):
return cls.decode(format='villas.human', line)
@classmethod
def decode(cls, format='protobuf', buffer):
if format == 'protobuf':
return cls.decode_protobuf(buffer)
elif format == 'villas.human':
return cls.decode_villas_human(buffer)
else:
raise NotImplementedError()
def encode(self, format='protobuf'):
if format == 'protobuf':
return cls.encode_protobuf()
elif format == 'villas.human':
return cls.encode_villas_human()
else:
raise NotImplementedError()
@classmethod
def decode_villas_human(self, buf):
csv = line.split()
ts = Timestamp.parse(csv[0])
vs = []
for value in csv[1:]:
try:
v = float(value)
except ValueError:
value = value.lower()
try:
v = complex(value)
except Exception:
if value.endswith('i'):
v = complex(value.replace('i', 'j'))
else:
raise ValueError()
vs.append(v)
return Sample(ts, vs)
def encode_villas_human(self):
return bytes(self.__str__())
@classmethod
def decode_protobuf(cls, buffer):
msg = villas_pb2.Message()
msg.ParseFromString(buffer)
samples = []
for smp in msg:
if smp.HasField('timestamp'):
ts = Timestamp(
smp.timestamp.sec,
smp.timestamp.nsec
)
else:
ts = Timestamp.now()
if smp.HasField('sequence'):
ts.sequence = smp.sequence
values = []
for val in smp.values:
if val.HasField('f'):
values.append(val.f)
elif val.HasField('i'):
values.append(val.i)
elif val.HasField('b'):
values.append(val.b)
elif val.HasField('z'):
values.append(val.z)
sample = cls(ts, values)
def encode_protobuf(self):
msg = villas_pb2.Message()
smp = msg.samples.add()
ts = smp.timestamp.add()
ts.sec = self.ts.seconds
ts.nsec = self.ts.nanoseconds
for value in self.values:
val = smp.values.add()
if type(value) is int:
val.i = value
elif type(value) is float:
val.f = value
elif type(value) is bool:
val.b = value
elif type(value) is complex:
val.z.real = value.real
val.z.imag = value.imag
return msg.SerializeToString()
def __str__(self):
return '%s\t%s' % (self.ts, "\t".join(map(str, self.values)))
def __eq__(self, other):
return self.ts == other.ts
def __lt__(self, other):
return self.ts < other.ts