SSTORE/CRCP: use ORTE_WAIT_FOR_COMPLETION with non-blocking receives
authorAdrian Reber <adrian.reber@hs-esslingen.de>
Tue, 28 Jan 2014 20:54:43 +0000 (21:54 +0100)
committerAdrian Reber <adrian.reber@hs-esslingen.de>
Wed, 29 Jan 2014 16:26:12 +0000 (17:26 +0100)
During the commits to make the C/R code compile again the
blocking receive calls were replaced by non-blocking
which broke the code. This patch uses ORTE_WAIT_FOR_COMPLETION()
to wait until the non-blocking calls have finished.

ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c
orte/mca/sstore/central/sstore_central_app.c
orte/mca/sstore/stage/sstore_stage_app.c

index f1c21a5..50f01cd 100644 (file)
@@ -5514,6 +5514,7 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
 {
     int ret, exit_status = OMPI_SUCCESS;
     opal_buffer_t *buffer = NULL;
+    orte_rml_recv_cb_t *rb = NULL;
     int32_t recv_response = RECV_MATCH_RESP_ERROR;
     int32_t num_resolv = -1;
     int32_t p_total_found = -1;
@@ -5582,46 +5583,24 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
         exit_status = OMPI_ERROR;
         goto cleanup;
     }
-        
-    if( NULL != buffer) {
-        OBJ_RELEASE(buffer);
-        buffer = NULL;
-    }
-            
-    /*
-     * Check return value from peer to see if we found a match.
-     */
-    if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
-        exit_status = OMPI_ERROR;
-        goto cleanup;
-    }
-        
+
     /*
      * Recv the ACK msg
      */
-#ifdef ENABLE_FT_FIXED
-    /* This is the old, now broken code */
-    if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer,
-                                         OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
-        opal_output(mca_crcp_bkmrk_component.super.output_handle,
-                    "crcp:bkmrk: do_send_msg_detail: %s --> %s Failed to receive ACK buffer from peer. Return %d\n",
-                    OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
-                    OMPI_NAME_PRINT(&(peer_ref->proc_name)),
-                    ret);
-        exit_status = ret;
-        goto cleanup;
-    }
-#endif /* ENABLE_FT_FIXED */
+    rb = OBJ_NEW(orte_rml_recv_cb_t);
+    rb->active = true;
     ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
-                            orte_rml_recv_callback, NULL);
+                            orte_rml_recv_callback, rb);
+    ORTE_WAIT_FOR_COMPLETION(rb->active);
 
-    UNPACK_BUFFER(buffer, recv_response, 1, OPAL_UINT32,
+    UNPACK_BUFFER(&rb->data, recv_response, 1, OPAL_UINT32,
                   "crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
-    UNPACK_BUFFER(buffer, num_resolv,  1, OPAL_UINT32,
+    UNPACK_BUFFER(&rb->data, num_resolv,  1, OPAL_UINT32,
                   "crcp:bkmrk: send_msg_details: Failed to unpack the num_resolv from peer buffer.");
-    UNPACK_BUFFER(buffer, p_total_found, 1, OPAL_UINT32,
+    UNPACK_BUFFER(&rb->data, p_total_found, 1, OPAL_UINT32,
                   "crcp:bkmrk: send_msg_details: Failed to unpack the total_found from peer buffer.");
-    
+
+    OBJ_RELEASE(rb);
     /* Mark message as matched */
     msg_ref->matched += num_resolv;
     *num_matches      = num_resolv;
@@ -5767,55 +5746,38 @@ static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                               size_t *count, size_t *datatype_size,
                               int *p_num_sent)
 {
-    opal_buffer_t * buffer = NULL;
+    orte_rml_recv_cb_t *rb = NULL;
     int exit_status = OMPI_SUCCESS;
     int ret;
 
-    if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
-        goto cleanup;
-    }
-
     /*
      * Recv the msg
      */
-#ifdef ENABLE_FT_FIXED
-    /* This is the old, now broken code */
-    if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
-        opal_output(mca_crcp_bkmrk_component.super.output_handle,
-                    "crcp:bkmrk: do_recv_msg_detail: %s <-- %s Failed to receive buffer from peer. Return %d\n",
-                    OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
-                    OMPI_NAME_PRINT(&(peer_ref->proc_name)),
-                    ret);
-        exit_status = ret;
-        goto cleanup;
-    }
-#endif /* ENABLE_FT_FIXED */
-    ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, NULL);
+    rb = OBJ_NEW(orte_rml_recv_cb_t);
+    rb->active = true;
+    ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, rb);
+    ORTE_WAIT_FOR_COMPLETION(rb->active);
 
     /* Pull out the communicator ID */
