[Pgpool-general] Error in pgpool
Tatsuo Ishii
ishii at sraoss.co.jp
Fri Apr 3 14:11:41 UTC 2009
Ok, I have included the patched file.
--
Tatsuo Ishii
SRA OSS, Inc. Japan
> --- On Fri, 3/4/09, Tatsuo Ishii <ishii at sraoss.co.jp> wrote:
> > > I'm trying to patch against the pgpool-II-2.2
> > source, either I'm too much of a cretin to apply
> > it or pool_proto_modules.c has other changes?
> >
> > The patch was against CVS HEAD. Could you try included new
> > patch? It
> > was generated using diff -u (previous one was diff -c) and
> > seems to be
> > nicely applied to 2.2-stable.
>
> Hmm, I still get the same when I try to patch with that.
>
>
>
-------------- next part --------------
/* -*-pgsql-c-*- */
/*
* $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6 2009/01/22 09:16:37 y-mori Exp $
*
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
* Copyright (c) 2003-2009 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
* granted, provided that the above copyright notice appear in all
* copies and that both that copyright notice and this permission
* notice appear in supporting documentation, and that the name of the
* author not be used in advertising or publicity pertaining to
* distribution of the software without specific, written prior
* permission. The author makes no representations about the
* suitability of this software for any purpose. It is provided "as
* is" without express or implied warranty.
*
*---------------------------------------------------------------------
* pool_proto_modules.c: modules corresponding to message protocols.
* used by pool_process_query()
*---------------------------------------------------------------------
*/
#include "config.h"
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <ctype.h>
#include "pool.h"
#include "pool_signal.h"
#include "pool_proto_modules.h"
/*
 * Shared state used by the protocol message handlers in this file.
 */
int force_replication;            /* when set, replication is temporarily forced
                                   * for the current query (see SimpleQuery/Execute) */
int replication_was_enabled;      /* replication mode was enabled */
int master_slave_was_enabled;     /* master/slave mode was enabled */
int internal_transaction_started; /* to issue table lock command a transaction
                                     has been started internally */
int in_progress = 0;              /* indicates while doing something after receiving Query */
int mismatch_ntuples;             /* number of updated tuples */
char *copy_table = NULL;          /* copy table name */
char *copy_schema = NULL;         /* copy schema name */
char copy_delimiter;              /* copy delimiter char */
char *copy_null = NULL;           /* copy null string */

/*
 * Deferred update of the prepared-statement list: set while a
 * PREPARE/DEALLOCATE/Parse is in flight, applied once it completes.
 */
void (*pending_function)(PreparedStatementList *p, Portal *portal) = NULL;
Portal *pending_prepared_portal = NULL;
Portal *unnamed_statement = NULL;
Portal *unnamed_portal = NULL;

int select_in_transaction = 0;    /* non 0 if select query is in transaction */
int execute_select = 0;           /* non 0 if an extended-protocol Execute is
                                   * running a SELECT (set in Execute()) */

/* non 0 if "BEGIN" query with extended query protocol received */
int receive_extended_begin = 0;

/* non 0 if allow to close internal transaction */
int allow_close_transaction = 1;

PreparedStatementList prepared_list; /* prepared statement name list */

int is_select_pgcatalog = 0;
int is_select_for_update = 0;     /* 1 if SELECT INTO or SELECT FOR UPDATE */
bool is_parallel_table = false;

/*
 * last query string sent to simpleQuery()
 */
char query_string_buffer[QUERY_STRING_BUFFER_LEN];

/*
 * query string produced by nodeToString() in simpleQuery().
 * this variable is only useful when enable_query_cache is true.
 */
char *parsed_query = NULL;
/*
 * Process NotificationResponse ('A') message.
 *
 * Reads the notification payload (backend pid + condition string) from
 * every valid backend, but forwards only the master node's copy to the
 * frontend.  Returns the status of the final write-and-flush.
 */
POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
								 POOL_CONNECTION_POOL *backend)
{
	/*
	 * pid1/len1/condition1 hold the master node's values.  Initialize all
	 * three: the original left pid1 uninitialized, which is undefined
	 * behavior if no valid backend is the master node.
	 */
	int pid, pid1 = 0;
	char *condition, *condition1 = NULL;
	int len, len1 = 0;
	int i;
	POOL_STATUS status;

	pool_write(frontend, "A", 1);

	for (i = 0; i < NUM_BACKENDS; i++)
	{
		if (VALID_BACKEND(i))
		{
			if (pool_read(CONNECTION(backend, i), &pid, sizeof(pid)) < 0)
				return POOL_ERROR;
			condition = pool_read_string(CONNECTION(backend, i), &len, 0);
			if (condition == NULL)
				return POOL_END;

			if (IS_MASTER_NODE_ID(i))
			{
				/* keep a private copy; the read buffer may be reused */
				pid1 = pid;
				len1 = len;
				condition1 = strdup(condition);
			}
		}
	}

	pool_write(frontend, &pid1, sizeof(pid1));
	status = pool_write_and_flush(frontend, condition1, len1);
	free(condition1);			/* free(NULL) is a no-op */
	return status;
}
/*
 * Process Query('Q') message.
 * Query messages include a SQL string.
 *
 * If 'query' is NULL the query text is read from the frontend; otherwise
 * 'query' itself is used (internally generated queries).  The query is
 * parsed, possibly rewritten (parallel mode), possibly load-balanced or
 * forced into replication, then forwarded to the backends.
 */
POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
						POOL_CONNECTION_POOL *backend, char *query)
{
	char *string, *string1;
	int len;
	static char *sq = "show pool_status";
	int i, commit;
	List *parse_tree_list;
	Node *node = NULL, *node1;
	POOL_STATUS status;

	int serialization_error_detected = 0;
	int deadlock_detected = 0;
	int active_sql_transaction_error = 0;

	POOL_MEMORY_POOL *old_context = NULL;
	Portal *portal;

	force_replication = 0;
	if (query == NULL)	/* need to read query from frontend? */
	{
		/* read actual query */
		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			if (pool_read(frontend, &len, sizeof(len)) < 0)
				return POOL_END;
			len = ntohl(len) - 4;
			string = pool_read2(frontend, len);
		}
		else
			string = pool_read_string(frontend, &len, 0);

		if (string == NULL)
			return POOL_END;
	}
	else
	{
		len = strlen(query)+1;
		string = query;
	}

	/*
	 * Save last query string for logging purpose.  strncpy() does not
	 * NUL-terminate when the source is longer than the destination, so
	 * terminate explicitly (fixes a potential unterminated buffer).
	 */
	strncpy(query_string_buffer, string, sizeof(query_string_buffer));
	query_string_buffer[sizeof(query_string_buffer) - 1] = '\0';

	/* show ps status */
	query_ps_status(string, backend);

	/* log query to log file if necessary */
	if (pool_config->log_statement)
	{
		pool_log("statement: %s", string);
	}
	else
	{
		pool_debug("statement2: %s", string);
	}

	/* parse SQL string */
	parse_tree_list = raw_parser(string);

	if (parse_tree_list != NIL)
	{
		node = (Node *) lfirst(list_head(parse_tree_list));

		if (PARALLEL_MODE)
			is_parallel_table = is_partition_table(backend,node);

		/* query cache lookup: only plain SELECTs (no INTO/FOR UPDATE,
		 * not on pg_catalog) are cacheable */
		if (pool_config->enable_query_cache &&
			SYSDB_STATUS == CON_UP &&
			IsA(node, SelectStmt) &&
			!(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
		{
			SelectStmt *select = (SelectStmt *)node;

			if (! (select->intoClause || select->lockingClause))
			{
				parsed_query = strdup(nodeToString(node));
				if (parsed_query == NULL)
				{
					pool_error("pool_process_query: malloc failed");
					return POOL_ERROR;
				}

				if (parsed_query)
				{
					if (pool_query_cache_lookup(frontend, parsed_query, backend->info->database, TSTATE(backend)) == POOL_CONTINUE)
					{
						/* cache hit: answer served, nothing to forward */
						free(parsed_query);
						parsed_query = NULL;
						free_parser();
						return POOL_CONTINUE;
					}
				}
				is_select_for_update = 0;
			}
			else
			{
				is_select_for_update = 1;
			}
		}

		if (pool_config->parallel_mode)
		{
			/* The Query is analyzed first in a parallel mode(in_parallel_query),
			 * and, next, the Query is rewritten(rewrite_query_stmt).
			 */

			/* analyze the query */
			RewriteQuery *r_query = is_parallel_query(node,backend);

			if(r_query->is_loadbalance)
			{
				/* Usual processing of pgpool is done by using the rewritten Query
				 * if judged a possible load-balancing as a result of analyzing
				 * the Query.
				 * Of course, the load is distributed only for load_balance_mode=true.
				 */
				if(r_query->r_code == SEND_LOADBALANCE_ENGINE)
				{
					/* use rewritten query */
					string = r_query->rewrite_query;
					/* change query length */
					len = strlen(string)+1;
				}
				pool_debug("SimpleQuery: loadbalance_query =%s",string);
			}
			else if (r_query->is_parallel)
			{
				/*
				 * For the Query that the parallel processing is possible.
				 * Call parallel exe engine and return status to the upper layer.
				 */
				POOL_STATUS stats = pool_parallel_exec(frontend,backend,r_query->rewrite_query, node,true);
				free_parser();
				in_progress = 0;
				return stats;
			}
			else if(!r_query->is_pg_catalog)
			{
				/* rewrite query and execute */
				r_query = rewrite_query_stmt(node,frontend,backend,r_query);
				if(r_query->type == T_InsertStmt)
				{
					free_parser();

					if(r_query->r_code != INSERT_DIST_NO_RULE) {
						in_progress = 0;
						return r_query->status;
					}
				}
				else if(r_query->type == T_SelectStmt)
				{
					free_parser();
					in_progress = 0;
					return r_query->status;
				}
			}

			/*
			 * The same processing as usual pgpool is done to other Query type.
			 */
		}

		/* check COPY FROM STDIN
		 * if true, set copy_* variable
		 */
		check_copy_from_stdin(node);

		/*
		 * if this is DROP DATABASE command, send USR1 signal to parent and
		 * ask it to close all idle connections.
		 * XXX This is overkill. It would be better to close the idle
		 * connection for the database which DROP DATABASE command tries
		 * to drop. This is impossible at this point, since we have no way
		 * to pass such info to other processes.
		 */
		if (is_drop_database(node))
		{
			int stime = 5;	/* XXX give arbitrary time to allow closing idle connections */

			pool_debug("Query: sending SIGUSR1 signal to parent");

			Req_info->kind = CLOSE_IDLE_REQUEST;
			kill(getppid(), SIGUSR1);		/* send USR1 signal to parent */

			/* we need to loop over here since we will get USR1 signal while sleeping */
			while (stime > 0)
			{
				stime = sleep(stime);
			}
		}

		/* process status reporting? */
		if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
		{
			StartupPacket *sp;
			char psbuf[1024];

			pool_debug("process reporting");
			process_reporting(frontend, backend);
			in_progress = 0;

			/* show ps status */
			sp = MASTER_CONNECTION(backend)->sp;
			snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
					 sp->user, sp->database, remote_ps_data);
			set_ps_display(psbuf, false);

			free_parser();
			return POOL_CONTINUE;
		}

		if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
			IsA(node, VariableSetStmt) || IsA(node, DiscardStmt))
		{
			/*
			 * PREPARE, DEALLOCATE and SET statements must be replicated.
			 */
			if (MASTER_SLAVE && TSTATE(backend) != 'E')
				force_replication = 1;

			/*
			 * Before we do followings only when frontend == NULL,
			 * which was wrong since if, for example, reset_query_list
			 * contains "DISCARD ALL", then it does not register
			 * pending function and it causes trying to DEALLOCATE non
			 * existing prepared statement(2009/4/3 Tatsuo).
			 */
			if (IsA(node, PrepareStmt))
			{
				pending_function = add_prepared_list;
				portal = create_portal();
				if (portal == NULL)
				{
					pool_error("SimpleQuery: create_portal() failed");
					return POOL_END;
				}

				/* switch memory context */
				old_context = pool_memory;
				pool_memory = portal->prepare_ctxt;

				portal->portal_name = NULL;
				portal->stmt = copyObject(node);
				portal->sql_string = NULL;
				pending_prepared_portal = portal;
			}
			else if (IsA(node, DeallocateStmt))
			{
				pending_function = del_prepared_list;
				portal = create_portal();
				if (portal == NULL)
				{
					pool_error("SimpleQuery: create_portal() failed");
					return POOL_END;
				}

				/* switch memory context */
				old_context = pool_memory;
				pool_memory = portal->prepare_ctxt;

				portal->portal_name = NULL;
				portal->stmt = copyObject(node);
				portal->sql_string = NULL;
				pending_prepared_portal = portal;
			}
			else if (IsA(node, DiscardStmt))
			{
				DiscardStmt *stmt = (DiscardStmt *)node;
				if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
				{
					pending_function = delete_all_prepared_list;
					pending_prepared_portal = NULL;
				}
			}

			/* switch old memory context */
			if (old_context)
				pool_memory = old_context;

			/* end of wrong if (see 2009/4/3 comment above) */
		}

		if (frontend && IsA(node, ExecuteStmt))
		{
			Portal *portal;
			PrepareStmt *p_stmt;
			ExecuteStmt *e_stmt = (ExecuteStmt *)node;

			portal = lookup_prepared_statement_by_statement(&prepared_list,
															e_stmt->name);
			if (!portal)
			{
				string1 = string;
				node1 = node;
			}
			else
			{
				/* analyze the underlying PREPARE's query, not the EXECUTE */
				p_stmt = (PrepareStmt *)portal->stmt;
				string1 = nodeToString(p_stmt->query);
				node1 = (Node *)p_stmt->query;
			}
		}
		else
		{
			string1 = string;
			node1 = node;
		}

		/* load balance trick */
		if (load_balance_enabled(backend, node1, string1))
			start_load_balance(backend);
		else if (MASTER_SLAVE)
		{
			pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
			master_slave_was_enabled = 1;
			MASTER_SLAVE = 0;
			master_slave_dml = 1;
			if (force_replication)
			{
				replication_was_enabled = 0;
				REPLICATION = 1;
			}
		}
		else if (REPLICATION &&
				 !pool_config->replicate_select &&
				 is_select_query(node1, string1) &&
				 !is_sequence_query(node1))
		{
			/* non-replicated SELECT: send to master only */
			selected_slot = MASTER_NODE_ID;
			replication_was_enabled = 1;
			REPLICATION = 0;
			LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
			in_load_balance = 1;
			select_in_transaction = 1;
		}

		/*
		 * determine if we need to lock the table
		 * to keep SERIAL data consistency among servers
		 * conditions:
		 * - replication is enabled
		 * - protocol is V3
		 * - statement is INSERT
		 * - either "INSERT LOCK" comment exists or insert_lock directive specified
		 */
		if (REPLICATION)
		{
			/* start a transaction if needed */
			if (start_internal_transaction(backend, (Node *)node) != POOL_CONTINUE)
				return POOL_END;

			/* check if need lock */
			if (need_insert_lock(backend, string, node))
			{
				/* if so, issue lock command */
				status = insert_lock(backend, string, (InsertStmt *)node);
				if (status != POOL_CONTINUE)
				{
					free_parser();
					return status;
				}
			}
		}
		else if (REPLICATION && query == NULL && start_internal_transaction(backend, node))
		{
			/* NOTE(review): unreachable — the branch above already matches
			 * whenever REPLICATION is true.  Kept for fidelity; confirm
			 * original intent before removing. */
			free_parser();
			return POOL_ERROR;
		}
	}
	else
	{	/* syntax error */
		if (MASTER_SLAVE)
		{
			pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
			master_slave_was_enabled = 1;
			MASTER_SLAVE = 0;
			master_slave_dml = 1;
		}
	}

	if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
	{
		TSTATE(backend) = 'T';
	}

	if (REPLICATION || PARALLEL_MODE)
	{
		/* check if query is "COMMIT" */
		commit = is_commit_query(node);
		free_parser();

		/* send query to master node */
		if (!commit)
		{
			if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
				return POOL_END;

			if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
				return POOL_END;

			/*
			 * Check dead lock error on the master node and abort
			 * transactions on all nodes if so.
			 */
			deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
			if (deadlock_detected < 0)
				return POOL_END;

			/*
			 * Check serialization failure error and abort
			 * transactions on all nodes if so. Otherwise we allow
			 * data inconsistency among DB nodes. See following
			 * scenario: (M:master, S:slave)
			 *
			 * M:S1:BEGIN;
			 * M:S2:BEGIN;
			 * S:S1:BEGIN;
			 * S:S2:BEGIN;
			 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * M:S1:UPDATE t1 SET i = i + 1;
			 * S:S1:UPDATE t1 SET i = i + 1;
			 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
			 * S:S1:COMMIT;
			 * M:S1:COMMIT;
			 * M:S2:ERROR: could not serialize access due to concurrent update
			 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
			 */
			serialization_error_detected = detect_serialization_error(MASTER(backend), MAJOR(backend));
			if (serialization_error_detected < 0)
				return POOL_END;

			/*
			 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
			 * This happens in following scenario:
			 *
			 * M:S1:BEGIN;
			 * S:S1:BEGIN;
			 * M:S1:SELECT 1; <-- only sent to MASTER
			 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * M: <-- error
			 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
			 */
			active_sql_transaction_error = detect_active_sql_transaction_error(MASTER(backend), MAJOR(backend));
			if (active_sql_transaction_error < 0)
				return POOL_END;
		}

		if (deadlock_detected == SPECIFIED_ERROR || serialization_error_detected == SPECIFIED_ERROR ||
			active_sql_transaction_error == SPECIFIED_ERROR)
		{
			if (deadlock_detected == SPECIFIED_ERROR)
				pool_log("SimpleQuery: received deadlock error message from master node. query: %s", string);
			else if (serialization_error_detected == SPECIFIED_ERROR)
				pool_log("SimpleQuery: received serialization failure error message from master node. query: %s", string);
			else
				pool_log("SimpleQuery: received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s", string);

			/* replace the query with an error query so the other nodes abort too */
			string = POOL_ERROR_QUERY;
			len = strlen(string) + 1;
		}

		/* send query to other than master nodes */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
				continue;

			if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
				return POOL_END;
		}

		/* wait for response except MASTER node */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
				continue;

			if (wait_for_query_response(CONNECTION(backend, i), string) != POOL_CONTINUE)
				return POOL_END;
		}

		/* send "COMMIT" to master node if query is "COMMIT" */
		if (commit)
		{
			if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
				return POOL_END;

			if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
				return POOL_END;

			TSTATE(backend) = 'I';
		}
	}
	else
	{
		free_parser();
		if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
			return POOL_END;

		if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
			return POOL_END;
	}

	return POOL_CONTINUE;
}
/*
 * Process EXECUTE ('E') message (V3 only).
 *
 * Looks up the portal, decides load-balance/replication handling,
 * forwards the Execute to the backends (master first), then relays
 * responses to the frontend until CommandComplete/ErrorResponse/
 * EmptyQueryResponse/PortalSuspended.
 */
