1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

uldaq: make use of events to signal availability of new data

This commit is contained in:
Steffen Vogel 2018-09-25 20:27:01 +02:00
parent 350112db1c
commit 0ba65c3ae1
2 changed files with 70 additions and 30 deletions

View file

@ -30,6 +30,8 @@
#pragma once
#include <pthread.h>
#include <villas/queue_signalled.h>
#include <villas/pool.h>
@ -48,12 +50,19 @@ struct uldaq {
DaqDeviceInterface device_interface_type;
struct {
double *buffer;
double sample_rate;
double *buffer;
unsigned buffer_len;
unsigned channel_count;
ScanOption scan_options;
AInScanFlag flags;
AiQueueElement *queues;
ScanStatus status; // protected by mutex
TransferStatus transfer_status; // protected by mutex
pthread_mutex_t mutex;
pthread_cond_t cv;
} in;
struct {

View file

@ -176,6 +176,9 @@ int uldaq_init(struct node *n)
u->in.scan_options = (ScanOption) (SO_DEFAULTIO | SO_CONTINUOUS);
u->in.flags = AINSCAN_FF_DEFAULT;
pthread_mutex_init(&u->in.mutex, NULL);
pthread_cond_init(&u->in.cv, NULL);
return 0;
}
@ -186,6 +189,9 @@ int uldaq_destroy(struct node *n)
if (u->in.queues)
free(u->in.queues);
pthread_mutex_destroy(&u->in.mutex);
pthread_cond_destroy(&u->in.cv);
return 0;
}
@ -222,7 +228,8 @@ int uldaq_parse(struct node *n, json_t *cfg)
u->device_interface_type = iftype;
}
u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * list_length(&n->signals));
u->in.channel_count = list_length(&n->signals);
u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count);
json_array_foreach(json_signals, i, json_signal) {
const char *range_str = NULL, *input_mode_str = NULL;
@ -305,27 +312,40 @@ int uldaq_check(struct node *n)
return 0;
}
void uldaq_data_available(DaqDeviceHandle device_handle, DaqEventType event_type, unsigned long long event_data, void *ctx)
{
UlError err;
struct uldaq *u = (struct uldaq *) ctx;
pthread_mutex_lock(&u->in.mutex);
err = ulAInScanStatus(device_handle, &u->in.status, &u->in.transfer_status);
if (err != ERR_NO_ERROR)
warn("Failed to retrieve scan status in event callback");
pthread_mutex_unlock(&u->in.mutex);
/* Signal uldaq_read() about new data */
pthread_cond_signal(&u->in.cv);
}
int uldaq_start(struct node *n)
{
struct uldaq *u = (struct uldaq *) n->_vd;
u->in.scan_options = (ScanOption) (SO_DEFAULTIO | SO_CONTINUOUS);//it looks like the init function is not called
unsigned num_devs = ULDAQ_MAX_DEV_COUNT;
DaqDeviceDescriptor descriptors[num_devs];
ScanStatus status;
TransferStatus transfer_status;
UlError err;
// allocate a buffer to receive the data
u->in.buffer = (double *) alloc(list_length(&n->signals) * n->in.vectorize * sizeof(double));
if (u->in.buffer == 0) {
/* Allocate a buffer to receive the data */
u->in.buffer_len = u->in.channel_count * n->in.vectorize;
u->in.buffer = (double *) alloc(u->in.buffer_len * sizeof(double));
if (!u->in.buffer) {
warn("Out of memory, unable to create scan buffer");
return -1;
}
// Get descriptors for all of the available DAQ devices
/* Get descriptors for all of the available DAQ devices */
err = ulGetDaqDeviceInventory(u->device_interface_type, descriptors, &num_devs);
if (err != ERR_NO_ERROR) {
warn("Failed to retrieve DAQ device list for node '%s'", node_name(n));
@ -366,6 +386,9 @@ int uldaq_start(struct node *n)
return -1;
}
/* Enable the event to be notified every time samples are available */
err = ulEnableEvent(u->device_handle, DE_ON_DATA_AVAILABLE, n->in.vectorize, uldaq_data_available, u);
/* Start the acquisition */
err = ulAInScan(u->device_handle, 0, 0, 0, 0, n->in.vectorize, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer);
if (err != ERR_NO_ERROR) {
@ -374,13 +397,13 @@ int uldaq_start(struct node *n)
}
/* Get the initial status of the acquisition */
err = ulAInScanStatus(u->device_handle, &status, &transfer_status);
err = ulAInScanStatus(u->device_handle, &u->in.status, &u->in.transfer_status);
if (err != ERR_NO_ERROR) {
warn("Failed to retrieve scan status on DAQ device for node '%s'", node_name(n));
return -1;
}
if (status != SS_RUNNING) {
if (u->in.status != SS_RUNNING) {
warn ("Acquisition did not start on DAQ device for node '%s'", node_name(n));
return -1;
}
@ -393,21 +416,23 @@ int uldaq_stop(struct node *n)
struct uldaq *u = (struct uldaq *) n->_vd;
UlError err;
ScanStatus status;
TransferStatus transferStatus;
pthread_mutex_lock(&u->in.mutex);
/* Get the current status of the acquisition */
err = ulAInScanStatus(u->device_handle, &status, &transferStatus);
err = ulAInScanStatus(u->device_handle, &u->in.status, &u->in.transfer_status);
if (err != ERR_NO_ERROR)
return -1;
/* Stop the acquisition if it is still running */
if (status == SS_RUNNING) {
if (u->in.status == SS_RUNNING) {
err = ulAInScanStop(u->device_handle);
if (err != ERR_NO_ERROR)
return -1;
}
pthread_mutex_unlock(&u->in.mutex);
err = ulDisconnectDaqDevice(u->device_handle);
if (err != ERR_NO_ERROR)
return -1;
@ -423,28 +448,34 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
{
struct uldaq *u = (struct uldaq *) n->_vd;
UlError err;
ScanStatus status;
TransferStatus transferStatus;
/* Wait for data available condition triggered by event callback */
pthread_mutex_lock(&u->in.mutex);
pthread_cond_wait(&u->in.cv, &u->in.mutex);
/* Get the current status of the acquisition */
err = ulAInScanStatus(u->device_handle, &status, &transferStatus);
if (err != ERR_NO_ERROR)
if (u->in.status != SS_RUNNING)
return -1;
if (status == SS_RUNNING) {
int index = transferStatus.currentIndex;
if (cnt != n->in.vectorize)
return -1;
struct sample *smp = smps[0];
long long start_index = u->in.transfer_status.currentTotalCount - n->in.vectorize * u->in.channel_count;
for (int i = 0; i < list_length(&n->signals); i++) {
smp->data[i].f = u->in.buffer[index + i];
for (int j = 0; j < n->in.vectorize; j++) {
struct sample *smp = smps[j];
long long scan_index = start_index + j + u->in.channel_count;
for (int i = 0; i < u->in.channel_count; i++) {
long long channel_index = (scan_index + i) % u->in.buffer_len;
smp->data[i].f = u->in.buffer[channel_index];
}
}
return 1;
}
pthread_mutex_unlock(&u->in.mutex);
return cnt;
}
static struct plugin p = {
.name = "uldaq",