diff --git a/include/villas/nodes/uldaq.h b/include/villas/nodes/uldaq.h index 5f317f45a..79d817e2c 100644 --- a/include/villas/nodes/uldaq.h +++ b/include/villas/nodes/uldaq.h @@ -30,6 +30,8 @@ #pragma once +#include + #include #include @@ -48,12 +50,19 @@ struct uldaq { DaqDeviceInterface device_interface_type; struct { - double *buffer; double sample_rate; + double *buffer; + unsigned buffer_len; + unsigned channel_count; ScanOption scan_options; AInScanFlag flags; AiQueueElement *queues; + ScanStatus status; // protected by mutex + TransferStatus transfer_status; // protected by mutex + + pthread_mutex_t mutex; + pthread_cond_t cv; } in; struct { diff --git a/lib/nodes/uldaq.c b/lib/nodes/uldaq.c index 14c7f06eb..35296e215 100644 --- a/lib/nodes/uldaq.c +++ b/lib/nodes/uldaq.c @@ -176,6 +176,9 @@ int uldaq_init(struct node *n) u->in.scan_options = (ScanOption) (SO_DEFAULTIO | SO_CONTINUOUS); u->in.flags = AINSCAN_FF_DEFAULT; + pthread_mutex_init(&u->in.mutex, NULL); + pthread_cond_init(&u->in.cv, NULL); + return 0; } @@ -186,6 +189,9 @@ int uldaq_destroy(struct node *n) if (u->in.queues) free(u->in.queues); + pthread_mutex_destroy(&u->in.mutex); + pthread_cond_destroy(&u->in.cv); + return 0; } @@ -222,7 +228,8 @@ int uldaq_parse(struct node *n, json_t *cfg) u->device_interface_type = iftype; } - u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * list_length(&n->signals)); + u->in.channel_count = list_length(&n->signals); + u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count); json_array_foreach(json_signals, i, json_signal) { const char *range_str = NULL, *input_mode_str = NULL; @@ -305,27 +312,40 @@ int uldaq_check(struct node *n) return 0; } +void uldaq_data_available(DaqDeviceHandle device_handle, DaqEventType event_type, unsigned long long event_data, void *ctx) +{ + UlError err; + struct uldaq *u = (struct uldaq *) ctx; + + pthread_mutex_lock(&u->in.mutex); + + err = ulAInScanStatus(device_handle, &u->in.status, &u->in.transfer_status); + if (err != ERR_NO_ERROR) + warn("Failed to retrieve scan status in event callback"); + + pthread_mutex_unlock(&u->in.mutex); + + /* Signal uldaq_read() about new data */ + pthread_cond_signal(&u->in.cv); +} + int uldaq_start(struct node *n) { struct uldaq *u = (struct uldaq *) n->_vd; - u->in.scan_options = (ScanOption) (SO_DEFAULTIO | SO_CONTINUOUS);//it looks like the init function is not called - unsigned num_devs = ULDAQ_MAX_DEV_COUNT; DaqDeviceDescriptor descriptors[num_devs]; - ScanStatus status; - TransferStatus transfer_status; - UlError err; - // allocate a buffer to receive the data - u->in.buffer = (double *) alloc(list_length(&n->signals) * n->in.vectorize * sizeof(double)); - if (u->in.buffer == 0) { + /* Allocate a buffer to receive the data */ + u->in.buffer_len = u->in.channel_count * n->in.vectorize; + u->in.buffer = (double *) alloc(u->in.buffer_len * sizeof(double)); + if (!u->in.buffer) { warn("Out of memory, unable to create scan buffer"); return -1; } - // Get descriptors for all of the available DAQ devices + /* Get descriptors for all of the available DAQ devices */ err = ulGetDaqDeviceInventory(u->device_interface_type, descriptors, &num_devs); if (err != ERR_NO_ERROR) { warn("Failed to retrieve DAQ device list for node '%s'", node_name(n)); @@ -366,6 +386,9 @@ int uldaq_start(struct node *n) return -1; } + /* Enable the event to be notified every time samples are available */ + err = ulEnableEvent(u->device_handle, DE_ON_DATA_AVAILABLE, n->in.vectorize, uldaq_data_available, u); + /* Start the acquisition */ err = ulAInScan(u->device_handle, 0, 0, 0, 0, n->in.vectorize, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer); if (err != ERR_NO_ERROR) { @@ -374,13 +397,13 @@ int uldaq_start(struct node *n) } /* Get the initial status of the acquisition */ - err = ulAInScanStatus(u->device_handle, &status, &transfer_status); + err = ulAInScanStatus(u->device_handle, &u->in.status, &u->in.transfer_status); if (err != ERR_NO_ERROR) { warn("Failed to retrieve scan status on DAQ device for node '%s'", node_name(n)); return -1; } - if (status != SS_RUNNING) { + if (u->in.status != SS_RUNNING) { warn ("Acquisition did not start on DAQ device for node '%s'", node_name(n)); return -1; } @@ -393,21 +416,23 @@ int uldaq_stop(struct node *n) struct uldaq *u = (struct uldaq *) n->_vd; UlError err; - ScanStatus status; - TransferStatus transferStatus; + + pthread_mutex_lock(&u->in.mutex); /* Get the current status of the acquisition */ - err = ulAInScanStatus(u->device_handle, &status, &transferStatus); + err = ulAInScanStatus(u->device_handle, &u->in.status, &u->in.transfer_status); if (err != ERR_NO_ERROR) return -1; /* Stop the acquisition if it is still running */ - if (status == SS_RUNNING) { + if (u->in.status == SS_RUNNING) { err = ulAInScanStop(u->device_handle); if (err != ERR_NO_ERROR) return -1; } + pthread_mutex_unlock(&u->in.mutex); + err = ulDisconnectDaqDevice(u->device_handle); if (err != ERR_NO_ERROR) return -1; @@ -423,28 +448,34 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re { struct uldaq *u = (struct uldaq *) n->_vd; - UlError err; - ScanStatus status; - TransferStatus transferStatus; + /* Wait for data available condition triggered by event callback */ + pthread_mutex_lock(&u->in.mutex); + pthread_cond_wait(&u->in.cv, &u->in.mutex); - /* Get the current status of the acquisition */ - err = ulAInScanStatus(u->device_handle, &status, &transferStatus); - if (err != ERR_NO_ERROR) + if (u->in.status != SS_RUNNING) return -1; - if (status == SS_RUNNING) { - int index = transferStatus.currentIndex; + if (cnt != n->in.vectorize) + return -1; - struct sample *smp = smps[0]; + long long start_index = u->in.transfer_status.currentTotalCount - n->in.vectorize * u->in.channel_count; - for (int i = 0; i < list_length(&n->signals); i++) { - smp->data[i].f = u->in.buffer[index + i]; + for (int j = 0; j < n->in.vectorize; j++) { + struct sample *smp = smps[j]; + + long long scan_index = start_index + j + u->in.channel_count; + + for (int i = 0; i < u->in.channel_count; i++) { + long long channel_index = (scan_index + i) % u->in.buffer_len; + + smp->data[i].f = u->in.buffer[channel_index]; } } - return 1; -} + pthread_mutex_unlock(&u->in.mutex); + return cnt; +} static struct plugin p = { .name = "uldaq",