//***************************************************************************************
// Synchronized receive routines. 
//***************************************************************************************
//
// Author: Rob F. Van der Wijngaart
//         Intel Corporation
// Date:   008/30/2010
//
//***************************************************************************************
// 
// Copyright 2010 Intel Corporation
// 
//    Licensed under the Apache License, Version 2.0 (the "License");
//    you may not use this file except in compliance with the License.
//    You may obtain a copy of the License at
// 
//        http://www.apache.org/licenses/LICENSE-2.0
// 
//    Unless required by applicable law or agreed to in writing, software
//    distributed under the License is distributed on an "AS IS" BASIS,
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//    See the License for the specific language governing permissions and
//    limitations under the License.
// 
//    [2010-10-25] added support for non-blocking send/recv operations
//                 - RCCE_isend(), ..._test(), ..._wait(), ..._push()
//                 - RCCE_irecv(), ..._test(), ..._wait(), ..._push()
//                 by Carsten Clauss, Chair for Operating Systems,
//                                    RWTH Aachen University
//
//    [2012-09-10] added support for "tagged" flags
//                 by Carsten Clauss, Chair for Operating Systems,
//                                    RWTH Aachen University
//
#include "RCCE_lib.h"
#ifdef __hermit__
#include "rte_memcpy.h"
#define memcpy_scc rte_memcpy
#elif defined(COPPERRIDGE)
#include "scc_memcpy.h"
#else
#define memcpy_scc memcpy
#endif

#include <stdlib.h>
#include <string.h>

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_general
//--------------------------------------------------------------------------------------
// Synchronized receive function (gory and non-gory mode)
//--------------------------------------------------------------------------------------
static int RCCE_recv_general(
  char *privbuf,    // destination buffer in local private memory (receive buffer)
  t_vcharp combuf,  // intermediate buffer in MPB
  size_t chunk,     // size of MPB available for this message (bytes)
  RCCE_FLAG *ready, // flag indicating whether receiver is ready
  RCCE_FLAG *sent,  // flag indicating whether message has been sent by source
  size_t size,      // size of message (bytes)
  int source,       // UE that sent the message
  int *test,        // if 1 upon entry, do nonblocking receive; if message available
                    // set to 1, otherwise to 0
  int copy,         // set to 0 for cancel function
  int pipe,         // use pipelining?
  int mcast,        // multicast?
  void* tag,        // additional tag?
  int len,          // length of additional tag
  RCCE_FLAG *probe  // flag for probing for incoming messages
  ) {

  char padline[RCCE_LINE_SIZE]; // copy buffer, used if message not multiple of line size
  size_t wsize,   // offset within receive buffer when pulling in "chunk" bytes
       remainder, // bytes remaining to be received
       nbytes;    // number of bytes to be received in single RCCE_get call
  int first_test; // only use first chunk to determine if message has been received yet
  char *bufptr;   // running pointer inside privbuf for current location
  RCCE_FLAG_STATUS flag;

  first_test = 1;

#ifdef USE_REMOTE_PUT_LOCAL_GET
  if(mcast) return(RCCE_error_return(1, RCCE_ERROR_NO_MULTICAST_SUPPORT));
#endif

  if(probe) {
#ifdef USE_TAGGED_FLAGS
    RCCE_wait_tagged(*probe, RCCE_FLAG_SET, tag, len);
#else
    RCCE_wait_until(*probe, RCCE_FLAG_SET);
#endif
    RCCE_flag_write(probe, RCCE_FLAG_UNSET, RCCE_IAM);
  }

#ifdef USE_SYNCH_FOR_ZERO_BYTE
  // synchronize even in case of zero byte messages:
  if(size == 0) {
#ifdef USE_REMOTE_PUT_LOCAL_GET
    RCCE_flag_write(ready, RCCE_FLAG_SET, source);
#ifdef USE_TAGGED_FLAGS
    if(!probe)
      RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
    else
#endif
    RCCE_wait_until(*sent, RCCE_FLAG_SET);
    RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
#else // LOCAL PUT / REMOTE GET: (standard)
#ifdef USE_TAGGED_FLAGS
    if(!probe)
      RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
    else
#endif
    RCCE_wait_until(*sent, RCCE_FLAG_SET);      
    RCCE_flag_write(ready, RCCE_FLAG_SET, source);
#endif // !USE_REMOTE_PUT_LOCAL_GET
    return(RCCE_SUCCESS);
  }
#endif // USE_SYNCH_FOR_ZERO_BYTE

#ifdef USE_REMOTE_PUT_LOCAL_GET

  first_test = 0; /* force blocking function, does not work for now */
  *test = 1;

  // tell the source I am ready to receive
  RCCE_flag_write(ready, RCCE_FLAG_SET, source);
  
  if(!pipe) {
    // receive data in units of available chunk size of MPB 
    for (wsize=0; wsize< (size/chunk)*chunk; wsize+=chunk) {
      bufptr = privbuf + wsize;
      nbytes = chunk;
      // if function is called in test mode, check if first chunk has been sent already. 
      // If so, proceed as usual. If not, exit immediately 
      if (*test && first_test) {
	first_test = 0;
	RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
	if (!(*test)) return(RCCE_SUCCESS);
      }
      
      if (wsize != 0)
	RCCE_flag_write(ready, RCCE_FLAG_SET, source);
  
#ifdef USE_TAGGED_FLAGS
      if( (wsize == 0) && (!probe) )
	RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
      else
#endif     
      RCCE_wait_until(*sent, RCCE_FLAG_SET);

      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);

      // copy data from local MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, RCCE_IAM);
    }
  }
  