POOL_STATUS Execute(POOL_CONNECTION *frontend,
					POOL_CONNECTION_POOL *backend)
{
	char *string;		/* portal name + null terminate + max_tobe_returned_rows */
	int len;
	int i;
	char kind;
	int status, commit = 0;
	Portal *portal;
	char *string1;
	PrepareStmt *p_stmt;
	int deadlock_detected = 0;
	int serialization_error_detected = 0;
	int active_sql_transaction_error = 0;
	POOL_STATUS ret;

	/* read Execute packet */
	if (pool_read(frontend, &len, sizeof(len)) < 0)
		return POOL_END;

	len = ntohl(len) - 4;
	string = pool_read2(frontend, len);

	pool_debug("Execute: portal name <%s>", string);

	if (receive_extended_begin)
	{
		/* send sync message */
		send_extended_protocol_message(backend, MASTER_NODE_ID, "S", 0, "");
		kind = pool_read_kind(backend);
		if (kind != 'Z')
			return POOL_END;
		if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
			return POOL_END;
	}

	portal = lookup_prepared_statement_by_portal(&prepared_list,
												 string);

	/* load balance trick */
	if (portal)
	{
		Node *node;

		p_stmt = (PrepareStmt *)portal->stmt;
		string1 = portal->sql_string;
		node = (Node *)p_stmt->query;

		if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
			 IsA(node, VariableSetStmt)) &&
			MASTER_SLAVE && TSTATE(backend) != 'E')
		{
			force_replication = 1;
		}

		/*
		 * JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
		 * But it does not send Sync message after "BEGIN" query.
		 * In extended query protocol, PostgreSQL returns
		 * ReadyForQuery when a client sends Sync message.
		 * We can't know a transaction state...
		 * So pgpool send Sync message internally.
		 */
		else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
		{
			TransactionStmt *stmt = (TransactionStmt *) node;

			if (stmt->kind == TRANS_STMT_BEGIN ||
				stmt->kind == TRANS_STMT_START)
				receive_extended_begin = true;
		}

		if (load_balance_enabled(backend, node, string1))
			start_load_balance(backend);
		else if (REPLICATION &&
				 !pool_config->replicate_select &&
				 is_select_query((Node *)p_stmt->query, string1) &&
				 !is_sequence_query((Node *)p_stmt->query))
		{
			/* non-replicated SELECT: route to master node only */
			selected_slot = MASTER_NODE_ID;
			replication_was_enabled = 1;
			REPLICATION = 0;
			LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
			in_load_balance = 1;
			select_in_transaction = 1;
			execute_select = 1;
		}
/*
		else if (REPLICATION && start_internal_transaction(backend, (Node *)p_stmt->query))
		{
			return POOL_END;
		}
*/
		commit = is_commit_query((Node *)p_stmt->query);
	}

	if (MASTER_SLAVE)
	{
		master_slave_was_enabled = 1;
		MASTER_SLAVE = 0;
		master_slave_dml = 1;
		if (force_replication)
		{
			replication_was_enabled = 0;
			REPLICATION = 1;
		}
	}

	if (REPLICATION || PARALLEL_MODE)
	{
		/* send query to master node */
		if (!commit)
		{
			if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
				return POOL_END;

			pool_debug("waiting for backend completing the query");
			if (synchronize(CONNECTION(backend, MASTER_NODE_ID)))
				return POOL_END;

			/*
			 * Check dead lock error on the master node and abort
			 * transactions on all nodes if so.
			 */
			deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
			if (deadlock_detected < 0)
				return POOL_END;

			/*
			 * Check serialization failure error and abort all nodes
			 * if so. Otherwise we allow data inconsistency among DB
			 * nodes. See following scenario: (M:master, S:slave)
			 */
			serialization_error_detected = detect_serialization_error(MASTER(backend), MAJOR(backend));
			if (serialization_error_detected < 0)
				return POOL_END;

			/*
			 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
			 * This happens in following scenario:
			 *
			 * M:S1:BEGIN;
			 * S:S1:BEGIN;
			 * M:S1:SELECT 1; <-- only sent to MASTER
			 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
			 * M: <-- error
			 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
			 */
			active_sql_transaction_error = detect_active_sql_transaction_error(MASTER(backend), MAJOR(backend));
			if (active_sql_transaction_error < 0)
				return POOL_END;
		}

		/* send query to other nodes */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
				continue;

			if (deadlock_detected == SPECIFIED_ERROR || serialization_error_detected == SPECIFIED_ERROR ||
				active_sql_transaction_error == SPECIFIED_ERROR)
			{
				char msg[1024] = "pgpoool_error_portal";	/* large enough */
				int len = strlen(msg);

				/*
				 * Log prefixes fixed: the two messages below wrongly said
				 * "SimpleQuery:" (copy-paste from SimpleQuery()).
				 */
				if (deadlock_detected == SPECIFIED_ERROR)
					pool_log("Execute: received deadlock error message from master node. query: %s", string);
				else if (serialization_error_detected == SPECIFIED_ERROR)
					pool_log("Execute: received serialization failure error message from master node. query: %s", string);
				else
					pool_log("Execute: received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s", string);

				memset(msg + len, 0, sizeof(int));
				if (send_execute_message(backend, i, len + 5, msg))
					return POOL_END;
			}
			else if (send_execute_message(backend, i, len, string) != POOL_CONTINUE)
				return POOL_END;
		}

		/* wait for nodes excepted for master node */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
				continue;

			pool_debug("waiting for backend completing the query");
			if (synchronize(CONNECTION(backend, i)))
				return POOL_END;
		}

		if (commit)
		{
			if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
				return POOL_END;

			pool_debug("waiting for backend completing the query");
			if (synchronize(MASTER(backend)))
				return POOL_END;
		}
	}
	else
	{
		if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
			return POOL_END;

		pool_debug("waiting for backend completing the query");
		if (synchronize(CONNECTION(backend, MASTER_NODE_ID)))
			return POOL_END;
	}

	while ((ret = read_kind_from_backend(frontend, backend, &kind)) == POOL_CONTINUE)
	{
		/*
		 * forward message until receiving CommandComplete,
		 * ErrorResponse, EmptyQueryResponse or PortalSuspend.
		 */
		if (kind == 'C' || kind == 'E' || kind == 'I' || kind == 's')
			break;

		status = SimpleForwardToFrontend(kind, frontend, backend);
		if (status != POOL_CONTINUE)
			return status;
	}
	if (ret != POOL_CONTINUE)
		return ret;

	status = SimpleForwardToFrontend(kind, frontend, backend);
	if (status != POOL_CONTINUE)
		return status;

	return POOL_CONTINUE;
}
/*
 * Process Parse ('P') message (V3 only).
 *
 * Parses the statement locally, registers a pending prepared statement
 * (translated into a PrepareStmt), handles replication-side transaction
 * setup and insert locking, then forwards the Parse to all backends and
 * relays the backend responses to the frontend.
 */
