Skip to content

Commit f89c95c

Browse files
ermakov-olegelprans
authored andcommitted
Remove connection parameter caching in Pool
Currently, `asyncpt.Pool` will cache various aspects of the connection, like the selected host and connection parameters in an attempt to make subsequent connection attempts somewhat faster. This behavior is dubious because server host availability and role may change, such as when a primary becomes a standby and vice-versa or when a host becomes unavailable permanently, but another host from the DSN can be picked up. Just remove it.
1 parent 0c3bf60 commit f89c95c

File tree

2 files changed

+75
-36
lines changed

2 files changed

+75
-36
lines changed

asyncpg/pool.py

+9-36
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ class Pool:
311311
__slots__ = (
312312
'_queue', '_loop', '_minsize', '_maxsize',
313313
'_init', '_connect_args', '_connect_kwargs',
314-
'_working_addr', '_working_config', '_working_params',
315314
'_holders', '_initialized', '_initializing', '_closing',
316315
'_closed', '_connection_class', '_record_class', '_generation',
317316
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
@@ -377,10 +376,6 @@ def __init__(self, *connect_args,
377376
self._initializing = False
378377
self._queue = None
379378

380-
self._working_addr = None
381-
self._working_config = None
382-
self._working_params = None
383-
384379
self._connection_class = connection_class
385380
self._record_class = record_class
386381

@@ -430,9 +425,8 @@ async def _initialize(self):
430425
# first few connections in the queue, therefore we want to walk
431426
# `self._holders` in reverse.
432427

433-
# Connect the first connection holder in the queue so that it
434-
# can record `_working_addr` and `_working_opts`, which will
435-
# speed up successive connection attempts.
428+
# Connect the first connection holder in the queue so that
429+
# any connection issues are visible early.
436430
first_ch = self._holders[-1] # type: PoolConnectionHolder
437431
await first_ch.connect()
438432

@@ -504,36 +498,15 @@ def set_connect_args(self, dsn=None, **connect_kwargs):
504498

505499
self._connect_args = [dsn]
506500
self._connect_kwargs = connect_kwargs
507-
self._working_addr = None
508-
self._working_config = None
509-
self._working_params = None
510501

511502
async def _get_new_connection(self):
512-
if self._working_addr is None:
513-
# First connection attempt on this pool.
514-
con = await connection.connect(
515-
*self._connect_args,
516-
loop=self._loop,
517-
connection_class=self._connection_class,
518-
record_class=self._record_class,
519-
**self._connect_kwargs)
520-
521-
self._working_addr = con._addr
522-
self._working_config = con._config
523-
self._working_params = con._params
524-
525-
else:
526-
# We've connected before and have a resolved address,
527-
# and parsed options and config.
528-
con = await connect_utils._connect_addr(
529-
loop=self._loop,
530-
addr=self._working_addr,
531-
timeout=self._working_params.connect_timeout,
532-
config=self._working_config,
533-
params=self._working_params,
534-
connection_class=self._connection_class,
535-
record_class=self._record_class,
536-
)
503+
con = await connection.connect(
504+
*self._connect_args,
505+
loop=self._loop,
506+
connection_class=self._connection_class,
507+
record_class=self._record_class,
508+
**self._connect_kwargs,
509+
)
537510

538511
if self._init is not None:
539512
try:

tests/test_pool.py

+66
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import inspect
1010
import os
11+
import pathlib
1112
import platform
1213
import random
1314
import textwrap
@@ -18,6 +19,7 @@
1819
from asyncpg import _testbase as tb
1920
from asyncpg import connection as pg_connection
2021
from asyncpg import pool as pg_pool
22+
from asyncpg import cluster as pg_cluster
2123

2224
_system = platform.uname().system
2325

@@ -969,6 +971,70 @@ async def worker():
969971
await pool.release(conn)
970972

971973

974+
@unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster')
975+
class TestPoolReconnectWithTargetSessionAttrs(tb.ClusterTestCase):
976+
977+
@classmethod
978+
def setup_cluster(cls):
979+
cls.cluster = cls.new_cluster(pg_cluster.TempCluster)
980+
cls.start_cluster(cls.cluster)
981+
982+
async def simulate_cluster_recovery_mode(self):
983+
port = self.cluster.get_connection_spec()['port']
984+
await self.loop.run_in_executor(
985+
None,
986+
lambda: self.cluster.stop()
987+
)
988+
989+
# Simulate recovery mode
990+
(pathlib.Path(self.cluster._data_dir) / 'standby.signal').touch()
991+
992+
await self.loop.run_in_executor(
993+
None,
994+
lambda: self.cluster.start(
995+
port=port,
996+
server_settings=self.get_server_settings(),
997+
)
998+
)
999+
1000+
async def test_full_reconnect_on_node_change_role(self):
1001+
if self.cluster.get_pg_version() < (12, 0):
1002+
self.skipTest("PostgreSQL < 12 cannot support standby.signal")
1003+
return
1004+
1005+
pool = await self.create_pool(
1006+
min_size=1,
1007+
max_size=1,
1008+
target_session_attrs='primary'
1009+
)
1010+
1011+
# Force a new connection to be created
1012+
await pool.fetchval('SELECT 1')
1013+
1014+
await self.simulate_cluster_recovery_mode()
1015+
1016+
# current pool connection info cache is expired,
1017+
# but we don't know it yet
1018+
with self.assertRaises(asyncpg.TargetServerAttributeNotMatched) as cm:
1019+
await pool.execute('SELECT 1')
1020+
1021+
self.assertEqual(
1022+
cm.exception.args[0],
1023+
"None of the hosts match the target attribute requirement "
1024+
"<SessionAttribute.primary: 'primary'>"
1025+
)
1026+
1027+
# force reconnect
1028+
with self.assertRaises(asyncpg.TargetServerAttributeNotMatched) as cm:
1029+
await pool.execute('SELECT 1')
1030+
1031+
self.assertEqual(
1032+
cm.exception.args[0],
1033+
"None of the hosts match the target attribute requirement "
1034+
"<SessionAttribute.primary: 'primary'>"
1035+
)
1036+
1037+
9721038
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
9731039
class TestHotStandby(tb.HotStandbyTestCase):
9741040
def create_pool(self, **kwargs):

0 commit comments

Comments
 (0)