SNAPC: use ORTE_WAIT_FOR_COMPLETION with non-blocking receives
authorAdrian Reber <adrian.reber@hs-esslingen.de>
Mon, 27 Jan 2014 13:31:36 +0000 (14:31 +0100)
committerAdrian Reber <adrian.reber@hs-esslingen.de>
Wed, 29 Jan 2014 16:26:49 +0000 (17:26 +0100)
During the commits to make the C/R code compile again the
blocking receive calls in snapc_full_app.c were
replaced by non-blocking receive calls with a dummy callback
function. This commit adds ORTE_WAIT_FOR_COMPLETION()
after each non-blocking receive to wait for the data.

orte/mca/snapc/full/snapc_full_app.c

index db8ad02..8a31f41 100644 (file)
@@ -99,12 +99,6 @@ static int current_cr_state = OPAL_CRS_NONE;
 static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
 static opal_crs_base_ckpt_options_t *current_options = NULL;
 
-static void snapc_full_app_callback_recv(int status,
-                                         orte_process_name_t* sender,
-                                         opal_buffer_t* buffer,
-                                         orte_rml_tag_t tag,
-                                         void* cbdata);
-
 /************************
  * Function Definitions
  ************************/
@@ -224,6 +218,7 @@ int app_coord_finalize()
     opal_buffer_t *buffer = NULL;
     orte_std_cntr_t count;
     orte_grpcomm_collective_t *coll;
+    orte_rml_recv_cb_t *rb = NULL;
 
     /*
      * All processes must sync here, so the Global coordinator can know that
@@ -281,6 +276,9 @@ int app_coord_finalize()
             goto cleanup;
         }
 
+        /* buffer should not be released here; the callback releases it */
+        buffer = NULL;
+
         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
                              "app) Shutdown Barrier: Waiting on FIN_ACK...!"));
 
@@ -288,28 +286,20 @@ int app_coord_finalize()
          * We could have been checkpointing just as we entered finalize, so we
          * need to wait until the checkpoint is finished before finishing.
          */
-        buffer = OBJ_NEW(opal_buffer_t);
-#ifdef ENABLE_FT_FIXED
-        /* This is the old, now broken code */
-        if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-            ORTE_ERROR_LOG(ret);
-            exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
-            goto cleanup;
-        }
-#endif /* ENABLE_FT_FIXED */
-        orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
+        rb = OBJ_NEW(orte_rml_recv_cb_t);
+        rb->active = true;
+        orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
+        ORTE_WAIT_FOR_COMPLETION(rb->active);
 
-        /* wait for completion */
         count = 1;
-        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
         }
 
         count = 1;
-        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
@@ -337,6 +327,10 @@ int app_coord_finalize()
         OBJ_RELEASE(buffer);
         buffer = NULL;
     }
+    if (NULL != rb) {
+        OBJ_RELEASE(rb);
+        rb = NULL;
+    }
 
     OBJ_RELEASE(coll);
 
@@ -1301,6 +1295,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
     opal_buffer_t *buffer = NULL;
     orte_std_cntr_t count;
+    orte_rml_recv_cb_t *rb = NULL;
     int op_event, op_state;
     char *seq_str = NULL, *tmp_str = NULL;
     int cr_state = OPAL_CRS_CONTINUE;
@@ -1535,39 +1530,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                 goto cleanup;
             }
 
-            buffer = OBJ_NEW(opal_buffer_t);
-
             /*
              * Wait for a response regarding completion
              */
-#ifdef ENABLE_FT_FIXED
-            /* This is the old, now broken code */
-            if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-                ORTE_ERROR_LOG(ret);
-                exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
-                goto cleanup;
-            }
-#endif /* ENABLE_FT_FIXED */
-            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
-            /* wait for completion */
+            rb = OBJ_NEW(orte_rml_recv_cb_t);
+            rb->active = true;
+            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
+            ORTE_WAIT_FOR_COMPLETION(rb->active);
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
@@ -1608,39 +1594,31 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                                  "App) Request_op: Leader waiting for Migrate release (%3d)...",
                                  datum->event));
 
-            buffer = OBJ_NEW(opal_buffer_t);
 
             /*
              * Wait for a response regarding completion
              */
-#ifdef ENABLE_FT_FIXED
-            /* This is the old, now broken code */
-            if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-                ORTE_ERROR_LOG(ret);
-                exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
-                goto cleanup;
-            }
-#endif /* ENABLE_FT_FIXED */
-            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
-            /* wait for completion */
+            rb = OBJ_NEW(orte_rml_recv_cb_t);
+            rb->active = true;
+            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
+            ORTE_WAIT_FOR_COMPLETION(rb->active);
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
@@ -1683,6 +1661,10 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
         OBJ_RELEASE(buffer);
         buffer = NULL;
     }
+    if (NULL != rb) {
+        OBJ_RELEASE(rb);
+        rb = NULL;
+    }
 
     if( NULL != seq_str ) {
         free(seq_str);
@@ -1696,12 +1678,3 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
 
     return exit_status;
 }
-
-/* dummy implementation of a callback function to get it to compile again */
-static void snapc_full_app_callback_recv(int status,
-                                         orte_process_name_t* sender,
-                                         opal_buffer_t* buffer,
-                                         orte_rml_tag_t tag,
-                                         void* cbdata)
-{
-}