Tutorial

Vamos nos aprofundar um pouco mais e aprender os conceitos básicos de Zayt.

Nós criaremos uma api de saudações que registra as requisições.

Instalando Zayt

Antes de prosseguir, precisamos instalar Zayt and uvicorn.

$ pip install zayt uvicorn[standard]

Estrutura da aplicação

Uma aplicação zayt pode ser estruturada de várias formas:

Mínimo
project/
├── application.py
├── configuration/
│   └── settings.py
└── resources/
Com módulos
project/
├── application/
│   ├── __init__.py
│   ├── handler.py
│   ├── repository.py
│   └── service.py
├── configuration/
│   └── settings.py
└── resources/

E… é isso! Um módulo ou pacote chamado application será automaticamente importado e examinado por handlers e serviços.

Você pode estruturar o pacote application da forma que lhe convier.

Executando a aplicação

Nós usaremos o uvicorn para executar a aplicação e recarregá-la automaticamente quando houver mudanças no código:

$ uvicorn zayt.run:app --reload
INFO:     Will watch for changes in these directories: ['/home/user/projects/zayt-tutorial']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [1001] using WatchFiles
INFO:     Started server process [1000]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Funções handler

Funções handler responderão à requisções HTTP e WebSocket. Eles podem receber serviços através de injeção de dependências.

application/handler.py
from asgikit import Request
from zayt.web import get

@get("hello/{name}")
async def hello(request: Request):
    name = request.path_params["name"]
    await request.respond_json({"greeting": f"Hello, {name}!"})

@get("hello/{name}") define a função como um handler no caminho informado. Se nenhum caminho for informado, o caminho raiz (“/”) será usado.

{name} define um parâmetro de caminho que será armazenado em request.path_params, o qual é um objeto dict.

E agora nós testamos se nosso handler está funcionando:

$ curl localhost:8000/hello/World
{"greeting": "Hello, World!"}

Por enquanto nosso apenas recupera o nome do caminho e responde com dados JSON para o cliente.

Criar o serviço Greeter

Nosso serviço terá um método que recebe o nome e retorna uma saudação. Ele será injetado no handler que criamos anteriormente.

application/service.py
from zayt.di import service


@service
class Greeter:
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"

@service registra a classe no sistema de injeção de dependências para que possa ser injetado em outras classes ou funções handler

application/handler.py
from asgikit import Request
from zayt.web import get
from .service import Greeter


@get("/hello/{name}")
async def hello(request: Request, greeter: Greeter):
    name = request.path_params["name"]
    greeting = greeter.greet(name)
    await request.respond_json({"greeting": greeting})

O parâmetro greeter será injetado no handler.

Adicionar um banco de dados

Nossa aplicação de saudações está funcionando bem, mas nós podemos registrar as requisições de saudações em um banco de dados persistente, para propósitos de auditoria.

Para tanto, nós precisamos criar um serviço de banco de dados e injetá-lo noserviço Greeter. Para isso nós podemos utilizar o aiosqlite:

$ pip install aiosqlite

aiosqlite provê uma classe chamada Connection. Entretando, nós não podemos decorá-la com @service, então nesse caso nós precisamos criar uma função factory para ele:

application/repository.py
from datetime import datetime
from typing import Annotated
import aiosqlite
from zayt.di import service, Inject

@service # (1)
async def connection_factory() -> aiosqlite.Connection:
    conn = await aiosqlite.connect("sqlite:///db.sqlite3")
    yield conn
    await conn.close()


@service
class GreetingRepository:
    database: Annotated[aiosqlite.Connection, Inject] # (2)

    async def initialize(self): # (3)
        await self.database.execute(
            """
            create table if not exists greeting_log(
                greeting text not null,
                datetime text not null
            );
            """
        )

    async def finalize(self): # (4)
        await self.database.execute("drop table if exists greeting_log;")

    async def save_greeting(self, greeting: str, date: datetime):
        query = """
            insert into greeting_log (greeting, datetime)
            values (:greeting, datetime(:datetime))
        """
        params = {"greeting": greeting, "datetime": date}
        await self.database.execute(query, params)

