use ORTE_WAIT_FOR_COMPLETION with non-blocking receives
authorAdrian Reber <adrian.reber@hs-esslingen.de>
Mon, 27 Jan 2014 13:31:36 +0000 (14:31 +0100)
committerAdrian Reber <adrian.reber@hs-esslingen.de>
Mon, 27 Jan 2014 15:32:53 +0000 (16:32 +0100)
During the commits to make the C/R code compile again the
blocking receive calls in snapc_full_app.c were
replaced by non-blocking receive calls with a dummy callback
function. This commit adds ORTE_WAIT_FOR_COMPLETION()
after each non-blocking receive to wait for the data.

orte/mca/snapc/full/snapc_full_app.c

index 4edac63..67f4eba 100644 (file)
@@ -93,16 +93,15 @@ static bool currently_all_migrating = false;
 
 static bool currently_checkpointing = false;
 static int  current_unique_id = 0;
+static bool cb_done = false;
 
 static int current_cr_state = OPAL_CRS_NONE;
 
 static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
 static opal_crs_base_ckpt_options_t *current_options = NULL;
 
-static void snapc_full_app_callback_recv(int status,
-                                         orte_process_name_t* sender,
-                                         opal_buffer_t* buffer,
-                                         orte_rml_tag_t tag,
+static void snapc_full_app_callback_recv(int status, orte_process_name_t* sender,
+                                         opal_buffer_t* buffer, orte_rml_tag_t tag,
                                          void* cbdata);
 
 /************************
@@ -289,18 +288,10 @@ int app_coord_finalize()
          * need to wait until the checkpoint is finished before finishing.
          */
         buffer = OBJ_NEW(opal_buffer_t);
-#ifdef ENABLE_FT_FIXED
-        /* This is the old, now broken code */
-        if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-            ORTE_ERROR_LOG(ret);
-            exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
-            goto cleanup;
-        }
-#endif /* ENABLE_FT_FIXED */
+        cb_done = false;
         orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
+        ORTE_WAIT_FOR_COMPLETION(cb_done);
 
-        /* wait for completion */
         count = 1;
         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
@@ -1540,17 +1531,9 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
             /*
              * Wait for a response regarding completion
              */
-#ifdef ENABLE_FT_FIXED
-            /* This is the old, now broken code */
-            if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-                ORTE_ERROR_LOG(ret);
-                exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
-                goto cleanup;
-            }
-#endif /* ENABLE_FT_FIXED */
+            cb_done = false;
             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
-            /* wait for completion */
+            ORTE_WAIT_FOR_COMPLETION(cb_done);
 
             count = 1;
             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
@@ -1613,17 +1596,9 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
             /*
              * Wait for a response regarding completion
              */
-#ifdef ENABLE_FT_FIXED
-            /* This is the old, now broken code */
-            if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
-                ORTE_ERROR_LOG(ret);
-                exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
-                goto cleanup;
-            }
-#endif /* ENABLE_FT_FIXED */
+            cb_done = false;
             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
-            /* wait for completion */
+            ORTE_WAIT_FOR_COMPLETION(cb_done);
 
             count = 1;
             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
@@ -1697,11 +1672,14 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
     return exit_status;
 }
 
-/* dummy implementation of a callback function to get it to compile again */
-static void snapc_full_app_callback_recv(int status,
-                                         orte_process_name_t* sender,
-                                         opal_buffer_t* buffer,
-                                         orte_rml_tag_t tag,
+static void snapc_full_app_callback_recv(int status, orte_process_name_t* sender,
+                                         opal_buffer_t* buffer, orte_rml_tag_t tag,
                                          void* cbdata)
 {
+    OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
+                         "App) SNAPC full receive callback\n"));
+
+    opal_dss.copy_payload(cbdata, buffer);
+    /* release buffer ??? */
+    cb_done = true;
 }