SQLAlchemy¶
O módulo SQLAlchemy facilita a configuração de conexões com bancos de dados, provendo AsyncEngine e async_sessionmaker como serviçoes no contexto de injeção de dependências, como também AsyncSession associado à requisição atual.
Utilização¶
Instale o extra sqlalchemy e um driver de banco de dados com suporte a async:
$ pip install zayt[sqlalchemy] aiosqlite asyncpg aiomysql oracledb
Com o driver de banco de dados instalado, nós podemos definir as conexões no arquivo de configuração:
settings = {
"modules": [
"zayt.ext.sqlalchemy" # (1)
],
"sqlalchemy": {
"connections": {
"": { # (2)
"url": "sqlite+aiosqlite:///var/db.sqlite3"
},
"postgres": { # (3)
"url": "postgresql+asyncpg://user:pass@localhost/dbname"
},
"mysql": { # (4)
"url": "mysql+aiomysql://user:pass@localhost/dbname"
},
"oracle": { # (5)
"url": "oracle+oracledb_async://user:pass@localhost/DBNAME"
# "oracle+oracledb_async://user:pass@localhost/?service_name=DBNAME"
},
},
},
}
Nota
Ativar a extensão sqlalchemy
Connection will be registered without a name
Conexão registrada com nome “postgres”
Conexão registrada com nome “mysql”
Conexão registrada com nome “oracle”
Once we define the connections, we can inject AsyncEngine
into our services. For each connection, an instance of AsyncEngine
will be registered, the "" connection will be registered without a name, and
the others will be registered with their respective names.
from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncEngine
from zayt.di import service, Inject
@service
class MyService:
# default service
engine: Annotated[AsyncEngine, Inject]
# named services
engine_postgres: Annotated[AsyncEngine, Inject("postgres")]
engine_mysql: Annotated[AsyncEngine, Inject("mysql")]
engine_oracle: Annotated[AsyncEngine, Inject("oracle")]
Sessão em Escopo¶
Nos handlers o AsyncSession pode ser injetado como um serviço no escopo da requisição atual. Cada requisição será injetada com seu próprio objeto de sessão:
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from asgikit import Request
from zayt.web import get
@get
async def handler(request: Request, session: AsyncSession):
result = await session.scalar(text("select 'ok'"))
await request.respond_text(result)
Configuração¶
Conexões com bancos de dados também podem ser definidas com usuário e senha separados da url, ou então como componentes individuais:
settings = {
"sqlalchemy": {
"connections": {
"": {
"drivername": "sqlite+aiosqlite",
"database": "/var/db.sqlite3",
},
"postgres": { # (1)
"url": "postgresql+asyncpg://localhost/dbname",
"username": "user",
"password": "pass",
},
"mysql": { # (2)
"drivername": "mysql+aiomysql",
"host": "localhost",
"port": 3306,
"database": "dbname",
"username": "user",
"password": "pass",
},
"oracle": { # (3)
"drivername": "oracle+oracledb_async",
"host": "localhost",
"port": 1521,
"username": "user",
"password": "pass",
"query": {"service_name": "DBNAME"},
}
},
},
}
Nota
Usuário e senha separados da url do banco de dados
Cada componente definido individualmente
Parâmetros de query podem ser definidos em um dict
Trabalhando com async_sessionmaker¶
Different from the AsyncEngine, only one
async_sessionmaker is created. We can bind specific
subclasses of DeclarativeBase through the sqlalchemy.session.binds
configuration, otherwise it is bound to the default connection.
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class OtherBase(DeclarativeBase):
pass
settings = {
"sqlalchemy": {
"connections": {
"": {
"url": "sqlite+aiosqlite://db1.sqlite3",
},
"other": {
"url": "sqlite+aiosqlite://db2.sqlite3",
},
},
"session": {
"binds": {
"application.model.Base": "",
"application.model.OtherBase": "other",
},
},
},
}
Exemplo¶
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncEngine, AsyncSession
from asgikit import Request
from zayt.web import get
from .model import Base, MyModel
@get
async def index(
request: Request,
engine: AsyncEngine,
sessionmaker: async_sessionmaker,
session: AsyncSession,
):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with sessionmaker() as session:
my_model = MyModel(name="MyModel")
session.add(my_model)
await session.commit()
my_model = await session.scalar(select(MyModel).limit(1))
await request.respond_json({
"id": my_model.id,
"name": my_model.name,
})
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class MyModel(Base):
__tablename__ = 'my_model'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(length=100))
def __repr__(self):
return f"<MyModel(id={self.id}, name={self.name})>"
settings = {
"sqlalchemy": {
"url": "sqlite+aiosqlite://:memory:",
},
}
Opções de configuração¶
As opções disponíveis são mostradas abaixo:
settings = {
"sqlalchemy": {
"session": {
"binds": { # (1)
"application.model.Base": "",
"application.model.OtherBase": "other",
},
"options": { # (2)
"class": "sqlalchemy.ext.asyncio.AsyncSession",
"autoflush": True,
"expire_on_commit": True,
"autobegin": True,
"twophase": False,
"enable_baked_queries": True,
"info": {},
"query_cls": QueryClass,
"join_transaction_mode": "conditional_savepoint", # "rollback_only", "control_fully", "create_savepoint"
"close_resets_only": None,
},
},
"connections": {
"": {
"url": "",
"host": "localhost",
"port": 5432,
"database": "dbname",
"username": "user",
"password": "pass",
"options": { # (3)
"connect_args": {},
"echo": False,
"echo_pool": False,
"enable_from_linting": False,
"hide_parameters": False,
"insertmanyvalues_page_size": 1,
"isolation_level": "",
"json_deserializer": json.loads,
"json_serializer": json.dumps,
"label_length": 1,
"logging_name": "",
"max_identifier_length": 1,
"max_overflow": 1,
"module": "",
"paramstyle": "qmark", # "numeric", "named", "format", "pyformat"
"poolclass": PoolClass, # dotted path to the pool class
"pool_logging_name": "",
"pool_pre_ping": False,
"pool_size": 1,
"pool_recycle": 3600,
"pool_reset_on_return": "rollback", # "commit"
"pool_timeout": 1,
"pool_use_lifo": False,
"plugins": [],
"query_cache_size": 1,
"use_insertmanyvalues": False,
"execution_options": { # (4)
"logging_token": "",
"isolation_level": "",
"no_parameters": False,
"stream_results": False,
"max_row_buffer": 1,
"yield_per": 1,
"insertmanyvalues_page_size": 1,
"schema_translate_map": {
"null": "my_schema",
"some_schema": "other_schema",
},
},
},
},
},
},
}
Nota
Associa subclasses de
DeclarativeBasea nomes de conexões definidos emconnectionsValores de
session.optionssão descritos em sqlalchemy.orm.SessionValores de
connection.optionssão descritos em sqlalchemy.create_engineValores de
execution_optionssão descritos em sqlalchemy.engine.Connection.execution_options