//*************************************************************************************** // Synchronized receive routines. //*************************************************************************************** // // Author: Rob F. Van der Wijngaart // Intel Corporation // Date: 008/30/2010 // //*************************************************************************************** // // Copyright 2010 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // [2010-10-25] added support for non-blocking send/recv operations // - iRCCE_isend(), ..._test(), ..._wait(), ..._push() // - iRCCE_irecv(), ..._test(), ..._wait(), ..._push() // by Carsten Clauss, Chair for Operating Systems, // RWTH Aachen University // // [2010-11-12] extracted non-blocking code into separate library // by Carsten Scholtes // // [2010-12-09] added cancel functions for non-blocking send/recv requests // by Carsten Clauss // // [2011-02-21] added support for multiple incoming queues // (one recv queue per remote rank) // #include #include #ifdef CONFIG_ROCKCREEK #include static int iRCCE_push_recv_request(iRCCE_RECV_REQUEST *request) { char padline[RCCE_LINE_SIZE]; // copy buffer, used if message not multiple of line size int test; // flag for calling iRCCE_test_flag() if(request->finished) { return(iRCCE_SUCCESS); } if(request->label == 1) goto label1; if(request->label == 2) goto label2; if(request->label == 3) goto label3; // receive data in units of available chunk size of MPB for (; request->wsize < (request->size / request->chunk) * request->chunk; request->wsize += request->chunk) { request->bufptr = request->privbuf + request->wsize; request->nbytes = request->chunk; label1: iRCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test); if(!test) { request->label = 1; return(iRCCE_PENDING); } request->started = 1; RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM); // copy data from source's MPB space to private memory iRCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, request->source); // tell the source I have moved data out of its comm buffer RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source); } request->remainder = request->size % request->chunk; // if nothing is left over, we are done if (!request->remainder) { request->finished = 1; return(iRCCE_SUCCESS); } // receive remainder of data--whole cache lines request->bufptr = request->privbuf + (request->size / request->chunk) * request->chunk; request->nbytes = request->remainder - request->remainder % RCCE_LINE_SIZE; if (request->nbytes) { label2: iRCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test); if(!test) { request->label = 2; return(iRCCE_PENDING); } request->started = 1; RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM); // copy data from source's MPB space to private memory iRCCE_get((t_vcharp)request->bufptr, request->combuf, request->nbytes, request->source); // tell the source I have moved data out of its comm buffer RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source); } request->remainder = request->size % request->chunk; request->remainder = request->remainder % RCCE_LINE_SIZE; if (!request->remainder) { request->finished = 1; return(iRCCE_SUCCESS); } // remainder is less than cache line. This must be copied into appropriately sized // intermediate space before exact number of bytes get copied to the final destination request->bufptr = request->privbuf + (request->size / request->chunk) * request->chunk + request->nbytes; request->nbytes = RCCE_LINE_SIZE; label3: iRCCE_test_flag(*(request->sent), RCCE_FLAG_SET, &test); if(!test) { request->label = 3; return(iRCCE_PENDING); } request->started = 1; RCCE_flag_write(request->sent, RCCE_FLAG_UNSET, RCCE_IAM); // copy data from source's MPB space to private memory iRCCE_get((t_vcharp)padline, request->combuf, request->nbytes, request->source); memcpy(request->bufptr,padline,request->remainder); // tell the source I have moved data out of its comm buffer RCCE_flag_write(request->ready, RCCE_FLAG_SET, request->source); request->finished = 1; return(iRCCE_SUCCESS); } static void iRCCE_init_recv_request( char *privbuf, // source buffer in local private memory (send buffer) t_vcharp combuf, // intermediate buffer in MPB size_t chunk, // size of MPB available for this message (bytes) RCCE_FLAG *ready, // flag indicating whether receiver is ready RCCE_FLAG *sent, // flag indicating whether message has been sent by source size_t size, // size of message (bytes) int source, // UE that will send the message iRCCE_RECV_REQUEST *request ) { request->privbuf = privbuf; request->combuf = combuf; request->chunk = chunk; request->ready = ready; request->sent = sent; request->size = size; request->source = source; request->wsize = 0; request->remainder = 0; request->nbytes = 0; request->bufptr = NULL; request->label = 0; request->finished = 0; request->started = 0; request->next = NULL; return; } int iRCCE_irecv_search_source() { int i, j; int res =iRCCE_ANY_SOURCE; for( i=0; inext == NULL ) { iRCCE_irecv_any_source_queue->next = request; } else { iRCCE_RECV_REQUEST* run = iRCCE_irecv_any_source_queue; while( run->next != NULL ) run = run->next; run->next = request; } } return iRCCE_RESERVED; } } if (source<0 || source >= RCCE_NP) return(RCCE_error_return(RCCE_debug_comm,RCCE_ERROR_ID)); else { iRCCE_init_recv_request(privbuf, RCCE_buff_ptr, RCCE_chunk, &RCCE_ready_flag[RCCE_IAM], &RCCE_sent_flag[source], size, source, request); if(iRCCE_irecv_queue[source] == NULL) { if(iRCCE_push_recv_request(request) == iRCCE_SUCCESS) { return(iRCCE_SUCCESS); } else { iRCCE_irecv_queue[source] = request; if(request == &blocking_irecv_request) { iRCCE_irecv_wait(request); return(iRCCE_SUCCESS); } return(iRCCE_PENDING); } } else { if(iRCCE_irecv_queue[source]->next == NULL) { iRCCE_irecv_queue[source]->next = request; } else { iRCCE_RECV_REQUEST *run = iRCCE_irecv_queue[source]; while(run->next != NULL) run = run->next; run->next = request; } if(request == &blocking_irecv_request) { iRCCE_irecv_wait(request); return(iRCCE_SUCCESS); } return(iRCCE_RESERVED); } } } //-------------------------------------------------------------------------------------- // FUNCTION: iRCCE_irecv_test //-------------------------------------------------------------------------------------- // test function for completion of the requestes non-blocking recv operation // Just provide NULL instead of the testvar if you don't need it //-------------------------------------------------------------------------------------- int iRCCE_irecv_test(iRCCE_RECV_REQUEST *request, int *test) { int source; if(request == NULL) { if(iRCCE_irecv_push() == iRCCE_SUCCESS) { if (test) (*test) = 1; return(iRCCE_SUCCESS); } else { if (test) (*test) = 0; return(iRCCE_PENDING); } } // does request still have no source? if( request->source == iRCCE_ANY_SOURCE ) { request->source = iRCCE_irecv_search_source(); if( request->source == iRCCE_ANY_SOURCE ) { if (test) (*test) = 0; return iRCCE_RESERVED; } else { // take request out of wait_any_source-list // find request in queue if( request == iRCCE_irecv_any_source_queue ) { iRCCE_irecv_any_source_queue = iRCCE_irecv_any_source_queue->next; } else { iRCCE_RECV_REQUEST* run = iRCCE_irecv_any_source_queue; while( run->next != request ) run = run->next; run->next = request->next; } request->next = NULL; request->sent = &RCCE_sent_flag[request->source]; // set senders flag source = request->source; // queue request in iRCCE_irecv_queue if(iRCCE_irecv_queue[source] == NULL) { if(iRCCE_push_recv_request(request) == iRCCE_SUCCESS) { if (test) (*test) = 1; return(iRCCE_SUCCESS); } else { iRCCE_irecv_queue[source] = request; if(request == &blocking_irecv_request) { iRCCE_irecv_wait(request); if (test) (*test) = 1; return(iRCCE_SUCCESS); } if (test) (*test) = 0; return(iRCCE_PENDING); } } else { if(iRCCE_irecv_queue[source]->next == NULL) { iRCCE_irecv_queue[source]->next = request; } else { iRCCE_RECV_REQUEST *run = iRCCE_irecv_queue[source]; while(run->next != NULL) run = run->next; run->next = request; } if(request == &blocking_irecv_request) { iRCCE_irecv_wait(request); if (test) (*test) = 1; return(iRCCE_SUCCESS); } if (test) (*test) = 1; return(iRCCE_RESERVED); } } } else { source = request->source; if(request->finished) { if (test) (*test) = 1; return(iRCCE_SUCCESS); } if(iRCCE_irecv_queue[source] != request) { if (test) (*test) = 0; return(iRCCE_RESERVED); } iRCCE_push_recv_request(request); if(request->finished) { iRCCE_irecv_queue[source] = request->next; if (test) (*test) = 1; return(iRCCE_SUCCESS); } if (test) (*test) = 0; return(iRCCE_PENDING); } } //-------------------------------------------------------------------------------------- // FUNCTION: iRCCE_irecv_push //-------------------------------------------------------------------------------------- // progress function for pending requests in the irecv queue //-------------------------------------------------------------------------------------- static int iRCCE_irecv_push_source(int source) { iRCCE_RECV_REQUEST *request = iRCCE_irecv_queue[source]; if(request == NULL) { return(iRCCE_SUCCESS); } if(request->finished) { return(iRCCE_SUCCESS); } iRCCE_push_recv_request(request); if(request->finished) { iRCCE_irecv_queue[source] = request->next; return(iRCCE_SUCCESS); } return(iRCCE_PENDING); } int iRCCE_irecv_push(void) { iRCCE_RECV_REQUEST* help_request; // first check sourceless requests if( iRCCE_irecv_any_source_queue != NULL) { while( iRCCE_irecv_any_source_queue != NULL ) { iRCCE_irecv_any_source_queue->source = iRCCE_irecv_search_source(); if( iRCCE_irecv_any_source_queue->source == iRCCE_ANY_SOURCE ) { break; } // source found for first request in iRCCE_irecv_any_source_queue else { // set senders flag iRCCE_irecv_any_source_queue->sent = &RCCE_sent_flag[iRCCE_irecv_any_source_queue->source]; // take request out of irecv_any_source_queue help_request = iRCCE_irecv_any_source_queue; iRCCE_irecv_any_source_queue = iRCCE_irecv_any_source_queue->next; help_request->next = NULL; // put request into irecv_queue if(iRCCE_irecv_queue[help_request->source] == NULL) { iRCCE_irecv_queue[help_request->source] = help_request; } else { iRCCE_RECV_REQUEST *run = iRCCE_irecv_queue[help_request->source]; while(run->next != NULL) run = run->next; run->next = help_request; } } } } int i, j; int retval = iRCCE_SUCCESS; for(i=0; ifinished) { iRCCE_irecv_push(); iRCCE_isend_push(); } } else { do { iRCCE_isend_push(); } while( iRCCE_irecv_push() != iRCCE_SUCCESS ); } return(iRCCE_SUCCESS); } //-------------------------------------------------------------------------------------- // FUNCTION: iRCCE_irecv_cancel //-------------------------------------------------------------------------------------- // try to cancel a pending non-blocking recv request //-------------------------------------------------------------------------------------- int iRCCE_irecv_cancel(iRCCE_RECV_REQUEST *request, int *test) { int source; iRCCE_RECV_REQUEST *run; if( (request == NULL) || (request->finished) ) { if (test) (*test) = 0; return iRCCE_NOT_ENQUEUED; } // does request have any source specified? if( request->source == iRCCE_ANY_SOURCE ) { for( run = iRCCE_irecv_any_source_queue; run->next != NULL; run = run->next ) { if( run->next == request ) { run->next = run->next->next; if (test) (*test) = 1; return iRCCE_SUCCESS; } } if (test) (*test) = 0; return iRCCE_NOT_ENQUEUED; } source = request->source; if(iRCCE_irecv_queue[source] == NULL) { if (test) (*test) = 0; return iRCCE_NOT_ENQUEUED; } if(iRCCE_irecv_queue[source] == request) { // have parts of the message already been received? if(request->started) { if (test) (*test) = 0; return iRCCE_PENDING; } else { // no, thus request can be canceld just in time: iRCCE_irecv_queue[source] = request->next; if (test) (*test) = 1; return iRCCE_SUCCESS; } } for(run = iRCCE_irecv_queue[source]; run->next != NULL; run = run->next) { // request found --> remove it from recv queue: if(run->next == request) { run->next = run->next->next; if (test) (*test) = 1; return iRCCE_SUCCESS; } } if (test) (*test) = 0; return iRCCE_NOT_ENQUEUED; } #endif