Integration with FastAPI¶
Sheppy is designed to feel native to FastAPI users. If you know how to use Depends()
for database connections, you already know how to use Sheppy.
This guide demonstrates building a FastAPI application with background task processing, from basic setup to production testing patterns.
Basic Setup¶
We will build an email service that processes messages in the background. Start by defining your task with Pydantic models for type safety:
import asyncio
from datetime import datetime
from pydantic import BaseModel
from sheppy import task
class Email(BaseModel):
to: str
subject: str
body: str
class Status(BaseModel):
ok: bool
@task
async def send_email_task(email: Email) -> Status:
print(f"Sending email to {email.to} with subject '{email.subject}'")
await asyncio.sleep(1) # simulate sending email
print(f"Email sent to {email.to}")
return Status(ok=True)
Notice that send_email_task
accepts and returns Pydantic models. Sheppy handles validation automatically.
Creating the FastAPI Application¶
The queue is injected exactly like you would inject a database session:
from fastapi import Depends, FastAPI
from sheppy import RedisBackend, Queue
from tasks import Email, Status, send_email_task
backend = RedisBackend("redis://127.0.0.1:6379")
# FastAPI dependency injection
def get_queue() -> Queue:
return Queue(backend)
app = FastAPI(title="Fancy Website")
@app.post("/send-email", status_code=200)
async def send_email(email: Email, queue: Queue = Depends(get_queue)) -> Status:
t = send_email_task(email)
await queue.add(t)
processed = await queue.wait_for(t, timeout=5)
if processed.error:
raise Exception(f"Task failed: {processed.error}")
return processed.result
Key points:
get_queue()
is a standard FastAPI dependencyqueue.add(t)
enqueues the task for background processingqueue.wait_for(t, timeout=5)
blocks until the task completes (useful for synchronous APIs)- The worker process runs separately and picks up tasks from the queue
Running the Application¶
Start the FastAPI server:
fastapi dev app/main.py
In a separate terminal, start the worker:
sheppy work
Visit http://localhost:8000/docs to test the API interactively.
Testing Strategies¶
Sheppy provides flexible testing approaches depending on what you want to verify. For more details on testing with Sheppy, see the Testing guide.
Unit Testing: Test Tasks Directly¶
The simplest approach is testing the task logic in isolation using TestQueue
:
from sheppy import TestQueue
from tasks import Status, send_email_task
def test_send_email_task():
q = TestQueue()
email_data = {
"to": "[email protected]",
"subject": "Test Email",
"body": "This is a test email."
}
t = send_email_task(email_data)
q.add(t)
processed_task = q.process_next()
assert processed_task.completed is True
assert processed_task.error is None
assert processed_task.result == Status(ok=True)
TestQueue
provides a synchronous API with explicit control over task processing. Perfect for fast unit tests.
Integration Testing: Test the Full Stack¶
For end-to-end testing, you need to test the FastAPI endpoint with an actual worker processing tasks.
Synchronous Tests¶
Using FastAPI's TestClient
requires running a worker in a background thread:
import pytest
from fastapi.testclient import TestClient
from sheppy import MemoryBackend, Queue, Worker
from main import app, get_queue
@pytest.fixture
def backend():
return MemoryBackend()
@pytest.fixture
def queue(backend):
return Queue(backend, "pytest")
@pytest.fixture
def worker(backend):
w = Worker("pytest", backend)
# speed up tests (temporary solution)
w._blocking_timeout = 0.01
w._scheduler_polling_interval = 0.01
w._cron_polling_interval = 0.01
return w
def test_fastapi_send_email_route(queue, worker):
app.dependency_overrides[get_queue] = lambda: queue
with TestClient(app) as client:
# run worker process (temporary solution)
client.portal.start_task_soon(
lambda: worker.work(max_tasks=1, register_signal_handlers=False)
)
# Define email data
email_data = {
"to": "[email protected]",
"subject": "Welcome Email",
"body": "Hello, pytest!"
}
response = client.post("/send-email", json=email_data)
assert response.status_code == 200
assert response.json() == {"ok": True}
Note
Work in Progress - future versions will provide simpler testing utilities for FastAPI.
Async Tests (Recommended)¶
For async test suites, use httpx.AsyncClient
with asyncio.create_task
for cleaner worker management:
import asyncio
import pytest
from httpx import ASGITransport, AsyncClient
from sheppy import MemoryBackend, Queue, Worker
from main import app, get_queue
@pytest.fixture
def backend():
return MemoryBackend()
@pytest.fixture
def queue(backend):
return Queue(backend, "pytest")
@pytest.fixture
def worker(backend):
w = Worker("pytest", backend)
# speed up tests (temporary solution)
w._blocking_timeout = 0.01
w._scheduler_polling_interval = 0.01
w._cron_polling_interval = 0.01
return w
async def test_fastapi_send_email_route(queue, worker):
# start worker process
asyncio.create_task(worker.work(max_tasks=1, register_signal_handlers=False))
# override queue dependency
app.dependency_overrides[get_queue] = lambda: queue
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
# Define email data
email_data = {
"to": "[email protected]",
"subject": "Welcome Email",
"body": "Hello, pytest!"
}
response = await client.post("/send-email", json=email_data)
assert response.status_code == 200
assert response.json() == {"ok": True}