#else // LOCAL PUT / REMOTE GET: (standard)

  if(!pipe) {
    // receive data in units of available chunk size of MPB 
    for (wsize=0; wsize< (size/chunk)*chunk; wsize+=chunk) {
      bufptr = privbuf + wsize;
      nbytes = chunk;
      // if function is called in test mode, check if first chunk has been sent already. 
      // If so, proceed as usual. If not, exit immediately 
      if (*test && first_test) {
	first_test = 0;
	RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
	if (!(*test)) return(RCCE_SUCCESS);
      }
      if(!mcast)
      {
#ifdef USE_TAGGED_FLAGS
	if( (wsize == 0) && (!probe) )
	  RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
	else
#endif
	RCCE_wait_until(*sent, RCCE_FLAG_SET);

	RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
      }
      else {
	RCCE_TNS_barrier(&RCCE_COMM_WORLD);
      }
      // copy data from remote MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, source);
      
      if(!mcast) {
	// tell the source I have moved data out of its comm buffer
	RCCE_flag_write(ready, RCCE_FLAG_SET, source);
      }
      else {
	RCCE_TNS_barrier(&RCCE_COMM_WORLD);
      }
    }
  }
#endif // !USE_REMOTE_PUT_LOCAL_GET

#ifdef USE_REMOTE_PUT_LOCAL_GET

  else // if(!pipe) ->  if(pipe)
  { 
    // pipelined version of send/recv:

    size_t subchunk1, subchunk2; 

    for (wsize=0; wsize < (size/chunk)*chunk; wsize+=chunk) {

      if (*test && first_test) {
	first_test = 0;
	RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
	if (!(*test)) return(RCCE_SUCCESS);
      }

      if(wsize == 0) {
	// allign sub-chunks to cache line granularity:
	subchunk1 = ( (chunk / 2) / RCCE_LINE_SIZE ) * RCCE_LINE_SIZE;
	subchunk2 = chunk - subchunk1;
      }

      bufptr = privbuf + wsize;
      nbytes = subchunk1;
      
#ifdef USE_TAGGED_FLAGS
      if( (wsize == 0) && (!probe) )
	RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
      else
#endif
      RCCE_wait_until(*sent, RCCE_FLAG_SET);
      
      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);

      RCCE_flag_write(ready, RCCE_FLAG_SET, source);

      // copy data chunk 1 from local MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, RCCE_IAM);

      bufptr = privbuf + wsize + subchunk1;
      nbytes = subchunk2;

      RCCE_wait_until(*sent, RCCE_FLAG_SET);
      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);

      if (wsize + chunk < (size/chunk)*chunk)
        RCCE_flag_write(ready, RCCE_FLAG_SET, source);

      // copy data chunk 2 from local MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf + subchunk1, nbytes, RCCE_IAM);
    }

  } //  if(pipe)

#else // LOCAL PUT / REMOTE GET: (standard)

  else // if(!pipe) ->  if(pipe)
  {
    // pipelined version of send/recv:
  
    size_t subchunk1, subchunk2;

    for (wsize=0; wsize < (size/chunk)*chunk; wsize+=chunk) {
      
      if (*test && first_test) {
	first_test = 0;
	RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
	if (!(*test)) return(RCCE_SUCCESS);
      }

      if(wsize == 0) {
	// allign sub-chunks to cache line granularity:
	subchunk1 = ( (chunk / 2) / RCCE_LINE_SIZE ) * RCCE_LINE_SIZE;
	subchunk2 = chunk - subchunk1;
      }
      
      bufptr = privbuf + wsize;
      nbytes = subchunk1;
      
#ifdef USE_TAGGED_FLAGS
      if( (wsize == 0) && (!probe) )
	RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
      else
#endif
      RCCE_wait_until(*sent, RCCE_FLAG_SET);

      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
      RCCE_flag_write(ready, RCCE_FLAG_SET, source);

      // copy data chunk 1 from remote MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, source);
      
      bufptr = privbuf + wsize + subchunk1;
      nbytes = subchunk2;
      
      RCCE_wait_until(*sent, RCCE_FLAG_SET);
      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
      RCCE_flag_write(ready, RCCE_FLAG_SET, source);

      // copy data chunk 2 from remote MPB space to private memory 
      if(copy) RCCE_get((t_vcharp)bufptr, combuf + subchunk1, nbytes, source);
    }

  } // if(pipe)

