Worker class reference
In most cases, you can run the worker with the CLI command sheppy work. See the CLI reference for more details.
If you need to run it programmatically, you can use the Worker class.
from sheppy import Worker, RedisBackend
backend = RedisBackend()
worker = Worker(queue_name="default", backend=backend)
sheppy.Worker
Worker(
    queue_name: str | list[str],
    backend: Backend,
    shutdown_timeout: float = 30.0,
    max_concurrent_tasks: int = 10,
    enable_job_processing: bool = True,
    enable_scheduler: bool = True,
    enable_cron_manager: bool = True,
)
Worker that processes tasks from the queue.
The Worker monitors the specified queue(s) for pending tasks and processes them asynchronously. It uses blocking pop operations to efficiently wait for new tasks. The worker can handle multiple tasks concurrently, up to a specified limit. It also handles scheduled tasks and cron jobs.
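The concurrency limit described above can be pictured with a small standalone sketch. This is not sheppy's implementation; it only illustrates, using the standard library, how a cap like max_concurrent_tasks might be enforced with asyncio.Semaphore. The names run_bounded and sample_job are hypothetical.

```python
import asyncio


async def run_bounded(jobs, max_concurrent_tasks: int = 10) -> int:
    """Run coroutine functions with a cap on how many are in flight.

    Returns the peak number of jobs that were running at the same time.
    """
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    active = 0
    peak = 0

    async def run_one(job):
        nonlocal active, peak
        async with semaphore:  # waits here while the limit is reached
            active += 1
            peak = max(peak, active)
            try:
                await job()
            finally:
                active -= 1

    await asyncio.gather(*(run_one(job) for job in jobs))
    return peak


async def sample_job() -> None:
    await asyncio.sleep(0.01)  # stand-in for real task work


# Eight jobs, but never more than three running at once.
peak = asyncio.run(run_bounded([sample_job] * 8, max_concurrent_tasks=3))
print(peak)
```

A semaphore keeps the sketch short; a real worker would also need to pull tasks from a backend and record results, which is out of scope here.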
PARAMETER | DESCRIPTION |
---|---|
queue_name | Name of the queue or list of queue names to process tasks from. TYPE: str or list[str] |
backend | Instance of the backend to use for storing and retrieving tasks. TYPE: Backend |
shutdown_timeout | Time in seconds to wait for active tasks to complete during shutdown. Default is 30.0 seconds. TYPE: float |
max_concurrent_tasks | Maximum number of tasks to process concurrently. Default is 10. TYPE: int |
enable_job_processing | If True, enables job processing. Default is True. TYPE: bool |
enable_scheduler | If True, enables the scheduler to enqueue scheduled tasks. Default is True. TYPE: bool |
enable_cron_manager | If True, enables the cron manager to handle cron jobs. Default is True. TYPE: bool |
ATTRIBUTE | DESCRIPTION |
---|---|
queues | List of Queue instances corresponding to the specified queue names. TYPE: list[Queue] |
worker_id | Unique identifier for the worker instance. |
stats | Statistics about processed and failed tasks. |
enable_job_processing | Indicates if job processing is enabled. TYPE: bool |
enable_scheduler | Indicates if the scheduler is enabled. TYPE: bool |
enable_cron_manager | Indicates if the cron manager is enabled. TYPE: bool |
RAISES | DESCRIPTION |
---|---|
ValueError | If none of the processing types (job processing, scheduler, cron manager) are enabled. |
Example
import asyncio
from sheppy import Worker, RedisBackend

async def main():
    backend = RedisBackend()
    worker = Worker(queue_name="default", backend=backend)
    await worker.work()

if __name__ == "__main__":
    asyncio.run(main())
Source code in src/sheppy/worker.py
work
work(
    max_tasks: int | None = None,
    oneshot: bool = False,
    register_signal_handlers: bool = True,
) -> None
Start the worker to process tasks from the queue.
PARAMETER | DESCRIPTION |
---|---|
max_tasks | Maximum number of tasks to process before shutting down. If None, process indefinitely. TYPE: int or None |
oneshot | If True, process tasks until the queue is empty, then shut down. TYPE: bool |
register_signal_handlers | If True, register SIGINT and SIGTERM signal handlers for graceful shutdown. Default is True. TYPE: bool |
RETURNS | DESCRIPTION |
---|---|
None | None |
RAISES | DESCRIPTION |
---|---|
BackendError | If there is an issue connecting to the backend. |
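As a hedged illustration of the parameters above, the sketch below drains a queue once from inside a larger application. It uses only names that appear on this page (Worker, RedisBackend, work, oneshot, register_signal_handlers); the helper name drain_queue_once is hypothetical, and it assumes that RedisBackend() with no arguments reaches a usable Redis instance.

```python
import asyncio


async def drain_queue_once(queue_name: str = "default") -> None:
    """Process whatever is currently queued, then return (hypothetical helper)."""
    # Imported inside the function so this module loads even where
    # sheppy is not installed; a real application would import at the top.
    from sheppy import RedisBackend, Worker

    backend = RedisBackend()  # assumption: default connection settings are fine
    worker = Worker(queue_name=queue_name, backend=backend)

    # oneshot=True: process tasks until the queue is empty, then shut down.
    # register_signal_handlers=False: leave SIGINT/SIGTERM handling to the
    # host application instead of the worker.
    await worker.work(oneshot=True, register_signal_handlers=False)
```

It could be called with asyncio.run(drain_queue_once()), or awaited from existing async code. Passing max_tasks instead of oneshot would stop the worker after a fixed number of tasks rather than when the queue is empty.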
Note
- The worker can be gracefully shut down by sending a SIGINT or SIGTERM signal (e.g., pressing CTRL+C).
- If the worker is already shutting down, pressing CTRL+C multiple times (default 3) will force an immediate shutdown.
- The worker will attempt to complete active tasks before shutting down, up to the specified shutdown timeout.
- If there are still active tasks after the timeout, they will be cancelled.
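The shutdown behaviour in the notes above (wait up to a timeout, then cancel what is left) can be sketched with plain asyncio. This is an illustration of the pattern, not sheppy's actual code; the function names shutdown and demo are hypothetical.

```python
import asyncio


async def shutdown(
    active_tasks: set[asyncio.Task], shutdown_timeout: float = 30.0
) -> tuple[int, int]:
    """Wait for active tasks up to the timeout, then cancel the rest.

    Returns (number completed in time, number cancelled).
    """
    if not active_tasks:
        return 0, 0

    # Give running tasks a chance to finish on their own.
    done, pending = await asyncio.wait(active_tasks, timeout=shutdown_timeout)

    # Anything still running after the timeout is cancelled.
    for task in pending:
        task.cancel()

    # Let the cancelled tasks unwind before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def demo() -> tuple[int, int]:
    fast = asyncio.create_task(asyncio.sleep(0.01))  # finishes in time
    slow = asyncio.create_task(asyncio.sleep(10))    # exceeds the timeout
    return await shutdown({fast, slow}, shutdown_timeout=0.2)


completed, cancelled = asyncio.run(demo())
print(completed, cancelled)
```

The sketch leaves out signal handling and the repeated CTRL+C escalation; it only shows the timeout-then-cancel step.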
Source code in src/sheppy/worker.py