added locking to the mm_rx_buffer_t

This commit is contained in:
Carl-Benedikt Krger 2011-06-09 12:17:48 +02:00
parent 3b39bd8e95
commit 8c6e679ba0

View file

@ -34,6 +34,9 @@
#include <windows.h>
#include <system/threads/bthread.h>
typedef bthread_sem_t sem_t;
typedef bthread_t tid_t;
/* "interrupt" of the other virutal network card*/
extern HANDLE remote_process_event;
/* HANDLE to the other Process (for WPM and RPM)*/
@ -44,20 +47,21 @@ extern HANDLE hProc;
#else
#define DEBUGPRINTF(x,...) kprintf(x,##__VA_ARGS__)
#include <metalsvm/semaphore.h>
#endif
#define MMNIF_TX_QUEUELEN 4
#define MMNIF_RX_QUEUELEN 4
#define MMNIF_RX_QUEUELEN 8
#define MMNIF_RX_BUFFERLEN 1792
#define MMNIF_TX_BUFFERLEN 1792
#define MMNIF_CORES 2
/* decide whether it's polling mode or not
*/
static int no_irq = 1;
static int no_irq = 0;
/* this will be set by open() and close() and shows wether the driver is running or not
*/
@ -65,7 +69,7 @@ static int active = 0;
/* decide wheter it's uses locking or not
*/
static int disable_locking = 1;
static int disable_locking = 0;
/* IP address of the local core and the router core to get packets forwarded
*/
@ -92,6 +96,10 @@ static int local_crb = 0xF8000000;
*/
static struct netif* mmnif_dev = NULL;
/* thread variables */
static tid_t worker_thread;
static tid_t polling_thread;
typedef struct mmnif_device_stats
{
/* device stats (granularity in packets):
@ -102,6 +110,10 @@ typedef struct mmnif_device_stats
*/
unsigned int rx_err;
unsigned int rx;
unsigned int rx_intr;
unsigned int rx_poll;
unsigned int tx_err;
unsigned int tx;
} mmnif_device_stats_t;
@ -111,6 +123,8 @@ typedef struct mm_rx_buffer
/* memory rx buffer build
* - queued : how many packets are in the queue
* - pos : which is the next packet to be worked on
* - iv_intr : inform via interrupt or not
* - lock: semaphore to lock the local variables to be multi access save
*
* Note: this section will soon be complexer.
* I won't use a single buffer the whole time. I think i will use an descripor table
@ -121,6 +135,11 @@ typedef struct mm_rx_buffer
*/
uint8_t queued;
uint8_t pos;
uint8_t iv_intr;
sem_t lock;
// void* rx_desc[MMNIF_CORES * MMNIF_RX_QUEUELEN];
// uint8_t rx_inuse[MMNIF_CORES * MMNIF_RX_QUEUELEN]; /* bits 1: pending 2: finished 3: free ...*/
// uint8_t fin;
@ -199,9 +218,9 @@ __inline uint8_t mmnif_read_rx_queue(uint32_t dest_ip)
/* Read the value of the forgein process
* form rx_buff->queued
*/
uint8_t queued;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&queued,1,NULL));
return queued;
mm_rx_buffer_t hdr;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
return hdr.queued;
#else
return *(uint8_t*)(mpb_start_address + (dest_ip -1 ) * mpb_size );
#endif
@ -216,14 +235,31 @@ __inline uint8_t mmnif_read_rx_pos(uint32_t dest_ip)
/* Read the value of the forgein process
* form rx_buff->pos
*/
uint8_t pos;
while (!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size + 1,&pos,1,NULL));
return pos;
mm_rx_buffer_t hdr;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
return hdr.pos;
#else
return *(uint8_t*)(mpb_start_address + (dest_ip -1 ) * mpb_size +1);
#endif
};
/* read the pos value from the remote buffer
* and return it.
*/
__inline uint8_t mmnif_read_rx_inv_intr(uint32_t dest_ip)
{
#ifdef WIN32
/* Read the value of the forgein process
* form rx_buff->pos
*/
mm_rx_buffer_t hdr;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
return hdr.iv_intr;
#else
return *(uint8_t*)(mpb_start_address + (dest_ip -1 ) * mpb_size +2);
#endif
};
/* write data to the remote buffer
*
*/
@ -235,7 +271,7 @@ __inline int mmnif_write_rx_buff(uint32_t dest_ip, uint32_t pos,void* data)
*/
uint32_t nr_of_bytes_written = 0;
uint16_t length = *((uint16_t*)data);
while(!WriteProcessMemory(hProc,(char*)mpb_start_address +(dest_ip -1 ) * mpb_size + 2+ pos * MMNIF_RX_BUFFERLEN,data,length+2,&nr_of_bytes_written));
while(!WriteProcessMemory(hProc,(char*)mpb_start_address +(dest_ip -1 ) * mpb_size + sizeof(mm_rx_buffer_t)+ pos * MMNIF_RX_BUFFERLEN,data,length+2,&nr_of_bytes_written));
return nr_of_bytes_written;
#else
uint16_t length = *((uint16_t*)data);
@ -244,6 +280,24 @@ __inline int mmnif_write_rx_buff(uint32_t dest_ip, uint32_t pos,void* data)
#endif
};
__inline int mmnif_write_rx_buffl(uint32_t dest_ip, uint32_t pos,void* data,uint16_t length)
{
#ifdef WIN32
/* we assume this is a correct buffer
* therefore here is no further error checking
*/
uint32_t nr_of_bytes_written = 0;
while(!WriteProcessMemory(hProc,(char*)mpb_start_address +(dest_ip -1 ) * mpb_size + sizeof(mm_rx_buffer_t)+ pos * MMNIF_RX_BUFFERLEN,&length,2,&nr_of_bytes_written));
while(!WriteProcessMemory(hProc,(char*)mpb_start_address +(dest_ip -1 ) * mpb_size + sizeof(mm_rx_buffer_t)+ pos * MMNIF_RX_BUFFERLEN + 2,data,length,&nr_of_bytes_written));
return nr_of_bytes_written+2;
#else
// uint16_t length = *((uint16_t*)data);
memcpy((char*)mpb_start_address +(dest_ip -1 ) * mpb_size + sizeof(mm_rx_buffer_t)+ pos * MMNIF_RX_BUFFERLEN,&length,2);
memcpy(char*)mpb_start_address +(dest_ip -1 ) * mpb_size + sizeof(mm_rx_buffer_t)+ pos * MMNIF_RX_BUFFERLEN + 2,data,length);
return 1;
#endif
};
/* write the new queue value to the remote buffer
*
*/
@ -258,7 +312,7 @@ __inline int mmnif_write_rx_queue(uint32_t dest_ip,uint8_t queue)
return nr_of_bytes_written;
#else
memcpy(mpb_start_address + (dest_ip -1 ) * mpb_size,&queue,1);
memcpy((char*)mpb_start_address + ( dest_ip -1 ) * mpb_size ,&queue,1);
return 1;
#endif
@ -287,13 +341,13 @@ __inline int mmnif_trigger_irq(dest_ip)
__inline int mmnif_device_schedule()
{
#ifdef WIN32
bthread_t poller;
bthread_create(&poller,NULL,mmnif_poll,NULL);
bthread_t polling_thread;
bthread_create(&polling_thread,NULL,mmnif_poll,NULL);
return NULL;
#else
tid_t poller;
create_kernel_task(&poller,mmnif_poll,NULL);
tid_t polling_thread;
create_kernel_task(&polling_thread,mmnif_poll,NULL);
return NULL;
#endif
}
@ -305,13 +359,10 @@ __inline int mmnif_device_schedule()
__inline int mmnif_worker_schedule()
{
#ifdef WIN32
bthread_t waiter;
bthread_create(&waiter,NULL,mmnif_worker,NULL);
bthread_create(&worker_thread,NULL,mmnif_worker,NULL);
return NULL;
#else
tid_t waiter;
create_kernel_task(&waiter,mmnif_worker,NULL);
create_kernel_task(&worker_thread,mmnif_worker,NULL);
return NULL;
#endif
}
@ -345,6 +396,84 @@ __inline void* mmnif_shmalloc()
#endif
}
__inline void mmnif_lock_rx_hdr(int dest_ip)
{
#ifdef WIN32
mm_rx_buffer_t hdr;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
sem_wait(&hdr.lock);
#else
mm_rx_buffer_t* hdr = (char*)mpb_start_address + ( dest_ip -1 ) * mpb_size;
sem_wait(&hdr->lock);
#endif
}
__inline void mmnif_unlock_rx_hdr(int dest_ip)
{
#ifdef WIN32
mm_rx_buffer_t hdr;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
sem_post(&hdr.lock);
#else
mm_rx_buffer_t* hdr = (char*)mpb_start_address + ( dest_ip -1 ) * mpb_size;
sem_post(&hdr->lock);
#endif
}
__inline int mmnif_timestamp()
{
#ifdef WIN32
return GetTickCount();
#else
return get_clock_tick();
#endif
}
/* mmnif_get_device_stats():
*
*/
mmnif_device_stats_t mmnif_get_device_stats()
{
mmnif_device_stats_t stats = {0};
if(!mmnif_dev)
{
DEBUGPRINTF("mmnif_get_device_stats(): the device is not initialized yet.");
}
else
stats = ((mmnif_t*)mmnif_dev->state)->stats;
return stats;
}
/* mmnif_print_stats():
*
*/
void mmnif_print_stats()
{
mmnif_t* mmnif;
if (!mmnif_dev)
{
DEBUGPRINTF("mmnif_print_stats(): the device is not initialized yet.");
return;
}
mmnif = (mmnif_t*)mmnif_dev->state;
DEBUGPRINTF("/dev/mmnif - stats: \n");
DEBUGPRINTF("Received: %d packets successfull\n",mmnif->stats.rx);
DEBUGPRINTF("interrupts: %d\n",mmnif->stats.rx_intr);
DEBUGPRINTF("polling: %d\n",mmnif->stats.rx_poll);
DEBUGPRINTF("Received: %d packets containuing errors\n",mmnif->stats.rx_err);
DEBUGPRINTF("Transmitted: %d packests successfull",mmnif->stats.tx);
DEBUGPRINTF("Transmitted: %d packests were dropped due to errors",mmnif->stats.tx_err);
}
/*
* memory maped interface main functions
*/
@ -405,6 +534,8 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
struct pbuf* q; /* interator */
uint8_t queued;
uint8_t pos;
uint8_t build_buff = TRUE;
uint8_t dest_intr = FALSE;
uint32_t dest_ip = mmnif_get_destination(netif,p);
/* take a place in the tx_queue */
@ -425,36 +556,66 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
goto drop_packet;
}
if (p->tot_len > 1792)
if (p->tot_len > MMNIF_TX_BUFFERLEN)
{
DEBUGPRINTF("mmnif_tx(): packet is longer than 1792 bytes\n");
goto drop_packet;
}
/* write packet length to start transmit buffer */
*((unsigned short*)mmnif->tx_buff[slot]) = p->tot_len;
/* build the payload out of the p's
* ensure that the packet is in one memory chunk stored in the transmid buffer
/* check if the pbuf consists only of one element
* if that is the case it would be much overhead
* copying that packet again
*/
for (q = p, i = 2; q != 0; q = q->next)
{
memcpy(mmnif->tx_buff[slot] + i, q->payload, q->len);
i += q->len;
}
if (!p->next)
build_buff = FALSE;
if (build_buff)
{
/* write packet length to start transmit buffer */
*((unsigned short*)mmnif->tx_buff[slot]) = p->tot_len;
/* build the payload out of the p's
* ensure that the packet is in one memory chunk stored in the transmid buffer
*/
for (q = p, i = 2; q != 0; q = q->next)
{
memcpy(mmnif->tx_buff[slot] + i, q->payload, q->len);
i += q->len;
}
}
else
{
/* because there is no copy operation to the tx_slots
* we don't need a place in the queue anymore
*/
mmnif->tx_queue--;
}
/* get the palce the router core is looking for the packet */
mmnif_lock_rx_hdr(dest_ip);
queued = mmnif_read_rx_queue(dest_ip);
pos = mmnif_read_rx_pos(dest_ip);
mmnif_unlock_rx_hdr(dest_ip);
pos = (pos + queued) % MMNIF_RX_QUEUELEN;
/* write buffer to buffer & increment the queued packet count */
mmnif_write_rx_buff(dest_ip, pos, mmnif->tx_buff[slot]);
if (build_buff)
mmnif_write_rx_buff(dest_ip, pos, mmnif->tx_buff[slot]);
else
mmnif_write_rx_buffl(dest_ip, pos, p->payload,p->tot_len);
queued++;
mmnif_lock_rx_hdr(dest_ip);
mmnif_write_rx_queue(dest_ip, queued);
mmnif_unlock_rx_hdr(dest_ip);
#ifdef DEBUG_MMNIF
DEBUGPRINTF("\n SEND 0x%.8X\n",(char*)mmnif->rx_buff + 2 + pos * 1792);
hex_dump(p->tot_len +2, mmnif->tx_buff[slot]);
@ -465,7 +626,8 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
mmnif_trigger_irq(dest_ip);
/* free the transmid queue*/
mmnif->tx_queue--;
if(build_buff)
mmnif->tx_queue--;
/* gather stats:
* - firstly for lwip
@ -509,7 +671,7 @@ err_t mmnif_init(struct netif* netif)
mmnif_t* mmnif;
uint32_t i;
static int num = 0;
uint16_t speed = 400;
uint16_t speed = 4000;
/* Alloc and clear memory for the device struct
*/
@ -535,6 +697,8 @@ err_t mmnif_init(struct netif* netif)
}
memset(mmnif->rx_buff, 0, (sizeof(mm_rx_buffer_t) + MMNIF_RX_QUEUELEN* MMNIF_RX_BUFFERLEN)* (MMNIF_CORES-1));
sem_init(&mmnif->rx_buff->lock,1);
/* Alloc and clear internal memory for tx_buff
*/
#ifdef WIN32
@ -623,6 +787,7 @@ static void mmnif_rx(struct netif* netif)
*/
length = *((uint16_t*) (data + pos));
/* If length is zero return silently */
if (length == 0)
{
// mmnif->rx_buff->pos++;
@ -669,9 +834,14 @@ static void mmnif_rx(struct netif* netif)
/* everything is copied to a new buffer so it's save to release
* the old one for new incoming packets
*/
mmnif_lock_rx_hdr(own_ip_address & 0xFF);
mmnif->rx_buff->pos++;
mmnif->rx_buff->queued--;
mmnif_unlock_rx_hdr(own_ip_address & 0xFF);
/* using the mailbox to hand the buffer to the incoming packet thread
* so the "interrupt" itself is not taking to long
*/
@ -681,6 +851,11 @@ static void mmnif_rx(struct netif* netif)
LINK_STATS_INC(link.xmit);
mmnif->stats.rx++;
if (no_irq)
mmnif->stats.rx_poll++;
else
mmnif->stats.rx_intr++;
return;
drop_packet:
@ -746,7 +921,7 @@ static int mmnif_wait(struct netif* netif, uint32_t poll, int budget)
int mmnif_worker(void* e)
{
while (active)
mmnif_wait(mmnif_dev,0,1);
mmnif_wait(mmnif_dev,0,5);
return NULL;
}
@ -756,12 +931,26 @@ int mmnif_worker(void* e)
*/
int mmnif_poll(void* e)
{
char* data = mpb_start_address + (own_ip_address-router_ip_address)*mpb_size;
mmnif_t* mmnif;
unsigned int diff = mmnif_timestamp();
unsigned int tmp32 = 0;
if (!mmnif_dev)
{
DEBUGPRINTF("mmnif_poll(): the driver is not initialized yet");
return -1;
}
mmnif = (mmnif_t*) mmnif_dev->state;
/*run while driver is up*/
while (active)
{
while (!data[0])
;
while (mmnif->rx_buff->queued)
{
tmp32 = mmnif_timestamp();
diff = diff - tmp32 > 0 ? diff - tmp32 : tmp32 - diff;
}
// Sleep(10);
mmnif_rx(mmnif_dev);
}
@ -769,6 +958,25 @@ int mmnif_poll(void* e)
return NULL;
}
/* mmnif_irqhandler(): handles incoming interrupts
* its just a local wrapper for mmnif_rx()
*/
void mmnif_irqhandler()
{
mmnif_t* mmnif;
/* return if mmnif_dev is not yet initialized*/
if (!mmnif_dev)
{
DEBUGPRINTF("mmnif_irqhandler(): the driver is not initialized yet");
return;
}
mmnif = (mmnif_t*) mmnif_dev->state;
while (mmnif->rx_buff->queued)
mmnif_rx(mmnif_dev);
}
/*
* Open the interface should be called by kernel to use this network interface
*/
@ -849,6 +1057,14 @@ int mmnif_open()
*/
int mmnif_close()
{
mmnif_t* mmnif;
if (!mmnif_dev)
{
DEBUGPRINTF("mmnif_close(): you closed the device before it was properly opened -.-* ");
}
mmnif = (mmnif_t*)mmnif_dev->state;
/* indicate that the driver is not active anymore
* - this will stop the polling thread i.e.
*/
@ -858,5 +1074,12 @@ int mmnif_close()
*/
active = FALSE;
#ifdef WIN32
free(mmnif->tx_buff);
free(mmnif_dev);
#else
kfree(mmnif->tx_buff);
kfree(mmnif_dev);
#endif
return NULL;
}