Skip to content

Worker class reference

In most cases, you can run the worker using the CLI command `sheppy work`. See the CLI reference for more details.

If you need to run it programmatically, you can use the Worker class.

from sheppy import Worker

worker = Worker()

sheppy.Worker

Worker(
    queue_name: str | list[str],
    backend: Backend,
    shutdown_timeout: float = 30.0,
    max_concurrent_tasks: int = 10,
    enable_job_processing: bool = True,
    enable_scheduler: bool = True,
    enable_cron_manager: bool = True,
)

Worker that processes tasks from the queue.

The Worker monitors the specified queue(s) for pending tasks and processes them asynchronously. It uses blocking pop operations to efficiently wait for new tasks. The worker can handle multiple tasks concurrently, up to a specified limit. It also handles scheduled tasks and cron jobs.

PARAMETER DESCRIPTION
queue_name

Name of the queue or list of queue names to process tasks from.

TYPE: str | list[str]

backend

Instance of the backend to use for storing and retrieving tasks.

TYPE: Backend

shutdown_timeout

Time in seconds to wait for active tasks to complete during shutdown. Default is 30.0 seconds.

TYPE: float DEFAULT: 30.0

max_concurrent_tasks

Maximum number of tasks to process concurrently. Default is 10.

TYPE: int DEFAULT: 10

enable_job_processing

If True, enables job processing. Default is True.

TYPE: bool DEFAULT: True

enable_scheduler

If True, enables the scheduler to enqueue scheduled tasks. Default is True.

TYPE: bool DEFAULT: True

enable_cron_manager

If True, enables the cron manager to handle cron jobs. Default is True.

TYPE: bool DEFAULT: True

ATTRIBUTE DESCRIPTION
queues

List of Queue instances corresponding to the specified queue names.

TYPE: list[Queue]

worker_id

Unique identifier for the worker instance.

TYPE: str

stats

Statistics about processed and failed tasks.

TYPE: WorkerStats

enable_job_processing

Indicates if job processing is enabled.

TYPE: bool

enable_scheduler

Indicates if the scheduler is enabled.

TYPE: bool

enable_cron_manager

Indicates if the cron manager is enabled.

TYPE: bool

RAISES DESCRIPTION
ValueError

If none of the processing types (job processing, scheduler, cron manager) are enabled.

Example
import asyncio
from sheppy import Worker, RedisBackend

async def main():
    backend = RedisBackend()
    worker = Worker(queue_name="default", backend=backend)

    await worker.work()

if __name__ == "__main__":
    asyncio.run(main())
Source code in src/sheppy/worker.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def __init__(
    self,
    queue_name: str | list[str],
    backend: Backend,
    shutdown_timeout: float = 30.0,
    max_concurrent_tasks: int = 10,
    enable_job_processing: bool = True,
    enable_scheduler: bool = True,
    enable_cron_manager: bool = True,
):
    """Initialize a worker bound to one or more queues.

    Args:
        queue_name (str | list[str]): Name of the queue or list of queue names to process tasks from.
        backend (Backend): Instance of the backend to use for storing and retrieving tasks.
        shutdown_timeout (float): Seconds to wait for active tasks to complete during shutdown.
        max_concurrent_tasks (int): Maximum number of tasks to process concurrently.
        enable_job_processing (bool): If True, enables job processing.
        enable_scheduler (bool): If True, enables the scheduler to enqueue scheduled tasks.
        enable_cron_manager (bool): If True, enables the cron manager to handle cron jobs.

    Raises:
        ValueError: If none of the processing types are enabled.
    """
    # a worker with every subsystem disabled would have nothing to do
    if not (enable_job_processing or enable_scheduler or enable_cron_manager):
        raise ValueError("At least one processing type must be enabled")

    self._backend = backend

    # accept a single name or a sequence of names; a scalar is coerced to str
    names = queue_name if isinstance(queue_name, (list, tuple)) else [str(queue_name)]
    self.queues = [Queue(backend, name) for name in names]

    self.shutdown_timeout = shutdown_timeout
    self.worker_id = generate_unique_worker_id("worker")
    self.stats = WorkerStats()

    self._task_processor = TaskProcessor()
    self._task_semaphore = asyncio.Semaphore(max_concurrent_tasks)
    self._shutdown_event = asyncio.Event()
    self._ctrl_c_counter = 0

    # polling / blocking intervals, in seconds
    self._blocking_timeout = 5
    self._scheduler_polling_interval = 1.0
    self._cron_polling_interval = 10.0

    # per-queue map of asyncio future -> the Task it is currently processing
    self._active_tasks: dict[str, dict[asyncio.Task[Task], Task]] = {q.name: {} for q in self.queues}

    self.enable_job_processing = enable_job_processing
    self.enable_scheduler = enable_scheduler
    self.enable_cron_manager = enable_cron_manager

    # handles for the long-running loops started by work()
    self._work_queue_tasks: list[asyncio.Task[None]] = []
    self._scheduler_task: asyncio.Task[None] | None = None
    self._cron_manager_task: asyncio.Task[None] | None = None

    self._tasks_to_process: int | None = None
    self._empty_queues: list[str] = []

