SQLAlchemy

O módulo SQLAlchemy facilita a configuração de conexões com bancos de dados, provendo AsyncEngine e async_sessionmaker como serviçoes no contexto de injeção de dependências, como também AsyncSession associado à requisição atual.

Utilização

Instale o extra sqlalchemy e um driver de banco de dados com suporte a async:

$ pip install zayt[sqlalchemy] aiosqlite asyncpg aiomysql oracledb

Com o driver de banco de dados instalado, nós podemos definir as conexões no arquivo de configuração:

settings = {
    "modules": [
        "zayt.ext.sqlalchemy" # (1)
    ],
    "sqlalchemy": {
        "connections": {
            "": { # (2)
                "url": "sqlite+aiosqlite:///var/db.sqlite3"
            },
            "postgres": { # (3)
                "url": "postgresql+asyncpg://user:pass@localhost/dbname"
            },
            "mysql": { # (4)
                "url": "mysql+aiomysql://user:pass@localhost/dbname"
            },
            "oracle": { # (5)
                "url": "oracle+oracledb_async://user:pass@localhost/DBNAME"
                # "oracle+oracledb_async://user:pass@localhost/?service_name=DBNAME"
            },
        },
    },
}

Nota

  1. Ativar a extensão sqlalchemy

  2. Connection will be registered without a name

  3. Conexão registrada com nome “postgres”

  4. Conexão registrada com nome “mysql”

  5. Conexão registrada com nome “oracle”

Once we define the connections, we can inject AsyncEngine into our services. For each connection, an instance of AsyncEngine will be registered, the "" connection will be registered without a name, and the others will be registered with their respective names.

from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncEngine
from zayt.di import service, Inject


@service
class MyService:
    # default service
    engine: Annotated[AsyncEngine, Inject]

    # named services
    engine_postgres: Annotated[AsyncEngine, Inject("postgres")]
    engine_mysql: Annotated[AsyncEngine, Inject("mysql")]
    engine_oracle: Annotated[AsyncEngine, Inject("oracle")]

Sessão em Escopo

Nos handlers o AsyncSession pode ser injetado como um serviço no escopo da requisição atual. Cada requisição será injetada com seu próprio objeto de sessão:

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from asgikit import Request
from zayt.web import get


@get
async def handler(request: Request, session: AsyncSession):
    result = await session.scalar(text("select 'ok'"))
    await request.respond_text(result)

Configuração

Conexões com bancos de dados também podem ser definidas com usuário e senha separados da url, ou então como componentes individuais:

settings = {
    "sqlalchemy": {
        "connections": {
            "": {
                "drivername": "sqlite+aiosqlite",
                "database": "/var/db.sqlite3",
            },

            "postgres": {  # (1)
                "url": "postgresql+asyncpg://localhost/dbname",
                "username": "user",
                "password": "pass",
            },

            "mysql": {  # (2)
                "drivername": "mysql+aiomysql",
                "host": "localhost",
                "port": 3306,
                "database": "dbname",
                "username": "user",
                "password": "pass",
            },

            "oracle": {  # (3)
                "drivername": "oracle+oracledb_async",
                "host": "localhost",
                "port": 1521,
                "username": "user",
                "password": "pass",
                "query": {"service_name": "DBNAME"},
            }
        },
    },
}

Nota

  1. Usuário e senha separados da url do banco de dados

  2. Cada componente definido individualmente

  3. Parâmetros de query podem ser definidos em um dict

Trabalhando com async_sessionmaker

Different from the AsyncEngine, only one async_sessionmaker is created. We can bind specific subclasses of DeclarativeBase through the sqlalchemy.session.binds configuration, otherwise it is bound to the default connection.

application/model.py
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class OtherBase(DeclarativeBase):
    pass
configuration/settings.py
settings = {
    "sqlalchemy": {
        "connections": {
            "": {
                "url": "sqlite+aiosqlite://db1.sqlite3",
            },
            "other": {
                "url": "sqlite+aiosqlite://db2.sqlite3",
            },
        },
        "session": {
            "binds": {
                "application.model.Base": "",
                "application.model.OtherBase": "other",
            },
        },
    },
}

Exemplo

application/handler.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncEngine, AsyncSession

from asgikit import Request
from zayt.web import get

from .model import Base, MyModel


@get
async def index(
    request: Request,
    engine: AsyncEngine,
    sessionmaker: async_sessionmaker,
    session: AsyncSession,
):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with sessionmaker() as session:
        my_model = MyModel(name="MyModel")
        session.add(my_model)
        await session.commit()

    my_model = await session.scalar(select(MyModel).limit(1))

    await request.respond_json({
        "id": my_model.id,
        "name": my_model.name,
    })
application/model.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class MyModel(Base):
    __tablename__ = 'my_model'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(length=100))

    def __repr__(self):
        return f"<MyModel(id={self.id}, name={self.name})>"
configuration/settings.py
settings = {
    "sqlalchemy": {
        "url": "sqlite+aiosqlite://:memory:",
    },
}

Opções de configuração

As opções disponíveis são mostradas abaixo:

settings = {
    "sqlalchemy": {
        "session": {
            "binds": {  # (1)
                "application.model.Base": "",
                "application.model.OtherBase": "other",
            },
            "options": {  # (2)
                "class": "sqlalchemy.ext.asyncio.AsyncSession",
                "autoflush": True,
                "expire_on_commit": True,
                "autobegin": True,
                "twophase": False,
                "enable_baked_queries": True,
                "info": {},
                "query_cls": QueryClass,
                "join_transaction_mode": "conditional_savepoint", # "rollback_only", "control_fully", "create_savepoint"
                "close_resets_only": None,
            },
        },
        "connections": {
            "": {
                "url": "",
                "host": "localhost",
                "port": 5432,
                "database": "dbname",
                "username": "user",
                "password": "pass",
                "options": {  # (3)
                    "connect_args": {},
                    "echo": False,
                    "echo_pool": False,
                    "enable_from_linting": False,
                    "hide_parameters": False,
                    "insertmanyvalues_page_size": 1,
                    "isolation_level": "",
                    "json_deserializer": json.loads,
                    "json_serializer": json.dumps,
                    "label_length": 1,
                    "logging_name": "",
                    "max_identifier_length": 1,
                    "max_overflow": 1,
                    "module": "",
                    "paramstyle": "qmark",  # "numeric", "named", "format", "pyformat"
                    "poolclass": PoolClass,  # dotted path to the pool class
                    "pool_logging_name": "",
                    "pool_pre_ping": False,
                    "pool_size": 1,
                    "pool_recycle": 3600,
                    "pool_reset_on_return": "rollback",  # "commit"
                    "pool_timeout": 1,
                    "pool_use_lifo": False,
                    "plugins": [],
                    "query_cache_size": 1,
                    "use_insertmanyvalues": False,
                    "execution_options": {  # (4)
                        "logging_token": "",
                        "isolation_level": "",
                        "no_parameters": False,
                        "stream_results": False,
                        "max_row_buffer": 1,
                        "yield_per": 1,
                        "insertmanyvalues_page_size": 1,
                        "schema_translate_map": {
                            "null": "my_schema",
                            "some_schema": "other_schema",
                        },
                    },
                },
            },
        },
    },
}

Nota

  1. Associa subclasses de DeclarativeBase a nomes de conexões definidos em connections

  2. Valores de session.options são descritos em sqlalchemy.orm.Session

  3. Valores de connection.options são descritos em sqlalchemy.create_engine

  4. Valores de execution_options são descritos em sqlalchemy.engine.Connection.execution_options