#endif // !USE_REMOTE_PUT_LOCAL_GET

  remainder = size%chunk; 
  // if nothing is left over, we are done 
  if (!remainder) return(RCCE_SUCCESS);

  // receive remainder of data--whole cache lines               
  bufptr = privbuf + (size/chunk)*chunk;
  nbytes = remainder - remainder%RCCE_LINE_SIZE;

  if (nbytes) {

    // if function is called in test mode, check if first chunk has been sent already. 
    // If so, proceed as usual. If not, exit immediately 
    if (*test && first_test) {
      first_test = 0;
      RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
      if (!(*test)) return(RCCE_SUCCESS);
    }

#ifdef USE_REMOTE_PUT_LOCAL_GET

    if (wsize != 0)
      RCCE_flag_write(ready, RCCE_FLAG_SET, source);

#ifdef USE_TAGGED_FLAGS
    if( (wsize == 0) && (!probe) )
      RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
    else
#endif
    RCCE_wait_until(*sent, RCCE_FLAG_SET);

    RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
    
    // copy data from local MPB space to private memory 
    if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, RCCE_IAM);
    wsize += nbytes;

#else // LOCAL PUT / REMOTE GET: (standard)

    if(!mcast) {
#ifdef USE_TAGGED_FLAGS
      if( (wsize == 0) && (!probe) )
	RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
      else
#endif
      RCCE_wait_until(*sent, RCCE_FLAG_SET);

      RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
    }
    else {
      RCCE_TNS_barrier(&RCCE_COMM_WORLD);
    }

    // copy data from remote MPB space to private memory 
    if(copy) RCCE_get((t_vcharp)bufptr, combuf, nbytes, source);

    if(!mcast) {
      // tell the source I have moved data out of its comm buffer
      RCCE_flag_write(ready, RCCE_FLAG_SET, source);
    }
    else {
      RCCE_TNS_barrier(&RCCE_COMM_WORLD);
    }
#endif // !USE_REMOTE_PUT_LOCAL_GET

  } // if (nbytes)

  remainder = remainder%RCCE_LINE_SIZE;
  if (!remainder) return(RCCE_SUCCESS);

  // remainder is less than cache line. This must be copied into appropriately sized 
  // intermediate space before exact number of bytes get copied to the final destination 
  bufptr = privbuf + (size/chunk)*chunk + nbytes;
  nbytes = RCCE_LINE_SIZE;

  // if function is called in test mode, check if first chunk has been sent already. 
  // If so, proceed as usual. If not, exit immediately 
  if (*test && first_test) {
    first_test = 0;
    RCCE_test_flag(*sent, RCCE_FLAG_SET, test);
    if (!(*test)) return(RCCE_SUCCESS);
  }

#ifdef USE_REMOTE_PUT_LOCAL_GET

  if (wsize != 0)
    RCCE_flag_write(ready, RCCE_FLAG_SET, source);

#ifdef USE_TAGGED_FLAGS
  if( (wsize == 0) && (!probe) )
    RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
  else
#endif
  RCCE_wait_until(*sent, RCCE_FLAG_SET);

  RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);

  // copy data from local MPB space to private memory
  if(copy) {
    RCCE_get((t_vcharp)padline, combuf, nbytes, RCCE_IAM);
#ifdef COPPERRIDGE
    memcpy_scc(bufptr,padline,remainder);
#else
    memcpy(bufptr,padline,remainder);
#endif
  }
    
#else // LOCAL PUT / REMOTE GET: (standard)

  if(!mcast) {
#ifdef USE_TAGGED_FLAGS
    if( (wsize == 0) && (!probe) )
      RCCE_wait_tagged(*sent, RCCE_FLAG_SET, tag, len);
    else
#endif
    RCCE_wait_until(*sent, RCCE_FLAG_SET);
    
    RCCE_flag_write(sent, RCCE_FLAG_UNSET, RCCE_IAM);
  }
  else {
    RCCE_TNS_barrier(&RCCE_COMM_WORLD);
  }
  
  // copy data from remote MPB space to private memory   
  if(copy) {
    RCCE_get((t_vcharp)padline, combuf, nbytes, source);
#ifdef COPPERRIDGE
    memcpy_scc(bufptr,padline,remainder);
#else
    memcpy(bufptr,padline,remainder);
#endif
  }
  
  if(!mcast) {
    // tell the source I have moved data out of its comm buffer
    RCCE_flag_write(ready, RCCE_FLAG_SET, source);
  }
  else {
    RCCE_TNS_barrier(&RCCE_COMM_WORLD);
  }

#endif // !USE_REMOTE_PUT_LOCAL_GET

  return(RCCE_SUCCESS);
}


