[pgpool-general-jp: 166] Re: pgpoolでのupdateについて

Yoshiyuki Asaba y-asaba @ sraoss.co.jp
2007年 6月 6日 (水) 16:38:37 JST


浅羽です。

From: "Mizuno Shinya" <098.mizuno.shinya @ gmail.com>
Subject: [pgpool-general-jp: 165] Re: pgpoolでのupdateについて
Date: Wed, 6 Jun 2007 15:43:27 +0900

> さっそく、パッチをあてて動作の確認を進めておりましたが、問題
> がありましたので、ご報告いたします。
> 
> testDB=> begin;
> WARNING:  there is already a transaction in progress
> BEGIN
> testDB=> commit;
> server closed the connection unexpectedly
>         This probably means the server terminated abnormally
>         before or while processing the request.
> The connection to the server was lost. Attempting reset: Succeeded.
> 
> 
> 「begin」「commit」を実行したときにも「begin」「commit」でく
> くられてしまっているように見受けられます。

申し訳ありません。パッチを新しく作りなおして添付しましたので、再度お試し
いただけないでしょうか?

よろしくお願いします。
--
Yoshiyuki Asaba
y-asaba @ sraoss.co.jp
-------------- next part --------------
Index: pool_process_query.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v
retrieving revision 1.19
diff -c -r1.19 pool_process_query.c
*** pool_process_query.c	2 Jun 2007 00:54:40 -0000	1.19
--- pool_process_query.c	6 Jun 2007 07:33:14 -0000
***************
*** 122,127 ****
--- 122,128 ----
  static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
  
  static int is_select_query(Node *node, char *sql);
+ static int is_commit_query(Node *node);
  static int is_sequence_query(Node *node);
  static int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql);
  static void start_load_balance(POOL_CONNECTION_POOL *backend);
***************
*** 179,184 ****
--- 180,188 ----
  static POOL_MEMORY_POOL *prepare_memory_context = NULL;
  
  static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len);
+ static POOL_STATUS start_internal_transaction(POOL_CONNECTION_POOL *backend, Node *node);
+ static POOL_STATUS end_internal_transaction(POOL_CONNECTION_POOL *backend);
+ 
  
  POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, 
  							   POOL_CONNECTION_POOL *backend,
***************
*** 742,748 ****
  	char *string, *string1;
  	int len;
  	static char *sq = "show pool_status";
! 	int i;
  	List *parse_tree_list;
  	Node *node, *node1;
  	POOL_STATUS status;
--- 746,752 ----
  	char *string, *string1;
  	int len;
  	static char *sq = "show pool_status";
! 	int i, commit;
  	List *parse_tree_list;
  	Node *node, *node1;
  	POOL_STATUS status;
***************
*** 1003,1010 ****
--- 1007,1021 ----
  				return status;
  			}
  		}
+ 		else if (REPLICATION && query == NULL && start_internal_transaction(backend, node))
+ 		{
+ 			free_parser();
+ 			return POOL_ERROR;
+ 		}
  	}
  
+ 	/* check if query is "COMMIT" */
+ 	commit = is_commit_query(node);
  	free_parser();
  
  	for (i=0;i<NUM_BACKENDS;i++)
***************
*** 1012,1017 ****
--- 1023,1063 ----
  		if (!VALID_BACKEND(i))
  			continue;
  
