SNAPC: use dynamic buffers for rml.send and rml.recv
authorAdrian Reber <adrian.reber@hs-esslingen.de>
Sun, 26 Jan 2014 11:10:41 +0000 (12:10 +0100)
committerAdrian Reber <adrian.reber@hs-esslingen.de>
Mon, 27 Jan 2014 14:53:23 +0000 (15:53 +0100)
The snapc component was still using static buffers
for send_buffer_nb(). This patch changes opal_buffer_t buffer;
to opal_buffer_t *buffer;

orte/mca/snapc/full/snapc_full_app.c
orte/mca/snapc/full/snapc_full_global.c
orte/mca/snapc/full/snapc_full_local.c

index 0f0f147..4edac63 100644 (file)
@@ -221,7 +221,7 @@ int app_coord_finalize()
     int ret, exit_status = ORTE_SUCCESS;
     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
     orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
-    opal_buffer_t *buffer;
+    opal_buffer_t *buffer = NULL;
     orte_std_cntr_t count;
     orte_grpcomm_collective_t *coll;
 
@@ -298,17 +298,18 @@ int app_coord_finalize()
             goto cleanup;
         }
 #endif /* ENABLE_FT_FIXED */
-        orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
+        orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
 
+        /* wait for completion */
         count = 1;
-        if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
         }
 
         count = 1;
-        if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
@@ -332,6 +333,11 @@ int app_coord_finalize()
 
  cleanup:
     /* cleanup */
+    if (NULL != buffer) {
+        OBJ_RELEASE(buffer);
+        buffer = NULL;
+    }
+
     OBJ_RELEASE(coll);
 
     /*
@@ -347,7 +353,7 @@ int app_coord_finalize()
         app_comm_pipe_w = NULL;
     }
 
-    return ORTE_SUCCESS;
+    return exit_status;
 }
 
 /******************
@@ -821,24 +827,24 @@ static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
 
 static int snapc_full_app_finished_msg(int cr_state) {
     int ret, exit_status = ORTE_SUCCESS;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer;
     orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
 
-    OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+    buffer = OBJ_NEW(opal_buffer_t);
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &cr_state, 1, OPAL_INT))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
+    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
                                                        ORTE_RML_TAG_SNAPC,
                                                        orte_rml_send_callback, 0))) {
         ORTE_ERROR_LOG(ret);
@@ -846,8 +852,9 @@ static int snapc_full_app_finished_msg(int cr_state) {
         goto cleanup;
     }
 
+    return ORTE_SUCCESS;
  cleanup:
-    OBJ_DESTRUCT(&buffer);
+    OBJ_RELEASE(buffer);
 
     return exit_status;
 }
@@ -1241,39 +1248,39 @@ int app_coord_ft_event(int state) {
 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
 {
     int ret, exit_status = ORTE_SUCCESS;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer;
     orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_UPDATE_CMD;
 
-    OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+    buffer = OBJ_NEW(opal_buffer_t);
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
     /* JJH CLEANUP: Do we really need this, it is equal to sender */
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &proc, 1, ORTE_NAME))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc, 1, ORTE_NAME))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &proc_pid, 1, OPAL_PID))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc_pid, 1, OPAL_PID))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
 #if OPAL_ENABLE_CRDEBUG == 1
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 #endif
 
-    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
+    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
                                                        ORTE_RML_TAG_SNAPC,
                                                        orte_rml_send_callback, 0))) {
         ORTE_ERROR_LOG(ret);
@@ -1281,8 +1288,9 @@ static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc,
         goto cleanup;
     }
 
+    return ORTE_SUCCESS;
  cleanup:
-    OBJ_DESTRUCT(&buffer);
+    OBJ_RELEASE(buffer);
 
     return exit_status;
 }
