Skip to content

Queue reference

Sheppy provides a Queue class to manage and execute background tasks. The queue supports adding tasks, scheduling them for future execution, retrying failed tasks, and managing periodic tasks using cron expressions.

See the Getting Started guide for more details and examples.

sheppy.Queue

Queue(backend: Backend, name: str = 'default')

The Queue class provides an easy way to manage a task queue.

PARAMETER DESCRIPTION
backend

An instance of a task backend (e.g. sheppy.RedisBackend)

TYPE: Backend

name

Name of the queue

TYPE: str DEFAULT: 'default'

Source code in src/sheppy/queue.py
19
20
21
def __init__(self, backend: Backend, name: str = "default"):
    """Bind the queue to a backend and remember the queue name.

    Args:
        backend: An instance of a task backend (e.g. sheppy.RedisBackend)
        name: Name of the queue
    """
    self.backend, self.name = backend, name

add

add(task: Task) -> bool
add(task: list[Task]) -> list[bool]

Add a task to the queue. Accepts a list of tasks for batch add.

PARAMETER DESCRIPTION
task

Instance of a Task, or list of Task instances for batch mode.

TYPE: Task | list[Task]

RETURNS DESCRIPTION
bool | list[bool]

Success boolean, or list of booleans in batch mode.

Example
q = Queue(...)
success = await q.add(task)
assert success is True

# batch mode
success = await q.add([task1, task2])
assert success == [True, True]  # returns list of booleans in batch mode
Source code in src/sheppy/queue.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def add(self, task: Task | list[Task]) -> bool | list[bool]:
    """
    Put one task, or a whole list of tasks, onto the queue.

    Both call styles share one code path: the input is normalised to a list,
    every task is serialised with `model_dump(mode="json")` and the batch is
    handed to the backend in a single `append` call.

    Args:
        task: Instance of a Task, or list of Task instances for batch mode.

    Returns:
        Success boolean, or list of booleans in batch mode.

    Example:
        ```python
        q = Queue(...)
        success = await q.add(task)
        assert success is True

        # batch mode
        success = await q.add([task1, task2])
        assert success == [True, True]  # returns list of booleans in batch mode
        ```
    """
    await self.__ensure_backend_is_connected()

    # Normalise to a list so serialisation below does not need two branches.
    if isinstance(task, list):
        batch, is_batch = task, True
    else:
        batch, is_batch = [task], False

    payloads = [item.model_dump(mode='json') for item in batch]
    results = await self.backend.append(self.name, payloads)

    # A single task gets a single result back, not a one-element list.
    if is_batch:
        return results
    return results[0]

schedule

schedule(task: Task, at: datetime | timedelta) -> bool

Schedule a task to be processed after a certain time.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

at

When to process the task.
If timedelta is provided, it will be added to current time.
Note: datetime must be offset-aware (i.e. have timezone info).

TYPE: datetime | timedelta

RETURNS DESCRIPTION
bool

Success boolean

Example
from datetime import datetime, timedelta

q = Queue(...)
# schedule task to be processed after 10 minutes
await q.schedule(task, timedelta(minutes=10))

