From 1ef7b9158367e35cdacf17baff3a5864540db0ea Mon Sep 17 00:00:00 2001 From: RafaJEstabile Date: Thu, 5 Mar 2026 11:58:31 -0300 Subject: [PATCH 1/3] Improve README --- README.md | 652 +++++++++++++++++++++++++----------------------------- 1 file changed, 299 insertions(+), 353 deletions(-) diff --git a/README.md b/README.md index c4aac85..b1f7752 100644 --- a/README.md +++ b/README.md @@ -1,497 +1,443 @@ # OmniQ (Python) -**OmniQ** is a Redis + Lua, language-agnostic job queue.\ -This package is the **Python client** for OmniQ v1. +OmniQ Python is the **Python client** for **OmniQ**, a Redis + Lua-based task +queue designed for distributed processing with strict control over execution, +concurrency, and state. The library provides a direct interface for publishing, +consuming, and managing jobs while maintaining operational safety guarantees and +consistency, even in concurrent and distributed environments. Core project / docs: https://github.com/not-empty/omniq + ------------------------------------------------------------------------ -## Key Ideas - -- **Hybrid lanes** - - Ungrouped jobs by default - - Optional grouped jobs (FIFO per group + per-group concurrency) -- **Lease-based execution** - - Workers reserve a job with a time-limited lease -- **Token-gated ACK / heartbeat** - - `reserve()` returns a `lease_token` - - `heartbeat()` and `ack_*()` must include the same token -- **Pause / resume (flag-only)** - - Pausing prevents *new reserves* - - Running jobs are not interrupted - - Jobs are not moved -- **Admin-safe operations** - - Strict `retry`, `retry_batch`, `remove`, `remove_batch` -- **Handler-driven execution layer** - - `ctx.exec` exposes internal OmniQ operations safely inside handlers +## Why OmniQ ------------------------------------------------------------------------- +Redis-based queues typically sacrifice predictability under concurrency. 
-## Install +**OmniQ was designed to provide:** +- Atomic state transitions (with Lua) +- Lease-based execution (no double processing) +- Group isolation with FIFO ordering +- Deterministic control over pause, retry, and concurrency -``` bash -pip install omniq -``` +Focused on operational safety in distributed environments. ------------------------------------------------------------------------ -## Quick Start +## Execution +The development environment can be started in two different ways, depending on your setup and infrastructure preferences: -### Publish +- With Docker — recommended for isolation, reproducibility, and consistent Redis configuration. -``` python -# importing the lib -from omniq.client import OmniqClient +- Without Docker — using a locally installed and properly configured Redis instance. -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) +Both approaches allow you to run the examples and validate queue behavior in a local environment. -# publishing the job -job_id = omniq.publish( - queue="demo", - payload={"hello": "world"}, - timeout_ms=30_000 -) +------------------------------------------------------------------------ -print("OK", job_id) -``` +## Installation ------------------------------------------------------------------------- +### Option 1 - Using Docker -### Publish Structured JSON +**1. Create Project** -``` python -from dataclasses import dataclass -from typing import List, Optional +``` bash +mkdir omniq-sample +cd omniq-sample +``` +**2. 
Create docker-compose.yml** -# importing the lib -from omniq.client import OmniqClient +``` python +services: + omniq-redis: + image: redis:7.4-alpine + container_name: omniq-redis + ports: + - "6379:6379" + command: ["redis-server", "--appendonly", "yes"] + volumes: + - redis_data:/data + + omniq-valkey: + image: valkey/valkey:9-alpine + container_name: omniq-valkey + ports: + - "6380:6379" + command: ["valkey-server", "--appendonly", "yes"] + volumes: + - valkey_data:/data + + omniq-python: + image: python:3.12-slim + container_name: omniq-python + working_dir: /app + environment: + PYTHONUNBUFFERED: "1" + volumes: + - ./:/app + depends_on: + - omniq-redis + command: ["bash", "-lc", "tail -f /dev/null"] +``` +choose either Redis or Valkey to run this project. +**3. Start environment** -# Nested structure -@dataclass -class Customer: - id: str - email: str - vip: bool +``` bash +docker compose up -d +``` +**4. Access Python container** +``` bash +docker exec -it omniq-python bash +pip install omniq +``` -# Main payload -@dataclass -class OrderCreated: - order_id: str - customer: Customer - amount: int - currency: str - items: List[str] - processed: bool - retry_count: int - tags: Optional[List[str]] = None +**5. 
Create Simple example** +``` bash +mkdir simple +cd simple +``` -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) +**publish.py** +``` python +from omniq.client import OmniqClient -# creating structured payload -payload = OrderCreated( - order_id="ORD-2026-0001", - customer=Customer( - id="CUST-99", - email="leo@example.com", - vip=True, - ), - amount=1500, - currency="USD", - items=["keyboard", "mouse"], - processed=False, - retry_count=0, - tags=["priority", "online"], -) +omniq = OmniqClient(host="omniq-redis", port=6379) -# publish using publish_json -job_id = omniq.publish_json( - queue="deno", - payload=payload, - max_attempts=5, - timeout_ms=60_000, +job_id = omniq.publish( + queue="demo", + payload={"hello": "world"}, + timeout_ms=30_000 ) -print("OK", job_id) +print("Published:", job_id) ``` ------------------------------------------------------------------------- - -### Consume - +**consumer.py** ``` python import time - -# importing the lib from omniq.client import OmniqClient -# creating your handler (ctx will have all the job information and actions) -def my_actions(ctx): - print("Waiting 2 seconds") +def handler(ctx): + print("Processing:", ctx.job_id) time.sleep(2) print("Done") -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) +omniq = OmniqClient(host="omniq-redis", port=6379) -# creating the consumer that will listen and execute the actions in your handler omniq.consume( queue="demo", - handler=my_actions, - verbose=True, - drain=False, + handler=handler, + verbose=True ) ``` +**Run:** +``` bash +python publish.py +python consumer.py +``` + ------------------------------------------------------------------------ -## Handler Context +### Option 2 - Running Locally Without Docker -Inside `handler(ctx)`: +**1. 
Start Redis** +``` bash +redis-server +``` -- `queue` -- `job_id` -- `payload_raw` -- `payload` -- `attempt` -- `lock_until_ms` -- `lease_token` -- `gid` -- `exec` → execution layer (`ctx.exec`) +**2. Create Project** +``` bash +mkdir omniq-sample +cd omniq-sample +pip install omniq +``` +Then create publish.py and consumer.py as shown above ------------------------------------------------------------------------ -# Administrative Operations +## Publishing -All admin operations are **Lua-backed and atomic**. - -## retry_failed() +### Publish Simple Job ``` python -omniq.retry_failed(queue="demo", job_id="01ABC...") -``` - -- Works only if job state is `failed` -- Resets attempt counter -- Respects grouping rules - ------------------------------------------------------------------------- +from omniq.client import OmniqClient -## retry_failed_batch() +omniq = OmniqClient(host="omniq-redis", port=6379) -``` python -results = omniq.retry_failed_batch( +job_id = omniq.publish( queue="demo", - job_ids=["01A...", "01B...", "01C..."] + payload={"hello": "world"}, + timeout_ms=30_000 ) - -for job_id, status, reason in results: - print(job_id, status, reason) ``` +The publish method enqueues a job with a JSON-serializable payload. -- Max 100 jobs per call -- Atomic batch -- Per-job result returned +**Key behavior:** +- The job receives a unique job_id +- timeout_ms defines the lease duration once reserved +- The payload is stored atomically +This is the simplest way to enqueue work for asynchronous processing. 
------------------------------------------------------------------------ -## remove_job() +### Publish Structured Payload ``` python -omniq.remove_job( - queue="demo", - job_id="01ABC...", - lane="failed", # wait | delayed | failed | completed | gwait +from dataclasses import dataclass +from typing import Optional, List +from omniq.client import OmniqClient + +@dataclass +class OrderCreated: + order_id: str + amount: int + currency: str + tags: Optional[List[str]] = None + +omniq = OmniqClient(host="omniq-redis", port=6379) + +job_id = omniq.publish_json( + queue="orders", + payload=OrderCreated( + order_id="ORD-1", + amount=1000, + currency="USD", + tags=["priority"] + ), + max_attempts=5, + timeout_ms=60_000 ) ``` +The publish_json method allows you to send structured objects (such as dataclasses) that are automatically serialized to JSON before being stored in the queue. -Rules: - -- Cannot remove active jobs -- Lane must match job state -- Group safety preserved +**Advantages:** +- Safe and standardized serialization +- Typed and predictable payload structure +- Explicit max_attempts control +- Better organization for event-driven systems +- Recommended for production environments where payload contracts must remain stable. ------------------------------------------------------------------------ -## remove_jobs_batch() +## Consumption ``` python -results = omniq.remove_jobs_batch( +omniq.consume( queue="demo", - lane="failed", - job_ids=["01A...", "01B...", "01C..."] + handler=handler, + verbose=True, + drain=False ) ``` +The consume method starts a worker loop that continuously reserves and processes jobs from the specified queue. 
-- Max 100 per call -- Strict lane validation -- Atomic per batch +**Parameters:** +- queue — Target queue name +- handler — Function responsible for processing each job +- verbose=True — Enables execution logs (reservations, ACKs, retries, errors) +- drain=False — Keeps the consumer running, waiting for new jobs + +With drain=False, the worker behaves as a long-running consumer suitable for services and background processors. ------------------------------------------------------------------------ -## pause() +## Drain Mode ``` python -pause_result = omniq.pause( +omniq.consume( queue="demo", + handler=handler, + drain=True ) +``` +**When drain=True, the consumer:** +- Processes all currently available jobs +- Does not wait for new jobs +- Automatically exits once the queue is empty + +**Best suited for:** +- Batch processing +- Maintenance scripts +- Controlled execution pipelines -resume_result = omniq.resume( - queue="demo", -) +------------------------------------------------------------------------ -is_paused = omniq.is_paused( - queue="demo", -) +## Heartbeat + +For long-running jobs: +``` python +ctx.exec.heartbeat() ``` +The heartbeat call renews the lease of the currently running job. + +**Behavior** +- Uses the same lease_token issued at reservation +- Extends lock_until_ms safely +- Prevents the job from returning to the queue while still being processed +- Essential for long or unpredictable execution times. 
+ ------------------------------------------------------------------------ -# Handler Context +## Handler Context -Inside `handler(ctx)`: +### Inside handler(ctx): -- `queue` -- `job_id` -- `payload_raw` -- `payload` -- `attempt` -- `lock_until_ms` -- `lease_token` -- `gid` -- `exec` +| Field | Description | +|----------------|--------------------------------------------| +| queue | Queue name | +| job_id | Unique identifier | +| payload | Deserialized payload | +| payload_raw | Raw JSON | +| attempt | Current attempt number | +| lock_until_ms | Lease expiration timestamp | +| lease_token | Required token for ACK/heartbeat | +| gid | Group identifier | +| exec | Secure layer for administrative operations | ------------------------------------------------------------------------ -# Child Ack Control (Parent / Child Workflows) - -A handler-driven primitive for fan-out workflows. - -No TTL. Cleanup happens only when counter reaches zero. +## Grouped Queues -## Parent Example - -The first queue will receive a document with 5 pages ``` python -# importing the lib -from omniq.client import OmniqClient - -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) - -# publishing the job -job_id = omniq.publish( - queue="documents", - payload={ - "document_id": "doc-123", # this will be our unique key to initiate childs and tracking then until completion - "pages": 5, # each page must be completed before something happen - }, +omniq.publish( + queue="payments", + payload={"invoice": 1}, + gid="company:acme", + group_limit=1 ) -print("OK", job_id) ``` +By defining a gid, the job becomes part of a logical group. 
-The first consumer will publish a job for each page passing the unique key for childs tracking -``` python -# importing the lib -from omniq.client import OmniqClient +**Guarantees:** +- FIFO ordering within the same group +- Parallel execution across different groups +- Round-robin scheduling between active groups +- Concurrency control via group_limit -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) +This allows isolation of tenants, customers, or entities without blocking the entire system. -# publishing the job -job_id = omniq.publish( - queue="documents", - payload={ - "document_id": "doc-123", # this will be our unique key to initiate childs and tracking then until completion - "pages": 5, # each page must be completed before something happen - }, -) -print("OK", job_id) -``` +------------------------------------------------------------------------ -## Child Example +## Retry -The second consumer will deal with each page and ack each child (alerting when the last page was processed) +### Retry Failed Job ``` python -import time - -# importing the lib -from omniq.client import OmniqClient - -# creating your handler (ctx will have all the job information and actions) -def page_worker(ctx): - - page = ctx.payload["page"] - # getting the unique key to track the childs - completion_key = ctx.payload["completion_key"] - - print(f"[page_worker] Processing page {page} (job_id={ctx.job_id})") - time.sleep(1.5) - - # acking itself as a child the number of remaining jobs are returned so we can say when the last job was executed - remaining = ctx.exec.child_ack(completion_key) - - print(f"[page_worker] Page {page} done. 
Remaining={remaining}") - - - # remaining will be 0 ONLY when this is the last job - # will return > 0 when are still jobs to process - # and -1 if something goes wrong with the counter - if remaining == 0: - print("[page_worker] Last page finished.") - -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) - -# creating the consumer that will listen and execute the actions in your handler -omniq.consume( - queue="pages", - handler=page_worker, - verbose=True, - drain=False, +omniq.retry_failed( + queue="demo", + job_id="01ABC..." ) ``` +Reactivates a job currently in the failed lane. -Properties: +**Rules:** +- Only works if the job is in failed state +- Resets the attempt counter +- Preserves group and concurrency rules -- Idempotent decrement -- Safe under retries -- Cross-queue safe -- Fully business-logic driven ------------------------------------------------------------------------- - -## Grouped Jobs +### Batch Retry (up to 100) ``` python -# if you provide a gid (group_id) you can limit the parallel execution for jobs in the same group -omniq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=1) - -# you can also publis ungrouped jobs that will also be executed (fairness by round-robin algorithm) -omniq.publish(queue="demo", payload={"i": 2}) +results = omniq.retry_failed_batch( + queue="demo", + job_ids=["01A...", "01B..."] +) ``` +Allows retrying multiple failed jobs at once. 
-- FIFO inside group -- Groups execute in parallel -- Concurrency limited per group +**Characteristics:** +- Atomic operation (Lua-backed) +- Up to 100 jobs per call +- Individual result per job (status + reason) ------------------------------------------------------------------------ -## Pause and Resume inside the consumer +## Removal -You publish your job as usual ``` python -# importing the lib -from omniq.client import OmniqClient - -# creating OmniQ passing redis information -uq = OmniqClient( - host="omniq-redis", - port=6379, -) - -# publishing the job -job_id = uq.publish( - queue="test", - payload={"hello": "world"}, - timeout_ms=30_000 +omniq.remove_jobs_batch( + queue="demo", + lane="failed", + job_ids=["01A...", "01B..."] ) -print("OK", job_id) ``` +Removes jobs from a specific lane. -Inside your consumer you can pause/resume your queue (or another one) -``` python -import time - -# importing the lib -from omniq.client import OmniqClient +**Important rules:** +- Active jobs cannot be removed +- The provided lane must match the actual job state +- Group integrity is preserved +- Used for administrative cleanup and manual state control. -# creating your handler (ctx will have all the job information and actions) -def pause_unpause_example(ctx): - print("Waiting 2 seconds") +------------------------------------------------------------------------ - # checking if this queue it is paused (spoiler: it's not) - is_paused = ctx.exec.is_paused( - queue="test" - ) - print("Is paused", is_paused) - time.sleep(2) +## Pause and Resume +``` python +omniq.pause(queue="demo") +omniq.resume(queue="demo") +omniq.is_paused(queue="demo") +``` +Controls queue execution flow. 
- print("Pausing") +**Guarantees:** +- Prevents new reservations while paused +- Active jobs continue running normally +- No job is lost or moved unexpectedly +- Immediate and consistent resume behavior - # pausing this queue (this job it's and others active jobs will be not affected but not new job will be start until queue is resumed) - ctx.exec.pause( - queue="test" - ) +Can be used externally or inside a handler via ctx.exec. - # checking again now is suposed to be paused - is_paused = ctx.exec.is_paused( - queue="test" - ) - print("Is paused", is_paused) - time.sleep(2) +------------------------------------------------------------------------ - print("Resuming") +## Parent/Child Flow - # resuming this queue (all other workers can process jobs again) - ctx.exec.resume( - queue="test" - ) +``` python +remaining = ctx.exec.child_ack(completion_key) - # checking again and is suposed to be resumed - is_paused = ctx.exec.is_paused( - queue="test" - ) - print("Is paused", is_paused) - time.sleep(2) +if remaining == 0: + print("Last child completed") +``` +**Returns:** +- Pending children -> `> 0` +- Last Child -> `0` +- Counter error -> `1` - print("Done") +**Properties:** +- Idempotent +- Retry-safe +- Isolated across queues -# creating OmniQ passing redis information -omniq = OmniqClient( - host="omniq-redis", - port=6379, -) +------------------------------------------------------------------------ +## Administrative Operations +All administrative operations are `executed atomically via Lua`, ensuring +consistency even under high concurrency. -# creating the consumer that will listen and execute the actions in your handler -omniq.consume( - queue="test", - handler=pause_unpause_example, - verbose=True, - drain=False, -) -``` +------------------------------------------------------------------------ ## Examples -All examples can be found in the `./examples` folder. 
+For a better understanding of queue behavior and its main features, see +the examples available in the `./examples` folder. + +**You will find scenarios such as:** +- Basic job publishing and consumption +- Using the `ctx.exec` layer inside handlers +- Parent/child workflows with `childs_init` and `child_ack` +- Coordination between multiple queues (documents -> pages) +- Execution control using pause and resume inside handlers + +The examples are recommended to understand the full execution flow in +real-world environments. ------------------------------------------------------------------------ ## License - -See the repository license. +GPL-3.0 +Refer to the main repository for details From 8d8de913578a06e1be882aae7d9938e15396ad8e Mon Sep 17 00:00:00 2001 From: RafaJEstabile Date: Mon, 9 Mar 2026 13:43:03 -0300 Subject: [PATCH 2/3] Improve README and add Developer --- Developer.md | 443 ++++++++++++++++++++++++++++++ README.md | 750 +++++++++++++++++++++++++++++---------------------- 2 files changed, 873 insertions(+), 320 deletions(-) create mode 100644 Developer.md diff --git a/Developer.md b/Developer.md new file mode 100644 index 0000000..d20d474 --- /dev/null +++ b/Developer.md @@ -0,0 +1,443 @@ +# OmniQ (Python) + +OmniQ Python is the **Python client** for **OmniQ**, a Redis + Lua-based task +queue designed for distributed processing with strict control over execution +concurrency, and state. The library provides a direct interface for publishing, +consuming, and managing jobs while maintaining operational safety guarantees and +consistency, even in concurrent and distributed environments. + +Core project / docs: https://github.com/not-empty/omniq + +------------------------------------------------------------------------ + +## Why OmniQ + +Redis-based queues typically sacrifice predictability under concurrency. 
+ +**OmniQ was designed to provide:** +- Atomic state transitions (with Lua) +- Lease-based execution (no double processing) +- Group isolation with FIFO ordering +- Deterministic control over pause, retry, and concurrency + +Focused on operational safety in distributed environments. + +------------------------------------------------------------------------ + +## Execution +The development environment can be started in two different ways, depending on your setup and infrastructure preferences: + +- With Docker — recommended for isolation, reproducibility, and consistent Redis configuration. + +- Without Docker — using a locally installed and properly configured Redis instance. + +Both approaches allow you to run the examples and validate queue behavior in a local environment. + +------------------------------------------------------------------------ + +## Installation + +### Option 1 - Using Docker + +**1. Create Project** + +``` bash +mkdir omniq-sample +cd omniq-sample +``` +**2. Create docker-compose.yml** + +``` python +services: + omniq-redis: + image: redis:7.4-alpine + container_name: omniq-redis + ports: + - "6379:6379" + command: ["redis-server", "--appendonly", "yes"] + volumes: + - redis_data:/data + + omniq-valkey: + image: valkey/valkey:9-alpine + container_name: omniq-valkey + ports: + - "6380:6379" + command: ["valkey-server", "--appendonly", "yes"] + volumes: + - valkey_data:/data + + omniq-python: + image: python:3.12-slim + container_name: omniq-python + working_dir: /app + environment: + PYTHONUNBUFFERED: "1" + volumes: + - ./:/app + depends_on: + - omniq-redis + command: ["bash", "-lc", "tail -f /dev/null"] +``` +choose either Redis or Valkey to run this project. + +**3. Start environment** + +``` bash +docker compose up -d +``` +**4. Access Python container** + +``` bash +docker exec -it omniq-python bash +pip install omniq +``` + +**5. 
Create Simple example** + +``` bash +mkdir simple +cd simple +``` + +**publish.py** +``` python +from omniq.client import OmniqClient + +omniq = OmniqClient(host="omniq-redis", port=6379) + +job_id = omniq.publish( + queue="demo", + payload={"hello": "world"}, + timeout_ms=30_000 +) + +print("Published:", job_id) +``` + +**consumer.py** +``` python +import time +from omniq.client import OmniqClient + +def handler(ctx): + print("Processing:", ctx.job_id) + time.sleep(2) + print("Done") + +omniq = OmniqClient(host="omniq-redis", port=6379) + +omniq.consume( + queue="demo", + handler=handler, + verbose=True +) +``` +**Run:** +``` bash +python publish.py +python consumer.py +``` + +------------------------------------------------------------------------ + +### Option 2 - Running Locally Without Docker + +**1. Start Redis** +``` bash +redis-server +``` + +**2. Create Project** +``` bash +mkdir omniq-sample +cd omniq-sample +pip install omniq +``` +Then create publish.py and consumer.py as shown above + +------------------------------------------------------------------------ + +## Publishing + +### Publish Simple Job + +``` python +from omniq.client import OmniqClient + +omniq = OmniqClient(host="omniq-redis", port=6379) + +job_id = omniq.publish( + queue="demo", + payload={"hello": "world"}, + timeout_ms=30_000 +) +``` +The publish method enqueues a job with a JSON-serializable payload. + +**Key behavior:** +- The job receives a unique job_id +- timeout_ms defines the lease duration once reserved +- The payload is stored atomically +This is the simplest way to enqueue work for asynchronous processing. 
+ +------------------------------------------------------------------------ + +### Publish Structured Payload + +``` python +from dataclasses import dataclass +from typing import Optional, List +from omniq.client import OmniqClient + +@dataclass +class OrderCreated: + order_id: str + amount: int + currency: str + tags: Optional[List[str]] = None + +omniq = OmniqClient(host="omniq-redis", port=6379) + +job_id = omniq.publish_json( + queue="orders", + payload=OrderCreated( + order_id="ORD-1", + amount=1000, + currency="USD", + tags=["priority"] + ), + max_attempts=5, + timeout_ms=60_000 +) +``` +The publish_json method allows you to send structured objects (such as dataclasses) that are automatically serialized to JSON before being stored in the queue. + +**Advantages:** +- Safe and standardized serialization +- Typed and predictable payload structure +- Explicit max_attempts control +- Better organization for event-driven systems +- Recommended for production environments where payload contracts must remain stable. + +------------------------------------------------------------------------ + +## Consumption + +``` python +omniq.consume( + queue="demo", + handler=handler, + verbose=True, + drain=False +) +``` +The consume method starts a worker loop that continuously reserves and processes jobs from the specified queue. + +**Parameters:** +- queue — Target queue name +- handler — Function responsible for processing each job +- verbose=True — Enables execution logs (reservations, ACKs, retries, errors) +- drain=False — Keeps the consumer running, waiting for new jobs + +With drain=False, the worker behaves as a long-running consumer suitable for services and background processors. 
+ +------------------------------------------------------------------------ + +## Drain Mode + +``` python +omniq.consume( + queue="demo", + handler=handler, + drain=True +) +``` +**When drain=True, the consumer:** +- Processes all currently available jobs +- Does not wait for new jobs +- Automatically exits once the queue is empty + +**Best suited for:** +- Batch processing +- Maintenance scripts +- Controlled execution pipelines + +------------------------------------------------------------------------ + +## Heartbeat + +For long-running jobs: +``` python +ctx.exec.heartbeat() +``` +The heartbeat call renews the lease of the currently running job. + +**Behavior** +- Uses the same lease_token issued at reservation +- Extends lock_until_ms safely +- Prevents the job from returning to the queue while still being processed +- Essential for long or unpredictable execution times. + +------------------------------------------------------------------------ + +## Handler Context + +### Inside handler(ctx): + +| Field | Description | +|----------------|--------------------------------------------| +| queue | Queue name | +| job_id | Unique identifier | +| payload | Deserialized payload | +| payload_raw | Raw JSON | +| attempt | Current attempt number | +| lock_until_ms | Lease expiration timestamp | +| lease_token | Required token for ACK/heartbeat | +| gid | Group identifier | +| exec | Secure layer for administrative operations | + +------------------------------------------------------------------------ + +## Grouped Queues + +``` python +omniq.publish( + queue="payments", + payload={"invoice": 1}, + gid="company:acme", + group_limit=1 +) +``` +By defining a gid, the job becomes part of a logical group. 
+ +**Guarantees:** +- FIFO ordering within the same group +- Parallel execution across different groups +- Round-robin scheduling between active groups +- Concurrency control via group_limit + +This allows isolation of tenants, customers, or entities without blocking the entire system. + +------------------------------------------------------------------------ + +## Retry + +### Retry Failed Job + +``` python +omniq.retry_failed( + queue="demo", + job_id="01ABC..." +) +``` +Reactivates a job currently in the failed lane. + +**Rules:** +- Only works if the job is in failed state +- Resets the attempt counter +- Preserves group and concurrency rules + +------------------------------------------------------------------------ + +### Batch Retry (up to 100) + +``` python +results = omniq.retry_failed_batch( + queue="demo", + job_ids=["01A...", "01B..."] +) +``` +Allows retrying multiple failed jobs at once. + +**Characteristics:** +- Atomic operation (Lua-backed) +- Up to 100 jobs per call +- Individual result per job (status + reason) + +------------------------------------------------------------------------ + +## Removal + +``` python +omniq.remove_jobs_batch( + queue="demo", + lane="failed", + job_ids=["01A...", "01B..."] +) +``` +Removes jobs from a specific lane. + +**Important rules:** +- Active jobs cannot be removed +- The provided lane must match the actual job state +- Group integrity is preserved +- Used for administrative cleanup and manual state control. + +------------------------------------------------------------------------ + +## Pause and Resume + +``` python +omniq.pause(queue="demo") +omniq.resume(queue="demo") +omniq.is_paused(queue="demo") +``` +Controls queue execution flow. + +**Guarantees:** +- Prevents new reservations while paused +- Active jobs continue running normally +- No job is lost or moved unexpectedly +- Immediate and consistent resume behavior + +Can be used externally or inside a handler via ctx.exec. 
+ +------------------------------------------------------------------------ + +## Parent/Child Flow + +``` python +remaining = ctx.exec.child_ack(completion_key) + +if remaining == 0: + print("Last child completed") +``` +**Returns:** +- Pending children -> `> 0` +- Last Child -> `0` +- Counter error -> `1` + +**Properties:** +- Idempotent +- Retry-safe +- Isolated across queues + +------------------------------------------------------------------------ + +## Administrative Operations +All administrative operations are `executed atomically via Lua`, ensuring +consistency even under high concurrency. + +------------------------------------------------------------------------ + +## Examples + +For a better understanding of queue behavior and its main features, see +the examples available in the `./examples` folder. + +**You will find scenarios such as:** +- Basic job publishing and consumption +- Using the `ctx.exec` layer inside handlers +- Parent/child workflows with `childs_init` and `child_ack` +- Coordination between multiple queues (documents -> pages) +- Execution control using pause and resume inside handlers + +The examples are recommended to understand the full execution flow in +real-world environments. + +------------------------------------------------------------------------ + +## License +GPL-3.0 +Refer to the main repository for details diff --git a/README.md b/README.md index b1f7752..7b90ec2 100644 --- a/README.md +++ b/README.md @@ -1,443 +1,553 @@ + # OmniQ (Python) -OmniQ Python is the **Python client** for **OmniQ**, a Redis + Lua-based task -queue designed for distributed processing with strict control over execution, -concurrency, and state. The library provides a direct interface for publishing, -consuming, and managing jobs while maintaining operational safety guarantees and -consistency, even in concurrent and distributed environments. 
+OmniQ is a **distributed job queue built on top of Redis + Lua**, +designed to run background tasks with **predictable execution,** +**concurrency control, and distributed workflows**. -Core project / docs: https://github.com/not-empty/omniq +This package is the **official Python client for OmniQ v1**. +Core project: +[https://github.com/not-empty/omniq](https://github.com/not-empty/omniq) ------------------------------------------------------------------------ -## Why OmniQ +## The Story Behind OmniQ + +Almost every modern system needs to execute tasks outside the main request flow. + +Common examples include: + +- sending emails +- generating reports or documents +- processing images or videos +- running data pipelines + +At first, solving this problem seems simple. +You create a queue, add a few workers, and start processing jobs in the background. -Redis-based queues typically sacrifice predictability under concurrency. +But as the system grows, new challenges start to appear. Problems like +**duplicate jobs, workers crashing mid-execution,or uncontrolled concurrency** +become harder to manage with a simple queue. -**OmniQ was designed to provide:** -- Atomic state transitions (with Lua) -- Lease-based execution (no double processing) -- Group isolation with FIFO ordering -- Deterministic control over pause, retry, and concurrency +Many existing solutions focus mainly on **moving messages**, but not on managing +the **full lifecycle of a job execution**. -Focused on operational safety in distributed environments. +OmniQ was created to solve this problem by providing a more reliable model for +running distributed background jobs, with explicit control over execution, +concurrency, and task coordination. 
------------------------------------------------------------------------ -## Execution -The development environment can be started in two different ways, depending on your setup and infrastructure preferences: +## How OmniQ Works -- With Docker — recommended for isolation, reproducibility, and consistent Redis configuration. +The core flow of OmniQ is simple. -- Without Docker — using a locally installed and properly configured Redis instance. +An application publishes jobs, and workers process them. -Both approaches allow you to run the examples and validate queue behavior in a local environment. +``` +Application + │ + ▼ + OmniQ Queue + │ + ▼ + Workers + │ + ▼ + Processing +``` ------------------------------------------------------------------------- +Or visually: -## Installation +```mermaid +flowchart LR -### Option 1 - Using Docker - -**1. Create Project** - -``` bash -mkdir omniq-sample -cd omniq-sample -``` -**2. Create docker-compose.yml** - -``` python -services: - omniq-redis: - image: redis:7.4-alpine - container_name: omniq-redis - ports: - - "6379:6379" - command: ["redis-server", "--appendonly", "yes"] - volumes: - - redis_data:/data - - omniq-valkey: - image: valkey/valkey:9-alpine - container_name: omniq-valkey - ports: - - "6380:6379" - command: ["valkey-server", "--appendonly", "yes"] - volumes: - - valkey_data:/data - - omniq-python: - image: python:3.12-slim - container_name: omniq-python - working_dir: /app - environment: - PYTHONUNBUFFERED: "1" - volumes: - - ./:/app - depends_on: - - omniq-redis - command: ["bash", "-lc", "tail -f /dev/null"] -``` -choose either Redis or Valkey to run this project. - -**3. Start environment** - -``` bash -docker compose up -d -``` -**4. Access Python container** - -``` bash -docker exec -it omniq-python bash -pip install omniq +A[Application] --> B[OmniQ Queue] +B --> C[Worker 1] +B --> D[Worker 2] +B --> E[Worker 3] + +C --> F[Processing] +D --> F +E --> F ``` -**5. 
Create Simple example** +This allows multiple machines to work **in parallel** processing tasks. + +------------------------------------------------------------------------ + +## System Architecture + +OmniQ uses **Redis as its storage and coordination engine**. -``` bash -mkdir simple -cd simple +The critical queue logic runs through **atomic Lua scripts**, ensuring consistent behavior even when multiple workers operate concurrently. + +``` +Application + │ + ▼ + OmniQ Client + │ + ▼ + Redis + │ + ▼ + Workers ``` -**publish.py** -``` python -from omniq.client import OmniqClient +Or visually: -omniq = OmniqClient(host="omniq-redis", port=6379) +```mermaid +flowchart TB -job_id = omniq.publish( - queue="demo", - payload={"hello": "world"}, - timeout_ms=30_000 -) +App[Application] -print("Published:", job_id) +Client[OmniQ Python Client] + +Redis[(Redis / Valkey)] + +Worker1[Worker] +Worker2[Worker] +Worker3[Worker] + +App --> Client +Client --> Redis + +Worker1 --> Redis +Worker2 --> Redis +Worker3 --> Redis ``` -**consumer.py** -``` python -import time -from omniq.client import OmniqClient +This architecture enables: + +- distributed execution +- strong consistency under concurrency +- atomic operations +- automatic failure recovery + +------------------------------------------------------------------------ + +## Core Concepts -def handler(ctx): - print("Processing:", ctx.job_id) - time.sleep(2) - print("Done") +Before using OmniQ, it helps to understand a few fundamental concepts. -omniq = OmniqClient(host="omniq-redis", port=6379) +------------------------------------------------------------------------ + +## Job + +A **job** represents a unit of work that needs to be executed. 
+ +Examples of jobs: + +- sending an email +- generating a report +- converting a video +- processing an image +- analyzing a file + +Each job contains basic metadata: -omniq.consume( - queue="demo", - handler=handler, - verbose=True -) ``` -**Run:** -``` bash -python publish.py -python consumer.py +Job + ├─ id + ├─ queue + ├─ payload + ├─ attempts + └─ state ``` +This allows the system to track the entire lifecycle of a task. ------------------------------------------------------------------------ -### Option 2 - Running Locally Without Docker +## Payload + +The **payload** contains the data required to execute the job. + +It represents the **context of the task**. + +Example: + +Job: send email -**1. Start Redis** -``` bash -redis-server +``` +payload + ├─ to + ├─ subject + └─ template ``` -**2. Create Project** -``` bash -mkdir omniq-sample -cd omniq-sample -pip install omniq +Another example: + +Job: generate report + +``` +payload + ├─ company_id + ├─ start_date + └─ end_date +``` + +Visual representation: + ``` -Then create publish.py and consumer.py as shown above +Job + ├─ type: generate_report + └─ payload + ├─ company_id + ├─ start_date + └─ end_date +``` + +Workers use the payload data to perform the work. ------------------------------------------------------------------------ -## Publishing +## Publishing a Job -### Publish Simple Job +Publishing a job means **sending a task to the queue**. -``` python -from omniq.client import OmniqClient +When a job is published: -omniq = OmniqClient(host="omniq-redis", port=6379) +1. it receives a unique ID +2. it is stored in the queue +3. 
a worker can reserve and execute it -job_id = omniq.publish( - queue="demo", - payload={"hello": "world"}, - timeout_ms=30_000 -) +Flow: + +```mermaid +sequenceDiagram + +Application->>OmniQ: publish job +OmniQ->>Queue: store job +Worker->>Queue: reserve job +Worker->>Worker: process +Worker->>Queue: ack completed ``` -The publish method enqueues a job with a JSON-serializable payload. -**Key behavior:** -- The job receives a unique job_id -- timeout_ms defines the lease duration once reserved -- The payload is stored atomically -This is the simplest way to enqueue work for asynchronous processing. +Simplified view: + +``` +Application + │ + ▼ + publish job + │ + ▼ + Queue + │ + ▼ + Worker +``` ------------------------------------------------------------------------ -### Publish Structured Payload - -``` python -from dataclasses import dataclass -from typing import Optional, List -from omniq.client import OmniqClient - -@dataclass -class OrderCreated: - order_id: str - amount: int - currency: str - tags: Optional[List[str]] = None - -omniq = OmniqClient(host="omniq-redis", port=6379) - -job_id = omniq.publish_json( - queue="orders", - payload=OrderCreated( - order_id="ORD-1", - amount=1000, - currency="USD", - tags=["priority"] - ), - max_attempts=5, - timeout_ms=60_000 -) -``` -The publish_json method allows you to send structured objects (such as dataclasses) that are automatically serialized to JSON before being stored in the queue. - -**Advantages:** -- Safe and standardized serialization -- Typed and predictable payload structure -- Explicit max_attempts control -- Better organization for event-driven systems -- Recommended for production environments where payload contracts must remain stable. +## Queues ------------------------------------------------------------------------- +Jobs are organized into **queues**. + +Each queue usually represents a type of workload. 
+ +Example queues: + +``` +emails +documents +images +payments +reports +``` + +Visualization: -## Consumption +``` +Queue: emails -``` python -omniq.consume( - queue="demo", - handler=handler, - verbose=True, - drain=False -) + ├─ job1 + ├─ job2 + └─ job3 ``` -The consume method starts a worker loop that continuously reserves and processes jobs from the specified queue. -**Parameters:** -- queue — Target queue name -- handler — Function responsible for processing each job -- verbose=True — Enables execution logs (reservations, ACKs, retries, errors) -- drain=False — Keeps the consumer running, waiting for new jobs - -With drain=False, the worker behaves as a long-running consumer suitable for services and background processors. +Workers may consume jobs from **one or multiple queues**. ------------------------------------------------------------------------ -## Drain Mode - -``` python -omniq.consume( - queue="demo", - handler=handler, - drain=True -) -``` -**When drain=True, the consumer:** -- Processes all currently available jobs -- Does not wait for new jobs -- Automatically exits once the queue is empty - -**Best suited for:** -- Batch processing -- Maintenance scripts -- Controlled execution pipelines +## Lease-Based Execution + +OmniQ uses a **lease-based execution model**. + +When a worker reserves a job, it receives a **temporary lease**. + +``` +Worker reserves job + │ + ▼ +Job becomes locked to that worker + │ + ▼ +Worker processes it +``` + +Diagram: + +```mermaid +sequenceDiagram + +Worker->>Queue: reserve job +Queue->>Worker: job + lease_token +Worker->>Queue: heartbeat +Worker->>Queue: ack success +``` + +If the worker crashes: + +- the lease expires +- the job becomes available again + +This prevents jobs from becoming **permanently stuck**. ------------------------------------------------------------------------ -## Heartbeat +## Heartbeats + +Some jobs may take a long time to finish. 
+ +Examples include: + +- video processing +- large data analysis +- heavy report generation -For long-running jobs: -``` python -ctx.exec.heartbeat() +Workers can send **heartbeats** to extend the lease. + +``` +Worker + ├─ start job + ├─ heartbeat + ├─ heartbeat + └─ finish job ``` -The heartbeat call renews the lease of the currently running job. -**Behavior** -- Uses the same lease_token issued at reservation -- Extends lock_until_ms safely -- Prevents the job from returning to the queue while still being processed -- Essential for long or unpredictable execution times. - +This tells the system that the job **is still actively running**. + ------------------------------------------------------------------------ -## Handler Context +## Grouped Jobs + +OmniQ supports grouping jobs using **groups**. + +This is useful when you need to limit concurrency within a logical group. + +Example: processing jobs per customer. + +``` +Queue: payments + +Group A (customer A) + ├─ job1 + └─ job2 + +Group B (customer B) + ├─ job3 + └─ job4 +``` + +Guarantees: + +- **FIFO ordering within a group** +- groups run **in parallel** +- configurable concurrency limits per group + +Diagram: + +```mermaid +flowchart TD + +A[Queue] -### Inside handler(ctx): +A --> B[Group A] +A --> C[Group B] -| Field | Description | -|----------------|--------------------------------------------| -| queue | Queue name | -| job_id | Unique identifier | -| payload | Deserialized payload | -| payload_raw | Raw JSON | -| attempt | Current attempt number | -| lock_until_ms | Lease expiration timestamp | -| lease_token | Required token for ACK/heartbeat | -| gid | Group identifier | -| exec | Secure layer for administrative operations | +B --> B1[job] +B --> B2[job] + +C --> C1[job] +C --> C2[job] +``` ------------------------------------------------------------------------ -## Grouped Queues +## Ungrouped Jobs + +Jobs can also be published **without a group**. 
-``` python -omniq.publish( - queue="payments", - payload={"invoice": 1}, - gid="company:acme", - group_limit=1 -) ``` -By defining a gid, the job becomes part of a logical group. +Queue -**Guarantees:** -- FIFO ordering within the same group -- Parallel execution across different groups -- Round-robin scheduling between active groups -- Concurrency control via group_limit + ├─ job A + ├─ job B + └─ job C +``` -This allows isolation of tenants, customers, or entities without blocking the entire system. +OmniQ uses a **round-robin strategy** to maintain fairness between grouped and ungrouped jobs. ------------------------------------------------------------------------ -## Retry +## Workflows with Child Jobs -### Retry Failed Job +Some tasks need to be split into smaller pieces. -``` python -omniq.retry_failed( - queue="demo", - job_id="01ABC..." -) +For example: processing a multi-page document. + +``` +Document Job + │ + ▼ +Split into pages ``` -Reactivates a job currently in the failed lane. -**Rules:** -- Only works if the job is in failed state -- Resets the attempt counter -- Preserves group and concurrency rules +Diagram: +```mermaid +flowchart TD -### Batch Retry (up to 100) +A[Document Job] -``` python -results = omniq.retry_failed_batch( - queue="demo", - job_ids=["01A...", "01B..."] -) +A --> B[Page 1] +A --> C[Page 2] +A --> D[Page 3] +A --> E[Page 4] + +B --> F[Completion] +C --> F +D --> F +E --> F ``` -Allows retrying multiple failed jobs at once. -**Characteristics:** -- Atomic operation (Lua-backed) -- Up to 100 jobs per call -- Individual result per job (status + reason) +Each page can be processed by **a different worker**, enabling massive parallel processing. ------------------------------------------------------------------------ -## Removal +## Child Job Coordination + +OmniQ provides a simple mechanism to coordinate these child jobs. 
-``` python -omniq.remove_jobs_batch( - queue="demo", - lane="failed", - job_ids=["01A...", "01B..."] -) +Each child reports when it finishes: + +``` +Page 1 → done +Page 2 → done +Page 3 → done +Page 4 → done ``` -Removes jobs from a specific lane. -**Important rules:** -- Active jobs cannot be removed -- The provided lane must match the actual job state -- Group integrity is preserved -- Used for administrative cleanup and manual state control. +Internally, a **remaining jobs counter** is tracked. ------------------------------------------------------------------------- +Example: -## Pause and Resume +``` +Remaining = 4 -``` python -omniq.pause(queue="demo") -omniq.resume(queue="demo") -omniq.is_paused(queue="demo") +Child 1 finished → Remaining = 3 +Child 2 finished → Remaining = 2 +Child 3 finished → Remaining = 1 +Child 4 finished → Remaining = 0 ``` -Controls queue execution flow. -**Guarantees:** -- Prevents new reservations while paused -- Active jobs continue running normally -- No job is lost or moved unexpectedly -- Immediate and consistent resume behavior +When the counter reaches **zero**, the workflow can continue. -Can be used externally or inside a handler via ctx.exec. +Key properties: + +- idempotent +- safe for retries +- works across different queues +- does not depend on TTL ------------------------------------------------------------------------ -## Parent/Child Flow +## Job States + +During its lifecycle, a job transitions through several states. -``` python -remaining = ctx.exec.child_ack(completion_key) +```mermaid +stateDiagram-v2 -if remaining == 0: - print("Last child completed") +[*] --> waiting +waiting --> reserved +reserved --> processing +processing --> completed +processing --> failed ``` -**Returns:** -- Pending children -> `> 0` -- Last Child -> `0` -- Counter error -> `1` -**Properties:** -- Idempotent -- Retry-safe -- Isolated across queues +This makes the system state **observable and traceable**. 
------------------------------------------------------------------------ + ## Administrative Operations -All administrative operations are `executed atomically via Lua`, ensuring -consistency even under high concurrency. + +OmniQ provides safe administrative operations for queue management. + +Examples: + +- retry failed jobs +- remove jobs +- pause queues +- resume queues + +These operations run through **atomic Lua scripts**, ensuring consistency even under high concurrency. + +------------------------------------------------------------------------ + +## When to Use OmniQ + +OmniQ is a good fit for systems that need to: + +- run background tasks reliably +- control concurrency +- coordinate distributed pipelines +- split large workloads into smaller jobs +- prevent duplicate execution +- maintain operational reliability + +Common use cases include: + +- document processing +- data pipelines +- media processing +- report generation +- backend automation + +------------------------------------------------------------------------ + +## Installation + +``` +pip install omniq +``` ------------------------------------------------------------------------ ## Examples -For a better understanding of queue behavior and its main features, see -the examples available in the `./examples` folder. +Complete examples are available in: + +``` +./examples +``` + +They include: -**You will find scenarios such as:** -- Basic job publishing and consumption -- Using the `ctx.exec` layer inside handlers -- Parent/child workflows with `childs_init` and `child_ack` -- Coordination between multiple queues (documents -> pages) -- Execution control using pause and resume inside handlers - -The examples are recommended to understand the full execution flow in -real-world environments. 
+- publishing jobs +- basic workers +- structured payloads +- parent/child workflows +- queue coordination ------------------------------------------------------------------------ ## License -GPL-3.0 -Refer to the main repository for details + +See the license file in the repository. + +------------------------------------------------------------------------ \ No newline at end of file From b1e3188beba68bb10ef1077db60c0b898ef284c0 Mon Sep 17 00:00:00 2001 From: RafaJEstabile Date: Mon, 16 Mar 2026 14:35:20 -0300 Subject: [PATCH 3/3] update README --- README.md | 786 ++++++++++++++++++++++++++---------------------------- 1 file changed, 377 insertions(+), 409 deletions(-) diff --git a/README.md b/README.md index 7b90ec2..7ea38a3 100644 --- a/README.md +++ b/README.md @@ -1,553 +1,521 @@ - # OmniQ (Python) -OmniQ is a **distributed job queue built on top of Redis + Lua**, -designed to run background tasks with **predictable execution,** -**concurrency control, and distributed workflows**. - -This package is the **official Python client for OmniQ v1**. - -Core project: -[https://github.com/not-empty/omniq](https://github.com/not-empty/omniq) - ------------------------------------------------------------------------- - -## The Story Behind OmniQ +Python client for **OmniQ**, a Redis-based distributed job queue designed +for deterministic **consumer-driven job execution and coordination**. -Almost every modern system needs to execute tasks outside the main request flow. +OmniQ provides primitives for **job reservation, execution, and coordination** +**directly inside Redis**, allowing multiple **consumers** to safely process +jobs in a distributed system. -Common examples include: +Unlike traditional queues that treat jobs as transient messages, OmniQ +maintains **explicit execution and state structures**, enabling predictable +control over concurrency, ordering, and failure recovery. 
-- sending emails -- generating reports or documents -- processing images or videos -- running data pipelines +The system is **language-agnostic**, allowing producers and consumers +implemented in different runtimes to share the same execution model. -At first, solving this problem seems simple. -You create a queue, add a few workers, and start processing jobs in the background. - -But as the system grows, new challenges start to appear. Problems like -**duplicate jobs, workers crashing mid-execution,or uncontrolled concurrency** -become harder to manage with a simple queue. - -Many existing solutions focus mainly on **moving messages**, but not on managing -the **full lifecycle of a job execution**. +Core project: -OmniQ was created to solve this problem by providing a more reliable model for -running distributed background jobs, with explicit control over execution, -concurrency, and task coordination. +[https://github.com/not-empty/omniq](https://github.com/not-empty/omniq) ------------------------------------------------------------------------ -## How OmniQ Works - -The core flow of OmniQ is simple. - -An application publishes jobs, and workers process them. - -``` -Application - │ - ▼ - OmniQ Queue - │ - ▼ - Workers - │ - ▼ - Processing -``` - -Or visually: - -```mermaid -flowchart LR - -A[Application] --> B[OmniQ Queue] -B --> C[Worker 1] -B --> D[Worker 2] -B --> E[Worker 3] +## Installation -C --> F[Processing] -D --> F -E --> F +```bash +pip install omniq ``` -This allows multiple machines to work **in parallel** processing tasks. - ------------------------------------------------------------------------ -## System Architecture - -OmniQ uses **Redis as its storage and coordination engine**. 
+## Features + +- **Redis-native execution model -** + - Job reservation, execution, and coordination happen atomically inside Redis +- **Consumer- driven processing -** + - Workers control execution lifecycle instead of passive message delivery +- **Deterministic job state -** + - Explicit lanes for `wait`, `delayed`, `active`, `failed`, and `completed` +- **Grouped jobs with concurrency limits -** + - FIFO within groups with parallel execution across groups +- **Atomic administrative operations -** + - Retry, removal, pause, and batch operations backed by Lua Scripts +- **Parent/Child workflow primitive -** + - Fan-out processing with idempotent completion tracking +- **Structured payload support -** + - Publish typed dataclasses as JSON +- **Language-agnostic architecture -** + - Producers and consumers can run in different runtimes. -The critical queue logic runs through **atomic Lua scripts**, ensuring consistent behavior even when multiple workers operate concurrently. -``` -Application - │ - ▼ - OmniQ Client - │ - ▼ - Redis - │ - ▼ - Workers -``` - -Or visually: - -```mermaid -flowchart TB +------------------------------------------------------------------------ -App[Application] +## Quick Start -Client[OmniQ Python Client] +### Publish -Redis[(Redis / Valkey)] +```python +# importing the lib +from omniq.client import OmniqClient -Worker1[Worker] -Worker2[Worker] -Worker3[Worker] +# creating OmniQ passing redis information +omniq = OmniqClient( + host="omniq-redis", + port=6379, +) -App --> Client -Client --> Redis +# publishing the job +job_id = omniq.publish( + queue="demo", + payload={"hello": "world"}, + timeout_ms=30_000 +) -Worker1 --> Redis -Worker2 --> Redis -Worker3 --> Redis +print("OK", job_id) ``` -This architecture enables: - -- distributed execution -- strong consistency under concurrency -- atomic operations -- automatic failure recovery - ------------------------------------------------------------------------ -## Core Concepts - -Before 
using OmniQ, it helps to understand a few fundamental concepts.
-
+### Publish Structured JSON
+
+```python
+from dataclasses import dataclass
+from typing import List, Optional
+
+# importing the lib
+from omniq.client import OmniqClient
+
+
+# Nested structure
+@dataclass
+class Customer:
+    id: str
+    email: str
+    vip: bool
+
+
+# Main payload
+@dataclass
+class OrderCreated:
+    order_id: str
+    customer: Customer
+    amount: int
+    currency: str
+    items: List[str]
+    processed: bool
+    retry_count: int
+    tags: Optional[List[str]] = None
+
+
+# creating OmniQ passing redis information
+omniq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)
+
+# creating structured payload
+payload = OrderCreated(
+    order_id="ORD-2026-0001",
+    customer=Customer(
+        id="CUST-99",
+        email="leo@example.com",
+        vip=True,
+    ),
+    amount=1500,
+    currency="USD",
+    items=["keyboard", "mouse"],
+    processed=False,
+    retry_count=0,
+    tags=["priority", "online"],
+)
+
+# publish using publish_json
+job_id = omniq.publish_json(
+    queue="demo",
+    payload=payload,
+    max_attempts=5,
+    timeout_ms=60_000,
+)
+
+print("OK", job_id)
+```

------------------------------------------------------------------------

-## Job
+### Consume

-A **job** represents a unit of work that needs to be executed. 
+```python
+import time

-Examples of jobs:

+# importing the lib
+from omniq.client import OmniqClient

-- sending an email
-- generating a report
-- converting a video
-- processing an image
-- analyzing a file

+# creating your handler (ctx will have all the job information and actions)
+def my_actions(ctx):
+    print("Waiting 2 seconds")
+    time.sleep(2)
+    print("Done")

-Each job contains basic metadata:

+# creating OmniQ passing redis information
+omniq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)

+# creating the consumer that will listen and execute the actions in your handler
+omniq.consume(
+    queue="demo",
+    handler=my_actions,
+    verbose=True,
+    drain=False,
+)
```
-Job
- ├─ id
- ├─ queue
- ├─ payload
- ├─ attempts
- └─ state
-```
-
-This allows the system to track the entire lifecycle of a task.

------------------------------------------------------------------------

-## Payload
+## Handler Context

-The **payload** contains the data required to execute the job.
+Inside `handler(ctx)`:
+- `queue`
+- `job_id`
+- `payload_raw`
+- `payload`
+- `attempt`
+- `lock_until_ms`
+- `lease_token`
+- `gid`
+- `exec` - execution layer (`ctx.exec`)

-It represents the **context of the task**.
-
-Example:
-
-Job: send email

-```
-payload
- ├─ to
- ├─ subject
- └─ template
-```

-Another example:
+------------------------------------------------------------------------

-Job: generate report
+## Administrative Operations

-```
-payload
- ├─ company_id
- ├─ start_date
- └─ end_date
-```
+All admin operations are **Lua-backed and atomic**

-Visual representation:
+### Retry_failed()

+```bash
+omniq.retry_failed(queue="demo", job_id="01ABC...")
```
-Job
- ├─ type: generate_report
- └─ payload
-     ├─ company_id
-     ├─ start_date
-     └─ end_date
-```
-
-Workers use the payload data to perform the work. 
+- Works only if job state is `failed` +- Resets attempt counter +- Respects grouping rule ------------------------------------------------------------------------ -## Publishing a Job - -Publishing a job means **sending a task to the queue**. - -When a job is published: - -1. it receives a unique ID -2. it is stored in the queue -3. a worker can reserve and execute it +### Retry_failed_batch() -Flow: +```bash +results = omniq.retry_failed_batch( + queue="demo", + job_ids=["01A...", "01B...", "01C..."] +) -```mermaid -sequenceDiagram - -Application->>OmniQ: publish job -OmniQ->>Queue: store job -Worker->>Queue: reserve job -Worker->>Worker: process -Worker->>Queue: ack completed -``` - -Simplified view: - -``` -Application - │ - ▼ - publish job - │ - ▼ - Queue - │ - ▼ - Worker +for job_id, status, reason in results: + print(job_id, status, reason) ``` +- Max 100 jobs per call +- Atomic batch +- Per-job result returned ------------------------------------------------------------------------ -## Queues - -Jobs are organized into **queues**. - -Each queue usually represents a type of workload. +### Remove_job() -Example queues: - -``` -emails -documents -images -payments -reports +```bash +omniq.remove_job( + queue="demo", + job_id="01ABC...", + lane="failed", # wait | delayed | failed | completed | gwait +) ``` +**Rules:** +- Cannot remove active jobs +- Lane must match job state +- Group safety preserved -Visualization: +------------------------------------------------------------------------ -``` -Queue: emails +### Remove_job_batch() - ├─ job1 - ├─ job2 - └─ job3 +```bash +results = omniq.remove_jobs_batch( + queue="demo", + lane="failed", + job_ids=["01A...", "01B...", "01C..."] +) ``` - -Workers may consume jobs from **one or multiple queues**. 
+**Rules:**
+- Max 100 per call
+- Strict lane validation
+- Atomic per batch

------------------------------------------------------------------------

-## Lease-Based Execution
+### Pause()

-OmniQ uses a **lease-based execution model**.
+```bash
+pause_result = omniq.pause(
+    queue="demo",
+)

-When a worker reserves a job, it receives a **temporary lease**.
+resume_result = omniq.resume(
+    queue="demo",
+)

+is_paused = omniq.is_paused(
+    queue="demo",
+)
```
-Worker reserves job
- │
- ▼
-Job becomes locked to that worker
- │
- ▼
-Worker processes it
-```
-
-Diagram:
-
-```mermaid
-sequenceDiagram
-
-Worker->>Queue: reserve job
-Queue->>Worker: job + lease_token
-Worker->>Queue: heartbeat
-Worker->>Queue: ack success
-```
-
-If the worker crashes:
-
-- the lease expires
-- the job becomes available again
-
-This prevents jobs from becoming **permanently stuck**.
+**Rules:**
+- Prevents new reservations while paused
+- Active jobs continue running normally
+- Resume takes effect immediately

------------------------------------------------------------------------

-## Heartbeats
+## Child ACK Control (Parent/Child Workflows)

-Some jobs may take a long time to finish.
+This primitive enables **fan-out workflows**, where a parent job spawns
+multiple child jobs that can run in parallel across one or more queues.

-Examples include:
+If you want to learn more about the internal execution model and architecture,
+see the core project: **[OmniQ](https://github.com/not-empty/omniq)**.

-- video processing
-- large data analysis
-- heavy report generation
+Each child job acknowledges its completion using a **shared completion key**.
+OmniQ maintains an **atomic counter in Redis** that tracks how many child jobs
+are still pending.

-Workers can send **heartbeats** to extend the lease.
+When a child finishes, it calls `child_ack()`, which decrements the counter
+and returns the number of remaining jobs. When the counter reaches `0`,
+it indicates that **all child jobs have completed**. 
-``` -Worker - ├─ start job - ├─ heartbeat - ├─ heartbeat - └─ finish job -``` +The mechanism is **idempotent and safe under retries**, ensuring that +duplicate executions do not corrupt the completion tracking. -This tells the system that the job **is still actively running**. +No TTL is used, the counter is automatically cleaned up when the value +reaches zero. ------------------------------------------------------------------------ -## Grouped Jobs - -OmniQ supports grouping jobs using **groups**. +### Parent Example -This is useful when you need to limit concurrency within a logical group. +The first queue will receive a document with 5 pages -Example: processing jobs per customer. - -``` -Queue: payments +```python +# importing the lib +from omniq.client import OmniqClient -Group A (customer A) - ├─ job1 - └─ job2 +# creating OmniQ passing redis information +omniq = OmniqClient( + host="omniq-redis", + port=6379, +) -Group B (customer B) - ├─ job3 - └─ job4 +# publishing the job +job_id = omniq.publish( + queue="documents", + payload={ + "document_id": "doc-123", # this will be our unique key to initiate childs and tracking then until completion + "pages": 5, # each page must be completed before something happen + }, +) +print("OK", job_id) ``` -Guarantees: - -- **FIFO ordering within a group** -- groups run **in parallel** -- configurable concurrency limits per group - -Diagram: - -```mermaid -flowchart TD +The first consumer will publish a job for each page passing the unique key +for childs tracking. 
-A[Queue]

+```python
+# importing the lib
+from omniq.client import OmniqClient

-A --> B[Group A]
-A --> C[Group B]

+# creating OmniQ passing redis information
+omniq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)