+ 		/* COMMIT is sent to the master node last, after this loop; skip it here */
+ 		if (commit && IS_MASTER_NODE_ID(i))
+ 			continue;
+ 
+ 		/* forward the query to the backend */
+ 		pool_write(CONNECTION(backend, i), "Q", 1);
+ 
+ 		if (MAJOR(backend) == PROTO_MAJOR_V3)
+ 		{
+ 			int sendlen = htonl(len + 4);
+ 			pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen));
+ 		}
+ 
+ 		if (pool_write_and_flush(CONNECTION(backend, i), string, len) < 0)
+ 		{
+ 			return POOL_END;
+ 		}
+ 
+ 		/*
+ 		 * in "strict mode" we need to wait for the backend to complete the query.
+ 		 * note that this does not apply if "NO STRICT" is specified as a comment.
+ 		 */
+ 		if ((pool_config->replication_strict && !NO_STRICT_MODE(string)) ||
+ 			STRICT_MODE(string))
+ 		{
+ 			pool_debug("waiting for backend %d completing the query", i);
+ 			if (synchronize(CONNECTION(backend, i)))
+ 				return POOL_END;
+ 		}
+ 	}
+ 
+ 	if (commit)
+ 	{
+ 		int i = MASTER_NODE_ID;
+ 
  		/* forward the query to the backend */
  		pool_write(CONNECTION(backend, i), "Q", 1);
  
***************
*** 1053,1059 ****
  	int sendlen;
  	int i;
  	char kind;
! 	int status;
  	Portal *portal;
  	char *string1;
  	PrepareStmt *p_stmt;
--- 1099,1105 ----
  	int sendlen;
  	int i;
  	char kind;
! 	int status, commit = 0;
  	Portal *portal;
  	char *string1;
  	PrepareStmt *p_stmt;
***************
*** 1090,1095 ****
--- 1136,1147 ----
  			in_load_balance = 1;
  			select_in_transaction = 1;
  		}
+ 		else if (REPLICATION && start_internal_transaction(backend, (Node *)p_stmt->query))
+ 		{
+ 			return POOL_END;
+ 		}
+ 
+ 		commit = is_commit_query((Node *)p_stmt->query);
  		pool_memory_delete(pool_memory);
  	}
  	else if (MASTER_SLAVE)
***************
*** 1106,1111 ****
--- 1158,1166 ----
  		if (!VALID_BACKEND(i))
  			continue;
  
+ 		if (commit && IS_MASTER_NODE_ID(i))
+ 			continue;
+ 
  		cp = CONNECTION(backend, i);
  
  		/* forward the query to the backend */
***************
*** 1135,1140 ****
--- 1190,1220 ----
  		}
  	}
  
+ 	if (commit)
+ 	{
+ 		POOL_CONNECTION *cp;
+ 		i = MASTER_NODE_ID;
+ 
+ 		cp = CONNECTION(backend, i);
+ 
+ 		/* forward the query to the backend */
+ 		pool_write(cp, "E", 1);
+ 		sendlen = htonl(len + 4);
+ 		pool_write(cp, &sendlen, sizeof(sendlen));
+ 		pool_write(cp, string, len);
+ 
+ 		/*
+ 		 * send "Flush" message so that the backend notifies us of
+ 		 * the completion of the command
+ 		 */
+ 		pool_write(cp, "H", 1);
+ 		sendlen = htonl(4);
+ 		if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
+ 		{
+ 			return POOL_ERROR;
+ 		}
+ 	}
+ 
  	while ((kind = pool_read_kind(backend)),
  		   (kind != 'C' && kind != 'E' && kind != 'I' && kind != 's'))
  	{
***************
*** 1253,1270 ****
  		/* set transaction state */
  		pool_debug("ReadyForQuery: transaction state: %c", state);
  
! 		for (i=0;i<NUM_BACKENDS;i++)
! 		{
! 			if (!VALID_BACKEND(i))
! 				continue;
! 
! 			CONNECTION(backend, i)->tstate = state;
! 
! 			if (do_command(CONNECTION(backend, i), "COMMIT", PROTO_MAJOR_V3, 1) != POOL_CONTINUE)
! 				return POOL_ERROR;
! 		}
! 
! 		internal_transaction_started = 0;
  	}
  
  	pool_flush(frontend);
--- 1333,1340 ----
  		/* set transaction state */
  		pool_debug("ReadyForQuery: transaction state: %c", state);
  
! 		if (end_internal_transaction(backend) != POOL_CONTINUE)
! 			return POOL_ERROR;
  	}
  
  	pool_flush(frontend);