POOL_STATUS Parse(POOL_CONNECTION *frontend,
				  POOL_CONNECTION_POOL *backend)
{
	char kind;
	int len;
	char *string;
	int i;
	Portal *portal;
	POOL_MEMORY_POOL *old_context;
	PrepareStmt *p_stmt;
	char *name, *stmt;
	List *parse_tree_list;
	Node *node = NULL;
	int deadlock_detected = 0;
	int insert_stmt_with_lock = 0;
	POOL_STATUS status;

	/* read Parse packet */
	if (pool_read(frontend, &len, sizeof(len)) < 0)
		return POOL_END;

	len = ntohl(len) - 4;
	string = pool_read2(frontend, len);

	pool_debug("Parse: portal name <%s>", string);

	/* packet layout: statement name, NUL, then the query text */
	name = string;
	stmt = string + strlen(string) + 1;

	parse_tree_list = raw_parser(stmt);
	if (parse_tree_list == NIL)
	{
		free_parser();
	}
	else
	{
		node = (Node *) lfirst(list_head(parse_tree_list));

		insert_stmt_with_lock = need_insert_lock(backend, stmt, node);

		portal = create_portal();
		if (portal == NULL)
		{
			pool_error("Parse: create_portal() failed");
			return POOL_END;
		}

		/* switch memory context */
		old_context = pool_memory;
		pool_memory = portal->prepare_ctxt;

		/* translate Parse message to PrepareStmt */
		p_stmt = palloc(sizeof(PrepareStmt));
		p_stmt->type = T_PrepareStmt;
		p_stmt->name = pstrdup(name);
		p_stmt->query = copyObject(node);
		portal->stmt = (Node *)p_stmt;
		portal->portal_name = NULL;
		portal->sql_string = pstrdup(stmt);

		if (*name)
		{
			pending_function = add_prepared_list;
			pending_prepared_portal = portal;
		}
		else /* unnamed statement */
		{
			pending_function = add_unnamed_portal;
			pfree(p_stmt->name);
			p_stmt->name = NULL;
			pending_prepared_portal = portal;
		}

		/* switch old memory context */
		pool_memory = old_context;

		if (REPLICATION)
		{
			char kind;

			if (TSTATE(backend) != 'T')
			{
				/* synchronize transaction state */
				for (i = 0; i < NUM_BACKENDS; i++)
				{
					if (!VALID_BACKEND(i))
						continue;
					/* send sync message */
					send_extended_protocol_message(backend, i, "S", 0, "");
				}
				kind = pool_read_kind(backend);
				if (kind != 'Z')
					return POOL_END;
				if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
					return POOL_END;
			}

			if (is_strict_query(node))
				start_internal_transaction(backend, node);

			if (insert_stmt_with_lock)
			{
				/* start a transaction if needed and lock the table */
				status = insert_lock(backend, stmt, (InsertStmt *)node);
				if (status != POOL_CONTINUE)
				{
					return status;
				}
			}
		}
		free_parser();
	}

	/* send to master node */
	if (send_extended_protocol_message(backend, MASTER_NODE_ID,
									   "P", len, string))
		return POOL_END;

	if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
	{
		/* We must synchronize because Parse message acquires table
		 * locks.
		 */
		pool_debug("waiting for master completing the query");
		if (synchronize(MASTER(backend)))
			return POOL_END;

		/*
		 * We must check deadlock error because a aborted transaction
		 * by detecting deadlock isn't same on all nodes.
		 * If a transaction is aborted on master node, pgpool send a
		 * error query to another nodes.
		 */
		deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
		if (deadlock_detected < 0)
			return POOL_END;

		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
			{
				if (deadlock_detected)
				{
					pool_log("Parse: received deadlock error message from master node");

					if (send_simplequery_message(CONNECTION(backend, i),
												 strlen(POOL_ERROR_QUERY)+1,
												 POOL_ERROR_QUERY,
												 MAJOR(backend)))
						return POOL_END;
				}
				else if (send_extended_protocol_message(backend, i,
														"P", len, string))
					return POOL_END;
			}
		}

		/* wait for DB nodes completing query except master node */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
				continue;

			/*
			 * Format string fixed: the original used "%th", which is not a
			 * valid conversion and left the 'i' argument unconsumed
			 * (undefined behavior in printf-style varargs).
			 */
			pool_debug("waiting for %d th backend completing the query", i);
			if (synchronize(CONNECTION(backend, i)))
				return POOL_END;
		}
	}

	for (;;)
	{
		POOL_STATUS ret;
		ret = read_kind_from_backend(frontend, backend, &kind);
		if (ret != POOL_CONTINUE)
			return ret;

		SimpleForwardToFrontend(kind, frontend, backend);
		if (pool_flush(frontend) < 0)
			return POOL_ERROR;

		/* Ignore warning messages */
		if (kind != 'N')
			break;
	}
	return POOL_CONTINUE;
}
/*
 * Process ReadyForQuery('Z') message.
 *
 * - if the global error status "mismatch_ntuples" is set, send an error query
 *   to all DB nodes to abort transaction.
 * - internal transaction is closed
 *
 * If 'send_ready' is non-zero, a ReadyForQuery message is forwarded to the
 * frontend as well; otherwise the backend packets are consumed silently.
 */
POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
						  POOL_CONNECTION_POOL *backend, int send_ready)
{
	StartupPacket *sp;
	char psbuf[1024];
	int i;
	int len;
	signed char state;

	/*
	 * If the numbers of update tuples are differ, we need to abort transaction
	 * by using do_error_command. This only works with PROTO_MAJOR_V3.
	 */
	if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
	{
		int i;
		signed char state;
		char kind;

		/*
		 * XXX: discard rest of ReadyForQuery packet
		 */
		if (pool_read_message_length(backend) < 0)
			return POOL_END;

		state = pool_read_kind(backend);
		if (state < 0)
			return POOL_END;

		pool_debug("ReadyForQuery: transaction state: %c", state);

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
			{
				/* abort transaction on all nodes. */
				do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
			}
		}

		/* loop through until we get ReadyForQuery */
		for(;;)
		{
			kind = pool_read_kind(backend);
			if (kind < 0)
				return POOL_END;

			if (kind == 'Z')
				break;

			/* put the message back to read buffer */
			for (i=0;i<NUM_BACKENDS;i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_unread(CONNECTION(backend,i), &kind, 1);
				}
			}

			/* discard rest of the packet */
			if (pool_discard_packet(backend) != POOL_CONTINUE)
			{
				pool_error("ReadyForQuery: pool_discard_packet failed");
				return POOL_END;
			}
		}
		mismatch_ntuples = 0;
	}

	/*
	 * if a transaction is started for insert lock, we need to close
	 * the transaction.
	 */
	if (internal_transaction_started && allow_close_transaction)
	{
		int len;
		signed char state;

		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			if ((len = pool_read_message_length(backend)) < 0)
				return POOL_END;

			pool_debug("ReadyForQuery: message length: %d", len);

			/* NOTE(review): the converted value is never read again in this
			 * scope — looks like dead code; confirm before removing. */
			len = htonl(len);

			state = pool_read_kind(backend);
			if (state < 0)
				return POOL_END;

			/* set transaction state */
			pool_debug("ReadyForQuery: transaction state: %c", state);
		}

		if (end_internal_transaction(backend) != POOL_CONTINUE)
			return POOL_ERROR;
	}

	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if ((len = pool_read_message_length(backend)) < 0)
			return POOL_END;

		pool_debug("ReadyForQuery: message length: %d", len);

		/*
		 * Do not check transaction state in master/slave mode.
		 * Because SET, PREPARE, DEALLOCATE are replicated.
		 * If these queries are executed inside a transaction block,
		 * transation state will be inconsistent. But it is no problem.
		 */
		if (master_slave_dml)
		{
			char kind, kind1;

			/* consume the state byte from every backend; only the
			 * master's value is kept */
			if (pool_read(MASTER(backend), &kind, sizeof(kind)))
				return POOL_END;

			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
					continue;

				if (pool_read(CONNECTION(backend, i), &kind1, sizeof(kind)))
					return POOL_END;
			}
			state = kind;
		}
		else
		{
			state = pool_read_kind(backend);
			if (state < 0)
				return POOL_END;
		}

		/* set transaction state */
		pool_debug("ReadyForQuery: transaction state: %c", state);
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i))
				continue;
			CONNECTION(backend, i)->tstate = state;
		}
	}

	if (send_ready)
	{
		pool_write(frontend, "Z", 1);

		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			len = htonl(len);
			pool_write(frontend, &len, sizeof(len));
			pool_write(frontend, &state, 1);
		}
		if (pool_flush(frontend))
			return POOL_END;
	}

	in_progress = 0;

	/* end load balance mode */
	if (in_load_balance)
		end_load_balance(backend);

	/* restore master/slave mode if it was temporarily disabled */
	if (master_slave_dml)
	{
		MASTER_SLAVE = 1;
		master_slave_was_enabled = 0;
		master_slave_dml = 0;
		if (force_replication)
		{
			force_replication = 0;
			REPLICATION = 0;
			replication_was_enabled = 0;
		}
	}

