Skip to content

TestQueue reference

Sheppy offers first-class support for testing tasks through the TestQueue class. This class mimics the behavior of a real queue but operates synchronously, making it ideal for predictable and fast unit tests.

See Testing tasks guide for more details and examples.

sheppy.TestQueue

TestQueue(name: str = 'test-queue')

A simple in-memory queue for testing purposes.

This queue does not require any external services and processes tasks synchronously. It is designed to be used in synchronous tests and follows the same execution flow as a real queue.

PARAMETER DESCRIPTION
name

Name of the queue. Defaults to "test-queue".

TYPE: str DEFAULT: 'test-queue'

ATTRIBUTE DESCRIPTION
processed_tasks

List of tasks that have been processed.

TYPE: list[Task]

failed_tasks

List of tasks that have failed during processing.

TYPE: list[Task]

Example
# tests/test_tasks.py
from sheppy import task, TestQueue
from sheppy.testqueue import assert_is_new, assert_is_completed, assert_is_failed


@task
async def add(x: int, y: int) -> int:
    """Example task: return the sum of ``x`` and ``y``."""
    total = x + y
    return total


@task
async def divide(x: int, y: int) -> float:
    """Example task: return ``x / y`` (true division, so ``y == 0`` raises)."""
    quotient = x / y
    return quotient


def test_add_task():
    """Walk a succeeding task through TestQueue: new -> queued -> completed."""
    queue = TestQueue()
    new_task = add(1, 2)

    # a freshly created task must not have been processed yet
    assert_is_new(new_task)

    # enqueueing reports success ...
    was_added = queue.add(new_task)
    assert was_added is True

    # ... and leaves exactly one pending task
    assert queue.size() == 1

    # run the task synchronously
    finished = queue.process_next()

    # it completed without error and carries the expected result
    assert_is_completed(finished)
    assert finished.result == 3

    # nothing is left pending afterwards
    assert queue.size() == 0


def test_failing_task():
    """Walk a failing task through TestQueue: new -> queued -> failed."""
    queue = TestQueue()
    new_task = divide(1, 0)

    # a freshly created task must not have been processed yet
    assert_is_new(new_task)

    # enqueueing reports success ...
    was_added = queue.add(new_task)
    assert was_added is True

    # ... and leaves exactly one pending task
    assert queue.size() == 1

    # run the task synchronously; the division by zero happens here
    finished = queue.process_next()

    # the task is marked failed, has no result, and records the error text
    assert_is_failed(finished)
    assert finished.result is None
    assert finished.error == "ZeroDivisionError: division by zero"

    # the failed task no longer counts as pending
    assert queue.size() == 0
Source code in src/sheppy/testqueue.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __init__(
    self,
    name: str = "test-queue",
    #dependency_overrides: dict[Callable[..., Any], Callable[..., Any]] | None = None  # ! FIXME
):
    """Create an in-memory queue for use in synchronous tests.

    Args:
        name: Name of the queue. Defaults to "test-queue".
    """
    # Bookkeeping lists exposed to tests; both start out empty.
    self.processed_tasks: list[Task] = []
    self.failed_tasks: list[Task] = []

    self.name = name
    self._worker_id = "TestQueue"

    # The memory backend is flagged as connected directly; no connect call is made here.
    backend = MemoryBackend()
    backend._connected = True
    self._backend = backend
    self._queue = Queue(backend, name)
    # FIXME (carried over): dependency overrides are not wired up yet.
    #self._dependency_resolver = DependencyResolver(dependency_overrides)
    self._task_processor = TaskProcessor()

add

add(task: Task) -> bool
add(task: list[Task]) -> list[bool]

Add a task to the queue. Accepts a list of tasks for batch add.

PARAMETER DESCRIPTION
task

Instance of a Task, or list of Task instances for batch mode.

TYPE: Task | list[Task]

RETURNS DESCRIPTION
bool | list[bool]

Success boolean, or list of booleans in batch mode.