static int RCCE_push_recv_request(RCCE_RECV_REQUEST *request) {

  char padline[RCCE_LINE_SIZE]; // copy buffer, used if message not multiple of line size
  int   test;                   // flag for calling RCCE_test_flag()

  if(request->finished) return(RCCE_SUCCESS);

  if(request->label == 1) goto label1;
  if(request->label == 2) goto label2;
  if(request->label == 3) goto label3;
  if(request->label == 4) goto label4;

  if(request->probe) {
#ifdef USE_TAGGED_FLAGS
    RCCE_test_tagged(*(request->probe), RCCE_FLAG_SET, &test, request->tag, request->len);
#else
    RCCE_test_flag(*(request->probe), RCCE_FLAG_SET, &test);
#endif
    if(!test) {
      request->label = 0;
      return(RCCE_PENDING);
    }
    RCCE_flag_write(request->probe, RCCE_FLAG_UNSET, RCCE_IAM);
  }

#ifdef USE_SYNCH_FOR_ZERO_BYTE
  // synchronize even in case of zero byte messages:
  if(request->size == 0) {
#ifdef USE_REMOTE_PUT_LOCAL_GET
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);
  label1:
#ifdef USE_TAGGED_FLAGS
    if(!request->probe)
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);
    if(!test) {
      request->label = 1;
      return(RCCE_PENDING);      
    }
    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
#else // LOCAL PUT / REMOTE GET: (standard)
  label1:
#ifdef USE_TAGGED_FLAGS
    if(!request->probe)
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);
    if(!test) {
      request->label = 1;
      return(RCCE_PENDING);      
    }
    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);
#endif // !USE_REMOTE_PUT_LOCAL_GET
    request->finished = 1;
    return(RCCE_SUCCESS);
  }
#endif // USE_SYNCH_FOR_ZERO_BYTE


  // receive data in units of available chunk size of MPB 
  for (; request->wsize < (request->size / request->chunk) * request->chunk; request->wsize += request->chunk) {
    request->bufptr = request->privbuf + request->wsize;
    request->nbytes = request->chunk;

#ifdef USE_REMOTE_PUT_LOCAL_GET

    // tell the source I am ready to receive
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

  label2:
#ifdef USE_TAGGED_FLAGS
    if( (request->wsize == 0) && (!request->probe) )
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);

    if(!test) {
      request->label = 2;
      return(RCCE_PENDING);
    }
    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
    
    // copy data from local MPB space to private memory 
    if(request->copy) RCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, RCCE_IAM);

#else // LOCAL PUT / REMOTE GET: (standard)

  label2:
#ifdef USE_TAGGED_FLAGS
    if( (request->wsize == 0) && (!request->probe) )
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);

    if(!test) {
      request->label = 2;
      return(RCCE_PENDING);
    }
    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
    
    // copy data from remote MPB space to private memory 
    if(request->copy) RCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, request->source);

    // tell the source I have moved data out of its comm buffer
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

#endif // !USE_REMOTE_PUT_LOCAL_GET

  } // for

  request->remainder = request->size % request->chunk; 
  // if nothing is left over, we are done 
  if (!request->remainder) {
    request->finished = 1;
    return(RCCE_SUCCESS);
  }

  // receive remainder of data--whole cache lines               
  request->bufptr = request->privbuf + (request->size / request->chunk) * request->chunk;
  request->nbytes = request->remainder - request->remainder % RCCE_LINE_SIZE;

  if (request->nbytes) {

#ifdef USE_REMOTE_PUT_LOCAL_GET

    // tell the source I am ready to receive
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

  label3:
#ifdef USE_TAGGED_FLAGS
    if( (request->wsize == 0) && (!request->probe) )
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);

    if(!test) {
      request->label = 3;
      return(RCCE_PENDING);
    }
    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
    
    // copy data from local MPB space to private memory 
     if(request->copy) RCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, RCCE_IAM);

#else // LOCAL PUT / REMOTE GET: (standard)

  label3:
#ifdef USE_TAGGED_FLAGS
    if( (request->wsize == 0) && (!request->probe) )
      RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
    else
#endif
    RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);

    if(!test) {
      request->label = 3;
      return(RCCE_PENDING);
    }

    RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);

    // copy data from remote MPB space to private memory 
    if(request->copy) RCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, request->source);

    // tell the source I have moved data out of its comm buffer
    RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

#endif // !USE_REMOTE_PUT_LOCAL_GET

  } // if(request->nbytes)

  request->remainder = request->size % request->chunk; 
  request->remainder = request->remainder % RCCE_LINE_SIZE;

  if (!request->remainder) {
    request->finished = 1;
    return(RCCE_SUCCESS);
  }

  // remainder is less than cache line. This must be copied into appropriately sized 
  // intermediate space before exact number of bytes get copied to the final destination 
  request->bufptr = request->privbuf + (request->size / request->chunk) * request->chunk + request->nbytes;
  request->nbytes = RCCE_LINE_SIZE;

#ifdef USE_REMOTE_PUT_LOCAL_GET

  // tell the source I am ready to receive
  RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