#ifdef NOT_USED
	return ProcessFrontendResponse(frontend, backend);
#endif

	/* update the ps display with the (possibly in-transaction) idle state */
	sp = MASTER_CONNECTION(backend)->sp;
	if (MASTER(backend)->tstate == 'T')
		snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
				 sp->user, sp->database, remote_ps_data);
	else
		snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
				 sp->user, sp->database, remote_ps_data);
	set_ps_display(psbuf, false);

	return POOL_CONTINUE;
}
/*
 * Process FunctionCall ('F') message (V2 protocol).
 *
 * Relays the function-call request (oid, argument count, argument
 * values) from the frontend to every valid backend, then flushes.
 */
POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
						 POOL_CONNECTION_POOL *backend)
{
	char dummy[2];
	int oid;
	int argn;
	int i, j;

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			pool_write(CONNECTION(backend, i), "F", 1);
		}
	}

	/* dummy */
	if (pool_read(frontend, dummy, sizeof(dummy)) < 0)
		return POOL_ERROR;

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
		}
	}

	/* function object id */
	if (pool_read(frontend, &oid, sizeof(oid)) < 0)
		return POOL_ERROR;

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
		}
	}

	/* number of arguments */
	if (pool_read(frontend, &argn, sizeof(argn)) < 0)
		return POOL_ERROR;

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
		}
	}

	argn = ntohl(argn);

	for (i=0;i<argn;i++)
	{
		int len;
		char *arg;

		/* length of each argument in bytes */
		if (pool_read(frontend, &len, sizeof(len)) < 0)
			return POOL_ERROR;

		/*
		 * Use a separate index 'j' for the per-backend loops.  The
		 * original reused 'i' here, clobbering the outer argument loop
		 * so at most one argument was ever forwarded.
		 */
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j))
			{
				pool_write(CONNECTION(backend, j), &len, sizeof(len));
			}
		}

		len = ntohl(len);

		/* argument value itself */
		if ((arg = pool_read2(frontend, len)) == NULL)
			return POOL_ERROR;

		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j))
			{
				pool_write(CONNECTION(backend, j), arg, len);
			}
		}
	}

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			if (pool_flush(CONNECTION(backend, i)))
				return POOL_ERROR;
		}
	}
	return POOL_CONTINUE;
}
/*
 * FunctionResultResponse: read a V2 FunctionResultResponse from every
 * valid backend and forward one copy ('V') to the frontend.  Every
 * backend must be drained (even though only the last-read values are
 * forwarded) so the connections stay in protocol sync.
 *
 * Returns POOL_ERROR on a backend read failure, otherwise the result
 * of flushing the frontend.
 */
POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend,
								   POOL_CONNECTION_POOL *backend)
{
	char dummy;
	int len;
	char *result = 0;
	int i;

	pool_write(frontend, "V", 1);

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
				return POOL_ERROR;
		}
	}
	pool_write(frontend, &dummy, 1);

	/* non empty result? */
	if (dummy == 'G')
	{
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (VALID_BACKEND(i))
			{
				/* length of result in bytes (network byte order) */
				if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
					return POOL_ERROR;
			}
		}
		/* forward the raw length before converting it for our own use */
		pool_write(frontend, &len, sizeof(len));
		len = ntohl(len);

		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (VALID_BACKEND(i))
			{
				/*
				 * BUG FIX: this previously read from MASTER(backend)
				 * inside the loop, re-reading the master once per valid
				 * backend and never draining the others, which
				 * desynchronized the non-master connections.
				 */
				if ((result = pool_read2(CONNECTION(backend, i), len)) == NULL)
					return POOL_ERROR;
			}
		}
		pool_write(frontend, result, len);
	}

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i))
		{
			/* unused ('0') -- BUG FIX: likewise read from each backend,
			 * not from the master repeatedly */
			if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
				return POOL_ERROR;
		}
	}
	pool_write(frontend, "0", 1);

	return pool_flush(frontend);
}
/*
 * ProcessFrontendResponse: read one message-kind byte from the frontend
 * and dispatch it.  'X' (Terminate) ends the session; 'Q', 'E' and 'P'
 * go to their dedicated handlers; any other kind is, in V3, forwarded
 * verbatim to all valid backends (with a per-backend flush), while in
 * V2 only 'F' (FunctionCall) is accepted.
 *
 * Returns POOL_END for Terminate or a frontend read failure; otherwise
 * POOL_CONTINUE, with any other handler status collapsed to POOL_ERROR.
 */
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend)
{
char fkind;
POOL_STATUS status;
int i;
/* nothing buffered from the frontend and forwarding disabled: no-op */
if (frontend->len <= 0 && frontend->no_forward != 0)
return POOL_CONTINUE;
if (pool_read(frontend, &fkind, 1) < 0)
{
pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
return POOL_END;
}
pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
switch (fkind)
{
case 'X': /* Terminate message*/
if (MAJOR(backend) == PROTO_MAJOR_V3)
{
/* V3 Terminate carries a 4-byte length word; consume it */
int len;
pool_read(frontend, &len, sizeof(len));
}
return POOL_END;
case 'Q': /* Query message*/
in_progress = 1;
allow_close_transaction = 1;
status = SimpleQuery(frontend, backend, NULL);
break;
case 'E': /* Execute message */
allow_close_transaction = 1;
status = Execute(frontend, backend);
break;
case 'P': /* Parse message */
allow_close_transaction = 0;
status = Parse(frontend, backend);
break;
case 'S':
/* 'S' clears the extended-BEGIN flag, then is forwarded below */
receive_extended_begin = 0;
/* fall through */
default:
if (MAJOR(backend) == PROTO_MAJOR_V3)
{
/*
 * In master/slave mode, traffic arriving inside a transaction
 * (or after an extended-protocol BEGIN) temporarily disables
 * master/slave dispatch so it is not load-balanced; the flags
 * are restored later when ReadyForQuery is processed.
 */
if (MASTER_SLAVE &&
(TSTATE(backend) != 'I' || receive_extended_begin))
{
pool_debug("kind: %c master_slave_dml enabled", fkind);
master_slave_was_enabled = 1;
MASTER_SLAVE = 0;
master_slave_dml = 1;
}
status = SimpleForwardToBackend(fkind, frontend, backend);
/* make sure the forwarded message actually reaches each backend */
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
if (pool_flush(CONNECTION(backend, i)))
status = POOL_ERROR;
}
}
}
else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
status = FunctionCall(frontend, backend);
else
{
pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
status = POOL_ERROR;
}
break;
}
/* collapse any failure (including POOL_END from handlers) to POOL_ERROR */
if (status != POOL_CONTINUE)
status = POOL_ERROR;
return status;
}
/*
 * CompleteCommandResponse: read a V2 CompleteResponse command tag from
 * the master and from every other valid backend, verify the tag lengths
 * agree, and forward the master's tag to the frontend as a 'C' message.
 *
 * Returns POOL_END on I/O failure or length mismatch, otherwise the
 * result of flushing the frontend.
 */
POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend,
									POOL_CONNECTION_POOL *backend)
{
	int i;
	char *string = NULL;
	char *string1 = NULL;
	int len, len1 = 0;

	/* read command tag from the master */
	string = pool_read_string(MASTER(backend), &len, 0);
	if (string == NULL)
		return POOL_END;

	/* keep a private copy: pool_read_string's buffer is reused below */
	len1 = len;
	string1 = strdup(string);
	if (string1 == NULL)
	{
		pool_error("CompleteCommandResponse: strdup failed");
		return POOL_END;
	}

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
			continue;

		/* read command tag from the i th backend */
		string = pool_read_string(CONNECTION(backend, i), &len, 0);
		if (string == NULL)
		{
			free(string1);		/* BUG FIX: was leaked on this path */
			return POOL_END;
		}

		if (len != len1)
		{
			/*
			 * BUG FIX: the format string has five conversions but only
			 * four arguments were passed (the backend index was
			 * missing), and the master/backend values were swapped.
			 */
			pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and %d th server (%d \"%s\",)",
					   len1, string1, i, len, string);
			free(string1);
			return POOL_END;
		}
	}

	/* forward the master's tag to the frontend */
	pool_write(frontend, "C", 1);
	pool_debug("Complete Command Response: string: \"%s\"", string1);
	if (pool_write(frontend, string1, len1) < 0)
	{
		free(string1);
		return POOL_END;
	}
	free(string1);

	return pool_flush(frontend);
}
/*
 * RowDescription: read a V2 RowDescription message from every valid
 * backend, cross-check field count, name length, type oid, size and
 * modifier between the master and the other backends, and forward the
 * master's copy to the frontend as a 'T' message.  On success *result
 * receives the field count in host byte order.
 *
 * Returns POOL_FATAL on a fatal inter-backend mismatch, POOL_END on
 * I/O failure, otherwise the result of pool_flush().
 */
int RowDescription(POOL_CONNECTION *frontend,
				   POOL_CONNECTION_POOL *backend,
				   short *result)
{
	short num_fields, num_fields1 = 0;
	int oid, mod;
	int oid1, mod1;
	short size, size1;
	char *string;
	int len, len1;
	int i;

	/* # of fields from the master (network byte order) */
	pool_read(MASTER(backend), &num_fields, sizeof(short));
	num_fields1 = num_fields;

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			/* # of fields (could be 0) */
			pool_read(CONNECTION(backend, i), &num_fields, sizeof(short));
			if (num_fields != num_fields1)
			{
				/* BUG FIX: master's value (num_fields1) comes first */
				pool_error("RowDescription: num_fields does not match between backends master(%d) and %d th backend(%d)",
						   num_fields1, i, num_fields);
				return POOL_FATAL;
			}
		}
	}

	/* forward it to the frontend */
	pool_write(frontend, "T", 1);
	pool_write(frontend, &num_fields, sizeof(short));
	num_fields = ntohs(num_fields);

	for (i = 0;i<num_fields;i++)
	{
		int j;

		/* field name from the master */
		string = pool_read_string(MASTER(backend), &len, 0);
		if (string == NULL)
			return POOL_END;
		len1 = len;
		if (pool_write(frontend, string, len) < 0)
			return POOL_END;

		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				string = pool_read_string(CONNECTION(backend, j), &len, 0);
				if (string == NULL)
					return POOL_END;

				if (len != len1)
				{
					/*
					 * BUG FIX: the format string has three conversions
					 * but only two arguments were passed (the backend
					 * index was missing).  Lengths from
					 * pool_read_string are already host byte order,
					 * so the spurious ntohl() calls are dropped too.
					 */
					pool_error("RowDescription: field length does not match between backends master(%d) and %d th backend(%d)",
							   len1, j, len);
					return POOL_FATAL;
				}
			}
		}

		/* type oid from the master (network byte order) */
		pool_read(MASTER(backend), &oid, sizeof(int));
		oid1 = oid;
		pool_debug("RowDescription: type oid: %d", ntohl(oid));
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				pool_read(CONNECTION(backend, j), &oid, sizeof(int));

				/* we do not regard oid mismatch as fatal */
				if (oid != oid1)
				{
					/* BUG FIX: master's value (oid1) comes first */
					pool_debug("RowDescription: field oid does not match between backends master(%d) and %d th backend(%d)",
							   ntohl(oid1), j, ntohl(oid));
				}
			}
		}
		if (pool_write(frontend, &oid1, sizeof(int)) < 0)
			return POOL_END;

		/* field size from the master (network byte order) */
		pool_read(MASTER(backend), &size, sizeof(short));
		size1 = size;
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				pool_read(CONNECTION(backend, j), &size, sizeof(short));

				/*
				 * BUG FIX: the original tested "size1 != size1", which
				 * is always false, so size mismatches were silently
				 * ignored.  Compare backend's size against master's.
				 */
				if (size != size1)
				{
					pool_error("RowDescription: field size does not match between backends master(%d) and %d th backend(%d)",
							   ntohs(size1), j, ntohs(size));
					return POOL_FATAL;
				}
			}
		}
		pool_debug("RowDescription: field size: %d", ntohs(size));
		pool_write(frontend, &size1, sizeof(short));

		/* type modifier from the master (network byte order) */
		pool_read(MASTER(backend), &mod, sizeof(int));
		pool_debug("RowDescription: modifier: %d", ntohs(mod));
		mod1 = mod;
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				pool_read(CONNECTION(backend, j), &mod, sizeof(int));

				/* modifier mismatch is not regarded as fatal */
				if (mod != mod1)
				{
					/* BUG FIX: master's value (mod1) comes first */
					pool_debug("RowDescription: modifier does not match between backends master(%d) and %d th backend(%d)",
							   ntohl(mod1), j, ntohl(mod));
				}
			}
		}
		if (pool_write(frontend, &mod1, sizeof(int)) < 0)
			return POOL_END;
	}

	*result = num_fields;

	return pool_flush(frontend);
}
/*
 * AsciiRow: read one V2 AsciiRow ('D') data row from every valid
 * backend and forward the master's row to the frontend.  Non-master
 * rows are read and discarded (after a field-size cross-check) to keep
 * all backend connections in protocol sync.  num_fields comes from the
 * preceding RowDescription.
 *
 * Returns POOL_END on I/O failure, POOL_CONTINUE otherwise.
 */
