Testing Tasks
Sheppy is built with testing in mind. The TestQueue provides a synchronous, deterministic API, so tasks can be tested without running background workers.
This guide covers testing strategies from basic unit tests to retry logic.
Basic Testing
Here's a simple task to test:
from sheppy import task

@task
async def add(x: int, y: int) -> int:
    return x + y
Testing it is straightforward with TestQueue:
from sheppy import TestQueue
from tasks import add

def test_add():
    q = TestQueue()

    # instantiate task
    t = add(1, 2)

    # add to the test queue
    q.add(t)

    # process the next task in the queue
    processed_task = q.process_next()

    # verify the task result
    assert processed_task.completed is True
    assert processed_task.error is None
    assert processed_task.result == 3
Key differences from production Queue:
- Synchronous API (no await needed)
- Explicit task processing with process_next() or process_all()
- Tasks execute immediately in the test process without needing background workers
- Perfect for fast, deterministic unit tests
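To make the points above concrete, here is a minimal toy model of a synchronous queue with explicit processing. It is not Sheppy's implementation: `ToyQueue` and everything in it are hypothetical names invented for illustration, and it assumes that coroutine functions can simply be driven to completion with `asyncio.run`. It only shows why the test code itself never needs `await` and why results arrive in a predictable order.

```python
from __future__ import annotations

import asyncio
import inspect
from collections import deque
from typing import Any, Callable


class ToyQueue:
    """Illustrative stand-in for a synchronous test queue (not Sheppy code)."""

    def __init__(self) -> None:
        self._pending: deque = deque()

    def add(self, func: Callable[..., Any], *args: Any) -> None:
        # Nothing runs here; the call is only recorded.
        self._pending.append((func, args))

    def size(self) -> int:
        return len(self._pending)

    def process_next(self) -> Any:
        # Run exactly one pending call, in FIFO order, in the current process.
        func, args = self._pending.popleft()
        value = func(*args)
        if inspect.iscoroutine(value):
            # Drive async functions to completion so the caller stays synchronous.
            value = asyncio.run(value)
        return value

    def process_all(self) -> list[Any]:
        results = []
        while self._pending:
            results.append(self.process_next())
        return results


async def add(x: int, y: int) -> int:
    return x + y


def double(x: int) -> int:
    return x * 2


q = ToyQueue()
q.add(add, 1, 2)
q.add(double, 5)

first = q.process_next()  # runs only the first call
rest = q.process_all()    # runs whatever is still pending
```

Because work happens only when `process_next()` or `process_all()` is called, a test can inspect the queue between steps, which is what makes this style deterministic.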
Testing Task Failures
Tasks can fail, and you should test that they fail correctly:
from sheppy import TestQueue, task

@task
def divide(x: int, y: int) -> float:
    return x / y

def test_divide_by_zero():
    q = TestQueue()

    # instantiate two tasks
    t1 = divide(1, 2)
    t2 = divide(1, 0)

    # add both tasks to the test queue
    q.add([t1, t2])

    # process all tasks in the queue (processed in order)
    processed_tasks = q.process_all()

    # verify the first task result
    assert processed_tasks[0].completed is True
    assert processed_tasks[0].error is None
    assert processed_tasks[0].result == 0.5

    # verify the second task result (should fail)
    assert processed_tasks[1].completed is False
    assert processed_tasks[1].error == "division by zero"
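When a task fails, the exception is captured in the error attribute instead of propagating into your test. In the example above it compares equal to the message string "division by zero". You can assert on this to verify correct error handling.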
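The string compared against in the failure test matches what plain Python produces for `str(exc)` on a `ZeroDivisionError`. The sketch below assumes the error is recorded that way, which is an inference from the example rather than a documented guarantee. `run_and_capture` is a hypothetical helper written for this illustration, not part of Sheppy.

```python
from typing import Any, Callable, Optional, Tuple


def run_and_capture(func: Callable[..., Any], *args: Any) -> Tuple[Any, Optional[str]]:
    """Call func and return (result, error_message) instead of raising."""
    try:
        return func(*args), None
    except Exception as exc:
        # Assumption: only the exception message is kept, as a plain string.
        return None, str(exc)


def divide(x: int, y: int) -> float:
    return x / y


ok = run_and_capture(divide, 1, 2)
failed = run_and_capture(divide, 1, 0)
```

If your own tasks raise custom exceptions, giving them stable, descriptive messages makes assertions on the error easier to keep exact.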
Testing Retry Logic
Test that retry configuration works as expected:
from sheppy import TestQueue, task

FAIL_COUNTER = 0

@task(retry=2, retry_delay=0)
def fail_once() -> str:
    global FAIL_COUNTER

    # fail on the first call only
    if FAIL_COUNTER < 1:
        FAIL_COUNTER += 1
        raise ValueError("task failed")

    return "success"

def test_fail_once():
    q = TestQueue()

    # instantiate the task
    t = fail_once()

    # add the task to the test queue
    q.add(t)
    assert q.size() == 1  # one task in the queue

    # process all tasks in the queue
    processed = q.process_all()

    # there should be two processed tasks: the original + one retry
    assert len(processed) == 2

    # first attempt should fail
    assert processed[0].completed is False
    assert processed[0].error == "task failed"

    # retry should succeed
    assert processed[1].completed is True
    assert processed[1].error is None
    assert processed[1].result == "success"

    # both processed tasks should have the same id
    assert processed[0].id == processed[1].id
When using TestQueue, retries happen immediately with no delay, keeping tests fast while still validating retry behavior.
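As a rough mental model of what the retry test observes, the sketch below re-runs a failing function immediately and records every attempt under the same id. It is a toy written for this guide, not Sheppy's retry machinery: `Attempt` and `run_with_retries` are hypothetical names, and it assumes `retry=2` means "up to two retries after the first attempt".

```python
import uuid
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class Attempt:
    """One processed attempt of a task (illustrative only)."""
    id: str
    completed: bool
    result: Any = None
    error: Optional[str] = None


def run_with_retries(func: Callable[[], Any], retry: int) -> List[Attempt]:
    task_id = str(uuid.uuid4())  # every attempt shares this id
    attempts: List[Attempt] = []
    for _ in range(retry + 1):  # first run plus up to `retry` retries
        try:
            value = func()
        except Exception as exc:
            attempts.append(Attempt(task_id, completed=False, error=str(exc)))
            continue  # retry immediately, with no delay
        attempts.append(Attempt(task_id, completed=True, result=value))
        break  # stop at the first success
    return attempts


calls = {"count": 0}


def fail_once() -> str:
    # Fails on the first call only, like the task in the example above.
    calls["count"] += 1
    if calls["count"] == 1:
        raise ValueError("task failed")
    return "success"


attempts = run_with_retries(fail_once, retry=2)
```

Note that the retry example above keeps its failure state in a module-level counter. If several tests share such a counter, reset it at the start of each test so the outcome does not depend on test order.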
Assertion Helpers
Sheppy provides assertion helpers for cleaner test code:
- assert_is_new(task) - Task is new (not yet processed)
- assert_is_completed(task) - Task completed successfully with a result
- assert_is_failed(task) - Task failed with an error
These raise clear assertion errors if the task isn't in the expected state:
from sheppy import TestQueue, task
from sheppy.testqueue import (
    assert_is_completed,
    assert_is_failed,
    assert_is_new,
)

@task
def add(x: int, y: int) -> int:
    return x + y

@task
def divide(x: int, y: int) -> float:
    return x / y

def test_assertion_helpers():
    q = TestQueue()

    t1 = add(1, 2)
    t2 = divide(1, 0)

    # add both tasks to the test queue
    q.add([t1, t2])

    # process all tasks in the queue
    processed = q.process_all()

    # quick assertions to verify original tasks weren't modified
    # (note: this is always the case, we never modify original tasks)
    assert_is_new(t1)
    assert_is_new(t2)

    # quick assertions using helper functions
    assert_is_completed(processed[0])
    assert_is_failed(processed[1])

    # check results manually as well
    assert processed[0].result == 3
    assert processed[1].error == "division by zero"
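To illustrate why dedicated helpers tend to produce clearer failures than a bare `assert task.completed`, here is a hypothetical sketch of state-checking helpers. These are not Sheppy's implementations: `FakeTask`, `check_completed`, and `check_failed` are invented for this example, and the real helpers may check different fields or word their messages differently.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeTask:
    """Minimal stand-in with the attributes used in this guide."""
    completed: bool = False
    result: Any = None
    error: Optional[str] = None


def check_completed(task: FakeTask) -> None:
    # Report the captured error, so the failure explains itself.
    assert task.error is None, f"expected a completed task, but it failed with: {task.error!r}"
    assert task.completed, "expected a completed task, but it has not been processed"


def check_failed(task: FakeTask) -> None:
    assert not task.completed, f"expected a failed task, but it completed with: {task.result!r}"
    assert task.error is not None, "expected a failed task, but no error was recorded"


# Both of these pass silently.
check_completed(FakeTask(completed=True, result=3))
check_failed(FakeTask(error="division by zero"))

# A mismatch raises an AssertionError that names the underlying problem.
try:
    check_completed(FakeTask(error="division by zero"))
except AssertionError as exc:
    message = str(exc)
```

With a bare assert, the same mismatch would only report that a boolean was false. Including the task's state in the message saves a debugging step.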