@@ -1291,7 +1299,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
 {
     int ret, exit_status = ORTE_SUCCESS;
     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer;
     orte_std_cntr_t count;
     int op_event, op_state;
     char *seq_str = NULL, *tmp_str = NULL;
@@ -1376,39 +1384,34 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
         /*
          * Send request to HNP
          */
-        OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+        buffer = OBJ_NEW(opal_buffer_t);
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
             goto cleanup;
         }
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->event), 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
             goto cleanup;
         }
 
         if( ORTE_SNAPC_OP_RESTART == datum->event) {
-            if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->seq_num), 1, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
                 goto cleanup;
             }
-            if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->global_handle), 1, OPAL_STRING))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
                 goto cleanup;
             }
         }
@@ -1438,10 +1441,9 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
             /*
              * Send information
              */
-            if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->mig_num), 1, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
-                OBJ_DESTRUCT(&buffer);
                 goto cleanup;
             }
 
@@ -1455,17 +1457,15 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                                      (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
                                      ));
 
-                if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
+                if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
                     ORTE_ERROR_LOG(ret);
                     exit_status = ret;
-                    OBJ_DESTRUCT(&buffer);
                     goto cleanup;
                 }
                 tmp_str = strdup((datum->mig_host_pref)[i]);
-                if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &tmp_str, 1, OPAL_STRING))) {
+                if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
                     ORTE_ERROR_LOG(ret);
                     exit_status = ret;
-                    OBJ_DESTRUCT(&buffer);
                     goto cleanup;
                 }
                 if( NULL != tmp_str ) {
@@ -1473,31 +1473,27 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                     tmp_str = NULL;
                 }
 
-                if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
+                if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
                     ORTE_ERROR_LOG(ret);
                     exit_status = ret;
-                    OBJ_DESTRUCT(&buffer);
                     goto cleanup;
                 }
-                if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
+                if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
                     ORTE_ERROR_LOG(ret);
                     exit_status = ret;
-                    OBJ_DESTRUCT(&buffer);
                     goto cleanup;
                 }
             }
         }
 
-        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
-                                                           ORTE_RML_TAG_SNAPC_FULL,
+        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
                                                            orte_rml_send_callback, 0))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
-            OBJ_DESTRUCT(&buffer);
             goto cleanup;
         }
-
-        OBJ_DESTRUCT(&buffer);
+        /* buffer should not be released here; the callback releases it */
+        buffer = NULL;
     }
 
     /*
@@ -1539,7 +1535,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                 goto cleanup;
             }
 
-            OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+            buffer = OBJ_NEW(opal_buffer_t);
 
             /*
              * Wait for a response regarding completion
@@ -1553,31 +1549,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                 goto cleanup;
             }
 #endif /* ENABLE_FT_FIXED */
-            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
+            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
+            /* wait for completion */
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_state, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
-            OBJ_DESTRUCT(&buffer);
-
             orte_sstore.get_attr(last_ss_handle,
                                  SSTORE_METADATA_GLOBAL_SNAP_SEQ,
                                  &seq_str);
@@ -1613,7 +1608,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                                  "App) Request_op: Leader waiting for Migrate release (%3d)...",
                                  datum->event));
 
-            OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+            buffer = OBJ_NEW(opal_buffer_t);
 
             /*
              * Wait for a response regarding completion
@@ -1627,31 +1622,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
                 goto cleanup;
             }
 #endif /* ENABLE_FT_FIXED */
-            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
+            orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
+            /* wait for completion */
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
             count = 1;
-            if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_state, &count, OPAL_INT))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
             }
 
-            OBJ_DESTRUCT(&buffer);
-
             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
                                  "App) Request_op: Leader continuing from Migration (%3d)...",
                                  datum->event));
@@ -1685,6 +1679,11 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
 
 
  cleanup:
