Connection not being returned to the pool after connection loss #220

Closed
GabrielSalla opened this issue Nov 1, 2017 · 17 comments · Fixed by #222

@GabrielSalla commented Nov 1, 2017

  • asyncpg version: 0.13.0
  • PostgreSQL version: 9.4
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
    the issue with a local PostgreSQL install?: using the docker image
    aidanlister/postgres-hstore
  • Python version: 3.6.3
  • Platform: Fedora 27
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: yes
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and uvloop?: didn't try uvloop

While my application is running some queries, I interrupt the connection by removing the ethernet cable from the computer. After doing so, some connections are never returned to the pool, even though timeouts are set for the acquire() and fetch() calls. I know they are never returned to the pool because I print the pool's queue size every time a task finishes.

I can't send the whole code because it's quite extensive, but the database operations are concentrated in a single file:

import src.controllers.configs as configs_controller
import asyncio
import logging
import asyncpg
import traceback
import decimal

QUERY_TRIES = 2
POOL_MAX_SIZE = 3
_databases = dict()
_logger = logging.getLogger("DatabaseController")


async def _create_pool(access_information):
    return await asyncpg.create_pool(
        **access_information,
        min_size=0,
        max_size=POOL_MAX_SIZE,
        max_queries=30,
        timeout=5,
        command_timeout=10,
        max_inactive_connection_lifetime=180
    )

async def connect():
    # Create a connection pool for each database defined in the configuration
    global _databases
    _databases = {
        database_name: await _create_pool(
            configs_controller.database_access[database_name])
        for database_name in configs_controller.database_access
    }


async def close_connections():
    for database_name, database_pool in _databases.items():
        await database_pool.close()


def check_database(database):
    if database not in _databases:
        error = f"Database '{database}' not initialized"
        _logger.error(error)
        raise Exception(error)


async def execute(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.execute(query, *args)


async def executemany(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.executemany(query, *args)


def _decimal_to_float(data):
    for row in data:
        for key, value in row.items():
            if isinstance(value, decimal.Decimal):
                row[key] = float(value)


async def _fetch_data(database, query, *args):
    # Acquire a connection
    async with _databases[database].acquire(timeout=20) as connection:
        try:
            result = await connection.fetch(query, *args)
            result = [dict(row) for row in result]
            _decimal_to_float(result)
            return result
        # Any exception while fetching the data shouldn't trigger a retry, so
        # they are caught here
        except asyncio.TimeoutError:
            _logger.error(f"Query timed out\n{query}{args}")

async def print_counts():
    for database_name, database in _databases.items():
        print(database_name, database._queue.qsize(), POOL_MAX_SIZE)

async def fetch(database, query, *args):
    check_database(database)
    # Try to run the query a number of times
    count = 0
    while count != QUERY_TRIES:
        count += 1
        try:
            return await _fetch_data(database, query, *args)
        # The following exceptions may retry to fetch the data
        # If caught SerializationError
        except asyncpg.exceptions.SerializationError:
            _logger.info("Conflict with recovery, retrying")
        # If caught TimeoutError (a connection timeout, not a query timeout)
        except asyncio.TimeoutError:
            _logger.info("Connection timed out, retrying")
        # Return None if caught any other exception
        except:
            _logger.error(f"{traceback.format_exc()}\n{query} {args}")
            return None
        # Delay before retrying
        await asyncio.sleep(1)
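
For reference, a hypothetical call site for this module (the database name and query are placeholders, not from my actual application):

async def example():
    await connect()
    # fetch(database, query, *args) retries on serialization/timeout errors
    rows = await fetch("some_database", "SELECT * FROM items WHERE id = $1", 42)
    await print_counts()
    await close_connections()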

After removing the ethernet cable, I wait for some time so that an external timeout is triggered (await asyncio.wait(futures, timeout=30)). When this happens, the application should have finished all of its tasks (if everything went well) and I would be able to shut it down safely. Before letting the loop close, there's a delay, and I interrupt the execution using Ctrl+C. It works fine when there are no pending tasks, but when the connection loss described above happens, some of the "lost" tasks are interrupted, generating a stack trace like the following one.

[2017-11-01 00:09:25,800] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f109d68>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,804] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f0891f8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,808] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb588>
transport: <_SelectorSocketTransport fd=9>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197678>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,813] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb6d8>
transport: <_SelectorSocketTransport fd=10>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197990>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,817] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f1093a8>()]>>
[2017-11-01 00:09:25,821] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f089198>()]>>
[2017-11-01 00:09:25,825] (ERROR) DatabaseController: Traceback (most recent call last):
  File "./src/controllers/database.py", line 102, in fetch
    _logger.info("Conflict with recovery, retrying")
GeneratorExit

I've tried adding timeouts in other places, but nothing I do makes the connection go back to the pool. I even added some logs to try to track down where it's happening, but couldn't find it.

A simple version of the application is:

async def run():
    queries = []
    futures = [database_controller.fetch(database, query) for query in queries]
    # The connection drops while executing this line
    await asyncio.wait(futures, timeout=30)
    # Prints a queue size smaller than the pool max size when the connection was lost
    await database_controller.print_counts()
    # Execution is interrupted here, after waiting much longer than every timeout set in the code
    await asyncio.sleep(1000)
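
For reference, here is a more self-contained sketch of the same scenario (connection parameters and the query are placeholders):

import asyncio
import asyncpg

async def main():
    pool = await asyncpg.create_pool(
        host="localhost", user="postgres", password="postgres",
        database="postgres", min_size=0, max_size=3,
        timeout=5, command_timeout=10)
    # Queries that take a few seconds; drop the network while they are in flight
    futures = [asyncio.ensure_future(pool.fetch("SELECT pg_sleep(5)"))
               for _ in range(3)]
    await asyncio.wait(futures, timeout=30)
    # _queue is a private attribute: qsize() == max_size means every
    # connection came back; a smaller size means connections leaked
    print(pool._queue.qsize(), "of", 3)
    await pool.close()

asyncio.get_event_loop().run_until_complete(main())
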
@elprans (Member) commented Nov 1, 2017

This looks similar to a bug in Cython 0.27.1: cython/cython#1907

I mean this bit: AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

@elprans (Member) commented Nov 1, 2017

Try building from source using the latest Cython, and see if that helps:

$ pip install Cython && pip install --no-binary asyncpg asyncpg

@GabrielSalla (Author)

Did what you asked with Cython 0.27.2 and still got the same problem.

@GabrielSalla (Author) commented Nov 1, 2017

This error AttributeError: 'weakref' object has no attribute 'cline_in_traceback' happens only after I interrupt the execution; it doesn't happen if I let it run forever. Also, it only happens because a task was left pending and the timeout never triggered.

@elprans (Member) commented Nov 1, 2017

> Did what you asked with Cython 0.27.2 and still got the same problem.

Can you try building directly with make? I think pip install still used the precompiled C file.

@GabrielSalla (Author) commented Nov 1, 2017

When I used pip, I got the following message:

pip install --no-binary asyncpg asyncpg
Collecting asyncpg
  Using cached asyncpg-0.13.0.tar.gz
Skipping bdist_wheel for asyncpg, due to binaries being disabled for it.
Installing collected packages: asyncpg
  Running setup.py install for asyncpg ... done
Successfully installed asyncpg-0.13.0

I thought it worked. As I'm using Docker, make will be a little hard to do, but I'll try.

@GabrielSalla (Author) commented Nov 1, 2017

Used make and installed the module using pip install . (pip freeze now shows asyncpg==0.14.0.dev0). Tell me if I did something wrong.

Still got the same problem, but now the stack trace was different:

[2017-11-01 16:28:44,446] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fec929062e8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 16:28:44,451] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fec929066d8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 16:28:44,456] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:49> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fec92906288>()]>>
[2017-11-01 16:28:44,460] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:49> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fec92906678>()]>>
[2017-11-01 16:28:44,464] (ERROR) DatabaseController: Traceback (most recent call last):
  File "./src/controllers/database.py", line 92, in fetch
    return await _fetch_data(database, query, *args)

The tasks were destroyed because the timeout never triggered, but this time there was no AttributeError: 'weakref' object has no attribute 'cline_in_traceback'.

@elprans (Member) commented Nov 1, 2017

pip install will not trigger a Cython rebuild. You need to rebuild the wheel manually: clone the source, run make and then python setup.py bdist_wheel. Then use the newly built wheel (it'll be in dist/).

@GabrielSalla (Author)

Did as you said. Exactly the same stack trace.

There's a timeout that is not being triggered; I think it's in the fetch function, but I added it explicitly and nothing changed.

@elprans (Member) commented Nov 1, 2017

The 'cline_in_traceback' thing is what wrecks the normal exception handling, including the timeouts. Would you be able to provide a minimal working script with which this can be reproduced?

@GabrielSalla (Author)

I'll work on that!

@GabrielSalla (Author)

Got code to reproduce the bug:

https://github.com/GabrielSalla/asyncpg_test_code

Start the script and wait for some dots to be printed on the screen. As soon as the dots start to be printed, drop the connection to the database and wait for the timeout of await asyncio.wait(futures, timeout=60). This timeout is far longer than the time the script would normally take to run, and longer than any of the other timeouts in the code.

As expected, the connections never return to the pool, and the exceptions (along with the printed pool queue size) appear when the loop is closed.

@elprans (Member) commented Nov 2, 2017

Thanks, I'll look into reproducing this on my end.

elprans added a commit that referenced this issue Nov 2, 2017
Connection.close() and Pool.release() each gained the new timeout
parameter.  The pool.acquire() context manager now applies the
passed timeout to __aexit__() as well.

Connection.close() is now actually graceful.  Instead of simply dropping
the connection, it attempts to cancel the running query (if any), asks
the server to terminate the connection and waits for the connection to
terminate.

To test all this properly, implement a TCP proxy, which emulates sudden
connectivity loss (i.e. packets not reaching the server).

Closes: #220
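
For illustration, such a proxy can be sketched roughly like this (an assumption about the approach, not asyncpg's actual test code):

import asyncio

class DroppingProxy:
    """Forwards TCP traffic between a client and a backend server. Setting
    dropping = True silently discards all traffic, emulating sudden
    connectivity loss (packets never reach the other side)."""

    def __init__(self, backend_host, backend_port):
        self.backend_host = backend_host
        self.backend_port = backend_port
        self.dropping = False

    async def handle_client(self, client_reader, client_writer):
        backend_reader, backend_writer = await asyncio.open_connection(
            self.backend_host, self.backend_port)
        await asyncio.gather(
            self._pipe(client_reader, backend_writer),
            self._pipe(backend_reader, client_writer))

    async def _pipe(self, reader, writer):
        while True:
            data = await reader.read(4096)
            if not data:
                break
            if not self.dropping:  # while "disconnected", data goes nowhere
                writer.write(data)
                await writer.drain()
        writer.close()

# Point asyncpg at the proxy port instead of the real server, then flip
# proxy.dropping to True mid-query to simulate pulling the cable:
#   proxy = DroppingProxy("localhost", 5432)
#   server = await asyncio.start_server(proxy.handle_client, "127.0.0.1", 15432)
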
@GabrielSalla (Author)

That's awesome! Thanks for all the help :)

@elprans (Member) commented Nov 6, 2017

No problem! Thanks for bringing this up.

@GabrielSalla (Author)

I was testing the new code, but when fetch() timed out the connection didn't return to the pool right away; it only returned when the timeout on acquire() triggered. Shouldn't it return right after the fetch timeout?

@elprans (Member) commented Nov 6, 2017

The connection is returned to the pool when __aexit__() on the pool context manager is called. The pool then attempts to "reset" the connection by executing a special query. That execution now has the same timeout as the acquire() call. If reset() times out, the connection is terminated and does not return to the pool; you will get a new connection on the next acquire() call instead.
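
In code terms, a sketch of the behavior described above (the reset step is paraphrased; the pool internals are more involved):

async with pool.acquire(timeout=5) as connection:
    rows = await connection.fetch("SELECT 1")
# __aexit__() releases the connection: the pool runs its reset query,
# bounded by the same 5-second timeout as acquire().  If the reset times
# out, the connection is terminated instead of being returned, and the
# next acquire() opens a brand-new connection.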

elprans added the bug label Nov 6, 2017