diff --git a/Developer.md b/Developer.md
new file mode 100644
index 0000000..d20d474
--- /dev/null
+++ b/Developer.md
@@ -0,0 +1,443 @@
+# OmniQ (Python)
+
+OmniQ Python is the **Python client** for **OmniQ**, a Redis + Lua-based task
+queue designed for distributed processing with strict control over execution,
+concurrency, and state. The library provides a direct interface for publishing,
+consuming, and managing jobs while maintaining operational safety guarantees and
+consistency, even in concurrent and distributed environments.
+
+Core project / docs: https://github.com/not-empty/omniq
+
+------------------------------------------------------------------------
+
+## Why OmniQ
+
+Redis-based queues typically sacrifice predictability under concurrency.
+
+**OmniQ was designed to provide:**
+- Atomic state transitions (with Lua)
+- Lease-based execution (no double processing)
+- Group isolation with FIFO ordering
+- Deterministic control over pause, retry, and concurrency
+
+The design focuses on operational safety in distributed environments.
+
+------------------------------------------------------------------------
+
+## Execution
+
+The development environment can be started in two different ways, depending on your setup and infrastructure preferences:
+
+- With Docker — recommended for isolation, reproducibility, and consistent Redis configuration.
+
+- Without Docker — using a locally installed and properly configured Redis instance.
+
+Both approaches allow you to run the examples and validate queue behavior in a local environment.
+
+------------------------------------------------------------------------
+
+## Installation
+
+### Option 1 - Using Docker
+
+**1. Create Project**
+
+``` bash
+mkdir omniq-sample
+cd omniq-sample
+```
+**2. 
Create docker-compose.yml**
+
+``` yaml
+services:
+  omniq-redis:
+    image: redis:7.4-alpine
+    container_name: omniq-redis
+    ports:
+      - "6379:6379"
+    command: ["redis-server", "--appendonly", "yes"]
+    volumes:
+      - redis_data:/data
+
+  omniq-valkey:
+    image: valkey/valkey:9-alpine
+    container_name: omniq-valkey
+    ports:
+      - "6380:6379"
+    command: ["valkey-server", "--appendonly", "yes"]
+    volumes:
+      - valkey_data:/data
+
+  omniq-python:
+    image: python:3.12-slim
+    container_name: omniq-python
+    working_dir: /app
+    environment:
+      PYTHONUNBUFFERED: "1"
+    volumes:
+      - ./:/app
+    depends_on:
+      - omniq-redis
+    command: ["bash", "-lc", "tail -f /dev/null"]
+
+# named volumes referenced by the services above
+volumes:
+  redis_data:
+  valkey_data:
+```
+Choose either Redis or Valkey to run this project.
+
+**3. Start environment**
+
+``` bash
+docker compose up -d
+```
+**4. Access Python container**
+
+``` bash
+docker exec -it omniq-python bash
+pip install omniq
+```
+
+**5. Create a simple example**
+
+``` bash
+mkdir simple
+cd simple
+```
+
+**publish.py**
+``` python
+from omniq.client import OmniqClient
+
+omniq = OmniqClient(host="omniq-redis", port=6379)
+
+job_id = omniq.publish(
+    queue="demo",
+    payload={"hello": "world"},
+    timeout_ms=30_000
+)
+
+print("Published:", job_id)
+```
+
+**consumer.py**
+``` python
+import time
+from omniq.client import OmniqClient
+
+def handler(ctx):
+    print("Processing:", ctx.job_id)
+    time.sleep(2)
+    print("Done")
+
+omniq = OmniqClient(host="omniq-redis", port=6379)
+
+omniq.consume(
+    queue="demo",
+    handler=handler,
+    verbose=True
+)
+```
+**Run:**
+``` bash
+python publish.py
+python consumer.py
+```
+
+------------------------------------------------------------------------
+
+### Option 2 - Running Locally Without Docker
+
+**1. Start Redis**
+``` bash
+redis-server
+```
+
+**2. 
Create Project**
+``` bash
+mkdir omniq-sample
+cd omniq-sample
+pip install omniq
+```
+Then create publish.py and consumer.py as shown above, replacing
+host="omniq-redis" with host="localhost".
+
+------------------------------------------------------------------------
+
+## Publishing
+
+### Publish Simple Job
+
+``` python
+from omniq.client import OmniqClient
+
+omniq = OmniqClient(host="omniq-redis", port=6379)
+
+job_id = omniq.publish(
+    queue="demo",
+    payload={"hello": "world"},
+    timeout_ms=30_000
+)
+```
+The publish method enqueues a job with a JSON-serializable payload.
+
+**Key behavior:**
+- The job receives a unique job_id
+- timeout_ms defines the lease duration once reserved
+- The payload is stored atomically
+
+This is the simplest way to enqueue work for asynchronous processing.
+
+------------------------------------------------------------------------
+
+### Publish Structured Payload
+
+``` python
+from dataclasses import dataclass
+from typing import Optional, List
+from omniq.client import OmniqClient
+
+@dataclass
+class OrderCreated:
+    order_id: str
+    amount: int
+    currency: str
+    tags: Optional[List[str]] = None
+
+omniq = OmniqClient(host="omniq-redis", port=6379)
+
+job_id = omniq.publish_json(
+    queue="orders",
+    payload=OrderCreated(
+        order_id="ORD-1",
+        amount=1000,
+        currency="USD",
+        tags=["priority"]
+    ),
+    max_attempts=5,
+    timeout_ms=60_000
+)
+```
+The publish_json method allows you to send structured objects (such as dataclasses) that are automatically serialized to JSON before being stored in the queue.
+
+**Advantages:**
+- Safe and standardized serialization
+- Typed and predictable payload structure
+- Explicit max_attempts control
+- Better organization for event-driven systems
+
+Recommended for production environments where payload contracts must remain stable.
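On the consumer side, the structured payload arrives as deserialized JSON rather than as the original dataclass. A minimal sketch of rebuilding the typed object inside a handler; the `OrderCreated` fields mirror the publisher above, and treating `ctx.payload` as a plain dict is an assumption based on the Handler Context table below:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class OrderCreated:
    order_id: str
    amount: int
    currency: str
    tags: Optional[List[str]] = None

def handler(ctx):
    # ctx.payload is assumed to be the deserialized JSON dict;
    # rebuild the typed object to restore the payload contract.
    order = OrderCreated(**ctx.payload)
    print("Processing order", order.order_id, order.amount, order.currency)
```

This keeps the payload contract in one place: if a field is added or renamed, the dataclass constructor fails loudly instead of the handler silently reading a missing key.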
+ +------------------------------------------------------------------------ + +## Consumption + +``` python +omniq.consume( + queue="demo", + handler=handler, + verbose=True, + drain=False +) +``` +The consume method starts a worker loop that continuously reserves and processes jobs from the specified queue. + +**Parameters:** +- queue — Target queue name +- handler — Function responsible for processing each job +- verbose=True — Enables execution logs (reservations, ACKs, retries, errors) +- drain=False — Keeps the consumer running, waiting for new jobs + +With drain=False, the worker behaves as a long-running consumer suitable for services and background processors. + +------------------------------------------------------------------------ + +## Drain Mode + +``` python +omniq.consume( + queue="demo", + handler=handler, + drain=True +) +``` +**When drain=True, the consumer:** +- Processes all currently available jobs +- Does not wait for new jobs +- Automatically exits once the queue is empty + +**Best suited for:** +- Batch processing +- Maintenance scripts +- Controlled execution pipelines + +------------------------------------------------------------------------ + +## Heartbeat + +For long-running jobs: +``` python +ctx.exec.heartbeat() +``` +The heartbeat call renews the lease of the currently running job. + +**Behavior** +- Uses the same lease_token issued at reservation +- Extends lock_until_ms safely +- Prevents the job from returning to the queue while still being processed +- Essential for long or unpredictable execution times. 
+ +------------------------------------------------------------------------ + +## Handler Context + +### Inside handler(ctx): + +| Field | Description | +|----------------|--------------------------------------------| +| queue | Queue name | +| job_id | Unique identifier | +| payload | Deserialized payload | +| payload_raw | Raw JSON | +| attempt | Current attempt number | +| lock_until_ms | Lease expiration timestamp | +| lease_token | Required token for ACK/heartbeat | +| gid | Group identifier | +| exec | Secure layer for administrative operations | + +------------------------------------------------------------------------ + +## Grouped Queues + +``` python +omniq.publish( + queue="payments", + payload={"invoice": 1}, + gid="company:acme", + group_limit=1 +) +``` +By defining a gid, the job becomes part of a logical group. + +**Guarantees:** +- FIFO ordering within the same group +- Parallel execution across different groups +- Round-robin scheduling between active groups +- Concurrency control via group_limit + +This allows isolation of tenants, customers, or entities without blocking the entire system. + +------------------------------------------------------------------------ + +## Retry + +### Retry Failed Job + +``` python +omniq.retry_failed( + queue="demo", + job_id="01ABC..." +) +``` +Reactivates a job currently in the failed lane. + +**Rules:** +- Only works if the job is in failed state +- Resets the attempt counter +- Preserves group and concurrency rules + +------------------------------------------------------------------------ + +### Batch Retry (up to 100) + +``` python +results = omniq.retry_failed_batch( + queue="demo", + job_ids=["01A...", "01B..."] +) +``` +Allows retrying multiple failed jobs at once. 
+
+**Characteristics:**
+- Atomic operation (Lua-backed)
+- Up to 100 jobs per call
+- Individual result per job (status + reason)
+
+------------------------------------------------------------------------
+
+## Removal
+
+``` python
+omniq.remove_jobs_batch(
+    queue="demo",
+    lane="failed",
+    job_ids=["01A...", "01B..."]
+)
+```
+Removes jobs from a specific lane.
+
+**Important rules:**
+- Active jobs cannot be removed
+- The provided lane must match the actual job state
+- Group integrity is preserved
+
+Used for administrative cleanup and manual state control.
+
+------------------------------------------------------------------------
+
+## Pause and Resume
+
+``` python
+omniq.pause(queue="demo")
+omniq.resume(queue="demo")
+omniq.is_paused(queue="demo")
+```
+Controls queue execution flow.
+
+**Guarantees:**
+- Prevents new reservations while paused
+- Active jobs continue running normally
+- No job is lost or moved unexpectedly
+- Immediate and consistent resume behavior
+
+Can be used externally or inside a handler via `ctx.exec`.
+
+------------------------------------------------------------------------
+
+## Parent/Child Flow
+
+``` python
+remaining = ctx.exec.child_ack(completion_key)
+
+if remaining == 0:
+    print("Last child completed")
+```
+**Returns:**
+- Pending children -> `> 0`
+- Last child -> `0`
+- Counter error -> `-1`
+
+**Properties:**
+- Idempotent
+- Retry-safe
+- Isolated across queues
+
+------------------------------------------------------------------------
+
+## Administrative Operations
+
+All administrative operations are **executed atomically via Lua**, ensuring
+consistency even under high concurrency.
+
+------------------------------------------------------------------------
+
+## Examples
+
+For a better understanding of queue behavior and its main features, see
+the examples available in the `./examples` folder.
+
+**You will find scenarios such as:**
+- Basic job publishing and consumption
+- Using the `ctx.exec` layer inside handlers
+- Parent/child workflows with `childs_init` and `child_ack`
+- Coordination between multiple queues (documents -> pages)
+- Execution control using pause and resume inside handlers
+
+The examples are recommended for understanding the full execution flow in
+real-world environments.
+
+------------------------------------------------------------------------
+
+## License
+GPL-3.0
+Refer to the main repository for details.
diff --git a/README.md b/README.md
index c4aac85..7ea38a3 100644
--- a/README.md
+++ b/README.md
@@ -1,46 +1,60 @@
 # OmniQ (Python)
-**OmniQ** is a Redis + Lua, language-agnostic job queue.\
-This package is the **Python client** for OmniQ v1.
+Python client for **OmniQ**, a Redis-based distributed job queue designed
+for deterministic **consumer-driven job execution and coordination**.
-Core project / docs: https://github.com/not-empty/omniq
+OmniQ provides primitives for **job reservation, execution, and coordination**
+**directly inside Redis**, allowing multiple **consumers** to safely process
+jobs in a distributed system.
-------------------------------------------------------------------------
+Unlike traditional queues that treat jobs as transient messages, OmniQ
+maintains **explicit execution and state structures**, enabling predictable
+control over concurrency, ordering, and failure recovery.
+
+The system is **language-agnostic**, allowing producers and consumers
+implemented in different runtimes to share the same execution model.
-## Key Ideas - -- **Hybrid lanes** - - Ungrouped jobs by default - - Optional grouped jobs (FIFO per group + per-group concurrency) -- **Lease-based execution** - - Workers reserve a job with a time-limited lease -- **Token-gated ACK / heartbeat** - - `reserve()` returns a `lease_token` - - `heartbeat()` and `ack_*()` must include the same token -- **Pause / resume (flag-only)** - - Pausing prevents *new reserves* - - Running jobs are not interrupted - - Jobs are not moved -- **Admin-safe operations** - - Strict `retry`, `retry_batch`, `remove`, `remove_batch` -- **Handler-driven execution layer** - - `ctx.exec` exposes internal OmniQ operations safely inside handlers +Core project: + +[https://github.com/not-empty/omniq](https://github.com/not-empty/omniq) ------------------------------------------------------------------------ -## Install +## Installation -``` bash +```bash pip install omniq ``` +------------------------------------------------------------------------ + +## Features + +- **Redis-native execution model -** + - Job reservation, execution, and coordination happen atomically inside Redis +- **Consumer- driven processing -** + - Workers control execution lifecycle instead of passive message delivery +- **Deterministic job state -** + - Explicit lanes for `wait`, `delayed`, `active`, `failed`, and `completed` +- **Grouped jobs with concurrency limits -** + - FIFO within groups with parallel execution across groups +- **Atomic administrative operations -** + - Retry, removal, pause, and batch operations backed by Lua Scripts +- **Parent/Child workflow primitive -** + - Fan-out processing with idempotent completion tracking +- **Structured payload support -** + - Publish typed dataclasses as JSON +- **Language-agnostic architecture -** + - Producers and consumers can run in different runtimes. 
+
+------------------------------------------------------------------------
+
 ## Quick Start
 ### Publish
-``` python
+```python
 # importing the lib
 from omniq.client import OmniqClient
@@ -64,7 +78,7 @@ print("OK", job_id)
 ### Publish Structured JSON
-``` python
+```python
 from dataclasses import dataclass
 from typing import List, Optional
@@ -125,12 +139,11 @@ job_id = omniq.publish_json(
 print("OK", job_id)
 ```
-
 ------------------------------------------------------------------------
 ### Consume
-``` python
+```python
 import time
 # importing the lib
@@ -162,38 +175,36 @@ omniq.consume(
 ## Handler Context
 Inside `handler(ctx)`:
-
-- `queue`
-- `job_id`
-- `payload_raw`
-- `payload`
-- `attempt`
-- `lock_until_ms`
-- `lease_token`
-- `gid`
-- `exec` → execution layer (`ctx.exec`)
+- `queue`
+- `job_id`
+- `payload_raw`
+- `payload`
+- `attempt`
+- `lock_until_ms`
+- `lease_token`
+- `gid`
+- `exec` - execution layer (`ctx.exec`)
 ------------------------------------------------------------------------
-# Administrative Operations
+## Administrative Operations
-All admin operations are **Lua-backed and atomic**. 
+All admin operations are **Lua-backed and atomic**.
-## retry_failed()
+### retry_failed()
-``` python
+```python
 omniq.retry_failed(queue="demo", job_id="01ABC...")
 ```
-
-- Works only if job state is `failed`
-- Resets attempt counter
-- Respects grouping rules
+- Works only if job state is `failed`
+- Resets attempt counter
+- Respects grouping rules
 ------------------------------------------------------------------------
-## retry_failed_batch()
+### retry_failed_batch()
-``` python
+```python
 results = omniq.retry_failed_batch(
     queue="demo",
     job_ids=["01A...", "01B...", "01C..."]
@@ -202,50 +213,47 @@ results = omniq.retry_failed_batch(
 for job_id, status, reason in results:
     print(job_id, status, reason)
 ```
-
-- Max 100 jobs per call
-- Atomic batch
-- Per-job result returned
+- Max 100 jobs per call
+- Atomic batch
+- Per-job result returned
 ------------------------------------------------------------------------
-## remove_job()
+### remove_job()
-``` python
+```python
 omniq.remove_job(
     queue="demo",
     job_id="01ABC...",
     lane="failed",  # wait | delayed | failed | completed | gwait
 )
 ```
-
-Rules:
-
-- Cannot remove active jobs
-- Lane must match job state
-- Group safety preserved
+**Rules:**
+- Cannot remove active jobs
+- Lane must match job state
+- Group safety preserved
 ------------------------------------------------------------------------
-## remove_jobs_batch()
+### remove_jobs_batch()
-``` python
+```python
 results = omniq.remove_jobs_batch(
     queue="demo",
     lane="failed",
     job_ids=["01A...", "01B...", "01C..."]
 )
 ```
-
-- Max 100 per call
-- Strict lane validation
-- Atomic per batch
+**Rules:**
+- Max 100 per call
+- Strict lane validation
+- Atomic per batch
 ------------------------------------------------------------------------
-## pause()
+### pause()
-``` python
+```python
 pause_result = omniq.pause(
     queue="demo",
 )
@@ -258,34 +266,42 @@ is_paused = omniq.is_paused(
     queue="demo",
 )
 ```
+**Rules:**
+- Prevents new reservations while paused
+- Active jobs continue running normally
+- No job is lost or moved unexpectedly
+
 ------------------------------------------------------------------------
-# Handler Context
+## Child ACK Control (Parent/Child Workflows)
-Inside `handler(ctx)`:
+This primitive enables **fan-out workflows**, where a parent job spawns
+multiple child jobs that can run in parallel across one or more queues.
-- `queue`
-- `job_id`
-- `payload_raw`
-- `payload`
-- `attempt`
-- `lock_until_ms`
-- `lease_token`
-- `gid`
-- `exec`
+If you want to learn more about the internal execution model and architecture,
+see the core project: **[OmniQ](https://github.com/not-empty/omniq)**.
-------------------------------------------------------------------------
+Each child job acknowledges its completion using a **shared completion key**.
+OmniQ maintains an **atomic counter in Redis** that tracks how many child jobs
+are still pending.
-# Child Ack Control (Parent / Child Workflows)
+When a child finishes, it calls `child_ack()`, which decrements the counter
+and returns the number of remaining jobs. When the counter reaches `0`,
+it indicates that **all child jobs have completed**.
-A handler-driven primitive for fan-out workflows.
+The mechanism is **idempotent and safe under retries**, ensuring that
+duplicate executions do not corrupt the completion tracking.
-No TTL. Cleanup happens only when counter reaches zero.
+No TTL is used; the counter is automatically cleaned up when the value
+reaches zero.
+
+------------------------------------------------------------------------
-## Parent Example
+### Parent Example
 The first queue will receive a document with 5 pages
-``` python
+
+```python
 # importing the lib
 from omniq.client import OmniqClient
@@ -306,8 +322,10 @@ job_id = omniq.publish(
 print("OK", job_id)
 ```
-The first consumer will publish a job for each page passing the unique key for childs tracking
-``` python
+The first consumer will publish a job for each page, passing the unique key
+used to track the child jobs.
+
+```python
 # importing the lib
 from omniq.client import OmniqClient
@@ -328,11 +346,14 @@ job_id = omniq.publish(
 print("OK", job_id)
 ```
-## Child Example
+------------------------------------------------------------------------
+
+### Child Example
-The second consumer will deal with each page and ack each child (alerting when the last page was processed)
+The second consumer will deal with each page and ack each child (alerting when
+the last page was processed).
-``` python
+```python
 import time
 # importing the lib
@@ -375,35 +396,34 @@ omniq.consume(
 )
 ```
-Properties:
-
-- Idempotent decrement
-- Safe under retries
-- Cross-queue safe
-- Fully business-logic driven
+**Properties:**
+- Idempotent decrement
+- Safe under retries
+- Cross-queue safe
+- Fully business-logic driven
 ------------------------------------------------------------------------
 ## Grouped Jobs
-``` python
+```python
 # if you provide a gid (group_id) you can limit the parallel execution for jobs in the same group
 omniq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=1)
-# you can also publis ungrouped jobs that will also be executed (fairness by round-robin algorithm)
+# you can also publish ungrouped jobs that will also be executed (fairness by round-robin algorithm)
 omniq.publish(queue="demo", payload={"i": 2})
 ```
-
-- FIFO inside group
-- Groups execute in parallel
-- Concurrency limited per group
+- FIFO inside group
+- Groups execute in parallel
+- Concurrency limited per group
 ------------------------------------------------------------------------
 ## Pause and Resume inside the consumer
-You publish your job as usual
-``` python
+You publish your job as usual
+
+```python
 # importing the lib
 from omniq.client import OmniqClient
@@ -423,7 +443,8 @@ print("OK", job_id)
 ```
 Inside your consumer you can pause/resume your queue (or another one)
-``` python
+
+```python
 import time
 # importing the lib
@@ -486,12 +507,15 @@ omniq.consume(
 )
 ```
+------------------------------------------------------------------------
+
 ## Examples
-All examples can be found in the 
`./examples` folder.
+Additional usage examples demonstrating common patterns can be found
+in the `./examples` folder.
 ------------------------------------------------------------------------
 ## License
-See the repository license.
+See the repository license.
\ No newline at end of file