+    if (NULL != buffer) {
+        OBJ_RELEASE(buffer);
+        buffer = NULL;
+    }
+
     if( NULL != seq_str ) {
         free(seq_str);
         seq_str = NULL;
index 0422aac..4e8f3b1 100644 (file)
@@ -1170,7 +1170,7 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
     orte_jobid_t jobid;
     int op_event, op_state;
     opal_crs_base_ckpt_options_t *options = NULL;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer = NULL;
     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
     int seq_num = -1, i;
     char * global_handle = NULL, *tmp_str = NULL;
@@ -1231,26 +1231,25 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
                              "Global) process_request_op(): Send Finalize ACK to the job"));
 
-        OBJ_CONSTRUCT(&buffer, opal_buffer_t);
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+        buffer = OBJ_NEW(opal_buffer_t);
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
         op_event = ORTE_SNAPC_OP_FIN_ACK;
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
-                                                           ORTE_RML_TAG_SNAPC_FULL,
+        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
                                                            orte_rml_send_callback, NULL))) {
             ORTE_ERROR_LOG(ret);
-            /* FIXME: buffer not cleaned up */
             goto cleanup;
         }
-        OBJ_DESTRUCT(&buffer);
+        /* buffer should not be released here; the callback releases it */
+        buffer = NULL;
     }
     /************************************
      * Start a checkpoint operation
@@ -1283,30 +1282,29 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
         /*
          * Tell the sender that the operation is finished
          */
-        OBJ_CONSTRUCT(&buffer, opal_buffer_t);
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+        buffer = OBJ_NEW(opal_buffer_t);
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
-                                                           ORTE_RML_TAG_SNAPC_FULL,
+        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
                                                            orte_rml_send_callback, NULL))) {
             ORTE_ERROR_LOG(ret);
-            /* FIXME: buffer not cleaned up */
             goto cleanup;
         }
-        OBJ_DESTRUCT(&buffer);
+        /* buffer should not be released here; the callback releases it */
+        buffer = NULL;
     }
     /************************************
      * Start the Restart operation
@@ -1426,31 +1424,28 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
                              "Global) ------ Finished Migration. Release processes (%15s )-----",
                              ORTE_NAME_PRINT(sender) ));
-        OBJ_CONSTRUCT(&buffer, opal_buffer_t);
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+        buffer = OBJ_NEW(opal_buffer_t);
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
         op_state = 0;
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
             ORTE_ERROR_LOG(ret);
             goto cleanup;
         }
 
-        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
-                                                           ORTE_RML_TAG_SNAPC_FULL,
+        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
                                                            orte_rml_send_callback, NULL))) {
             ORTE_ERROR_LOG(ret);
-            /* FIXME: buffer not cleaned up */
             goto cleanup;
         }
-        OBJ_DESTRUCT(&buffer);
 
         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
                              "Global) ------ Finished Migration. Released processes (%15s )-----",
@@ -1504,8 +1499,12 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
                              op_event));
     }
 
+cleanup:
+    if (NULL != buffer) {
+        OBJ_RELEASE(buffer);
+        buffer = NULL;
+    }
 
- cleanup:
     if( NULL != options ) {
         OBJ_RELEASE(options);
         options = NULL;
@@ -2105,7 +2104,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
 {
     int ret, exit_status = ORTE_SUCCESS;
     orte_snapc_full_cmd_flag_t command;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer = NULL;
     char * state_str = NULL;
     orte_proc_t *proc = NULL;
     opal_list_item_t *item = NULL;
@@ -2114,7 +2113,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
     /*
      * Update all Local Coordinators (broadcast operation)
      */
-    OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+    buffer = OBJ_NEW(opal_buffer_t);
 
     if( quick ) {
         command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD;
@@ -2122,19 +2121,19 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
         command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &jobid, 1, ORTE_JOBID))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &jobid, 1, ORTE_JOBID))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &ckpt_state, 1, OPAL_INT))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &ckpt_state, 1, OPAL_INT))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -2144,19 +2143,19 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
         goto process_msg;
     }
 