work

work(
    max_tasks: int | None = None,
    oneshot: bool = False,
    register_signal_handlers: bool = True,
) -> None

Start worker to process tasks from the queue.

PARAMETER DESCRIPTION
max_tasks

Maximum number of tasks to process before shutting down. If None, process indefinitely.

TYPE: int | None DEFAULT: None

oneshot

If True, process tasks until the queue is empty, then shut down.

TYPE: bool DEFAULT: False

register_signal_handlers

If True, register SIGINT and SIGTERM signal handlers for graceful shutdown. Default is True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
BackendError

If there is an issue connecting to the backend.

Note
  • The worker can be gracefully shut down by sending a SIGINT or SIGTERM signal (e.g., pressing CTRL+C).
  • If the worker is already shutting down, pressing CTRL+C multiple times (default 3) will force an immediate shutdown.
  • The worker will attempt to complete active tasks before shutting down, up to the specified shutdown timeout.
  • If there are still active tasks after the timeout, they will be cancelled.
Source code in src/sheppy/worker.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
async def work(self, max_tasks: int | None = None, oneshot: bool = False, register_signal_handlers: bool = True) -> None:
    """Start worker to process tasks from the queue.

    Args:
        max_tasks (int | None): Maximum number of tasks to process before shutting down. If None, process indefinitely.
        oneshot (bool): If True, process tasks until the queue is empty, then shut down.
        register_signal_handlers (bool): If True, register SIGINT and SIGTERM signal handlers for graceful shutdown. Default is True.

    Returns:
        None

    Raises:
        BackendError: If there is an issue connecting to the backend.

    Note:
        - The worker can be gracefully shut down by sending a SIGINT or SIGTERM signal (e.g., pressing CTRL+C).
        - If the worker is already shutting down, pressing CTRL+C multiple times (default 3) will force an immediate shutdown.
        - The worker will attempt to complete active tasks before shutting down, up to the specified shutdown timeout.
        - If there are still active tasks after the timeout, they will be cancelled.
    """
    # register signals on the currently running loop
    # (get_running_loop is the non-deprecated call inside a coroutine)
    loop = asyncio.get_running_loop()
    if register_signal_handlers:
        self.__register_signal_handlers(loop)

    self._tasks_to_process = max_tasks
    self._empty_queues.clear()

    # reset state (likely relevant only for tests)
    self._shutdown_event.clear()
    self._ctrl_c_counter = 0
    # drop handles from any previous run so they are not gathered again
    self._work_queue_tasks.clear()

    # test connection
    await self._verify_connection(self._backend)

    # start scheduler
    if self.enable_scheduler:
        self._scheduler_task = asyncio.create_task(self._run_scheduler(self._scheduler_polling_interval))

    # start cron manager
    if self.enable_cron_manager:
        self._cron_manager_task = asyncio.create_task(self._run_cron_manager(self._cron_polling_interval))

    # start job processing
    if self.enable_job_processing:
        for queue in self.queues:
            self._work_queue_tasks.append(asyncio.create_task(self._run_worker_loop(queue, oneshot)))

    # blocking wait for created asyncio tasks
    # NOTE: build a *new* list here -- assigning `futures = self._work_queue_tasks`
    # and then using `+=` would mutate self._work_queue_tasks in place, leaking the
    # scheduler/cron manager tasks into the worker-loop task list
    futures: list[asyncio.Task[None]] = list(self._work_queue_tasks)
    if self._scheduler_task:
        futures.append(self._scheduler_task)
    if self._cron_manager_task:
        futures.append(self._cron_manager_task)
    await asyncio.gather(*futures, return_exceptions=True)
    self._shutdown_event.set()

    # flatten the per-queue maps into one {future: task} dict of still-active work
    remaining_tasks = {k: v for inner_dict in self._active_tasks.values() for k, v in inner_dict.items()}

    # attempt to exit cleanly
    if remaining_tasks:
        logger.info(WORKER_PREFIX + f"Waiting for {len(remaining_tasks)} active tasks to complete...")
        try:
            await asyncio.wait_for(
                asyncio.gather(*remaining_tasks.keys(), return_exceptions=True),
                timeout=self.shutdown_timeout
            )
        except asyncio.TimeoutError:
            logger.warning("Some tasks did not complete within shutdown timeout")

            # ! FIXME - what should we do here with the existing tasks? (maybe DLQ?)

            for task_future in remaining_tasks:
                if not task_future.done():
                    task_future.cancel()

                    # ! FIXME - should we try requeue here or just store state?
                    # task = remaining_tasks[task_future]
                    # try:
                    #     await queue.add(task)
                    # except Exception as e:
                    #     logger.error(f"Failed to requeue task {task.id}: {e}", exc_info=True)

    # unregister signals
    if register_signal_handlers:
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.remove_signal_handler(sig)

    logger.info(f"Worker stopped. Processed: {self.stats.processed}, Failed: {self.stats.failed}")

sheppy.worker.WorkerStats

Bases: BaseModel

processed

processed: int = 0

failed

failed: int = 0