Example
q = TestQueue()

# add single task
success = q.add(task)
assert success is True

# add multiple tasks
results = q.add([task1, task2, task3])
assert results == [True, True, True]
Source code in src/sheppy/testqueue.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def add(self, task: Task | list[Task]) -> bool | list[bool]:
    """
    Add a task to the queue; a list of tasks is accepted for batch add.

    Synchronous wrapper: the underlying queue's async ``add`` is driven to
    completion with ``asyncio.run``.

    Args:
        task: Instance of a Task, or list of Task instances for batch mode.

    Returns:
        Success boolean, or list of booleans in batch mode.

    Example:
        ```python
        q = TestQueue()

        # add single task
        success = q.add(task)
        assert success is True

        # add multiple tasks
        results = q.add([task1, task2, task3])
        assert results == [True, True, True]
        ```
    """
    pending_add = self._queue.add(task)
    return asyncio.run(pending_add)  # type: ignore[return-value]

schedule

schedule(task: Task, at: datetime | timedelta) -> bool

Schedule task to be processed after certain time.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

at

When to process the task.
If timedelta is provided, it will be added to current time.
Note: datetime must be offset-aware (i.e. have timezone info).

TYPE: datetime | timedelta

RETURNS DESCRIPTION
bool

Success boolean

Example
q = TestQueue()

# schedule task to be processed after 10 minutes
success = q.schedule(task, timedelta(minutes=10))
assert success is True

# ... or at specific time
q.schedule(task, datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
Source code in src/sheppy/testqueue.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def schedule(self, task: Task, at: datetime | timedelta) -> bool:
    """Schedule a task to be processed at a later time.

    Synchronous wrapper around the underlying queue's async ``schedule``.

    Args:
        task (Task): Instance of a Task
        at (datetime | timedelta): When to process the task.<br>
                                   If timedelta is provided, it will be added to current time.<br>
                                   *Note: datetime must be offset-aware (i.e. have timezone info).*

    Returns:
        Success boolean

    Example:
        ```python
        q = TestQueue()

        # schedule task to be processed after 10 minutes
        success = q.schedule(task, timedelta(minutes=10))
        assert success is True

        # ... or at specific time
        q.schedule(task, datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
        ```
    """
    pending_schedule = self._queue.schedule(task, at)
    return asyncio.run(pending_schedule)

get_task

get_task(task: Task | UUID | str) -> Task | None
get_task(task: list[Task | UUID | str]) -> dict[UUID, Task]

Get task by id.

PARAMETER DESCRIPTION
task

Instance of a Task or its ID, or list of Task instances/IDs for batch mode.

TYPE: Task | UUID | str | list[Task | UUID | str]

RETURNS DESCRIPTION
Task | None

Instance of a Task or None if not found.

dict[UUID, Task]

(In batch mode) Returns Dictionary of Task IDs to Task instances.

Source code in src/sheppy/testqueue.py
151
152
153
154
155
156
157
158
159
160
161
def get_task(self, task: Task | UUID | str | list[Task | UUID | str]) -> Task | None | dict[UUID, Task]:
    """Look up a task (or several tasks) by id.

    Synchronous wrapper around the underlying queue's async ``get_task``.

    Args:
        task: Instance of a Task or its ID, or list of Task instances/IDs for batch mode.

    Returns:
        (Task|None): Instance of a Task or None if not found.
        (dict[UUID, Task]): *(In batch mode)* Dictionary mapping Task IDs to Task instances.
    """
    lookup = self._queue.get_task(task)
    return asyncio.run(lookup)

get_all_tasks

get_all_tasks() -> list[Task]

Get all tasks, including completed/failed ones.

RETURNS DESCRIPTION
list[Task]

List of all tasks

Source code in src/sheppy/testqueue.py
163
164
165
166
167
168
169
def get_all_tasks(self) -> list[Task]:
    """Return every task known to the queue, including completed/failed ones.

    Synchronous wrapper around the underlying queue's async ``get_all_tasks``.

    Returns:
        List of all tasks
    """
    listing = self._queue.get_all_tasks()
    return asyncio.run(listing)

get_scheduled

get_scheduled() -> list[Task]

List scheduled tasks.

RETURNS DESCRIPTION
list[Task]

List of scheduled tasks

Source code in src/sheppy/testqueue.py
208
209
210
211
212
213
214
def get_scheduled(self) -> list[Task]:
    """Return the tasks that are currently scheduled.

    Synchronous wrapper around the underlying queue's async ``get_scheduled``.

    Returns:
        List of scheduled tasks
    """
    listing = self._queue.get_scheduled()
    return asyncio.run(listing)

get_pending

get_pending(count: int = 1) -> list[Task]

List pending tasks.

PARAMETER DESCRIPTION
count

Number of pending tasks to retrieve.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
list[Task]

List of pending tasks

Source code in src/sheppy/testqueue.py
171
172
173
174
175
176
177
178
179
180
def get_pending(self, count: int = 1) -> list[Task]:
    """Return pending tasks.

    Synchronous wrapper around the underlying queue's async ``get_pending``.

    Args:
        count: Number of pending tasks to retrieve. Defaults to 1.

    Returns:
        List of pending tasks
    """
    listing = self._queue.get_pending(count)
    return asyncio.run(listing)

retry

retry(
    task: Task | UUID | str,
    at: datetime | timedelta | None = None,
    force: bool = False,
) -> bool

Retry failed task.

PARAMETER DESCRIPTION
task

Instance of a Task or its ID

TYPE: Task | UUID | str

at

When to retry the task.
- If None (default), retries immediately.
- If timedelta is provided, it will be added to current time.
Note: datetime must be offset-aware (i.e. have timezone info).

TYPE: datetime | timedelta | None DEFAULT: None

force

If True, allows retrying even if task has completed successfully. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
bool

Success boolean

RAISES DESCRIPTION
ValueError

If task has already completed successfully and force is not set to True.

TypeError

If provided datetime is not offset-aware.

Example
q = TestQueue()

# retry immediately
success = q.retry(task)
assert success is True

# or retry after 5 minutes
q.retry(task, at=timedelta(minutes=5))

# or at specific time
q.retry(task, at=datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))

