1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

Reimplemented read thread

This commit is contained in:
Vincent Bareiß 2021-09-09 12:50:44 +00:00
parent 57055daab0
commit 288c49707a
2 changed files with 231 additions and 155 deletions

View file

@ -9,29 +9,42 @@
*
*/
#pragma once
#include <pthread.h> //Multithreading
#include <semaphore.h>
#include <villas/dumper.hpp>
#include <libft4222.h> //FT4222h vendor API
#include <ftd2xx.h> //D2XX Driver
#include <ftd2xx.h> //D2XX Driver
static FT_DEVICE_LIST_INFO_NODE ft4222_devices[4]; //There can be at most 4 FT devices at a time
#define FT4222_D2XX_BUFFER_SIZE 65535
#define FT4222_BUFFER_SIZE 40000 //5k sample packages
struct ft4222
{
/* Device */
FT_HANDLE dev_handle;
villas::node::Dumper *raw_dumper;
bool use_dumper;
pthread_t *read_thread;
struct
{
double sample_rate;
size_t channel_count;
long long unsigned int sequece;
};
} chan_config;
struct
{
uint16_t *buffer_parsed; //buffer queue for final values.
uint8 *working_buffer; //working buffer to parse samples in
sem_t *sem_parsed; //number of clean values in buffer_parsed
sem_t *sem_protect; //number of clean values in buffer_parsed
bool is_running;
size_t write_head,read_head;
} thread_args;
};

View file