-B --> B1[job]
-B --> B2[job]
-
-C --> C1[job]
-C --> C2[job]

+# publishing the job
+job_id = omniq.publish(
+    queue="documents",
+    payload={
+        "document_id": "doc-123", # this will be our unique key to initiate the child jobs and track them until completion
+        "pages": 5, # each page must be completed before the workflow can continue
+    },
+)
+print("OK", job_id)
```

------------------------------------------------------------------------

-## Ungrouped Jobs

+### Child Example

-Jobs can also be published **without a group**.

+The second consumer will deal with each page and ack each one (alerting when the last
+page was processed).

-```
-Queue

+```python
+import time

-
- ├─ job A
- ├─ job B
- └─ job C
-```

+# importing the lib
+from omniq.client import OmniqClient

-OmniQ uses a **round-robin strategy** to maintain fairness between grouped and ungrouped jobs.

+# creating your handler (ctx will have all the job information and actions)
+def page_worker(ctx):

------------------------------------------------------------------------

+    page = ctx.payload["page"]
+    # getting the unique key to track the child jobs
+    completion_key = ctx.payload["completion_key"]

-## Workflows with Child Jobs

+    print(f"[page_worker] Processing page {page} (job_id={ctx.job_id})")
+    time.sleep(1.5)

-Some tasks need to be split into smaller pieces.