# ... or at specific time
await q.schedule(task, datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
Source code in src/sheppy/queue.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def schedule(self, task: Task, at: datetime | timedelta) -> bool:
    """Schedule a task to be processed at a later point in time.

    Args:
        task (Task): Instance of a Task
        at (datetime | timedelta): When to process the task.<br>
                                   If timedelta is provided, it will be added to current time.<br>
                                   *Note: datetime must be offset-aware (i.e. have timezone info).*

    Returns:
        Success boolean

    Raises:
        TypeError: If the provided datetime has no timezone info.

    Example:
        ```python
        from datetime import datetime, timedelta

        q = Queue(...)
        # schedule task to be processed after 10 minutes
        await q.schedule(task, timedelta(minutes=10))

        # ... or at specific time
        await q.schedule(task, datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))
        ```
    """
    await self.__ensure_backend_is_connected()

    # A relative delay is anchored to the current time in UTC.
    run_at = (datetime.now(timezone.utc) + at) if isinstance(at, timedelta) else at

    if not run_at.tzinfo:
        raise TypeError("provided datetime must be offset-aware")

    # Stored through __dict__ instead of normal attribute assignment
    # (presumably to bypass the model's own setattr handling -- TODO confirm).
    task.__dict__["scheduled_at"] = run_at

    payload = task.model_dump(mode="json")
    return await self.backend.schedule(self.name, payload, run_at)

get_task

get_task(task: Task | UUID | str) -> Task | None
get_task(task: list[Task | UUID | str]) -> dict[UUID, Task]

Get task by id.

PARAMETER DESCRIPTION
task

Instance of a Task or its ID, or list of Task instances/IDs for batch mode.

TYPE: Task | UUID | str | list[Task | UUID | str]

RETURNS DESCRIPTION
Task | None

Instance of a Task or None if not found.

dict[UUID, Task]

(In batch mode) Returns a dictionary mapping Task IDs to Task instances.

Source code in src/sheppy/queue.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def get_task(self, task: Task | UUID | str | list[Task | UUID | str]) -> dict[UUID, Task] | Task | None:
    """Look up one task, or several tasks, by id.

    Args:
        task: Instance of a Task or its ID, or list of Task instances/IDs for batch mode.

    Returns:
        (Task|None): Instance of a Task or None if not found.
        (dict[UUID, Task]): *(In batch mode)* Returns a dictionary mapping Task IDs to Task instances.
    """
    await self.__ensure_backend_is_connected()

    ids, is_batch = self._get_task_ids(task)
    found = await self.backend.get_tasks(self.name, ids)

    if not is_batch:
        # Single lookup: a missing (or empty) entry is reported as None.
        raw = found.get(ids[0])
        if not raw:
            return None
        return Task.model_validate(raw)

    # Batch lookup: rebuild every returned entry, keyed by its parsed UUID.
    return {UUID(key): Task.model_validate(raw) for key, raw in found.items()}

wait_for

wait_for(
    task: Task | UUID | str, timeout: float = 0
) -> Task | None
wait_for(
    task: list[Task | UUID | str], timeout: float = 0
) -> dict[UUID, Task]

Wait for task to complete and return updated task instance.

PARAMETER DESCRIPTION
task

Instance of a Task or its ID, or list of Task instances/IDs for batch mode.

TYPE: Task | UUID | str | list[Task | UUID | str]

timeout

Maximum time to wait in seconds. Default is 0 (wait indefinitely).
If timeout is reached, returns None (or partial results in batch mode).
In batch mode, this is the maximum time to wait for all tasks to complete.
Note: In non-batch mode, if timeout is reached and no task is found, a TimeoutError is raised.

TYPE: float DEFAULT: 0

RETURNS DESCRIPTION
dict[UUID, Task] | Task | None

Instance of a Task or None if not found or timeout reached.
In batch mode, returns dictionary of Task IDs to Task instances (partial results possible on timeout).

RAISES DESCRIPTION
TimeoutError

If timeout is reached and no task is found (only in non-batch mode).

Example
q = Queue(...)

# wait indefinitely for task to complete
updated_task = await q.wait_for(task)
assert updated_task.completed is True

# wait up to 5 seconds for task to complete
try:
    updated_task = await q.wait_for(task, timeout=5)
    if updated_task:
        assert updated_task.completed is True
    else:
        print("Task not found or still pending after timeout")
except TimeoutError:
    print("Task did not complete within timeout")

# batch mode
updated_tasks = await q.wait_for([task1, task2, task3], timeout=10)

for task_id, task in updated_tasks.items():
    print(f"Task {task_id} completed: {task.completed}")

# Note: updated_tasks may contain only a subset of tasks if timeout is reached
Source code in src/sheppy/queue.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
async def wait_for(self, task: Task | UUID | str | list[Task | UUID | str], timeout: float = 0) -> dict[UUID, Task] | Task | None:
    """Wait for a task (or several tasks) to complete and return the updated instance(s).

    Args:
        task: Instance of a Task or its ID, or list of Task instances/IDs for batch mode.
        timeout: Maximum time to wait in seconds. Default is 0 (wait indefinitely).<br>
                 If timeout is reached, returns None (or partial results in batch mode).<br>
                 In batch mode, this is the maximum time to wait for all tasks to complete.<br>
                 Note: In non-batch mode, if timeout is reached and no task is found, a TimeoutError is raised.

    Returns:
        Instance of a Task or None if not found or timeout reached.<br>In batch mode, returns dictionary of Task IDs to Task instances (partial results possible on timeout).

    Raises:
        TimeoutError: If timeout is reached and no task is found (only in non-batch mode).

    Example:
        ```python
        q = Queue(...)

        # wait indefinitely for task to complete
        updated_task = await q.wait_for(task)
        assert updated_task.completed is True

        # wait up to 5 seconds for task to complete
        try:
            updated_task = await q.wait_for(task, timeout=5)
            if updated_task:
                assert updated_task.completed is True
            else:
                print("Task not found or still pending after timeout")
        except TimeoutError:
            print("Task did not complete within timeout")

        # batch mode
        updated_tasks = await q.wait_for([task1, task2, task3], timeout=10)

        for task_id, task in updated_tasks.items():
            print(f"Task {task_id} completed: {task.completed}")

        # Note: updated_tasks may contain only a subset of tasks if timeout is reached
        ```
    """
    await self.__ensure_backend_is_connected()

    ids, is_batch = self._get_task_ids(task)
    # NOTE(review): nothing in this method raises TimeoutError itself; the
    # documented TimeoutError presumably comes from backend.get_results -- confirm.
    finished = await self.backend.get_results(self.name, ids, timeout)

    if not is_batch:
        # Single wait: a missing (or empty) result is reported as None.
        raw = finished.get(ids[0])
        if not raw:
            return None
        return Task.model_validate(raw)

    # Batch wait: only the entries the backend returned are rebuilt.
    return {UUID(key): Task.model_validate(raw) for key, raw in finished.items()}

get_all_tasks

get_all_tasks() -> list[Task]

Get all tasks, including completed/failed ones.

RETURNS DESCRIPTION
list[Task]

List of all tasks

Source code in src/sheppy/queue.py
91
92
93
94
95
96
97
98
99
async def get_all_tasks(self) -> list[Task]:
    """Fetch every task stored for this queue, including completed/failed ones.

    Returns:
        List of all tasks
    """
    await self.__ensure_backend_is_connected()
    raw_tasks = await self.backend.get_all_tasks(self.name)
    # Each raw entry from the backend is validated back into a Task model.
    return list(map(Task.model_validate, raw_tasks))

get_scheduled

get_scheduled() -> list[Task]

List scheduled tasks.

RETURNS DESCRIPTION
list[Task]

List of scheduled tasks

Source code in src/sheppy/queue.py
153
154
155
156
157
158
159
160
async def get_scheduled(self) -> list[Task]:
    """Fetch the tasks that are currently scheduled on this queue.

    Returns:
        List of scheduled tasks
    """
    await self.__ensure_backend_is_connected()
    scheduled = await self.backend.get_scheduled(self.name)
    return [Task.model_validate(entry) for entry in scheduled]

get_pending

get_pending(count: int = 1) -> list[Task]

List pending tasks.

PARAMETER DESCRIPTION
count

Number of pending tasks to retrieve.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
list[Task]

List of pending tasks

Source code in src/sheppy/queue.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def get_pending(self, count: int = 1) -> list[Task]:
    """Fetch pending tasks from this queue.

    Args:
        count: Number of pending tasks to retrieve.

    Returns:
        List of pending tasks

    Raises:
        ValueError: If count is zero or negative.
    """
    # Reject bad input up front, before touching the backend at all.
    if count <= 0:
        raise ValueError("Value must be larger than zero")

    await self.__ensure_backend_is_connected()

    pending = await self.backend.get_pending(self.name, count)
    return [Task.model_validate(raw) for raw in pending]

retry

retry(
    task: Task | UUID | str,
    at: datetime | timedelta | None = None,
    force: bool = False,
) -> bool

Retry a failed task.

PARAMETER DESCRIPTION
task

Instance of a Task or its ID

TYPE: Task | UUID | str

at

When to retry the task.
- If None (default), retries immediately.
- If timedelta is provided, it will be added to current time.
Note: datetime must be offset-aware (i.e. have timezone info).

TYPE: datetime | timedelta | None DEFAULT: None

force

If True, allows retrying even if task has completed successfully. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
bool

Success boolean

RAISES DESCRIPTION
ValueError

If task has already completed successfully and force is not set to True.

TypeError

If provided datetime is not offset-aware.

Example
q = Queue(...)

# retry task immediately
success = await q.retry(task)
assert success is True

# or retry after 5 minutes
await q.retry(task, at=timedelta(minutes=5))

# or at specific time
await q.retry(task, at=datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))