@ -15,6 +15,7 @@
/* Forward declartions */
static struct vnode_type p;
void *ft4222_thread_func(void *);
using namespace villas;
using namespace villas::node;
@ -204,6 +205,58 @@ int ft4222_gather_devices()
return 0;
}
/**
* @brief This function parses the .conf configuration file.
*/
int ft4222_parse(struct vnode *n, json_t *json)
{
//0. Setup values we are going to need
int ret;
struct ft4222 *s = (struct ft4222 *)n->_vd;
json_error_t err;
int sys_clock_int;
int vec_int;
const char *test;
const char *sig_json_str = nullptr;
bool use_dumper;
//1. Node json
const char *a = json_dumps(json, 0);
n->logger->debug(a);
ret = json_unpack_ex(json, &err, 0, "{s?: s,s?: i, s: b, s:{s?: i,s?: i,s: o}}",
"type", &test,
"system_clock", &sys_clock_int,
"dumper", &use_dumper,
"in",
"sample_rate", &(s->chan_config.sample_rate),
"vectorize", &vec_int,
"signals", &sig_json_str);
if (ret < 0)
{
n->logger->error("Fehler: {} \t\t At: line:{}\tcol:{}\tsrc:{}\tpos:{}", err.text, err.line, err.column, err.source, err.position);
throw new RuntimeError("Pasing failed");
}
s->chan_config.channel_count = vlist_length(&n->in.signals);
s->use_dumper = use_dumper;
return 0;
}
/**
* @brief TODO
*
* @param n
* @return char*
*/
char *ft_print(struct vnode *n)
{
//struct ft4222 *s = (struct ft4222 *)n->_vd;
return strf("Version with dumper");
}
/**
* @brief This function initializes an instance of a FT4222h node. The device handle is created and the device is opened.
*/
@ -238,7 +291,7 @@ int ft4222_init(struct vnode *n)
}
/**
* @brief This function starts an instance of aFT4222h node. The chip is configured as SPI slave. Clock and drive strength are set.
* @brief This function starts an instance of a FT4222h node. The chip is configured as SPI slave. Clock and drive strength are set and the thread is started.
*/
int ft4222_start(struct vnode *n)
{
@ -272,13 +325,28 @@ int ft4222_start(struct vnode *n)
return -1;
}
//3. Create socket to dump values to for debugging reasons, if set by the
if(s->use_dumper){
//3. Create socket to dump values to for debugging reasons, if set by the
if (s->use_dumper)
{
n->logger->debug("Created dumper at /tmp/ft4222_dump");
s->raw_dumper = new villas::node::Dumper("/tmp/ft4222_dump");
n->logger->debug("Created dumper at /tmp/ft4222_dump");
}
s->sequece = 0;
//4. Start background read thread
s->thread_args.buffer_parsed = new uint16_t[FT4222_BUFFER_SIZE];
s->thread_args.working_buffer = new uint8[FT4222_D2XX_BUFFER_SIZE];
s->thread_args.write_head = 0;
s->thread_args.read_head = 0;
s->thread_args.sem_parsed = (sem_t *)malloc(sizeof(sem_t));
s->thread_args.sem_protect = (sem_t *)malloc(sizeof(sem_t));
s->read_thread = (pthread_t *)malloc(sizeof(pthread_t));
sem_init(s->thread_args.sem_parsed, false, 0);
sem_init(s->thread_args.sem_protect, false, FT4222_BUFFER_SIZE);
s->chan_config.sequece = 0;
s->thread_args.is_running = true;
pthread_create(s->read_thread, NULL, ft4222_thread_func, s);
return 0;
}
@ -287,67 +355,114 @@ int ft4222_start(struct vnode *n)
*/
int ft4222_destroy(struct vnode *n)
{
//Todo: Kill thread and stuff
struct ft4222 *s = (struct ft4222 *)n->_vd;
n->logger->debug("Deleting FT4222\n");
//Stop thread
s->thread_args.is_running = false;
pthread_join(*(s->read_thread), NULL);
//Free memory
delete[] s->thread_args.buffer_parsed;
delete[] s->thread_args.working_buffer;
free(s->read_thread);
free(s->thread_args.sem_parsed);
//De-Init FT4222
FT4222_UnInitialize(&s->dev_handle);
FT_Close(&s->dev_handle);
if(&s->use_dumper){
if (&s->use_dumper)
{
delete &s->raw_dumper;
}
return 0;
}
/**
* @brief This function parses the .conf configuration file.
*/
int ft4222_parse(struct vnode *n, json_t *json)
{
//0. Setup values we are going to need
int ret;
struct ft4222 *s = (struct ft4222 *)n->_vd;
json_error_t err;
int sys_clock_int;
int vec_int;
const char *test;
const char *sig_json_str = nullptr;
bool use_dumper;
//1. Node json
const char *a = json_dumps(json, 0);
n->logger->debug(a);
ret = json_unpack_ex(json, &err, 0, "{s?: s,s?: i, s: b, s:{s?: i,s?: i,s: o}}",
"type", &test,
"system_clock", &sys_clock_int,
"dumper", &use_dumper,
"in",
"sample_rate", &(s->sample_rate),
"vectorize", &vec_int,
"signals", &sig_json_str);
if (ret < 0)
{
n->logger->error("Fehler: {} \t\t At: line:{}\tcol:{}\tsrc:{}\tpos:{}", err.text, err.line, err.column, err.source, err.position);
throw new RuntimeError("Pasing failed");
}
s->channel_count = vlist_length(&n->in.signals);
s->use_dumper = use_dumper;
return 0;
}
/**
* @brief TODO
* @brief This function runs as a paralell thread in the background and is the main interaction point with the FT4222h
* It fills the XXX buffer with parsed 12 bit samples and monitores for overflows
*
* @param n
* @return char*
*/
char *ft_print(struct vnode *n)
void *ft4222_thread_func(void *thread_args)
{
//struct ft4222 *s = (struct ft4222 *)n->_vd;
return strf("Version with dumper");
ft4222 *s = (ft4222 *)thread_args;
FT4222_STATUS status = FT4222_STATUS::FT4222_OK;
Logger log = logging.get("node:ft4222:read_thread");
log->debug("Enter FT4222 thread_func");
uint16 rxSize = 0;
uint16 read_data = 0;
uint8 chan_pos = 0;
while (s->thread_args.is_running && FT4222_SPISlave_GetRxStatus(s->dev_handle, &rxSize) == FT4222_OK)
{
if (rxSize == FT4222_D2XX_BUFFER_SIZE)
{ //FT4222 buffer overflow
log->error("d2xx buffer overflow. Disregarding {} values", rxSize / 1.5);
status = FT4222_SPI_ResetTransaction(s->dev_handle, 0x00);
//throw new RuntimeError("Resetted spi\n");
}
if (rxSize > 0)
{
log->debug("rxSize={}\n", rxSize); //Data in FT4222 buffer
uint16 to_read = rxSize - (rxSize % 3); //Only read from FT4222 Buffer alligned to 3 bytes to prevent a sample from getting chopped up
if(to_read == 0)
continue;
status = FT4222_SPISlave_Read(s->dev_handle, s->thread_args.working_buffer, to_read, &read_data);
assert(read_data == to_read);
if (status != FT4222_OK)
{
log->error("Read failed");
return nullptr;
}
//Parse samples into buffer
for (int i = 0; i < to_read / 3; i++)
{ //Read out 3 bytes and create two samples
short a = s->thread_args.working_buffer[3 * i];
short b = s->thread_args.working_buffer[3 * i + 1];
short c = s->thread_args.working_buffer[3 * i + 2];
uint16 smp1 = (a << 4) | ((b & 0xF0) >> 4);
uint16 smp2 = ((b & 0x0F) << 8) | (c);
if (chan_pos == 2 && smp1 < 1000)
{
log->error("SPI allignment error. WH={}, i={}", s->thread_args.write_head, i);
//throw new RuntimeError("asd");
}
s->thread_args.buffer_parsed[s->thread_args.write_head] = smp1;
s->thread_args.write_head++; //Buffer size is even, so we dont need to check for an overflow here
chan_pos++;
s->thread_args.buffer_parsed[s->thread_args.write_head] = smp2;
s->thread_args.write_head++;
chan_pos++;
if (s->thread_args.write_head >= FT4222_BUFFER_SIZE)
{
s->thread_args.write_head = 0;
}
if (chan_pos >= s->chan_config.channel_count)
{
chan_pos = 0;
sem_post(s->thread_args.sem_parsed); //Signal the arrival of a new entire sample
//Check for overflow
int val = 0;
sem_getvalue(s->thread_args.sem_parsed, &val);
if (val > FT4222_BUFFER_SIZE / 8)
{
log->error("Internal buffer overflow!\n");
throw new RuntimeError("Internal buffer overflow.\n");
s->thread_args.is_running = false;
}
}
}
}
}
log->debug("Leaving thread_func");
return nullptr;
}
/**
@ -358,116 +473,64 @@ char *ft_print(struct vnode *n)
* @param cnt
* @return int
*/
bool hmmErr = false;
int hack, hack1 = 0;
int hack = 0;
int hack2 = 20;
int seq = 0;
int ft4222_read(struct vnode *n, struct sample *const smps[], unsigned cnt)
{
struct ft4222 *u = (struct ft4222 *)n->_vd;
Logger log = logging.get("node:ft4222");
struct ft4222 *s = (struct ft4222 *)n->_vd;
Logger log = logging.get("node:ft4222");
//0. Check if we have enough room for cnt requested samples
assert(smps[0]->capacity >= s->chan_config.channel_count);
//The libFT4222 internaly maintains its own buffer on the host computer that is supplied with the data from the FT4222h
//This function only needs to read out this buffer.
//0. Confirm that enough space is ready for the data.
//const int needed_space = n->in.vectorize * u->channel_count * (3.0 / 2.0);
//assert(cnt == n->in.vectorize); //Enough samples
//assert(smps[0]->capacity >= u->channel_count); //Large enough samples
//assert((float)needed_space == (n->in.vectorize * u->channel_count * (3.0 / 2.0))); //This confirms that there will be no sample that is split between this read and the next one
//1. Confirm that there is enough data in the libft4222 buffer to get 100 smp packages
uint16_t data_in_buffer;
/*do
//Read samples
int smp_available = 0;
int to_read = 0;
sem_getvalue(s->thread_args.sem_parsed, &smp_available);
if (smp_available > (int)cnt)
{
FT4222_SPISlave_GetRxStatus(u->dev_handle, &available_space);
//n->logger->debug("Data in buffer: {}",available_space);
} while (available_space <= needed_space);
*/
FT4222_SPISlave_GetRxStatus(u->dev_handle, &data_in_buffer);
if(++hack%1 == 0){
log->debug("start Data in buffer: {}\n", data_in_buffer);
log->warn("More samples available in buffer {} then requested to read {}\n", smp_available, cnt);
to_read = cnt;
}
//if(data_in_buffer <= needed_space){
// return 0; //Trywait
//}
//2. Read out data and sort into sample packages
uint16 read_data;
uint8 buffer[data_in_buffer];
if(data_in_buffer < u->channel_count * (3.0 / 2.0))
return 0;
/*if(data_in_buffer == 65535){
throw RuntimeError("whatever1");
}*/
uint16 toRead = (data_in_buffer>=1200)?1200:data_in_buffer;
FT4222_SPISlave_Read(u->dev_handle, buffer, toRead, &read_data);
cnt = (int)(toRead/u->channel_count/1.5);
//assert(read_data == toRead);
FT_STATUS tmpStat = FT4222_SPISlave_GetRxStatus(u->dev_handle, &data_in_buffer);
if (tmpStat != FT_OK ){
log->debug("FTERRRR\n\n");
throw RuntimeError("whatever");
}
if(++hack%1 == 0){
log->debug("end Data in buffer: {}\n", data_in_buffer);
}
for (size_t i = 0; i < cnt; i++) //Loop over all samples we want
else
{
struct sample *smp = smps[i];
to_read = smp_available;
}
int row_start = i * u->channel_count * (3.0 / 2.0);
//log->debug("Reading {} samples\n",to_read);
for (size_t chan_index = 0; chan_index < u->channel_count; chan_index++)
for (int i = 0; i < to_read; i++)
{
sem_trywait(s->thread_args.sem_parsed);
for (size_t j = 0; j < s->chan_config.channel_count; j++)
{
//Channel index is multiplied by 3.0/2.0 = 1.5 to move 12 bit forward with every channel
//Channel index is then cast to an int to allign back with 8 bit array.
int chan_start = ((int)(chan_index * 1.5)) + row_start;
int chan_allign = chan_start % 3;
smps[i]->data[j].f = s->thread_args.buffer_parsed[s->thread_args.read_head];
if (chan_allign == 0) //Bit 11 alligns with start of buffer value
//sem_post(s->thread_args.sem_protect); //Signal space for new buffer
//Wrap around
if (++(s->thread_args.read_head) >= FT4222_BUFFER_SIZE)
{
short a = (buffer[chan_start] << 4);
short b = (((buffer[chan_start + 1]) & 0xF0) >> 4);
smp->data[chan_index].f = (float) (a | b);
s->thread_args.read_head = 0;
}
else if (chan_allign == 1) //Bit 11 is in the middle of a buffer value
{
smp->data[chan_index].f = (((buffer[chan_start]) & 0x0F) << 8) |
(buffer[chan_start + 1]);
}
else //Allignment error
{
throw new RuntimeError("Allignment faliure");
}
u->raw_dumper->writeDataBinary(1, &(smp->data[chan_index].f));
}
if (hack > 100 && smp->data[2].f < 1 && !hmmErr) {
hmmErr = true;
log->debug("Something just went wrong!!\n\n\n");
}else if(hmmErr && hack1 < 100)
hack1++;
else if(hmmErr){
log->debug("Booom!!\n\n\n");
throw new RuntimeError("Big error");
if (hack % 50 == 0 && smps[i]->data[2].f < 2)
{
log->error("This should not happen");
log->error("WH={}\nRH={}\n", s->thread_args.write_head, s->thread_args.read_head);
hack2--;
}
smp->length = u->channel_count;
smp->signals = &n->in.signals;
smp->sequence = u->sequece++;
smp->flags = (int)SampleFlags::HAS_SEQUENCE | (int)SampleFlags::HAS_DATA;
}
return cnt;
if (hack2 == 0)
{
throw new RuntimeError("Errorororo");
}
smps[i]->sequence = s->chan_config.sequece++;
smps[i]->length = s->chan_config.channel_count;
smps[i]->signals = &n->in.signals;
smps[i]->flags = (int)SampleFlags::HAS_SEQUENCE | (int)SampleFlags::HAS_DATA;
};
return to_read;
}
__attribute__((constructor(110))) static void register_plugin()