[pgpool-hackers: 2505] Re: New Feature with patch: Quorum and Consensus for backend failover
Muhammad Usama
m.usama at gmail.com
Fri Aug 25 03:32:49 JST 2017
Hi Ishii-San
Please find the updated patch. It fixes the regression issue you were
facing and also another bug which I encountered during my testing.
-- Adding Yugo to the thread,
Hi Yugo,
Since you are an expert on the watchdog feature, I thought you might have
something to say, especially regarding the discussion points mentioned in
the initial mail.
Thanks
Best Regards
Muhammad Usama
On Thu, Aug 24, 2017 at 11:25 AM, Muhammad Usama <m.usama at gmail.com> wrote:
>
>
> On Thu, Aug 24, 2017 at 4:34 AM, Tatsuo Ishii <ishii at sraoss.co.jp> wrote:
>
>> After applying the patch, many of regression tests fail. It seems
>> pgpool.conf.sample has bogus comment which causes the pgpool.conf
>> parser to complain parse error.
>>
>> 2017-08-24 08:22:36: pid 6017: FATAL: syntex error in configuration file
>> "/home/t-ishii/work/pgpool-II/current/pgpool2/src/test/regre
>> ssion/tests/004.watchdog/standby/etc/pgpool.conf"
>> 2017-08-24 08:22:36: pid 6017: DETAIL: parse error at line 568 '*' token
>> = 8
>>
>
> Really sorry. Somehow I overlooked the sample config file changes I made
> at the last minute.
> Will send you the updated version.
>
> Thanks
> Best Regards
> Muhammad Usama
>
>>
>> Best regards,
>> --
>> Tatsuo Ishii
>> SRA OSS, Inc. Japan
>> English: http://www.sraoss.co.jp/index_en.php
>> Japanese:http://www.sraoss.co.jp
>>
>> > Usama,
>> >
>> > Thanks for the patch. I am going to review it.
>> >
>> > In the mean time when I apply your patch, I got some trailing
>> > whitespace errors. Can you please fix them?
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:470: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:485: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:564: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:1428: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:1450: trailing whitespace.
>> >
>> > warning: squelched 3 whitespace errors
>> > warning: 8 lines add whitespace errors.
>> >
>> > Best regards,
>> > --
>> > Tatsuo Ishii
>> > SRA OSS, Inc. Japan
>> > English: http://www.sraoss.co.jp/index_en.php
>> > Japanese:http://www.sraoss.co.jp
>> >
>> >> Hi
>> >>
>> >> I was working on a new feature to make the backend node failover quorum
>> >> aware, and halfway through the implementation I also added a majority
>> >> consensus feature for the same.
>> >>
>> >> So please find the first version of the patch for review. It makes the
>> >> backend node failover consider the watchdog cluster quorum status and
>> >> seek majority consensus before performing a failover.
>> >>
>> >> *Changes in the Failover mechanism with watchdog.*
>> >> For this new feature I have modified Pgpool-II's existing failover
>> >> mechanism with watchdog.
>> >> Previously, as you know, when Pgpool-II needed to perform a node
>> >> operation (failover, failback, promote-node) with the watchdog, the
>> >> watchdog propagated the failover request to all the Pgpool-II nodes in
>> >> the watchdog cluster. As soon as a node received the request, it
>> >> initiated a local failover, and that failover was synchronised across
>> >> all nodes using the distributed locks.
>> >>
>> >> *Now Only the Master node performs the failover.*
>> >> The attached patch changes this synchronised failover mechanism: now
>> >> only the Pgpool-II on the master watchdog node performs the failover,
>> >> and all other standby nodes sync the backend statuses after the master
>> >> Pgpool-II has finished the failover.
>> >>
>> >> *Overview of new failover mechanism.*
>> >> -- If the failover request is received by a standby watchdog node (from
>> >> the local Pgpool-II), that request is forwarded to the master watchdog
>> >> and the Pgpool-II main process gets the FAILOVER_RES_WILL_BE_DONE
>> >> return code. Upon receiving FAILOVER_RES_WILL_BE_DONE from the watchdog
>> >> for the failover request, the requesting Pgpool-II moves on without
>> >> doing anything further for that particular failover command.
>> >>
>> >> -- When the failover request from a standby node is received by the
>> >> master watchdog, it validates the request, applies the consensus rules,
>> >> and then triggers the failover on its local Pgpool-II.
>> >>
>> >> -- When the failover request is received by the master watchdog node
>> >> from its local Pgpool-II (on the IPC channel), the watchdog process
>> >> informs the requesting Pgpool-II process to proceed with the failover
>> >> (provided all failover rules are satisfied).
>> >>
>> >> -- After the failover is finished on the master Pgpool-II, the failover
>> >> function calls *wd_failover_end*(), which sends the backend-sync-required
>> >> message to all standby watchdogs.
>> >>
>> >> -- Upon receiving the sync-required message from the master watchdog
>> >> node, all standby Pgpool-II nodes sync the new status of each backend
>> >> node from the master watchdog.
>> >>
>> >> *No More Failover locks*
>> >> Since with this new failover mechanism we no longer require any
>> >> synchronisation or guards against the execution of failover_commands by
>> >> multiple Pgpool-II nodes, the patch removes all the distributed locks
>> >> from the failover function. This makes the failover simpler and faster.
>> >>
>> >> *New kind of Failover operation NODE_QUARANTINE_REQUEST*
>> >> The patch adds a new kind of backend node operation, NODE_QUARANTINE,
>> >> which is effectively the same as NODE_DOWN, but with node quarantine the
>> >> failover_command is not triggered.
>> >> A NODE_DOWN_REQUEST is automatically converted to a
>> >> NODE_QUARANTINE_REQUEST when failover is requested on a backend node but
>> >> the watchdog cluster does not hold the quorum.
>> >> This means that in the absence of quorum the failed backend nodes are
>> >> quarantined, and when the quorum becomes available again Pgpool-II
>> >> performs the failback operation on all quarantined nodes.
>> >> Again, when the failback is performed on a quarantined backend node, the
>> >> failover function does not trigger the failback_command.
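>> >>
>> >> When the quorum comes back, the quarantine flags are cleared and the
>> >> coordinator issues failback requests that skip the failback_command
>> >> (condensed from update_backend_quarantine_status() in the attached
>> >> patch):
>> >>
>> >>   for (i = 0; i < NUM_BACKENDS; i++)
>> >>   {
>> >>       if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN)
>> >>       {
>> >>           BACKEND_INFO(i).quarantine = false;
>> >>           /* REQ_DETAIL_UPDATE keeps the failback_command from running */
>> >>           if (wd_state == WD_COORDINATOR)
>> >>               send_failback_request(i, false, REQ_DETAIL_UPDATE | REQ_DETAIL_WATCHDOG);
>> >>       }
>> >>   }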
>> >>
>> >> *Controlling the Failover behaviour.*
>> >> The patch adds three new configuration parameters to configure the
>> >> failover behaviour from the user side.
>> >>
>> >> *failover_when_quorum_exists*
>> >> When enabled, the failover command will only be executed when the
>> >> watchdog cluster holds the quorum. When the quorum is absent and
>> >> failover_when_quorum_exists is enabled, the failed backend nodes get
>> >> quarantined until the quorum becomes available again.
>> >> Disabling it restores the old behaviour of failover commands.
>> >>
>> >>
>> >> *failover_require_consensus*
>> >> This new configuration parameter can be used to make sure we get a
>> >> majority vote before performing the failover on a node. When
>> >> *failover_require_consensus* is enabled, the failover is only performed
>> >> after receiving the failover request from a majority of Pgpool-II nodes.
>> >> For example, in a three-node cluster the failover will not be performed
>> >> until at least two nodes ask for the failover of the particular backend
>> >> node.
>> >>
>> >> It is also worthwhile to mention here that *failover_require_consensus*
>> >> only works when failover_when_quorum_exists is enabled.
>> >>
>> >>
>> >> *enable_multiple_failover_requests_from_node*
>> >> This parameter works in connection with the *failover_require_consensus*
>> >> config. When enabled, a single Pgpool-II node can vote for failover
>> >> multiple times.
>> >> For example, in a three-node cluster, if one Pgpool-II node sends the
>> >> failover request for a particular backend node twice, that is counted as
>> >> two votes in favour of failover, and the failover will be performed even
>> >> if we do not get a vote from the other two nodes.
>> >>
>> >> When *enable_multiple_failover_requests_from_node* is disabled, only the
>> >> first vote from each Pgpool-II is accepted and all subsequent votes are
>> >> marked as duplicates and rejected.
>> >> So in that case we require majority votes from distinct nodes to execute
>> >> the failover.
>> >> Again, *enable_multiple_failover_requests_from_node* only becomes
>> >> effective when both *failover_when_quorum_exists* and
>> >> *failover_require_consensus* are enabled.
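>> >>
>> >> To make the voting rule concrete, here is a rough sketch of the counting
>> >> logic (illustrative only; the helper name below is made up, the real
>> >> logic lives in compute_failover_consensus() in watchdog.c):
>> >>
>> >>   /* illustrative sketch, not the actual patch code */
>> >>   static bool failover_has_consensus(WDFailoverObject *failoverObj, int wd_node_count)
>> >>   {
>> >>       int needed_votes = wd_node_count / 2 + 1;   /* majority of watchdog nodes */
>> >>
>> >>       if (!pool_config->failover_require_consensus)
>> >>           return true;                            /* a single request is enough */
>> >>
>> >>       if (pool_config->enable_multiple_failover_requests_from_node)
>> >>           return failoverObj->request_count >= needed_votes;   /* each request counts as a vote */
>> >>
>> >>       /* otherwise only votes from distinct nodes are counted */
>> >>       return list_length(failoverObj->requestingNodes) >= needed_votes;
>> >>   }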
>> >>
>> >>
>> >> *Controlling the failover: The Coding perspective.*
>> >> Although the failover functions are made quorum and consensus aware,
>> >> there is still a way to bypass the quorum conditions and the requirement
>> >> of consensus.
>> >>
>> >> For this, the patch uses the existing request_details flags in
>> >> POOL_REQUEST_NODE to control the behaviour of the failover.
>> >>
>> >> Here are the newly added flag values.
>> >>
>> >> *REQ_DETAIL_WATCHDOG*:
>> >> Setting this flag while issuing the failover command will keep the
>> >> failover request from being sent to the watchdog. This flag may not be
>> >> useful anywhere other than where it is already used.
>> >> Mostly it is there to keep a failover command that already originated
>> >> from the watchdog from being sent back to the watchdog; otherwise we
>> >> could end up in an infinite loop.
>> >>
>> >> *REQ_DETAIL_CONFIRMED*:
>> >> Setting this flag will bypass the *failover_require_consensus*
>> >> configuration and immediately perform the failover if the quorum is
>> >> present. This flag can be used for failover requests originating from
>> >> PCP commands.
>> >>
>> >> *REQ_DETAIL_UPDATE*:
>> >> This flag is used for the command where we are failing back the
>> >> quarantined nodes. Setting this flag will not trigger the
>> >> failback_command.
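>> >>
>> >> For example, this is how the PCP detach-node path in the attached patch
>> >> marks its request (excerpt from src/pcp_con/pcp_worker.c):
>> >>
>> >>   /* request from PCP: quorum is still checked, but no consensus is required */
>> >>   degenerate_backend_set_ex(&node_id, 1,
>> >>                             REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED,
>> >>                             true, false);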
>> >>
>> >> *Some conditional flags used:*
>> >> I was not sure about the configuration of each type of failover
>> >> operation. We have three main failover operations: NODE_UP_REQUEST,
>> >> NODE_DOWN_REQUEST, and PROMOTE_NODE_REQUEST.
>> >> So I was wondering whether we need to give users a configuration option
>> >> to enable/disable quorum checking and consensus for each individual
>> >> failover operation type.
>> >> For example: is it a practical configuration where a user would want to
>> >> ensure quorum while performing a NODE_DOWN operation but would not want
>> >> it for NODE_UP?
>> >> So in this patch I use three compile-time defines to enable/disable the
>> >> check for each individual failover operation, until we can decide on the
>> >> best solution.
>> >>
>> >> NODE_UP_REQUIRE_CONSENSUS: defining it enables the quorum checking
>> >> feature for NODE_UP_REQUESTs
>> >>
>> >> NODE_DOWN_REQUIRE_CONSENSUS: defining it enables the quorum checking
>> >> feature for NODE_DOWN_REQUESTs
>> >>
>> >> NODE_PROMOTE_REQUIRE_CONSENSUS: defining it enables the quorum checking
>> >> feature for PROMOTE_NODE_REQUESTs
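>> >>
>> >> A minimal sketch of how such a define can gate the check inside the
>> >> watchdog (illustrative only, not the exact patch code):
>> >>
>> >>   #ifdef NODE_DOWN_REQUIRE_CONSENSUS
>> >>       /* apply the quorum and consensus rules before acting */
>> >>       res = compute_failover_consensus(NODE_DOWN_REQUEST, node_id_list,
>> >>                                        node_count, flags, wdNode);
>> >>   #else
>> >>       res = FAILOVER_RES_PROCEED;   /* old behaviour: act on the request immediately */
>> >>   #endif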
>> >>
>> >> *Some Points for Discussion:*
>> >>
>> >> *Do we really need to check the Req_info->switching flag before
>> >> enqueuing a failover request?*
>> >> While working on the patch I was wondering why we disallow enqueuing the
>> >> failover command when a failover is already in progress. For example, in
>> >> the *pcp_process_command*() function, if we see the *Req_info->switching*
>> >> flag set we bail out with an error instead of enqueuing the command. Is
>> >> that really necessary?
>> >>
>> >> *Do we need more granular control over each failover operation?*
>> >> As described in the section "Some conditional flags used", I would like
>> >> opinions on whether we need configuration parameters in pgpool.conf to
>> >> enable/disable quorum and consensus checking for individual failover
>> >> types.
>> >>
>> >> *Which failover requests should be marked as Confirmed:*
>> >> As described in the REQ_DETAIL_CONFIRMED section above, we can mark a
>> >> failover request as not needing consensus; currently the requests from
>> >> PCP commands are fired with this flag. But I was wondering whether there
>> >> may be more places where we need to use the flag.
>> >> For example, I currently use the same confirmed flag when failover is
>> >> triggered because of *replication_stop_on_mismatch*.
>> >>
>> >> I think we should consider this flag for each place where a failover is
>> >> triggered, e.g. when the failover is triggered
>> >> because of health_check failure,
>> >> because of replication mismatch,
>> >> because of backend_error,
>> >> etc.
>> >>
>> >> *Node Quarantine behaviour.*
>> >> What do you think about the node quarantine behaviour used by this
>> >> patch? Can you think of any problems it could cause?
>> >>
>> >> *What should be the default value for each newly added config
>> >> parameter?*
>> >>
>> >>
>> >>
>> >> *TODOs*
>> >>
>> >> -- Updating the documentation is still a todo. Will do that once every
>> >> aspect of the feature is finalised.
>> >> -- Some code warnings and cleanups are still not done.
>> >> -- I am still a little short on testing.
>> >> -- Regression test cases for the feature are still needed.
>> >>
>> >>
>> >> Thoughts and suggestions are most welcome.
>> >>
>> >> Thanks
>> >> Best regards
>> >> Muhammad Usama
>> > _______________________________________________
>> > pgpool-hackers mailing list
>> > pgpool-hackers at pgpool.net
>> > http://www.pgpool.net/mailman/listinfo/pgpool-hackers
>>
>
>
-------------- next part --------------
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index f44ef412..29617c41 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -231,7 +231,33 @@ static struct config_bool ConfigureNamesBool[] =
NULL, /* check func */
NULL /* show hook */
},
-
+ {
+ {"failover_when_quorum_exists", CFGCXT_INIT, FAILOVER_CONFIG,
+ "Do failover only when cluster has the quorum.",
+ CONFIG_VAR_TYPE_BOOL,false, 0
+ },
+ &g_pool_config.failover_when_quorum_exists,
+ false,
+ NULL, NULL,NULL
+ },
+ {
+ {"failover_require_consensus", CFGCXT_INIT, FAILOVER_CONFIG,
+ "Only do failover when majority aggrees.",
+ CONFIG_VAR_TYPE_BOOL,false, 0
+ },
+ &g_pool_config.failover_require_consensus,
+ false,
+ NULL, NULL,NULL
+ },
+ {
+ {"enable_multiple_failover_requests_from_node", CFGCXT_INIT, FAILOVER_CONFIG,
+ "A Pgpool-II node can send multiple failover requests to build consensus.",
+ CONFIG_VAR_TYPE_BOOL,false, 0
+ },
+ &g_pool_config.enable_multiple_failover_requests_from_node,
+ false,
+ NULL, NULL,NULL
+ },
{
{"log_connections", CFGCXT_RELOAD, LOGING_CONFIG,
"Logs each successful connection.",
diff --git a/src/include/pcp/libpcp_ext.h b/src/include/pcp/libpcp_ext.h
index 705ebf15..654dd7e0 100644
--- a/src/include/pcp/libpcp_ext.h
+++ b/src/include/pcp/libpcp_ext.h
@@ -74,6 +74,7 @@ typedef struct {
double unnormalized_weight; /* descripted parameter */
char backend_data_directory[MAX_PATH_LENGTH];
unsigned short flag; /* various flags */
+ bool quarantine; /* true if node is CON_DOWN because of quarantine */
uint64 standby_delay; /* The replication delay against the primary */
} BackendInfo;
diff --git a/src/include/pool.h b/src/include/pool.h
index b2f10ca6..d179f7d0 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -404,15 +404,18 @@ typedef enum {
NODE_DOWN_REQUEST,
NODE_RECOVERY_REQUEST,
CLOSE_IDLE_REQUEST,
- PROMOTE_NODE_REQUEST
+ PROMOTE_NODE_REQUEST,
+ NODE_QUARANTINE_REQUEST
} POOL_REQUEST_KIND;
#define REQ_DETAIL_SWITCHOVER 0x00000001 /* failover due to switch over */
+#define REQ_DETAIL_WATCHDOG 0x00000002 /* failover req from watchdog */
+#define REQ_DETAIL_CONFIRMED 0x00000004 /* failover req that does not require majority vote */
+#define REQ_DETAIL_UPDATE 0x00000008 /* failover req is just an update node status request */
typedef struct {
POOL_REQUEST_KIND kind; /* request kind */
unsigned char request_details; /* option flags kind */
- unsigned int wd_failover_id; /* watchdog ID for this failover operation */
int node_id[MAX_NUM_BACKENDS]; /* request node id */
int count; /* request node ids count */
}POOL_REQUEST_NODE;
@@ -520,8 +523,10 @@ extern char remote_port[]; /* client port */
/*
* public functions
*/
+extern void register_watchdog_quorum_change_interupt(void);
extern void register_watchdog_state_change_interupt(void);
-extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
+extern void register_backend_state_sync_req_interupt(void);
+extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags);
extern char *get_config_file_name(void);
extern char *get_hba_file_name(void);
extern void do_child(int *fds);
@@ -551,11 +556,11 @@ extern POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
extern void NoticeResponse(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend);
-extern void notice_backend_error(int node_id, bool switch_over);
-extern bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
-extern bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only, bool switch_over, unsigned int wd_failover_id);
-extern bool promote_backend(int node_id, unsigned int wd_failover_id);
-extern bool send_failback_request(int node_id, bool throw_error, unsigned int wd_failover_id);
+extern void notice_backend_error(int node_id, unsigned char flags);
+extern bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only);
+extern bool promote_backend(int node_id, unsigned char flags);
+extern bool send_failback_request(int node_id, bool throw_error, unsigned char flags);
extern void pool_set_timeout(int timeoutval);
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index aeb63082..dbbe2043 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -300,6 +300,11 @@ typedef struct {
* add for watchdog
*/
bool use_watchdog; /* Enables watchdog */
+ bool failover_when_quorum_exists; /* Failover only when cluster has the quorum */
+ bool failover_require_consensus; /* Only do failover when majority agrees */
+ bool enable_multiple_failover_requests_from_node; /* A Pgpool-II node can send multiple
+ * failover requests to build consensus
+ */
WdLifeCheckMethod wd_lifecheck_method; /* method of lifecheck. 'heartbeat' or 'query' */
bool clear_memqcache_on_escalation; /* Clear query cache on shmem when escalating ?*/
char *wd_escalation_command; /* Executes this command at escalation on new active pgpool.*/
diff --git a/src/include/watchdog/wd_ipc_commands.h b/src/include/watchdog/wd_ipc_commands.h
index 723f9e96..dbcaa4a9 100644
--- a/src/include/watchdog/wd_ipc_commands.h
+++ b/src/include/watchdog/wd_ipc_commands.h
@@ -70,13 +70,14 @@ extern bool get_watchdog_node_escalation_state(void);
extern WdCommandResult wd_start_recovery(void);
extern WdCommandResult wd_end_recovery(void);
-extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned int *wd_failover_id);
+extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char flags);
+extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags);
extern WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void);
extern WDGenericData *get_wd_runtime_variable_value(char *varName);
extern WD_STATES get_watchdog_local_node_state(void);
+extern int get_watchdog_quorum_state(void);
extern char* wd_get_watchdog_nodes(int nodeID);
@@ -84,11 +85,8 @@ extern WDIPCCmdResult* issue_command_to_watchdog(char type, int timeout_sec, cha
/* functions for failover commands interlocking */
-extern WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id);
+extern WDFailoverCMDResults wd_failover_end(void);
+extern WDFailoverCMDResults wd_failover_start(void);
diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h
index 0dc648b5..846344f2 100644
--- a/src/include/watchdog/wd_ipc_defines.h
+++ b/src/include/watchdog/wd_ipc_defines.h
@@ -53,11 +53,13 @@ typedef enum WDFailoverCMDResults
* standby node is advanced in the procedure
*/
FAILOVER_RES_PROCEED,
+ FAILOVER_RES_NO_QUORUM,
FAILOVER_RES_WILL_BE_DONE,
FAILOVER_RES_NOT_ALLOWED,
FAILOVER_RES_INVALID_FUNCTION,
FAILOVER_RES_ALREADY_ISSUED,
FAILOVER_RES_MASTER_REJECTED,
+ FAILOVER_RES_BUILDING_CONSENSUS,
FAILOVER_RES_TIMEOUT
}WDFailoverCMDResults;
@@ -84,6 +86,7 @@ typedef enum WDValueDataType
#define WD_FAILOVER_LOCKING_REQUEST 's'
#define WD_GET_MASTER_DATA_REQUEST 'd'
#define WD_GET_RUNTIME_VARIABLE_VALUE 'v'
+#define WD_FAILOVER_INDICATION 'i'
#define WD_FUNCTION_START_RECOVERY "START_RECOVERY"
#define WD_FUNCTION_END_RECOVERY "END_RECOVERY"
diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h
index 1d6a04aa..648aad35 100644
--- a/src/include/watchdog/wd_json_data.h
+++ b/src/include/watchdog/wd_json_data.h
@@ -70,8 +70,8 @@ extern bool parse_beacon_message_json(char* json_data, int data_len, int* state,
bool* escalated);
extern char* get_beacon_message_json(WatchdogNode* wdNode);
-extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey);
-extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count);
+extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey);
+extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags);
extern char* get_wd_simple_message_json(char* message);
extern WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len);
diff --git a/src/main/health_check.c b/src/main/health_check.c
index 14468f24..b7556593 100644
--- a/src/main/health_check.c
+++ b/src/main/health_check.c
@@ -186,7 +186,7 @@ void do_health_check_child(int *node_id)
/* trigger failover */
partial = health_check_timer_expired?false:true;
- degenerate_backend_set(node_id, 1, partial, 0);
+ degenerate_backend_set(node_id, 1, partial?REQ_DETAIL_SWITCHOVER:0);
}
}
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 8b45c9e2..d57f4a8f 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -69,6 +69,8 @@ typedef enum
{
SIG_FAILOVER_INTERRUPT, /* signal main to start failover */
SIG_WATCHDOG_STATE_CHANGED, /* notify main about local watchdog node state changed */
+ SIG_BACKEND_SYNC_REQUIRED, /* notify main about local backend state sync required */
+ SIG_WATCHDOG_QUORUM_CHANGED,/* notify main about cluster quorum change of watchdog cluster */
MAX_INTERUPTS /* Must be last! */
} User1SignalReason;
@@ -142,6 +144,7 @@ static void terminate_all_childrens();
static void system_will_go_down(int code, Datum arg);
static char* process_name_from_pid(pid_t pid);
static void sync_backend_from_watchdog(void);
+static void update_backend_quarantine_status(void);
static struct sockaddr_un un_addr; /* unix domain socket path */
static struct sockaddr_un pcp_un_addr; /* unix domain socket path for PCP */
@@ -459,12 +462,11 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
* This function enqueues the failover/failback requests, and fires the failover() if the function
* is not already executing
*/
-bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over , unsigned int wd_failover_id)
+bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags)
{
bool failover_in_progress;
pool_sigset_t oldmask;
int index;
- unsigned char request_details = 0;
/*
* if the queue is already full
@@ -485,12 +487,8 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
Req_info->request_queue_tail++;
index = Req_info->request_queue_tail % MAX_REQUEST_QUEUE_SIZE;
Req_info->request[index].kind = kind;
- Req_info->request[index].wd_failover_id = wd_failover_id;
- /* Set switch over flag if requested */
- if (switch_over)
- request_details |= REQ_DETAIL_SWITCHOVER;
- Req_info->request[index].request_details = request_details;
+ Req_info->request[index].request_details = flags;
if(count > 0)
memcpy(Req_info->request[index].node_id, node_id_set, (sizeof(int) * count));
@@ -501,16 +499,29 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
POOL_SETMASK(&oldmask);
if(failover_in_progress == false)
{
- signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
+ if(processType == PT_MAIN)
+ failover();
+ else
+ signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
}
return true;
}
+void register_watchdog_quorum_change_interupt(void)
+{
+ signal_user1_to_parent_with_reason(SIG_WATCHDOG_QUORUM_CHANGED);
+}
+
void register_watchdog_state_change_interupt(void)
{
signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
}
+void register_backend_state_sync_req_interupt(void)
+{
+ signal_user1_to_parent_with_reason(SIG_BACKEND_SYNC_REQUIRED);
+}
+
static void signal_user1_to_parent_with_reason(User1SignalReason reason)
{
user1SignalSlot->signalFlags[reason] = true;
@@ -956,7 +967,7 @@ static void terminate_all_childrens()
* Reuest failover. If "switch_over" is false, request all existing sessions
* restarting.
*/
-void notice_backend_error(int node_id, bool switch_over)
+void notice_backend_error(int node_id, unsigned char flags)
{
int n = node_id;
@@ -967,7 +978,7 @@ void notice_backend_error(int node_id, bool switch_over)
}
else
{
- degenerate_backend_set(&n, 1, switch_over, 0);
+ degenerate_backend_set(&n, 1, flags);
}
}
@@ -990,8 +1001,7 @@ void notice_backend_error(int node_id, bool switch_over)
*
* wd_failover_id: The watchdog internal ID for this failover
*/
-bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only,
- bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only)
{
int i;
int node_id[MAX_NUM_BACKENDS];
@@ -1046,12 +1056,12 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
if(test_only)
return true;
- if (pool_config->use_watchdog && wd_failover_id == 0)
+ if (!(flags & REQ_DETAIL_WATCHDOG))
{
int x;
for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
{
- res = wd_degenerate_backend_set(node_id_set, count, &wd_failover_id);
+ res = wd_degenerate_backend_set(node_id_set, count, flags);
if (res != FAILOVER_RES_TRANSITION)
break;
sleep(1);
@@ -1069,10 +1079,26 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
}
if (res == FAILOVER_RES_PROCEED)
{
- register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, switch_over, wd_failover_id);
+ register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, flags);
+ }
+ else if (res == FAILOVER_RES_NO_QUORUM)
+ {
+ ereport(LOG,
+ (errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog"
+ , node_count, getpid()),
+ errdetail("watchdog does not holds the quorum")));
+
+ register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags);
+ }
+ else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+ {
+ ereport(LOG,
+ (errmsg("degenerate backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+ ,*node_id, getpid())));
}
else if (res == FAILOVER_RES_WILL_BE_DONE)
{
+ /* we will receive a sync request from master watchdog node */
ereport(LOG,
(errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog"
, node_count, getpid())));
@@ -1092,13 +1118,13 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
* wrapper over degenerate_backend_set_ex function to register
* NODE down operation request
*/
-bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
{
- return degenerate_backend_set_ex(node_id_set, count, false, false, switch_over, wd_failover_id);
+ return degenerate_backend_set_ex(node_id_set, count, flags, false, false);
}
/* send promote node request using SIGUSR1 */
-bool promote_backend(int node_id, unsigned int wd_failover_id)
+bool promote_backend(int node_id, unsigned char flags)
{
WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
bool ret = false;
@@ -1125,12 +1151,12 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
node_id, getpid())));
/* If this was only a test. Inform the caller without doing anything */
- if (pool_config->use_watchdog && wd_failover_id == 0)
+ if (!(flags & REQ_DETAIL_WATCHDOG))
{
int x;
for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
{
- res = wd_promote_backend(node_id, &wd_failover_id);
+ res = wd_promote_backend(node_id, flags);
if (res != FAILOVER_RES_TRANSITION)
break;
sleep(1);
@@ -1149,7 +1175,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
if (res == FAILOVER_RES_PROCEED)
{
- ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, false, wd_failover_id);
+ ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, flags);
}
else if (res == FAILOVER_RES_WILL_BE_DONE)
{
@@ -1157,6 +1183,18 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog"
, node_id, getpid())));
}
+ else if (res == FAILOVER_RES_NO_QUORUM)
+ {
+ ereport(LOG,
+ (errmsg("promote backend request for node_id: %d from pid [%d], is canceled because watchdog does not hold quorum"
+ , node_id, getpid())));
+ }
+ else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+ {
+ ereport(LOG,
+ (errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+ , node_id, getpid())));
+ }
else
{
ereport(LOG,
@@ -1167,7 +1205,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
}
/* send failback request using SIGUSR1 */
-bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failover_id)
+bool send_failback_request(int node_id,bool throw_error, unsigned char flags)
{
WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
bool ret = false;
@@ -1187,16 +1225,16 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
}
ereport(LOG,
- (errmsg("received failback request for node_id: %d from pid [%d] wd_failover_id [%d]",
- node_id, getpid(),wd_failover_id)));
+ (errmsg("received failback request for node_id: %d from pid [%d]",
+ node_id, getpid())));
/* If this was only a test. Inform the caller without doing anything */
- if (pool_config->use_watchdog && wd_failover_id == 0)
+ if (!(flags & REQ_DETAIL_WATCHDOG))
{
int x;
for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
{
- res = wd_send_failback_request(node_id, &wd_failover_id);
+ res = wd_send_failback_request(node_id, flags);
if (res != FAILOVER_RES_TRANSITION)
break;
sleep(1);
@@ -1215,7 +1253,7 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
if (res == FAILOVER_RES_PROCEED)
{
- ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, false, wd_failover_id);
+ ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, flags);
}
else if (res == FAILOVER_RES_WILL_BE_DONE)
{
@@ -1401,10 +1439,40 @@ static void sigusr1_interupt_processor(void)
ereport(DEBUG1,
(errmsg("Pgpool-II parent process received SIGUSR1")));
+ if (user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED])
+ {
+ ereport(LOG,
+ (errmsg("Pgpool-II parent process received watchdog quorum change signal from watchdog")));
+
+ user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED] = false;
+ if (get_watchdog_quorum_state() >= 0)
+ {
+ ereport(LOG,
+ (errmsg("watchdog cluster now holds the quorum"),
+ errdetail("updating the state of quarantine backend nodes")));
+ update_backend_quarantine_status();
+ }
+ }
+
+ if (user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED])
+ {
+ ereport(LOG,
+ (errmsg("Pgpool-II parent process received sync backend signal from watchdog")));
+
+ user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED] = false;
+ if (get_watchdog_local_node_state() == WD_STANDBY)
+ {
+ ereport(LOG,
+ (errmsg("master watchdog has performed failover"),
+ errdetail("syncing the backend states from the MASTER watchdog node")));
+ sync_backend_from_watchdog();
+ }
+ }
+
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
ereport(DEBUG1,
- (errmsg("Pgpool-II parent process received SIGUSR1 from watchdog")));
+ (errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
if (get_watchdog_local_node_state() == WD_STANDBY)
@@ -1468,6 +1536,7 @@ static void failover(void)
int sts;
bool need_to_restart_pcp = false;
bool all_backend_down = true;
+ bool sync_required = false;
ereport(DEBUG1,
(errmsg("failover handler called")));
@@ -1517,8 +1586,6 @@ static void failover(void)
int node_id_set[MAX_NUM_BACKENDS];
int node_count;
unsigned char request_details;
- unsigned int wd_failover_id;
- WDFailoverCMDResults wdInterlockingRes;
pool_semaphore_lock(REQUEST_INFO_SEM);
@@ -1537,7 +1604,6 @@ static void failover(void)
reqkind = Req_info->request[queue_index].kind;
request_details = Req_info->request[queue_index].request_details;
node_count = Req_info->request[queue_index].count;
- wd_failover_id = Req_info->request[queue_index].wd_failover_id;
pool_semaphore_unlock(REQUEST_INFO_SEM);
ereport(DEBUG1,
@@ -1550,8 +1616,8 @@ static void failover(void)
continue;
}
- /* start watchdog interlocking */
- wdInterlockingRes = wd_start_failover_interlocking(wd_failover_id);
+ /* inform all remote watchdog nodes that we are starting the failover */
+ wd_failover_start();
/*
* if not in replication mode/master slave mode, we treat this a restart request.
@@ -1577,9 +1643,6 @@ static void failover(void)
ereport(LOG,
(errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback",BACKEND_INFO(node_id).backend_status,node_id)));
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
- wd_end_failover_interlocking(wd_failover_id);
-
continue;
}
@@ -1592,24 +1655,18 @@ static void failover(void)
all_backend_down = check_all_backend_down();
BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT; /* unset down status */
- (void)write_status_file();
-
- /* Aquire failback start command lock */
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+ if (!(request_details & REQ_DETAIL_UPDATE))
{
+ /* The request is a proper failback request
+ * and not an update of the status of a quarantined node
+ */
+ (void)write_status_file();
+
trigger_failover_command(node_id, pool_config->failback_command,
MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
- wd_failover_lock_release(FAILBACK_LOCK, wd_failover_id);
- }
- else
- {
- /*
- * Okay we are not allowed to execute the failover command
- * so we need to wait till the one who is executing the command
- * finish with it.
- */
- wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK,wd_failover_id);
}
+
+ sync_required = true;
}
else if (reqkind == PROMOTE_NODE_REQUEST)
{
@@ -1624,12 +1681,10 @@ static void failover(void)
{
ereport(LOG,
(errmsg("failover: no backends are promoted")));
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
- wd_end_failover_interlocking(wd_failover_id);
continue;
}
}
- else /* NODE_DOWN_REQUEST */
+ else /* NODE_DOWN_REQUEST && NODE_QUARANTINE_REQUEST*/
{
int cnt = 0;
@@ -1640,12 +1695,17 @@ static void failover(void)
VALID_BACKEND(node_id_set[i])))
{
ereport(LOG,
- (errmsg("starting degeneration. shutdown host %s(%d)",
+ (errmsg("starting %s. shutdown host %s(%d)",
+ (reqkind == NODE_QUARANTINE_REQUEST)?"quarantine":"degeneration",
BACKEND_INFO(node_id_set[i]).backend_hostname,
BACKEND_INFO(node_id_set[i]).backend_port)));
BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN; /* set down status */
- (void)write_status_file();
+
+ if (reqkind == NODE_QUARANTINE_REQUEST)
+ BACKEND_INFO(node_id_set[i]).quarantine = true;
+ else
+ (void)write_status_file();
/* save down node */
nodes[node_id_set[i]] = 1;
@@ -1657,10 +1717,6 @@ static void failover(void)
{
ereport(LOG,
(errmsg("failover: no backends are degenerated")));
-
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
- wd_end_failover_interlocking(wd_failover_id);
-
continue;
}
}
@@ -1710,7 +1766,7 @@ static void failover(void)
* NODE_DOWN_REQUEST and it's actually a switch over request, we don't
* need to restart all children, except the node is primary.
*/
- else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
+ else if (STREAM && (reqkind == NODE_DOWN_REQUEST || reqkind == NODE_QUARANTINE_REQUEST) &&
request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
{
ereport(LOG,
@@ -1776,36 +1832,36 @@ static void failover(void)
need_to_restart_children = true;
partial_restart = false;
}
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+ /* Exec failover_command if needed.
+ * We do not execute the failover command when the request is of quarantine type
+ */
+ if (reqkind == NODE_DOWN_REQUEST)
{
- /* Exec failover_command if needed */
for (i = 0; i < pool_config->backend_desc->num_backends; i++)
{
if (nodes[i])
+ {
trigger_failover_command(i, pool_config->failover_command,
MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
+ sync_required = true;
+ }
}
- wd_failover_lock_release(FAILOVER_LOCK, wd_failover_id);
- }
- else
- {
- wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK, wd_failover_id);
}
- /* no need to wait since it will be done in reap_handler */
-#ifdef NOT_USED
- while (wait(NULL) > 0)
- ;
-
- if (errno != ECHILD)
- ereport(LOG,
- (errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));
-
-#endif
-
if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
+ {
new_primary = node_id;
-
+ }
+ else if (reqkind == NODE_QUARANTINE_REQUEST)
+ {
+ /* if the quarantine node was the primary node
+ * set the newprimary to -1 (invalid)
+ */
+ if (Req_info->primary_node_id == node_id)
+ new_primary = -1;
+ else
+ new_primary = find_primary_node_repeatedly();
+ }
/*
* If the down node was a standby node in streaming replication
* mode, we can avoid calling find_primary_node_repeatedly() and
@@ -1821,8 +1877,9 @@ static void failover(void)
new_primary = find_primary_node_repeatedly();
}
else
+ {
new_primary = find_primary_node_repeatedly();
-
+ }
/*
* If follow_master_command is provided and in master/slave
* streaming replication mode, we start degenerating all backends
@@ -1874,22 +1931,10 @@ static void failover(void)
}
}
- /*
- * follow master command also uses the same locks used by trigring command
- */
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+ if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
{
- if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
- {
- follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
- Req_info->primary_node_id);
- }
- wd_failover_lock_release(FOLLOW_MASTER_LOCK, wd_failover_id);
- }
- else
- {
- wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK, wd_failover_id);
-
+ follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
+ Req_info->primary_node_id);
}
/* Save primary node id */
@@ -1900,6 +1945,7 @@ static void failover(void)
if (new_master >= 0)
{
Req_info->master_node_id = new_master;
+ sync_required = true;
ereport(LOG,
(errmsg("failover: set new master node: %d", Req_info->master_node_id)));
}
@@ -1977,8 +2023,8 @@ static void failover(void)
*/
kill(worker_pid, SIGUSR1);
- if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
- wd_end_failover_interlocking(wd_failover_id);
+ if ( sync_required)
+ wd_failover_end();
if (reqkind == NODE_UP_REQUEST)
{
@@ -1998,12 +2044,14 @@ static void failover(void)
else
{
/* Temporary black magic. Without this regression 055 does not finish */
- fprintf(stderr, "failover done. shutdown host %s(%d)",
+ fprintf(stderr, "%s done. shutdown host %s(%d)",
+ (reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port);
ereport(LOG,
- (errmsg("failover done. shutdown host %s(%d)",
+ (errmsg("%s done. shutdown host %s(%d)",
+ (reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
}
@@ -3396,6 +3444,29 @@ int pool_frontend_exists(void)
return -1;
}
+static void update_backend_quarantine_status(void)
+{
+ /* Reset the quarantine flag from each backend and
+ * set it to con_wait
+ */
+ int i;
+ WD_STATES wd_state = get_watchdog_local_node_state();
+
+ for (i=0;i<NUM_BACKENDS;i++)
+ {
+ if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN)
+ {
+ BACKEND_INFO(i).quarantine = false;
+ /* send the failback request for the node.
+ * we also set the watchdog flag because we will eventually send the sync
+ * message to all standby nodes
+ */
+ if (wd_state == WD_COORDINATOR)
+ send_failback_request(i,false, REQ_DETAIL_UPDATE | REQ_DETAIL_WATCHDOG);
+ }
+ }
+}
+
/*
* The function fetch the current status of all configured backend
* nodes from the MASTER/COORDINATOR watchdog Pgpool-II and synchronize the
diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c
index 54219a72..658b461e 100644
--- a/src/pcp_con/pcp_worker.c
+++ b/src/pcp_con/pcp_worker.c
@@ -502,14 +502,14 @@ static int pool_detach_node(int node_id, bool gracefully)
{
if (!gracefully)
{
- degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+ degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
return 0;
}
/* Check if the NODE DOWN can be executed on
* the given node id.
*/
- degenerate_backend_set_ex(&node_id, 1, true, true, true, 0);
+ degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
/*
* Wait until all frontends exit
@@ -529,7 +529,7 @@ static int pool_detach_node(int node_id, bool gracefully)
/*
* Now all frontends have gone. Let's do failover.
*/
- degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+ degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, true);
/*
* Wait for failover completed.
@@ -556,7 +556,7 @@ static int pool_promote_node(int node_id, bool gracefully)
{
if (!gracefully)
{
- promote_backend(node_id, false); /* send promote request */
+ promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
return 0;
}
@@ -576,7 +576,7 @@ static int pool_promote_node(int node_id, bool gracefully)
/*
* Now all frontends have gone. Let's do failover.
*/
- promote_backend(node_id, false); /* send promote request */
+ promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
/*
* Wait for failover completed.
@@ -910,7 +910,7 @@ process_attach_node(PCP_CONNECTION *frontend,char *buf)
(errmsg("PCP: processing attach node"),
errdetail("attaching Node ID %d", node_id)));
- send_failback_request(node_id,true, false);
+ send_failback_request(node_id,true, REQ_DETAIL_CONFIRMED);
pcp_write(frontend, "c", 1);
wsize = htonl(sizeof(code) + sizeof(int));
diff --git a/src/pcp_con/recovery.c b/src/pcp_con/recovery.c
index f2f57f56..ead5eeaf 100644
--- a/src/pcp_con/recovery.c
+++ b/src/pcp_con/recovery.c
@@ -147,7 +147,7 @@ void start_recovery(int recovery_node)
pcp_worker_wakeup_request = 0;
/* send failback request to pgpool parent */
- send_failback_request(recovery_node,false, false);
+ send_failback_request(recovery_node,false, REQ_DETAIL_CONFIRMED);
/* wait for failback */
failback_wait_count = 0;
diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c
index 2bbcae24..a84134cd 100644
--- a/src/protocol/pool_connection_pool.c
+++ b/src/protocol/pool_connection_pool.c
@@ -854,7 +854,7 @@ static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
*/
if (pool_config->fail_over_on_backend_error)
{
- notice_backend_error(i, true);
+ notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
ereport(FATAL,
(errmsg("failed to create a backend connection"),
errdetail("executing failover on backend")));
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 9631a5fb..473c1988 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -3595,7 +3595,7 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
if (pool_config->replication_stop_on_mismatch)
{
- degenerate_backend_set(degenerate_node, degenerate_node_num, false, 0);
+ degenerate_backend_set(degenerate_node, degenerate_node_num, REQ_DETAIL_CONFIRMED);
retcode = 1;
}
ereport(FATAL,
@@ -4673,7 +4673,7 @@ pool_config->client_idle_limit)));
}
else
{
- notice_backend_error(i, true);
+ notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
sleep(5);
}
break;
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 19166832..a4bfe13e 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -504,7 +504,7 @@ POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
ereport(DEBUG1,
(errmsg("Query: sending SIGUSR1 signal to parent")));
- register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, false, 0);
+ register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, 0);
/* we need to loop over here since we will get USR1 signal while sleeping */
while (stime > 0)
@@ -1741,7 +1741,7 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
free_string(msg);
- degenerate_backend_set(victim_nodes, number_of_nodes, true, 0);
+ degenerate_backend_set(victim_nodes, number_of_nodes, REQ_DETAIL_CONFIRMED|REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
}
else
diff --git a/src/sample/pgpool.conf.sample b/src/sample/pgpool.conf.sample
index 5c5c360d..c48d5878 100644
--- a/src/sample/pgpool.conf.sample
+++ b/src/sample/pgpool.conf.sample
@@ -551,6 +551,20 @@ wd_de_escalation_command = ''
# Executes this command when master pgpool resigns from being master.
# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+ # Only perform backend node failover
+ # when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+ # Perform failover when majority of Pgpool-II nodes
+ # agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+ # A Pgpool-II node can cast multiple votes
+ # for building the consensus on failover
+
# - Lifecheck Setting -
# -- common --
diff --git a/src/sample/pgpool.conf.sample-logical b/src/sample/pgpool.conf.sample-logical
index 8e8fa391..8be64bca 100644
--- a/src/sample/pgpool.conf.sample-logical
+++ b/src/sample/pgpool.conf.sample-logical
@@ -549,6 +549,19 @@ wd_escalation_command = ''
wd_de_escalation_command = ''
# Executes this command when master pgpool resigns from being master.
# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+ # Only perform backend node failover
+ # when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+ # Perform failover when majority of Pgpool-II nodes
+ # agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+ # A Pgpool-II node can cast multiple votes
+ # for building the consensus on failover
# - Lifecheck Setting -
diff --git a/src/sample/pgpool.conf.sample-master-slave b/src/sample/pgpool.conf.sample-master-slave
index 24757d2d..360cba1f 100644
--- a/src/sample/pgpool.conf.sample-master-slave
+++ b/src/sample/pgpool.conf.sample-master-slave
@@ -549,6 +549,21 @@ wd_de_escalation_command = ''
# Executes this command when master pgpool resigns from being master.
# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+ # Only perform backend node failover
+ # when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+ # Perform failover when majority of Pgpool-II nodes
+ # agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+ # A Pgpool-II node can cast multiple votes
+ # for building the consensus on failover
+
+
# - Lifecheck Setting -
# -- common --
diff --git a/src/sample/pgpool.conf.sample-replication b/src/sample/pgpool.conf.sample-replication
index 3318753a..5b37626c 100644
--- a/src/sample/pgpool.conf.sample-replication
+++ b/src/sample/pgpool.conf.sample-replication
@@ -549,6 +549,23 @@ wd_de_escalation_command = ''
# Executes this command when master pgpool resigns from being master.
# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+ # Only perform backend node failover
+ # when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+ # Perform failover when majority of Pgpool-II nodes
+ # agrees on the backend node status change
+
+
+enable_multiple_failover_requests_from_node = false
+ # A Pgpool-II node can cast multiple votes
+ # for building the consensus on failover
+
+
+
# - Lifecheck Setting -
# -- common --
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index a4effb68..f5832c72 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -550,6 +550,21 @@ wd_de_escalation_command = ''
# Executes this command when master pgpool resigns from being master.
# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+ # Only perform backend node failover
+ # when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+ # Perform failover when majority of Pgpool-II nodes
+ # agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+ # A Pgpool-II node can cast multiple votes
+ # for building the consensus on failover
+
+
# - Lifecheck Setting -
# -- common --
diff --git a/src/utils/pool_ssl.c b/src/utils/pool_ssl.c
index 32f755f1..1f6e9574 100644
--- a/src/utils/pool_ssl.c
+++ b/src/utils/pool_ssl.c
@@ -350,7 +350,7 @@ void pool_ssl_close(POOL_CONNECTION *cp) { return; }
int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
ereport(WARNING,
(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
return -1; /* never reached */
}
@@ -358,7 +358,7 @@ int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
int pool_ssl_write(POOL_CONNECTION *cp, const void *buf, int size) {
ereport(WARNING,
(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
return -1; /* never reached */
}
diff --git a/src/utils/pool_stream.c b/src/utils/pool_stream.c
index f0f31d31..f586d9bf 100644
--- a/src/utils/pool_stream.c
+++ b/src/utils/pool_stream.c
@@ -218,7 +218,7 @@ int pool_read(POOL_CONNECTION *cp, void *buf, int len)
/* if fail_over_on_backend_error is true, then trigger failover */
if (pool_config->fail_over_on_backend_error)
{
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
/* If we are in the main process, we will not exit */
child_exit(POOL_EXIT_AND_RESTART);
@@ -365,7 +365,7 @@ char *pool_read2(POOL_CONNECTION *cp, int len)
/* if fail_over_on_backend_error is true, then trigger failover */
if (pool_config->fail_over_on_backend_error)
{
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
/* we are in main process */
ereport(ERROR,
@@ -590,7 +590,7 @@ int pool_flush(POOL_CONNECTION *cp)
/* if fail_over_on_backend_error is true, then trigger failover */
if (pool_config->fail_over_on_backend_error)
{
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
ereport(LOG,
(errmsg("unable to flush data to backend"),
errdetail("do not failover because I am the main process")));
@@ -645,7 +645,7 @@ int pool_flush_noerror(POOL_CONNECTION *cp)
/* if fail_over_on_backend_erro is true, then trigger failover */
if (pool_config->fail_over_on_backend_error)
{
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
ereport(LOG,
(errmsg("unable to flush data to backend"),
@@ -813,7 +813,7 @@ char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
errdetail("pg_terminate_backend was called on the backend")));
}
- notice_backend_error(cp->db_node_id, true);
+ notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
child_exit(POOL_EXIT_AND_RESTART);
ereport(ERROR,
(errmsg("unable to read data from frontend"),
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index 093aa636..0e3ddc39 100644
--- a/src/watchdog/watchdog.c
+++ b/src/watchdog/watchdog.c
@@ -58,6 +58,14 @@
#include "watchdog/wd_ipc_commands.h"
#include "parser/stringinfo.h"
+/* These defines enable the consensus building feature
+ * in watchdog for node failover operations.
+ * We could also move these to the configure script.
+ */
+#define NODE_UP_REQUIRE_CONSENSUS
+#define NODE_DOWN_REQUIRE_CONSENSUS
+#define NODE_PROMOTE_REQUIRE_CONSENSUS
+
typedef enum IPC_CMD_PREOCESS_RES
{
IPC_CMD_COMPLETE,
@@ -108,6 +116,9 @@ typedef enum IPC_CMD_PREOCESS_RES
#define WD_CMD_REPLY_IN_DATA '-'
#define WD_CLUSTER_SERVICE_MESSAGE '#'
+#define WD_FAILOVER_START 'F'
+#define WD_FAILOVER_END 'H'
+
/*Cluster Service Message Types */
#define CLUSTER_QUORUM_LOST 'L'
#define CLUSTER_QUORUM_FOUND 'F'
@@ -150,6 +161,7 @@ packet_types all_packet_types[] = {
{WD_GET_RUNTIME_VARIABLE_VALUE, "GET WD RUNTIME VARIABLE VALUE"},
{WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
{WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
+ {WD_FAILOVER_INDICATION,"FAILOVER INDICATION"},
{WD_CLUSTER_SERVICE_MESSAGE, "CLUSTER SERVICE MESSAGE"},
{WD_REGISTER_FOR_NOTIFICATION, "REGISTER FOR NOTIFICATION"},
{WD_NODE_STATUS_CHANGE_COMMAND, "NODE STATUS CHANGE"},
@@ -323,6 +335,7 @@ typedef struct wd_cluster
int network_monitor_sock;
bool clusterInitialized;
bool ipc_auth_needed;
+ int current_failover_id;
List *unidentified_socks;
List *notify_clients;
List *ipc_command_socks;
@@ -340,7 +353,8 @@ typedef struct WDFailoverObject
int nodesCount;
unsigned int failoverID;
int *nodeList;
- WatchdogNode* wdRequestingNode;
+ List* requestingNodes;
+ int request_count;
struct timeval startTime;
int state;
}WDFailoverObject;
@@ -354,6 +368,9 @@ static bool remove_failover_object_by_id(unsigned int failoverID);
static void remove_failovers_from_node(WatchdogNode* wdNode);
static void remove_failover_object(WDFailoverObject* failoverObj);
static void service_expired_failovers(void);
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode);
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count,
+ unsigned char flags, WatchdogNode *wdNode);
static int send_command_packet_to_remote_nodes(WDCommandData* ipcCommand, bool source_included);
static void wd_command_is_complete(WDCommandData* ipcCommand);
@@ -477,10 +494,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData*
static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand);
static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand);
static bool write_ipc_command_with_result_data(WDCommandData* ipcCommand, char type, char* data, int len);
@@ -492,6 +508,9 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacketData* pkt);
static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacketData* pkt);
+static WDFailoverCMDResults failover_end_indication(WDCommandData* ipcCommand);
+static WDFailoverCMDResults failover_start_indication(WDCommandData* ipcCommand);
+
static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCommandData* ipcCommand);
static WDFailoverCMDResults node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID);
@@ -1823,8 +1842,8 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
return process_IPC_online_recovery(ipcCommand);
break;
- case WD_FAILOVER_LOCKING_REQUEST:
- return process_IPC_failover_locking_cmd(ipcCommand);
+ case WD_FAILOVER_INDICATION:
+ return process_IPC_failover_indication(ipcCommand);
case WD_GET_MASTER_DATA_REQUEST:
return process_IPC_data_request_from_master(ipcCommand);
@@ -1879,7 +1898,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCom
else if (strcasecmp(WD_RUNTIME_VAR_QUORUM_STATE, requestVarName) == 0)
{
jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
- jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.quorum_status);
+ jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, WD_MASTER_NODE?WD_MASTER_NODE->quorum_status:-2);
}
else if (strcasecmp(WD_RUNTIME_VAR_ESCALATION_STATE, requestVarName) == 0)
{
@@ -2043,8 +2062,9 @@ static WDFailoverObject* get_failover_object_by_id(unsigned int failoverID)
static void remove_failover_object(WDFailoverObject* failoverObj)
{
ereport(DEBUG1,
- (errmsg("removing failover object from \"%s\" with ID:%d", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID)));
+ (errmsg("removing failover request from %d nodes with ID:%d", failoverObj->request_count,failoverObj->failoverID)));
g_cluster.wdCurrentFailovers = list_delete_ptr(g_cluster.wdCurrentFailovers,failoverObj);
+ list_free(failoverObj->requestingNodes);
pfree(failoverObj->nodeList);
pfree(failoverObj);
}
@@ -2063,28 +2083,29 @@ static bool remove_failover_object_by_id(unsigned int failoverID)
/* if the wdNode is NULL. The function removes all failover objects */
static void remove_failovers_from_node(WatchdogNode* wdNode)
{
- ListCell *lc;
- List *failovers_to_del = NULL;
-
- foreach(lc, g_cluster.wdCurrentFailovers)
- {
- WDFailoverObject* failoverObj = lfirst(lc);
- if (failoverObj)
- {
- if (wdNode == NULL || failoverObj->wdRequestingNode == wdNode)
- {
- failovers_to_del = lappend(failovers_to_del,failoverObj);
- }
- }
- }
-
- /* delete the failover objects */
-
- foreach(lc, failovers_to_del)
- {
- WDFailoverObject* failoverObj = lfirst(lc);
- remove_failover_object(failoverObj);
- }
+ /* With failover requests now tracked per failover object (requestingNodes)
+ * rather than per requesting node, per-node removal is disabled for the moment;
+ * over-stayed objects are cleaned up by service_expired_failovers() instead.
+ */
+ return;
+// ListCell *lc;
+// List *failovers_to_del = NULL;
+
+// foreach(lc, g_cluster.wdCurrentFailovers)
+// {
+// WDFailoverObject* failoverObj = lfirst(lc);
+// if (failoverObj)
+// {
+// if (wdNode == NULL || failoverObj->wdRequestingNode == wdNode)
+// {
+// failovers_to_del = lappend(failovers_to_del,failoverObj);
+// }
+// }
+// }
+//
+// /* delete the failover objects */
+//
+// foreach(lc, failovers_to_del)
+// {
+// WDFailoverObject* failoverObj = lfirst(lc);
+// remove_failover_object(failoverObj);
+// }
}
/* Remove the over stayed failover objects */
@@ -2108,7 +2129,7 @@ static void service_expired_failovers(void)
{
failovers_to_del = lappend(failovers_to_del,failoverObj);
ereport(DEBUG1,
- (errmsg("failover object from \"%s\" with ID:%d is timeout", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID),
+ (errmsg("failover request from %d nodes with ID:%d is timeout", failoverObj->request_count,failoverObj->failoverID),
errdetail("adding the failover object for removal")));
}
@@ -2204,18 +2225,6 @@ static void process_remote_failover_command_on_coordinator(WatchdogNode* wdNode,
}
}
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand)
-{
- if (get_local_node_state() != WD_COORDINATOR)
- return IPC_CMD_ERROR; /* should never hapen*/
-
- ereport(LOG,
- (errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
-
- return process_failover_command_on_coordinator(ipcCommand);
-}
-
-
static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDResults cmdResult, unsigned int failoverID)
{
bool ret = false;
@@ -2246,7 +2255,139 @@ static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDRes
}
/*
- * The Function forwards the failover command to all standby nodes.
+ * This function processes the failover command and decides
+ * whether the failover should be executed.
+ */
+
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count, unsigned char flags, WatchdogNode *wdNode)
+{
+#ifndef NODE_UP_REQUIRE_CONSENSUS
+ if (reqKind == NODE_UP_REQUEST)
+ return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_DOWN_REQUIRE_CONSENSUS
+ if (reqKind == NODE_DOWN_REQUEST)
+ return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_PROMOTE_REQUIRE_CONSENSUS
+ if (reqKind == PROMOTE_NODE_REQUEST)
+ return FAILOVER_RES_PROCEED;
+#endif
+
+ if (pool_config->failover_when_quorum_exists == false)
+ {
+ /* No need for any calculation; we do not need a quorum for failover */
+ ereport(LOG,(
+ errmsg("we do not need quorum to hold to proceed with failover"),
+ errdetail("proceeding with the failover"),
+ errhint("failover_when_quorum_exists is set to false")));
+
+ return FAILOVER_RES_PROCEED;
+ }
+ if (flags & REQ_DETAIL_CONFIRMED)
+ {
+ /* The request flags ask us to bypass the quorum check */
+ ereport(LOG,(
+ errmsg("The failover request does not need quorum to hold"),
+ errdetail("proceeding with the failover"),
+ errhint("REQ_DETAIL_CONFIRMED")));
+ return FAILOVER_RES_PROCEED;
+ }
+ update_quorum_status();
+ if (g_cluster.quorum_status < 0)
+ {
+ /* quorum is required and it is not present at the moment */
+ ereport(LOG,(
+ errmsg("failover requires the quorum to hold, which is not present at the moment"),
+ errdetail("Rejecting the failover request")));
+ return FAILOVER_RES_NO_QUORUM;
+ }
+
+ /* Reaching here means the quorum is present.
+ * Now comes the difficult part: ensuring the consensus.
+ *
+ * Record the failover.
+ */
+ if (pool_config->failover_require_consensus == true)
+ {
+ int cnt = add_failover(reqKind, node_id_list, node_count, wdNode);
+ if (cnt <= get_mimimum_nodes_required_for_quorum())
+ {
+ ereport(LOG,(
+ errmsg("failover requires the majority vote, waiting for consensus"),
+ errdetail("failover request noted")));
+ return FAILOVER_RES_BUILDING_CONSENSUS;
+ }
+ /* the majority of nodes have reported the same failure */
+ ereport(LOG,(
+ errmsg("majority of the nodes have requested the failover"),
+ errdetail("proceeding with the failover"),
+ errhint("failover_require_consensus is set to true")));
+ return FAILOVER_RES_PROCEED;
+ }
+ ereport(LOG,(
+ errmsg("we do not require majority votes to proceed with failover"),
+ errdetail("proceeding with the failover"),
+ errhint("failover_require_consensus is set to false")));
+
+ return FAILOVER_RES_PROCEED;
+}
+
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode)
+{
+ MemoryContext oldCxt;
+ /* Find the failover */
+ WDFailoverObject *failoverObj = get_failover_object(reqKind, node_count, node_id_list);
+ if (failoverObj)
+ {
+ ListCell *lc;
+ /* search the node if it is a duplicate request */
+ foreach(lc, failoverObj->requestingNodes)
+ {
+ WatchdogNode* reqWdNode = lfirst(lc);
+ if (wdNode == reqWdNode)
+ {
+ /* The failover request is a duplicate */
+ if (pool_config->enable_multiple_failover_requests_from_node)
+ {
+ failoverObj->request_count++;
+ ereport(LOG,(
+ errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+ errdetail("Pgpool-II can send multiple failover requests for same node"),
+ errhint("enable_multiple_failover_requests_from_node is enabled")));
+ }
+ else
+ {
+ ereport(LOG,(
+ errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+ errdetail("request ignored")));
+ }
+ return failoverObj->request_count;
+ }
+ }
+ }
+ else
+ {
+ oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ failoverObj = palloc0(sizeof(WDFailoverObject));
+ failoverObj->reqKind = reqKind;
+ failoverObj->requestingNodes = NULL;
+ failoverObj->nodesCount = node_count;
+ failoverObj->request_count = 0;
+ if (node_count > 0)
+ {
+ failoverObj->nodeList = palloc(sizeof(int) * node_count);
+ memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
+ }
+ failoverObj->failoverID = get_next_commandID();
+ gettimeofday(&failoverObj->startTime, NULL);
+ g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
+ MemoryContextSwitchTo(oldCxt);
+ }
+
+ failoverObj->request_count++;
+ oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ failoverObj->requestingNodes = lappend(failoverObj->requestingNodes,wdNode);
+ MemoryContextSwitchTo(oldCxt);
+ return failoverObj->request_count;
+}
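
To illustrate the vote counting done by compute_failover_consensus()/add_failover(), here is a
minimal standalone sketch (not part of the patch). It assumes get_mimimum_nodes_required_for_quorum()
returns floor(total_nodes / 2), so a failover becomes actionable only once more than half of the
watchdog nodes have reported the same failure.

#include <stdio.h>

/* assumed definition, for illustration only */
static int minimum_nodes_for_quorum(int total_nodes)
{
	return total_nodes / 2;
}

int main(void)
{
	int total_nodes = 3;	/* hypothetical 3-node watchdog cluster */
	int votes;

	for (votes = 1; votes <= total_nodes; votes++)
		printf("votes=%d -> %s\n", votes,
			   votes > minimum_nodes_for_quorum(total_nodes) ?
			   "consensus reached, proceed with failover" :
			   "keep waiting for more failover requests");
	return 0;
}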
+
+/*
+ * The function processes all failover commands on the master node
*/
static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand)
{
@@ -2254,22 +2395,15 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
int node_count = 0;
int *node_id_list = NULL;
bool ret = false;
- WDFailoverObject* failoverObj;
+ unsigned char flags;
POOL_REQUEST_KIND reqKind;
+ WDFailoverCMDResults res;
if (get_local_node_state() != WD_COORDINATOR)
return IPC_CMD_ERROR; /* should never happen*/
- /*
- * The coordinator node
- * Forward this command to all standby nodes.
- * Ask the caller to proceed with failover
- * but first check if this failover is already requested
- * by some other node.
- */
-
ret = parse_wd_node_function_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len,
- &func_name, &node_id_list, &node_count);
+ &func_name, &node_id_list, &node_count, &flags);
if (ret == false)
{
ereport(LOG,(
@@ -2297,129 +2431,64 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
ipcCommand->commandSource == COMMAND_SOURCE_IPC?
"local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName)));
- if (get_cluster_node_count() == 0)
+ res = compute_failover_consensus(reqKind, node_id_list, node_count, flags, ipcCommand->sourceWdNode);
+ if (res == FAILOVER_RES_PROCEED)
{
- /*
- * Since I am the only node in the cluster so nothing
- * we need to do here
+ /* If the command originated from a remote node,
+ * request the local node to start the failover
*/
- ereport(LOG,(
- errmsg("I am the only pgpool-II node in the watchdog cluster"),
- errdetail("no need to propagate the failover command [%s]",func_name)));
- reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
- return IPC_CMD_COMPLETE;
- }
+ if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
+ {
+ /* Set the flag to indicate that the failover request originated from the watchdog */
+ flags |= REQ_DETAIL_WATCHDOG;
- if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE && Req_info->switching)
- {
- /*
- * check if the failover is allowed before doing anything
- */
- ereport(LOG,
- (errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because of switching",
- func_name,
- ipcCommand->sourceWdNode->nodeName)));
- reply_to_failove_command(ipcCommand, FAILOVER_RES_NOT_ALLOWED, 0);
+ if (reqKind == NODE_DOWN_REQUEST)
+ ret = degenerate_backend_set(node_id_list, node_count, flags);
+ else if (reqKind == NODE_UP_REQUEST)
+ ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
+ else if (reqKind == PROMOTE_NODE_REQUEST)
+ ret = promote_backend(node_id_list[0], flags);
+
+ if (ret == true)
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_WILL_BE_DONE, 0);
+ else
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_ERROR, 0);
+ }
+ else
+ {
+ /* Just reply the caller to get on with the failover */
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
+ }
return IPC_CMD_COMPLETE;
}
-
- /*
- * check if the same failover is already issued to the main
- * process
- */
- failoverObj = get_failover_object(reqKind, node_count, node_id_list);
- if (failoverObj)
+ else if (res == FAILOVER_RES_NO_QUORUM)
{
ereport(LOG,
- (errmsg("failover command [%s] from %s is ignored",
+ (errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because we do not have the quorum",
func_name,
- ipcCommand->commandSource == COMMAND_SOURCE_IPC?
- "local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName),
- errdetail("similar failover with ID:%d is already in progress",failoverObj->failoverID)));
-
- /* Same failover is already in progress */
- reply_to_failove_command(ipcCommand, FAILOVER_RES_ALREADY_ISSUED, 0);
+ ipcCommand->sourceWdNode->nodeName)));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_NO_QUORUM, 0);
return IPC_CMD_COMPLETE;
}
- else
+ else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
{
- /* No similar failover is in progress */
- MemoryContext oldCxt;
- ereport(DEBUG1,
- (errmsg("proceeding with the failover command [%s] request from %s",
- func_name,
- ipcCommand->commandSource == COMMAND_SOURCE_IPC?
- "local pgpool-II":ipcCommand->sourceWdNode->nodeName),
- errdetail("no similar failover is in progress")));
- /*
- * okay now ask all nodes to start failover
- */
- wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
- ipcCommand->commandPacket.type = WD_REMOTE_FAILOVER_REQUEST;
- ipcCommand->sendToNode = NULL; /* command needs to be sent to all nodes */
- set_next_commandID_in_message(&ipcCommand->commandPacket);
-
- if (ipcCommand->commandSource != COMMAND_SOURCE_IPC)
- {
- if (process_wd_command_function(ipcCommand->sourceWdNode, &ipcCommand->sourcePacket,
- func_name, node_count, node_id_list, ipcCommand->commandPacket.command_id) == false)
- {
- return IPC_CMD_COMPLETE;
- }
- }
-
- /* send to all alive nodes */
ereport(LOG,
- (errmsg("forwarding the failover request [%s] to all alive nodes",func_name),
- errdetail("watchdog cluster currently has %d connected remote nodes",get_cluster_node_count())));
- send_command_packet_to_remote_nodes(ipcCommand, false);
-
- /* create a failover object. to make sure we know the node failovers
- * is already in progress
- */
- oldCxt = MemoryContextSwitchTo(TopMemoryContext);
- failoverObj = palloc0(sizeof(WDFailoverObject));
- failoverObj->reqKind = reqKind;
- failoverObj->nodesCount = node_count;
- if (node_count > 0)
- {
- failoverObj->nodeList = palloc(sizeof(int) * node_count);
- memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
- }
- failoverObj->failoverID = ipcCommand->commandPacket.command_id; /* use command id as failover id */
- gettimeofday(&failoverObj->startTime, NULL);
- failoverObj->wdRequestingNode = ipcCommand->sourceWdNode;
- g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
-
- MemoryContextSwitchTo(oldCxt);
-
- /* For a moment just think it is successfully sent to all nodes.*/
- if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
- {
- reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, failoverObj->failoverID);
- return IPC_CMD_COMPLETE;
- }
- else
- {
- if (get_cluster_node_count() <= 1)
- {
- /* Since its just 2 nodes cluster, and the only other
- * node is the one that actually issued the failover
- * so the command actually completes here
- */
- return IPC_CMD_COMPLETE;
- }
- }
+ (errmsg("failover command [%s] request from pgpool-II node \"%s\" is queued, waiting for more nodes confirmation",
+ func_name,
+ ipcCommand->sourceWdNode->nodeName)));
+ reply_to_failove_command(ipcCommand, FAILOVER_RES_BUILDING_CONSENSUS, 0);
+ return IPC_CMD_COMPLETE;
}
-
- return IPC_CMD_PROCESSING;
+ return IPC_CMD_COMPLETE;
}
static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand)
{
if (get_local_node_state() == WD_COORDINATOR)
{
- return process_IPC_failover_command_on_coordinator(ipcCommand);
+ ereport(LOG,
+ (errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
+ return process_failover_command_on_coordinator(ipcCommand);
}
else if (get_local_node_state() == WD_STANDBY)
{
@@ -2548,54 +2617,66 @@ static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *
return IPC_CMD_TRY_AGAIN;
}
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand)
{
+ WDFailoverCMDResults res = FAILOVER_RES_NOT_ALLOWED;
/*
* if cluster or myself is not in stable state
* just return cluster in transaction
*/
ereport(LOG,
- (errmsg("received the failover command lock request from local pgpool-II on IPC interface")));
- if (get_local_node_state() == WD_STANDBY)
- {
- /* I am a standby node, Just forward the request to coordinator */
- wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
- set_next_commandID_in_message(&ipcCommand->commandPacket);
+ (errmsg("received the failover indication from Pgpool-II on IPC interface")));
- ipcCommand->sendToNode = WD_MASTER_NODE;
- if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
+ if (get_local_node_state() == WD_COORDINATOR)
+ {
+ json_value* root;
+ int failoverState = -1;
+ if (ipcCommand->sourcePacket.data == NULL || ipcCommand->sourcePacket.len <= 0)
{
ereport(LOG,
- (errmsg("unable to process the failover command lock request received on IPC interface"),
- errdetail("failed to forward the request to the master watchdog node \"%s\"",WD_MASTER_NODE->nodeName)));
- return IPC_CMD_ERROR;
+ (errmsg("watchdog unable to process failover indication"),
+ errdetail("invalid command packet")));
+ res = FAILOVER_RES_INVALID_FUNCTION;
+ }
+ root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+ if (root && root->type == json_object)
+ {
+ json_get_int_value_for_key(root, "FailoverFuncState", &failoverState);
}
else
{
- /*
- * wait for the result
- */
ereport(LOG,
- (errmsg("failover command lock request from local pgpool-II node received on IPC interface is forwarded to master watchdog node \"%s\"",
- WD_MASTER_NODE->nodeName),
- errdetail("waiting for the reply...")));
- return IPC_CMD_PROCESSING;
+ (errmsg("unable to process failover indication"),
+ errdetail("invalid json data in command packet")));
+ res = FAILOVER_RES_INVALID_FUNCTION;
+ }
+ if (root)
+ json_value_free(root);
+
+ if (failoverState < 0 )
+ {
+ ereport(LOG,
+ (errmsg("unable to process failover indication"),
+ errdetail("invalid json data in command packet")));
+ res = FAILOVER_RES_INVALID_FUNCTION;
+ }
+ else if (failoverState == 0) /* start */
+ {
+ res = failover_start_indication(ipcCommand);
+ }
+ else
+ {
+ res = failover_end_indication(ipcCommand);
}
}
- else if (get_local_node_state() == WD_COORDINATOR)
+ else
{
- /*
- * If I am coordinator, Just process the request locally
- */
- return process_failover_locking_requests_on_cordinator(ipcCommand);
+ ereport(LOG,
+ (errmsg("received the failover indication from Pgpool-II on IPC interface, but only master can do failover")));
}
+ reply_to_failove_command(ipcCommand, res, 0);
- /* we are not in any stable state at the moment */
- ereport(LOG,
- (errmsg("unable to process the failover command lock request received on IPC interface"),
- errdetail("this watchdog node has not joined the cluster yet"),
- errhint("try again in few seconds")));
- return IPC_CMD_TRY_AGAIN;
+ return IPC_CMD_COMPLETE;
}
static void process_remote_failover_locking_request(WatchdogNode* wdNode, WDPacketData* pkt)
@@ -2726,6 +2807,86 @@ static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCo
return IPC_CMD_COMPLETE;
}
+/* Failover start does nothing fancy. It just sets the failover_in_progress
+ * flag and informs all nodes that the failover is in progress.
+ *
+ * Only the local node that is the master can start the failover.
+ */
+static WDFailoverCMDResults
+failover_start_indication(WDCommandData* ipcCommand)
+{
+ ereport(LOG,
+ (errmsg("watchdog is informed of failover start")));
+
+ /* only coordinator(master) node is allowed to process failover */
+ if (get_local_node_state() == WD_COORDINATOR)
+ {
+// if (g_cluster.current_failover_id > 0)
+// {
+// /* we do allow multiple calls to failover_start but
+// * still it's a warning
+// */
+// if (g_cluster.current_failover_id != failoverID)
+// {
+// ereport(LOG,
+// (errmsg("watchdog is informed of new failover start, while failover with ID: %d was already in progress",
+// g_cluster.current_failover_id)));
+// }
+// else
+// {
+// ereport(LOG,
+// (errmsg("watchdog is informed of failover start while it is already in progress")));
+// }
+// }
+ /* Okay, now save the current failoverID */
+// g_cluster.current_failover_id = failoverID;
+ /* inform all nodes about the failover start */
+ send_message_of_type(NULL, WD_FAILOVER_START, NULL);
+ return FAILOVER_RES_PROCEED;
+ }
+ else if (get_local_node_state() == WD_STANDBY)
+ {
+ ereport(LOG,
+ (errmsg("failed to process failover start request, I am not the master node"),
+ errdetail("I am standby node and request can only be processed by master watchdog node")));
+ return FAILOVER_RES_ERROR;
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("failed to process failover start request, I am not in stable state")));
+ }
+ return FAILOVER_RES_TRANSITION;
+}
+
+static WDFailoverCMDResults
+failover_end_indication(WDCommandData* ipcCommand)
+{
+ ereport(LOG,
+ (errmsg("watchdog is informed of failover end")));
+
+ /* only coordinator(master) node is allowed to process failover */
+ if (get_local_node_state() == WD_COORDINATOR)
+ {
+ send_message_of_type(NULL, WD_FAILOVER_END, NULL);
+ return FAILOVER_RES_PROCEED;
+ }
+ else if (get_local_node_state() == WD_STANDBY)
+ {
+ ereport(LOG,
+ (errmsg("failed to process failover end request, I am not the master node"),
+ errdetail("I am a standby node and the request can only be processed by the master watchdog node")));
+ return FAILOVER_RES_ERROR;
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("failed to process failover end request, I am not in a stable state")));
+ }
+ return FAILOVER_RES_TRANSITION;
+}
+
+
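
The intended call sequence, as far as the new wd_commands.c functions show, is that the Pgpool-II
main process brackets its failover work with these start/end indications. A minimal sketch of that
flow follows (not part of the patch; the enum is a reduced stand-in for the real WDFailoverCMDResults
and do_failover() is a hypothetical placeholder):

#include <stdio.h>

typedef enum { FAILOVER_RES_PROCEED, FAILOVER_RES_ERROR } WDFailoverCMDResults;	/* reduced stand-in */

static WDFailoverCMDResults wd_failover_start(void) { return FAILOVER_RES_PROCEED; }	/* stub */
static WDFailoverCMDResults wd_failover_end(void)   { return FAILOVER_RES_PROCEED; }	/* stub */
static void do_failover(void) { printf("running the failover...\n"); }			/* placeholder */

int main(void)
{
	/* Only the coordinator (master) replies FAILOVER_RES_PROCEED;
	 * on other nodes the indication is refused. */
	if (wd_failover_start() == FAILOVER_RES_PROCEED)
	{
		do_failover();
		/* WD_FAILOVER_END is broadcast so the standbys can resync their backend states */
		wd_failover_end();
	}
	return 0;
}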
/*
* node_is_asking_for_failover_start()
* the function process the lock holding requests. If the lock holding node
@@ -3940,6 +4101,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
case WD_INFO_MESSAGE:
{
char *authkey = NULL;
+ int oldQuorumStatus;
WD_STATES oldNodeState;
WatchdogNode* tempNode = parse_node_info_message(pkt, &authkey);
if (tempNode == NULL)
@@ -3949,6 +4111,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
send_cluster_service_message(wdNode,pkt,CLUSTER_NODE_INVALID_VERSION);
break;
}
+ oldQuorumStatus = wdNode->quorum_status;
oldNodeState = wdNode->state;
wdNode->state = tempNode->state;
wdNode->startup_time.tv_sec = tempNode->startup_time.tv_sec;
@@ -3998,6 +4161,11 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
handle_split_brain(wdNode, pkt);
}
}
+ else if (WD_MASTER_NODE == wdNode && oldQuorumStatus != wdNode->quorum_status)
+ {
+ /* inform Pgpool man about quorum status changes */
+ register_watchdog_quorum_change_interupt();
+ }
}
/* if the info message is from master node. Make sure we are in sync
@@ -4676,6 +4844,9 @@ static WDPacketData* get_message_of_type(char type, WDPacketData* replyFor)
case WD_IAM_COORDINATOR_MESSAGE:
pkt = get_beacon_message(WD_IAM_COORDINATOR_MESSAGE,replyFor);
break;
+
+ case WD_FAILOVER_START:
+ case WD_FAILOVER_END:
case WD_REQ_INFO_MESSAGE:
case WD_STAND_FOR_COORDINATOR_MESSAGE:
case WD_DECLARE_COORDINATOR_MESSAGE:
@@ -5598,6 +5769,7 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
}
/* inform to the cluster about the new quorum status */
send_message_of_type(NULL, WD_INFO_MESSAGE,NULL);
+ register_watchdog_quorum_change_interupt();
}
}
break;
@@ -6189,6 +6361,9 @@ static int watchdog_state_machine_standby(WD_EVENTS event, WatchdogNode* wdNode,
case WD_EVENT_PACKET_RCV:
switch (pkt->type)
{
+ case WD_FAILOVER_END:
+ register_backend_state_sync_req_interupt();
+ break;
case WD_STAND_FOR_COORDINATOR_MESSAGE:
{
if (WD_MASTER_NODE == NULL)
@@ -6394,6 +6569,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
char* func_name;
int node_count = 0;
int *node_id_list = NULL;
+ unsigned char flags;
if (pkt->data == NULL || pkt->len == 0)
{
@@ -6412,7 +6588,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
return;
}
- if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+ if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
{
ereport(LOG,
(errmsg("watchdog received the failover command from \"%s\"",wdNode->nodeName)));
@@ -6437,6 +6613,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
char* func_name;
int node_count = 0;
int *node_id_list = NULL;
+ unsigned char flags;
if (pkt->data == NULL || pkt->len == 0)
{
@@ -6450,7 +6627,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
ereport(LOG,
(errmsg("watchdog received online recovery request from \"%s\"",wdNode->nodeName)));
- if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+ if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
{
if (strcasecmp(WD_FUNCTION_START_RECOVERY, func_name) == 0)
{
@@ -6530,7 +6707,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
}
else
{
- ret = send_failback_request(node_id_list[0],false, failover_id);
+ ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
}
}
@@ -6544,7 +6721,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
}
else
{
- ret = degenerate_backend_set(node_id_list, node_count, false, failover_id);
+ ret = degenerate_backend_set(node_id_list, node_count, REQ_DETAIL_WATCHDOG);
}
}
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index 42c77705..a089f53e 100644
--- a/src/watchdog/wd_commands.c
+++ b/src/watchdog/wd_commands.c
@@ -54,16 +54,12 @@
#define WD_INTERLOCK_TIMEOUT_SEC 10
#define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))
-static void sleep_in_waiting(void);
static void FreeCmdResult(WDIPCCmdResult* res);
-
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static WDFailoverCMDResults wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
+static char* get_wd_failover_state_json(bool start);
static int open_wd_command_sock(bool throw_error);
static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id);
-
+static WDFailoverCMDResults wd_issue_failover_command(char* func_name, int *node_id_set, int count, unsigned char flags);
/* shared memory variables */
char *watchdog_ipc_address = NULL;
bool *watchdog_require_cleanup = NULL; /* shared memory variable set to true
@@ -135,6 +131,30 @@ WD_STATES get_watchdog_local_node_state(void)
return ret;
}
+/* Fetch the current quorum state of the watchdog cluster through the
+ * runtime-variable IPC command; WD_DEAD is returned when the value
+ * cannot be obtained.
+ */
+int get_watchdog_quorum_state(void)
+{
+ WD_STATES ret = WD_DEAD;
+ WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_QUORUM_STATE);
+ if (state == NULL)
+ {
+ ereport(LOG,
+ (errmsg("failed to get quorum state of watchdog cluster"),
+ errdetail("get runtime variable value from watchdog returned no data")));
+ return WD_DEAD;
+ }
+ if (state->valueType != VALUE_DATA_TYPE_INT)
+ {
+ ereport(LOG,
+ (errmsg("failed to get quorum state of watchdog cluster"),
+ errdetail("get runtime variable value from watchdog returned invalid value type")));
+ pfree(state);
+ return WD_DEAD;
+ }
+ ret = (WD_STATES)state->data.intVal;
+ pfree(state);
+ return ret;
+}
+
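
A small usage sketch for the new getter (not part of the patch; the stub below stands in for the
real IPC-backed call). Per the patch, the master's quorum_status is negative when the quorum is not
held, and -2 is reported when no master node is known:

#include <stdio.h>

static int get_watchdog_quorum_state(void) { return 1; }	/* stub for the IPC-backed call */

int main(void)
{
	int quorum = get_watchdog_quorum_state();

	if (quorum < 0)
		printf("quorum not held (%d): failover requests are rejected or queued\n", quorum);
	else
		printf("quorum held (%d): the master may proceed once consensus is reached\n", quorum);
	return 0;
}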
char* get_watchdog_ipc_address(void)
{
return watchdog_ipc_address;
@@ -539,7 +559,7 @@ wd_start_recovery(void)
char type;
unsigned int *shared_key = get_ipc_shared_key();
- char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0,
+ char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0, 0,
shared_key?*shared_key:0,pool_config->wd_authkey);
WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
@@ -585,7 +605,7 @@ wd_end_recovery(void)
char type;
unsigned int *shared_key = get_ipc_shared_key();
- char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0,
+ char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0, 0,
shared_key?*shared_key:0,pool_config->wd_authkey);
@@ -627,29 +647,7 @@ wd_end_recovery(void)
return COMMAND_FAILED;
}
-
-WDFailoverCMDResults
-wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
-{
- int n = node_id;
- char* func;
- unsigned int *shared_key = get_ipc_shared_key();
- WDFailoverCMDResults res;
-
- func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1,
- shared_key?*shared_key:0,pool_config->wd_authkey);
-
- WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
- WD_DEFAULT_IPC_COMMAND_TIMEOUT,
- func, strlen(func), true);
- pfree(func);
-
- res = wd_get_failover_result_from_data(result, wd_failover_id);
- FreeCmdResult(result);
- return res;
-}
-
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+static char* get_wd_failover_state_json(bool start)
{
char* json_str;
JsonNode* jNode = jw_create_with_object(true);
@@ -659,9 +657,7 @@ static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks l
if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /* put the auth key*/
- jw_put_string(jNode, "SyncRequestType", reqType);
- jw_put_int(jNode, "FailoverLockID", lockID);
- jw_put_int(jNode, "WDFailoverID", wd_failover_id);
+ jw_put_int(jNode, "FailoverFuncState", start?0:1);
jw_finish_document(jNode);
json_str = pstrdup(jw_get_json_string(jNode));
jw_destroy(jNode);
@@ -669,14 +665,14 @@ static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks l
}
static WDFailoverCMDResults
-wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+wd_send_failover_func_status_command(bool start)
{
WDFailoverCMDResults res;
unsigned int failover_id;
- char* json_data = get_wd_failover_cmd_type_json(syncReqType, lockID, wd_failover_id);
+ char* json_data = get_wd_failover_state_json(start);
- WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_LOCKING_REQUEST
+ WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_INDICATION
,pool_config->recovery_timeout,
json_data, strlen(json_data), true);
@@ -743,43 +739,59 @@ static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *res
return FAILOVER_RES_ERROR;
}
-WDFailoverCMDResults
-wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id)
+static WDFailoverCMDResults
+wd_issue_failover_command(char* func_name, int *node_id_set, int count, unsigned char flags)
{
WDFailoverCMDResults res;
char* func;
unsigned int *shared_key = get_ipc_shared_key();
+ unsigned int wd_failover_id;
- func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count,
+ func = get_wd_node_function_json(func_name,node_id_set, count, flags,
shared_key?*shared_key:0,pool_config->wd_authkey);
-
+
WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND ,
WD_DEFAULT_IPC_COMMAND_TIMEOUT,
func, strlen(func), true);
pfree(func);
- res = wd_get_failover_result_from_data(result, wd_failover_id);
+ res = wd_get_failover_result_from_data(result, &wd_failover_id);
FreeCmdResult(result);
return res;
}
+/*
+ * Send the degenerate backend request to the watchdog.
+ * The watchdog can respond to the request in the following ways.
+ *
+ * 1 - It can tell the caller to proceed with the failover. This
+ * happens when the current node is the master watchdog node.
+ *
+ * 2 - It can tell the caller that the failover is not allowed;
+ * this happens, for instance, when the cluster does not have the quorum.
+ *
+ */
WDFailoverCMDResults
-wd_promote_backend(int node_id, unsigned int *wd_failover_id)
+wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
{
- WDFailoverCMDResults res;
- int n = node_id;
- char* func;
- WDIPCCmdResult *result;
- unsigned int *shared_key = get_ipc_shared_key();
-
- func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1,
- shared_key?*shared_key:0,pool_config->wd_authkey);
- result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
- WD_DEFAULT_IPC_COMMAND_TIMEOUT,
- func, strlen(func), true);
- pfree(func);
- res = wd_get_failover_result_from_data(result, wd_failover_id);
- FreeCmdResult(result);
- return res;
+ if (pool_config->use_watchdog)
+ return wd_issue_failover_command(WD_FUNCTION_DEGENERATE_REQUEST, node_id_set, count, flags);
+ return FAILOVER_RES_PROCEED;
+}
+
+WDFailoverCMDResults
+wd_promote_backend(int node_id, unsigned char flags)
+{
+ if (pool_config->use_watchdog)
+ return wd_issue_failover_command(WD_FUNCTION_PROMOTE_REQUEST, &node_id, 1, flags);
+ return FAILOVER_RES_PROCEED;
+}
+
+WDFailoverCMDResults
+wd_send_failback_request(int node_id, unsigned char flags)
+{
+ if (pool_config->use_watchdog)
+ return wd_issue_failover_command(WD_FUNCTION_FAILBACK_REQUEST, &node_id, 1, flags);
+ return FAILOVER_RES_PROCEED;
}
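
For clarity, here is how a caller in the main process might act on the results these wrappers can
now return (a sketch only, not the patch's code; the enum is a reduced stand-in for the real
WDFailoverCMDResults and wd_degenerate_backend_set() below is a stub):

#include <stdio.h>

typedef enum {
	FAILOVER_RES_PROCEED,		/* this node is the master and quorum/consensus allow the failover */
	FAILOVER_RES_WILL_BE_DONE,	/* the master accepted the request and will perform the failover */
	FAILOVER_RES_NO_QUORUM,		/* rejected: the watchdog cluster has no quorum */
	FAILOVER_RES_BUILDING_CONSENSUS	/* noted: waiting for more nodes to report the same failure */
} WDFailoverCMDResults;

static WDFailoverCMDResults wd_degenerate_backend_set(int *nodes, int count, unsigned char flags)
{
	(void) nodes; (void) count; (void) flags;
	return FAILOVER_RES_BUILDING_CONSENSUS;	/* stub */
}

int main(void)
{
	int node = 1;

	switch (wd_degenerate_backend_set(&node, 1, 0))
	{
		case FAILOVER_RES_PROCEED:
		case FAILOVER_RES_WILL_BE_DONE:
			printf("failover of backend %d goes ahead\n", node);
			break;
		case FAILOVER_RES_NO_QUORUM:
			printf("failover rejected: no quorum\n");
			break;
		case FAILOVER_RES_BUILDING_CONSENSUS:
			printf("failover recorded: waiting for majority consensus\n");
			break;
	}
	return 0;
}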
/*
@@ -878,86 +890,20 @@ open_wd_command_sock(bool throw_error)
return sock;
}
-WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id)
-{
- if (pool_config->use_watchdog)
- return wd_issue_failover_lock_command(WD_REQ_FAILOVER_START, 0, wd_failover_id);
- return FAILOVER_RES_I_AM_LOCK_HOLDER;
-}
-
-WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id)
-{
- if (pool_config->use_watchdog)
- return wd_issue_failover_lock_command(WD_REQ_FAILOVER_END, 0, wd_failover_id);
- return FAILOVER_RES_SUCCESS;
-}
-
-WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_start(void)
{
if (pool_config->use_watchdog)
- return wd_issue_failover_lock_command(WD_REQ_FAILOVER_RELEASE_LOCK, lock, wd_failover_id);
- return FAILOVER_RES_SUCCESS;
+ return wd_send_failover_func_status_command(0);
+ return FAILOVER_RES_PROCEED;
}
-WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_end(void)
{
if (pool_config->use_watchdog)
- return wd_issue_failover_lock_command(WD_REQ_FAILOVER_LOCK_STATUS, lock, wd_failover_id);
- return FAILOVER_RES_UNLOCKED;
+ return wd_send_failover_func_status_command(1);
+ return FAILOVER_RES_PROCEED;
}
-void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id)
-{
- WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
- int count = WD_INTERLOCK_WAIT_COUNT;
-
- while (pool_config->use_watchdog)
- {
- res = wd_failover_lock_status(lock, wd_failover_id);
- if (res == FAILOVER_RES_UNLOCKED ||
- res == FAILOVER_RES_NO_LOCKHOLDER)
- {
- /* we have the permision */
- return;
- }
- sleep_in_waiting();
- if (--count < 0)
- {
- ereport(WARNING,
- (errmsg("timeout wating for unlock")));
- break;
- }
- }
-}
-
-/*
- * This is just a wrapper over wd_send_failover_sync_command()
- * but try to wait for WD_INTERLOCK_TIMEOUT_SEC amount of time
- * if watchdog is in transition state
- */
-
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
-{
- WDFailoverCMDResults res;
- int x;
- for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION/2; x++)
- {
- res = wd_send_failover_sync_command(syncReqType, lockID, wd_failover_id);
- if (res != FAILOVER_RES_TRANSITION)
- break;
- sleep(2);
- }
- return res;
-}
-
-static void
-sleep_in_waiting(void)
-{
- struct timeval t = {0, WD_INTERLOCK_WAIT_MSEC * 1000};
- select(0, NULL, NULL, NULL, &t);
-}
-
-
static void FreeCmdResult(WDIPCCmdResult* res)
{
diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c
index cc95e28f..97dc1129 100644
--- a/src/watchdog/wd_json_data.c
+++ b/src/watchdog/wd_json_data.c
@@ -690,7 +690,7 @@ WDNodeInfo* get_WDNodeInfo_from_wd_node_json(json_value* source)
}
-char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey)
+char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey)
{
char* json_str;
int i;
@@ -702,6 +702,7 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /* put the auth key*/
jw_put_string(jNode, "Function", func_name);
+ jw_put_int(jNode, "Flags", (int)flags);
jw_put_int(jNode, "NodeCount", count);
if (count > 0)
{
@@ -717,7 +718,7 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
return json_str;
}
-bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count)
+bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags)
{
json_value *root, *value;
char* ptr;
@@ -750,6 +751,12 @@ bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name
}
*func_name = pstrdup(ptr);
/* If it is a node function ?*/
+ if (json_get_int_value_for_key(root, "Flags", flags))
+ {
+ /* flags not found, but we don't care much about this */
+ *flags = 0;
+ /* the message may be from an old version */
+ }
if (json_get_int_value_for_key(root, "NodeCount", &node_count))
{
/*node count not found, But we don't care much about this*/
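
The idea in the hunk above is simple backward compatibility: if an older node sends a message
without the "Flags" key, the flags default to 0. A standalone sketch of that fallback (not part of
the patch; lookup_int_key() is a hypothetical stand-in for the JSON helper, using the same
"non-zero means key not found" convention):

#include <stdio.h>

static int lookup_int_key(const char *key, int *value)
{
	(void) key;
	(void) value;
	return 1;	/* pretend the key is missing */
}

int main(void)
{
	int flags;

	if (lookup_int_key("Flags", &flags))
		flags = 0;	/* message from an old Pgpool-II version: no detail flags */

	printf("effective request flags: 0x%02x\n", (unsigned) flags);
	return 0;
}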