Testing Tasks
Sheppy is built with testing in mind. The TestQueue provides a synchronous, deterministic API, so tasks can be tested without running background workers.
This guide covers testing strategies from basic unit tests to complex retry logic.
Basic Testing
Here's a simple task to test:
tasks.py
from sheppy import task


@task
async def add(x: int, y: int) -> int:
    return x + y
Testing it is straightforward with TestQueue:
tests/test_tasks.py
from sheppy import TestQueue
from tasks import add


def test_add():
    q = TestQueue()

    # instantiate task
    t = add(1, 2)

    # add to the test queue
    q.add(t)

    # process the next task in the queue
    processed_task = q.process_next()

    # verify the task result
    assert processed_task.status == 'completed'
    assert processed_task.error is None
    assert processed_task.result == 3
Key differences from production Queue:
- Synchronous API (no await needed)
- Explicit task processing with process_next() or process_all()
- Tasks execute immediately in the test process without needing background workers
- Perfect for fast, deterministic unit tests
Testing Task Failures
Tasks can fail, and you should test that they fail correctly:
tests/test_failure.py
from sheppy import TestQueue, task


@task
def divide(x: int, y: int) -> float:
    return x / y


def test_divide_by_zero():
    q = TestQueue()

    # instantiate two tasks
    t1 = divide(1, 2)
    t2 = divide(1, 0)

    # add both tasks to the test queue
    q.add([t1, t2])

    # process all tasks in the queue (processed in order)
    processed_tasks = q.process_all()

    # verify the first task result
    assert processed_tasks[0].status == 'completed'
    assert processed_tasks[0].error is None
    assert processed_tasks[0].result == 0.5

    # verify the second task result (should fail)
    assert processed_tasks[1].status == 'failed'
    assert processed_tasks[1].error == "ZeroDivisionError: division by zero"
When a task fails, the exception is captured in the error attribute. You can assert on this to verify correct error handling.
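The error strings asserted above appear to follow the pattern of the exception class name, a colon, and the message. Assuming that format holds (it is inferred from the examples on this page, not from a documented guarantee), a small hypothetical helper, `expected_error`, could build the expected string from an exception instance so that tests avoid hand-typed messages. Both `expected_error` and `capture` are illustrative names and are not part of Sheppy:

```python
def expected_error(exc: BaseException) -> str:
    """Format an exception as '<ClassName>: <message>'.

    Assumption: this mirrors the error strings shown in the examples
    above. The helper is illustrative and not part of Sheppy.
    """
    return f"{type(exc).__name__}: {exc}"


def capture(func, *args):
    """Call func(*args) and return the exception it raises, or None."""
    try:
        func(*args)
    except Exception as exc:
        return exc
    return None


# Build the expected string from a real exception rather than typing it out
zero_div = capture(lambda x, y: x / y, 1, 0)
print(expected_error(zero_div))

# Or from an exception constructed by hand
print(expected_error(ValueError("task failed")))  # ValueError: task failed
```

In a test, the assertion would then read `assert processed_tasks[1].error == expected_error(zero_div)`, which keeps the expected message in step with whatever the interpreter actually produces.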
Testing Retry Logic
Test that retry configuration works as expected:
tests/test_retry_logic.py
from sheppy import TestQueue, task

FAIL_COUNTER = 0


@task(retry=2, retry_delay=0)
def fail_once() -> str:
    global FAIL_COUNTER

    if FAIL_COUNTER < 1:
        FAIL_COUNTER += 1
        raise ValueError("task failed")

    return "success"


def test_fail_once():
    q = TestQueue()

    # instantiate the task
    t = fail_once()

    # add the task to the test queue
    q.add(t)
    assert q.size() == 1  # one task in the queue

    # process all tasks in the queue
    processed = q.process_all()

    # there should be two processed tasks: the original + one retry
    assert len(processed) == 2

    # the first attempt should fail
    assert processed[0].status == 'failed'
    assert processed[0].error == "ValueError: task failed"

    # retry should succeed
    assert processed[1].status == 'completed'
    assert processed[1].error is None
    assert processed[1].result == "success"

    # both processed tasks should have the same id
    assert processed[0].id == processed[1].id
When using TestQueue, retries happen immediately with no delay, keeping tests fast while still validating retry behavior.
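One caveat with the retry example: `FAIL_COUNTER` is module-level state, so it persists for the life of the test process, and a second test that reuses `fail_once` would find the counter already incremented. A minimal sketch of one way to keep such tests independent is shown below. `FailureBudget`, `fail_once_body`, and `run_with_retries` are hypothetical names in plain Python; in particular, `run_with_retries` is only a toy stand-in used to exercise the helper and makes no claim about how Sheppy schedules retries:

```python
class FailureBudget:
    """Raise on the first `failures` calls to consume(), then succeed."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def consume(self) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ValueError("task failed")

    def reset(self) -> None:
        """Forget earlier calls so the next test starts from a clean state."""
        self.calls = 0


budget = FailureBudget(failures=1)


def fail_once_body() -> str:
    # Same logic as the task body above, with the global counter replaced
    # by a resettable object.
    budget.consume()
    return "success"


def run_with_retries(func, retries: int):
    """Toy retry loop (NOT Sheppy): call func until it succeeds or retries run out."""
    outcomes = []
    for _ in range(retries + 1):
        try:
            outcomes.append(("completed", func()))
            break
        except ValueError as exc:
            outcomes.append(("failed", f"{type(exc).__name__}: {exc}"))
    return outcomes


first = run_with_retries(fail_once_body, retries=2)
budget.reset()  # without this, the second run would succeed on its first attempt
second = run_with_retries(fail_once_body, retries=2)
print(first == second)  # True
```

In a real suite, the `budget.reset()` call would typically live in a setup hook or fixture so every test sees the same starting state regardless of execution order.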