View Issue Details

ID: 0000794
Project: Pgpool-II
Category: Enhancement
View Status: public
Last Update: 2023-05-06 19:47
Reporter: chamway
Assigned To: pengbo
Priority: normal
Severity: trivial
Reproducibility: always
Status: assigned
Resolution: reopened
OS: centos
OS Version: 7
Product Version: 4.3.2
Summary: 0000794: Why are PREPARE statements always sent to the primary node?
Description: I noticed that send_to_where() returns POOL_EITHER for a PREPARE statement, but is_select_query() does not handle PrepareStmt, so pool_where_to_send() always sends PREPARE to the primary node.
Tags: No tags attached.
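The routing problem in the description can be modelled in a few lines. This is a simplified sketch, not Pgpool-II code: StmtKind, Stmt, Route, route() and the two read checks are hypothetical stand-ins for the parse tree, is_select_query() and the load-balance branch of pool_where_to_send(). It shows why a PREPARE whose body is a SELECT still lands on the primary when the read check does not look inside the PrepareStmt, and what unwrapping it first would change.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily simplified stand-ins for parse tree nodes. */
typedef enum { STMT_SELECT, STMT_UPDATE, STMT_PREPARE } StmtKind;

typedef struct Stmt {
    StmtKind kind;
    const struct Stmt *body;   /* statement wrapped by PREPARE, else NULL */
} Stmt;

typedef enum { ROUTE_PRIMARY, ROUTE_LOAD_BALANCE } Route;

/* Behaviour described in the issue: only a bare SELECT counts as a read. */
static bool is_select_query_model(const Stmt *s)
{
    return s->kind == STMT_SELECT;
}

/* Hypothetical variant that looks through PREPARE at the prepared statement. */
static bool is_select_query_unwrapped(const Stmt *s)
{
    if (s->kind == STMT_PREPARE) {
        assert(s->body != NULL);
        return is_select_query_model(s->body);
    }
    return is_select_query_model(s);
}

/* Load balance only when the read check passes; otherwise use the primary. */
static Route route(const Stmt *s, bool (*is_read)(const Stmt *))
{
    return is_read(s) ? ROUTE_LOAD_BALANCE : ROUTE_PRIMARY;
}
```

For example, with `Stmt sel = {STMT_SELECT, NULL}; Stmt prep = {STMT_PREPARE, &sel};`, `route(&prep, is_select_query_model)` yields `ROUTE_PRIMARY`, while `route(&prep, is_select_query_unwrapped)` yields `ROUTE_LOAD_BALANCE`. As the developer explains below, the first result is intentional in the current design.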

Activities

pengbo

2023-04-20 16:34

developer   ~0004347

> I noticed that send_to_where() returns POOL_EITHER for a PREPARE statement,
> but is_select_query() does not handle PrepareStmt, so pool_where_to_send() always sends PREPARE to the primary node.

Yes. This is the current behaviour of the load balancing feature.
Currently Pgpool-II does not load balance PREPARE/EXECUTE/DEALLOCATE, to avoid errors such as "ERROR: prepared statement ... does not exist".

If you set "disable_load_balance_on_write = transaction" (the default), then once a write query is issued inside an explicit transaction,
all subsequent queries in that transaction are sent only to the primary until the transaction ends, so that they are not affected by replication delay.

If pgpool load balanced PREPARE/EXECUTE/DEALLOCATE, a PREPARE could be sent to a standby. After a write query, the following EXECUTE would be sent to the primary, where the statement was never prepared, causing "ERROR: prepared statement ... does not exist".

For example:
----------------------------------------
BEGIN;
  PREPARE test AS SELECT 1;  -- load balanced to a standby
  EXECUTE test;              -- load balanced to a standby
  UPDATE ...                 -- write query, sent to the primary
  EXECUTE test;              -- not load balanced because a write query was issued;
                             -- on the primary this fails with "ERROR: prepared statement test does not exist"
----------------------------------------

To load balance PREPARE/EXECUTE/DEALLOCATE without this error, pgpool would need to run PREPARE again on the primary after a write query is issued.
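A toy model makes the failure and the remedy concrete. Assumptions: two backends, a fixed-size table of prepared-statement names per backend, and invented helpers node_prepare() and node_execute(); none of this is Pgpool-II code, and unlike PostgreSQL the model silently ignores a duplicate PREPARE. Executing a name that was only prepared on the standby fails on the primary, and preparing it again on the primary first avoids the error.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_PREPARED 8
#define NAME_LEN 64

/* Hypothetical per-backend table of prepared statement names. */
typedef struct {
    char names[MAX_PREPARED][NAME_LEN];
    int count;
} BackendNode;

/* PREPARE on one backend: remember the name (duplicates ignored in this model). */
static void node_prepare(BackendNode *n, const char *name)
{
    for (int i = 0; i < n->count; i++)
        if (strcmp(n->names[i], name) == 0)
            return;
    assert(n->count < MAX_PREPARED);
    strncpy(n->names[n->count], name, NAME_LEN - 1);
    n->names[n->count][NAME_LEN - 1] = '\0';
    n->count++;
}

/* EXECUTE on one backend: false models "prepared statement ... does not exist". */
static bool node_execute(const BackendNode *n, const char *name)
{
    for (int i = 0; i < n->count; i++)
        if (strcmp(n->names[i], name) == 0)
            return true;
    return false;
}
```

Replaying the SQL example: PREPARE and the first EXECUTE go to the standby and succeed; the EXECUTE after UPDATE goes to the primary, where `node_execute()` returns false until `node_prepare()` has been run on the primary as well.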

We have documented this restriction:
    https://git.postgresql.org/gitweb/?p=pgpool2.git;a=commit;h=e81597a10193f0ad0221db80912879eb97fb5d91
and added it to the TODO list:
    https://www.pgpool.net/mediawiki/index.php/TODO#Allow_load_balance_for_PREPARE.2FEXECUTE.2FDEALLOCATE

chamway

2023-04-21 20:05

reporter   ~0004351

OK, I understand now. Thanks.

pengbo

2023-04-25 09:18

developer   ~0004359

Thank you!
I am going to close this issue.

chamway

2023-05-06 19:46

reporter   ~0004375

I have worked on this issue and made a patch against version 4.4.2. When a PREPARE statement is issued, the patch checks whether it is read-only and, if so, sends it to all nodes; whether to load balance is then decided when the EXECUTE statement is issued. It passed my tests. Could you check whether this is a feasible solution?

chamway

2023-05-06 19:47

reporter   ~0004376

prepare_20230506.patch (17,784 bytes)   
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 2cf5132..1543ac0 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -61,6 +61,7 @@ static void where_to_send_deallocate(POOL_QUERY_CONTEXT * query_context, Node *n
 static char *remove_read_write(int len, const char *contents, int *rewritten_len);
 static void set_virtual_main_node(POOL_QUERY_CONTEXT *query_context);
 static void set_load_balance_info(POOL_QUERY_CONTEXT *query_context);
+static bool load_balance_if_possible(POOL_QUERY_CONTEXT * query_context, char *query, Node *node);
 
 static bool is_in_list(char *name, List *list);
 static bool is_select_object_in_temp_write_list(Node *node, void *context);
@@ -467,10 +468,15 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
 	else if (MAIN_REPLICA)
 	{
 		POOL_DEST	dest;
+		Node *real_node;
+		POOL_QUERY_CONTEXT *prepare_context = NULL;
+		char *real_query = query;
 
-		dest = send_to_where(node, query);
+		get_really_node(node, &real_node, &prepare_context, &real_query);
 
-		dml_adaptive(node, query);
+		dest = send_to_where(real_node, query);
+
+		dml_adaptive(real_node, query);
 
 		ereport(DEBUG1,
 				(errmsg("decide where to send the query"),
@@ -497,146 +503,29 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
 		 */
 		else
 		{
-			if (pool_config->load_balance_mode &&
-				is_select_query(node, query) &&
-				MAJOR(backend) == PROTO_MAJOR_V3)
+			if (IsA(node, ExecuteStmt) && prepare_context && !prepare_context->prepare_stmt_load_balance)
 			{
-				/*
-				 * If (we are outside of an explicit transaction) OR (the
-				 * transaction has not issued a write query yet, AND
-				 * transaction isolation level is not SERIALIZABLE) we might
-				 * be able to load balance.
-				 */
-
-				ereport(DEBUG1,
-						(errmsg("checking load balance preconditions. TSTATE:%c writing_transaction:%d failed_transaction:%d isolation:%d",
-								TSTATE(backend, PRIMARY_NODE_ID),
-								pool_is_writing_transaction(),
-								pool_is_failed_transaction(),
-								pool_get_transaction_isolation()),
-						 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-				if (TSTATE(backend, PRIMARY_NODE_ID) == 'I' ||
-					(!pool_is_writing_transaction() &&
-					 !pool_is_failed_transaction() &&
-					 pool_get_transaction_isolation() != POOL_SERIALIZABLE))
-				{
-					BackendInfo *bkinfo = pool_get_node_info(session_context->load_balance_node_id);
-
-					/*
-					 * Load balance if possible
-					 */
-
-					/*
-					 * As streaming replication delay is too much, if
-					 * prefer_lower_delay_standby is true then elect new
-					 * load balance node which is lowest delayed,
-					 * false then send to the primary.
-					 */
-					if (STREAM &&
-						pool_config->delay_threshold &&
-						bkinfo->standby_delay > pool_config->delay_threshold)
-					{
-						ereport(DEBUG1,
-								(errmsg("could not load balance because of too much replication delay"),
-								 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-						if (pool_config->prefer_lower_delay_standby)
-						{
-							int new_load_balancing_node = select_load_balancing_node();
-
-							session_context->load_balance_node_id = new_load_balancing_node;
-							session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
-							pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
-						}
-						else
-						{
-							pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-						}
-					}
-
-					/*
-					 * If system catalog is used in the SELECT, we prefer to
-					 * send to the primary. Example: SELECT * FROM pg_class
-					 * WHERE relname = 't1'; Because 't1' is a constant, it's
-					 * hard to recognize as table name.  Most use case such
-					 * query is against system catalog, and the table name can
-					 * be a temporary table, it's best to query against
-					 * primary system catalog. Please note that this test must
-					 * be done *before* test using pool_has_temp_table.
-					 */
-					else if (pool_has_system_catalog(node))
-					{
-						ereport(DEBUG1,
-								(errmsg("could not load balance because systems catalogs are used"),
-								 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-
-					/*
-					 * If temporary table is used in the SELECT, we prefer to
-					 * send to the primary.
-					 */
-					else if (pool_config->check_temp_table && pool_has_temp_table(node))
-					{
-						ereport(DEBUG1,
-								(errmsg("could not load balance because temporary tables are used"),
-								 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-
-					/*
-					 * If unlogged table is used in the SELECT, we prefer to
-					 * send to the primary.
-					 */
-					else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
-					{
-						ereport(DEBUG1,
-								(errmsg("could not load balance because unlogged tables are used"),
-								 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-					/*
-					 * When query match the query patterns in primary_routing_query_pattern_list, we
-					 * send only to main node.
-					 */
-					else if (pattern_compare(query, WRITELIST, "primary_routing_query_pattern_list") == 1)
-					{
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-					/*
-					 * If a writing function call is used, we prefer to send
-					 * to the primary.
-					 */
-					else if (pool_has_function_call(node))
-					{
-						ereport(DEBUG1,
-								(errmsg("could not load balance because writing functions are used"),
-								 errdetail("destination = %d for query= \"%s\"", dest, query)));
-
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-					else if (is_select_object_in_temp_write_list(node, query))
-					{
-						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
-					}
-					else
-					{
-						if (pool_config->statement_level_load_balance)
-							session_context->load_balance_node_id = select_load_balancing_node();
+				pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+			}
+			else if(load_balance_if_possible(query_context, real_query, real_node))
+			{
+				session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+				pool_set_node_to_be_sent(query_context,
+											session_context->query_context->load_balance_node_id);
 
-						session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
-						pool_set_node_to_be_sent(query_context,
-												 session_context->query_context->load_balance_node_id);
-					}
-				}
-				else
+				/*
+				 * If PREPARE were sent only to a standby node and a later write
+				 * query in the same explicit transaction disabled load balancing
+				 * for the rest of the transaction (depending on
+				 * disable_load_balance_on_write), the subsequent EXECUTE would be
+				 * routed to the primary node and fail with "prepared statement
+				 * ... does not exist". Therefore a PREPARE that can be load
+				 * balanced is sent to all nodes.
+				 */
+				if (IsA(node, PrepareStmt))
 				{
-					/* Send to the primary only */
-					pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+					query_context->prepare_stmt_load_balance = true;
+					pool_setall_node_to_be_sent(query_context);
 				}
 			}
 			else
@@ -645,6 +534,10 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
 				pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
 			}
 		}
+
+		/* Free real_query if get_really_node() allocated a new string. */
+		if (real_query != query)
+			pfree(real_query);
 	}
 	else if (REPLICATION)
 	{
@@ -760,14 +653,14 @@ pool_where_to_send(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
 	 */
 	if (IsA(node, ExecuteStmt))
 	{
-		POOL_SENT_MESSAGE *msg;
-
-		msg = pool_get_sent_message('Q', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
-		if (!msg)
-			msg = pool_get_sent_message('P', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
-		if (msg)
-			pool_copy_prep_where(msg->query_context->where_to_send,
-								 query_context->where_to_send);
+		// POOL_SENT_MESSAGE *msg;
+
+		// msg = pool_get_sent_message('Q', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
+		// if (!msg)
+		// 	msg = pool_get_sent_message('P', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
+		// if (msg)
+		// 	pool_copy_prep_where(msg->query_context->where_to_send,
+		// 						 query_context->where_to_send);
 	}
 
 	/*
@@ -2272,3 +2165,196 @@ dml_adaptive(Node *node, char *query)
 
 	}
 }
+
+static bool 
+load_balance_if_possible(POOL_QUERY_CONTEXT * query_context, char *query, Node *node)
+{
+	POOL_SESSION_CONTEXT *session_context;
+	POOL_CONNECTION_POOL *backend;
+	BackendInfo *bkinfo;
+	bool load_balance = true;
+
+	session_context = pool_get_session_context(false);
+	backend = session_context->backend;
+
+	/*
+	* If load_balance_mode is on, AND is_select_query() is true
+	* we might be able to load balance.
+	*/
+	if (!pool_config->load_balance_mode ||
+		!is_select_query(node, query)   ||
+		MAJOR(backend) != PROTO_MAJOR_V3)
+	{
+		return false;
+	}
+
+	/*
+	* If (we are outside of an explicit transaction) OR (the
+	* transaction has not issued a write query yet, AND
+	* transaction isolation level is not SERIALIZABLE) we might
+	* be able to load balance.
+	*/
+	ereport(DEBUG1,
+			(errmsg("checking load balance preconditions. TSTATE:%c writing_transaction:%d failed_transaction:%d isolation:%d",
+					TSTATE(backend, PRIMARY_NODE_ID),
+					pool_is_writing_transaction(),
+					pool_is_failed_transaction(),
+					pool_get_transaction_isolation())));
+
+	if (TSTATE(backend, PRIMARY_NODE_ID) != 'I' &&
+		(pool_is_writing_transaction() ||
+		 pool_is_failed_transaction()  ||
+		 pool_get_transaction_isolation() == POOL_SERIALIZABLE))
+	{
+		return false;
+	}
+
+	/*
+	* Do not perform these checks on ExecuteStmt, 
+	* because they have already been done during PrepareStmt
+	*/ 
+	if (!IsA(query_context->parse_tree, ExecuteStmt))
+	{
+		/*
+		* If system catalog is used in the SELECT, we prefer to
+		* send to the primary. Example: SELECT * FROM pg_class
+		* WHERE relname = 't1'; Because 't1' is a constant, it's
+		* hard to recognize as table name.  Most use case such
+		* query is against system catalog, and the table name can
+		* be a temporary table, it's best to query against
+		* primary system catalog. Please note that this test must
+		* be done *before* test using pool_has_temp_table.
+		*/
+		if (pool_has_system_catalog(node))
+		{
+			ereport(DEBUG1,
+					(errmsg("could not load balance because systems catalogs are used")));
+
+			return false;
+		}
+
+		/*
+		* If temporary table is used in the SELECT, we prefer to
+		* send to the primary.
+		*/
+		else if (pool_config->check_temp_table && pool_has_temp_table(node))
+		{
+			ereport(DEBUG1,
+					(errmsg("could not load balance because temporary tables are used")));
+
+			return false;
+		}
+
+		/*
+		* If unlogged table is used in the SELECT, we prefer to
+		* send to the primary.
+		*/
+		else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
+		{
+			ereport(DEBUG1,
+					(errmsg("could not load balance because unlogged tables are used")));
+
+			return false;
+		}
+
+		/*
+		* When query match the query patterns in primary_routing_query_pattern_list, we
+		* send only to main node.
+		*/
+		else if (pattern_compare(query, WRITELIST, "primary_routing_query_pattern_list") == 1)
+		{
+			return false;
+		}
+
+		/*
+		* If a writing function call is used, we prefer to send
+		* to the primary.
+		*/
+		else if (pool_has_function_call(node))
+		{
+			ereport(DEBUG1,
+					(errmsg("could not load balance because writing functions are used")));
+
+			return false;
+		}
+	}
+
+	/*
+	* PrepareStmt does not perform this check, 
+	* it will be checked when ExecuteStmt occurs
+	*/ 
+	if (!IsA(query_context->parse_tree, PrepareStmt))
+	{
+		if (is_select_object_in_temp_write_list(node, query))
+			return false;
+	}
+	else
+		return load_balance;
+
+	/* Checks passed; select the load balance node (subject to the delay check below). */
+	bkinfo = pool_get_node_info(session_context->load_balance_node_id);
+
+	if (pool_config->statement_level_load_balance)
+	{
+		session_context->load_balance_node_id = select_load_balancing_node();
+	}			
+	else if (STREAM &&
+		pool_config->delay_threshold &&
+		bkinfo->standby_delay > pool_config->delay_threshold)
+	{
+		/*
+		* As streaming replication delay is too much, if
+		* prefer_lower_delay_standby is true then select new
+		* load balance node which is lowest delayed,
+		* false then send to the primary.
+		*/							
+		ereport(DEBUG1,
+				(errmsg("could not load balance because of too much replication delay")));
+
+		if (pool_config->prefer_lower_delay_standby)
+		{
+			session_context->load_balance_node_id = select_load_balancing_node();
+		}
+		else
+		{
+			load_balance = false;
+		}
+	}		
+
+	return load_balance;
+}
+
+/*
+ * Resolve a PrepareStmt/ExecuteStmt to the underlying prepared statement.
+*/
+void 
+get_really_node(Node *node, Node **realNode, POOL_QUERY_CONTEXT **context, char **sql)
+{
+	*realNode = node;
+
+	if (IsA(node, PrepareStmt))
+	{
+		*realNode = ((PrepareStmt *)node)->query;
+	}
+	else if (IsA(node, ExecuteStmt))
+	{
+		POOL_SENT_MESSAGE *msg;
+
+		msg = pool_get_sent_message('Q', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
+		if (!msg)
+			msg = pool_get_sent_message('P', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
+		if (msg)
+		{
+			Node *prepared = msg->query_context->parse_tree;
+			*realNode = IsA(prepared, PrepareStmt) ? ((PrepareStmt *) prepared)->query : prepared;
+
+			if (context)
+				*context = msg->query_context;
+		}	
+	}
+
+	if (sql && *realNode != node)
+	{
+		*sql = nodeToString(*realNode);
+	}
+}
\ No newline at end of file
diff --git a/src/include/context/pool_query_context.h b/src/include/context/pool_query_context.h
index b732c90..c51efef 100644
--- a/src/include/context/pool_query_context.h
+++ b/src/include/context/pool_query_context.h
@@ -91,6 +91,8 @@ typedef struct
 									 * this flag is true. */
 
 	MemoryContext memory_context;	/* memory context for query context */
+
+	bool		prepare_stmt_load_balance;	/* true if the PREPARE was load balanced, i.e. sent to all nodes */
 }			POOL_QUERY_CONTEXT;
 
 extern POOL_QUERY_CONTEXT * pool_init_query_context(void);
@@ -127,4 +129,5 @@ extern void pool_unset_cache_exceeded(void);
 extern bool pool_is_transaction_read_only(Node *node);
 extern void pool_force_query_node_to_backend(POOL_QUERY_CONTEXT * query_context, int backend_id);
 extern void check_object_relationship_list(char *name, bool is_func_name);
+extern void get_really_node(Node *node, Node **realNode, POOL_QUERY_CONTEXT **context, char **sql);
 #endif							/* POOL_QUERY_CONTEXT_H */
diff --git a/src/include/protocol/pool_proto_modules.h b/src/include/protocol/pool_proto_modules.h
index 57280a3..f663f1f 100644
--- a/src/include/protocol/pool_proto_modules.h
+++ b/src/include/protocol/pool_proto_modules.h
@@ -150,6 +150,7 @@ extern int RowDescription(POOL_CONNECTION * frontend,
 extern void wait_for_query_response_with_trans_cleanup(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, int protoVersion, int pid, int key);
 extern POOL_STATUS wait_for_query_response(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, int protoVersion);
 extern bool is_select_query(Node *node, char *sql);
+extern bool is_select_query_ex(Node *node, char *sql);
 extern bool is_commit_query(Node *node);
 extern bool is_rollback_query(Node *node);
 extern bool is_commit_or_rollback_query(Node *node);
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index f2db65e..b088330 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -1099,6 +1099,19 @@ reset_backend(POOL_CONNECTION_POOL * backend, int qcnt)
 }
 
 /*
+ * Like is_select_query(), but for PrepareStmt/ExecuteStmt it checks the
+ * underlying prepared statement instead of the wrapper node.
+ */
+bool 
+is_select_query_ex(Node *node, char *sql)
+{
+	Node *stmt;
+
+	get_really_node(node, &stmt, NULL, &sql);
+	return is_select_query(stmt, sql);
+}
+
+/*
  * Returns true if the SQL statement is regarded as read SELECT from syntax's
  * point of view. However callers need to do additional checking such as if the
  * SELECT does not have write functions or not to make sure that the SELECT is
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 81ef4af..d27c279 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -1088,7 +1088,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
 		/*
 		 * Take care of "writing transaction" flag.
 		 */
-		if ((!is_select_query(node, query) || pool_has_function_call(node)) &&
+		if ((!is_select_query_ex(node, query) || pool_has_function_call(node)) &&
 			 !is_start_transaction_query(node) &&
 			 !is_commit_or_rollback_query(node))
 		{
@@ -4204,7 +4204,7 @@ pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backe
 		close_standby_transactions(frontend, backend);
 	}
 
-	else if (!is_select_query(node, query) || pool_has_function_call(node))
+	else if (!is_select_query_ex(node, query) || pool_has_function_call(node))
 	{
 		/*
 		 * If the query was not READ SELECT, and we are in an explicit

Issue History

Date Modified Username Field Change
2023-04-18 19:58 chamway New Issue
2023-04-19 09:22 pengbo Assigned To => pengbo
2023-04-19 09:22 pengbo Status new => assigned
2023-04-20 16:34 pengbo Note Added: 0004347
2023-04-20 16:34 pengbo Status assigned => feedback
2023-04-21 20:05 chamway Note Added: 0004351
2023-04-21 20:05 chamway Status feedback => assigned
2023-04-25 09:18 pengbo Note Added: 0004359
2023-04-25 09:18 pengbo Status assigned => closed
2023-05-06 19:46 chamway Status closed => feedback
2023-05-06 19:46 chamway Resolution open => reopened
2023-05-06 19:46 chamway Note Added: 0004375
2023-05-06 19:47 chamway Note Added: 0004376
2023-05-06 19:47 chamway File Added: prepare_20230506.patch
2023-05-06 19:47 chamway Status feedback => assigned