+    # acking itself as a child; the number of remaining jobs is returned so we can tell when the last job was executed
+    remaining = ctx.exec.child_ack(completion_key)

-For example: processing a multi-page document.
-
-```
-Document Job
- │
- ▼
-Split into pages
-```

+    print(f"[page_worker] Page {page} done. Remaining={remaining}")
+

-Diagram:

+    # remaining will be 0 ONLY when this is the last job
+    # will return > 0 when there are still jobs to process
+    # and -1 if something goes wrong with the counter
+    if remaining == 0:
+        print("[page_worker] Last page finished.")

-```mermaid
-flowchart TD

+# creating OmniQ passing redis information
+omniq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)

-A[Document Job]
-
-A --> B[Page 1]
-A --> C[Page 2]
-A --> D[Page 3]
-A --> E[Page 4]
-
-B --> F[Completion]
-C --> F
-D --> F
-E --> F

+# creating the consumer that will listen and execute the actions in your handler
+omniq.consume(
+    queue="pages",
+    handler=page_worker,
+    verbose=True,
+    drain=False,
+)
```

-Each page can be processed by **a different worker**, enabling massive parallel processing.

+**Properties:**
+- Idempotent decrement
+- Safe under retries
+- Cross-queue safe
+- Fully business-logic driven

------------------------------------------------------------------------