# force retry even if task is completed (= finished successfully)
await q.retry(task, force=True)
Source code in src/sheppy/queue.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
async def retry(self, task: Task | UUID | str, at: datetime | timedelta | None = None, force: bool = False) -> bool:
    """Put a previously processed task back onto the queue.

    Args:
        task: Instance of a Task or its ID
        at: When to retry the task.<br>
            - If None (default), retries immediately.<br>
            - If timedelta is provided, it will be added to current time.<br>
            *Note: datetime must be offset-aware (i.e. have timezone info).*
        force: If True, allows retrying even if task has completed successfully. Defaults to False.

    Returns:
        Success boolean (False if the task could not be found)

    Raises:
        ValueError: If task has already completed successfully and force is not set to True.
        TypeError: If provided datetime is not offset-aware.

    Example:
        ```python
        q = Queue(...)

        # retry task immediately
        success = await q.retry(task)
        assert success is True

        # or retry after 5 minutes
        await q.retry(task, at=timedelta(minutes=5))

        # or at specific time
        await q.retry(task, at=datetime.fromisoformat("2026-01-01 00:00:00 +00:00"))

        # force retry even if task is completed (= finished successfully)
        await q.retry(task, force=True)
        ```
    """
    # get_task already makes sure the backend is connected
    stored = await self.get_task(task)
    if not stored:
        return False

    if not force:
        if stored.completed:
            raise ValueError("Task has already completed successfully, use force to retry anyways")

    # temp hack (kept from the original): retry bookkeeping is only rewritten
    # for tasks that have a finished_at value.
    was_finished = bool(stored.finished_at)
    if was_finished:
        stored.__dict__.update(
            last_retry_at=datetime.now(timezone.utc),
            next_retry_at=datetime.now(timezone.utc),
            finished_at=None,
        )

    # No (or falsy) target time: re-queue right away.
    if not at:
        results = await self.backend.append(self.name, [stored.model_dump(mode="json")], unique=False)
        return results[0]

    # A relative delay is anchored to the current time in UTC.
    retry_at = (datetime.now(timezone.utc) + at) if isinstance(at, timedelta) else at

    if not retry_at.tzinfo:
        raise TypeError("provided datetime must be offset-aware")

    if was_finished:
        stored.__dict__.update(next_retry_at=retry_at, scheduled_at=retry_at)

    return await self.backend.schedule(self.name, stored.model_dump(mode="json"), retry_at, unique=False)