# force retry even if task completed successfully
q.retry(task, force=True)
Source code in src/sheppy/testqueue.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def retry(self, task: Task | UUID | str, at: datetime | timedelta | None = None, force: bool = False) -> bool:
    """Retry a failed task.

    Synchronous wrapper around the underlying queue's async ``retry``;
    the arguments are forwarded unchanged.

    Args:
        task: Instance of a Task or its ID
        at: When to retry the task.<br>
            - If None (default), retries immediately.<br>
            - If timedelta is provided, it will be added to current time.<br>
            *Note: datetime must be offset-aware (i.e. have timezone info).*
        force: If True, allows retrying even if task has completed successfully. Defaults to False.

    Returns:
        Success boolean

    Raises:
        ValueError: If task has already completed successfully and force is not set to True.
        TypeError: If provided datetime is not offset-aware.

    Example:
        ```python
        q = TestQueue()

        # retry immediately
        success = q.retry(task)
        assert success is True

        # or retry after 5 minutes
        q.retry(task, at=timedelta(minutes=5))

        # or at specific time
        q.retry(task, at=datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))

        # force retry even if task completed successfully
        q.retry(task, force=True)
        ```
    """
    pending_retry = self._queue.retry(task, at, force)
    return asyncio.run(pending_retry)

size

size() -> int

Get number of pending tasks in the queue.

RETURNS DESCRIPTION
int

Number of pending tasks

Example
q = TestQueue()

q.add(task)