-    UNPACK_BUFFER(buffer, (*comm_id), 1, OPAL_UINT32,
+    UNPACK_BUFFER(&rb->data, (*comm_id), 1, OPAL_UINT32,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator ID");
-    UNPACK_BUFFER(buffer, (*rank), 1, OPAL_INT,
+    UNPACK_BUFFER(&rb->data, (*rank), 1, OPAL_INT,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator rank ID");
-    
+
     /* Pull out the message details */
-    UNPACK_BUFFER(buffer, (*tag), 1, OPAL_INT,
+    UNPACK_BUFFER(&rb->data, (*tag), 1, OPAL_INT,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the tag");
-    UNPACK_BUFFER(buffer, (*count), 1, OPAL_SIZE,
+    UNPACK_BUFFER(&rb->data, (*count), 1, OPAL_SIZE,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the count");
-    UNPACK_BUFFER(buffer, (*datatype_size), 1, OPAL_SIZE,
+    UNPACK_BUFFER(&rb->data, (*datatype_size), 1, OPAL_SIZE,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the datatype size");
 
     /* Pull out the counts */
-    UNPACK_BUFFER(buffer, (*p_num_sent), 1, OPAL_INT,
+    UNPACK_BUFFER(&rb->data, (*p_num_sent), 1, OPAL_INT,
                   "crcp:bkmrk: recv_msg_details: Failed to unpack the sent count");
 
- cleanup:
-    if( NULL != buffer) {
-        OBJ_RELEASE(buffer);
-        buffer = NULL;
-    }
-
+cleanup:
+    OBJ_RELEASE(rb);
     return exit_status;
 }
 
index c815266..ac1736a 100644 (file)
@@ -443,6 +443,7 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
     orte_sstore_central_cmd_flag_t command;
     orte_std_cntr_t count;
     orte_sstore_base_handle_t loc_id;
+    orte_rml_recv_cb_t* rb = NULL;
 
     buffer = OBJ_NEW(opal_buffer_t);
 
@@ -470,40 +471,32 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
         goto cleanup;
     }
 
+    /* buffer should not be released here; the callback releases it */
+    buffer = NULL;
+
     /*
      * Receive the response
      */
-    buffer = OBJ_NEW(opal_buffer_t);
     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                          "sstore:central:(app): pull() from %s -> %s",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
-#ifdef ENABLE_FT_FIXED
-    /* This is the old, now broken code */
-    if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
-                                                    &buffer,
-                                                    ORTE_RML_TAG_SSTORE_INTERNAL,
-                                                    0)) ) {
-        ORTE_ERROR_LOG(ret);
-        exit_status = ret;
-        goto cleanup;
-    }
-#endif /* ENABLE_FT_FIXED */
 
+    rb = OBJ_NEW(orte_rml_recv_cb_t);
+    rb->active = true;
     orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
-                            0, orte_rml_recv_callback, NULL);
-
-    /* wait for completion */
+                            0, orte_rml_recv_callback, rb);
+    ORTE_WAIT_FOR_COMPLETION(rb->active);
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -513,28 +506,28 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -553,6 +546,10 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
         OBJ_RELEASE(buffer);
         buffer = NULL;
     }
+    if (NULL != rb) {
+        OBJ_RELEASE(rb);
+        buffer = NULL;
+    }
 
     return exit_status;
 }
index 390a4c9..cecd5b0 100644 (file)
@@ -432,6 +432,7 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
     orte_sstore_stage_cmd_flag_t command;
     orte_std_cntr_t count;
     orte_sstore_base_handle_t loc_id;
+    orte_rml_recv_cb_t *rb = NULL;
 
     buffer = OBJ_NEW(opal_buffer_t);
 
@@ -459,37 +460,32 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
         goto cleanup;
     }
 
+    /* buffer should not be released here; the callback releases it */
+    buffer = NULL;
+
     /*
      * Receive the response
      */
-    buffer = OBJ_NEW(opal_buffer_t);
     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                          "sstore:stage:(app): pull() from %s -> %s",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
-#ifdef ENABLE_FT_FIXED
-    /* This is the old, now broken code */
-    if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
-                                                    &buffer,
-                                                    ORTE_RML_TAG_SSTORE_INTERNAL,
-                                                    0)) ) {
-        ORTE_ERROR_LOG(ret);
-        exit_status = ret;
-        goto cleanup;
-    }
 
-#endif /* ENABLE_FT_FIXED */
+    rb = OBJ_NEW(orte_rml_recv_cb_t);
+    rb->active = true;
     orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
-                            0, orte_rml_recv_callback, NULL);
+                            0, orte_rml_recv_callback, rb);
+    ORTE_WAIT_FOR_COMPLETION(rb->active);
+
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -499,28 +495,28 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     count = 1;
-    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -531,6 +527,10 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
         OBJ_RELEASE(buffer);
         buffer = NULL;
     }
+    if (NULL != rb) {
+        OBJ_RELEASE(rb);
+        buffer = NULL;
+    }
 
     return exit_status;
 }