size

size() -> int

Get number of pending tasks in the queue.

RETURNS DESCRIPTION
int

Number of pending tasks

Example
q = Queue(...)

await q.add(task)

count = await q.size()
assert count == 1
Source code in src/sheppy/queue.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
async def size(self) -> int:
    """Report how many tasks are pending in the queue.

    Returns:
        Number of pending tasks

    Example:
        ```python
        q = Queue(...)

        await q.add(task)

        count = await q.size()
        assert count == 1
        ```
    """
    await self.__ensure_backend_is_connected()
    pending_count = await self.backend.size(self.name)
    return pending_count

clear

clear() -> int

Clear all tasks, including completed ones.

Source code in src/sheppy/queue.py
308
309
310
311
async def clear(self) -> int:
    """Clear all tasks, including completed ones.

    Returns:
        The integer reported by the backend's `clear` call
        (presumably the number of removed tasks -- TODO confirm).
    """
    await self.__ensure_backend_is_connected()
    cleared = await self.backend.clear(self.name)
    return cleared

add_cron

add_cron(task: Task, cron: str) -> bool

Add a cron job to run a task on a schedule.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

cron

Cron expression string (e.g. "*/5 * * * *" to run every 5 minutes)

TYPE: str

RETURNS DESCRIPTION
bool