count = q.size()
assert count == 1
Source code in src/sheppy/testqueue.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def size(self) -> int:
    """Return the number of pending tasks in the queue.

    Synchronous wrapper around the underlying queue's async ``size``.

    Returns:
        Number of pending tasks

    Example:
        ```python
        q = TestQueue()

        q.add(task)

        count = q.size()
        assert count == 1
        ```
    """
    pending_count = asyncio.run(self._queue.size())
    return pending_count

clear

clear() -> int

Clear all tasks, including completed ones.

Source code in src/sheppy/testqueue.py
272
273
274
def clear(self) -> int:
    """Clear all tasks, including completed ones.

    Synchronous wrapper around the underlying queue's async ``clear``.

    Returns:
        The integer reported by the underlying queue's ``clear()``
        (presumably the number of removed tasks - confirm against Queue.clear).
    """
    cleared = asyncio.run(self._queue.clear())
    return cleared

add_cron

add_cron(task: Task, cron: str) -> bool

Add a cron job to run a task on a schedule.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

cron

Cron expression string (e.g. "*/5 * * * *" to run every 5 minutes)

TYPE: str

RETURNS DESCRIPTION
bool

Success boolean

Example
q = TestQueue()

@task
async def say_hello(to: str) -> str:
    print(f"[{datetime.now()}] Hello, {to}!")

# schedule task to run every minute
q.add_cron(say_hello("World"), "* * * * *")
Source code in src/sheppy/testqueue.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def add_cron(self, task: Task, cron: str) -> bool:
    """Register a cron job that runs a task on a schedule.

    Synchronous wrapper around the underlying queue's async ``add_cron``.

    Args:
        task: Instance of a Task
        cron: Cron expression string (e.g. "*/5 * * * *" to run every 5 minutes)

    Returns:
        Success boolean

    Example:
        ```python
        q = TestQueue()

        @task
        async def say_hello(to: str) -> None:
            print(f"[{datetime.now()}] Hello, {to}!")

        # schedule task to run every minute
        q.add_cron(say_hello("World"), "* * * * *")
        ```
    """
    registration = self._queue.add_cron(task, cron)
    return asyncio.run(registration)

delete_cron

delete_cron(task: Task, cron: str) -> bool

Delete a cron job.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

cron

Cron expression string used when adding the cron job

TYPE: str

RETURNS DESCRIPTION
bool

Success boolean

Example
q = TestQueue()

# delete previously added cron job
success = q.delete_cron(say_hello("World"), "* * * * *")
assert success is True
Source code in src/sheppy/testqueue.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def delete_cron(self, task: Task, cron: str) -> bool:
    """Remove a previously added cron job.

    Synchronous wrapper around the underlying queue's async ``delete_cron``.

    Args:
        task: Instance of a Task
        cron: Cron expression string used when adding the cron job

    Returns:
        Success boolean

    Example:
        ```python
        q = TestQueue()

        # delete previously added cron job
        success = q.delete_cron(say_hello("World"), "* * * * *")
        assert success is True
        ```
    """
    removal = self._queue.delete_cron(task, cron)
    return asyncio.run(removal)

get_crons

get_crons() -> list[TaskCron]

List all cron jobs.

RETURNS DESCRIPTION
list[TaskCron]

List of TaskCron instances

Example
q = TestQueue()

crons = q.get_crons()

for cron in crons:
    print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}")
Source code in src/sheppy/testqueue.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def get_crons(self) -> list[TaskCron]:
    """Return all registered cron jobs.

    Synchronous wrapper around the underlying queue's async ``get_crons``.

    Returns:
        List of TaskCron instances

    Example:
        ```python
        q = TestQueue()

        crons = q.get_crons()

        for cron in crons:
            print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}")
        ```
    """
    listing = self._queue.get_crons()
    return asyncio.run(listing)

process_next

process_next() -> Task | None

Process the next pending task in the queue.

RETURNS DESCRIPTION
Task | None

The processed Task instance, or None if no pending tasks.

Example
q = TestQueue()

