diff --git a/clients/python/client-new.py b/clients/python/client-new.py new file mode 100644 index 000000000..0c08f7025 --- /dev/null +++ b/clients/python/client-new.py @@ -0,0 +1,84 @@ +import math + +from villas.node.node import Node +from villas.node.zeromq import ZeroMQNode + + +def main(): + zmq = ZeroMQInterface() + + zmq_config = zmq.config + zmq_config['in']['signals'] = [ + { + 'name': 'a', + 'type': 'float' + }, + { + 'name': 'b', + 'type': 'float' + } + ] + + hook_signals = [ + { + 'name': 'sum', + 'type': 'float', + 'expression': 'smp.data.a + smp.data.b' + }, + { + 'name': 'hyp', + 'type': 'float', + 'expression': 'math.sqrt(smp.data.a^2 + smp.data.b^2)' + }, + { + 'name': 'max', + 'type': 'float', + 'expression': 'math.max(smp.data.sine, smp.data[3])' + } + ] + + config = { + 'nodes': { + 'zmq': zmq_config, + 'sin_a': { + 'type': 'signal' + 'signal': 'sine', + 'phase': 0 + }, + 'sin_b': { + 'type': 'signal' + 'signal': 'sine', + 'phase': math.pi / 2 + } + }, + 'paths': [ + { + 'in': [ + 'zmq.a', + 'zmq.b', + 'sin_a.sine', + 'sin_b.sine' + ], + 'out': 'zmq', + 'hooks': hook_signals + } + ] + } + + node = Node(config) + node.start() + + while x in range(0, 100): + smp = + + zmq.send(smp) + + smp_recv = zmq.recv() + + # assert smp_recv + + node.stop() + + +if __name__ == '__main__': + main() diff --git a/python/setup.py b/python/setup.py index 0d5fb6a43..6887432e7 100644 --- a/python/setup.py +++ b/python/setup.py @@ -28,7 +28,9 @@ setup( 'Programming Language :: Python :: 3' ], install_requires=[ - 'requests' + 'requests', + 'protobuf', + 'pyzmq' ] + [ 'linuxfd' ] if sys.platform == 'linux' else [], diff --git a/python/villas/node/node.py b/python/villas/node/node.py index bf4c1ce2c..cbde39182 100644 --- a/python/villas/node/node.py +++ b/python/villas/node/node.py @@ -56,7 +56,7 @@ class Node(object): self.log = open(self.log_filename, 'w+') - LOGGER.info("Starting VILLASnode instance with config: %s", + LOGGER.info("Starting VILLASnode instance with config: {}", self.config_file.name) self.child = subprocess.Popen([self.executable, self.config_file.name], diff --git a/python/villas/node/sample.py b/python/villas/node/sample.py index 2f8e5914b..eac314a6f 100644 --- a/python/villas/node/sample.py +++ b/python/villas/node/sample.py @@ -1,8 +1,8 @@ import re +import villas.node.sample.villas_pb2 from datetime import datetime from functools import total_ordering - @total_ordering class Timestamp: """Parsing the VILLASnode human-readable timestamp format""" @@ -75,6 +75,27 @@ class Sample: @classmethod def parse(cls, line): + return cls.decode(format='villas.human', line) + + @classmethod + def decode(cls, format='protobuf', buffer): + if format == 'protobuf': + return cls.decode_protobuf(buffer) + elif format == 'villas.human': + return cls.decode_villas_human(buffer) + else: + raise NotImplementedError() + + def encode(self, format='protobuf'): + if format == 'protobuf': + return cls.encode_protobuf() + elif format == 'villas.human': + return cls.encode_villas_human() + else: + raise NotImplementedError() + + @classmethod + def decode_villas_human(self, buf): csv = line.split() ts = Timestamp.parse(csv[0]) @@ -97,6 +118,65 @@ class Sample: return Sample(ts, vs) + def encode_villas_human(self): + return bytes(self.__str__()) + + @classmethod + def decode_protobuf(cls, buffer): + msg = villas_pb2.Message() + msg.ParseFromString(buffer) + + samples = [] + for smp in msg: + if smp.HasField('timestamp'): + ts = Timestamp( + smp.timestamp.sec, + smp.timestamp.nsec + ) + else: + ts = Timestamp.now() + + if smp.HasField('sequence'): + ts.sequence = smp.sequence + + values = [] + for val in smp.values: + if val.HasField('f'): + values.append(val.f) + elif val.HasField('i'): + values.append(val.i) + elif val.HasField('b'): + values.append(val.b) + elif val.HasField('z'): + values.append(val.z) + + sample = cls(ts, values) + + def encode_protobuf(self): + msg = villas_pb2.Message() + + smp = msg.samples.add() + + ts = smp.timestamp.add() + + ts.sec = self.ts.seconds + ts.nsec = self.ts.nanoseconds + + for value in self.values: + val = smp.values.add() + + if type(value) is int: + val.i = value + elif type(value) is float: + val.f = value + elif type(value) is bool: + val.b = value + elif type(value) is complex: + val.z.real = value.real + val.z.imag = value.imag + + return msg.SerializeToString() + def __str__(self): return '%s\t%s' % (self.ts, "\t".join(map(str, self.values))) diff --git a/python/villas/node/villas_pb2.py b/python/villas/node/villas_pb2.py new file mode 100644 index 000000000..1b90101cd --- /dev/null +++ b/python/villas/node/villas_pb2.py @@ -0,0 +1,329 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: villas.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='villas.proto', + package='villas.node', + syntax='proto2', + serialized_pb=_b('\n\x0cvillas.proto\x12\x0bvillas.node\"/\n\x07Message\x12$\n\x07samples\x18\x01 \x03(\x0b\x32\x13.villas.node.Sample\"\xbe\x01\n\x06Sample\x12,\n\x04type\x18\x01 \x02(\x0e\x32\x18.villas.node.Sample.Type:\x04\x44\x41TA\x12\x10\n\x08sequence\x18\x02 \x01(\x04\x12)\n\ttimestamp\x18\x04 \x01(\x0b\x32\x16.villas.node.Timestamp\x12\"\n\x06values\x18\x05 \x03(\x0b\x32\x12.villas.node.Value\"%\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05START\x10\x02\x12\x08\n\x04STOP\x10\x03\"&\n\tTimestamp\x12\x0b\n\x03sec\x18\x01 \x02(\r\x12\x0c\n\x04nsec\x18\x02 \x02(\r\"Z\n\x05Value\x12\x0b\n\x01\x66\x18\x01 \x01(\x01H\x00\x12\x0b\n\x01i\x18\x02 \x01(\x03H\x00\x12\x0b\n\x01\x62\x18\x03 \x01(\x08H\x00\x12!\n\x01z\x18\x04 \x01(\x0b\x32\x14.villas.node.ComplexH\x00\x42\x07\n\x05value\"%\n\x07\x43omplex\x12\x0c\n\x04real\x18\x01 \x02(\x02\x12\x0c\n\x04imag\x18\x02 \x02(\x02') +) + + + +_SAMPLE_TYPE = _descriptor.EnumDescriptor( + name='Type', + full_name='villas.node.Sample.Type', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='DATA', index=0, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='START', index=1, number=2, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='STOP', index=2, number=3, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=232, + serialized_end=269, +) +_sym_db.RegisterEnumDescriptor(_SAMPLE_TYPE) + + +_MESSAGE = _descriptor.Descriptor( + name='Message', + full_name='villas.node.Message', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='samples', full_name='villas.node.Message.samples', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=29, + serialized_end=76, +) + + +_SAMPLE = _descriptor.Descriptor( + name='Sample', + full_name='villas.node.Sample', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='type', full_name='villas.node.Sample.type', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=True, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='sequence', full_name='villas.node.Sample.sequence', index=1, + number=2, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='timestamp', full_name='villas.node.Sample.timestamp', index=2, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='values', full_name='villas.node.Sample.values', index=3, + number=5, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _SAMPLE_TYPE, + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=79, + serialized_end=269, +) + + +_TIMESTAMP = _descriptor.Descriptor( + name='Timestamp', + full_name='villas.node.Timestamp', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='sec', full_name='villas.node.Timestamp.sec', index=0, + number=1, type=13, cpp_type=3, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='nsec', full_name='villas.node.Timestamp.nsec', index=1, + number=2, type=13, cpp_type=3, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=271, + serialized_end=309, +) + + +_VALUE = _descriptor.Descriptor( + name='Value', + full_name='villas.node.Value', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='f', full_name='villas.node.Value.f', index=0, + number=1, type=1, cpp_type=5, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='i', full_name='villas.node.Value.i', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='b', full_name='villas.node.Value.b', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='z', full_name='villas.node.Value.z', index=3, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + _descriptor.OneofDescriptor( + name='value', full_name='villas.node.Value.value', + index=0, containing_type=None, fields=[]), + ], + serialized_start=311, + serialized_end=401, +) + + +_COMPLEX = _descriptor.Descriptor( + name='Complex', + full_name='villas.node.Complex', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='real', full_name='villas.node.Complex.real', index=0, + number=1, type=2, cpp_type=6, label=2, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='imag', full_name='villas.node.Complex.imag', index=1, + number=2, type=2, cpp_type=6, label=2, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=403, + serialized_end=440, +) + +_MESSAGE.fields_by_name['samples'].message_type = _SAMPLE +_SAMPLE.fields_by_name['type'].enum_type = _SAMPLE_TYPE +_SAMPLE.fields_by_name['timestamp'].message_type = _TIMESTAMP +_SAMPLE.fields_by_name['values'].message_type = _VALUE +_SAMPLE_TYPE.containing_type = _SAMPLE +_VALUE.fields_by_name['z'].message_type = _COMPLEX +_VALUE.oneofs_by_name['value'].fields.append( + _VALUE.fields_by_name['f']) +_VALUE.fields_by_name['f'].containing_oneof = _VALUE.oneofs_by_name['value'] +_VALUE.oneofs_by_name['value'].fields.append( + _VALUE.fields_by_name['i']) +_VALUE.fields_by_name['i'].containing_oneof = _VALUE.oneofs_by_name['value'] +_VALUE.oneofs_by_name['value'].fields.append( + _VALUE.fields_by_name['b']) +_VALUE.fields_by_name['b'].containing_oneof = _VALUE.oneofs_by_name['value'] +_VALUE.oneofs_by_name['value'].fields.append( + _VALUE.fields_by_name['z']) +_VALUE.fields_by_name['z'].containing_oneof = _VALUE.oneofs_by_name['value'] +DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE +DESCRIPTOR.message_types_by_name['Sample'] = _SAMPLE +DESCRIPTOR.message_types_by_name['Timestamp'] = _TIMESTAMP +DESCRIPTOR.message_types_by_name['Value'] = _VALUE +DESCRIPTOR.message_types_by_name['Complex'] = _COMPLEX +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( + DESCRIPTOR = _MESSAGE, + __module__ = 'villas_pb2' + # @@protoc_insertion_point(class_scope:villas.node.Message) + )) +_sym_db.RegisterMessage(Message) + +Sample = _reflection.GeneratedProtocolMessageType('Sample', (_message.Message,), dict( + DESCRIPTOR = _SAMPLE, + __module__ = 'villas_pb2' + # @@protoc_insertion_point(class_scope:villas.node.Sample) + )) +_sym_db.RegisterMessage(Sample) + +Timestamp = _reflection.GeneratedProtocolMessageType('Timestamp', (_message.Message,), dict( + DESCRIPTOR = _TIMESTAMP, + __module__ = 'villas_pb2' + # @@protoc_insertion_point(class_scope:villas.node.Timestamp) + )) +_sym_db.RegisterMessage(Timestamp) + +Value = _reflection.GeneratedProtocolMessageType('Value', (_message.Message,), dict( + DESCRIPTOR = _VALUE, + __module__ = 'villas_pb2' + # @@protoc_insertion_point(class_scope:villas.node.Value) + )) +_sym_db.RegisterMessage(Value) + +Complex = _reflection.GeneratedProtocolMessageType('Complex', (_message.Message,), dict( + DESCRIPTOR = _COMPLEX, + __module__ = 'villas_pb2' + # @@protoc_insertion_point(class_scope:villas.node.Complex) + )) +_sym_db.RegisterMessage(Complex) + + +# @@protoc_insertion_point(module_scope) diff --git a/python/villas/node/zeromq.py b/python/villas/node/zeromq.py new file mode 100644 index 000000000..db7882780 --- /dev/null +++ b/python/villas/node/zeromq.py @@ -0,0 +1,76 @@ +import zmq +import zmq.asyncio + +class Interface: + + def __init__(self, format): + self.format = format + + +class ZeroMQInterface(Interface): + + def __init__(self, format): + super().__init__(format) + + self.context = zmq.Context() + self.socket = context.socket(zmq.REP) + + @property + def config(self): + return { + 'type': 'zeromq', + 'pattern': 'pubsub', + 'ipv6': False, + + 'curve': { # Z85 encoded Curve25519 keys + 'enabled': False, + 'public_key': 'Veg+Q.V-c&1k>yVh663gQ^7fL($y47gybE-nZP1L' + 'secret_key': 'HPY.+mFuB[jGs@(zZr6$IZ1H1dZ7Ji*j>oi@O?Pc' + } + + 'in': { + 'subscribe': '', + 'filter': self.filter + }, + 'out': { + 'publish': [ + '' + ], + 'filter': self.filter + } + } + + def connect(self, addr): + self.socket.connect(addr) + + def bind(self, addr): + self.socket.bind(addr) + + def send(self, sample): + buffer = sample.encode(self.format, sample) + + self.socket.send(buffer) + + def recv(self): + buffer = self.socket.recv() + + return Sample.decode(self.format, buffer) + + +class AsyncZeroMQNode(ZeroMQNode): + + def __init__(self, format): + Interface.__init__(self, format) + + self.context = zmq.asyncio.Context() + self.socket = context.socket(zmq.REP) + + async def send(self, sample): + buffer = sample.encode(self.format, sample) + + await self.socket.send(buffer) + + async def recv(self): + buffer = await self.socket.recv() + + return Sample.decode(self.format, buffer)