***************
*** 3615,3620 ****
--- 3685,3701 ----
  	return 0;
  }
  
+ static int is_commit_query(Node *node)
+ {
+ 	TransactionStmt *stmt;
+ 
+ 	if (!IsA(node, TransactionStmt))
+ 		return 0;
+ 
+ 	stmt = (TransactionStmt *)node;
+ 	return stmt->kind == TRANS_STMT_COMMIT;
+ }
+ 
  /*
   * start load balance mode
   */
***************
*** 4020,4042 ****
  
  	snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
  
! 	/* if we are not in a transaction block,
! 	 * start a new transaction
! 	 */
! 	if (TSTATE(backend) == 'I')
! 	{
! 		for (i=0;i<NUM_BACKENDS;i++)
! 		{
! 			if (VALID_BACKEND(i))
! 			{
! 				if (do_command(CONNECTION(backend, i), "BEGIN", PROTO_MAJOR_V3, 0) != POOL_CONTINUE)
! 					return POOL_END;
! 			}
! 		}
! 
! 		/* mark that we started new transaction */
! 		internal_transaction_started = 1;
! 	}
  
  	status = POOL_CONTINUE;
  
--- 4101,4108 ----
  
  	snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
  
! 	if (start_internal_transaction(backend, (Node *)node) != POOL_CONTINUE)
! 		return POOL_END;
  
  	status = POOL_CONTINUE;
  
***************
*** 4530,4532 ****
--- 4596,4660 ----
  
  	set_ps_display(psbuf, false);
  }
+ 
+ static POOL_STATUS start_internal_transaction(POOL_CONNECTION_POOL *backend, Node *node)
+ {
+ 	int i;
+ 
+ 	/* if we are not in a transaction block and the statement is
+ 	 * INSERT/UPDATE/DELETE/SELECT, start a new transaction
+ 	 */
+ 	if (TSTATE(backend) == 'I' &&
+ 		(IsA(node, InsertStmt) || IsA(node, UpdateStmt) ||
+ 		 IsA(node, DeleteStmt) || IsA(node, SelectStmt)))
+ 	{
+ 		for (i=0;i<NUM_BACKENDS;i++)
+ 		{
+ 			if (VALID_BACKEND(i))
+ 			{
+ 				if (do_command(CONNECTION(backend, i), "BEGIN", PROTO_MAJOR_V3, 0) != POOL_CONTINUE)
+ 					return POOL_END;
+ 			}
+ 		}
+ 
+ 		/* mark that we started new transaction */
+ 		internal_transaction_started = 1;
+ 	}
+ 	return POOL_CONTINUE;
+ }
+ 
+ 
+ static POOL_STATUS end_internal_transaction(POOL_CONNECTION_POOL *backend)
+ {
+ 	int i;
+ 
+ 	/* Commit on the secondary nodes first; the master is committed last. */
+ 	for (i=0;i<NUM_BACKENDS;i++)
+ 	{
+ 		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
+ 		{
+ 			if (do_command(CONNECTION(backend, i), "COMMIT", PROTO_MAJOR_V3, 1) != POOL_CONTINUE)
+ 			{
+ 				internal_transaction_started = 0;
+ 				return POOL_END;
+ 			}
+ 		}
+ 	}
+ 
+ 	/* commit on master */
+ 	for (i=0;i<NUM_BACKENDS;i++)
+ 	{
+ 		if (VALID_BACKEND(i) && IS_MASTER_NODE_ID(i))
+ 		{
+ 			if (do_command(CONNECTION(backend, i), "COMMIT", PROTO_MAJOR_V3, 1) != POOL_CONTINUE)
+ 			{
+ 				internal_transaction_started = 0;
+ 				return POOL_END;
+ 			}
+ 			break;
+ 		}
+ 	}
+ 
+ 	internal_transaction_started = 0;
+ 	return POOL_CONTINUE;	
+ }


pgpool-general-jp メーリングリストの案内