Success boolean

Example
q = Queue(...)

@task
async def say_hello(to: str) -> str:
    print(f"[{datetime.now()}] Hello, {to}!")

# schedule task to run every minute
await q.add_cron(say_hello("World"), "* * * * *")
Source code in src/sheppy/queue.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
async def add_cron(self, task: Task, cron: str) -> bool:
    """Register a cron job that runs a task on a recurring schedule.

    The job is stored under the deterministic id of the cron definition
    built from the task and the cron expression.

    Args:
        task: Instance of a Task
        cron: Cron expression string (e.g. "*/5 * * * *" to run every 5 minutes)

    Returns:
        Success boolean

    Example:
        ```python
        q = Queue(...)

        @task
        async def say_hello(to: str) -> str:
            print(f"[{datetime.now()}] Hello, {to}!")

        # schedule task to run every minute
        await q.add_cron(say_hello("World"), "* * * * *")
        ```
    """
    await self.__ensure_backend_is_connected()

    cron_job = TaskFactory.create_cron_from_task(task, cron)
    cron_id = str(cron_job.deterministic_id)
    payload = cron_job.model_dump(mode="json")

    return await self.backend.add_cron(self.name, cron_id, payload)

delete_cron

delete_cron(task: Task, cron: str) -> bool

Delete a cron job.

PARAMETER DESCRIPTION
task

Instance of a Task

TYPE: Task

cron

Cron expression string used when adding the cron job

TYPE: str

RETURNS DESCRIPTION
bool

Success boolean

Example
q = Queue(...)

# delete previously added cron job
success = await q.delete_cron(say_hello("World"), "* * * * *")
assert success is True
Source code in src/sheppy/queue.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
async def delete_cron(self, task: Task, cron: str) -> bool:
    """Remove a previously registered cron job.

    The cron definition is rebuilt from the task and the cron expression
    so that its deterministic id can be used to address the stored job.

    Args:
        task: Instance of a Task
        cron: Cron expression string used when adding the cron job

    Returns:
        Success boolean

    Example:
        ```python
        q = Queue(...)

        # delete previously added cron job
        success = await q.delete_cron(say_hello("World"), "* * * * *")
        assert success is True
        ```
    """
    await self.__ensure_backend_is_connected()

    cron_job = TaskFactory.create_cron_from_task(task, cron)
    cron_id = str(cron_job.deterministic_id)

    return await self.backend.delete_cron(self.name, cron_id)

get_crons

get_crons() -> list[TaskCron]

List all cron jobs.

RETURNS DESCRIPTION
list[TaskCron]

List of TaskCron instances

Example
q = Queue(...)

crons = await q.get_crons()

for cron in crons:
    print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}")
Source code in src/sheppy/queue.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
async def get_crons(self) -> list[TaskCron]:
    """Fetch all cron jobs registered on this queue.

    Returns:
        List of TaskCron instances

    Example:
        ```python
        q = Queue(...)

        crons = await q.get_crons()

        for cron in crons:
            print(f"Cron ID: {cron.id}, Expression: {cron.expression}, TaskSpec: {cron.spec}")
        ```
    """
    await self.__ensure_backend_is_connected()
    stored = await self.backend.get_crons(self.name)
    return [TaskCron.model_validate(entry) for entry in stored]