label4:
#ifdef USE_TAGGED_FLAGS
#ifdef USE_PROBE_FLAGS_SHORTCUT
  if(request->privbuf == NULL) 
  {
    request->finished = 1;
    return(RCCE_SUCCESS);
  }
#endif
  if( (request->wsize == 0) && (!request->probe) )
    RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
  else
#endif
  RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);
  
  if(!test) {
    request->label = 4;
    return(RCCE_PENDING);
  }
  RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);
  
  // copy data from local MPB space to private memory 
  if(request->copy) {
    RCCE_get((t_vcharp)padline, request->combuf, request->nbytes, RCCE_IAM);  
#ifdef COPPERRIDGE
    memcpy_scc(request->bufptr,padline,request->remainder);
#else
    memcpy(request->bufptr,padline,request->remainder);
#endif
  }

#else // LOCAL PUT / REMOTE GET: (standard)

 label4:
#ifdef USE_TAGGED_FLAGS
  if( (request->wsize == 0) && (!request->probe) )
    RCCE_test_tagged(*(request->sent), RCCE_FLAG_SET, &test, request->tag, request->len);
  else
#endif
  RCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test);

  if(!test) {
    request->label = 4;
    return(RCCE_PENDING);
  }
  RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM);

  // copy data from remote MPB space to private memory   
  if(request->copy) {
    RCCE_get((t_vcharp)padline, request->combuf, request->nbytes, request->source);
#ifdef COPPERRIDGE
    memcpy_scc(request->bufptr,padline,request->remainder);
#else
    memcpy(request->bufptr,padline,request->remainder);
#endif
  }
  
  // tell the source I have moved data out of its comm buffer
  RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source);

#endif // !USE_REMOTE_PUT_LOCAL_GET

  request->finished = 1;
  return(RCCE_SUCCESS);
}

static void RCCE_init_recv_request(
  char *privbuf,    // source buffer in local private memory (send buffer)
  t_vcharp combuf,  // intermediate buffer in MPB
  size_t chunk,     // size of MPB available for this message (bytes)
  RCCE_FLAG *ready, // flag indicating whether receiver is ready
  RCCE_FLAG *sent,  // flag indicating whether message has been sent by source
  size_t size,      // size of message (bytes)
  int source,       // UE that will send the message
  int copy,         // set to 0 for cancel function
  void* tag,        // additional tag?
  int len,          // length of additional tag  
  RCCE_FLAG *probe, // flag for probing for incoming messages
  RCCE_RECV_REQUEST *request
  ) {

  request->privbuf   = privbuf;
  request->combuf    = combuf;
  request->chunk     = chunk;
  request->ready     = ready;
  request->sent      = sent;
  request->size      = size;
  request->source    = source;

  request->copy      = copy;
  request->tag       = tag;
  request->len       = len;
  request->probe     = probe;

  request->wsize     = 0;
  request->remainder = 0;
  request->nbytes    = 0;
  request->bufptr    = NULL;

  request->label     = 0;
  request->finished  = 0;

  request->next      = NULL;

  return;
}

#ifndef GORY
// this is the LfBS-customized message passing API      

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv
//--------------------------------------------------------------------------------------
// recv function for simplified API; use library-maintained variables for synchronization
// and set the test variable to 0 (ignore)
//--------------------------------------------------------------------------------------
int RCCE_recv(char *privbuf, size_t size, int source) {
  int ignore;

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL)
#else
  if(RCCE_recv_queue != NULL)
#endif
    return(RCCE_REJECTED);

  ignore = 0;
#ifdef USE_TAGGED_FOR_SHORT
  if(size <= (RCCE_LINE_SIZE - sizeof(int)))
  {
#ifdef USE_PROBE_FLAGS
    RCCE_wait_tagged(*probe, RCCE_FLAG_SET, privbuf, size);
    RCCE_flag_write(probe, RCCE_FLAG_UNSET, RCCE_IAM);
#endif

#ifdef USE_REMOTE_PUT_LOCAL_GET

    RCCE_flag_write(&RCCE_ready_flag[RCCE_IAM], RCCE_FLAG_SET, source);
  
#ifndef USE_PROBE_FLAGS_SHORTCUT
#ifdef USE_PROBE_FLAGS
    RCCE_wait_until(RCCE_sent_flag[source], RCCE_FLAG_SET);
#else
    RCCE_wait_tagged(RCCE_sent_flag[source], RCCE_FLAG_SET, privbuf, size);
#endif
    RCCE_flag_write(&RCCE_sent_flag[source], RCCE_FLAG_UNSET, RCCE_IAM);
#endif

#else // LOCAL PUT / REMOTE GET: (standard)

#ifdef USE_PROBE_FLAGS
    RCCE_wait_until(RCCE_sent_flag[source], RCCE_FLAG_SET);
#else
    RCCE_wait_tagged(RCCE_sent_flag[source], RCCE_FLAG_SET, privbuf, size);
#endif
    RCCE_flag_write(&RCCE_sent_flag[source], RCCE_FLAG_UNSET, RCCE_IAM);

    RCCE_flag_write(&RCCE_ready_flag[RCCE_IAM], RCCE_FLAG_SET, source);  

#endif // !USE_REMOTE_PUT_LOCAL_GET

    return(RCCE_SUCCESS);
  }
  else
