TestQueue reference¶
Sheppy offers a first class support for testing tasks using the TestQueue class. This class mimics the behavior of a real queue but operates synchronously, making it ideal for predictable and fast unit tests.
See Testing tasks guide for more details and examples.
sheppy.TestQueue
¶
TestQueue(name: str = 'test-queue')
A simple in-memory queue for testing purposes.
This queue does not require any external services and processes tasks synchronously. It is designed to be used in synchronous tests and follows the same execution flow as a real queue.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Name of the queue. Defaults to "test-queue".
TYPE:
|
| ATTRIBUTE | DESCRIPTION |
|---|---|
processed_tasks |
List of tasks that have been processed.
TYPE:
|
failed_tasks |
List of tasks that have failed during processing.
TYPE:
|
Example
# tests/test_tasks.py
from sheppy import task, TestQueue
from sheppy.testqueue import assert_is_new, assert_is_completed, assert_is_failed
@task
async def add(x: int, y: int) -> int:
return x + y
@task
async def divide(x: int, y: int) -> float:
return x / y
def test_add_task():
q = TestQueue()
t = add(1, 2)
# use helper function to check task is new
assert_is_new(t)
# add task to the queue
success = q.add(t)
assert success is True
# check queue size
assert q.size() == 1
# process the task
processed_task = q.process_next()
# check task is completed
assert_is_completed(processed_task)
assert processed_task.result == 3
# check queue size is now zero
assert q.size() == 0
def test_failing_task():
q = TestQueue()
t = divide(1, 0)
# use helper function to check task is new
assert_is_new(t)
# add task to the queue
success = q.add(t)
assert success is True
# check queue size
assert q.size() == 1
# process the task
processed_task = q.process_next()
# check task is failed
assert_is_failed(processed_task)
assert processed_task.result is None
assert processed_task.error == "ZeroDivisionError: division by zero"
# check queue size is now zero
assert q.size() == 0
Source code in src/sheppy/testqueue.py
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | |
add
¶
Add task into the queue. Accept list of tasks for batch add.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task, or list of Task instances for batch mode. |
| RETURNS | DESCRIPTION |
|---|---|
bool | list[bool]
|
Success boolean, or list of booleans in batch mode. |
Example
q = TestQueue()
# add single task
success = q.add(task)
assert success is True
# add multiple tasks
results = q.add([task1, task2, task3])
assert results == [True, True, True]
Source code in src/sheppy/testqueue.py
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | |
schedule
¶
schedule(task: Task, at: datetime | timedelta) -> bool
Schedule task to be processed after certain time.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task
TYPE:
|
at
|
When to process the task.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Success boolean |
Example
q = TestQueue()
# schedule task to be processed after 10 minutes
success = q.schedule(task, timedelta(minutes=10))
assert success is True
# ... or at specific time
q.schedule(task, datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
Source code in src/sheppy/testqueue.py
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | |
get_task
¶
Get task by id.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task or its ID, or list of Task instances/IDs for batch mode. |
| RETURNS | DESCRIPTION |
|---|---|
Task | None
|
Instance of a Task or None if not found. |
dict[UUID, Task]
|
(In batch mode) Returns Dictionary of Task IDs to Task instances. |
Source code in src/sheppy/testqueue.py
151 152 153 154 155 156 157 158 159 160 161 | |
get_all_tasks
¶
get_all_tasks() -> list[Task]
Get all tasks, including completed/failed ones.
| RETURNS | DESCRIPTION |
|---|---|
list[Task]
|
List of all tasks |
Source code in src/sheppy/testqueue.py
163 164 165 166 167 168 169 | |
get_scheduled
¶
get_scheduled() -> list[Task]
List scheduled tasks.
| RETURNS | DESCRIPTION |
|---|---|
list[Task]
|
List of scheduled tasks |
Source code in src/sheppy/testqueue.py
208 209 210 211 212 213 214 | |
get_pending
¶
get_pending(count: int = 1) -> list[Task]
List pending tasks.
| PARAMETER | DESCRIPTION |
|---|---|
count
|
Number of pending tasks to retrieve.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Task]
|
List of pending tasks |
Source code in src/sheppy/testqueue.py
171 172 173 174 175 176 177 178 179 180 | |
retry
¶
retry(
task: Task | UUID | str,
at: datetime | timedelta | None = None,
force: bool = False,
) -> bool
Retry failed task.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task or its ID
TYPE:
|
at
|
When to retry the task.
TYPE:
|
force
|
If True, allows retrying even if task has completed successfully. Defaults to False.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Success boolean |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If task has already completed successfully and force is not set to True. |
TypeError
|
If provided datetime is not offset-aware. |
Example
q = TestQueue()
# retry immediately
success = q.retry(task)
assert success is True
# or retry after 5 minutes
q.retry(task, at=timedelta(minutes=5))
# or at specific time
q.retry(task, at=datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
# force retry even if task completed successfully
q.retry(task, force=True)
Source code in src/sheppy/testqueue.py
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 | |
size
¶
size() -> int
Get number of pending tasks in the queue.
| RETURNS | DESCRIPTION |
|---|---|
int
|
Number of pending tasks |
Example
q = TestQueue()
q.add(task)
count = q.size()
assert count == 1
Source code in src/sheppy/testqueue.py
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 | |
clear
¶
clear() -> int
Clear all tasks, including completed ones.
Source code in src/sheppy/testqueue.py
272 273 274 | |
add_cron
¶
add_cron(task: Task, cron: str) -> bool
Add a cron job to run a task on a schedule.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task
TYPE:
|
cron
|
Cron expression string (e.g. "*/5 * * * *" to run every 5 minutes)
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Success boolean |
Example
q = TestQueue()
@task
async def say_hello(to: str) -> str:
print(f"[{datetime.now()}] Hello, {to}!")
# schedule task to run every minute
q.add_cron(say_hello("World"), "* * * * *")
Source code in src/sheppy/testqueue.py
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 | |
delete_cron
¶
delete_cron(task: Task, cron: str) -> bool
Delete a cron job.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Instance of a Task
TYPE:
|
cron
|
Cron expression string used when adding the cron job
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Success boolean |
Example
q = TestQueue()
# delete previously added cron job
success = q.delete_cron(say_hello("World"), "* * * * *")
assert success is True
Source code in src/sheppy/testqueue.py
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 | |
get_crons
¶
get_crons() -> list[TaskCron]
List all cron jobs.
| RETURNS | DESCRIPTION |
|---|---|
list[TaskCron]
|
List of TaskCron instances |
Example
q = TestQueue()
crons = q.get_crons()
for cron in crons:
print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}")
Source code in src/sheppy/testqueue.py
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 | |
process_next
¶
process_next() -> Task | None
Process the next pending task in the queue.
| RETURNS | DESCRIPTION |
|---|---|
Task | None
|
The processed Task instance, or None if no pending tasks. |
Example
q = TestQueue()
q.add(task)
processed_task = q.process_next()
assert processed_task is not None
assert processed_task.completed is True
Source code in src/sheppy/testqueue.py
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 | |
process_all
¶
process_all() -> list[Task]
Process all pending tasks in the queue.
| RETURNS | DESCRIPTION |
|---|---|
list[Task]
|
List of processed Task instances. |
Source code in src/sheppy/testqueue.py
361 362 363 364 365 366 367 368 369 370 371 372 | |
process_scheduled
¶
process_scheduled(
at: datetime | timedelta | None = None,
) -> list[Task]
Process scheduled tasks that are due by the specified time.
| PARAMETER | DESCRIPTION |
|---|---|
at
|
The cutoff time to process scheduled tasks. - If datetime is provided, tasks scheduled up to that time will be processed. - If timedelta is provided, it will be added to the current time to determine the cutoff time. - If None (default), processes tasks scheduled up to the current time. Note: datetime must be offset-aware (i.e. have timezone info).
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Task]
|
List of processed Task instances. |
Source code in src/sheppy/testqueue.py
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 | |
sheppy.testqueue.assert_is_new
¶
assert_is_new(task: Task | None) -> None
Assert that the task is new (not yet processed). Useful in tests.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
The task to check.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
AssertionError
|
If the task is not new. |
Source code in src/sheppy/testqueue.py
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 | |
sheppy.testqueue.assert_is_completed
¶
assert_is_completed(task: Task | None) -> None
Assert that the task is completed (processed successfully). Useful in tests.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
The task to check.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
AssertionError
|
If the task is not completed successfully. |
Source code in src/sheppy/testqueue.py
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 | |
sheppy.testqueue.assert_is_failed
¶
assert_is_failed(task: Task | None) -> None
Assert that the task has failed (processed with error). Useful in tests.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
The task to check.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
AssertionError
|
If the task has not failed. |
Source code in src/sheppy/testqueue.py
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 | |