[pgpool-general-jp: 8] 拡張問い合わせプロトコルの負荷分散パッチ
Yoshiyuki Asaba
y-asaba @ sraoss.co.jp
2006年 10月 4日 (水) 18:45:11 JST
浅羽です。
現在の pgpool では JDBC ドライバ等を使った場合に負荷分散できないという
制限があります。
README.euc_jp
1.4 負荷分散(load balance)について
...
なお、Extended query(Parse/Prepare/Bind/Executeを使用する)問い合わせ
は負荷分散の対象になりません。これは、Execute時にSELECT文かどうかの
判定ができないからです。たとえば、PostgreSQL 8.0以降のJDBCドライバ
では必ずExtended queryを使うため、実際問題として負荷分散できません。
これは最新のドライバ(JDBC や ODBC など)で prepared statement を使用す
るとひっかかるかと思います。
そこでこの制限を解除するパッチを作成しました。手元では JDBC による簡単
なテストを行い、ちゃんと動いているのと、メモリリークしていないのを確認
しました。ただし、すべてのドライバでテストできているわけではありません。そこでどな
たか実験していただくことは可能でしょうか?
パッチは CVS 最新版に対するものになります。 configure のあるディレクト
リで
% patch -p0 < pool_process_query.c.diff
を実行してください。もしくは、CVS 最新版を tar で固めたものを以下の URL に
置きましたので、そちらをお使いください。
http://pgpool.sraoss.jp/index.php?plugin=attach&pcmd=open&file=pgpool-3.1.1.tar.gz&refer=pgpool
ご協力よろしくお願い致します。
--
Yoshiyuki Asaba
y-asaba @ sraoss.co.jp
-------------- next part --------------
Index: pool_process_query.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool/pool_process_query.c,v
retrieving revision 1.31
diff -c -r1.31 pool_process_query.c
*** pool_process_query.c 13 Sep 2006 10:13:20 -0000 1.31
--- pool_process_query.c 4 Oct 2006 08:51:46 -0000
***************
*** 41,53 ****
#define INIT_STATEMENT_LIST_SIZE 8
/*
* prepared statement list
*/
typedef struct {
int size;
int cnt;
! char **stmt_list;
} PreparedStatementList;
static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
--- 41,64 ----
#define INIT_STATEMENT_LIST_SIZE 8
+ typedef struct { /* bookkeeping record for one prepared statement seen by pgpool */
+ char *statement_name; /* statement name; the Parse handler stores it wrapped in double quotes, the SQL PREPARE parser stores the bare token, and it is NULL for the unnamed statement */
+ char *portal_name; /* portal bound to this statement by a Bind message; NULL until a named portal is bound */
+ char *prepared_string; /* copy of the query text, used to decide whether an Execute may be load balanced */
+ } PreparedStatement;
+
+ typedef struct { /* NOTE(review): nothing in the hunks shown here uses Portal; portals are tracked via PreparedStatement.portal_name and the unnamed_portal pointer */
+ char *portal_name; /* presumably the portal's name -- TODO confirm intended use */
+ PreparedStatement *prepared_statement; /* presumably the statement the portal was bound to -- TODO confirm */
+ } Portal;
+
/*
* prepared statement list
*/
typedef struct {
int size;
int cnt;
! PreparedStatement **stmt_list;
} PreparedStatementList;
static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
***************
*** 116,126 ****
static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static char *get_insert_command_table_name(char *query);
static char *skip_comment(char *query);
! static void add_prepared_list(PreparedStatementList *p, char *name);
! static void del_prepared_list(PreparedStatementList *p, char *name);
static void reset_prepared_list(PreparedStatementList *p);
static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS];
--- 127,142 ----
static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static char *get_insert_command_table_name(char *query);
+ static char *get_execute_command_portal_name(char *query);
+ static PreparedStatement *get_prepared_command_portal_and_statement(char *query);
static char *skip_comment(char *query);
! static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
! static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt);
! static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
static void reset_prepared_list(PreparedStatementList *p);
+ static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name);
+ static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name);
static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS];
***************
*** 131,140 ****
static int master_slave_was_enabled; /* master/slave mode was enabled */
static int internal_transaction_started; /* to issue table lock command a transaction
has been started internally */
! static void (*pending_function)(PreparedStatementList *p, char *name) = NULL;
! static char *pending_prepared_name = NULL;
static PreparedStatementList prepared_list; /* prepared statement name list */
static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
--- 147,158 ----
static int master_slave_was_enabled; /* master/slave mode was enabled */
static int internal_transaction_started; /* to issue table lock command a transaction
has been started internally */
! static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL;
! static PreparedStatement *pending_prepared_stmt = NULL;
static PreparedStatementList prepared_list; /* prepared statement name list */
+ static PreparedStatement *unnamed_statement = NULL;
+ static PreparedStatement *unnamed_portal = NULL;
static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
***************
*** 544,550 ****
static POOL_STATUS Query(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend, char *query)
{
! char *string;
int len;
static char *sq = "show pool_status";
POOL_STATUS status;
--- 562,568 ----
static POOL_STATUS Query(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend, char *query)
{
! char *string, *string1;
int len;
static char *sq = "show pool_status";
POOL_STATUS status;
***************
*** 613,620 ****
}
if (frontend &&
! (strncasecmp("prepare", string, 7) == 0 ||
! strncasecmp("deallocate", string, 10) == 0))
{
char *query = string;
char *buf, *name;
--- 631,651 ----
}
if (frontend &&
! (strncasecmp("prepare", string, 7) == 0))
! {
! PreparedStatement *stmt;
!
! stmt = get_prepared_command_portal_and_statement(string);
! /* could not get info. probably wrong SQL command */
! if (stmt == NULL)
! {
! return POOL_CONTINUE;
! }
! pending_function = add_prepared_list;
! pending_prepared_stmt = stmt;
! }
! else if (frontend &&
! strncasecmp("deallocate", string, 10) == 0)
{
char *query = string;
char *buf, *name;
***************
*** 630,650 ****
buf = strdup(query);
name = strtok(buf, "\t\r\n (;");
! if (name && (*string == 'p' || *string == 'P'))
{
! pending_function = add_prepared_list;
! pending_prepared_name = strdup(name);
}
! else if (name && (*string == 'd' || *string == 'D'))
{
! pending_function = del_prepared_list;
! pending_prepared_name = strdup(name);
}
free(buf);
}
/* load balance trick */
! if (load_balance_enabled(backend, string))
start_load_balance(backend);
else if (MASTER_SLAVE)
{
--- 661,713 ----
buf = strdup(query);
name = strtok(buf, "\t\r\n (;");
!
! pending_function = del_prepared_list; /* deferred: run against prepared_list only after a 'C', '1' or '3' response is seen */
! pending_prepared_stmt = malloc(sizeof(PreparedStatement)); /* temporary lookup key; prepared_string is never initialized on this path */
! if (pending_prepared_stmt == NULL)
{
! pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); /* NOTE(review): message names SimpleForwardToBackend, but this branch tests `string` for "deallocate" and appears to belong to Query -- confirm */
! return POOL_END;
}
!
! pending_prepared_stmt->statement_name = strdup(name); /* NOTE(review): name comes from strtok() and is not checked for NULL before strdup() */
! pending_prepared_stmt->portal_name = NULL;
! if (pending_prepared_stmt->portal_name == NULL || /* NOTE(review): portal_name was set to NULL on the previous line, so this test is always true and the branch always returns POOL_END; buf and the key are not freed on that path */
! pending_prepared_stmt->statement_name == NULL)
{
! pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno));
! return POOL_END;
}
free(buf);
}
+ if (frontend &&
+ (strncasecmp("execute", string, 7) == 0))
+ {
+ char *portal_name = get_execute_command_portal_name(string);
+ PreparedStatement *stmt;
+
+ /* could not get portal name. probably wrong SQL command */
+ if (portal_name == NULL)
+ {
+ return POOL_CONTINUE;
+ }
+
+ stmt = lookup_prepared_statement_by_statement(&prepared_list,
+ portal_name);
+
+ if (!stmt)
+ string1 = string;
+ else
+ string1 = stmt->prepared_string;
+ }
+ else
+ {
+ string1 = string;
+ }
+
/* load balance trick */
! if (load_balance_enabled(backend, string1))
start_load_balance(backend);
else if (MASTER_SLAVE)
{
***************
*** 749,754 ****
--- 812,818 ----
int i;
char kind;
int status;
+ PreparedStatement *stmt;
/* read Execute packet */
if (pool_read(frontend, &len, sizeof(len)) < 0)
***************
*** 759,764 ****
--- 823,840 ----
pool_debug("Execute: portal name <%s>", string);
+ stmt = lookup_prepared_statement_by_portal(&prepared_list, string);
+
+ /* load balance trick */
+ if (stmt && load_balance_enabled(backend, stmt->prepared_string))
+ start_load_balance(backend);
+ else if (MASTER_SLAVE)
+ {
+ master_slave_was_enabled = 1;
+ MASTER_SLAVE = 0;
+ master_slave_dml = 1;
+ }
+
for (i = 0;i < backend->num;i++)
{
POOL_CONNECTION *cp = backend->slots[i]->con;
***************
*** 809,814 ****
--- 885,901 ----
return status;
pool_flush(frontend);
+ /* end load balance mode */
+ if (in_load_balance)
+ end_load_balance(backend);
+
+ if (master_slave_dml)
+ {
+ MASTER_SLAVE = 1;
+ master_slave_was_enabled = 0;
+ master_slave_dml = 0;
+ }
+
return POOL_CONTINUE;
}
***************
*** 2369,2382 ****
* pending prepared statement.
*/
if ((kind == 'C' || kind == '1' || kind == '3') &&
! pending_function && pending_prepared_name)
{
! pending_function(&prepared_list, pending_prepared_name);
}
- free(pending_prepared_name);
pending_function = NULL;
! pending_prepared_name = NULL;
status = pool_read(MASTER(backend), &len, sizeof(len));
if (status < 0)
--- 2456,2468 ----
* pending prepared statement.
*/
if ((kind == 'C' || kind == '1' || kind == '3') &&
! pending_function && pending_prepared_stmt)
{
! pending_function(&prepared_list, pending_prepared_stmt);
}
pending_function = NULL;
! pending_prepared_stmt = NULL;
status = pool_read(MASTER(backend), &len, sizeof(len));
if (status < 0)
***************
*** 2574,2581 ****
if (pool_write_and_flush(SECONDARY(backend), p, len))
return POOL_END;
! if (kind == 'P' && *p)
{
name_len = strlen(p) + 3;
name = malloc(name_len);
if (name == NULL)
--- 2660,2669 ----
if (pool_write_and_flush(SECONDARY(backend), p, len))
return POOL_END;
! if (kind == 'P') /* Parse message? */
{
+ char *stmt;
+
name_len = strlen(p) + 3;
name = malloc(name_len);
if (name == NULL)
***************
*** 2584,2591 ****
return POOL_END;
}
sprintf(name, "\"%s\"", p);
! pending_function = add_prepared_list;
! pending_prepared_name = name;
}
else if (kind == 'C' && *p == 'S' && *(p + 1))
{
--- 2672,2739 ----
return POOL_END;
}
sprintf(name, "\"%s\"", p);
!
! pending_prepared_stmt = malloc(sizeof(PreparedStatement));
! if (pending_prepared_stmt == NULL)
! {
! pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
! return POOL_END;
! }
! pending_prepared_stmt->portal_name = NULL;
!
! if (*p)
! {
! pending_function = add_prepared_list;
! pending_prepared_stmt->statement_name = name;
! }
! else
! {
! pending_function = add_unnamed_portal;
! pending_prepared_stmt->statement_name = NULL;
! free(name);
! }
!
! /* copy prepared statement string */
! stmt = p;
! stmt += strlen(p) + 1;
! pending_prepared_stmt->prepared_string = strdup(stmt);
! if (pending_prepared_stmt->prepared_string == NULL)
! {
! pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno));
! return POOL_END;
! }
! }
! else if (kind == 'B') /* Bind message: record which portal the referenced statement gets bound to */
! {
! char *stmt_name, *portal_name;
! PreparedStatement *stmt;
!
! portal_name = p; /* the packet body is read as a NUL-terminated portal name ... */
! stmt_name = p + strlen(portal_name) + 1; /* ... immediately followed by the NUL-terminated statement name */
!
! pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name);
!
! if (*stmt_name == '\0')
! stmt = unnamed_statement; /* empty name selects the unnamed statement (may be NULL) */
! else
! {
! char *name = malloc(strlen(stmt_name) + 3); /* room for two double quotes and the terminator, since named statements are looked up by quoted name; NOTE(review): result is not checked before sprintf() */
! sprintf(name, "\"%s\"", stmt_name);
! stmt = lookup_prepared_statement_by_statement(&prepared_list, name);
! free(name);
! }
!
! if (stmt == NULL)
! free(name); /* NOTE(review): the block-local `name` above is out of scope, so this frees an outer `name` that this branch never assigns -- looks like a stray free(); confirm against the enclosing function */
! else if (*portal_name == '\0')
! unnamed_portal = stmt; /* empty portal name: remember the statement as the unnamed portal */
! else
! {
! if (stmt->portal_name)
! free(stmt->portal_name);
! stmt->portal_name = strdup(portal_name); /* replaces any portal previously bound to this statement; strdup() result is not checked */
! }
!
}
else if (kind == 'C' && *p == 'S' && *(p + 1))
{
***************
*** 2598,2604 ****
}
sprintf(name, "\"%s\"", p + 1);
pending_function = del_prepared_list;
! pending_prepared_name = name;
}
if (kind == 'P' || kind == 'B' || kind == 'D' || kind == 'C')
--- 2746,2765 ----
}
sprintf(name, "\"%s\"", p + 1);
pending_function = del_prepared_list;
! pending_prepared_stmt = malloc(sizeof(PreparedStatement));
! if (pending_prepared_stmt == NULL)
! {
! pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
! return POOL_END;
! }
!
! pending_prepared_stmt->statement_name = strdup(name);
! if (pending_prepared_stmt->statement_name == NULL)
! {
! pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
! return POOL_END;
! }
! pending_prepared_stmt->prepared_string = NULL;
}
if (kind == 'P' || kind == 'B' || kind == 'D' || kind == 'C')
***************
*** 3144,3149 ****
--- 3305,3445 ----
return table;
}
+ /*
+ * Extract the first token that follows the leading keyword of an SQL EXECUTE command. Returns a pointer to a static buffer (overwritten by the next call), an empty string when nothing is left after skip_comment(), or NULL when no token is found. The visible caller in Query looks the result up as a prepared statement name, so "portal name" here means that name.
+ */
+ static char *get_execute_command_portal_name(char *query)
+ {
+ static char portal[1024]; /* result buffer shared by all calls; not reentrant */
+ char *qbuf;
+ char *token;
+
+ portal[0] = '\0';
+
+ /* step over leading comments (presumably what skip_comment() does -- it is defined elsewhere) */
+ query = skip_comment(query);
+
+ if (*query == '\0')
+ return portal; /* nothing left: hand back the empty string rather than NULL */
+
+ /* skip leading whitespace */
+ while (*query && isspace(*query)) /* NOTE(review): a plain char is passed to isspace(); bytes >= 0x80 are negative where char is signed */
+ query++;
+
+ /* skip the keyword itself (EXECUTE) */
+ while (*query && !isspace(*query))
+ query++;
+
+ /* skip whitespace between the keyword and the name */
+ while (*query && isspace(*query))
+ query++;
+
+ /* tokenize a private copy, because strtok() writes into its argument */
+ qbuf = strdup(query); /* NOTE(review): result is not checked for NULL before strtok() */
+ token = strtok(qbuf, "\r\n\t (");
+
+ if (token == NULL)
+ {
+ pool_error("get_execute_command_portal_name: could not get portal name");
+ return NULL; /* NOTE(review): qbuf is not freed on this path */
+ }
+
+ strncpy(portal, token, sizeof(portal)); /* NOTE(review): leaves portal unterminated when the token is 1024 bytes or longer */
+ free(qbuf);
+
+ pool_debug("get_execute_command_portal_name: extracted portal name: %s", portal);
+
+ return portal;
+ }
+
+ /*
+ * Parse an SQL PREPARE command. Returns a newly malloc'ed PreparedStatement holding the statement name and a copy of the query text found after the AS keyword, or NULL when either cannot be extracted. Despite the function name, portal_name is always left NULL.
+ */
+ static PreparedStatement *get_prepared_command_portal_and_statement(char *query)
+ {
+ PreparedStatement *stmt;
+ static char portal[1024]; /* scratch buffer for the statement name; copied with strdup() before returning */
+ char *string = NULL;
+ char *qbuf;
+ char *token;
+ int len;
+
+ portal[0] = '\0';
+
+ /* step over leading comments (presumably what skip_comment() does -- it is defined elsewhere) */
+ query = skip_comment(query);
+
+ if (*query == '\0')
+ return NULL;
+
+ /* skip leading whitespace */
+ while (*query && isspace(*query)) /* NOTE(review): a plain char is passed to isspace(); bytes >= 0x80 are negative where char is signed */
+ query++;
+
+ /* skip the keyword itself (PREPARE) */
+ while (*query && !isspace(*query))
+ query++;
+
+ /* skip whitespace between the keyword and the name */
+ while (*query && isspace(*query))
+ query++;
+
+ /* take the statement name from a private copy, because strtok() writes into its argument */
+ qbuf = strdup(query); /* NOTE(review): result is not checked for NULL before strtok() */
+ token = strtok(qbuf, "\r\n\t (");
+
+ if (token == NULL)
+ {
+ pool_error("get_prepared_command_portal_and_statement: could not get portal name");
+ return NULL; /* NOTE(review): qbuf is not freed on this path */
+ }
+
+ strncpy(portal, token, sizeof(portal)); /* NOTE(review): leaves portal unterminated when the token is 1024 bytes or longer */
+ free(qbuf);
+
+ /* advance to the first ')' and step past it. NOTE(review): the scan starts at the statement name and assumes a parenthesized type list exists; without one it stops at the first ')' inside the query text, or at the terminator, in which case the unconditional query++ moves past the end of the string */
+ while (*query && *query != ')')
+ query++;
+ query++;
+
+ /* skip whitespace before AS */
+ while (*query && isspace(*query))
+ query++;
+
+ /* skip the AS keyword */
+ while (*query && !isspace(*query))
+ query++;
+
+ /* skip whitespace before the query text */
+ while (*query && isspace(*query))
+ query++;
+
+ if (!*query)
+ {
+ pool_error("get_prepared_command_portal_and_statement: could not get statement");
+ return NULL;
+ }
+
+ len = strlen(query) + 1; /* include the terminator so memcpy() yields a complete string */
+ string = malloc(len);
+ if (string == NULL)
+ {
+ pool_error("get_prepared_command_portal_and_statement: malloc failed: %s", strerror(errno));
+ return NULL;
+ }
+ memcpy(string, query, len);
+
+ stmt = malloc(sizeof(PreparedStatement)); /* NOTE(review): result is not checked before the stores below */
+ stmt->statement_name = strdup(portal); /* bare token, not wrapped in double quotes; strdup() result is not checked */
+ stmt->portal_name = NULL;
+ stmt->prepared_string = string; /* stmt takes ownership of the copy made above */
+
+ pool_debug("get_prepared_command_portal_and_statement: extracted portal name: %s portal statement: %s", stmt->portal_name, stmt->prepared_string); /* NOTE(review): prints portal_name, which was just set to NULL; statement_name was probably intended */
+
+ return stmt;
+ }
+
+
/* judge if this is a DROP DATABASE command */
static int is_drop_database(char *query)
{
***************
*** 3207,3213 ****
}
}
! static void add_prepared_list(PreparedStatementList *p, char *name)
{
if (p->cnt == p->size)
{
--- 3503,3509 ----
}
}
! static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt)
{
if (p->cnt == p->size)
{
***************
*** 3220,3246 ****
}
}
! p->stmt_list[p->cnt++] = strdup(name);
}
! static void del_prepared_list(PreparedStatementList *p, char *name)
{
int i;
for (i = 0; i < p->cnt; i++)
{
! if (strcmp(p->stmt_list[i], name) == 0)
! break;
}
if (i == p->cnt)
return;
free(p->stmt_list[i]);
if (i != p->cnt - 1)
{
memmove(&p->stmt_list[i], &p->stmt_list[i+1],
! sizeof(char *) * (p->cnt - i - 1));
}
p->cnt--;
}
--- 3516,3560 ----
}
}
! p->stmt_list[p->cnt++] = stmt;
! }
!
! static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt) /* installs stmt as the unnamed statement; p is unused but matches the pending_function signature */
! {
! if (unnamed_statement && unnamed_statement->statement_name == NULL) /* release the previous unnamed statement, if there is one */
! {
! free(unnamed_statement->prepared_string);
! free(unnamed_statement); /* portal_name is not freed here */
! }
!
! unnamed_portal = NULL; /* drop the unnamed portal as well; it may have pointed at the statement freed above */
! unnamed_statement = stmt; /* the global now owns stmt */
}
! static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt) /* removes the entry whose name equals stmt->statement_name from p; stmt is a temporary key and is freed here in every case */
{
int i;
for (i = 0; i < p->cnt; i++)
{
! if (strcmp(p->stmt_list[i]->statement_name, stmt->statement_name) == 0) /* exact match only; quoted and bare spellings of a name compare unequal */
! break;
}
+
+ free(stmt->statement_name); /* the key is consumed whether or not a match was found; its other fields are not freed */
+ free(stmt);
if (i == p->cnt)
return;
+ free(p->stmt_list[i]->statement_name); /* release the strings owned by the matched entry before the entry itself */
+ free(p->stmt_list[i]->portal_name);
+ free(p->stmt_list[i]->prepared_string); /* NOTE(review): unnamed_portal is not cleared here although the Bind handler can point it at a list entry -- possible dangling pointer, confirm */
free(p->stmt_list[i]);
if (i != p->cnt - 1)
{
memmove(&p->stmt_list[i], &p->stmt_list[i+1],
! sizeof(PreparedStatement *) * (p->cnt - i - 1)); /* shift the trailing pointers down to close the gap */
}
p->cnt--;
}
***************
*** 3250,3259 ****
--- 3564,3613 ----
int i;
for (i = 0; i < p->cnt; i++)
+ {
+ free(p->stmt_list[i]->statement_name);
+ free(p->stmt_list[i]->portal_name);
+ free(p->stmt_list[i]->prepared_string);
free(p->stmt_list[i]);
+ }
p->cnt = 0;
}
+ static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name) /* find a tracked statement by its statement name; returns NULL when there is no match */
+ {
+ int i;
+
+ /* NULL, an empty string, or a name starting with two double quotes selects the unnamed statement (which may itself be NULL) */
+ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
+ return unnamed_statement;
+
+ for (i = 0; i < p->cnt; i++) /* linear scan with exact strcmp(): the name must be spelled exactly as it was stored */
+ {
+ if (strcmp(p->stmt_list[i]->statement_name, name) == 0)
+ return p->stmt_list[i];
+ }
+
+ return NULL;
+ }
+
+ static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name) /* find the statement currently bound to the given portal name; returns NULL when there is no match */
+ {
+ int i;
+
+ /* NULL, an empty string, or a name starting with two double quotes selects the unnamed portal (which may itself be NULL) */
+ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
+ return unnamed_portal;
+
+ for (i = 0; i < p->cnt; i++) /* linear scan over the named statements */
+ {
+ if (p->stmt_list[i]->portal_name && /* entries with no portal bound yet are skipped */
+ strcmp(p->stmt_list[i]->portal_name, name) == 0)
+ return p->stmt_list[i];
+ }
+
+ return NULL;
+ }
+
static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p,
int n)
{
***************
*** 3263,3276 ****
if (p->cnt <= n)
return 1;
! len = strlen(p->stmt_list[n]) + 12; /* "DEALLOCATE " + '\0' */
query = malloc(len);
if (query == NULL)
{
pool_error("send_deallocate: malloc failed: %s", strerror(errno));
exit(1);
}
! sprintf(query, "DEALLOCATE %s", p->stmt_list[n]);
if (Query(NULL, backend, query) != POOL_CONTINUE)
{
free(query);
--- 3617,3630 ----
if (p->cnt <= n)
return 1;
! len = strlen(p->stmt_list[n]->statement_name) + 12; /* "DEALLOCATE " + '\0' */
query = malloc(len);
if (query == NULL)
{
pool_error("send_deallocate: malloc failed: %s", strerror(errno));
exit(1);
}
! sprintf(query, "DEALLOCATE %s", p->stmt_list[n]->statement_name);
if (Query(NULL, backend, query) != POOL_CONTINUE)
{
free(query);
pgpool-general-jp メーリングリストの案内