diff --git a/src/include/watchdog/wd_internal_commands.h b/src/include/watchdog/wd_internal_commands.h index 5093eecc..41444637 100644 --- a/src/include/watchdog/wd_internal_commands.h +++ b/src/include/watchdog/wd_internal_commands.h @@ -30,6 +30,7 @@ #include "watchdog/wd_json_data.h" #include "watchdog/wd_ipc_conn.h" #include "watchdog/wd_commands.h" +#include "parser/pg_list.h" /* * These lock can only be acquired by @@ -50,8 +51,7 @@ extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags); extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags); -extern WdCommandResult wd_execute_cluster_command(char* clusterCommand, - int nArgs, WDExecCommandArg *wdExecCommandArg); +extern WdCommandResult wd_execute_cluster_command(char* clusterCommand,List *argsList); extern WDPGBackendStatus * get_pg_backend_status_from_leader_wd_node(void); diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h index 4a45c4e0..7b53999e 100644 --- a/src/include/watchdog/wd_json_data.h +++ b/src/include/watchdog/wd_json_data.h @@ -23,6 +23,7 @@ */ #include "utils/json.h" #include "pool_config.h" +#include "parser/pg_list.h" #include "watchdog/watchdog.h" #ifndef WD_JSON_DATA_H @@ -71,12 +72,10 @@ extern char *get_data_request_json(char *request_type, unsigned int sharedKey, c extern bool parse_wd_exec_cluster_command_json(char *json_data, int data_len, - char **clusterCommand, - int *nArgs, WDExecCommandArg **wdExecCommandArg); + char **clusterCommand, List **args_list); extern char * -get_wd_exec_cluster_command_json(char *clusterCommand,int nArgs, - WDExecCommandArg *wdExecCommandArg, +get_wd_exec_cluster_command_json(char *clusterCommand,List *args_list, unsigned int sharedKey, char *authKey); #endif diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c index cf123e7e..5f89911e 100644 --- 
a/src/pcp_con/pcp_worker.c +++ b/src/pcp_con/pcp_worker.c @@ -57,6 +57,7 @@ #include "watchdog/wd_internal_commands.h" #include "main/pool_internal_comms.h" + #define MAX_FILE_LINE_LEN 512 extern char *pcp_conf_file; /* global variable defined in main.c holds the @@ -1022,7 +1023,7 @@ process_reload_config(PCP_CONNECTION * frontend, char scope) ereport(LOG, (errmsg("PCP: sending command to watchdog to reload config cluster"))); - if (wd_execute_cluster_command(WD_COMMAND_RELOAD_CONFIG_CLUSTER,0, NULL) != COMMAND_OK) + if (wd_execute_cluster_command(WD_COMMAND_RELOAD_CONFIG_CLUSTER, NULL) != COMMAND_OK) ereport(ERROR, (errmsg("PCP: error while processing reload config request for cluster"), errdetail("failed to propogate reload config command through watchdog"))); @@ -1324,17 +1325,21 @@ process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos) if (tos == 't' && pool_config->use_watchdog) { WDExecCommandArg wdExecCommandArg; + List *args_list = NULL; strncpy(wdExecCommandArg.arg_name, "mode", sizeof(wdExecCommandArg.arg_name) - 1); - snprintf(wdExecCommandArg.arg_value, sizeof(wdExecCommandArg.arg_name) - 1, "%c",mode); + snprintf(wdExecCommandArg.arg_value, sizeof(wdExecCommandArg.arg_value) - 1, "%c",mode); + args_list = lappend(args_list,&wdExecCommandArg); ereport(LOG, (errmsg("PCP: sending command to watchdog to shutdown cluster"))); - if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER,1, &wdExecCommandArg) != COMMAND_OK) + if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER, args_list) != COMMAND_OK) ereport(ERROR, (errmsg("PCP: error while processing shutdown cluster request"), errdetail("failed to propogate shutdown command through watchdog"))); + + list_free(args_list); } pcp_write(frontend, "t", 1); @@ -1342,7 +1347,6 @@ process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos) pcp_write(frontend, &len, sizeof(int)); pcp_write(frontend, code, sizeof(code)); do_pcp_flush(frontend); - terminate_pgpool(mode, true); } 
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index 69ff60b1..477bbae2 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -2081,14 +2081,13 @@ process_IPC_execute_cluster_command(WDCommandData * ipcCommand) { /* get the json for node list */ char *clusterCommand = NULL; - int nArgs; - WDExecCommandArg *wdExecCommandArg = NULL; + List *args_list = NULL; if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL) return IPC_CMD_ERROR; if (!parse_wd_exec_cluster_command_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, - &clusterCommand, &nArgs, &wdExecCommandArg)) + &clusterCommand, &args_list)) { goto ERROR_EXIT; } @@ -2110,6 +2109,7 @@ process_IPC_execute_cluster_command(WDCommandData * ipcCommand) { ereport(LOG, (errmsg("'LOCK ON STANDBY' command can only be processed on coordinator node"))); + goto ERROR_EXIT; } } else @@ -2128,15 +2128,15 @@ process_IPC_execute_cluster_command(WDCommandData * ipcCommand) ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, NULL); - if (wdExecCommandArg) - pfree(wdExecCommandArg); + if (args_list) + list_free_deep(args_list); pfree(clusterCommand); return IPC_CMD_OK; ERROR_EXIT: - if (wdExecCommandArg) - pfree(wdExecCommandArg); + if (args_list) + list_free_deep(args_list); if (clusterCommand) pfree(clusterCommand); return IPC_CMD_ERROR; @@ -4053,8 +4053,7 @@ wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) { /* get the json for node list */ char *clusterCommand = NULL; - int nArgs; - WDExecCommandArg *wdExecCommandArg = NULL; + List *args_list = NULL; if (pkt->type != WD_EXECUTE_COMMAND_REQUEST) return; @@ -4067,7 +4066,7 @@ wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) } if (!parse_wd_exec_cluster_command_json(pkt->data, pkt->len, - &clusterCommand, &nArgs, &wdExecCommandArg)) + &clusterCommand, &args_list)) { ereport(LOG, (errmsg("node \"%s\" sent an invalid JSON data in cluster 
command message", wdNode->nodeName))); @@ -4076,21 +4075,22 @@ wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) ereport(DEBUG1, (errmsg("received \"%s\" command from node \"%s\"",clusterCommand, wdNode->nodeName))); - if (strcasecmp(WD_COMMAND_SHUTDOWN_CLUSTER, clusterCommand) == 0) { - int i; char mode = 's'; - for ( i =0; i < nArgs; i++) + ListCell *lc; + foreach(lc, args_list) { - if (strcmp(wdExecCommandArg[i].arg_name, "mode") == 0) + WDExecCommandArg *wdExecCommandArg = lfirst(lc); + if (strcmp(wdExecCommandArg->arg_name, "mode") == 0) { - mode = wdExecCommandArg[i].arg_value[0]; + mode = wdExecCommandArg->arg_value[0]; } else ereport(LOG, - (errmsg("unsupported argument \"%s\" in shutdown command from remote node \"%s\"", wdExecCommandArg[i].arg_name, wdNode->nodeName))); + (errmsg("unsupported argument \"%s\" in shutdown command from remote node \"%s\"", wdExecCommandArg->arg_name, wdNode->nodeName))); } + ereport(LOG, (errmsg("processing shutdown command from remote node \"%s\"", wdNode->nodeName))); terminate_pgpool(mode, false); @@ -4103,26 +4103,27 @@ wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) } else if (strcasecmp(WD_COMMAND_LOCK_ON_STANDBY, clusterCommand) == 0) { - int i; int lock_type = -1; char *operation = NULL; - if (get_local_node_state() != WD_STANDBY && wdNode->state == WD_COORDINATOR) + if (get_local_node_state() == WD_STANDBY && wdNode->state == WD_COORDINATOR) { - if (nArgs == 2) + if (list_length(args_list) == 2) { - for ( i =0; i < nArgs; i++) + ListCell *lc; + foreach(lc, args_list) { - if (strcmp(wdExecCommandArg[i].arg_name, "StandbyLockType") == 0) + WDExecCommandArg *wdExecCommandArg = lfirst(lc); + if (strcmp(wdExecCommandArg->arg_name, "StandbyLockType") == 0) { - lock_type = atoi(wdExecCommandArg[i].arg_value); + lock_type = atoi(wdExecCommandArg->arg_value); } - else if (strcmp(wdExecCommandArg[i].arg_name, "LockingOperation") == 0) + else if 
(strcmp(wdExecCommandArg->arg_name, "LockingOperation") == 0) { - operation = wdExecCommandArg[i].arg_value; + operation = wdExecCommandArg->arg_value; } else ereport(LOG, - (errmsg("unsupported argument \"%s\" in 'LOCK ON STANDBY' from remote node \"%s\"", wdExecCommandArg[i].arg_name, wdNode->nodeName))); + (errmsg("unsupported argument \"%s\" in 'LOCK ON STANDBY' from remote node \"%s\"", wdExecCommandArg->arg_name, wdNode->nodeName))); } if (lock_type < 0 || operation == NULL) { @@ -4174,8 +4175,8 @@ wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) (errmsg("received \"%s\" command from node \"%s\" is not supported",clusterCommand, wdNode->nodeName))); } - if (wdExecCommandArg) - pfree(wdExecCommandArg); + if (args_list) + list_free_deep(args_list); pfree(clusterCommand); return; } diff --git a/src/watchdog/wd_internal_commands.c b/src/watchdog/wd_internal_commands.c index 6c1fac1d..730d9d9a 100644 --- a/src/watchdog/wd_internal_commands.c +++ b/src/watchdog/wd_internal_commands.c @@ -278,13 +278,12 @@ wd_end_recovery(void) } WdCommandResult -wd_execute_cluster_command(char* clusterCommand, - int nArgs, WDExecCommandArg *wdExecCommandArg) +wd_execute_cluster_command(char* clusterCommand, List *argsList) { char type; unsigned int *shared_key = get_ipc_shared_key(); - char *func = get_wd_exec_cluster_command_json(clusterCommand, nArgs, wdExecCommandArg, + char *func = get_wd_exec_cluster_command_json(clusterCommand, argsList, shared_key ? 
*shared_key : 0, pool_config->wd_authkey); WDIPCCmdResult *result = issue_command_to_watchdog(WD_EXECUTE_CLUSTER_COMMAND, @@ -558,19 +557,25 @@ wd_internal_get_watchdog_local_node_state(void) static WdCommandResult wd_send_locking_command(WD_LOCK_STANDBY_TYPE lock_type, bool acquire) { + WdCommandResult res; + List *args_list = NULL; WDExecCommandArg wdExecCommandArg[2]; strncpy(wdExecCommandArg[0].arg_name, "StandbyLockType", sizeof(wdExecCommandArg[0].arg_name) - 1); - snprintf(wdExecCommandArg[0].arg_value, sizeof(wdExecCommandArg[0].arg_name) - 1, "%d",lock_type); + snprintf(wdExecCommandArg[0].arg_value, sizeof(wdExecCommandArg[0].arg_value) - 1, "%d",lock_type); strncpy(wdExecCommandArg[1].arg_name, "LockingOperation", sizeof(wdExecCommandArg[1].arg_name) - 1); - snprintf(wdExecCommandArg[1].arg_value, sizeof(wdExecCommandArg[1].arg_name) - 1, + snprintf(wdExecCommandArg[1].arg_value, sizeof(wdExecCommandArg[1].arg_value) - 1, "%s",acquire?"acquire":"release"); + args_list = lappend(args_list,&wdExecCommandArg[0]); + args_list = lappend(args_list,&wdExecCommandArg[1]); ereport(DEBUG1, (errmsg("sending standby locking request to watchdog"))); - return wd_execute_cluster_command(WD_COMMAND_LOCK_ON_STANDBY, 2, wdExecCommandArg); + res = wd_execute_cluster_command(WD_COMMAND_LOCK_ON_STANDBY, args_list); + list_free(args_list); + return res; } WdCommandResult diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c index eecfddce..5bb0e417 100644 --- a/src/watchdog/wd_json_data.c +++ b/src/watchdog/wd_json_data.c @@ -804,12 +804,12 @@ get_wd_simple_message_json(char *message) } char * -get_wd_exec_cluster_command_json(char *clusterCommand, int nArgs, - WDExecCommandArg *wdExecCommandArg, +get_wd_exec_cluster_command_json(char *clusterCommand, List *args_list, unsigned int sharedKey, char *authKey) { - int i; - char *json_str; + char *json_str; + int nArgs = args_list? 
list_length(args_list):0; + JsonNode *jNode = jw_create_with_object(true); jw_put_int(jNode, WD_IPC_SHARED_KEY, sharedKey); /* put the shared key */ @@ -824,12 +824,15 @@ get_wd_exec_cluster_command_json(char *clusterCommand, int nArgs, /* Array of arguments */ if(nArgs > 0) { + ListCell *lc; jw_start_array(jNode, "argument_list"); - for (i = 0; i < nArgs; i++) + + foreach(lc, args_list) { + WDExecCommandArg *wdExecCommandArg = lfirst(lc); jw_start_object(jNode, "Arg"); - jw_put_string(jNode, "arg_name", wdExecCommandArg[i].arg_name); - jw_put_string(jNode, "arg_value", wdExecCommandArg[i].arg_value); + jw_put_string(jNode, "arg_name", wdExecCommandArg->arg_name); + jw_put_string(jNode, "arg_value", wdExecCommandArg->arg_value); jw_end_element(jNode); } jw_end_element(jNode); /* argument_list array End */ @@ -844,12 +847,14 @@ get_wd_exec_cluster_command_json(char *clusterCommand, int nArgs, bool parse_wd_exec_cluster_command_json(char *json_data, int data_len, - char **clusterCommand, - int *nArgs, WDExecCommandArg **wdExecCommandArg) + char **clusterCommand, List **args_list) { json_value *root; char *ptr = NULL; int i; + int nArgs = 0; + + *args_list = NULL; root = json_parse(json_data, data_len); @@ -873,44 +878,50 @@ parse_wd_exec_cluster_command_json(char *json_data, int data_len, } *clusterCommand = pstrdup(ptr); - if (json_get_int_value_for_key(root, "nArgs", nArgs)) + if (json_get_int_value_for_key(root, "nArgs", &nArgs)) { /* nArgs not found, Just ignore it */ - *nArgs = 0; + nArgs = 0; /* it may be from the old version */ } - if (*nArgs > 0) + if (nArgs > 0) { json_value *value; - - *wdExecCommandArg = palloc0(sizeof(WDExecCommandArg) * *nArgs); - /* backend_desc array */ value = json_get_value_for_key(root, "argument_list"); if (value == NULL || value->type != json_array) goto ERROR_EXIT; - if (*nArgs!= value->u.array.length) + if (nArgs!= value->u.array.length) { ereport(LOG, (errmsg("watchdog is unable to parse exec cluster command json"), 
errdetail("nArgs is different than argument array length \"%s\"", json_data))); goto ERROR_EXIT; } - for (i = 0; i < *nArgs; i++) + for (i = 0; i < nArgs; i++) { + WDExecCommandArg *command_arg = palloc0(sizeof(WDExecCommandArg)); + /* + * Append to list right away, so that deep freeing the list also + * gets rid of half-cooked arguments in case of an error + */ + *args_list = lappend(*args_list,command_arg); + json_value *arr_value = value->u.array.values[i]; char *ptr; ptr = json_get_string_value_for_key(arr_value, "arg_name"); if (ptr == NULL) goto ERROR_EXIT; - strncpy(wdExecCommandArg[i]->arg_name, ptr, sizeof(wdExecCommandArg[i]->arg_name) - 1); + + strncpy(command_arg->arg_name, ptr, sizeof(command_arg->arg_name) - 1); ptr = json_get_string_value_for_key(arr_value, "arg_value"); if (ptr == NULL) goto ERROR_EXIT; - strncpy(wdExecCommandArg[i]->arg_value, ptr, sizeof(wdExecCommandArg[i]->arg_value) - 1); + + strncpy(command_arg->arg_value, ptr, sizeof(command_arg->arg_value) - 1); } } @@ -920,7 +931,8 @@ parse_wd_exec_cluster_command_json(char *json_data, int data_len, ERROR_EXIT: if (root) json_value_free(root); - if (*wdExecCommandArg ) - pfree(*wdExecCommandArg); + if (*args_list) + list_free_deep(*args_list); + *args_list = NULL; return false; }