SQLAlchemy

The SQLAlchemy module makes it easy to set up database connections, providing AsyncEngine and async_sessionmaker as services in the dependency injection context, as well as AsyncSession tied to the current request.

Usage

Install sqlalchemy extra and a database driver that supports async:

$ pip install zayt[sqlalchemy] aiosqlite asyncpg aiomysql oracledb

With database drivers are installed, we can define the connections in the configuration file:

settings = {
    "modules": [
        "zayt.ext.sqlalchemy" # (1)
    ],
    "sqlalchemy": {
        "connections": {
            "": { # (2)
                "url": "sqlite+aiosqlite:///var/db.sqlite3"
            },
            "postgres": { # (3)
                "url": "postgresql+asyncpg://user:pass@localhost/dbname"
            },
            "mysql": { # (4)
                "url": "mysql+aiomysql://user:pass@localhost/dbname"
            },
            "oracle": { # (5)
                "url": "oracle+oracledb_async://user:pass@localhost/DBNAME"
                # "oracle+oracledb_async://user:pass@localhost/?service_name=DBNAME"
            },
        },
    },
}

Note

  1. Activate the sqlalchemy module

  2. Connection will be registered without a name

  3. Connection registered with name “postgres”

  4. Connection registered with name “mysql”

  5. Connection registered with name “oracle”

Once we define the connections, we can inject AsyncEngine into our services. For each connection, an instance of AsyncEngine will be registered, the "" connection will be registered without a name, and the others will be registered with their respective names.

from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncEngine
from zayt.di import service, Inject


@service
class MyService:
    # default service
    engine: Annotated[AsyncEngine, Inject]

    # named services
    engine_postgres: Annotated[AsyncEngine, Inject("postgres")]
    engine_mysql: Annotated[AsyncEngine, Inject("mysql")]
    engine_oracle: Annotated[AsyncEngine, Inject("oracle")]

Scoped Session

In handlers, the AsyncSession can be injected as a service scoped to the current request. Each request will be injected its own session object:

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from asgikit import Request
from zayt.web import get


@get
async def handler(request: Request, session: AsyncSession):
    result = await session.scalar(text("select 'ok'"))
    await request.respond_text(result)

Configuration

Database connections can also be defined with username and password separated from the url, or even with individual components:

settings = {
    "sqlalchemy": {
        "connections": {
            "": {
                "drivername": "sqlite+aiosqlite",
                "database": "/var/db.sqlite3",
            },

            "postgres": {  # (1)
                "url": "postgresql+asyncpg://localhost/dbname",
                "username": "user",
                "password": "pass",
            },

            "mysql": {  # (2)
                "drivername": "mysql+aiomysql",
                "host": "localhost",
                "port": 3306,
                "database": "dbname",
                "username": "user",
                "password": "pass",
            },

            "oracle": {  # (3)
                "drivername": "oracle+oracledb_async",
                "host": "localhost",
                "port": 1521,
                "username": "user",
                "password": "pass",
                "query": {"service_name": "DBNAME"},
            }
        },
    },
}

Note

  1. Username and password separated from the database url

  2. Each component defined individually

  3. Query parameters can be defined in a dict

Working with async_sessionmaker

Different from the AsyncEngine, only one async_sessionmaker is created. We can bind specific subclasses of DeclarativeBase through the sqlalchemy.session.binds configuration, otherwise it is bound to the default connection.

application/model.py
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class OtherBase(DeclarativeBase):
    pass
configuration/settings.py
settings = {
    "sqlalchemy": {
        "connections": {
            "": {
                "url": "sqlite+aiosqlite://db1.sqlite3",
            },
            "other": {
                "url": "sqlite+aiosqlite://db2.sqlite3",
            },
        },
        "session": {
            "binds": {
                "application.model.Base": "",
                "application.model.OtherBase": "other",
            },
        },
    },
}

Example

application/handler.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncEngine, AsyncSession

from asgikit import Request
from zayt.web import get

from .model import Base, MyModel


@get
async def index(
    request: Request,
    engine: AsyncEngine,
    sessionmaker: async_sessionmaker,
    session: AsyncSession,
):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with sessionmaker() as session:
        my_model = MyModel(name="MyModel")
        session.add(my_model)
        await session.commit()

    my_model = await session.scalar(select(MyModel).limit(1))

    await request.respond_json({
        "id": my_model.id,
        "name": my_model.name,
    })
application/model.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class MyModel(Base):
    __tablename__ = 'my_model'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(length=100))

    def __repr__(self):
        return f"<MyModel(id={self.id}, name={self.name})>"
configuration/settings.py
settings = {
    "sqlalchemy": {
        "url": "sqlite+aiosqlite://:memory:",
    },
}

Configuration options

The available options are shown below:

settings = {
    "sqlalchemy": {
        "session": {
            "binds": {  # (1)
                "application.model.Base": "",
                "application.model.OtherBase": "other",
            },
            "options": {  # (2)
                "class": "sqlalchemy.ext.asyncio.AsyncSession",
                "autoflush": True,
                "expire_on_commit": True,
                "autobegin": True,
                "twophase": False,
                "enable_baked_queries": True,
                "info": {},
                "query_cls": QueryClass,
                "join_transaction_mode": "conditional_savepoint", # "rollback_only", "control_fully", "create_savepoint"
                "close_resets_only": None,
            },
        },
        "connections": {
            "": {
                "url": "",
                "host": "localhost",
                "port": 5432,
                "database": "dbname",
                "username": "user",
                "password": "pass",
                "options": {  # (3)
                    "connect_args": {},
                    "echo": False,
                    "echo_pool": False,
                    "enable_from_linting": False,
                    "hide_parameters": False,
                    "insertmanyvalues_page_size": 1,
                    "isolation_level": "",
                    "json_deserializer": json.loads,
                    "json_serializer": json.dumps,
                    "label_length": 1,
                    "logging_name": "",
                    "max_identifier_length": 1,
                    "max_overflow": 1,
                    "module": "",
                    "paramstyle": "qmark",  # "numeric", "named", "format", "pyformat"
                    "poolclass": PoolClass,  # dotted path to the pool class
                    "pool_logging_name": "",
                    "pool_pre_ping": False,
                    "pool_size": 1,
                    "pool_recycle": 3600,
                    "pool_reset_on_return": "rollback",  # "commit"
                    "pool_timeout": 1,
                    "pool_use_lifo": False,
                    "plugins": [],
                    "query_cache_size": 1,
                    "use_insertmanyvalues": False,
                    "execution_options": {  # (4)
                        "logging_token": "",
                        "isolation_level": "",
                        "no_parameters": False,
                        "stream_results": False,
                        "max_row_buffer": 1,
                        "yield_per": 1,
                        "insertmanyvalues_page_size": 1,
                        "schema_translate_map": {
                            "null": "my_schema",
                            "some_schema": "other_schema",
                        },
                    },
                },
            },
        },
    },
}

Note

  1. Binds subclasses of DeclarativeBase to connection names defined in connections

  2. session.options values are describe in sqlalchemy.orm.Session

  3. connection.options values are described in sqlalchemy.create_engine

  4. execution_options values are describe in sqlalchemy.engine.Connection.execution_options