-## Child Job Coordination
-
-OmniQ provides a simple mechanism to coordinate these child jobs.
-
-Each child reports when it finishes:
-
-```
-Page 1 → done
-Page 2 → done
-Page 3 → done
-Page 4 → done
-```
-
-Internally, a **remaining jobs counter** is tracked.
-
-Example:

+## Grouped Jobs

-```
-Remaining = 4

+```python
+# if you provide a gid (group_id) you can limit the parallel execution for jobs in the same group
+omniq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=1)

-Child 1 finished → Remaining = 3
-Child 2 finished → Remaining = 2
-Child 3 finished → Remaining = 1
-Child 4 finished → Remaining = 0

+# you can also publish ungrouped jobs that will also be executed (fairness by round-robin algorithm)
+omniq.publish(queue="demo", payload={"i": 2})
```

-When the counter reaches **zero**, the workflow can continue. 
-
-Key properties:
-
-- idempotent
-- safe for retries
-- works across different queues
-- does not depend on TTL

+- FIFO inside group
+- Groups execute in parallel
+- Concurrency limited per group

------------------------------------------------------------------------

-## Job States

+## Pause and Resume inside the consumer

-During its lifecycle, a job transitions through several states.

+You publish your job as usual

-```mermaid
-stateDiagram-v2