-    if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, &buffer, handle))) {
+    if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, buffer, handle))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if( ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(&buffer, options)) ) {
+    if(ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(buffer, options))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(currently_migrating), 1, OPAL_BOOL))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(currently_migrating), 1, OPAL_BOOL))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -2165,7 +2164,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
     if( currently_migrating ) {
         num_procs = opal_list_get_size(migrating_procs);
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &num_procs, 1, OPAL_SIZE))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_procs, 1, OPAL_SIZE))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
@@ -2175,7 +2174,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
              item != opal_list_get_end(migrating_procs);
              item  = opal_list_get_next(item)) {
             proc = (orte_proc_t*)item;
-            if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(proc->name), 1, ORTE_NAME))) {
+            if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(proc->name), 1, ORTE_NAME))) {
                 ORTE_ERROR_LOG(ret);
                 exit_status = ret;
                 goto cleanup;
@@ -2191,7 +2190,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
     free(state_str);
     state_str = NULL;
 
-    if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buffer, ORTE_RML_TAG_SNAPC_FULL)) ) {
+    if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, buffer, ORTE_RML_TAG_SNAPC_FULL)) ) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -2207,7 +2206,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
         state_str = NULL;
     }
 
-    OBJ_DESTRUCT(&buffer);
+    OBJ_RELEASE(buffer);
 
     return exit_status;
 }
index 0965571..22466ac 100644 (file)
@@ -626,7 +626,7 @@ static int snapc_full_local_send_restart_proc_info(void)
     int ret, exit_status = ORTE_SUCCESS;
     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
     opal_list_item_t* item = NULL;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer;
     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_RESTART_PROC_INFO;
     size_t num_vpids = 0;
 
@@ -656,21 +656,21 @@ static int snapc_full_local_send_restart_proc_info(void)
         return exit_status;
     }
 
-    OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+    buffer = OBJ_NEW(opal_buffer_t);
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &num_vpids, 1, OPAL_SIZE))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_vpids, 1, OPAL_SIZE))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -681,7 +681,7 @@ static int snapc_full_local_send_restart_proc_info(void)
         item  = opal_list_get_next(item) ) {
         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
+        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
             ORTE_ERROR_LOG(ret);
             exit_status = ret;
             goto cleanup;
@@ -689,14 +689,17 @@ static int snapc_full_local_send_restart_proc_info(void)
 
     }
 
-    if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
+    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
+                                                       orte_rml_send_callback, 0))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
+    return ORTE_SUCCESS;
+
  cleanup:
-    OBJ_DESTRUCT(&buffer);
+    OBJ_RELEASE(buffer);
 
     return exit_status;
 }
@@ -1309,26 +1312,26 @@ static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot
 static int snapc_full_local_update_coord(int state, bool quick)
 {
     int ret, exit_status = ORTE_SUCCESS;
-    opal_buffer_t buffer;
+    opal_buffer_t *buffer;
     orte_snapc_full_cmd_flag_t command;
 
     /*
      * Local Coordinator: Send Global Coordinator state information
      */
-    OBJ_CONSTRUCT(&buffer, opal_buffer_t);
+    buffer = OBJ_NEW(opal_buffer_t);
 
     if( quick ) {
         command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD;
     } else {
         command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD;
     }
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD )) ) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD )) ) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
     }
 
-    if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &state, 1, OPAL_INT))) {
+    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &state, 1, OPAL_INT))) {
         ORTE_ERROR_LOG(ret);
         exit_status = ret;
         goto cleanup;
@@ -1345,7 +1348,7 @@ static int snapc_full_local_update_coord(int state, bool quick)
     }
 
  send_data:
-    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
+    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
                                                        ORTE_RML_TAG_SNAPC_FULL,
                                                        orte_rml_send_callback, 0))) {
         ORTE_ERROR_LOG(ret);
@@ -1353,8 +1356,10 @@ static int snapc_full_local_update_coord(int state, bool quick)
         goto cleanup;
     }
 
+    return ORTE_SUCCESS;
+
  cleanup:
-    OBJ_DESTRUCT(&buffer);
+    OBJ_RELEASE(buffer);
 
     return exit_status;
 }