mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-16 00:00:02 +01:00
83 lines
1.7 KiB
Python
83 lines
1.7 KiB
Python
import time
|
|
import logging
|
|
import sys
|
|
import linuxfd
|
|
from villas.node.sample import Sample, Timestamp
|
|
from threading import Thread
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecvThread(Thread):
|
|
|
|
def __init__(self, cb):
|
|
super().__init__()
|
|
|
|
self.cb = cb
|
|
self.daemon = True
|
|
|
|
def run(self):
|
|
for line in sys.stdin:
|
|
if line.startswith('#'):
|
|
continue
|
|
|
|
logger.debug("RecvThread: {}".format(line))
|
|
|
|
sample = Sample.parse(line)
|
|
|
|
self.cb(sample.values)
|
|
|
|
|
|
class SendThread(Thread):
|
|
|
|
def __init__(self, cb, rate=None):
|
|
super().__init__()
|
|
|
|
self.cb = cb
|
|
self.rate = rate
|
|
self.daemon = True
|
|
|
|
self.sequence = 0
|
|
|
|
def run(self):
|
|
|
|
if self.rate:
|
|
tfd = linuxfd.timerfd()
|
|
tfd.settime(1.0, 1.0 / self.rate)
|
|
else:
|
|
tfd = None
|
|
|
|
while True:
|
|
if tfd:
|
|
tfd.read()
|
|
|
|
values = self.cb()
|
|
ts = Timestamp.now(None, self.sequence)
|
|
|
|
sample = Sample(ts, values)
|
|
|
|
sys.stdout.write(str(sample) + '\n')
|
|
sys.stdout.flush()
|
|
|
|
self.sequence += 1
|
|
|
|
|
|
def communicate(rate, recv_cb=None, send_cb=None, wait=True):
|
|
|
|
if recv_cb:
|
|
rt = RecvThread(recv_cb)
|
|
rt.start()
|
|
|
|
if send_cb:
|
|
st = SendThread(send_cb, rate)
|
|
st.start()
|
|
|
|
if wait:
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
logger.info('Received Ctrl+C. Stopping send/recv threads')
|
|
|
|
# Threads are daemon threads
|
|
# and therefore killed with program termination
|