+```python
+# importing the lib
+from omniq.client import OmniqClient

-[*] --> waiting
-waiting --> reserved
-reserved --> processing
-processing --> completed
-processing --> failed

+# creating OmniQ passing redis information
+uq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)
+
+# publishing the job
+job_id = uq.publish(
+    queue="test",
+    payload={"hello": "world"},
+    timeout_ms=30_000
+)
+print("OK", job_id)
```

-This makes the system state **observable and traceable**.

+Inside your consumer you can pause/resume your queue (or another one)

------------------------------------------------------------------------

+```python
+import time

-## Administrative Operations

+# importing the lib
+from omniq.client import OmniqClient

-OmniQ provides safe administrative operations for queue management.

+# creating your handler (ctx will have all the job information and actions)
+def pause_unpause_example(ctx):
+    print("Waiting 2 seconds")

-Examples:

+    # checking if this queue is paused (spoiler: it's not)
+    is_paused = ctx.exec.is_paused(
+        queue="test"
+    )
+    print("Is paused", is_paused)
+    time.sleep(2)

-- retry failed jobs
-- remove jobs
-- pause queues
-- resume queues

-These operations run through **atomic Lua scripts**, ensuring consistency even under high concurrency. 
+    print("Pausing")

------------------------------------------------------------------------

+    # pausing this queue (this job and other active jobs will not be affected, but no new job will start until the queue is resumed)
+    ctx.exec.pause(
+        queue="test"
+    )

-## When to Use OmniQ

+    # checking again, now it is supposed to be paused
+    is_paused = ctx.exec.is_paused(
+        queue="test"
+    )
+    print("Is paused", is_paused)
+    time.sleep(2)

-OmniQ is a good fit for systems that need to:

+    print("Resuming")

-- run background tasks reliably
-- control concurrency
-- coordinate distributed pipelines
-- split large workloads into smaller jobs
-- prevent duplicate execution
-- maintain operational reliability

+    # resuming this queue (all other workers can process jobs again)
+    ctx.exec.resume(
+        queue="test"
+    )

-Common use cases include:

+    # checking again, now it is supposed to be resumed
+    is_paused = ctx.exec.is_paused(
+        queue="test"
+    )
+    print("Is paused", is_paused)
+    time.sleep(2)

-- document processing
-- data pipelines
-- media processing
-- report generation
-- backend automation

+    print("Done")

------------------------------------------------------------------------

+# creating OmniQ passing redis information
+omniq = OmniqClient(
+    host="omniq-redis",
+    port=6379,
+)

-## Installation
-
-```
-pip install omniq
+# creating the consumer that will listen and execute the actions in your handler
+omniq.consume(
+    queue="test",
+    handler=pause_unpause_example,
+    verbose=True,
+    drain=False,
+)
```

------------------------------------------------------------------------

## Examples

-Complete examples are available in:
-
-```
-./examples
-```
-
-They include:
-
-- publishing jobs
-- basic workers
-- structured payloads
-- parent/child workflows
-- queue coordination

+Additional usage examples demonstrating common patterns can be found
+in the `/examples` folder. 
------------------------------------------------------------------------ ## License -See the license file in the repository. - ------------------------------------------------------------------------- \ No newline at end of file +See the repository license. \ No newline at end of file