#endif
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, &ignore, 
			   1, 0, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
}

int RCCE_recv_tagged(char *privbuf, size_t size, int source, void* tag, int len) {
  int ignore;

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL)
#else
  if(RCCE_recv_queue != NULL)
#endif
    return(RCCE_REJECTED);

  ignore = 0;
#ifdef USE_TAGGED_FLAGS
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, &ignore, 
			   1, 0, 0,           // copy, pipe, mcast
			   tag, len, probe)); // tag, len, probe
#else
  RCCE_recv_general(tag, RCCE_buff_ptr, RCCE_chunk, 
		    &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
		    len, source, &ignore, 
		    1, 0, 0,         // copy, pipe, mcast
		    NULL, 0, probe); // tag, len, probe

  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, &ignore, 
			   1, 0, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
#endif
}


//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_pipe
//--------------------------------------------------------------------------------------
// recv function for simplified API; use library-maintained variables for synchronization
// and set the test variable to 0 (ignore)
//--------------------------------------------------------------------------------------
int RCCE_recv_pipe(char *privbuf, size_t size, int source) {
  int ignore;

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL)
#else
  if(RCCE_recv_queue != NULL)
#endif
    return(RCCE_REJECTED);

  ignore = 0;

#ifdef USE_PIPELINE_FLAGS
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag_pipe[RCCE_IAM], &RCCE_sent_flag_pipe[source], 
			   size, source, &ignore, 
			   1, 1, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
#else
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source],
			   size, source, &ignore,
			   1, 1, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
#endif
}

int RCCE_recv_mcast(char *privbuf, size_t size, int source) {
  int ignore;

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL)
#else
  if(RCCE_recv_queue != NULL)
#endif
    return(RCCE_REJECTED);

  ignore = 0;
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   NULL, NULL, 
			   size, source, &ignore,
			   1, 0, 1,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
}

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_cancel
//--------------------------------------------------------------------------------------
// recv function without copying the message into the recv buffer
//--------------------------------------------------------------------------------------
int RCCE_recv_cancel(size_t size, int source) {
  int ignore;

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL)
#else
  if(RCCE_recv_queue != NULL)
#endif
    return(RCCE_REJECTED);

  ignore = 0;
#ifdef USE_TAGGED_FOR_SHORT
  if(size <= (RCCE_LINE_SIZE - sizeof(int)))
  {
#ifdef USE_PROBE_FLAGS
    RCCE_wait_until(*probe, RCCE_FLAG_SET);
    RCCE_flag_write(probe, RCCE_FLAG_UNSET, RCCE_IAM);
#endif

#ifdef USE_REMOTE_PUT_LOCAL_GET

    RCCE_flag_write(&RCCE_ready_flag[RCCE_IAM], RCCE_FLAG_SET, source);
#ifndef USE_PROBE_FLAGS_SHORTCUT
    RCCE_wait_until(RCCE_sent_flag[source], RCCE_FLAG_SET);
    RCCE_flag_write(&RCCE_sent_flag[source], RCCE_FLAG_UNSET, RCCE_IAM);
#endif

#else // LOCAL PUT / REMOTE GET: (standard)

    RCCE_wait_until(RCCE_sent_flag[source], RCCE_FLAG_SET);
    RCCE_flag_write(&RCCE_sent_flag[source], RCCE_FLAG_UNSET, RCCE_IAM);
    RCCE_flag_write(&RCCE_ready_flag[RCCE_IAM], RCCE_FLAG_SET, source);  

#endif // !USE_REMOTE_PUT_LOCAL_GET

    return(RCCE_SUCCESS);
  }
  else
