SQLAlchemy: The Async-ening
Background
SQLAlchemy is one of the seven wonders of the modern Python ecosystem[1].
SQLAlchemy recently released 1.4 (2021-03-15) as their first step to a long-term-sustainable SQL ORM architecture. Version 1.4 is an intermediate release until 2.0 can fully remove less modern features of the library[2].
Modern SQLAlchemy 1.4 to 2.0+ lets you use fully
async query handling and transparent compiled statement string
caching which can improve your app’s throughput and performance
considerably. The new AsyncEngine and
AsyncSession also mean you can now use the great asyncpg
library for
the async engine driver instead of being limited to legacy
psycopg2 (which is just a wrapper around default C
bindings).
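For reference, switching engine drivers is mostly a connection URL change; a minimal sketch of creating the async engine against asyncpg (credentials and database name are placeholders):

from sqlalchemy.ext.asyncio import create_async_engine

# "postgresql+asyncpg://" selects the asyncpg driver instead of psycopg2;
# user/password/host/db below are placeholder values.
engine = create_async_engine("postgresql+asyncpg://user:password@localhost/mydb")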
Conversion Thoughts
Unfortunately, converting from on-demand blocking query running to
explicit async await points also requires a large conceptual
refactoring of what the SQLAlchemy ORM feels like to work with. Short
version: migrating to a 100% async ORM is the equivalent of annotating all
relationships with raiseload, where attempting
to access an unloaded attribute hard crashes your entire query.
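If you want to feel that failure mode before converting anything, you can opt into it on a regular blocking Session today with raiseload; a minimal sketch (User and email_addresses are the same illustrative models used in the examples below):

from sqlalchemy import select
from sqlalchemy.orm import raiseload

# With raiseload, touching an unloaded relationship raises an error instead
# of silently firing another query -- the same discipline the async ORM forces.
stmt = select(User).options(raiseload(User.email_addresses)).where(User.id == 3)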
Historically, like most ORMs, SQLAlchemy ORM queries were lazily loaded on access. The “generate statement, run query, return result” flow on dot-member access is always great conceptually, but also always awful from a production performance perspective. Developers inevitably end up creating platforms throwing off thousands of queries unexpectedly. Excessive query load ends up hampering user experience due to increased server load caused by O(N) or worse queries being serially fired per request.
Now that queries are async by default, all of the magic ORM “potentially implicit blocking operations on any dot-member access” behavior no longer works, so data modeling and data access require more upfront thought.
With such a change in access semantics from “everything works now,
but it may be bad, and we don’t even know” to “everything must be
pre-retrieved and pre-specified before any data usage,” the problem
becomes: how to upgrade? You can imagine refactoring a 10k+ or 100k+ LOC
ORM-oriented platform relying on dynamic query triggering suddenly
requiring explicit await points before all data loading
points would be a pain, and you would be correct. But we can get through
it.
Conversion Example
First, convert your existing Session to
AsyncSession as described in the SQLAlchemy
asyncio getting started guide.
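A minimal sketch of that conversion, following the getting started guide (the connection URL is a placeholder):

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/mydb")

# class_=AsyncSession makes the factory hand out AsyncSession objects;
# expire_on_commit=False keeps already-loaded attributes readable after a
# commit without triggering new (now impossible) lazy loads.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)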
If you have a User model with
email_addresses as a one-to-many foreign relationship, the
original ORM would generate a second query joined on
email_addresses to populate a list on first access (with
all query-generation points running basically hidden from the developer
unless they had statement debug logging turned on):
user = db.query(User).filter(User.id == 3).first()  # <-- SQL Query, creates user object
email = user.email_addresses[0]  # <-- SQL Query and list creation of email_address objects
But using the SQLAlchemy 1.4 AsyncEngine,
you can’t do any IO without an explicit await, so you must
request all accessed relationships on the first query:
stmt = select(User).where(User.id == 3).options(selectinload(User.email_addresses))
result = await db.execute(stmt) # <-- SQL Queries: 1 for User, 1 for email_addresses
user = result.scalar_one_or_none()
email = user.email_addresses[0]  # <-- No SQL query here; data was already populated!
Careful Note: New Unwrapping Required
Unlike the previous ability to just call .first() or
.one_or_none() at the end of a .query() chain,
awaiting .execute() always returns a Result,
which is an iterable of Rows that must be unwrapped to obtain
ORM objects again (this pattern also enables more flexible usage
if you are just fetching individual columns and not entire objects,
e.g. select(User.id, User.name)).
You must first ask the Result for
.scalars(), then ask for .first() or
.one() or .one_or_none() etc. (or you can
iterate over the scalars directly:
for s in result.scalars(): ...).
SQLAlchemy also provides helpers including
.scalar_one_or_none() and .scalar_one(), which
are the same as calling .scalars().one_or_none() etc.
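Both unwrapping styles side by side, continuing the sketch above (db is the AsyncSession and this runs inside an async function):

from sqlalchemy import select

# Whole ORM objects: unwrap Rows into scalars before .first()/.one()/etc.
result = await db.execute(select(User).where(User.id == 3))
user = result.scalars().one_or_none()  # or: result.scalar_one_or_none()

# Individual columns: each Row unpacks like a tuple, no .scalars() needed.
result = await db.execute(select(User.id, User.name))
for user_id, name in result:
    print(user_id, name)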
Upgrade Paths
The two easiest upgrade paths seem to be:
- LEAVE IT ALONE
- Cleverly swap session.query() for select() and .filter() for .where()
Upgrade Path: LEAVE IT ALONE
SQLAlchemy anticipated lots of existing Query-based code not being able
to change much all at once, so they gave us the
very useful async_session.run_sync() coroutine.
.run_sync() is simple to use:
def oldformat(db: Session, arg2):
    return db.query(User).join(UserSession).filter(UserSession.key == arg2).one()

arg2 = "some unique session key"
user_for_session = await async_session.run_sync(oldformat, arg2)

.run_sync() lets you use existing non-async and even
legacy query-based ORM code directly with your current
AsyncSession.
Instead of needing to modify any of your existing ORM methods,
.run_sync() hides all the async details by providing a
traditional non-async Session object (specially
instrumented to run async operations, via greenlet magic, internally).
The first parameter to the function/method given will be a special
legacy-compatible Session object, then the remaining
arguments to .run_sync() follow.
.run_sync() should be seen as a helpful way to partially
upgrade your codebase from old-style SQLAlchemy to new async 1.4-2.0+
SQLAlchemy without needing a full conversion upfront (but you will still
need to instrument every legacy call point with the
await session.run_sync(func, arg2, arg3, ...) pattern).
Upgrade Path: Actually Upgrade
For basic ORM usage, the upgrade is simple:
- session.query() becomes select()
- .filter() becomes .where()
- Instead of single-fire commands:
  - build a statement object from the select()
    - remove any .one(), .first(), .one_or_none(), etc. from your previous combined statement generation + statement execution line
  - gather the result from await session.execute(statement)
    - NOTE: Previously, .first() would also apply a LIMIT 1, but this does not happen using the select() -> execute() -> result.scalars().first() pattern because the .first() is now applied after the query happens.
  - run your expected .scalars().one() or .scalars().first() or .scalars().one_or_none() against the result (which is a Result object), not on the select statement itself.
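Putting those swaps together, a before/after sketch of a single query (the filter value is illustrative; db is a Session in the first half and an AsyncSession in the second):

from sqlalchemy import select

# Before: 1.x Query style; the SQL fires implicitly inside .first()
user = db.query(User).filter(User.name == "alice").first()

# After: 2.0 style; build the statement, then explicitly await execution
stmt = select(User).where(User.name == "alice")
result = await db.execute(stmt)
user = result.scalars().first()  # note: no LIMIT 1 was added to the emitted SQL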
Then run your changes and fix anything crashing in the runtime logs.
Common errors you’ll see:
- sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place?
  - this means you are attempting to on-demand lazy-load some attribute or collection when using the AsyncEngine, but we can’t async load attributes anymore.
  - all attributes must be loaded during an original object request
  - loading fixes include:
    - globally setting a relationship to always load its mapping by setting lazy='selectin' on the foreign relationship itself (see the sketch after this list),
    - or specifying explicit join/select conditions per query.
      - per-query example:
        - FAIL: select(User).where(User.id == 3) but then you access user.email_addresses[3] and get the error above because you didn’t load the external .email_addresses relationship.
        - SOLVED: find the missing attributes and add them to the primary object request: select(User).where(User.id == 3).options(selectinload(User.email_addresses))
          - now SQLAlchemy will generate a second SELECT statement using the primary key of User joined against the email_addresses attribute and load the result into the returned User object (after it is properly retrieved via result = await session.execute(query), user = result.scalars().one())
- AttributeError: 'ChunkedIteratorResult' object has no attribute [attribute name]
  - this just means you are attempting to read the value of await session.execute(query) directly without extracting any values first
- AttributeError: Could not locate column in row for column [column name]
  - this means you ran .first() or .one() etc. directly on the Result without extracting .scalars()
  - solution: don’t run result.one() etc., run result.scalars().one() or result.scalar_one_or_none()
  - also see: all ScalarResult access methods
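For the “global” fix above, a sketch of setting the loading strategy on the relationship itself so every User query eagerly pulls email_addresses (table and column definitions are illustrative):

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # lazy="selectin" makes every User load emit the follow-up SELECT for
    # email_addresses automatically, so nothing is left to lazy-load later.
    email_addresses = relationship("EmailAddress", lazy="selectin")

class EmailAddress(Base):
    __tablename__ = "email_addresses"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    address = Column(String)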
Conclusion
That’s a good start to updating your previously 100% auto-blocking-on-member-access SQLAlchemy ORM to a more modern async-driven approach.
There are of course tradeoffs when going to the async-only approach if you’re upgrading a sizable existing codebase (as you’ll discover when every place a web template accessed a lazy-loaded element now has to be searched out and destroyed).
Are the changes worth it? If your queries are large and introduce
latency spikes due to client concurrency limits, you’ll be able to
increase your app server concurrency due to having many of the
requests/sessions now just sleeping in async wait states waiting for the
off-server DB processing to return. You also get the added benefit of
now being able to use asyncpg, with its better internal prepared
statement cache architecture, as well as better JSON deserialization.
go forth and upgrade ur sqls
[1] modern python wonders, in no specific order:
- sqlalchemy
- pydantic / fastapi / starlette
  - including all modern python features enabling it:
    - dataclasses
    - async (including aiohttp and aiofiles)
    - annotations / Annotated / type hints / typing
    - plus h*ckin mypy / pytype / vscode real time linting / type checking
- PyPy3.7+
  - with reservations, like hopefully you never call out to C bindings which can cause unlimited memory bloat
  - try it yourself under PyPy:
    - %timeit hashlib.md5(b"hello").hexdigest()
    - %timeit xxhash.xxh128(b"hello").hexdigest()
- numpy / pandas / numba / dask / distributed
  - plus the NVIDIA RAPIDS collection enabling transparent distributed GPU offload for dataframe calculations
- scikit-learn / scikit-image / scipy / jupyter / matplotlib
- orjson
- loguru
- pyproject / poetry for venv management and automated package building

[2] like removing some features we maybe shouldn’t have been using, but worked their way into our systems anyway, like nested subtransactions whoops