From 45d5f9b2622669a6f4ac80ad2dce74e55b64f43b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 7 Jun 2020 18:12:58 +0100 Subject: [PATCH] python: add new communicate function --- python/setup.py | 3 +- python/villas/node/communicate.py | 80 +++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 python/villas/node/communicate.py diff --git a/python/setup.py b/python/setup.py index 30bb9f711..72131d1fc 100644 --- a/python/setup.py +++ b/python/setup.py @@ -27,7 +27,8 @@ setup( 'Programming Language :: Python :: 3' ], install_requires = [ - 'requests' + 'requests', + 'linuxfd' ], setup_requires = [ 'm2r' diff --git a/python/villas/node/communicate.py b/python/villas/node/communicate.py new file mode 100644 index 000000000..206e3e623 --- /dev/null +++ b/python/villas/node/communicate.py @@ -0,0 +1,80 @@ +import time +import logging +import sys +import linuxfd +from villas.node.sample import Sample, Timestamp +from threading import Thread + +logger = logging.getLogger(__name__) + +class RecvThread(Thread): + + def __init__(self, cb): + super().__init__() + + self.cb = cb + self.daemon = True + + def run(self): + for line in sys.stdin: + if line.startswith('#'): + continue + + logger.debug("RecvThread: {}".format(line)) + + sample = Sample.parse(line) + + self.cb(sample.values) + + +class SendThread(Thread): + + def __init__(self, cb, rate=None): + super().__init__() + + self.cb = cb + self.rate = rate + self.daemon = True + + self.sequence = 0 + + def run(self): + + if self.rate: + tfd = linuxfd.timerfd() + tfd.settime(1.0, 1.0 / self.rate) + else: + tfd = None + + while True: + if tfd: + tfd.read() + + values = self.cb() + ts = Timestamp.now(None, self.sequence) + + sample = Sample(ts, values) + + sys.stdout.write(str(sample) + '\n') + + self.sequence += 1 + + +def communicate(rate, recv_cb=None, send_cb=None): + + if recv_cb: + rt = RecvThread(recv_cb) + rt.start() + + if send_cb: + st = SendThread(send_cb, rate) + st.start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info('Received Ctrl+C. Stopping send/recv threads') + + # Threads are daemon threads + # and therefore killed with program termination \ No newline at end of file