Skip to content

Support timeouts in Connection.close() and Pool.release() #222

New issue

Have a question about this project? No Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “No Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? No Sign in to your account

Merged
merged 3 commits into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .ci/appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ branches:

install:
- "%PYTHON% -m pip install --upgrade pip wheel setuptools"
- "%PYTHON% -m pip install -r .ci/requirements-win.txt"
- "%PYTHON% -m pip install --upgrade -r .ci/requirements-win.txt"

build_script:
- "%PYTHON% setup.py build_ext --inplace"

test_script:
- "%PYTHON% -m unittest discover -s tests"
- "%PYTHON% setup.py test"

after_test:
- "%PYTHON% setup.py bdist_wheel"
Expand Down
2 changes: 1 addition & 1 deletion .ci/requirements-win.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cython>=0.24
cython>=0.27.2
tinys3
4 changes: 2 additions & 2 deletions .ci/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cython>=0.24
cython>=0.27.2
flake8>=3.4.1
uvloop>=0.5.0
uvloop>=0.8.0
tinys3
twine
2 changes: 1 addition & 1 deletion .ci/travis-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ fi

pip install --upgrade pip wheel
pip install --upgrade setuptools
pip install -r .ci/requirements.txt
pip install --upgrade -r .ci/requirements.txt
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.ymlc~
*.scssc
*.so
*.pyd
*~
.#*
.DS_Store
Expand Down
132 changes: 114 additions & 18 deletions asyncpg/_testbase.py → asyncpg/_testbase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from asyncpg import connection as pg_connection
from asyncpg import pool as pg_pool

from . import fuzzer


@contextlib.contextmanager
def silence_asyncio_long_exec_warning():
Expand All @@ -36,7 +38,16 @@ def flt(log_record):
logger.removeFilter(flt)


def with_timeout(timeout):
def wrap(func):
func.__timeout__ = timeout
return func

return wrap


class TestCaseMeta(type(unittest.TestCase)):
TEST_TIMEOUT = None

@staticmethod
def _iter_methods(bases, ns):
Expand Down Expand Up @@ -64,7 +75,18 @@ def __new__(mcls, name, bases, ns):
for methname, meth in mcls._iter_methods(bases, ns):
@functools.wraps(meth)
def wrapper(self, *args, __meth__=meth, **kwargs):
self.loop.run_until_complete(__meth__(self, *args, **kwargs))
coro = __meth__(self, *args, **kwargs)
timeout = getattr(__meth__, '__timeout__', mcls.TEST_TIMEOUT)
if timeout:
coro = asyncio.wait_for(coro, timeout, loop=self.loop)
try:
self.loop.run_until_complete(coro)
except asyncio.TimeoutError:
raise self.failureException(
'test timed out after {} seconds'.format(
timeout)) from None
else:
self.loop.run_until_complete(coro)
ns[methname] = wrapper

return super().__new__(mcls, name, bases, ns)
Expand Down Expand Up @@ -128,16 +150,30 @@ def handler(loop, ctx):
_default_cluster = None


def _start_cluster(ClusterCls, cluster_kwargs, server_settings):
def _start_cluster(ClusterCls, cluster_kwargs, server_settings,
initdb_options=None):
cluster = ClusterCls(**cluster_kwargs)
cluster.init()
cluster.init(**(initdb_options or {}))
cluster.trust_local_connections()
cluster.start(port='dynamic', server_settings=server_settings)
atexit.register(_shutdown_cluster, cluster)
return cluster


def _start_default_cluster(server_settings={}):
def _get_initdb_options(initdb_options=None):
if not initdb_options:
initdb_options = {}
else:
initdb_options = dict(initdb_options)

# Make the default superuser name stable.
if 'username' not in initdb_options:
initdb_options['username'] = 'postgres'

return initdb_options


def _start_default_cluster(server_settings={}, initdb_options=None):
global _default_cluster

if _default_cluster is None:
Expand All @@ -147,13 +183,16 @@ def _start_default_cluster(server_settings={}):
_default_cluster = pg_cluster.RunningCluster()
else:
_default_cluster = _start_cluster(
pg_cluster.TempCluster, {}, server_settings)
pg_cluster.TempCluster, cluster_kwargs={},
server_settings=server_settings,
initdb_options=_get_initdb_options(initdb_options))

return _default_cluster


def _shutdown_cluster(cluster):
cluster.stop()
if cluster.get_status() == 'running':
cluster.stop()
cluster.destroy()


Expand Down Expand Up @@ -193,15 +232,78 @@ def setUpClass(cls):
super().setUpClass()
cls.setup_cluster()

def create_pool(self, pool_class=pg_pool.Pool, **kwargs):
conn_spec = self.cluster.get_connection_spec()
@classmethod
def get_connection_spec(cls, kwargs={}):
conn_spec = cls.cluster.get_connection_spec()
conn_spec.update(kwargs)
return create_pool(loop=self.loop, pool_class=pool_class, **conn_spec)
if not os.environ.get('PGHOST'):
if 'database' not in conn_spec:
conn_spec['database'] = 'postgres'
if 'user' not in conn_spec:
conn_spec['user'] = 'postgres'
return conn_spec

def create_pool(self, pool_class=pg_pool.Pool,
connection_class=pg_connection.Connection, **kwargs):
conn_spec = self.get_connection_spec(kwargs)
return create_pool(loop=self.loop, pool_class=pool_class,
connection_class=connection_class, **conn_spec)

@classmethod
def connect(cls, **kwargs):
conn_spec = cls.get_connection_spec(kwargs)
return pg_connection.connect(**conn_spec, loop=cls.loop)

@classmethod
def start_cluster(cls, ClusterCls, *,
cluster_kwargs={}, server_settings={}):
return _start_cluster(ClusterCls, cluster_kwargs, server_settings)
cluster_kwargs={}, server_settings={},
initdb_options={}):
return _start_cluster(
ClusterCls, cluster_kwargs,
server_settings, _get_initdb_options(initdb_options))


class ProxiedClusterTestCase(ClusterTestCase):
@classmethod
def get_server_settings(cls):
settings = dict(super().get_server_settings())
settings['listen_addresses'] = '127.0.0.1'
return settings

@classmethod
def get_proxy_settings(cls):
return {'fuzzing-mode': None}

@classmethod
def setUpClass(cls):
super().setUpClass()
conn_spec = cls.cluster.get_connection_spec()
host = conn_spec.get('host')
if not host:
host = '127.0.0.1'
elif host.startswith('/'):
host = '127.0.0.1'
cls.proxy = fuzzer.TCPFuzzingProxy(
backend_host=host,
backend_port=conn_spec['port'],
)
cls.proxy.start()

@classmethod
def tearDownClass(cls):
cls.proxy.stop()
super().tearDownClass()

@classmethod
def get_connection_spec(cls, kwargs):
conn_spec = super().get_connection_spec(kwargs)
conn_spec['host'] = cls.proxy.listening_addr
conn_spec['port'] = cls.proxy.listening_port
return conn_spec

def tearDown(self):
self.proxy.reset()
super().tearDown()


def with_connection_options(**options):
Expand All @@ -223,13 +325,7 @@ def setUp(self):
# Extract options set up with `with_connection_options`.
test_func = getattr(self, self._testMethodName).__func__
opts = getattr(test_func, '__connect_options__', {})
if 'database' not in opts:
opts = dict(opts)
opts['database'] = 'postgres'

self.con = self.loop.run_until_complete(
self.cluster.connect(loop=self.loop, **opts))

self.con = self.loop.run_until_complete(self.connect(**opts))
self.server_version = self.con.get_server_version()

def tearDown(self):
Expand Down
Loading