#endif
  return(RCCE_recv_general(NULL, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, &ignore,
			   0, 0, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
}

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_test
//--------------------------------------------------------------------------------------
// recv_test function for simplified API; use library-maintained variables for 
// synchronization and set the test variable to 1 (do test)
//--------------------------------------------------------------------------------------
int RCCE_recv_test(char *privbuf, size_t size, int source, int *test) {

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL) {
#else
  if(RCCE_recv_queue != NULL) {
#endif
    (*test) = 0;
    return(RCCE_REJECTED);
  }

  
  /* make sure the test flag is set, regardless of input value */
  *test = 1;
  return(RCCE_recv_general(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, test,
			   1, 0, 0,          // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len, probe
}


//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_probe
//--------------------------------------------------------------------------------------
// probe for a message; just like RCCE_recv_test, but without any receiving
//--------------------------------------------------------------------------------------
int RCCE_recv_probe(int source, int *test, t_vcharp *combuf) {

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* flag = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* flag = &RCCE_sent_flag[source];
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL) {
#else
  if(RCCE_recv_queue != NULL) {
#endif
    (*test) = 0;
    (*combuf) = NULL;
    return(RCCE_REJECTED);
  }

  if(test) {
    RCCE_test_flag((*flag), RCCE_FLAG_SET, test);
#ifdef USE_REMOTE_PUT_LOCAL_GET
    if(combuf && (*test)) (*combuf) = RCCE_buff_ptr;
#else
    if(combuf && (*test)) (*combuf) = RCCE_comm_buffer[source]+(RCCE_buff_ptr-RCCE_comm_buffer[RCCE_IAM]);
#endif
  }
  else {
    RCCE_wait_until((*flag), RCCE_FLAG_SET);
#ifdef USE_REMOTE_PUT_LOCAL_GET
    if(combuf) (*combuf) = RCCE_buff_ptr;
#else
    if(combuf) (*combuf) = RCCE_comm_buffer[source]+(RCCE_buff_ptr-RCCE_comm_buffer[RCCE_IAM]);
#endif
  }

#ifdef USE_PROBE_FLAGS
  (*combuf) = NULL;
#endif
  
  return(RCCE_SUCCESS);
}

int RCCE_recv_probe_tagged(int source, int *test, t_vcharp *combuf, void* tag, int len) {

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* flag = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* flag = &RCCE_sent_flag[source];
#endif

#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != NULL) {
#else
  if(RCCE_recv_queue != NULL) {
#endif
    (*test) = 0;
    (*combuf) = NULL;
    return(RCCE_REJECTED);
  }

#ifdef USE_TAGGED_FLAGS
  if(test) {
    RCCE_test_tagged((*flag), RCCE_FLAG_SET, test, tag, len);
#ifdef USE_REMOTE_PUT_LOCAL_GET
    if(combuf && (*test)) (*combuf) = RCCE_buff_ptr;
#else
    if(combuf && (*test)) (*combuf) = RCCE_comm_buffer[source]+(RCCE_buff_ptr-RCCE_comm_buffer[RCCE_IAM]);
#endif
  }
  else {
    RCCE_wait_tagged((*flag), RCCE_FLAG_SET, tag, len);
#ifdef USE_REMOTE_PUT_LOCAL_GET
    if(combuf) (*combuf) = RCCE_buff_ptr;
#else
    if(combuf) (*combuf) = RCCE_comm_buffer[source]+(RCCE_buff_ptr-RCCE_comm_buffer[RCCE_IAM]);
#endif
  }
#else
  if(test) {
    RCCE_test_flag((*flag), RCCE_FLAG_SET, test);
  }
  else {
    RCCE_wait_until((*flag), RCCE_FLAG_SET);
  }

  if(!test || (test && (*test)))
  {
    RCCE_recv(tag, len, source);
    RCCE_wait_until((*flag), RCCE_FLAG_SET);
#ifdef USE_REMOTE_PUT_LOCAL_GET
    if(combuf) (*combuf) = RCCE_buff_ptr;
#else
    if(combuf) (*combuf) = RCCE_comm_buffer[source]+(RCCE_buff_ptr-RCCE_comm_buffer[RCCE_IAM]);
#endif
  } 
#endif

#ifdef USE_PROBE_FLAGS
  (*combuf) = NULL;
#endif

  return(RCCE_SUCCESS);
}

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_irecv
//--------------------------------------------------------------------------------------
// non-blocking recv function; returns an handle of type RCCE_RECV_REQUEST
//--------------------------------------------------------------------------------------
int RCCE_irecv(char *privbuf, size_t size, int source, RCCE_RECV_REQUEST *request) {

#ifdef USE_PROBE_FLAGS
  RCCE_FLAG* probe = &RCCE_probe_flag[source];
#else
  RCCE_FLAG* probe = 0;
#endif

  if(request == NULL){
    RCCE_RECV_REQUEST dummy_request;
    RCCE_irecv(privbuf, size, source, &dummy_request);
    RCCE_irecv_wait(&dummy_request);
    return(RCCE_SUCCESS);
  }

#ifdef USE_TAGGED_FOR_SHORT
  if(size <= (RCCE_LINE_SIZE - sizeof(int)))
    RCCE_init_recv_request(NULL, RCCE_buff_ptr, RCCE_chunk, 
			   &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			   size, source, 0, privbuf, size, probe, request);
  else
#endif
  RCCE_init_recv_request(privbuf, RCCE_buff_ptr, RCCE_chunk, 
			 &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], 
			 size, source, 1, NULL, 0, probe, request);
  
#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] == NULL) {
#else
  if(RCCE_recv_queue == NULL) {
#endif
    
    if(RCCE_push_recv_request(request) == RCCE_SUCCESS) {
      return(RCCE_SUCCESS);
    }
    else {
#ifndef USE_REMOTE_PUT_LOCAL_GET
      RCCE_recv_queue[source] = request;
#else
      RCCE_recv_queue = request;
#endif
      return(RCCE_PENDING);
    }
  }
  else {
#ifndef USE_REMOTE_PUT_LOCAL_GET
    if(RCCE_recv_queue[source]->next == NULL) {
      RCCE_recv_queue[source]->next = request;
    }
#else
    if(RCCE_recv_queue->next == NULL) {
      RCCE_recv_queue->next = request;
    }
#endif
    else {
#ifndef USE_REMOTE_PUT_LOCAL_GET
      RCCE_RECV_REQUEST *run = RCCE_recv_queue[source];
#else
      RCCE_RECV_REQUEST *run = RCCE_recv_queue;
#endif
      while(run->next != NULL) run = run->next;      
      run->next = request;   
    }
    return(RCCE_RESERVED);
  }  
}

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_irecv_test
//--------------------------------------------------------------------------------------
// test function for completion of the requested non-blocking receive operation
//--------------------------------------------------------------------------------------
int RCCE_irecv_test(RCCE_RECV_REQUEST *request, int *test) {

  int source = request->source;

  if(request->finished) {
    (*test) = 1;
    return(RCCE_SUCCESS);
  }
  
#ifndef USE_REMOTE_PUT_LOCAL_GET
  if(RCCE_recv_queue[source] != request) {
#else
  if(RCCE_recv_queue != request) {
#endif
    (*test) = 0;
    return(RCCE_RESERVED);
  }

  RCCE_push_recv_request(request);
     
  if(request->finished) {
#ifndef USE_REMOTE_PUT_LOCAL_GET
    RCCE_recv_queue[source] = request->next;
#else
    RCCE_recv_queue = request->next;
#endif
   
    (*test) = 1;
    return(RCCE_SUCCESS);
  }

  (*test) = 0;
  return(RCCE_PENDING);
}


//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_irecv_push
//--------------------------------------------------------------------------------------
// progress function for pending requests in the irecv queue 
//--------------------------------------------------------------------------------------
int RCCE_irecv_push(int source) {

#ifndef USE_REMOTE_PUT_LOCAL_GET
  RCCE_RECV_REQUEST *request = RCCE_recv_queue[source];
#else
  RCCE_RECV_REQUEST *request = RCCE_recv_queue;
#endif

  if(request == NULL) {
    return(RCCE_SUCCESS);
  }

  if(request->finished) {
    return(RCCE_SUCCESS);
  }
  
  RCCE_push_recv_request(request);   
     
  if(request->finished) {
#ifndef USE_REMOTE_PUT_LOCAL_GET
    RCCE_recv_queue[source] = request->next;
#else
    RCCE_recv_queue = request->next;
#endif
    return(RCCE_SUCCESS);
  }

  return(RCCE_PENDING);
}


//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_irecv_wait
//--------------------------------------------------------------------------------------
// just wait for completion of the requestes non-blocking send operation
//--------------------------------------------------------------------------------------
int RCCE_irecv_wait(RCCE_RECV_REQUEST *request) {

  int ue;

#ifndef USE_REMOTE_PUT_LOCAL_GET
  while(!request->finished) {

    RCCE_irecv_push(request->source);

    if(!request->finished) {

      RCCE_isend_push(-1);

      for(ue=0; ue<RCCE_NP; ue++) {
	RCCE_irecv_push(ue);
      }
    }
  }
#else
  while(!request->finished) {

    RCCE_irecv_push(-1);

    if(!request->finished) {

      for(ue=0; ue<RCCE_NP; ue++) {
	RCCE_isend_push(ue);
      }

      RCCE_irecv_push(-1);
    }
  }
#endif
  
  return(RCCE_SUCCESS);
}

#else
// this is the gory synchronized message passing API      

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv
//--------------------------------------------------------------------------------------
// recv function for simplified API; use user-supplied variables for synchronization
// and set the test variable to 0 (ignore)
//--------------------------------------------------------------------------------------
int RCCE_recv(char *privbuf, t_vcharp combuf, size_t chunk, RCCE_FLAG *ready, 
              RCCE_FLAG *sent, size_t size, int source, RCCE_FLAG *probe) {
  int ignore = 0;
  return(RCCE_recv_general(privbuf, combuf, chunk, ready, sent, size, source,
                           &ignore,
			   1, 0, 0,   // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len
}

//--------------------------------------------------------------------------------------
// FUNCTION: RCCE_recv_test
//--------------------------------------------------------------------------------------
// recv_test function for simplified API; use user-supplied variables for 
// synchronization and set the test variable to 1 (do test)
//--------------------------------------------------------------------------------------
int RCCE_recv_test(char *privbuf, t_vcharp combuf, size_t chunk, RCCE_FLAG *ready, 
              RCCE_FLAG *sent, size_t size, int source, int *test, RCCE_FLAG *probe) {
  /* make sure the test flag is set, regardless of input value */
  *test = 1;
  return(RCCE_recv_general(privbuf, combuf, chunk, ready, sent, size, source,
                           test,
			   1, 0, 0,   // copy, pipe, mcast
			   NULL, 0, probe)); // tag, len
}
#endif