1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-16 00:00:02 +01:00
VILLASnode/python/villas/node/communicate.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

84 lines
2 KiB
Python
Raw Permalink Normal View History

"""
Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
2020-06-07 18:12:58 +01:00
import logging
import sys
from threading import Thread
from typing import Callable
import linuxfd # type: ignore[import]
from villas.node.formats import VillasHuman
from villas.node.sample import Sample
2020-06-07 18:12:58 +01:00
logger = logging.getLogger(__name__)
2020-06-07 23:24:21 +02:00
RecvCallback = Callable[[Sample], None]
SendCallback = Callable[[int], Sample]
2020-06-07 18:12:58 +01:00
class RecvThread(Thread):
def __init__(self, cb: RecvCallback):
super().__init__()
2020-06-07 18:12:58 +01:00
self.cb = cb
self.daemon = True
self.format = VillasHuman()
2020-06-07 18:12:58 +01:00
def run(self):
for line in sys.stdin:
logger.debug(f"RecvThread: {line}")
2020-06-07 18:12:58 +01:00
if (sample := self.format.load_sample(line)) is not None:
self.cb(sample)
2020-06-07 18:12:58 +01:00
class SendThread(Thread):
def __init__(self, cb: SendCallback, rate: float):
2020-06-07 18:12:58 +01:00
super().__init__()
self.cb = cb
self.daemon = True
self.format = VillasHuman()
self.rate = rate
2020-06-07 18:12:58 +01:00
self.sequence = 0
def run(self):
tfd = linuxfd.timerfd()
tfd.settime(1.0, 1.0 / self.rate)
2020-06-07 18:12:58 +01:00
while True:
tfd.read()
2020-06-07 18:12:58 +01:00
sample = self.cb(self.sequence)
if sample is None:
continue
2020-06-07 18:12:58 +01:00
sample = self.format.dump_sample(sample)
sys.stdout.write(sample)
sys.stdout.flush()
2020-06-07 18:12:58 +01:00
self.sequence += 1
def communicate(
rate: float,
recv_cb: RecvCallback | None = None,
send_cb: SendCallback | None = None,
wait: bool = True,
):
if recv_cb is not None:
2020-06-07 18:12:58 +01:00
rt = RecvThread(recv_cb)
rt.start()
if send_cb is not None:
2020-06-07 18:12:58 +01:00
st = SendThread(send_cb, rate)
st.start()
if wait:
try:
rt.join()
st.join()
except KeyboardInterrupt:
logger.info("Received Ctrl+C. Stopping send/recv threads")