SQLAlchemy¶
The SQLAlchemy module makes it easy to set up database connections, providing
AsyncEngine and async_sessionmaker
as services in the dependency injection context, as well as AsyncSession
tied to the current request.
Usage¶
Install sqlalchemy extra and a database driver that supports async:
$ pip install zayt[sqlalchemy] aiosqlite asyncpg aiomysql oracledb
With database drivers are installed, we can define the connections in the configuration file:
settings = {
"modules": [
"zayt.ext.sqlalchemy" # (1)
],
"sqlalchemy": {
"connections": {
"": { # (2)
"url": "sqlite+aiosqlite:///var/db.sqlite3"
},
"postgres": { # (3)
"url": "postgresql+asyncpg://user:pass@localhost/dbname"
},
"mysql": { # (4)
"url": "mysql+aiomysql://user:pass@localhost/dbname"
},
"oracle": { # (5)
"url": "oracle+oracledb_async://user:pass@localhost/DBNAME"
# "oracle+oracledb_async://user:pass@localhost/?service_name=DBNAME"
},
},
},
}
Note
Activate the sqlalchemy module
Connection will be registered without a name
Connection registered with name “postgres”
Connection registered with name “mysql”
Connection registered with name “oracle”
Once we define the connections, we can inject AsyncEngine
into our services. For each connection, an instance of AsyncEngine
will be registered, the "" connection will be registered without a name, and
the others will be registered with their respective names.
from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncEngine
from zayt.di import service, Inject
@service
class MyService:
# default service
engine: Annotated[AsyncEngine, Inject]
# named services
engine_postgres: Annotated[AsyncEngine, Inject("postgres")]
engine_mysql: Annotated[AsyncEngine, Inject("mysql")]
engine_oracle: Annotated[AsyncEngine, Inject("oracle")]
Scoped Session¶
In handlers, the AsyncSession can be injected as a service scoped to the current
request. Each request will be injected its own session object:
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from asgikit import Request
from zayt.web import get
@get
async def handler(request: Request, session: AsyncSession):
result = await session.scalar(text("select 'ok'"))
await request.respond_text(result)
Configuration¶
Database connections can also be defined with username and password separated from the url, or even with individual components:
settings = {
"sqlalchemy": {
"connections": {
"": {
"drivername": "sqlite+aiosqlite",
"database": "/var/db.sqlite3",
},
"postgres": { # (1)
"url": "postgresql+asyncpg://localhost/dbname",
"username": "user",
"password": "pass",
},
"mysql": { # (2)
"drivername": "mysql+aiomysql",
"host": "localhost",
"port": 3306,
"database": "dbname",
"username": "user",
"password": "pass",
},
"oracle": { # (3)
"drivername": "oracle+oracledb_async",
"host": "localhost",
"port": 1521,
"username": "user",
"password": "pass",
"query": {"service_name": "DBNAME"},
}
},
},
}
Note
Username and password separated from the database url
Each component defined individually
Query parameters can be defined in a dict
Working with async_sessionmaker¶
Different from the AsyncEngine, only one
async_sessionmaker is created. We can bind specific
subclasses of DeclarativeBase through the sqlalchemy.session.binds
configuration, otherwise it is bound to the default connection.
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class OtherBase(DeclarativeBase):
pass
settings = {
"sqlalchemy": {
"connections": {
"": {
"url": "sqlite+aiosqlite://db1.sqlite3",
},
"other": {
"url": "sqlite+aiosqlite://db2.sqlite3",
},
},
"session": {
"binds": {
"application.model.Base": "",
"application.model.OtherBase": "other",
},
},
},
}
Example¶
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncEngine, AsyncSession
from asgikit import Request
from zayt.web import get
from .model import Base, MyModel
@get
async def index(
request: Request,
engine: AsyncEngine,
sessionmaker: async_sessionmaker,
session: AsyncSession,
):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with sessionmaker() as session:
my_model = MyModel(name="MyModel")
session.add(my_model)
await session.commit()
my_model = await session.scalar(select(MyModel).limit(1))
await request.respond_json({
"id": my_model.id,
"name": my_model.name,
})
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class MyModel(Base):
__tablename__ = 'my_model'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(length=100))
def __repr__(self):
return f"<MyModel(id={self.id}, name={self.name})>"
settings = {
"sqlalchemy": {
"url": "sqlite+aiosqlite://:memory:",
},
}
Configuration options¶
The available options are shown below:
settings = {
"sqlalchemy": {
"session": {
"binds": { # (1)
"application.model.Base": "",
"application.model.OtherBase": "other",
},
"options": { # (2)
"class": "sqlalchemy.ext.asyncio.AsyncSession",
"autoflush": True,
"expire_on_commit": True,
"autobegin": True,
"twophase": False,
"enable_baked_queries": True,
"info": {},
"query_cls": QueryClass,
"join_transaction_mode": "conditional_savepoint", # "rollback_only", "control_fully", "create_savepoint"
"close_resets_only": None,
},
},
"connections": {
"": {
"url": "",
"host": "localhost",
"port": 5432,
"database": "dbname",
"username": "user",
"password": "pass",
"options": { # (3)
"connect_args": {},
"echo": False,
"echo_pool": False,
"enable_from_linting": False,
"hide_parameters": False,
"insertmanyvalues_page_size": 1,
"isolation_level": "",
"json_deserializer": json.loads,
"json_serializer": json.dumps,
"label_length": 1,
"logging_name": "",
"max_identifier_length": 1,
"max_overflow": 1,
"module": "",
"paramstyle": "qmark", # "numeric", "named", "format", "pyformat"
"poolclass": PoolClass, # dotted path to the pool class
"pool_logging_name": "",
"pool_pre_ping": False,
"pool_size": 1,
"pool_recycle": 3600,
"pool_reset_on_return": "rollback", # "commit"
"pool_timeout": 1,
"pool_use_lifo": False,
"plugins": [],
"query_cache_size": 1,
"use_insertmanyvalues": False,
"execution_options": { # (4)
"logging_token": "",
"isolation_level": "",
"no_parameters": False,
"stream_results": False,
"max_row_buffer": 1,
"yield_per": 1,
"insertmanyvalues_page_size": 1,
"schema_translate_map": {
"null": "my_schema",
"some_schema": "other_schema",
},
},
},
},
},
},
}
Note
Binds subclasses of
DeclarativeBaseto connection names defined inconnectionssession.optionsvalues are describe in sqlalchemy.orm.Sessionconnection.optionsvalues are described in sqlalchemy.create_engineexecution_optionsvalues are describe in sqlalchemy.engine.Connection.execution_options