q.add(task)
processed_task = q.process_next()
assert processed_task is not None
assert processed_task.completed is True
Source code in src/sheppy/testqueue.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def process_next(self) -> Task | None:
    """Process the next pending task in the queue.

    Pops at most one pending task from the underlying queue and hands it to
    ``self._process_task``; both steps run inside a single ``asyncio.run``.

    Returns:
        The processed Task instance, or None if no pending tasks.

    Example:
        ```python
        q = TestQueue()

        q.add(task)
        processed_task = q.process_next()
        assert processed_task is not None
        assert processed_task.completed is True
        ```
    """
    async def _pop_and_run() -> Task | None:
        popped = await self._queue._pop_pending(limit=1)
        if not popped:
            # nothing pending
            return None
        return await self._process_task(popped[0])

    return asyncio.run(_pop_and_run())

process_all

process_all() -> list[Task]

Process all pending tasks in the queue.

RETURNS DESCRIPTION
list[Task]

List of processed Task instances.

Source code in src/sheppy/testqueue.py
361
362
363
364
365
366
367
368
369
370
371
372
def process_all(self) -> list[Task]:
    """Process pending tasks one after another until none are left.

    Repeatedly calls ``process_next()`` and stops at the first falsy result.

    Returns:
        List of processed Task instances, in processing order.
    """
    results = []

    while True:
        current = self.process_next()
        if not current:
            break
        results.append(current)

    return results

process_scheduled

process_scheduled(
    at: datetime | timedelta | None = None,
) -> list[Task]

Process scheduled tasks that are due by the specified time.

PARAMETER DESCRIPTION
at

The cutoff time to process scheduled tasks.
- If datetime is provided, tasks scheduled up to that time will be processed.
- If timedelta is provided, it will be added to the current time to determine the cutoff time.
- If None (default), processes tasks scheduled up to the current time.
Note: datetime must be offset-aware (i.e. have timezone info).

TYPE: datetime | timedelta | None DEFAULT: None

RETURNS DESCRIPTION
list[Task]

List of processed Task instances.

Source code in src/sheppy/testqueue.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def process_scheduled(self, at: datetime | timedelta | None = None) -> list[Task]:
    """Process scheduled tasks that are due by the specified time.

    Args:
        at (datetime | timedelta | None): The cutoff time to process scheduled tasks.<br>
            - If datetime is provided, tasks scheduled up to that time will be processed.<br>
            - If timedelta is provided, it will be added to the current time to determine the cutoff time.<br>
            - If None (default), processes tasks scheduled up to the current time.<br>
            *Note: datetime must be offset-aware (i.e. have timezone info).*

    Returns:
        List of processed Task instances.

    Raises:
        TypeError: If provided datetime is not offset-aware.
    """
    if at is None:
        at = datetime.now(timezone.utc)
    elif isinstance(at, timedelta):
        at = datetime.now(timezone.utc) + at
    elif at.tzinfo is None or at.utcoffset() is None:
        # Enforce the documented contract up front instead of letting a naive
        # datetime reach the backend, where the outcome would depend on what
        # happens to be scheduled.
        raise TypeError("datetime must be offset-aware (i.e. have timezone info)")

    async def _process_scheduled_async(at: datetime) -> list[Task]:
        # Pop everything due by the cutoff, then run the tasks one at a time, in order.
        tasks = [Task.model_validate(t) for t in await self._backend.pop_scheduled(self.name, at)]
        return [await self._process_task(task) for task in tasks]

    return asyncio.run(_process_scheduled_async(at))

sheppy.testqueue.assert_is_new

assert_is_new(task: Task | None) -> None

Assert that the task is new (not yet processed). Useful in tests.

PARAMETER DESCRIPTION
task

The task to check.

TYPE: Task | None

RAISES DESCRIPTION
AssertionError

If the task is not new.