POOL_STATUS AsciiRow(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend,
short num_fields)
{
static char nullmap[8192], nullmap1[8192];
int nbytes;
int i, j;
unsigned char mask;
int size, size1 = 0;
char *buf = NULL, *sendbuf = NULL;
char msgbuf[1024];
pool_write(frontend, "D", 1);
/* the null bitmap is one bit per field, rounded up to whole bytes */
nbytes = (num_fields + 7)/8;
if (nbytes <= 0)
return POOL_CONTINUE;
/* NULL map */
pool_read(MASTER(backend), nullmap, nbytes);
/* nullmap1 preserves the master's bitmap; nullmap is reused below for
   the other backends' bitmaps */
memcpy(nullmap1, nullmap, nbytes);
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
{
pool_read(CONNECTION(backend, i), nullmap, nbytes);
if (memcmp(nullmap, nullmap1, nbytes))
{
/* XXX: NULLMAP maybe different among
backends. If we were a paranoid, we have to treat
this as a fatal error. However in the real world
we'd better to adapt this situation. Just throw a
log... */
pool_debug("AsciiRow: NULLMAP differ between master and %d th backend", i);
}
}
}
/* the master's bitmap is what the frontend sees */
if (pool_write(frontend, nullmap1, nbytes) < 0)
return POOL_END;
mask = 0;
for (i = 0;i<num_fields;i++)
{
if (mask == 0)
mask = 0x80;
/* NOT NULL?  NOTE(review): this tests nullmap, which after the loop
   above holds the LAST non-master backend's bitmap, while the
   frontend was sent the master's copy (nullmap1) -- confirm which
   bitmap is intended here */
if (mask & nullmap[i/8])
{
/* field size */
if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
return POOL_END;
/* V2 field size includes its own 4 length bytes */
size1 = ntohl(size) - 4;
/* read and send actual data only when size > 0 */
if (size1 > 0)
{
sendbuf = pool_read2(MASTER(backend), size1);
if (sendbuf == NULL)
return POOL_END;
}
/* forward to frontend.  NOTE(review): when size1 <= 0, sendbuf
   still points at a previous field's buffer (or NULL); presumably
   pool_write with a non-positive length writes nothing -- confirm */
pool_write(frontend, &size, sizeof(int));
pool_write(frontend, sendbuf, size1);
snprintf(msgbuf, Min(sizeof(msgbuf), size1+1), "%s", sendbuf);
pool_debug("AsciiRow: len: %d data: %s", size1, msgbuf);
/* drain the same field from every other valid backend */
for (j=0;j<NUM_BACKENDS;j++)
{
if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
{
/* field size */
if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
return POOL_END;
buf = NULL;
size = ntohl(size) - 4;
/* XXX: field size maybe different among
backends. If we were a paranoid, we have to treat
this as a fatal error. However in the real world
we'd better to adapt this situation. Just throw a
log... */
if (size != size1)
pool_debug("AsciiRow: %d th field size does not match between master(%d) and %d th backend(%d)",
i, ntohl(size), j, ntohl(size1));
/* read and send actual data only when size > 0 */
if (size > 0)
{
buf = pool_read2(CONNECTION(backend, j), size);
if (buf == NULL)
return POOL_END;
}
}
}
}
mask >>= 1;
}
if (pool_flush(frontend))
return POOL_END;
return POOL_CONTINUE;
}
/*
 * BinaryRow: read one V2 BinaryRow ('B') data row from every valid
 * backend and forward the master's row to the frontend.  Non-master
 * rows are read and discarded (after a field-size cross-check) to keep
 * all backend connections in protocol sync.  num_fields comes from the
 * preceding RowDescription.
 *
 * Returns POOL_END on I/O failure, POOL_CONTINUE otherwise.
 */
POOL_STATUS BinaryRow(POOL_CONNECTION *frontend,
					  POOL_CONNECTION_POOL *backend,
					  short num_fields)
{
	static char nullmap[8192], nullmap1[8192];
	int nbytes;
	int i, j;
	unsigned char mask;
	int size, size1 = 0;
	char *buf = NULL;

	pool_write(frontend, "B", 1);

	/* the null bitmap is one bit per field, rounded up to whole bytes */
	nbytes = (num_fields + 7)/8;
	if (nbytes <= 0)
		return POOL_CONTINUE;

	/* NULL map: forward the master's copy, then drain the others */
	pool_read(MASTER(backend), nullmap, nbytes);
	if (pool_write(frontend, nullmap, nbytes) < 0)
		return POOL_END;
	memcpy(nullmap1, nullmap, nbytes);
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			pool_read(CONNECTION(backend, i), nullmap, nbytes);
			if (memcmp(nullmap, nullmap1, nbytes))
			{
				/* XXX: NULLMAP maybe different among
				   backends. If we were a paranoid, we have to treat
				   this as a fatal error. However in the real world
				   we'd better to adapt this situation. Just throw a
				   log... */
				pool_debug("BinaryRow: NULLMAP differ between master and %d th backend", i);
			}
		}
	}

	mask = 0;
	for (i = 0;i<num_fields;i++)
	{
		if (mask == 0)
			mask = 0x80;

		/* NOT NULL? */
		if (mask & nullmap[i/8])
		{
			/* field size from the master (network byte order) */
			if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
				return POOL_END;
			/*
			 * BUG FIX: size1 was never assigned, so the mismatch check
			 * below always compared against 0.  Remember the master's
			 * raw size word.
			 */
			size1 = size;

			for (j=0;j<NUM_BACKENDS;j++)
			{
				int fsize;

				/* BUG FIX: invalid backends were not skipped, so the
				 * loop could attempt reads on dead connections */
				if (!VALID_BACKEND(j))
					continue;

				if (IS_MASTER_NODE_ID(j))
				{
					/* forward the master's size word as-is */
					fsize = size1;
					pool_write(frontend, &fsize, sizeof(int));
				}
				else
				{
					/*
					 * BUG FIX: this read previously used index "i"
					 * (the field number) instead of backend "j".
					 */
					if (pool_read(CONNECTION(backend, j), &fsize, sizeof(int)) < 0)
						return POOL_END;
					/* XXX: field size maybe different among
					   backends. If we were a paranoid, we have to treat
					   this as a fatal error. However in the real world
					   we'd better to adapt this situation. Just throw a
					   log... */
					if (fsize != size1)
						pool_debug("BinaryRow: %d th field size does not match between master(%d) and %d th backend(%d)",
								   i, ntohl(size1), j, ntohl(fsize));
				}

				buf = NULL;
				/* V2 field size includes its own 4 length bytes */
				fsize = ntohl(fsize) - 4;

				/* read and send actual data only when size > 0 */
				if (fsize > 0)
				{
					buf = pool_read2(CONNECTION(backend, j), fsize);
					if (buf == NULL)
						return POOL_END;
					if (IS_MASTER_NODE_ID(j))
					{
						pool_write(frontend, buf, fsize);
					}
				}
			}
		}
		/*
		 * BUG FIX: the shift was previously inside the NOT-NULL branch
		 * (unlike AsciiRow), so the bit mask never advanced across
		 * NULL fields.
		 */
		mask >>= 1;
	}

	if (pool_flush(frontend))
		return POOL_END;
	return POOL_CONTINUE;
}
/*
 * CursorResponse: read a V2 CursorResponse cursor name from the master
 * and from every other valid backend, verify the name lengths agree,
 * and forward the master's cursor name to the frontend as a 'P'
 * message.
 *
 * Returns POOL_END on I/O failure or mismatch, POOL_CONTINUE otherwise.
 */
POOL_STATUS CursorResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	char *string = NULL;
	char *string1 = NULL;
	int len, len1 = 0;
	int i;

	/* read cursor name from the master */
	string = pool_read_string(MASTER(backend), &len, 0);
	if (string == NULL)
		return POOL_END;

	/* keep a private copy: pool_read_string's buffer is reused below */
	len1 = len;
	string1 = strdup(string);
	if (string1 == NULL)
	{
		pool_error("CursorResponse: strdup failed");
		return POOL_END;
	}

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			/* read cursor name from the i th backend */
			string = pool_read_string(CONNECTION(backend, i), &len, 0);
			if (string == NULL)
			{
				free(string1);	/* BUG FIX: was leaked on this path */
				return POOL_END;
			}

			if (len != len1)
			{
				/*
				 * BUG FIX: master's value (len1) comes first, and the
				 * second message was missing its backend-index
				 * argument.
				 */
				pool_error("CursorResponse: length does not match between master(%d) and %d th backend(%d)",
						   len1, i, len);
				pool_error("CursorResponse: master(%s) %d th backend(%s)", string1, i, string);
				free(string1);
				return POOL_END;
			}
		}
	}

	/* forward to the frontend */
	pool_write(frontend, "P", 1);
	if (pool_write(frontend, string1, len1) < 0)
	{
		free(string1);
		return POOL_END;
	}
	free(string1);

	if (pool_flush(frontend))
		return POOL_END;

	return POOL_CONTINUE;
}
/*
 * ErrorResponse: consume a V2 error string ('E') from every valid
 * backend, relay the last one read to the frontend, and demote the
 * shared transaction state ('T' -> 'E', anything else -> 'I').
 *
 * Returns POOL_END on I/O failure, POOL_CONTINUE otherwise.
 */
POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
						  POOL_CONNECTION_POOL *backend)
{
	char *errmsg = NULL;
	int errlen = 0;
	int node;

	/* drain the error string from each live backend */
	for (node = 0; node < NUM_BACKENDS; node++)
	{
		if (!VALID_BACKEND(node))
			continue;

		errmsg = pool_read_string(CONNECTION(backend, node), &errlen, 0);
		if (errmsg == NULL)
			return POOL_END;
	}

	/* forward to the frontend */
	pool_write(frontend, "E", 1);
	if (pool_write_and_flush(frontend, errmsg, errlen) < 0)
		return POOL_END;

	/* change transaction state */
	TSTATE(backend) = (TSTATE(backend) == 'T') ? 'E' : 'I';

	return POOL_CONTINUE;
}
/*
 * NoticeResponse: consume a V2 notice string ('N') from every valid
 * backend and relay the last one read to the frontend.
 *
 * Returns POOL_END on I/O failure, POOL_CONTINUE otherwise.
 */
POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	char *notice = NULL;
	int noticelen = 0;
	int node;

	/* drain the notice message from each live backend */
	for (node = 0; node < NUM_BACKENDS; node++)
	{
		if (!VALID_BACKEND(node))
			continue;

		notice = pool_read_string(CONNECTION(backend, node), &noticelen, 0);
		if (notice == NULL)
			return POOL_END;
	}

	/* forward to the frontend */
	pool_write(frontend, "N", 1);
	if (pool_write_and_flush(frontend, notice, noticelen) < 0)
		return POOL_END;

	return POOL_CONTINUE;
}
/*
 * CopyInResponse: announce the start of COPY FROM STDIN to the
 * frontend ('G'), then relay the incoming data stream to the backends
 * via CopyDataRows.
 *
 * Returns POOL_END on forwarding failure, otherwise whatever
 * CopyDataRows returns.
 */
POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	/* forward the CopyInResponse to the frontend */
	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if (SimpleForwardToFrontend('G', frontend, backend) != POOL_CONTINUE)
			return POOL_END;
		if (pool_flush(frontend) != POOL_CONTINUE)
			return POOL_END;
	}
	else if (pool_write_and_flush(frontend, "G", 1) < 0)
	{
		return POOL_END;
	}

	/* relay the COPY data stream (copyin = 1) */
	return CopyDataRows(frontend, backend, 1);
}
/*
 * CopyOutResponse: announce the start of COPY TO STDOUT to the
 * frontend ('H'), then relay the outgoing data stream from the
 * backends via CopyDataRows.
 *
 * Returns POOL_END on forwarding failure, otherwise whatever
 * CopyDataRows returns.
 */
POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
							POOL_CONNECTION_POOL *backend)
{
	/* forward the CopyOutResponse to the frontend */
	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if (SimpleForwardToFrontend('H', frontend, backend) != POOL_CONTINUE)
			return POOL_END;
		if (pool_flush(frontend) != POOL_CONTINUE)
			return POOL_END;
	}
	else if (pool_write_and_flush(frontend, "H", 1) < 0)
	{
		return POOL_END;
	}

	/* relay the COPY data stream (copyin = 0) */
	return CopyDataRows(frontend, backend, 0);
}
/*
 * CopyDataRows: relay COPY data between frontend and backends until an
 * end-of-copy marker or a non-CopyData message is seen.  copyin != 0
 * means COPY FROM STDIN (frontend -> backends); otherwise COPY TO
 * STDOUT (backends -> frontend).  In parallel mode with a distribution
 * rule, each V3 CopyData row is routed to the single backend selected
 * by the distribution-key column.
 *
 * Returns POOL_END on failure, POOL_CONTINUE on success.
 */
POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend, int copyin)
{
char *string = NULL;
int len;
int i;
DistDefInfo *info = NULL;
#ifdef DEBUG
int j = 0;
char buf[1024];
#endif
/* in parallel mode, look up the distribution rule for the table being
   copied into (copy_schema/copy_table are globals set elsewhere) */
if (copyin && pool_config->parallel_mode == TRUE)
{
info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
copy_schema,
copy_table);
}
for (;;)
{
if (copyin)
{
if (MAJOR(backend) == PROTO_MAJOR_V3)
{
char kind;
int sendlen;
char *p, *p1;
if (pool_read(frontend, &kind, 1) < 0)
return POOL_END;
/* parallel mode: route each CopyData ('d') row individually */
if (info && kind == 'd')
{
int id;
if (pool_read(frontend, &sendlen, sizeof(sendlen)))
{
return POOL_END;
}
/* the V3 length word includes its own 4 bytes */
len = ntohl(sendlen) - 4;
if (len <= 0)
return POOL_CONTINUE;
p = pool_read2(frontend, len);
if (p == NULL)
return POOL_END;
/* copy end ? */
if (len == 3 && memcmp(p, "\\.\n", 3) == 0)
{
/* the terminator row must reach every backend */
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
if (pool_write(CONNECTION(backend, i), &kind, 1))
return POOL_END;
if (pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen)))
return POOL_END;
if (pool_write(CONNECTION(backend, i), p, len))
return POOL_END;
}
}
}
else
{
/* extract the distribution-key column from the row text */
p1 = parse_copy_data(p, len, copy_delimiter, info->dist_key_col_id);
if (!p1)
{
pool_error("CopyDataRow: cannot parse data");
return POOL_END;
}
else if (strcmp(p1, copy_null) == 0)
{
pool_error("CopyDataRow: key parameter is NULL");
free(p1);
return POOL_END;
}
id = pool_get_id(info, p1);
pool_debug("CopyDataRow: copying id: %d", id);
free(p1);
/* NOTE(review): terminating the whole process when the
   routing target is not a valid backend is drastic --
   confirm this is intended rather than returning an error */
if (!VALID_BACKEND(id))
{
exit(1);
}
if (pool_write(CONNECTION(backend, id), &kind, 1))
{
return POOL_END;
}
if (pool_write(CONNECTION(backend, id), &sendlen, sizeof(sendlen)))
{
return POOL_END;
}
if (pool_write_and_flush(CONNECTION(backend, id), p, len))
{
return POOL_END;
}
}
}
else
{
/* not parallel-routed: forward the message body as-is */
SimpleForwardToBackend(kind, frontend, backend);
}
/* CopyData? */
if (kind == 'd')
continue;
else
{
/* any other kind (e.g. CopyDone/CopyFail) ends the stream */
pool_debug("CopyDataRows: copyin kind other than d (%c)", kind);
break;
}
}
else
/* V2: COPY rows arrive as newline-terminated strings */
string = pool_read_string(frontend, &len, 1);
}
else
{
/* CopyOut */
if (MAJOR(backend) == PROTO_MAJOR_V3)
{
signed char kind;
if ((kind = pool_read_kind(backend)) < 0)
return POOL_END;
SimpleForwardToFrontend(kind, frontend, backend);
/* CopyData? */
if (kind == 'd')
continue;
else
break;
}
else
{
/* V2: drain the row from every live backend; the last one
   read is what gets forwarded below */
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
string = pool_read_string(CONNECTION(backend, i), &len, 1);
}
}
}
}
/* only the V2 paths fall through to here (the V3 branches above
   either continue or break) */
if (string == NULL)
return POOL_END;
#ifdef DEBUG
strncpy(buf, string, len);
pool_debug("copy line %d %d bytes :%s:", j++, len, buf);
#endif
if (copyin)
{
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
pool_write(CONNECTION(backend, i), string, len);
}
}
}
else
pool_write(frontend, string, len);
/* NOTE(review): PROTO_MAJOR_V3 happens to equal 3, so this tests
   "len == 3"; it reads like an end-of-copy length check for the V2
   "\\.\n" marker rather than a protocol-version test -- confirm the
   intended constant against upstream */
if (len == PROTO_MAJOR_V3)
{
/* end of copy? */
if (string[0] == '\\' &&
string[1] == '.' &&
string[2] == '\n')
{
break;
}
}
}
if (copyin)
{
for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
if (pool_flush(CONNECTION(backend, i)) <0)
return POOL_END;
/* re-sync the connection state with each backend */
if (synchronize(CONNECTION(backend, i)))
return POOL_END;
}
}
}
else
if (pool_flush(frontend) <0)
return POOL_END;
return POOL_CONTINUE;
}
/*
 * EmptyQueryResponse: consume the single response byte from every
 * valid backend, then tell the frontend the query was empty: an 'I'
 * byte followed by a NUL terminator.
 *
 * Returns POOL_END on a backend read failure, otherwise the result of
 * the final write-and-flush to the frontend.
 */
POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend,
							   POOL_CONNECTION_POOL *backend)
{
	char ack;
	int node;

	/* drain one byte from each live backend */
	for (node = 0; node < NUM_BACKENDS; node++)
	{
		if (!VALID_BACKEND(node))
			continue;

		if (pool_read(CONNECTION(backend, node), &ack, sizeof(ack)) < 0)
			return POOL_END;
	}

	/* forward 'I' plus a trailing NUL to the frontend */
	pool_write(frontend, "I", 1);
	return pool_write_and_flush(frontend, "", 1);
}
More information about the Pgpool-general
mailing list