Integration with FastAPI

Sheppy is designed to feel native to FastAPI users. If you know how to use Depends() for database connections, you already know how to use Sheppy.

This guide demonstrates building a FastAPI application with background task processing, from basic setup to production testing patterns.

Basic Setup

We will build an email service that processes messages in the background. Start by defining your task with Pydantic models for type safety:

app/tasks.py
import asyncio
from datetime import datetime
from pydantic import BaseModel

from sheppy import task


class Email(BaseModel):
    to: str
    subject: str
    body: str


class Status(BaseModel):
    ok: bool


@task
async def send_email_task(email: Email) -> Status:
    print(f"Sending email to {email.to} with subject '{email.subject}'")
    await asyncio.sleep(1)  # simulate sending email
    print(f"Email sent to {email.to}")
    return Status(ok=True)

Notice that send_email_task accepts and returns Pydantic models. Sheppy handles validation automatically.
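
To make the validation step concrete, here is a minimal sketch that uses Pydantic on its own, without Sheppy. It shows how the Email model behaves when given a plain dict; it does not show Sheppy's internal validation path, which this guide does not describe. The address and the variable names are placeholders.

```python
from pydantic import BaseModel, ValidationError


class Email(BaseModel):
    to: str
    subject: str
    body: str


# A plain dict with the right fields is turned into an Email instance.
email = Email(**{"to": "user@example.com", "subject": "Hi", "body": "Hello!"})

# A payload with missing fields is rejected before any task logic could run.
try:
    Email(**{"to": "user@example.com"})
    missing = []
except ValidationError as exc:
    # Each error records the location (field name) that failed validation.
    missing = sorted(str(err["loc"][0]) for err in exc.errors())
```

Here `missing` collects the names of the fields that failed validation, which is the kind of feedback a typed task signature makes possible.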

Creating the FastAPI Application

The queue is injected exactly like you would inject a database session:

app/main.py
from fastapi import Depends, FastAPI, HTTPException
from sheppy import RedisBackend, Queue

from tasks import Email, Status, send_email_task

backend = RedisBackend("redis://127.0.0.1:6379")

# FastAPI dependency injection
def get_queue() -> Queue:
    return Queue(backend)


app = FastAPI(title="Fancy Website")


@app.post("/send-email", status_code=200)
async def send_email(email: Email, queue: Queue = Depends(get_queue)) -> Status:
    # create a task instance and enqueue it for the worker
    t = send_email_task(email)
    await queue.add(t)

    # wait up to 5 seconds for the worker to finish the task
    processed = await queue.wait_for(t, timeout=5)

    if processed.error:
        # respond with an explicit HTTP 500 instead of a bare exception
        raise HTTPException(status_code=500, detail=f"Task failed: {processed.error}")

    return processed.result

Key points:

  • get_queue() is a standard FastAPI dependency
  • queue.add(t) enqueues the task for background processing
  • queue.wait_for(t, timeout=5) waits up to 5 seconds for the task to complete (useful when the HTTP response must include the task result)
  • The worker process runs separately and picks up tasks from the queue
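
The enqueue-then-wait flow described in the key points can be pictured with a stdlib-only sketch. This is not Sheppy's implementation: asyncio.Queue stands in for the Redis-backed queue, a coroutine stands in for the separate worker process, and the names worker, enqueue_and_wait, and main are hypothetical.

```python
import asyncio


async def worker(jobs: asyncio.Queue) -> None:
    """Hypothetical stand-in for a worker: pull jobs and resolve their futures."""
    while True:
        payload, done = await jobs.get()
        await asyncio.sleep(0.01)  # simulate sending the email
        if not done.done():  # the waiter may already have timed out
            done.set_result({"ok": True, "to": payload["to"]})
        jobs.task_done()


async def enqueue_and_wait(jobs: asyncio.Queue, payload: dict, timeout: float) -> dict:
    """Enqueue a job, then wait for its result without blocking the event loop."""
    done = asyncio.get_running_loop().create_future()
    await jobs.put((payload, done))
    # Raises asyncio.TimeoutError if no worker finishes the job in time.
    return await asyncio.wait_for(done, timeout=timeout)


async def main() -> dict:
    jobs: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(jobs))  # keep a reference to the task
    try:
        return await enqueue_and_wait(jobs, {"to": "user@example.com"}, timeout=1)
    finally:
        worker_task.cancel()


result = asyncio.run(main())
```

The point of the sketch is the shape of the interaction: the request handler hands work off, awaits the outcome with a deadline, and nothing is processed unless a worker is running.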

Running the Application

Start the FastAPI server:

fastapi dev app/main.py

In a separate terminal, start the worker:

sheppy work

Visit http://localhost:8000/docs to test the API interactively.
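
As an alternative to the interactive docs, the endpoint can also be called from a script. The sketch below uses only the Python standard library; the helper names build_send_email_request and send_email_via_http are hypothetical, and the payload values are placeholders. It assumes the server is reachable at the URL shown above.

```python
import json
import urllib.request


def build_send_email_request(base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build a POST request for the /send-email route defined in app/main.py."""
    payload = {
        "to": "user@example.com",  # placeholder address
        "subject": "Hello",
        "body": "Sent from a script.",
    }
    return urllib.request.Request(
        f"{base_url}/send-email",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_email_via_http(base_url: str = "http://localhost:8000") -> dict:
    """Send the request; needs both the FastAPI server and the worker running."""
    request = build_send_email_request(base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read())
```

With the server and the worker both running, calling send_email_via_http() should return the JSON form of the Status model. If the worker is not running, the route waits for its 5-second timeout first.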

Testing Strategies

Sheppy provides flexible testing approaches depending on what you want to verify. For more details on testing with Sheppy, see the Testing guide.

Unit Testing: Test Tasks Directly

The simplest approach is testing the task logic in isolation using TestQueue:

tests/test_tasks.py
from sheppy import TestQueue
from tasks import Status, send_email_task


def test_send_email_task():
    q = TestQueue()

    email_data = {
        "to": "user@example.com",  # placeholder address
        "subject": "Test Email",
        "body": "This is a test email."
    }

    t = send_email_task(email_data)
    q.add(t)

    processed_task = q.process_next()

    assert processed_task.completed is True
    assert processed_task.error is None
    assert processed_task.result == Status(ok=True)

TestQueue provides a synchronous API with explicit control over task processing: nothing runs until you call process_next(). This makes it well suited to fast unit tests that need neither a worker nor Redis.

Integration Testing: Test the Full Stack

For end-to-end testing, you need to test the FastAPI endpoint with an actual worker processing tasks.

Synchronous Tests

Using FastAPI's TestClient requires running a worker in a background thread:

tests/test_app.py
import pytest
from fastapi.testclient import TestClient
from sheppy import MemoryBackend, Queue, Worker
from main import app, get_queue


@pytest.fixture
def backend():
    return MemoryBackend()


@pytest.fixture
def queue(backend):
    return Queue(backend, "pytest")


@pytest.fixture
def worker(backend):
    w = Worker("pytest", backend)
    # speed up tests (temporary solution)
    w._blocking_timeout = 0.01
    w._scheduler_polling_interval = 0.01
    w._cron_polling_interval = 0.01
    return w


def test_fastapi_send_email_route(queue, worker):
    # swap the Redis-backed queue for the in-memory test queue
    app.dependency_overrides[get_queue] = lambda: queue

    try:
        with TestClient(app) as client:
            # run worker process (temporary solution)
            client.portal.start_task_soon(
                lambda: worker.work(max_tasks=1, register_signal_handlers=False)
            )

            # Define email data
            email_data = {
                "to": "user@example.com",  # placeholder address
                "subject": "Welcome Email",
                "body": "Hello, pytest!"
            }

            response = client.post("/send-email", json=email_data)

            assert response.status_code == 200
            assert response.json() == {"ok": True}
    finally:
        # remove the override so it does not leak into other tests
        app.dependency_overrides.clear()

Note

Work in Progress - future versions will provide simpler testing utilities for FastAPI.

Asynchronous Tests

For async test suites, use httpx.AsyncClient and start the worker with asyncio.create_task, which keeps the worker on the same event loop as the test:

tests/test_app_async.py
import asyncio
import pytest
from httpx import ASGITransport, AsyncClient
from sheppy import MemoryBackend, Queue, Worker
from main import app, get_queue


@pytest.fixture
def backend():
    return MemoryBackend()


@pytest.fixture
def queue(backend):
    return Queue(backend, "pytest")


@pytest.fixture
def worker(backend):
    w = Worker("pytest", backend)
    # speed up tests (temporary solution)
    w._blocking_timeout = 0.01
    w._scheduler_polling_interval = 0.01
    w._cron_polling_interval = 0.01
    return w


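# needs an async pytest plugin, e.g. pytest-asyncio with asyncio_mode = auto
async def test_fastapi_send_email_route(queue, worker):
    # start worker process; keep a reference so the task is not garbage-collected
    worker_task = asyncio.create_task(
        worker.work(max_tasks=1, register_signal_handlers=False)
    )

    # override queue dependency
    app.dependency_overrides[get_queue] = lambda: queue

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            # Define email data
            email_data = {
                "to": "user@example.com",  # placeholder address
                "subject": "Welcome Email",
                "body": "Hello, pytest!"
            }

            response = await client.post("/send-email", json=email_data)

            assert response.status_code == 200
            assert response.json() == {"ok": True}
    finally:
        # remove the override and stop the worker if it is still running
        app.dependency_overrides.clear()
        worker_task.cancel()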