SQLAlchemy: The Async-ening
Background
SQLAlchemy is one of the seven wonders of the modern Python ecosystem[^1].
SQLAlchemy recently released 1.4 (2021-03-15) as their first step toward a long-term-sustainable SQL ORM architecture. Version 1.4 is an intermediate release until 2.0 can fully remove less modern features of the library[^2].
Modern SQLAlchemy 1.4 to 2.0+ lets you use fully async query handling and transparent compiled-statement string caching, which can improve your app's throughput and performance considerably. The new `AsyncEngine` and `AsyncSession` also mean you can now use the great `asyncpg` library as the async engine driver instead of being limited to legacy `psycopg2` (which is essentially a synchronous wrapper around the libpq C bindings).
Conversion Thoughts
Unfortunately, converting from on-demand blocking query running to explicit async await points also requires a large conceptual refactoring of what the SQLAlchemy ORM feels like to work with. Short version: migrating to a 100% async ORM is the equivalent of annotating all relationships with `raiseload`, where attempting to access an unloaded attribute hard crashes your entire query.
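If you haven't run into `raiseload` before, here is a minimal sketch of that behavior, using the `User` / `email_addresses` models from the example further below against a regular blocking `Session`:

```python
from sqlalchemy.orm import raiseload

# Load a User but forbid any implicit lazy-loading of the relationship.
user = (
    db.query(User)
    .options(raiseload(User.email_addresses))
    .filter(User.id == 3)
    .first()
)

# Accessing the unloaded relationship now raises
# sqlalchemy.exc.InvalidRequestError instead of silently running a query.
user.email_addresses
```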
Historically, like most ORMs, SQLAlchemy ORM queries were lazily loaded on access. The "generate statement, run query, return result" behavior on dot-member access is great conceptually, but awful from a production performance perspective. Developers inevitably end up creating platforms throwing off thousands of queries unexpectedly, and the excessive query load ends up hampering user experience due to increased server load from O(N) or worse queries being serially fired per request.

Now that queries are async by default, all of the magic ORM "potentially implicit blocking operations on any dot-member access" behavior no longer works, so data modeling and data access require more upfront thought.

With such a change in access semantics, from "everything works now, but it may be bad, and we don't even know" to "everything must be pre-retrieved and pre-specified before any data usage," the problem becomes: how to upgrade? You can imagine that refactoring a 10k+ or 100k+ LOC ORM-oriented platform relying on dynamic query triggering to suddenly require explicit `await` points before all data-loading points would be a pain, and you would be correct. But we can get through it.
Conversion Example
First, convert your existing `Session` to `AsyncSession` as described in the SQLAlchemy asyncio getting started guide.
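As a minimal setup sketch, assuming a Postgres database (the `DATABASE_URL` value is a placeholder; note that 1.4 uses `sessionmaker` with `class_=AsyncSession`, since `async_sessionmaker` only arrives in 2.0):

```python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# "+asyncpg" selects the async driver for the async engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=False)

# expire_on_commit=False keeps attributes readable after commit without
# triggering an implicit refresh (which would be hidden IO under async)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def example():
    async with async_session() as db:
        ...  # run your queries here
```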
If you have a `User` model with `email_addresses` as a one-to-many foreign relationship, the original ORM would generate a second query joined on `email_addresses` to populate a list on first access (with all query-generation points running basically hidden from the developer unless they had statement debug logging turned on):
```python
user = db.query(User).filter(User.id == 3).first()  # <-- SQL query, creates user object
email = user.email_addresses[0]  # <-- SQL query and list creation of email_address objects
```
But using the SQLAlchemy 1.4 `AsyncEngine`, you can't do any IO without an explicit `await`, so you must request all accessed relationships in the first query:
```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload

stmt = select(User).where(User.id == 3).options(selectinload(User.email_addresses))
result = await db.execute(stmt)  # <-- SQL queries: 1 for User, 1 for email_addresses
user = result.scalar_one_or_none()
email = user.email_addresses[0]  # <-- No SQL query here; data was already populated!
```
Careful Note: New Unwrapping Required
Unlike the previous ability to just call `.first()` or `.one_or_none()` at the end of a `.query()` chain, `.execute()` always returns a `Result` object, which is an iterable of `Row`s that must be unwrapped to obtain ORM objects again (this pattern also enables more flexible usage if you are just fetching individual columns and not entire objects, e.g. `select(User.id, User.name)`).
You must first ask the result for `.scalars()`, then ask for `.first()` or `.one()` or `.one_or_none()` etc. (or you can iterate over the scalars directly too: `for s in result.scalars(): ...`).
SQLAlchemy provides helpers including `.scalar_one_or_none()` and `.scalar_one()`, which are the same as calling `.scalars().one_or_none()` etc.
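A short sketch of the unwrapping patterns described above, reusing the `User` query from earlier (the column-only query is just for illustration):

```python
result = await db.execute(select(User).where(User.id == 3))

user = result.scalars().first()       # unwrap Rows into ORM objects, take the first
# user = result.scalar_one_or_none()  # shortcut equivalent of .scalars().one_or_none()

# Fetching individual columns instead of entire objects returns plain Rows,
# which unpack like tuples:
rows = await db.execute(select(User.id, User.name))
for user_id, name in rows:
    print(user_id, name)
```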
Upgrade Paths
The two easiest upgrade paths seem to be:
- LEAVE IT ALONE
- Cleverly swap `session.query()` for `select()` and `.filter()` for `.where()`
Upgrade Path: LEAVE IT ALONE
SQLAlchemy anticipated lots of existing serial Query-based code that shouldn't have to change much, so they gave us the very useful `async_session.run_sync()` coroutine.

`.run_sync()` is simple to use:
```python
def oldformat(db: Session, arg2):
    return db.query(User).join(UserSession).filter(UserSession.key == arg2).one()

arg2 = "some unique session key"
user_for_session = await async_session.run_sync(oldformat, arg2)
```
`.run_sync()` lets you use existing non-async and even legacy Query-based ORM code directly with your current `AsyncSession`.

Instead of needing to modify any of your existing ORM methods, `.run_sync()` hides all the async details by providing a traditional non-async `Session` object (specially instrumented, via greenlet magic, to run async operations internally). The first parameter to the given function/method will be this special legacy-compatible `Session` object, and the remaining arguments to `.run_sync()` follow.

`.run_sync()` should be seen as a helpful way to partially upgrade your codebase from old-style SQLAlchemy to new async 1.4-2.0+ SQLAlchemy without needing a full conversion upfront (but you will still need to instrument every legacy call point with the `await session.run_sync(func, arg2, arg3, ...)` pattern).
Upgrade Path: Actually Upgrade
For basic ORM usage, the upgrade is simple:
- `session.query()` becomes `select()`
- `.filter()` becomes `.where()`
- Instead of single-fire commands (a short before/after sketch follows this list):
    - build a statement object from the `select()`
    - remove any `.one()`, `.first()`, `.one_or_none()`, etc. from your previous combined statement generation + statement execution line
    - gather the result from `await session.execute(statement)`
        - NOTE: Previously, `.first()` would also apply a `LIMIT 1`, but this does not happen using the `select()` -> `execute()` -> `result.scalars().first()` pattern because the `.first()` is now applied after the query happens.
    - run your expected `.scalars().one()` or `.scalars().first()` or `.scalars().one_or_none()` against the result, which is a `Result` object, not on the select statement itself.
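A short before/after sketch of those steps, reusing the earlier examples (`db` is the legacy blocking session from the first example, `session` is the `AsyncSession`, and the `.name` filter is just for illustration):

```python
# Before: 1.x Query style -- .first() adds LIMIT 1 to the generated SQL
user = db.query(User).filter(User.name == "matt").first()

# After: 2.0-style select() -> execute() -> unwrap
stmt = select(User).where(User.name == "matt")
result = await session.execute(stmt)
user = result.scalars().first()  # applied to the result, so no LIMIT 1 in the SQL

# If you still want the database to stop at one row, say so in the statement:
stmt = select(User).where(User.name == "matt").limit(1)
```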
Then run your changes and fix anything crashing in the runtime logs.
Common errors you’ll see:
`sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place?`

- this means you are attempting to on-demand lazy-load some attribute or collection when using the `AsyncEngine`, but we can't async-load attributes anymore
    - all attributes must be loaded during an original object request
    - loading fixes include:
        - globally setting a relationship to always load its mapping by setting `lazy='selectin'` on the foreign relationship itself (see the model sketch after this list),
        - or specifying explicit join/select conditions per query
    - per-query example:
        - FAIL: `select(User).where(User.id == 3)` but then you access `user.email_addresses[3]` and get the error above because you didn't load the external `.email_addresses` relationship.
        - SOLVED: find the missing attributes and add them to the primary object request: `select(User).where(User.id == 3).options(selectinload(User.email_addresses))`
            - now SQLAlchemy will generate a second `SELECT` statement using the primary key of `User` joined against the `email_addresses` attribute and load the result into the returned `User` object (after it is properly retrieved via `result = await session.execute(query)`, `user = result.scalars().one()`)
`AttributeError: 'ChunkedIteratorResult' object has no attribute [attribute name]`

- this just means you are attempting to read the value of `await session.execute(query)` directly without extracting any values first
`AttributeError: Could not locate column in row for column [column name]`

- this means you ran `.first()` or `.one()` etc. directly on the `Result` without extracting `.scalars()` first
    - solution: don't run `result.one()` etc., run `result.scalars().one()` or `result.scalar_one_or_none()`
    - also see: all the `ScalarResult` access methods
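For the global `lazy='selectin'` fix mentioned above, here is a minimal model sketch (the table names and column types are assumptions; the `lazy="selectin"` argument on the relationship is the only point being made):

```python
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String)

    # lazy="selectin" makes every User load also emit the follow-up SELECT
    # for email_addresses, so the collection is always populated and never
    # triggers implicit IO under the async session.
    email_addresses = relationship("EmailAddress", lazy="selectin")

class EmailAddress(Base):
    __tablename__ = "email_addresses"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    email = Column(String)
```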
Conclusion
That’s a good start to updating your previously 100% auto-blocking-on-member-access SQLAlchemy ORM to a more modern async-driven approach.
There are of course tradeoffs when going to the async-only approach if you're upgrading a sizable existing codebase (as you'll discover: every place you accessed a lazy-loaded element from a web template now has to be searched out and destroyed).
Are the changes worth it? If your queries are large and introduce latency spikes due to client concurrency limits, you'll be able to increase your app server concurrency because many of the requests/sessions will now just be sleeping in async wait states while the off-server DB processing returns. You also get the added benefit of now being able to use `asyncpg`, with its better internal prepared statement cache architecture as well as better JSON deserialization handling.
go forth and upgrade ur sqls
[^1]: modern python wonders, in no specific order:

    - sqlalchemy
    - pydantic / fastapi / starlette
        - including all modern python features enabling it:
            - dataclasses
            - async (including aiohttp and aiofiles)
            - annotations / Annotated / type hints / typing
                - plus h*ckin mypy / pytype / vscode real time linting / type checking
    - PyPy3.7+
        - with reservations, like hopefully you never call out to C bindings which can cause unlimited memory bloat
        - try it yourself under PyPy:
    - numpy / pandas / numba / dask / distributed
        - plus the NVIDIA RAPIDS collection enabling transparent distributed GPU offload for dataframe calculations
    - scikit-learn / scikit-image / scipy / jupyter / matplotlib
    - orjson
    - loguru
    - pyproject / poetry for venv management and automated package building

[^2]: like removing some features we maybe shouldn't have been using, but worked their way into our systems anyway, like nested subtransactions whoops