Nota

  1. A função decorada com @service é usada para criar um servço quando você precisa prover tipos que você não possui

  2. Injeta o serviço aiosqlite.Connection em GreetingRepository

  3. Um método chamado initialize será chamado após o serviço ser construído para executar qualquer lógica de inicialização

  4. Um método chamado finalize será chamado antes do serviço ser destruído para executar qualquer lógica de limpeza

application/handler.py
from datetime import datetime
from asgikit import Request
from zayt.web import get
from .repository import GreetingRepository
from .service import Greeter


@get("hello/{name}")
async def hello_name(
    request: Request,
    greeter: Greeter,
    repository: GreetingRepository,
):
    name = request.path_params["name"]
    greeting = greeter.greet(name)
    await repository.save_greeting(greeting, datetime.now())
    await request.respond_json({"greeting": greeting})

Executar ações após a resposta

As saudações está sendo salvas no banco de dados, mas agora nós temos um problema: o usuário precisa esperar até que a saudação seja salva antes de recebê-la.

Para resolver este problema e melhorar a experiência do usuário, nós podemos salvar a saudação após a requisição ser finaliza:

application/handler.py
from datetime import datetime
from asgikit import Request
from zayt.web import get
from .repository import GreetingRepository
from .service import Greeter


@get("hello/{name}")
async def hello_name(
    request: Request,
    greeter: Greeter,
    repository: GreetingRepository,
):
    name = request.path_params["name"]
    greeting = greeter.greet(name)
    await request.respond_json({"greeting": greeting}) # (1)

    await repository.save_greeting(greeting, datetime.now()) # (2)

Nota

  1. A chamada a respond_json finaliza a requisição

  2. A saudação é salva após a resposta ser finaliza

Recuperando os registros de saudações

Para ser as saudações salvas no banco de dados, nós precisamos apenas adicionar um handler para recuperar os registros e retorná-los:

application/repository.py
@service
class GreetingRepository:
    # ...
    async def get_greetings(self) -> list[dict[str, str]]:
        query = """
            select l.greeting, datetime(l.datetime) from greeting_log l
            order by rowid desc
        """

        async with self.database.execute(query) as cur:
            rows = await cur.fetchall()

        return [
            {"greeting": row[0], "datetime": row[1]}
            for row in rows
        ]
application/handler.py
# ...
@get("/logs")
async def greeting_logs(request: Request, repository: GreetingRepository):
    greetings = await repository.get_greetings()
    await request.respond_json(greetings)

Agora vamos tentar requisitar algumas saudações e recuperar os registros

$ curl localhost:8000/hello/Python
{"greeting": "Hello, World!"}

$ curl localhost:8000/hello/World
{"greeting": "Hello, Python!"}

$ curl -s localhost:8000/logs | python -m json.tool
[
    {
        "greeting": "Hello, World!",
        "datetime": "2025-01-01 12:00:10"
    },
    {
        "greeting": "Hello, Python!",
        "datetime": "2025-01-01 12:00:20"
    },
]

Recebendo dados de POST

Nos podemos enviar o nome no corpo da requisição ao invés da url, e utilizar pydantic para fazer o parse do corpo da requisição:

application/handler.py
from zayt.web import get, post
# ...
@post("hello")
async def hello_post(
    request: Request,
    greeter: Greeter,
    repository: GreetingRepository,
):
    greeting_request = await request.body.read_json()
    name = greeting_request["name"]
    greeting = greeter.greet(name)
    await request.respond_json({"greeting": greeting})
    await repository.save_greeting(greeting, datetime.now())

E para testar:

$ curl \
    -H 'Content-Type: application/json' \
    -d '{"name": "World"}' \
    localhost:8000/hello
{"greeting": "Hello, World!"}