Testing Tasks

Sheppy is built with testing in mind. The TestQueue provides a synchronous, deterministic API that makes tasks straightforward to test.

This guide covers testing strategies from basic unit tests to complex retry logic.

Basic Testing

Here's a simple task to test:

tasks.py
from sheppy import task

@task
async def add(x: int, y: int) -> int:
    return x + y

Testing it is straightforward with TestQueue:

tests/test_tasks.py
from sheppy import TestQueue
from tasks import add

def test_add():
    q = TestQueue()

    # instantiate task
    t = add(1, 2)

    # add to the test queue
    q.add(t)

    # process the next task in the queue
    processed_task = q.process_next()

    # verify the task result
    assert processed_task.completed is True
    assert processed_task.error is None
    assert processed_task.result == 3

Key differences from the production Queue:

  • Synchronous API (no await needed)
  • Explicit task processing with process_next() or process_all() (see the sketch after this list)
  • Tasks execute immediately in the test process without needing background workers
  • Perfect for fast, deterministic unit tests
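
If you prefer to step through tasks one at a time, call process_next() once per task instead of process_all(). The sketch below is illustrative rather than canonical: it only uses the TestQueue calls shown in this guide (add() and process_next()) and reuses the add task from tasks.py.

tests/test_step_through.py
from sheppy import TestQueue
from tasks import add

def test_step_through_queue():
    q = TestQueue()

    # enqueue two tasks at once
    q.add([add(1, 2), add(3, 4)])

    # process them one at a time instead of calling process_all()
    first = q.process_next()
    second = q.process_next()

    assert first.result == 3
    assert second.result == 7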

Testing Task Failures

Tasks can fail, and you should test that they fail correctly:

tests/test_failure.py
from sheppy import TestQueue, task


@task
def divide(x: int, y: int) -> float:
    return x / y


def test_divide_by_zero():
    q = TestQueue()

    # instantiate two tasks
    t1 = divide(1, 2)
    t2 = divide(1, 0)

    # add both tasks to the test queue
    q.add([t1, t2])

    # process all tasks in the queue (processed in order)
    processed_tasks = q.process_all()

    # verify the first task result
    assert processed_tasks[0].completed is True
    assert processed_tasks[0].error is None
    assert processed_tasks[0].result == 0.5

    # verify the second task result (should fail)
    assert processed_tasks[1].completed is False
    assert processed_tasks[1].error == "division by zero"

When a task fails, the exception is captured in the error attribute. You can assert on this to verify correct error handling.
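
Exact error strings can be brittle, so a substring check is often enough. A minimal sketch, assuming (as in the example above) that error holds the exception message as a string:

tests/test_error_message.py
from sheppy import TestQueue, task

@task
def divide(x: int, y: int) -> float:
    return x / y

def test_error_message_contains_reason():
    q = TestQueue()
    q.add(divide(1, 0))

    processed = q.process_next()

    # error is the exception message string, so a substring check avoids
    # coupling the test to its exact wording
    assert processed.completed is False
    assert "division by zero" in processed.error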

Testing Retry Logic

Test that retry configuration works as expected:

tests/test_retry_logic.py
from sheppy import TestQueue, task

FAIL_COUNTER = 0

@task(retry=2, retry_delay=0)
def fail_once() -> str:
    global FAIL_COUNTER

    if FAIL_COUNTER < 1:
        FAIL_COUNTER += 1
        raise ValueError("task failed")

    return "success"

def test_fail_once():
    q = TestQueue()

    # instantiate the task
    t = fail_once()

    # add the task to the test queue
    q.add(t)

    assert q.size() == 1  # one task in the queue

    # process all tasks in the queue
    processed = q.process_all()

    # there should be two processed tasks: the original + one retry
    assert len(processed) == 2

    # verify the task result
    assert processed[0].completed is False
    assert processed[0].error == "task failed"

    # retry should succeed
    assert processed[1].completed is True
    assert processed[1].error is None
    assert processed[1].result == "success"

    # both processed tasks should have the same id
    assert processed[0].id == processed[1].id

When using TestQueue, retries happen immediately with no delay, keeping tests fast while still validating retry behavior.
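
One caveat with this pattern: FAIL_COUNTER is module-level state, so it leaks between tests in the same file. A plain pytest autouse fixture (ordinary pytest, nothing Sheppy-specific) can reset it before each test, for example by adding this to tests/test_retry_logic.py:

import pytest

@pytest.fixture(autouse=True)
def reset_fail_counter():
    # reset the module-level counter before each test so retry behaviour
    # does not depend on test execution order
    global FAIL_COUNTER
    FAIL_COUNTER = 0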

Assertion Helpers

Sheppy provides assertion helpers for cleaner test code:

  • assert_is_new(task) - Task is new (not yet processed)
  • assert_is_completed(task) - Task completed successfully with a result
  • assert_is_failed(task) - Task failed with an error

These raise clear assertion errors if the task isn't in the expected state:

tests/test_assert_helper_functions.py
from sheppy import TestQueue, task
from sheppy.testqueue import (
    assert_is_completed,
    assert_is_failed,
    assert_is_new
)

@task
def add(x: int, y: int) -> int:
    return x + y

@task
def divide(x: int, y: int) -> float:
    return x / y

def test_with_assertion_helpers():
    q = TestQueue()

    t1 = add(1, 2)
    t2 = divide(1, 0)

    # add both tasks to the test queue
    q.add([t1, t2])

    # process all tasks in the queue
    processed = q.process_all()

    # quick assertions to verify the original tasks weren't modified
    # (this is always the case: processing never modifies the original task objects)
    assert_is_new(t1)
    assert_is_new(t2)

    # quick assertions using helper functions
    assert_is_completed(processed[0])
    assert_is_failed(processed[1])

    # check results manually as well
    assert processed[0].result == 3
    assert processed[1].error == "division by zero"