fix locking errors

This commit is contained in:
Carl-Benedikt Krger 2011-07-09 09:00:17 +02:00
parent 2bd85cd82f
commit 0ff5546477

View file

@ -23,7 +23,7 @@
#include <lwip/ip.h> /* struct iphdr*/
#include <lwip/tcpip.h> /* tcpip_input()*/
//#define DEBUG_MMNIF
#define DEBUG_MMNIF
#ifdef DEBUG_MMNIF
#include "util.h" /* hex dump */
@ -39,6 +39,8 @@ typedef bthread_t tid_t;
/* "interrupt" of the other virutal network card*/
extern HANDLE remote_process_event;
extern HANDLE remote_process_mutex;
extern HANDLE own_process_mutex;
/* HANDLE to the other Process (for WPM and RPM)*/
extern HANDLE hProc;
@ -69,9 +71,9 @@ extern HANDLE hProc;
#define MMNIF_RX_BUFFERLEN 1792
#define MMNIF_TX_BUFFERLEN 1792
#define MMNIF_CORES 2
#define MMNIF_CORES 2
#define MMNIF_WORKER_BUDGET 2
#define MMNIF_WORKER_BUDGET 4
#define MMNIF_POLL_BUDGET 0x100000
#define MMNIF_WAIT_BUDGET 0x2
@ -187,8 +189,8 @@ typedef struct mm_rx_buffer
uint8_t iv_intr;
// sem_t lock;
spinlock_t lock;
sem_t lock;
// spinlock_t lock;
// uint32_t timestamp[MMNIF_RX_QUEUELEN];
// uint32_t bitmap[MMNIF_RX_QUEUELEN];
@ -538,15 +540,22 @@ __inline void* mmnif_shmalloc()
__inline void mmnif_lock_rx_hdr(int dest_ip)
{
#ifdef WIN32
mm_rx_buffer_t hdr;
if(disable_locking) return;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
sem_wait(&hdr.lock);
// mm_rx_buffer_t hdr;
// if(disable_locking) return;
// while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
// ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL);
// sem_wait(&hdr.lock);
if (dest_ip != own_ip_address && 0xFF)
return WaitForSingleObject(remote_process_mutex,INFINITE);
else
return WaitForSingleObject(own_process_mutex,INFINITE);
#else
if(disable_locking) return;
mm_rx_buffer_t* hdr = (char*)mpb_start_address + ( dest_ip -1 ) * mpb_size;
// sem_wait(&hdr->lock);
spinlock_lock(&hdr->lock);
spinlock_lock(&hdr->lock);
#endif
}
/* mmnif_unlock_rx_hdr(): unlock the header
@ -555,14 +564,20 @@ __inline void mmnif_lock_rx_hdr(int dest_ip)
__inline void mmnif_unlock_rx_hdr(int dest_ip)
{
#ifdef WIN32
mm_rx_buffer_t hdr;
if(disable_locking) return;
while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
sem_post(&hdr.lock);
// mm_rx_buffer_t hdr;
// if(disable_locking) return;
// while(!ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL));
// ReadProcessMemory(hProc,(char*)mpb_start_address + ( dest_ip -1 ) * mpb_size,&hdr,sizeof(hdr),NULL);
// sem_post(&hdr.lock);
if (dest_ip != own_ip_address && 0xFF)
return ReleaseMutex(remote_process_mutex);
else
return ReleaseMutex(own_process_mutex);
#else
if(disable_locking) return;
mm_rx_buffer_t* hdr = (char*)mpb_start_address + ( dest_ip -1 ) * mpb_size;
spinlock_unlock(&hdr->lock);
// sem_post(&hdr->lock);
spinlock_unlock(&hdr->lock);
#endif
}
@ -689,7 +704,9 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
uint8_t dest_intr = FALSE;
uint32_t dest_ip = mmnif_get_destination(netif,p);
// mmnif_lock_rx_hdr(dest_ip);
/* take a place in the tx_queue */
// InterlockedIncrement(&mmnif->tx_queue);
mmnif->tx_queue++;
/* Perform serveral sanity checks on the packet and the buffers:
@ -701,7 +718,7 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
* just one thread is writing to pos and queue of the mm_rx_buff
*/
if (mmnif->tx_queue >= MMNIF_TX_QUEUELEN)
if (mmnif->tx_queue > MMNIF_TX_QUEUELEN)
{
DEBUGPRINTF("mmnif_tx(): too many packets at once for tx_queue\n");
goto drop_packet;
@ -740,7 +757,8 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
/* because there is no copy operation to the tx_slots
* we don't need a place in the queue anymore
*/
mmnif->tx_queue--;
// mmnif->tx_queue--;
// InterlockedDecrement(&mmnif->tx_queue);
}
/* get the palce the router core is looking for the packet */
@ -757,7 +775,7 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
mmnif_write_rx_pending(dest_ip,pending);
/* and unlock the dest_ip mm_rx_buffer_hdr */
mmnif_unlock_rx_hdr(dest_ip);
mmnif_unlock_rx_hdr(dest_ip);
/* check if there is a space in the queue without overwriting another packet */
if ((queued + pending) > MMNIF_RX_QUEUELEN)
@ -778,7 +796,7 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
mmnif_write_rx_buffl(dest_ip, pos, p->payload,p->tot_len);
/* like above ensure we are the only ones editing the hdr */
mmnif_lock_rx_hdr(dest_ip);
mmnif_lock_rx_hdr(dest_ip);
queued = mmnif_read_rx_queue(dest_ip);
pending = mmnif_read_rx_pending(dest_ip);
@ -787,7 +805,7 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
dest_intr = mmnif_read_rx_inv_intr(dest_ip);
#ifdef DEBUG_MMNIF
#ifdef DEBUG_MMNIF_PACKET
DEBUGPRINTF("\n SEND 0x%.8X with length: %d\n",(char*)mpb_start_address + (dest_ip -1)*mpb_size + pos * 1792,p->tot_len +2);
hex_dump(p->tot_len, p->payload);
#endif
@ -802,8 +820,9 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
mmnif_trigger_irq(dest_ip);
/* free the transmid queue*/
if(build_buff)
mmnif->tx_queue--;
// if(build_buff)
mmnif->tx_queue--;
// InterlockedDecrement(&mmnif->tx_queue);
/* gather stats:
* - firstly for lwip
@ -815,11 +834,18 @@ err_t mmnif_tx(struct netif* netif, struct pbuf* p)
mmnif->stats.tx++;
mmnif->stats.tx_bytes += p->tot_len;
// mmnif_unlock_rx_hdr(dest_ip);
return ERR_OK;
drop_packet: /* packet is lost. clean up and gather stats */
mmnif->tx_queue--;
mmnif_lock_rx_hdr(dest_ip);
pending = mmnif_read_rx_pending(dest_ip);
pending--;
mmnif_write_rx_pending(dest_ip, pending);
mmnif_unlock_rx_hdr(dest_ip);
LINK_STATS_INC(link.drop);
@ -877,8 +903,8 @@ err_t mmnif_init(struct netif* netif)
/* init the lock for the hdr
*/
// sem_init(&mmnif->rx_buff->lock,1);
spinlock_init(&mmnif->rx_buff->lock);
sem_init(&mmnif->rx_buff->lock,1);
// spinlock_init(&mmnif->rx_buff->lock);
/* init the sems for communication art
*/
@ -980,7 +1006,9 @@ static void mmnif_rx(struct netif* netif)
/* retrieve pointer to actual data array */
data = (char*) mmnif->rx_buff + sizeof(mm_rx_buffer_t);
/* retrice position wich is needed to be worked on */
mmnif_lock_rx_hdr(own_ip_address && 0xFF);
pos = (mmnif->rx_buff->pos % MMNIF_RX_QUEUELEN) * MMNIF_RX_BUFFERLEN;
// mmnif_unlock_rx_hdr(own_ip_address && 0xFF);
/* The packet length is stored in the first 2 bytes but does not include
* the header. Check for reasonable sizes before processing the data to
@ -993,6 +1021,8 @@ static void mmnif_rx(struct netif* netif)
{
// mmnif->rx_buff->pos++;
// mmnif->rx_buff->queued--;
DEBUGPRINTF("mmnif_rx(): empty packet error\n");
mmnif_unlock_rx_hdr(own_ip_address & 0xFF);
return;
}
if (length < sizeof(struct ip_hdr) ||length > netif->mtu)
@ -1006,7 +1036,7 @@ static void mmnif_rx(struct netif* netif)
*/
#ifdef DEBUG_MMNIF
#ifdef DEBUG_MMNIF_PACKET
DEBUGPRINTF("\n RECIEVED - 0x%.8X with legth: %d\n",data + pos,length+2);
hex_dump(length+2,data + pos);
#endif
@ -1036,12 +1066,18 @@ static void mmnif_rx(struct netif* netif)
* the old one for new incoming packets
*/
mmnif_lock_rx_hdr(own_ip_address & 0xFF);
// mmnif_lock_rx_hdr(own_ip_address & 0xFF);
mmnif->rx_buff->pos++;
mmnif->rx_buff->queued--;
if (mmnif->rx_buff->queued > MMNIF_RX_QUEUELEN)
{
DEBUGPRINTF("mmnif_rx(): integer underflow on mmnif->rx_buff->queued\n");
mmnif->rx_buff->queued = 0;
}
mmnif_unlock_rx_hdr(own_ip_address & 0xFF);
// mmnif_unlock_rx_hdr(own_ip_address & 0xFF);
/* using the mailbox to hand the buffer to the incoming packet thread
* so the "interrupt" itself is not taking to long
@ -1059,11 +1095,12 @@ static void mmnif_rx(struct netif* netif)
else
mmnif->stats.rx_poll++;
mmnif_unlock_rx_hdr(own_ip_address & 0xFF);
return;
drop_packet:
/* packet is lost so gather stats and leave the rx handler*/
mmnif_lock_rx_hdr(own_ip_address & 0xFF);
// mmnif_lock_rx_hdr(own_ip_address & 0xFF);
mmnif->rx_buff->pos++;
mmnif->rx_buff->queued--;
@ -1103,9 +1140,9 @@ static int mmnif_wait(struct netif* netif, uint32_t poll, int budget)
*/
if (mmnif->rx_buff->iv_intr == TRUE)
{
mmnif_lock_rx_hdr(own_ip_address && 0xff);
// mmnif_lock_rx_hdr(own_ip_address && 0xff);
mmnif->rx_buff->iv_intr = FALSE;
mmnif_unlock_rx_hdr(own_ip_address && 0xff);
// mmnif_unlock_rx_hdr(own_ip_address && 0xff);
#ifdef DEBUG_MMNIF
DEBUGPRINTF("mmnif_wait(): heuristical polling enables\n");
#endif
@ -1197,25 +1234,25 @@ int mmnif_poll(void* e)
{
while (!mmnif->rx_buff->queued)
{
mmnif->stats.pll_empty++;
mmnif->stats.pll_empty++;
if (mmnif->stats.pll_empty >= MMNIF_POLL_BUDGET)
{
/* enable interrupts and suspend polling
*
*/
mmnif_lock_rx_hdr(own_ip_address && 0xff);
// mmnif_lock_rx_hdr(own_ip_address && 0xff);
mmnif->rx_buff->iv_intr = TRUE;
mmnif_unlock_rx_hdr(own_ip_address && 0xff);
// mmnif_unlock_rx_hdr(own_ip_address && 0xff);
#ifdef DEBUG_MMNIF
DEBUGPRINTF("mmnif_poll(): heuristical interrupts enabled\n");
#endif
sem_wait(&mmnif->com_poll);
}
/* uncomment this to test only polling
*/
//mmnif->stats.pll_empty = 0;
/* uncomment this to test only polling
*/
// mmnif->stats.pll_empty = 0;
}
mmnif->stats.pll_empty=0;
mmnif->stats.pll_empty=0;
mmnif_rx(mmnif_dev);
if (instant_process)
@ -1245,7 +1282,7 @@ void mmnif_irqhandler()
mmnif_rx(mmnif_dev);
if (instant_process)
mmnif_wait(mmnif_dev,0,MMNIF_WORKER_BUDGET);
mmnif_wait(mmnif_dev,1,MMNIF_WORKER_BUDGET);
}
}