View Issue Details

ID: 0000271    Project: Pgpool-II    Category: Bug    View Status: public    Last Update: 2017-03-31 17:06
Reporter: supp_k    Assigned To: t-ishii
Priority: urgent    Severity: major    Reproducibility: random
Status: closed    Resolution: open
Platform:    OS: CentOS    OS Version: 6.x
Product Version: 3.5.4
Target Version: 3.6.3    Fixed in Version:
Summary: 0000271: balanced requests after started transaction
Description: We see that, within a transaction, pgpool balances requests to the standby read-only backend. But this should not happen, because the slave does not contain the required data until the transaction is committed and replicated to it.


The bug is related to pgpool v3.6.
Additional Information: We analysed the log records and came to the following conclusion:

The first 5 lines show that the transaction was initiated and that for some period pgpool routed requests only to backend #0, which is the master. But then we see that the request (2016-12-15T12:46:13.229382+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79083-1]) was routed to standby backend #1. At this moment we get data inconsistency on the frontend, because backend #1 certainly does not contain the data that was created on backend #0 until it is committed. The SELECT for that data is performed within the same transaction in which it was INSERT-ed.
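The log below boils down to the following access pattern (a minimal JDBC sketch; the connection string, credentials and the demo_table name are illustrative placeholders, not the real application schema): inside one explicit transaction a row is inserted and then selected back, so the SELECT must not be balanced to the standby.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ReadYourOwnWrite {
    public static void main(String[] args) throws Exception {
        // The connection goes through pgpool; host and credentials are placeholders.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://pgpool-host:5432/testdb", "user", "password")) {
            conn.setAutoCommit(false); // explicit transaction: BEGIN is sent to the backends

            try (PreparedStatement ins = conn.prepareStatement("insert into demo_table(id) values (?)");
                 PreparedStatement sel = conn.prepareStatement("select id from demo_table where id = ?")) {
                ins.setInt(1, 1);
                ins.executeUpdate(); // the write is routed to the master (node 0)

                sel.setInt(1, 1);
                try (ResultSet rs = sel.executeQuery()) {
                    // If pgpool balances this SELECT to the standby (node 1), rs.next() is false:
                    // the uncommitted row exists only on the master.
                    System.out.println("row visible: " + rs.next());
                }
            }
            conn.commit();
        }
    }
}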





2016-12-15T12:46:12.007225+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78172-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:12.007229+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78172-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.007240+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78175-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:12.007242+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78175-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.007446+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78189-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: BEGIN
2016-12-15T12:46:12.007455+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78189-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.007480+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78194-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: BEGIN
2016-12-15T12:46:12.007481+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78194-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.008519+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78260-1] LOG: DB node id: 1 backend pid: 2946 statement: Parse: SELECT 1
2016-12-15T12:46:12.008525+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78260-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.008653+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78274-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:12.008659+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78274-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.008823+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78287-1] LOG: DB node id: 1 backend pid: 2946 statement: D message
2016-12-15T12:46:12.008825+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78287-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:12.008906+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78297-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: SELECT 1
2016-12-15T12:46:12.008915+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle[16742]: [78297-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.218025+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78372-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:13.218034+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78372-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.218197+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78386-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: SELECT CASE WHEN sr.sub_limit = -1 THEN 1 ELSE 0 END AS is_inf, sr.sub_limit - sr.curr_usage AS res_avail, sr.rt_instance_id AS rt_instance_id FROM resource_classes rc JOIN resource_types rt ON (rc.class_id = rt.class_id) JOIN subs_resources sr ON (sr.rt_id = rt.rt_id) JOIN subscriptions s ON (sr.sub_id = s.sub_id) WHERE rc.name = $1 AND s.owner_id = $2 AND s.is_active = 1 ORDER BY is_inf DESC, res_avail DESC, sr.rt_instance_id
2016-12-15T12:46:13.218203+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78386-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.221080+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78446-1] LOG: DB node id: 0 backend pid: 17007 statement: Parse: INSERT INTO accounts_serial DEFAULT VALUES
2016-12-15T12:46:13.221083+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78446-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.221220+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78460-1] LOG: DB node id: 0 backend pid: 17007 statement: D message
2016-12-15T12:46:13.221223+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78460-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.222001+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78520-1] LOG: DB node id: 0 backend pid: 17007 statement: Parse: INSERT INTO accounts_serial DEFAULT VALUES
2016-12-15T12:46:13.222006+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78520-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.222146+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78534-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.222150+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78534-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.222272+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78547-1] LOG: DB node id: 0 backend pid: 17007 statement: D message
2016-12-15T12:46:13.222276+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78547-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.222351+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78557-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: INSERT INTO accounts_serial DEFAULT VALUES
2016-12-15T12:46:13.222355+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78557-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.223461+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78629-1] LOG: DB node id: 0 backend pid: 17007 statement: Parse: select currval('accounts_serial_seq')
2016-12-15T12:46:13.223465+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78629-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.223592+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78642-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.223594+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78642-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.223708+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78655-1] LOG: DB node id: 0 backend pid: 17007 statement: D message
2016-12-15T12:46:13.223711+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78655-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.223814+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78665-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: select currval('accounts_serial_seq')
2016-12-15T12:46:13.223822+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78665-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.224918+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78742-1] LOG: DB node id: 0 backend pid: 17007 statement: Parse: DELETE FROM accounts_serial WHERE account_id = $1
2016-12-15T12:46:13.224926+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78742-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.225052+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78755-1] LOG: DB node id: 0 backend pid: 17007 statement: D message
2016-12-15T12:46:13.225056+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78755-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.225842+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78815-1] LOG: DB node id: 0 backend pid: 17007 statement: Parse: DELETE FROM accounts_serial WHERE account_id = $1
2016-12-15T12:46:13.225846+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78815-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.225965+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78829-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.225973+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78829-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.226088+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78842-1] LOG: DB node id: 0 backend pid: 17007 statement: D message
2016-12-15T12:46:13.226092+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78842-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.226179+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78852-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: DELETE FROM accounts_serial WHERE account_id = $1
2016-12-15T12:46:13.226183+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78852-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.227219+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78921-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.227222+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78921-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.227380+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78935-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into addresses (address, address2, address3, city, country, postal_code, state) values ($1, $2, $3, $4, $5, $6, $7)
2016-12-15T12:46:13.227383+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78935-2] #011RETURNING *
2016-12-15T12:46:13.227386+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78935-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.228185+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78995-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.228188+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [78995-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.228343+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79009-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contacts (display_name) values ($1)
2016-12-15T12:46:13.228345+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79009-2] #011RETURNING *
2016-12-15T12:46:13.228348+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79009-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.229216+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79069-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:13.229220+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79069-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.229382+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79083-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: select account0_.account_id as account_1_3_0_, account0_.address_id as address21_3_0_, account0_.adm_contact_id as adm_con22_3_0_, account0_.aps_uuid as aps_uuid2_3_0_, account0_.ext_system_id as ext_sys23_3_0_, account0_.bill_contact_id as bill_co24_3_0_, account0_.brand_name as brand_na3_3_0_, account0_.ccp_version as ccp_vers4_3_0_, account0_.company_name as company_5_3_0_, account0_.customers_quantity as customer6_3_0_, account0_.l_country_code_default as l_countr7_3_0_, account0_.l_language_code_default as l_langua8_3_0_, account0_.l_variant_default as l_varian9_3_0_, account0_.ext_account_id as ext_acc10_3_0_, account0_.c_time as c_time11_3_0_, account0_.l_country_code as l_count12_3_0_, account0_.l_language_code as l_langu13_3_0_, account0_.l_variant as l_varia14_3_0_, account0_.is_locked as is_lock15_3_0_, account0_.note
2016-12-15T12:46:13.229388+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79083-2] as note16_3_0_, account0_.owner_id as owner_i25_3_0_, account0_.path as path17_3_0_, account0_.is_personal as is_pers18_3_0_, account0_.rt_instance_id as rt_inst19_3_0_, acc
2016-12-15T12:46:13.229390+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79083-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.230519+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79133-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.230524+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79133-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.230658+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79147-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.230665+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79147-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.231363+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79199-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.231371+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79199-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.231498+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79213-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.231502+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79213-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.232170+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79265-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.232174+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79265-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.232311+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79279-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.232314+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79279-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.232971+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79331-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.232974+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79331-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.233138+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79345-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.233146+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79345-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.233854+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79397-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.233856+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79397-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.233973+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79411-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.233978+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79411-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.234643+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79463-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.234647+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79463-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.234790+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79477-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.234799+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79477-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.235426+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79529-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.235430+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79529-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.235583+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79543-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.235587+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79543-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.236258+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79595-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.236266+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79595-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.236424+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79609-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.236429+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79609-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.237049+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79661-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.237054+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79661-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.237188+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79675-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.237191+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79675-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.237771+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79727-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.237782+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79727-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.237907+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79741-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.237915+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79741-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.238572+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79793-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.238575+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79793-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.238715+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79807-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.238719+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79807-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.239370+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79859-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.239373+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79859-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.239513+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79873-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into accounts (address_id, adm_contact_id, aps_uuid, ext_system_id, bill_contact_id, brand_name, ccp_version, company_name, customers_quantity, l_country_code_default, l_language_code_default, l_variant_default, ext_account_id, c_time, l_country_code, l_language_code, l_variant, is_locked, note, owner_id, path, is_personal, rt_instance_id, tech_contact_id, account_type, account_id) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)
2016-12-15T12:46:13.239518+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79873-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.240744+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79925-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:13.240748+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79925-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.240901+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: select user0_.user_id as user_id1_193_0_, identity1_.identity_id as identity1_102_1_, account2_.account_id as account_1_3_2_, user0_.scope_id as scope_i16_193_0_, user0_.aps_uuid as aps_uuid2_193_0_, user0_.contact_id as contact17_193_0_, user0_.date_format as date_for3_193_0_, user0_.deleting as deleting4_193_0_, user0_.dns_path_of_domain_part as dns_path5_193_0_, user0_.domain_id as domain_i6_193_0_, user0_.is_enabled as is_enabl7_193_0_, user0_.auth_identity_id as auth_id18_193_0_, user0_.l_country_code as l_countr8_193_0_, user0_.l_language_code as l_langua9_193_0_, user0_.l_variant as l_varia10_193_0_, user0_.locked_by as locked_11_193_0_, user0_.member_id as member_12_193_0_, user0_.sub_id as sub_id13_193_0_, user0_.timezone as timezon14_193_0_, user0_.type as type15_193_0_, identity1_.system_id as system_i4_102_1_,
2016-12-15T12:46:13.240906+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-2] identity1_.login as login2_102_1_, identity1_.login_lowered as login_lo3_102_1_, account2_.address_id as address21_3_2_, account2_.adm_contact_id as adm_con22_3_2_, account2_.aps_u
2016-12-15T12:46:13.240908+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.241819+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79997-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.241827+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79997-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.241969+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80011-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: select localident0_.identity_id as identity1_109_0_, localident0_.pwd_expired as pwd_expi2_109_0_, localident0_.pwd_ctime as pwd_ctim3_109_0_, localident0_.pwd_hash as pwd_hash4_109_0_, localident0_.pwd_type as pwd_type5_109_0_ from local_identities localident0_ where localident0_.identity_id=$1
2016-12-15T12:46:13.241977+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80011-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.242963+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80069-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.242971+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80069-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.243102+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80083-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: select role0_.role_id as role_id1_160_, role0_.account_id as account_6_160_, role0_.description as descript2_160_, role0_.name as name3_160_, role0_.admin as admin4_160_, role0_.role_type as role_typ5_160_ from roles role0_ where role0_.account_id=$1 and role0_.name=$2 and (role0_.role_type=$3 or role0_.role_type=$4)
2016-12-15T12:46:13.243106+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80083-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.243851+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80133-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.243853+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80133-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.244015+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80147-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into roles (account_id, description, name, admin, role_type) values ($1, $2, $3, $4, $5)
2016-12-15T12:46:13.244024+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80147-2] #011RETURNING *
2016-12-15T12:46:13.244028+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80147-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.245087+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80207-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.245091+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80207-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.245241+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [80221-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: select privilege0_.privilege_id as privileg1_144_, privilege0_.allow_locked as allow_lo2_144_, privilege0_.area as area3_144_, privilege0_.name as name4_144_, privilege0_1_.group_id as group_id1_143_ from privileges privilege0_ left outer join privilege_groups_content privilege0_1_ on privilege0_.privilege_id=privilege0_1_.privilege_id where privilege0_.name=$1
Tags: No tags attached.

Activities

supp_k

2016-12-16 04:59

reporter   ~0001225

One more strange case:
if we do pcp_detach_node for the SLAVE backend and then pcp_attach_node, the problem of "incorrect balancing" disappears.

supp_k

2016-12-17 02:33

reporter   ~0001226

Please pay attention to the following information. In the following piece of the log the requests arrive with an interval of less than ONE millisecond. It is possible that the 1st request has not completed yet. Does that make sense? Perhaps pgpool has not yet set the "flag" that prohibits balancing from this moment on?


2016-12-15T12:46:13.238715+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79807-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into contact_properties (value, contact_id, name) values ($1, $2, $3)
2016-12-15T12:46:13.238719+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79807-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.239370+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79859-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.239373+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79859-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.239513+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79873-1] LOG: DB node id: 0 backend pid: 17007 statement: Execute: insert into accounts (address_id, adm_contact_id, aps_uuid, ext_system_id, bill_contact_id, brand_name, ccp_version, company_name, customers_quantity, l_country_code_default, l_language_code_default, l_variant_default, ext_account_id, c_time, l_country_code, l_language_code, l_variant, is_locked, note, owner_id, path, is_personal, rt_instance_id, tech_contact_id, account_type, account_id) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)
2016-12-15T12:46:13.239518+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79873-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.240744+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79925-1] LOG: DB node id: 1 backend pid: 2946 statement: B message
2016-12-15T12:46:13.240748+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79925-2] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.240901+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-1] LOG: DB node id: 1 backend pid: 2946 statement: Execute: select user0_.user_id as user_id1_193_0_, identity1_.identity_id as identity1_102_1_, account2_.account_id as account_1_3_2_, user0_.scope_id as scope_i16_193_0_, user0_.aps_uuid as aps_uuid2_193_0_, user0_.contact_id as contact17_193_0_, user0_.date_format as date_for3_193_0_, user0_.deleting as deleting4_193_0_, user0_.dns_path_of_domain_part as dns_path5_193_0_, user0_.domain_id as domain_i6_193_0_, user0_.is_enabled as is_enabl7_193_0_, user0_.auth_identity_id as auth_id18_193_0_, user0_.l_country_code as l_countr8_193_0_, user0_.l_language_code as l_langua9_193_0_, user0_.l_variant as l_varia10_193_0_, user0_.locked_by as locked_11_193_0_, user0_.member_id as member_12_193_0_, user0_.sub_id as sub_id13_193_0_, user0_.timezone as timezon14_193_0_, user0_.type as type15_193_0_, identity1_.system_id as system_i4_102_1_,
2016-12-15T12:46:13.240906+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-2] identity1_.login as login2_102_1_, identity1_.login_lowered as login_lo3_102_1_, account2_.address_id as address21_3_2_, account2_.adm_contact_id as adm_con22_3_2_, account2_.aps_u
2016-12-15T12:46:13.240908+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79939-3] LOCATION: pool_proto_modules.c:3085
2016-12-15T12:46:13.241819+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79997-1] LOG: DB node id: 0 backend pid: 17007 statement: B message
2016-12-15T12:46:13.241827+03:00 srv-60b2f6e99b9b lesk plesk 192.168.102.240(59893) idle in transaction[16742]: [79997-2] LOCATION: pool_proto_modules.c:3085

serk

2016-12-19 06:10

reporter   ~0001231

Hi Pgpool developers!

Please consider the logic that reproduces the error.

In approximately 30% of cases the attached class (TestPgpool.java) reproduces the issue.
The target testing DB has 2 tables used for testing (tables.ddl). We made several modifications to the pgpool code to log the cases (as far as we could detect them) where Pgpool balances the SELECT request instead of forwarding it to the MASTER backend after a DDL statement has been processed.

In several runs one will see the message "USE BACKEND: {1} THOUGH THE WRITE TRANSACTION IS ACTIVE!!!!!!!!", which means that the UPDATE statement was executed previously but the next SELECT is still forwarded to the SLAVE – which should not happen.

We set the balancing ratio in pgpool to a MASTER/SLAVE proportion of 0/1000, so the majority of requests are forwarded to the SLAVE. The attached configuration assumes the 2 backends are synchronous and that backend #0 is the master.

Notes: The attached DIFF file also contains a patch for issue 0000231.

issues_271_244.zip (4,453 bytes)

t-ishii

2016-12-20 17:47

developer   ~0001242

It seems there's no evidence that the SELECT in question was performed in an explicit transaction in your log.

supp_k

2016-12-20 18:07

reporter   ~0001243

Please have a look at the attached log file.
The issue is reproduced when the time lag between requests is very small. This is why we created the attached multithreaded Java test.

Regarding the attached log:
1) At 2016-12-20T12:02:38.200806 there was a write statement.
2) Later (2016-12-20T12:02:38.202535) one can see that a new SELECT request was balanced to node 1, which is the read-only slave.

test7.log (139,187 bytes)

supp_k

2016-12-20 18:08

reporter   ~0001244

Multithreaded Java test:

import java.sql.*;
import java.util.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.*;

public class TestPgpoolLoad {

    private static final int TEST_THREADS_NUM = 40;

    public static void main(String[] args) throws Exception {

        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://10.28.78.103:5432/postgres", "postgres", "password")) {

            Lock lock = new ReentrantLock();

            
            conn.setAutoCommit(false);

            CyclicBarrier cb = new CyclicBarrier(TEST_THREADS_NUM);

            try (PreparedStatement stmtSelect = conn.prepareStatement("select * from read_table where id <= ?");
                PreparedStatement stmtWrite = conn.prepareStatement("update write_table set id = 1 where id = ?")) {

                List<Thread> thrs = new ArrayList<>();
                for (int i = 0; i < TEST_THREADS_NUM; i++) {
                    boolean isRead = i < TEST_THREADS_NUM - 1;
                    boolean isSleep = i < (TEST_THREADS_NUM / 2);
                    thrs.add(new TestReader(isSleep, cb, isRead, isRead ? stmtSelect : stmtWrite, lock));
                }

                for (Thread thr : thrs) {
                    thr.start();
                }

                Thread.sleep(1000);

                for (Thread thr : thrs) {
                    thr.join();
                }

                System.out.println("ok");
            } catch (Exception ex) {
                ex.printStackTrace();

                throw ex;
            }
            conn.commit();
        }
    }

    public static class TestReader extends Thread {

        private final CyclicBarrier cb;
        private final boolean readThread;
        private final PreparedStatement stmt;
        private final Lock lock;
        private final boolean isSleep;
        private final boolean isRead;

        TestReader(boolean isSleep, CyclicBarrier cb, boolean readThread, PreparedStatement readStmt, Lock lock) {
            this.isSleep = isSleep;
            this.cb = cb;
            this.isRead = readThread;
            this.readThread = readThread;
            this.stmt = readStmt;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                cb.await();

                if (!isRead) {
                    Thread.sleep(1);
                }

                lock.lock();
                try {
                    stmt.setInt(1, 1);
                    stmt.execute();
                } finally {
                    // release the lock only after it has actually been acquired
                    lock.unlock();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

}

supp_k

2016-12-20 18:11

reporter   ~0001245

Also, the start of the transaction is visible in the log:

2016-12-20T12:02:38.032665+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [208-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 0 backend pid: 31091 statement: Parse: BEGIN
2016-12-20T12:02:38.032667+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [208-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089
2016-12-20T12:02:38.032800+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [211-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 1 backend pid: 13293 statement: Parse: BEGIN
2016-12-20T12:02:38.032806+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [211-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089
2016-12-20T12:02:38.033030+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [224-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 0 backend pid: 31091 statement: B message
2016-12-20T12:02:38.033032+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [224-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089
2016-12-20T12:02:38.033178+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [227-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 1 backend pid: 13293 statement: B message
2016-12-20T12:02:38.033179+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [227-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089
2016-12-20T12:02:38.033435+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [239-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 0 backend pid: 31091 statement: Execute: BEGIN
2016-12-20T12:02:38.033437+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [239-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089
2016-12-20T12:02:38.033577+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [244-1] 2016-12-20 12:02:38: pid 30813:LOG: DB node id: 1 backend pid: 13293 statement: Execute: BEGIN
2016-12-20T12:02:38.033578+03:00 srv-60b2f6e99b9b lesk plesk 10.192.36.35(43669) idle[30813]: [244-2] 2016-12-20 12:02:38: pid 30813:LOCATION: pool_proto_modules.c:3089

t-ishii

2016-12-20 18:15

developer   ~0001247

But the process id is different (30813 vs. 16742), so I cannot confirm that the SELECT and the DML are in the same transaction.

supp_k

2016-12-20 18:24

reporter   ~0001248

It seems you missed the most recently attached log file. Please have a look at the latest "test7.log".

Also the pgpool settings are:

backend_hostname0 = 'a.db.node'
backend_port0 = 15432
backend_weight0 = 0
backend_data_directory0 = '/var/lib/pgsql/9.5/data'
backend_flag0 = 'ALLOW_TO_FAILOVER'

backend_hostname1 = 'b.db.node'
backend_port1 = 15432
backend_weight1 = 1000
backend_data_directory1 = '/var/lib/pgsql/9.5/data'
backend_flag1 = 'ALLOW_TO_FAILOVER'


enable_pool_hba = on
pool_passwd = 'pool_passwd'
authentication_timeout = 60
ssl = off




num_init_children = 240
max_pool = 1
child_life_time = 0
child_max_connections = 1
connection_life_time = 0
client_idle_limit = 0




log_destination = 'syslog'
log_line_prefix = '%t: pid %p:'
log_connections = on
log_hostname = on
log_statement = on
log_per_node_statement = on
log_standby_delay = 'none'
syslog_facility = 'LOCAL0'
syslog_ident = 'pgpool'

debug_level = 5
log_error_verbosity = verbose
client_min_messages = warning
log_min_messages = debug1

supp_k

2016-12-21 05:42

reporter   ~0001250

I have updated the test scenario. Please look at the source code below.

There are N-1 threads that read the value which was previously inserted by the single writer thread. The N threads start simultaneously from a barrier. There is a lock (ReentrantLock) that prevents the queries from running in parallel, but we still get very high concurrency and a minimal time gap between receiving a response and sending the next query. After the INSERT thread completes its insert operation it sets a volatile flag (writePerformed), and all SELECT threads start trying to fetch the value. If the value is not returned, it definitely means the SELECT was balanced to a node which does not yet contain the inserted value.

Launch the test several times and you will see the exception in stdout. In our environment we reproduce the issue in about 50% of runs.

If you wish, I can create a fully functional test case, debug it in our local QA environment and provide it to you so that you can use it in your CI setup.



package com;

import java.sql.*;
import java.util.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.*;

public class TestPgpool {

    private static final int TEST_THREADS_NUM = 40;
    static volatile boolean writePerformed = false; // set to TRUE once the writer thread has performed the INSERT operation

    public static void main(String[] args) throws Exception {
        
        Random random = new Random(System.currentTimeMillis());

        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://10.28.78.103/plesk", "plesk", "jjHCF7sGHq")) {

            Lock lock = new ReentrantLock();

            conn.setAutoCommit(false); // here we define explicit transaction
            CyclicBarrier cb = new CyclicBarrier(TEST_THREADS_NUM);

            final int valueTest = random.nextInt();
            System.out.println("test value:" + valueTest);

            try (PreparedStatement stmtSelect = conn.prepareStatement("select * from write_table where id = ?");
                    PreparedStatement stmtWrite = conn.prepareStatement("insert into write_table(id) values(?)")) {

                List<Thread> thrs = new ArrayList<>();
                for (int i = 0; i < TEST_THREADS_NUM; i++) {
                    boolean isRead = i > 0;
                    //boolean isSleep = i < (TEST_THREADS_NUM / 2);
                    thrs.add(new TestReader(valueTest, cb, isRead, isRead ? stmtSelect : stmtWrite, lock));
                }

                for (Thread thr : thrs) {
                    thr.start();
                }

                Thread.sleep(1000);

                for (Thread thr : thrs) {
                    thr.join();
                }

                
            } catch (Exception ex) {
                ex.printStackTrace();

                throw ex;
            }
            conn.commit();
        }
    }

    public static class TestReader extends Thread {

        private final CyclicBarrier cb;

        private final PreparedStatement stmt;
        private final Lock lock;

        private final boolean isRead;
        private final int valueTest;

        TestReader(int valueTest, CyclicBarrier cb, boolean readThread, PreparedStatement stmt, Lock lock) {
            this.valueTest = valueTest;
            this.cb = cb;
            this.isRead = readThread;
            this.stmt = stmt;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                cb.await();

                lock.lock();
                try {
                    stmt.setInt(1, valueTest);
                    if (isRead) {
                        try (ResultSet rs = stmt.executeQuery()) {
                            if (writePerformed) {
                                if (!rs.next()) {
                                    throw new RuntimeException("The request was balanced. The value was inserted but the following select returned nothing");
                                } else {
                                    int val = rs.getInt(1);
                                    System.out.println("received value:" + val);
                                }
                            } else {
                                // the write has not been performed yet, so the value is not expected to be in the database
                            }
                        }
                    } else {
                        stmt.execute();
                        writePerformed = true; // mark the write as performed; from this moment the READ threads will try to read the value
                    }
                } finally {
                    // release the lock only after it has actually been acquired
                    lock.unlock();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }

    }

}

t-ishii

2016-12-21 13:39

developer   ~0001254

I have taken a look at test7.log and confirmed the issue. Looking into the source code now...

t-ishii

2016-12-21 14:24

developer   ~0001255

Is it possible to provide a Java debug trace when the issue occurs?

Pgpool-II decides that a transaction is a writing transaction when a write query is sent and the backend returns the "Ready for query" message to Pgpool-II. My guess is that the client sends the Parse message for a SELECT *before* Pgpool-II receives the "Ready for query" message. If that actually happens, the SELECT can be sent to the standby because the writing flag is not set yet.
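To restate that suspected race as a sketch (a simplified illustrative model in Java with hypothetical names, not Pgpool-II's actual C code): the writing flag is raised only after the "Ready for query" response to the write has been processed, so a SELECT pipelined into that window can still be considered balanceable.

// Simplified illustrative model of the suspected race; not pgpool's real implementation.
class WriteFlagModel {
    private volatile boolean writingTransaction = false;

    // Invoked once the backend's "Ready for query" for a write query has been read.
    void onReadyForQueryAfterWrite() {
        writingTransaction = true; // from now on, SELECTs in this transaction are pinned to the primary
    }

    // Routing decision for a SELECT inside the same transaction.
    int chooseNodeForSelect(int primaryNode, int standbyNode) {
        // If the client pipelines the SELECT before "Ready for query" is processed,
        // writingTransaction is still false and the SELECT may be balanced to the standby.
        return writingTransaction ? primaryNode : standbyNode;
    }
}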

supp_k

2016-12-21 15:18

reporter   ~0001256

Yes, please have a look at the attached test10.zip file. It contains the pgpool log and the corresponding Java/JDBC log.

test10.zip (15,231 bytes)

supp_k

2016-12-21 18:11

reporter   ~0001257

Probably it makes sense to set the flag upon receiving a WRITE query. In that case pgpool's state machine would be free of the problem, and even of the possibility of it (in this particular case at least).

t-ishii

2016-12-21 21:35

developer   ~0001258

It's not that simple. By looking at the JDBC log, I found that:

1) the "prepare select" was sent to node 1
2) the write query was sent to node 0
3) the bind/execute requests for 1) then came in; they were sent to node 1 (oops!)

If we simply redirect 3) to node 0, it will fail because there is no prepared statement for the select on node 0. So the only solution is to send the "prepare select" to node 0 before 3). Actually the code for this logic was in 3.4. I seem to have forgotten to port it to 3.5 or later :-)
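A rough sketch of that direction (illustrative Java pseudo-logic with hypothetical names; the actual fix is C code inside pgpool): remember on which nodes each statement was parsed, and when a Bind/Execute arrives for a statement that exists only on the standby while the transaction has become a writing one, replay the Parse on the primary before routing the Bind/Execute there.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of the fix direction; names and structure are hypothetical.
class ExtendedQueryRouter {
    private static final int PRIMARY = 0;
    private final Map<String, Set<Integer>> parsedOn = new HashMap<>(); // statement name -> nodes holding its Parse
    private boolean writingTransaction = false;

    void onParse(String statement, int node) {
        parsedOn.computeIfAbsent(statement, s -> new HashSet<>()).add(node);
    }

    void onWriteQuery() {
        writingTransaction = true;
    }

    // Decide where the Bind/Execute for 'statement' has to go.
    int routeBindExecute(String statement) {
        Set<Integer> nodes = parsedOn.computeIfAbsent(statement, s -> new HashSet<>());
        if (writingTransaction) {
            if (!nodes.contains(PRIMARY)) {
                // The statement was parsed only on a standby: re-send the Parse to the primary
                // first, otherwise Bind there fails with "prepared statement does not exist".
                onParse(statement, PRIMARY);
            }
            return PRIMARY;
        }
        // No write yet in this transaction: keep using a node that already holds the Parse.
        return nodes.isEmpty() ? PRIMARY : nodes.iterator().next();
    }
}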

supp_k

2016-12-21 21:54

reporter   ~0001259

I still don't understand the real cause of the problem, because I'm not an expert in Pgpool's state machine and the Postgres protocol.

But from my point of view it seems the Parse message should be sent to all backends if load balancing is activated. This is true because, if for instance there are no DML requests, Pgpool still has the right to balance requests.

Right now from the logs (test10.log) I see that the Parse is forwarded to node 1, and later, after the INSERT is complete, Pgpool still forwards the BIND/EXECUTE for the SELECTs to node 1..... this is what I see.

But I see you already have the solution which was forgotten in the past ))) It seems the lost code should be restored )))

t-ishii

2016-12-21 23:56

developer   ~0001260

> But from my point of view it seems the Parse message should be sent to all backends if load balancing is activated.

Actually we did this a long time ago, and people complained because that approach was inefficient and slow.

supp_k

2016-12-22 00:04

reporter   ~0001261

Ok, probably there are reasons for that. But we still need to fix the problem.

supp_k

2016-12-22 18:24

reporter   ~0001262

Would it be possible to restore the lost code? We are still facing problems because of this issue (((

elepuser

2016-12-23 20:57

reporter   ~0001263

Hi Guys,

Recently, under not very high load, we faced exactly the same issue. It resulted in data loss and integrity problems.
The program logic was inserting data but then occasionally failed to fetch the same data within the same transaction. At first we decided that Postgres was violating the ACID concept, but later we found the problem in pgpool's behaviour (((

I see that a fix for the problem exists. Can you please apply it ASAP? We look forward to obtaining the fixed build.

t-ishii

2016-12-26 15:06

developer   ~0001264

> Would it be possible to restore the lost code?

Of course. But it would not be straightforward because of the differences in message handling logic between Pgpool-II 3.5 and pre-3.5. Also, I need a tool to send an arbitrary message stream for testing/debugging. I'm working on developing it...

supp_k

2016-12-28 20:01

reporter   ~0001271

Do you have any estimate of when you will be able to provide the patch for testing? We look forward to obtaining it.

t-ishii

2017-01-04 18:06

developer   ~0001277

From December 29 to January 3rd my office was closed because of national holidays. I would like to finish the work by the end of this month.

supp_k

2017-01-04 19:17

reporter   ~0001278

Happy new year and Merry Christmas Mr. T-Ishii!
We look forward to obtaining the fixed version. Many of our activities are delayed because of this problem. If possible, we'd like to participate in testing as soon as you are ready to release the fix. If we participate, you may be able to finish the fix before the end of the month and with better quality.

elepuser

2017-01-10 05:15

reporter   ~0001286

Yes, is it possible to provide the fix earlier than the end of the month?
Recently we faced several more issues of this type. Every time it takes a lot of effort to roll back the business logic and restore the data ((( We are also ready to participate in the testing.

PLEASE if possible provide the fix faster!

supp_k

2017-01-25 19:21

reporter   ~0001295

Dear Tatsui Ishii,

do you have any information about the estimated timeline for when you will be able to provide the fix? We are experiencing a lot of trouble because of this issue (((

Please, if possible, provide the fix or information about when it will be available.
Thank you!

supp_k

2017-01-25 19:22

reporter   ~0001296

...I'm sorry for the mistake in your name!!

elepuser

2017-01-29 07:35

reporter   ~0001310

Any updates on the issue?

From the message thread I see there is a fundamental problem in pgpool (((
The test attached by supp_k shows that the problem is easily reproduced when the time lag between a response from pgpool and the next request is minimal. Also, the PARSE message is sent to one node only. I just wonder how you guys are going to fix the problem if you don't want to send the Parse request to all backends?

t-ishii

2017-01-30 18:05

developer   ~0001312

We don't need to send the PARSE message to all backends.

If we know that a write query was sent in the transaction, then a succeeding SELECT is sent to the primary node. If we have already sent a Parse message for the SELECT to a node other than the primary, then we will send the Parse message to the primary as well.

I'm working on the implementation in this direction now.

supp_k

2017-02-06 16:40

reporter   ~0001329

Hi T-Ishii,
are there any updates about this problem?
We are ready to start testing it.

elepuser

2017-02-07 17:48

reporter   ~0001336

Guys,

any updates on the issue? Can you please provide the fix?

t-ishii

2017-02-08 10:27

developer   ~0001337

Still working on it. However I'm pretty busy now and I'm not sure when the work will finish.

t-ishii

2017-02-14 16:12

developer   ~0001341

I'm making slight progress. With an over-800-line diff, the issue now seems to be fixed in a certain case. Here is that case, described with pgproto (https://github.com/tatsuo-ishii/pgproto).

However I am seeing a regression in our regression test suite, and I definitely need to work on that.

# Test data for bug271.
# In an explicit transaction, SELECT is parsed, DML issued, same SELECT/bind issued.
# In this case Pgpool-II should re-send the Parse to the primary node.
#

# Create test table
'Q' "DROP TABLE IF EXISTS pgproto_test1"
'Y'
'Q' "CREATE TABLE pgproto_test1(i INT)"
'Y'

# Start a transaction
'P' "S1" "BEGIN" 0
'B' "" "S1" 0 0 0
'E' "" 0
'C' 'S' "S1"

# Issue SELECT
'P' "S2" "SELECT 1" 0
#'B' "" "S1" 0 0 0
#'E' "" 0
#'C' 'S' "S1"

# Issue INSERT
'P' "S1" "INSERT INTO pgproto_test1 VALUES(1)" 0
'B' "" "S1" 0 0 0
'E' "" 0
'C' 'S' "S1"
#'S'
#'Y'

# Issue SELECT. This should be sent to primary node.
#'P' "S1" "SELECT 1" 0
'B' "" "S2" 0 0 0
'E' "" 0
'C' 'S' "S2"

# Issue COMMIT
'P' "S1" "COMMIT" 0
'B' "" "S1" 0 0 0
'E' "" 0
'C' 'S' "S1"
'S'
'Y'
'X'

t-ishii

2017-02-23 15:20

developer   ~0001353

I finally managed to create the initial cut of the patch I was talking about. The patch is relatively large (a 1652-line diff) and I would appreciate it if you guys could test it out. The patch is attached as "bug271.diff". It was originally created against the master branch, but it can be applied to the 3.6 and 3.5 stable trees as well.

t-ishii

2017-02-23 15:20

developer  

bug271.diff (54,508 bytes)
diff --git a/src/auth/pool_auth.c b/src/auth/pool_auth.c
index ca503ae..434511b 100644
--- a/src/auth/pool_auth.c
+++ b/src/auth/pool_auth.c
@@ -1112,6 +1112,7 @@ int pool_read_message_length(POOL_CONNECTION_POOL *cp)
 
 	for (i=0;i<NUM_BACKENDS;i++)
 	{
+#ifdef NOT_USED
 		if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i) || use_sync_map == POOL_SYNC_MAP_EMPTY)
 		{
 			continue;
@@ -1121,6 +1122,11 @@ int pool_read_message_length(POOL_CONNECTION_POOL *cp)
 		{
 			continue;
 		}
+#endif
+		if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
+		{
+			continue;
+		}
 
 		pool_read(CONNECTION(cp, i), &length, sizeof(length));
 
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index a87db43..717f708 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL 
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2016	PgPool Global Development Group
+ * Copyright (c) 2003-2017	PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -86,6 +86,10 @@ void pool_query_context_destroy(POOL_QUERY_CONTEXT *query_context)
 	if (query_context)
 	{
 		MemoryContext memory_context = query_context->memory_context;
+
+		ereport(LOG,
+				(errmsg("pool_query_context_destroy: query context:%x", query_context)));
+
 		session_context = pool_get_session_context(false);
 		pool_unset_query_in_progress();
 		if (!pool_is_command_success() && query_context->pg_terminate_backend_conn)
@@ -102,6 +106,41 @@ void pool_query_context_destroy(POOL_QUERY_CONTEXT *query_context)
 	}
 }
 
+#ifdef NOT_USED
+/*
+ * Perform deep copy of given query context.
+ */
+POOL_QUERY_CONTEXT *pool_query_context_copy(POOL_QUERY_CONTEXT *query_context)
+{
+	MemoryContext old_context;
+	POOL_QUERY_CONTEXT *qc;
+	int len;
+
+	qc = pool_init_query_context();
+	memcpy(qc, query_context, sizeof(POOL_QUERY_CONTEXT));
+
+	old_context = MemoryContextSwitchTo(query_context->memory_context);
+
+	if (query_context->original_query)
+	{
+		len = strlen(query_context->original_query)+1;
+		qc->originarl_query = palloc(len);
+		memcpy(qc->originarl_query, query_context->original_query, len);
+	}
+
+	if (query_context->rewritten_query)
+	{
+		len = strlen(query_context->rewritten_query)+1;
+		qc->rewritten_query = palloc(len);
+		memcpy(qc->rewritten_query, query_context->rewritten_query, len);
+	}
+
+	if (query_context->parse_tree)
+	{
+	}
+}
+#endif
+
 /*
  * Start query
  */
@@ -326,6 +365,12 @@ int pool_virtual_master_db_node_id(void)
 	 */
 	if (MASTER_SLAVE)
 	{
+		int node_id;
+
+		node_id = pool_get_preferred_master_node_id();
+		if (node_id >= 0)
+			return node_id;
+
 		return PRIMARY_NODE_ID;
 	}
 	return my_master_node_id;
@@ -602,9 +647,9 @@ void pool_where_to_send(POOL_QUERY_CONTEXT *query_context, char *query, Node *no
 	{
 		POOL_SENT_MESSAGE *msg;
 
-		msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name);
+		msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
-			msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name);
+			msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
 		if (msg)
 			pool_copy_prep_where(msg->query_context->where_to_send,
 								 query_context->where_to_send);
@@ -920,11 +965,11 @@ POOL_STATUS pool_extended_send_and_wait(POOL_QUERY_CONTEXT *query_context,
 
 		send_extended_protocol_message(backend, i, kind, str_len, str);
 
-		if ((*kind == 'E' || *kind == 'C') && STREAM)
+		if ((*kind == 'P' || *kind == 'E' || *kind == 'C') && STREAM)
 		{
 			/*
 			 * Send flush message to backend to make sure that we get any response
-			 * from backend in Sream replication mode.
+			 * from backend in Streaming replication mode.
 			 */
 
 			POOL_CONNECTION *cp = CONNECTION(backend, i);
@@ -1359,9 +1404,9 @@ void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node)
 	}
 	else
 	{
-		msg = pool_get_sent_message('Q', d->name);
+		msg = pool_get_sent_message('Q', d->name, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
-			msg = pool_get_sent_message('P', d->name);
+			msg = pool_get_sent_message('P', d->name, POOL_SENT_MESSAGE_CREATED);
 		if (msg)
 		{
 			/* Inherit same map from PREPARE or PARSE */
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index 773ff78..daa1346 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL 
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2016	PgPool Global Development Group
+ * Copyright (c) 2003-2017	PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -30,6 +30,7 @@
 #include "utils/elog.h"
 #include "pool_config.h"
 #include "context/pool_session_context.h"
+#include "protocol/pool_proto_modules.h"
 
 static POOL_SESSION_CONTEXT session_context_d;
 static POOL_SESSION_CONTEXT *session_context = NULL;
@@ -149,6 +150,12 @@ void pool_init_session_context(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *
 
 	/* Initialize pending message list */
 	pool_pending_messages_init();
+
+	/* Initialize previous pending message */
+	pool_pending_message_reset_previous_message();
+
+	/* Initialize preferred master node id */
+	pool_reset_preferred_master_node_id();
 }
 
 /*
@@ -439,13 +446,14 @@ void pool_clear_sent_message_list(void)
 static void dump_sent_message(char *caller, POOL_SENT_MESSAGE *m)
 {
 	ereport(DEBUG1,
-			(errmsg("called by %s: sent message: address: %p kind: %c name: =%s=", caller, m, m->kind, m->name)));
+			(errmsg("called by %s: sent message: address: %p kind: %c name: =%s= state:%d",
+					caller, m, m->kind, m->name, m->state)));
 }
 
 /*
- * Create a sent message
- * kind: one of 'P':Parse, 'B':Bind or'Q':Query(PREPARE)
- * len: message length that is not network byte order
+ * Create a sent message.
+ * kind: one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE)
+ * len: message length in host order
  * contents: message contents
  * num_tsparams: number of timestamp parameters
  * name: prepared statement name or portal name
@@ -467,6 +475,7 @@ POOL_SENT_MESSAGE *pool_create_sent_message(char kind, int len, char *contents,
 	msg->len = len;
 	msg->contents = palloc(len);
 	memcpy(msg->contents, contents, len);
+	msg->state = POOL_SENT_MESSAGE_CREATED;
 	msg->num_tsparams = num_tsparams;
 	msg->name = pstrdup(name);
 	msg->query_context = query_context;
@@ -493,7 +502,7 @@ void pool_add_sent_message(POOL_SENT_MESSAGE *message)
 		return;
 	}
 
-	old_msg = pool_get_sent_message(message->kind, message->name);
+	old_msg = pool_get_sent_message(message->kind, message->name, POOL_SENT_MESSAGE_CREATED);
 
 	if (old_msg == message)
 	{
@@ -545,7 +554,7 @@ void pool_add_sent_message(POOL_SENT_MESSAGE *message)
 /*
  * Get a sent message
  */
-POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name)
+POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state)
 {
 	int i;
 	POOL_SENT_MESSAGE_LIST *msglist;
@@ -555,7 +564,8 @@ POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name)
 	for (i = 0; i < msglist->size; i++)
 	{
 		if (msglist->sent_messages[i]->kind == kind &&
-			!strcmp(msglist->sent_messages[i]->name, name))
+			!strcmp(msglist->sent_messages[i]->name, name) &&
+			msglist->sent_messages[i]->state == state)
 			return msglist->sent_messages[i];
 	}
 
@@ -563,6 +573,17 @@ POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name)
 }
 
 /*
+ * Set sent message state to POOL_SENT_MESSAGE_CLOSED.
+ */
+void pool_set_sent_message_state(POOL_SENT_MESSAGE *message)
+{
+	ereport(LOG,
+			(errmsg("pool_set_sent_message_state: name:%s kind:%c previous state: %d",
+					message->name, message->kind, message->state)));
+	message->state = POOL_SENT_MESSAGE_CLOSED;
+}
+
+/*
  * We don't have a write query in this transaction yet.
  */
 void pool_unset_writing_transaction(void)
@@ -863,6 +884,8 @@ bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc)
 	int i;
 	int count = 0;
 	POOL_SENT_MESSAGE_LIST *msglist;
+	ListCell   *cell;
+	ListCell   *next;
 
 	msglist = &session_context->message_list;
 
@@ -875,11 +898,32 @@ bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc)
 	{
 		ereport(DEBUG1,
 			(errmsg("checking if query context can be safely destroyed"),
-				 errdetail("query context %p is still used %d times. query:\"%s\"",
+				 errdetail("query context %p is still used %d times in sent message list. query:\"%s\"",
 						   qc, count,qc->original_query)));
 		return false;
 	}
 
+	count = 0;
+
+	for (cell = list_head(session_context->pending_messages); cell; cell = next)
+	{
+		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
+
+		if (message->query_context == qc)
+		{
+			count++;
+		}
+		next = lnext(cell);
+	}
+
+	if (count >= 1)
+	{
+		ereport(DEBUG1,
+				(errmsg("checking if query context can be safely destroyed"),
+				 errdetail("query context %p is still used %d times in pending message list", qc, count, qc->original_query)));
+		return false;
+	}
+
 	return true;
 }
 
@@ -1094,6 +1138,14 @@ POOL_PENDING_MESSAGE *pool_pending_messages_create(char kind, int len, char *con
 		msg->type = POOL_BIND;
 		break;
 
+		case 'E':
+		msg->type = POOL_EXECUTE;
+		break;
+
+		case 'D':
+		msg->type = POOL_DESCRIBE;
+		break;
+
 		case 'C':
 		msg->type = POOL_CLOSE;
 		break;
@@ -1107,6 +1159,11 @@ POOL_PENDING_MESSAGE *pool_pending_messages_create(char kind, int len, char *con
 	msg->contents = palloc(len);
 	memcpy(msg->contents, contents, len);
 	msg->contents_len = len;
+	msg->query[0] = '\0';
+	msg->statement[0] = '\0';
+	msg->portal[0] = '\0';
+	msg->is_rows_returned = false;
+	msg->node_ids[0] = msg->node_ids[1] = -1;
 
 	MemoryContextSwitchTo(old_context);
 
@@ -1114,7 +1171,46 @@ POOL_PENDING_MESSAGE *pool_pending_messages_create(char kind, int len, char *con
 }
 
 /*
- * Add one message
+ * Set node_ids field of message which indicates which backend nodes the
+ * message was sent.
+ */
+void pool_pending_messages_dest_set(POOL_PENDING_MESSAGE* message, POOL_QUERY_CONTEXT *query_context)
+{
+	int i;
+	int j = 0;
+
+	for (i=0;i<MAX_NUM_BACKENDS;i++)
+	{
+		if (query_context->where_to_send[i])
+		{
+			if (j > 1)
+			{
+				ereport(ERROR,
+						(errmsg("pool_pending_messages_dest_set: node ids exceeds 2")));
+				return;
+			}
+			message->node_ids[j++] = i;
+		}
+	}
+
+	message->query_context = query_context;
+
+	if (is_select_query(query_context->parse_tree, query_context->original_query))
+	{
+		message->is_rows_returned = true;
+	}
+}
+
+/*
+ * Set query field of message.
+ */
+void pool_pending_messages_query_set(POOL_PENDING_MESSAGE* message, POOL_QUERY_CONTEXT *query_context)
+{
+	StrNCpy(message->query, query_context->original_query, sizeof(message->query));
+}
+
+/*
+ * Add one message to the tail of the list
  */
 void pool_pending_message_add(POOL_PENDING_MESSAGE* message)
 {
@@ -1125,9 +1221,41 @@ void pool_pending_message_add(POOL_PENDING_MESSAGE* message)
 		ereport(ERROR,
 				(errmsg("pool_pending_message_add: session context is not initialized")));
 
-	ereport(DEBUG1,
-			(errmsg("pool_pending_message_add: message type:%d message len:%d",
-					message->type, message->contents_len)));
+	switch (message->type)
+	{
+		case POOL_PARSE:
+			StrNCpy(message->statement, message->contents, sizeof(message->statement));
+			StrNCpy(message->query, message->contents+strlen(message->contents)+1, sizeof(message->query));
+			break;
+
+		case POOL_BIND:
+			StrNCpy(message->portal, message->contents, sizeof(message->portal));
+			StrNCpy(message->statement, message->contents+strlen(message->contents)+1, sizeof(message->statement));
+			break;
+
+		case POOL_EXECUTE:
+			StrNCpy(message->portal, message->contents, sizeof(message->portal));
+			break;
+
+		case POOL_CLOSE:
+		case POOL_DESCRIBE:
+			if (*message->contents == 'S')
+				StrNCpy(message->statement, message->contents+1, sizeof(message->statement));
+			else
+				StrNCpy(message->portal, message->contents+1, sizeof(message->portal));
+			break;
+
+		default:
+			ereport(ERROR,
+					(errmsg("pool_pending_message_add: unknown message type:%d", message->type)));
+			return;
+			break;
+	}
+
+	ereport(LOG,
+			(errmsg("pool_pending_message_add: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
+					message->type, message->contents_len, message->query, message->statement, message->portal,
+					message->node_ids[0], message->node_ids[1])));
 
 	old_context = MemoryContextSwitchTo(session_context->memory_context);
 	msg = copy_pending_message(message);
@@ -1136,6 +1264,79 @@ void pool_pending_message_add(POOL_PENDING_MESSAGE* message)
 }
 
 /*
+ * Return the message from the head of the list.  If the list is not empty, a
+ * copy of the message is returned. If the list is empty, returns NULL.
+ */
+POOL_PENDING_MESSAGE *pool_pending_message_head_message(void)
+{
+	ListCell   *cell;
+	POOL_PENDING_MESSAGE *message;
+	POOL_PENDING_MESSAGE *m;
+	MemoryContext old_context;
+
+	if (!session_context)
+		ereport(ERROR,
+				(errmsg("pool_pending_message_head_message: session context is not initialized")));
+
+	if (list_length(session_context->pending_messages) == 0)
+	{
+		return NULL;
+	}
+
+	old_context = MemoryContextSwitchTo(session_context->memory_context);
+
+	cell = list_head(session_context->pending_messages);
+	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
+	message = copy_pending_message(m);
+	ereport(LOG,
+			(errmsg("pool_pending_message_head_message: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
+					message->type, message->contents_len, message->query, message->statement, message->portal,
+					message->node_ids[0], message->node_ids[1])));
+
+	MemoryContextSwitchTo(old_context);
+	return message;
+}
+
+
+/*
+ * Remove one message from the head of the list.  If the list is not empty, a
+ * copy of the message is returned and the message is removed from the message
+ * list. If the list is empty, returns NULL.
+ */
+POOL_PENDING_MESSAGE *pool_pending_message_pull_out(void)
+{
+	ListCell   *cell;
+	POOL_PENDING_MESSAGE *message;
+	POOL_PENDING_MESSAGE *m;
+	MemoryContext old_context;
+
+	if (!session_context)
+		ereport(ERROR,
+				(errmsg("pool_pending_message_pull_out: session context is not initialized")));
+
+	if (list_length(session_context->pending_messages) == 0)
+	{
+		return NULL;
+	}
+
+	old_context = MemoryContextSwitchTo(session_context->memory_context);
+
+	cell = list_head(session_context->pending_messages);
+	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
+	message = copy_pending_message(m);
+	ereport(LOG,
+			(errmsg("pool_pending_message_pull_out: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
+					message->type, message->contents_len, message->query, message->statement, message->portal,
+					message->node_ids[0], message->node_ids[1])));
+
+	session_context->pending_messages =
+		list_delete_cell(session_context->pending_messages, cell, NULL);
+
+	MemoryContextSwitchTo(old_context);
+	return message;
+}
+
+/*
  * Try to find the first message specified by the message type in the message
  * list. If found, a copy of the message is returned and the message is
  * removed the message list. If not, returns NULL.
@@ -1199,7 +1400,8 @@ char *pool_get_close_message_name(POOL_PENDING_MESSAGE *msg)
 }
 
 /*
- * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory context.
+ * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory
+ * context except the query context.
  */
 static POOL_PENDING_MESSAGE *copy_pending_message(POOL_PENDING_MESSAGE *message)
 {
@@ -1214,6 +1416,90 @@ static POOL_PENDING_MESSAGE *copy_pending_message(POOL_PENDING_MESSAGE *message)
 }
 
 /*
+ * Reset previous message.
+ */
+void pool_pending_message_reset_previous_message(void)
+{
+	if (!session_context)
+	{
+		ereport(ERROR,
+				(errmsg("pool_pending_message_reset_previous_message: session context is not initialized")));
+		return;
+	}
+	session_context->previous_message = NULL;
+}
+
+/*
+ * Set previous message.
+ */
+void pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE *message)
+{
+	if (!session_context)
+	{
+		ereport(ERROR,
+				(errmsg("pool_pending_message_set_previous_message: session context is not initialized")));
+		return;
+	}
+	session_context->previous_message = message;
+}
+
+/*
+ * Get previous message.
+ */
+POOL_PENDING_MESSAGE *pool_pending_message_get_previous_message(void)
+{
+	if (!session_context)
+	{
+		ereport(ERROR,
+				(errmsg("pool_pending_message_get_previous_message: session context is not initialized")));
+		return NULL;
+	}
+	return session_context->previous_message;
+}
+
+/*
+ * Return true if there's any pending message.
+ */
+bool pool_pending_message_exists(void)
+{
+	return list_length(session_context->pending_messages) > 0;
+}
+
+/*
+ * Dump whole pending message list
+ */
+void dump_pending_message(void)
+{
+	ListCell   *cell;
+	ListCell   *next;
+
+	if (!session_context)
+	{
+		ereport(ERROR,
+				(errmsg("dump_pending_message: session context is not initialized")));
+		return;
+	}
+
+	ereport(LOG,
+			(errmsg("start dumping pending message list")));
+
+	for (cell = list_head(session_context->pending_messages); cell; cell = next)
+	{
+		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
+
+		ereport(LOG,
+				(errmsg("pool_pending_message_dump: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
+						message->type, message->contents_len, message->query, message->statement, message->portal,
+						message->node_ids[0], message->node_ids[1])));
+
+		next = lnext(cell);
+	}
+
+	ereport(LOG,
+			(errmsg("end dumping pending message list")));
+}
+
+/*
  * Set protocol major version number
  */
 void pool_set_major_version(int major)
@@ -1258,3 +1544,30 @@ int pool_get_minor_version(void)
 	}
 	return 0;
 }
+
+/*
+ * Set preferred "master" node id.
+ * Only used for SimpleForwardToFrontend.
+ */
+void pool_set_preferred_master_node_id(int node_id)
+{
+	session_context->preferred_master_node_id = node_id;
+}
+
+/*
+ * Return preferred "master" node id.
+ * Only used for SimpleForwardToFrontend.
+ */
+int pool_get_preferred_master_node_id(void)
+{
+	return session_context->preferred_master_node_id;
+}
+
+/*
+ * Reset preferred "master" node id.
+ * Only used for SimpleForwardToFrontend.
+ */
+void pool_reset_preferred_master_node_id(void)
+{
+	session_context->preferred_master_node_id = -1;
+}
diff --git a/src/include/context/pool_session_context.h b/src/include/context/pool_session_context.h
index 2be0c20..3300add 100644
--- a/src/include/context/pool_session_context.h
+++ b/src/include/context/pool_session_context.h
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL 
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2016	PgPool Global Development Group
+ * Copyright (c) 2003-2017	PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -54,12 +54,25 @@ typedef enum {
 } POOL_SYNC_MAP_STATE;
 
 /*
+ * Status of sent message
+ */
+typedef enum {
+	POOL_SENT_MESSAGE_CREATED,	/* initial state of sent message */
+	POOL_SENT_MESSAGE_CLOSED	/* sent message closed but close complete message has not arrived yet */
+} POOL_SENT_MESSAGE_STATE;
+/*
  * Message content of extended query
  */
 typedef struct {
-	char kind;	/* one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE) */
-	int len;	/* in host byte order */
+	/*
+	 * One of 'P':Parse, 'B':Bind or 'Q':Query (PREPARE).  If kind = 'B', it
+	 * is assumed that the message is a portal.
+	 */
+	char kind;
+
+	int len;	/* message length in host byte order */
 	char *contents;
+	POOL_SENT_MESSAGE_STATE state;		/* message state */
 	int num_tsparams;
 	char *name;		/* object name of prepared statement or portal */
 	POOL_QUERY_CONTEXT *query_context;
@@ -74,7 +87,8 @@ typedef struct {
 } POOL_SENT_MESSAGE;
 
 /*
- * List of POOL_SENT_MESSAGE
+ * List of POOL_SENT_MESSAGE (XXX this should have been implemented using a
+ * list, rather than an array)
  */
 typedef struct {
 	int capacity;	/* capacity of list */
@@ -83,12 +97,13 @@ typedef struct {
 } POOL_SENT_MESSAGE_LIST;
 
 /*
- * Received message queue used in extended protocol/streaming replication
- * mode.  The queue is an FIFO, allow to de-queue in the middle of the queue
- * however.  When Parse/Bind/Close message are received, each message is
- * en-queued.  The information is used to process those response messages,
- * when Parse complete/Bind completes and Close compete message are received
- * because they don't have any information regarding statement/portal.
+ * Received message queue used in extended query/streaming replication mode.
+ * The queue is a FIFO.  When Parse/Bind/Describe/Execute/Close messages are
+ * received, each message is en-queued.  The information is used to process
+ * those response messages, when Parse complete/Bind complete, Parameter
+ * description, Row description, Command complete and Close complete messages
+ * are received, because they don't have any information regarding the
+ * statement/portal.
  *
  * The memory used for the queue lives in the session context mememory.
  */
@@ -96,6 +111,8 @@ typedef struct {
 typedef enum {
 	POOL_PARSE,
 	POOL_BIND,
+	POOL_EXECUTE,
+	POOL_DESCRIBE,
 	POOL_CLOSE
 } POOL_MESSAGE_TYPE;
 
@@ -103,8 +120,17 @@ typedef struct {
 	POOL_MESSAGE_TYPE type;
 	char *contents;		/* message packet contents excluding message kind */
 	int contents_len;	/* message packet length */
+	char query[QUERY_STRING_BUFFER_LEN];	/* copy of original query */
+	char statement[MAX_IDENTIFIER_LEN];	/* prepared statement name if any */
+	char portal[MAX_IDENTIFIER_LEN];	/* portal name if any */
+	bool is_rows_returned;		/* true if the message could produce row data */
+	int node_ids[2];	/* backend node ids this message was sent to. -1 means no message was sent. */
+	POOL_QUERY_CONTEXT *query_context;	/* query context */
 } POOL_PENDING_MESSAGE;
 
+/* Return true if node_id is one of node_ids */
+#define IS_SENT_NODE_ID(msg, node_id)	(msg->node_ids[0] == node_id || msg->node_ids[1] == node_id)
+
 /*
  * Per session context:
  */
@@ -207,14 +233,22 @@ typedef struct {
 	bool is_pending_response;
 
 	/*
-	 * Parse/Bind/Close message queue.
+	 * Parse/Bind/Describe/Execute/Close message queue.
 	 */
 	List *pending_messages;
 
+	/*
+	 * The last pending message. Reset at Ready for query.
+	 */
+	POOL_PENDING_MESSAGE *previous_message;
+
 	/* Protocol major version number */
 	int major;
 	/* Protocol minor version number */
 	int minor;
+
+	/* Preferred "master" node id. Only used for SimpleForwardToFrontend. */
+	int preferred_master_node_id;
 } POOL_SESSION_CONTEXT;
 
 extern void pool_init_session_context(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
@@ -241,7 +275,8 @@ extern bool pool_remove_sent_message(char kind, const char *name);
 extern void pool_remove_sent_messages(char kind);
 extern void pool_clear_sent_message_list(void);
 extern void pool_sent_message_destroy(POOL_SENT_MESSAGE *message);
-extern POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name);
+extern POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state);
+extern void pool_set_sent_message_state(POOL_SENT_MESSAGE *message);
 extern void pool_unset_writing_transaction(void);
 extern void pool_set_writing_transaction(void);
 extern bool pool_is_writing_transaction(void);
@@ -267,14 +302,26 @@ extern bool pool_is_pending_response(void);
 extern void pool_pending_messages_init (void);
 extern void pool_pending_messages_destroy(void);
 extern POOL_PENDING_MESSAGE *pool_pending_messages_create(char kind, int len, char *contents);
+extern void pool_pending_messages_dest_set(POOL_PENDING_MESSAGE* message, POOL_QUERY_CONTEXT *query_context);
+extern void pool_pending_messages_query_set(POOL_PENDING_MESSAGE* message, POOL_QUERY_CONTEXT *query_context);
 extern void pool_pending_message_add(POOL_PENDING_MESSAGE* message);
+extern POOL_PENDING_MESSAGE *pool_pending_message_head_message(void);
+extern POOL_PENDING_MESSAGE *pool_pending_message_pull_out(void);
 extern POOL_PENDING_MESSAGE *pool_pending_message_remove(POOL_MESSAGE_TYPE type);
 extern char pool_get_close_message_spec(POOL_PENDING_MESSAGE *msg);
 extern char *pool_get_close_message_name(POOL_PENDING_MESSAGE *msg);
+extern void pool_pending_message_reset_previous_message(void);
+extern void pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE *message);
+extern POOL_PENDING_MESSAGE *pool_pending_message_get_previous_message(void);
+extern bool pool_pending_message_exists(void);
+extern void dump_pending_message(void);
 extern void pool_set_major_version(int major);
 extern int pool_get_major_version(void);
 extern void pool_set_minor_version(int minor);
 extern int pool_get_minor_version(void);
+extern void pool_set_preferred_master_node_id(int node_id);
+extern int pool_get_preferred_master_node_id(void);
+extern void pool_reset_preferred_master_node_id(void);
 
 #ifdef NOT_USED
 extern void pool_add_prep_where(char *name, bool *map);
diff --git a/src/include/pool.h b/src/include/pool.h
index e2a9cdb..3feef1f 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -376,6 +376,7 @@ extern int my_master_node_id;
 #define MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION 10 /* time in seconds to keep retrying for a
 											   * watchdog command if the cluster is not
 											   * in stable state */
+#define MAX_IDENTIFIER_LEN		128
 
 #define SERIALIZE_ACCEPT (pool_config->serialize_accept == true && \
 						  pool_config->child_life_time == 0)
@@ -674,6 +675,7 @@ extern int compare(const void *p1, const void *p2);
 extern void do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major);
 extern POOL_STATUS pool_discard_packet_contents(POOL_CONNECTION_POOL *cp);
 extern void pool_dump_valid_backend(int backend_id);
+extern bool pool_push_pending_data(POOL_CONNECTION *backend);
 
 /* pool_auth.c */
 extern void pool_random_salt(char *md5Salt);
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index 832e5e4..c7543a7 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -150,8 +150,15 @@ POOL_STATUS CommandComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 		pool_set_query_state(session_context->query_context, POOL_EXECUTE_COMPLETE);
 	}
 
+	/*
+	 * If we are in streaming replication mode and we are doing extended
+	 * query, reset the query in progress flag and the previous pending message.
+	*/
 	if (STREAM && pool_is_doing_extended_query_message())
+	{
 		pool_unset_query_in_progress();
+		pool_pending_message_reset_previous_message();
+	}
 
 	return POOL_CONTINUE;
 }
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 58e75e9..0a1f756 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -5,7 +5,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2016	PgPool Global Development Group
+ * Copyright (c) 2003-2017	PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -367,7 +367,7 @@ POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
 											pool_unread(CONNECTION(backend, MASTER_NODE_ID), &kind, sizeof(kind));
 										}
 									}
-									else
+									else if (!STREAM)
 									{
                                         ereport(LOG,
                                                 (errmsg("pool process query"),
@@ -1969,7 +1969,6 @@ void do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result
 	int num_close_complete;
 	int state;
 	bool data_pushed;
-	POOL_SESSION_CONTEXT *session_context;
 
 	data_pushed = false;
 
@@ -2009,71 +2008,9 @@ void do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result
 		 * backend. The saved packets will be poped up before returning to
 		 * caller. This preserves the user's expectation of packet sequence.
 		 */
-		if (pool_is_pending_response())
+		if (STREAM && pool_pending_message_exists())
 		{
-			pool_write(backend, "H", 1);
-			len = htonl(sizeof(len));
-			pool_write_and_flush(backend, &len, sizeof(len));
-			ereport(DEBUG1,
-					(errmsg("do_query: send flush message to %d", backend->db_node_id)));
-
-			/*
-			 * If we have not send the flush message to load balance node yet,
-			 * send a flush message to the load balance node. Otherwise only
-			 * the non load balance node (usually the master node) produces
-			 * response if we do not send sync message to it yet.
-			 */
-			session_context = pool_get_session_context(false);
-
-			if (backend->db_node_id != session_context->load_balance_node_id)
-			{
-				POOL_CONNECTION *con;
-
-				con = session_context->backend->slots[session_context->load_balance_node_id]->con;
-				pool_write(con, "H", 1);
-				len = htonl(sizeof(len));
-				pool_write_and_flush(con, &len, sizeof(len));
-				ereport(DEBUG1,
-						(errmsg("do_query: send flush message to %d", con->db_node_id)));
-
-			}
-
-			for(;;)
-			{
-				int len;
-				char *buf;
-
-				pool_set_timeout(-1);
-
-				pool_read(backend, &kind, 1);
-				pool_push(backend, &kind, 1);
-				data_pushed = true;
-
-				pool_read(backend, &len, sizeof(len));
-				pool_push(backend, &len, sizeof(len));
-
-				len = ntohl(len);
-				if ((len - sizeof(len)) > 0)
-				{
-					len -= sizeof(len);
-					buf = palloc(len);
-					pool_read(backend, buf, len);
-					pool_push(backend, buf, len);
-				}
-
-				/* check if there's any pending data */
-				if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
-				{
-					pool_set_timeout(0);
-					if (pool_check_fd(backend) != 0)
-					{
-						ereport(DEBUG1,
-								(errmsg("do_query: no pending data")));
-						pool_set_timeout(-1);
-						break;
-					}
-				}
-			}
+			data_pushed = pool_push_pending_data(backend);
 		}
 
 		if (pname_len == 0)
@@ -3280,16 +3217,63 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 	double max_count = 0;
 	int degenerate_node_num = 0;                /* number of backends degeneration requested */
 	int degenerate_node[MAX_NUM_BACKENDS];      /* degeneration requested backend list */
-	bool doing_extended_message = false;		/* are we doing extended protocol? */
 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
 	POOL_QUERY_CONTEXT *query_context = session_context->query_context;
-	POOL_SYNC_MAP_STATE use_sync_map = pool_use_sync_map();
+	POOL_PENDING_MESSAGE *msg = NULL;
+	POOL_PENDING_MESSAGE *previous_message;
 
 	int num_executed_nodes = 0;
 	int first_node = -1;
 
 	memset(kind_map, 0, sizeof(kind_map));
 
+	if (STREAM && pool_get_session_context(true) && pool_is_doing_extended_query_message())
+	{
+		msg = pool_pending_message_head_message();
+		previous_message = pool_pending_message_get_previous_message();
+		if (!msg)
+		{
+			/*
+			 * There is no pending message in the queue. This could mean we
+			 * are receiving data rows.  If so, previous_msg must exist and the
+			 * query must be SELECT.
+			 */
+			if (previous_message == NULL)
+			{
+				/* no previous message. let's unset query in progress flag. */
+				ereport(DEBUG1,
+						(errmsg("read_kind_from_backend: no pending message, no previous message")));
+				pool_unset_query_in_progress();
+			}
+			else
+			{
+				/*
+				 * Previous message exists. Let's see if it could return
+				 * rows. If not, we cannot predict what kind of message will
+				 * arrive, so just unset query in progress.
+				 */
+				if (previous_message->is_rows_returned)
+				{
+					ereport(DEBUG1,
+							(errmsg("read_kind_from_backend: no pending message, previous message exists, rows returning")));
+					session_context->query_context = previous_message->query_context;
+					pool_set_query_in_progress();
+				}
+				else
+					pool_unset_query_in_progress();
+			}
+		}
+		else
+		{
+			ereport(LOG,
+					(errmsg("read_kind_from_backend: pending message exists. query context: %x",
+						msg->query_context)));
+			pool_pending_message_set_previous_message(msg);
+			session_context->query_context = msg->query_context;
+			pool_set_query_in_progress();
+		}
+	}
+
 	if (MASTER_SLAVE)
 	{
 		ereport(DEBUG1,
@@ -3321,20 +3305,34 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 	{
 		/* initialize degenerate record */
 		degenerate_node[i] = 0;
+		kind_list[i] = 0;
 
+#ifdef NOT_USED
 		if (!VALID_BACKEND(i) || use_sync_map == POOL_SYNC_MAP_EMPTY)
 		{
 			kind_list[i] = 0;
 			continue;
 		}
+#endif
 
-		if (VALID_BACKEND(i))
+#ifdef NOT_USED
+		if (STREAM && pool_is_doing_extended_query_message())
 		{
-			if (use_sync_map == POOL_SYNC_MAP_IS_VALID && !pool_is_set_sync_map(i))
+			if (msg && IS_SENT_NODE_ID(msg, i))
 			{
-				continue;
+				do_this_node_id = true;
 			}
-
+		}
+		else
+		{
+			if (VALID_BACKEND(i))
+			{
+				do_this_node_id = true;
+			}
+		}
+#endif
+		if (VALID_BACKEND(i))
+		{
 			num_executed_nodes++;
 
 			if (first_node < 0)
@@ -3362,9 +3360,14 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 							 errdetail("kind == 0")));
 				}
 
+				ereport(LOG,
+					(errmsg("reading backend data packet kind"),
+						 errdetail("backend:%d kind:'%c'",i, kind)));
+#ifdef NOT_USED
 				ereport(DEBUG2,
 					(errmsg("reading backend data packet kind"),
 						 errdetail("backend:%d kind:'%c'",i, kind)));
+#endif
 
 				/*
 				 * Read and discard parameter status and notice messages
@@ -3671,9 +3674,9 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 								POOL_SENT_MESSAGE *sent_msg;
 								DeallocateStmt *d = (DeallocateStmt *)node;
 
-								sent_msg = pool_get_sent_message('Q', d->name);
+								sent_msg = pool_get_sent_message('Q', d->name, POOL_SENT_MESSAGE_CREATED);
 								if (!sent_msg)
-									sent_msg = pool_get_sent_message('P', d->name);
+									sent_msg = pool_get_sent_message('P', d->name, POOL_SENT_MESSAGE_CREATED);
 								if (sent_msg)
 								{
 									if (sent_msg->query_context->original_query)
@@ -3709,6 +3712,34 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 
 	}
 
+	/*
+	 * If we are in streaming replication mode and we are doing an extended
+	 * query, check the kind we just read.  If it's one of 'D', 'E', or 'N',
+	 * and the queued message was 'execute', the message must be left in the
+	 * queue so that the next Command Complete message from the backend
+	 * matches the execute message.
+	 *
+	 * Also if it's 't' (parameter description) and the queued message was
+	 * 'describe', the message must be left in the queue so that the following
+	 * Row Description message from the backend matches the describe message.
+	 */
+	if (STREAM && pool_is_doing_extended_query_message() && msg)
+	{
+		if ((msg->type == POOL_EXECUTE &&
+			 (*decided_kind == 'D' || *decided_kind == 'E' || *decided_kind == 'N')) ||
+			(msg->type == POOL_DESCRIBE && *decided_kind == 't'))
+		{
+			ereport(LOG,
+					(errmsg("read_kind_from_backend: pending message was left")));
+		}
+		else
+		{
+			ereport(LOG,
+					(errmsg("read_kind_from_backend: pending message was pulled out")));
+			pool_pending_message_pull_out();
+		}
+	}
+
 	return;
 }
 
@@ -4836,3 +4867,111 @@ void pool_dump_valid_backend(int backend_id)
                     RAW_MODE, REAL_MASTER_NODE_ID, pool_is_node_to_be_sent_in_current_query(backend_id),
                     *my_backend_status[backend_id])));
 }
+
+/*
+ * Read pending data from the backend and push it onto the pending stack, if any.
+ * Should be used for streaming replication mode and extended query.
+ * Returns true if data was actually pushed.
+ */
+bool pool_push_pending_data(POOL_CONNECTION *backend)
+{
+	POOL_SESSION_CONTEXT *session_context;
+	int len;
+	bool data_pushed = false;
+	static char random_statement[] = "pgpool_non_existant";
+
+	if (!pool_get_session_context(true) || !pool_is_doing_extended_query_message())
+		return data_pushed;
+
+	/*
+	 * In streaming replication mode, send a Close message for a non-existing
+	 * prepared statement and a Flush message before going any further to
+	 * retrieve and save any pending response packets from the backend. This
+	 * ensures that at least a "close complete" message is returned from the backend.
+	 *
+	 * The saved packets will be popped before returning to the caller. This
+	 * preserves the user's expectation of packet sequence.
+	 */
+	pool_write(backend, "C", 1);
+	len = htonl(sizeof(len)+1+sizeof(random_statement));
+	pool_write(backend, &len, sizeof(len));
+	pool_write(backend, "S", 1);
+	pool_write(backend, random_statement, sizeof(random_statement));
+	pool_write(backend, "H", 1);
+	len = htonl(sizeof(len));
+	pool_write_and_flush(backend, &len, sizeof(len));
+	ereport(LOG,
+			(errmsg("pool_push_pending_data: send flush message to %d", backend->db_node_id)));
+
+	/*
+	 * If we have not sent the flush message to the load balance node yet,
+	 * send a flush message to the load balance node. Otherwise only
+	 * the non load balance node (usually the master node) produces a
+	 * response if we have not sent a sync message to it yet.
+	 */
+	session_context = pool_get_session_context(false);
+
+	if (backend->db_node_id != session_context->load_balance_node_id)
+	{
+		POOL_CONNECTION *con;
+
+		con = session_context->backend->slots[session_context->load_balance_node_id]->con;
+		pool_write(con, "H", 1);
+		len = htonl(sizeof(len));
+		pool_write_and_flush(con, &len, sizeof(len));
+		ereport(LOG,
+				(errmsg("pool_push_pending_data: send flush message to %d", con->db_node_id)));
+	}
+
+	for(;;)
+	{
+		int len;
+		int len_save;
+		char *buf;
+		char kind;
+
+		pool_set_timeout(-1);
+
+		pool_read(backend, &kind, 1);
+		ereport(LOG,
+				(errmsg("pool_push_pending_data: kind: %c", kind)));
+		pool_read(backend, &len, sizeof(len));
+
+		len_save = len;
+		len = ntohl(len);
+		buf = NULL;
+		if ((len - sizeof(len)) > 0)
+		{
+			len -= sizeof(len);
+			buf = palloc(len);
+			pool_read(backend, buf, len);
+		}
+
+		/* check if there's any pending data */
+		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
+		{
+			pool_set_timeout(0);
+			if (pool_check_fd(backend) != 0)
+			{
+				ereport(LOG,
+						(errmsg("pool_push_pending_data: no pending data")));
+				pool_set_timeout(-1);
+				if (buf)
+					pfree(buf);
+				break;
+			}
+		}
+
+		pool_push(backend, &kind, 1);
+		pool_push(backend, &len_save, sizeof(len_save));
+		len = htonl(len_save);
+		len -= sizeof(len);
+		if (len > 0)
+		{
+			pool_push(backend, buf, len);
+			pfree(buf);
+		}
+		data_pushed = true;
+	}
+	return data_pushed;
+}
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 8f69636..fb1be64 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -5,7 +5,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL 
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2016	PgPool Global Development Group
+ * Copyright (c) 2003-2017	PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -597,9 +597,9 @@ POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
 				}
 				else if (IsA(node, ExecuteStmt))
 				{
-					msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name);
+					msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
 					if (!msg)
-						msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name);
+						msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
 				}
 
 				/* rewrite `now()' to timestamp literal */
@@ -701,7 +701,7 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	ereport(DEBUG2,
             (errmsg("Execute: portal name <%s>", contents)));
 
-	bind_msg = pool_get_sent_message('B', contents);
+	bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
 	if (!bind_msg)
         ereport(FATAL,
             (return_code(2),
@@ -872,9 +872,17 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	}
 	else		/* streaming replication mode */
 	{
+		POOL_PENDING_MESSAGE *pmsg;
+
 		pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, true);
 		pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, true);
 
+		/* Add pending message */
+		pmsg = pool_pending_messages_create('E', len, contents);
+		pool_pending_messages_dest_set(pmsg, query_context);
+		pool_pending_messages_query_set(pmsg, query_context);
+		pool_pending_message_add(pmsg);
+
 #ifdef NOT_USED
 		/*
 		 * Send flush message to backend to make sure that we get any response
@@ -1251,13 +1259,23 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	}
 	else if (STREAM)
 	{
+		POOL_PENDING_MESSAGE *pmsg;
+
 		/* XXX fix me:even with streaming replication mode, couldn't we have a deadlock */
 		pool_set_query_in_progress();
 		pool_clear_sync_map();
 		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, true);
 		pool_extended_send_and_wait(query_context, "P", len, contents, -1, MASTER_NODE_ID, true);
 		pool_add_sent_message(session_context->uncompleted_message);
+
+		/* Add pending message */
+		pmsg = pool_pending_messages_create('P', len, contents);
+		pool_pending_messages_dest_set(pmsg, query_context);
+		pool_pending_message_add(pmsg);
+
 		pool_unset_query_in_progress();
+
+		pool_set_pending_response();
 	}
 	else
 	{
@@ -1289,9 +1307,9 @@ POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	portal_name = contents;
 	pstmt_name = contents + strlen(portal_name) + 1;
 
-	parse_msg = pool_get_sent_message('Q', pstmt_name);
+	parse_msg = pool_get_sent_message('Q', pstmt_name, POOL_SENT_MESSAGE_CREATED);
 	if (!parse_msg)
-		parse_msg = pool_get_sent_message('P', pstmt_name);
+		parse_msg = pool_get_sent_message('P', pstmt_name, POOL_SENT_MESSAGE_CREATED);
 	if (!parse_msg)
 	{
         ereport(ERROR,
@@ -1331,19 +1349,22 @@ POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 
 	session_context->query_context = query_context;
 
-/*
- * Fix me
- */
-#ifdef NOT_USED
+	/*
+	 * Take care of the case where the previous parse message has been sent to
+	 * a node other than the primary node. In this case, we send a parse message to the
+	 * primary node.
+	 */
 	if (pool_config->load_balance_mode && pool_is_writing_transaction())
 	{
-		pool_where_to_send(query_context, query_context->original_query,
-						   query_context->parse_tree);
+		if (!STREAM)
+		{
+			pool_where_to_send(query_context, query_context->original_query,
+							   query_context->parse_tree);
+		}
 
 		if (parse_before_bind(frontend, backend, parse_msg) != POOL_CONTINUE)
 			return POOL_END;
 	}
-#endif
 
 	/*
 	 * Start a transaction if necessary in replication mode
@@ -1389,8 +1410,16 @@ POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 
 	if (STREAM)
 	{
+		POOL_PENDING_MESSAGE *pmsg;
+
 		pool_unset_query_in_progress();
 		pool_add_sent_message(session_context->uncompleted_message);
+
+		/* Add pending message */
+		pmsg = pool_pending_messages_create('B', len, contents);
+		pool_pending_messages_dest_set(pmsg, query_context);
+		pool_pending_messages_query_set(pmsg, query_context);
+		pool_pending_message_add(pmsg);
 	}
 	
 	if(rewrite_msg)
@@ -1413,9 +1442,9 @@ POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	/* Prepared Statement */
 	if (*contents == 'S')
 	{
-		msg = pool_get_sent_message('Q', contents+1);
+		msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
-			msg = pool_get_sent_message('P', contents+1);
+			msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
             ereport(FATAL,
                 (return_code(2),
@@ -1425,7 +1454,7 @@ POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	/* Portal */
 	else
 	{
-		msg = pool_get_sent_message('B', contents+1);
+		msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
             ereport(FATAL,
                     (return_code(2),
@@ -1459,7 +1488,17 @@ POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	pool_extended_send_and_wait(query_context, "D", len, contents, -1, MASTER_NODE_ID, nowait);
 
 	if (STREAM)
+	{
+		POOL_PENDING_MESSAGE *pmsg;
+
+		/* Add pending message */
+		pmsg = pool_pending_messages_create('D', len, contents);
+		pool_pending_messages_dest_set(pmsg, query_context);
+		pool_pending_messages_query_set(pmsg, query_context);
+		pool_pending_message_add(pmsg);
+
 		pool_unset_query_in_progress();
+	}
 
 	return POOL_CONTINUE;
 }
@@ -1478,14 +1517,14 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 	/* Prepared Statement */
 	if (*contents == 'S')
 	{
-		msg = pool_get_sent_message('Q', contents+1);
+		msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
 		if (!msg)
-			msg = pool_get_sent_message('P', contents+1);
+			msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
 	}
 	/* Portal */
 	else if (*contents == 'P')
 	{
-		msg = pool_get_sent_message('B', contents+1);
+		msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
 	}
 	else
         ereport(FATAL,
@@ -1539,8 +1578,14 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, false);
 		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, false);
 
+		/* Add pending message */
 		pmsg = pool_pending_messages_create('C', len, contents);
+		pool_pending_messages_dest_set(pmsg, query_context);
+		pool_pending_messages_query_set(pmsg, query_context);
 		pool_pending_message_add(pmsg);
+
+		dump_pending_message();
+
 		pool_unset_query_in_progress();
 		/*
 		 * Remeber that we send flush or sync message to backend.
@@ -1548,12 +1593,12 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 		pool_unset_pending_response();
 
 		/*
-		 * Remove send message
+		 * Remove sent message
 		 */
 		ereport(DEBUG1,
 				(errmsg("Close: removing sent message %c %s", *contents, contents+1)));
-
-		pool_remove_sent_message(*contents == 'S'?'P':'B', contents+1);
+		pool_set_sent_message_state(msg);
+//		pool_remove_sent_message(*contents == 'S'?'P':'B', contents+1);
 	}
 
 	return POOL_CONTINUE;
@@ -1600,7 +1645,6 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 	POOL_SESSION_CONTEXT *session_context;
 	Node *node = NULL;
 	char *query = NULL;
-	POOL_SYNC_MAP_STATE use_sync_map;
 
 	/*
 	 * It is possible that the "ignore until sync is received" flag was set if
@@ -1610,9 +1654,11 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 	 */
 	pool_unset_ignore_till_sync();
 
+	/* Reset previous message */
+	pool_pending_message_reset_previous_message();
+
 	/* Get session context */
 	session_context = pool_get_session_context(false);
-	use_sync_map = pool_use_sync_map();
 
 	/*
 	 * If the numbers of update tuples are differ and
@@ -1684,6 +1730,7 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 		/*
 		 * XXX: discard rest of ReadyForQuery packet
 		 */
+
 		if (pool_read_message_length(backend) < 0)
 			return POOL_END;
 
@@ -1750,6 +1797,10 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 
 		for (i=0;i<NUM_BACKENDS;i++)
 		{
+			if (!VALID_BACKEND(i))
+				continue;
+
+#ifdef NOT_USED
 			if (!VALID_BACKEND(i) || use_sync_map == POOL_SYNC_MAP_EMPTY)
 				continue;
 
@@ -1757,6 +1808,7 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 			{
 				continue;
 			}
+#endif
 
 			if (pool_read(CONNECTION(backend, i), &kind, sizeof(kind)))
 				return POOL_END;
@@ -1991,7 +2043,7 @@ POOL_STATUS ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backe
 	/* Get session context */
 	session_context = pool_get_session_context(false);
 
-	if (session_context->uncompleted_message)
+	if (!STREAM && session_context->uncompleted_message)
 	{
 		POOL_QUERY_CONTEXT *qc;
 
@@ -2014,7 +2066,7 @@ POOL_STATUS BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backen
 	/* Get session context */
 	session_context = pool_get_session_context(false);
 
-	if (session_context->uncompleted_message)
+	if (!STREAM && session_context->uncompleted_message)
 	{
 		POOL_QUERY_CONTEXT *qc;
 
@@ -2473,7 +2525,10 @@ POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
 			pool_set_doing_extended_query_message();
 			if (pool_is_ignore_till_sync())
 				pool_unset_ignore_till_sync();
-			if (!pool_is_query_in_progress())
+
+			if (STREAM)
+				pool_unset_query_in_progress();
+			else if (!pool_is_query_in_progress())
 				pool_set_query_in_progress();
 			status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
 			pool_unset_pending_response();
@@ -2564,6 +2619,7 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
 	}
 
     read_kind_from_backend(frontend, backend, &kind);
+
 	/*
 	 * Sanity check
 	 */
@@ -2601,21 +2657,21 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
 			case '1':	/* ParseComplete */
 				status = ParseComplete(frontend, backend);
 				pool_set_command_success();
-				if (REPLICATION||RAW_MODE)
+				if (STREAM||REPLICATION||RAW_MODE)
 					pool_unset_query_in_progress();
 				break;
 
 			case '2':	/* BindComplete */
 				status = BindComplete(frontend, backend);
 				pool_set_command_success();
-				if (REPLICATION||RAW_MODE)
+				if (STREAM||REPLICATION||RAW_MODE)
 					pool_unset_query_in_progress();
 				break;
 
 			case '3':	/* CloseComplete */
 				status = CloseComplete(frontend, backend);
 				pool_set_command_success();
-				if (REPLICATION||RAW_MODE)
+				if (STREAM||REPLICATION||RAW_MODE)
 					pool_unset_query_in_progress();
 				break;
 
@@ -2629,13 +2685,18 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
 				{
 					pool_set_ignore_till_sync();
 					pool_unset_query_in_progress();
+
+					/* Remove all pending messages */
+					while (pool_pending_message_pull_out())
+						;
+					pool_pending_message_reset_previous_message();
 				}
 				break;
 
 			case 'C':	/* CommandComplete */				
 				status = CommandComplete(frontend, backend);
 				pool_set_command_success();
-				if ((REPLICATION || RAW_MODE) && pool_is_doing_extended_query_message())
+				if (pool_is_doing_extended_query_message())
 					pool_unset_query_in_progress();
 				break;
 
@@ -2675,12 +2736,11 @@ POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
 
 			default:
 				status = SimpleForwardToFrontend(kind, frontend, backend);
-#ifdef NOT_USED
-				if (pool_flush(frontend))
-					return POOL_END;
-#endif
 				break;
 		}
+
+		if (STREAM && pool_is_doing_extended_query_message())
+			pool_reset_preferred_master_node_id();
 	}
 	else
 	{
@@ -3139,6 +3199,11 @@ void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query,
 	}
 }
 
+/*
+ * Send a parse message to the primary/master node and wait for the reply if a
+ * particular message is not yet parsed on the primary/master node but is
+ * parsed on another node. The caller must provide the parse message data as "message".
+ */
 static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
 									 POOL_CONNECTION_POOL *backend,
 									 POOL_SENT_MESSAGE *message)
@@ -3153,20 +3218,57 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
 
 	memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
 
-	/* expect to send to master node only */
-	for (i = 0; i < NUM_BACKENDS; i++)
+	if (STREAM)
 	{
-		if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
+		if (message->kind == 'P' && qc->where_to_send[PRIMARY_NODE_ID] == 0)
 		{
+			POOL_PENDING_MESSAGE *pmsg;
+
+			/* we are in streaming replication mode and the parse message has not
+			 * been sent to primary yet */
+
+			/* Send parse message to primary node */
 			ereport(DEBUG1,
-				(errmsg("parse before bind"),
-					 errdetail("waiting for backend %d completing parse", i)));
+					(errmsg("parse before bind"),
+					 errdetail("waiting for primary completing parse")));
 
-			pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
+			pool_extended_send_and_wait(qc, "P", len, contents, 1, PRIMARY_NODE_ID, false);
+			memset(qc->where_to_send, 0, sizeof(qc->where_to_send));
+			qc->where_to_send[PRIMARY_NODE_ID] = 1;
+			qc->virtual_master_node_id = PRIMARY_NODE_ID;
+
+			/* Add pending message */
+			pmsg = pool_pending_messages_create('P', len, contents);
+			pool_pending_messages_dest_set(pmsg, qc);
+			pool_pending_message_add(pmsg);
+
+			return POOL_CONTINUE;
 		}
 		else
 		{
-			qc->where_to_send[i] = 0;
+			ereport(DEBUG1,
+					(errmsg("parse before bind"),
+					 errdetail("no need to re-send parse")));
+			return POOL_CONTINUE;
+		}
+	}
+	else
+	{
+		/* expect to send to master node only */
+		for (i = 0; i < NUM_BACKENDS; i++)
+		{
+			if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
+			{
+				ereport(DEBUG1,
+						(errmsg("parse before bind"),
+						 errdetail("waiting for backend %d completing parse", i)));
+
+				pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
+			}
+			else
+			{
+				qc->where_to_send[i] = 0;
+			}
 		}
 	}
 
bug271.diff (54,508 bytes)

t-ishii

2017-02-27 18:10

developer   ~0001366

The discussion has continued on the pgpool-hackers mailing list [pgpool-hackers: 2043].

enruquekl

2017-03-17 07:37

reporter   ~0001378

We have reproduced the issue several times with Pgpool 3.6.1 in a production environment. The bug seems quite serious, and the fact that it has not been closed for several months is very painful for us.

Is there any estimate for when the fix will be available?

t-ishii

2017-03-17 14:22

developer   ~0001379

See the mailing list archive discussions. I am working with Sergey, who has kindly been testing my patches. I cannot give an estimated date for the fix, but I believe we are getting close to the end.

t-ishii

2017-03-31 17:05

developer   ~0001402

Now that the fixes have been incorporated into the stable branch and the discussions/activities have moved to the mailing list, I'm going to close this item.
(I asked Sergey and he agreed.)

Issue History

Date Modified Username Field Change
2016-12-16 04:46 supp_k New Issue
2016-12-16 04:59 supp_k Note Added: 0001225
2016-12-17 02:33 supp_k Note Added: 0001226
2016-12-19 06:10 serk File Added: issues_271_244.zip
2016-12-19 06:10 serk Note Added: 0001231
2016-12-20 09:35 t-ishii Assigned To => t-ishii
2016-12-20 09:35 t-ishii Status new => assigned
2016-12-20 17:47 t-ishii Note Added: 0001242
2016-12-20 17:47 t-ishii Status assigned => feedback
2016-12-20 18:07 supp_k File Added: test7.log
2016-12-20 18:07 supp_k Note Added: 0001243
2016-12-20 18:07 supp_k Status feedback => assigned
2016-12-20 18:08 supp_k Note Added: 0001244
2016-12-20 18:11 supp_k Note Added: 0001245
2016-12-20 18:15 t-ishii Note Added: 0001247
2016-12-20 18:24 supp_k Note Added: 0001248
2016-12-21 05:42 supp_k Note Added: 0001250
2016-12-21 13:39 t-ishii Note Added: 0001254
2016-12-21 14:24 t-ishii Note Added: 0001255
2016-12-21 14:25 t-ishii Status assigned => feedback
2016-12-21 15:18 supp_k File Added: test10.zip
2016-12-21 15:18 supp_k Note Added: 0001256
2016-12-21 15:18 supp_k Status feedback => assigned
2016-12-21 18:11 supp_k Note Added: 0001257
2016-12-21 21:35 t-ishii Note Added: 0001258
2016-12-21 21:54 supp_k Note Added: 0001259
2016-12-21 23:56 t-ishii Note Added: 0001260
2016-12-22 00:04 supp_k Note Added: 0001261
2016-12-22 18:24 supp_k Note Added: 0001262
2016-12-23 20:57 elepuser Note Added: 0001263
2016-12-26 15:06 t-ishii Note Added: 0001264
2016-12-28 20:01 supp_k Note Added: 0001271
2017-01-04 18:06 t-ishii Note Added: 0001277
2017-01-04 19:17 supp_k Note Added: 0001278
2017-01-10 05:15 elepuser Note Added: 0001286
2017-01-25 19:21 supp_k Note Added: 0001295
2017-01-25 19:22 supp_k Note Added: 0001296
2017-01-29 07:35 elepuser Note Added: 0001310
2017-01-30 18:05 t-ishii Note Added: 0001312
2017-02-06 16:40 supp_k Note Added: 0001329
2017-02-07 17:48 elepuser Note Added: 0001336
2017-02-08 10:27 t-ishii Note Added: 0001337
2017-02-14 16:12 t-ishii Note Added: 0001341
2017-02-23 15:20 t-ishii Note Added: 0001353
2017-02-23 15:20 t-ishii File Added: bug271.diff
2017-02-23 15:21 t-ishii Status assigned => feedback
2017-02-27 18:10 t-ishii Note Added: 0001366
2017-03-17 07:37 enruquekl Note Added: 0001378
2017-03-17 14:22 t-ishii Note Added: 0001379
2017-03-31 17:05 t-ishii Note Added: 0001402
2017-03-31 17:06 t-ishii Status feedback => closed
2017-03-31 17:06 t-ishii Target Version => 3.6.3