Source code in src/sheppy/testqueue.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def assert_is_new(task: Task | None) -> None:
    """Assert that the task is new (not yet processed). Useful in tests.

    A task counts as new when it is a Task instance with ``completed`` set to
    False and ``error``, ``result`` and ``finished_at`` all None.

    The checks raise AssertionError explicitly (rather than using ``assert``)
    so that every failure carries a descriptive message and the checks are not
    stripped when Python runs with ``-O``.

    Args:
        task (Task|None): The task to check.

    Raises:
        AssertionError: If the task is not new.
    """
    if task is None:
        raise AssertionError("expected a Task, got None")
    if not isinstance(task, Task):
        raise AssertionError(f"expected a Task instance, got {type(task).__name__}")

    if task.completed is not False:
        raise AssertionError(f"expected task.completed to be False, got {task.completed!r}")
    if task.error is not None:
        raise AssertionError(f"expected task.error to be None, got {task.error!r}")
    if task.result is not None:
        raise AssertionError(f"expected task.result to be None, got {task.result!r}")
    if task.finished_at is not None:
        raise AssertionError(f"expected task.finished_at to be None, got {task.finished_at!r}")

sheppy.testqueue.assert_is_completed

assert_is_completed(task: Task | None) -> None

Assert that the task is completed (processed successfully). Useful in tests.

PARAMETER DESCRIPTION
task

The task to check.

TYPE: Task | None

RAISES DESCRIPTION
AssertionError

If the task is not completed successfully.

Source code in src/sheppy/testqueue.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
def assert_is_completed(task: Task | None) -> None:
    """Assert that the task is completed (processed successfully). Useful in tests.

    A task counts as completed when it is a Task instance with ``completed``
    set to True, ``error`` None and ``finished_at`` set.

    The checks raise AssertionError explicitly (rather than using ``assert``)
    so that every failure carries a descriptive message and the checks are not
    stripped when Python runs with ``-O``.

    Args:
        task (Task|None): The task to check.

    Raises:
        AssertionError: If the task is not completed successfully.
    """
    if task is None:
        raise AssertionError("expected a Task, got None")
    if not isinstance(task, Task):
        raise AssertionError(f"expected a Task instance, got {type(task).__name__}")

    if task.completed is not True:
        raise AssertionError(f"expected task.completed to be True, got {task.completed!r}")
    if task.error is not None:
        raise AssertionError(f"expected task.error to be None, got {task.error!r}")
    if task.finished_at is None:
        raise AssertionError("expected task.finished_at to be set, got None")

sheppy.testqueue.assert_is_failed

assert_is_failed(task: Task | None) -> None

Assert that the task has failed (processed with error). Useful in tests.

PARAMETER DESCRIPTION
task

The task to check.

TYPE: Task | None

RAISES DESCRIPTION
AssertionError

If the task has not failed.

Source code in src/sheppy/testqueue.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def assert_is_failed(task: Task | None) -> None:
    """Assert that the task has failed (processed with error). Useful in tests.

    A task counts as failed when it is a Task instance whose ``completed`` is
    falsy, whose ``error`` is set and whose ``result`` is None. ``finished_at``
    is only required to be set when ``task.should_retry`` is falsy.

    The checks raise AssertionError explicitly (rather than using ``assert``)
    so that every failure carries a descriptive message and the checks are not
    stripped when Python runs with ``-O``.

    Args:
        task (Task|None): The task to check.

    Raises:
        AssertionError: If the task has not failed.
    """
    if task is None:
        raise AssertionError("expected a Task, got None")
    if not isinstance(task, Task):
        raise AssertionError(f"expected a Task instance, got {type(task).__name__}")

    if task.completed:
        raise AssertionError(f"expected task.completed to be falsy, got {task.completed!r}")
    if task.error is None:
        raise AssertionError("expected task.error to be set, got None")
    if task.result is not None:
        raise AssertionError(f"expected task.result to be None, got {task.result!r}")

    # finished_at is only checked for tasks that will not be retried.
    if not task.should_retry and task.finished_at is None:
        raise AssertionError("expected task.finished_at to be set for a task that will not be retried, got None")