From 6b42ec91d554bd637c175020555c20ece21e778e Mon Sep 17 00:00:00 2001
From: LeoDiSarli
Date: Thu, 12 Mar 2026 15:42:19 -0300
Subject: [PATCH] including redis monitor keys

---
 examples/monitor/monitor.py                   | 343 ++++++++++++++++++
 src/omniq/core/scripts/ack_fail.lua           |  29 +-
 src/omniq/core/scripts/ack_success.lua        |  29 +-
 src/omniq/core/scripts/enqueue.lua            |  29 +-
 src/omniq/core/scripts/promote_delayed.lua    |  59 ++-
 src/omniq/core/scripts/reap_expired.lua       |  49 ++-
 src/omniq/core/scripts/remove_job.lua         |  57 ++-
 src/omniq/core/scripts/remove_jobs_batch.lua  |  89 ++++-
 src/omniq/core/scripts/reserve.lua            |  49 ++-
 src/omniq/core/scripts/retry_failed.lua       |  50 ++-
 src/omniq/core/scripts/retry_failed_batch.lua |  60 ++-
 11 files changed, 809 insertions(+), 34 deletions(-)
 create mode 100644 examples/monitor/monitor.py

diff --git a/examples/monitor/monitor.py b/examples/monitor/monitor.py
new file mode 100644
index 0000000..e5927a1
--- /dev/null
+++ b/examples/monitor/monitor.py
@@ -0,0 +1,343 @@
+#!/usr/bin/env python3
+"""
+OmniQ observer
+
+Polls OmniQ monitoring keys and optional transactional keys to help validate
+behavior under stress.
+
+Examples:
+    python monitor.py --redis-url redis://omniq-redis:6379/0
+    python monitor.py --redis-url redis://omniq-redis:6379/0 --interval 0.5 --csv omniq_observer.csv
+    python monitor.py --redis-url redis://omniq-redis:6379/0 --queues emails,pdfs --raw-verify
+"""
+
+from __future__ import annotations
+
+import argparse
+import csv
+import os
+import signal
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable
+
+import redis
+
+
+@dataclass
+class QueueSnapshot:
+    ts_ms: int
+    queue: str
+    paused: int
+    waiting: int
+    group_waiting: int
+    waiting_total: int
+    active: int
+    delayed: int
+    failed: int
+    completed_kept: int
+    groups_ready: int
+    last_activity_ms: int
+    last_enqueue_ms: int
+    last_reserve_ms: int
+    last_finish_ms: int
+    raw_waiting: int | None = None
+    raw_group_waiting: int | None = None
+    raw_waiting_total: int | None = None
+    raw_active: int | None = None
+    raw_delayed: int | None = None
+    raw_failed: int | None = None
+    raw_completed_kept: int | None = None
+    raw_groups_ready: int | None = None
+    ok_waiting: int | None = None
+    ok_group_waiting: int | None = None
+    ok_waiting_total: int | None = None
+    ok_active: int | None = None
+    ok_delayed: int | None = None
+    ok_failed: int | None = None
+    ok_completed_kept: int | None = None
+    ok_groups_ready: int | None = None
+
+
+class Observer:
+    def __init__(
+        self,
+        redis_client: redis.Redis,
+        queues: list[str] | None,
+        raw_verify: bool,
+        csv_path: str | None,
+        console_every: int,
+    ) -> None:
+        self.r = redis_client
+        self.explicit_queues = queues
+        self.raw_verify = raw_verify
+        self.csv_path = csv_path
+        self.console_every = max(1, console_every)
+        self._stop = False
+        self._loop_n = 0
+        self._csv_file = None
+        self._csv_writer = None
+
+    def stop(self, *_args) -> None:
+        self._stop = True
+
+    def install_signal_handlers(self) -> None:
+        signal.signal(signal.SIGINT, self.stop)
+        signal.signal(signal.SIGTERM, self.stop)
+
+    
@staticmethod + def _to_i(value: object) -> int: + if value is None: + return 0 + if isinstance(value, bytes): + value = value.decode("utf-8", errors="replace") + if value == "": + return 0 + try: + return int(float(value)) + except Exception: + return 0 + + def _discover_queues(self) -> list[str]: + if self.explicit_queues is not None: + return self.explicit_queues + names = sorted( + q.decode("utf-8", errors="replace") if isinstance(q, bytes) else str(q) + for q in self.r.smembers("omniq:queues") + ) + return names + + def _read_stats(self, queue: str) -> dict[str, int]: + stats = self.r.hgetall(f"{queue}:stats") + decoded: dict[str, int] = {} + for k, v in stats.items(): + key = k.decode("utf-8", errors="replace") if isinstance(k, bytes) else str(k) + decoded[key] = self._to_i(v) + return decoded + + def _scan_group_waiting(self, queue: str) -> int: + total = 0 + cursor = 0 + pattern = f"{queue}:g:*:wait" + while True: + cursor, keys = self.r.scan(cursor=cursor, match=pattern, count=200) + if keys: + pipe = self.r.pipeline(transaction=False) + for key in keys: + pipe.llen(key) + lengths = pipe.execute() + total += sum(self._to_i(x) for x in lengths) + if cursor == 0: + break + return total + + def _read_raw(self, queue: str) -> dict[str, int]: + raw = { + "waiting": self._to_i(self.r.llen(f"{queue}:wait")), + "active": self._to_i(self.r.zcard(f"{queue}:active")), + "delayed": self._to_i(self.r.zcard(f"{queue}:delayed")), + "failed": self._to_i(self.r.llen(f"{queue}:failed")), + "completed_kept": self._to_i(self.r.llen(f"{queue}:completed")), + "groups_ready": self._to_i(self.r.zcard(f"{queue}:groups:ready")), + } + raw["group_waiting"] = self._scan_group_waiting(queue) + raw["waiting_total"] = raw["waiting"] + raw["group_waiting"] + return raw + + def snapshot_queue(self, queue: str) -> QueueSnapshot: + now_ms = int(time.time() * 1000) + stats = self._read_stats(queue) + paused = 1 if self.r.exists(f"{queue}:paused") else 0 + + snap = QueueSnapshot( + 
ts_ms=now_ms, + queue=queue, + paused=paused, + waiting=stats.get("waiting", 0), + group_waiting=stats.get("group_waiting", 0), + waiting_total=stats.get("waiting_total", 0), + active=stats.get("active", 0), + delayed=stats.get("delayed", 0), + failed=stats.get("failed", 0), + completed_kept=stats.get("completed_kept", 0), + groups_ready=stats.get("groups_ready", 0), + last_activity_ms=stats.get("last_activity_ms", 0), + last_enqueue_ms=stats.get("last_enqueue_ms", 0), + last_reserve_ms=stats.get("last_reserve_ms", 0), + last_finish_ms=stats.get("last_finish_ms", 0), + ) + + if self.raw_verify: + raw = self._read_raw(queue) + snap.raw_waiting = raw["waiting"] + snap.raw_group_waiting = raw["group_waiting"] + snap.raw_waiting_total = raw["waiting_total"] + snap.raw_active = raw["active"] + snap.raw_delayed = raw["delayed"] + snap.raw_failed = raw["failed"] + snap.raw_completed_kept = raw["completed_kept"] + snap.raw_groups_ready = raw["groups_ready"] + + snap.ok_waiting = int(snap.waiting == snap.raw_waiting) + snap.ok_group_waiting = int(snap.group_waiting == snap.raw_group_waiting) + snap.ok_waiting_total = int(snap.waiting_total == snap.raw_waiting_total) + snap.ok_active = int(snap.active == snap.raw_active) + snap.ok_delayed = int(snap.delayed == snap.raw_delayed) + snap.ok_failed = int(snap.failed == snap.raw_failed) + snap.ok_completed_kept = int(snap.completed_kept == snap.raw_completed_kept) + snap.ok_groups_ready = int(snap.groups_ready == snap.raw_groups_ready) + + return snap + + def _ensure_csv(self) -> None: + if not self.csv_path or self._csv_writer is not None: + return + path = Path(self.csv_path) + path.parent.mkdir(parents=True, exist_ok=True) + exists = path.exists() and path.stat().st_size > 0 + self._csv_file = path.open("a", newline="", encoding="utf-8") + self._csv_writer = csv.DictWriter(self._csv_file, fieldnames=list(QueueSnapshot.__dataclass_fields__.keys())) + if not exists: + self._csv_writer.writeheader() + self._csv_file.flush() + + 
def write_csv(self, snaps: Iterable[QueueSnapshot]) -> None: + if not self.csv_path: + return + self._ensure_csv() + assert self._csv_writer is not None + for snap in snaps: + self._csv_writer.writerow(snap.__dict__) + assert self._csv_file is not None + self._csv_file.flush() + + def print_console(self, snaps: list[QueueSnapshot]) -> None: + if not snaps: + print(f"[{int(time.time())}] no queues discovered") + return + + headers = ["queue", "paused", "wait", "gwait", "wtotal", "active", "delayed", "failed", "done", "gready"] + rows = [] + for s in snaps: + rows.append([ + s.queue, + str(s.paused), + str(s.waiting), + str(s.group_waiting), + str(s.waiting_total), + str(s.active), + str(s.delayed), + str(s.failed), + str(s.completed_kept), + str(s.groups_ready), + ]) + + widths = [len(h) for h in headers] + for row in rows: + for i, cell in enumerate(row): + widths[i] = max(widths[i], len(cell)) + + def fmt(row: list[str]) -> str: + return " ".join(cell.ljust(widths[i]) for i, cell in enumerate(row)) + + print() + print(fmt(headers)) + print(fmt(["-" * w for w in widths])) + for row in rows: + print(fmt(row)) + + if self.raw_verify: + mismatches = [] + for s in snaps: + bad = [] + if s.ok_waiting == 0: + bad.append("waiting") + if s.ok_group_waiting == 0: + bad.append("group_waiting") + if s.ok_waiting_total == 0: + bad.append("waiting_total") + if s.ok_active == 0: + bad.append("active") + if s.ok_delayed == 0: + bad.append("delayed") + if s.ok_failed == 0: + bad.append("failed") + if s.ok_completed_kept == 0: + bad.append("completed_kept") + if s.ok_groups_ready == 0: + bad.append("groups_ready") + if bad: + mismatches.append(f"{s.queue}: {', '.join(bad)}") + if mismatches: + print("verify:", " | ".join(mismatches)) + else: + print("verify: all stats match raw keys") + + def run(self, interval_s: float, duration_s: float | None, once: bool) -> int: + self.install_signal_handlers() + started = time.monotonic() + + while not self._stop: + queues = 
self._discover_queues() + snaps = [self.snapshot_queue(q) for q in queues] + self.write_csv(snaps) + + if self._loop_n % self.console_every == 0: + self.print_console(snaps) + + self._loop_n += 1 + + if once: + break + if duration_s is not None and (time.monotonic() - started) >= duration_s: + break + + time.sleep(interval_s) + + if self._csv_file is not None: + self._csv_file.close() + return 0 + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Observe OmniQ queue stats and optional raw Redis validation.") + p.add_argument("--redis-url", default=os.getenv("REDIS_URL", "redis://omniq-redis:6379/0")) + p.add_argument("--queues", default="", help="Comma-separated queue names. Empty means discover from omniq:queues.") + p.add_argument("--interval", type=float, default=1.0, help="Polling interval in seconds.") + p.add_argument("--duration", type=float, default=None, help="Optional total run duration in seconds.") + p.add_argument("--csv", default="", help="Optional CSV output path.") + p.add_argument("--raw-verify", action="store_true", help="Compare monitoring stats against transactional keys.") + p.add_argument("--console-every", type=int, default=1, help="Print every N loops.") + p.add_argument("--once", action="store_true", help="Run a single snapshot and exit.") + return p + + +def main() -> int: + args = build_parser().parse_args() + try: + client = redis.Redis.from_url(args.redis_url, decode_responses=False) + client.ping() + except Exception as exc: + print(f"Redis connection failed: {exc}", file=sys.stderr) + return 2 + + queues = [q.strip() for q in args.queues.split(",") if q.strip()] or None + + observer = Observer( + redis_client=client, + queues=queues, + raw_verify=args.raw_verify, + csv_path=args.csv or None, + console_every=args.console_every, + ) + return observer.run( + interval_s=max(0.05, args.interval), + duration_s=args.duration, + once=args.once, + ) + + +if __name__ == "__main__": + raise 
SystemExit(main()) diff --git a/src/omniq/core/scripts/ack_fail.lua b/src/omniq/core/scripts/ack_fail.lua index 1db7f9a..3b63edf 100644 --- a/src/omniq/core/scripts/ack_fail.lua +++ b/src/omniq/core/scripts/ack_fail.lua @@ -22,6 +22,8 @@ local k_active = base .. ":active" local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -39,6 +41,15 @@ local function dec_floor0(key) return v end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(gid) local k_glimit = base .. ":g:" .. gid .. ":limit" local lim = to_i(redis.call("GET", k_glimit)) @@ -70,6 +81,14 @@ if redis.call("ZREM", k_active, job_id) ~= 1 then return {"ERR", "NOT_ACTIVE"} end +redis.call("SADD", k_queues, base) + +hincrby_floor0(k_stats, "active", -1) +redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_finish_ms", tostring(now_ms) +) + maybe_store_last_error() local gid = redis.call("HGET", k_job, "gid") @@ -79,7 +98,10 @@ if gid and gid ~= "" then local limit = group_limit_for(gid) local k_gwait = base .. ":g:" .. gid .. 
":wait" if inflight < limit and to_i(redis.call("LLEN", k_gwait)) > 0 then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + redis.call("HINCRBY", k_stats, "groups_ready", 1) + end end end @@ -96,6 +118,8 @@ if attempt >= max_attempts then "lock_until_ms", "" ) redis.call("LPUSH", k_failed, job_id) + redis.call("HINCRBY", k_stats, "failed", 1) + return {"FAILED"} end @@ -108,5 +132,6 @@ redis.call("HSET", k_job, "lock_until_ms", "" ) redis.call("ZADD", k_delayed, due_ms, job_id) +redis.call("HINCRBY", k_stats, "delayed", 1) -return {"RETRY", tostring(due_ms)} +return {"RETRY", tostring(due_ms)} \ No newline at end of file diff --git a/src/omniq/core/scripts/ack_success.lua b/src/omniq/core/scripts/ack_success.lua index 5a036e8..e5ada30 100644 --- a/src/omniq/core/scripts/ack_success.lua +++ b/src/omniq/core/scripts/ack_success.lua @@ -20,6 +20,8 @@ local k_job = base .. ":job:" .. job_id local k_active = base .. ":active" local k_completed = base .. ":completed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -37,6 +39,15 @@ local function dec_floor0(key) return v end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(gid) local k_glimit = base .. ":g:" .. gid .. 
":limit" local lim = to_i(redis.call("GET", k_glimit)) @@ -57,6 +68,8 @@ if redis.call("ZREM", k_active, job_id) ~= 1 then return {"ERR", "NOT_ACTIVE"} end +redis.call("SADD", k_queues, base) + redis.call("HSET", k_job, "state", "completed", "updated_ms", tostring(now_ms), @@ -64,14 +77,24 @@ redis.call("HSET", k_job, "lock_until_ms", "" ) +hincrby_floor0(k_stats, "active", -1) +redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_finish_ms", tostring(now_ms) +) + local gid = redis.call("HGET", k_job, "gid") if gid and gid ~= "" then local k_ginflight = base .. ":g:" .. gid .. ":inflight" local inflight = dec_floor0(k_ginflight) local limit = group_limit_for(gid) local k_gwait = base .. ":g:" .. gid .. ":wait" + if inflight < limit and to_i(redis.call("LLEN", k_gwait)) > 0 then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + redis.call("HINCRBY", k_stats, "groups_ready", 1) + end end end @@ -83,4 +106,6 @@ while redis.call("LLEN", k_completed) > KEEP_COMPLETED do end end -return {"OK"} +redis.call("HSET", k_stats, "completed_kept", tostring(to_i(redis.call("LLEN", k_completed)))) + +return {"OK"} \ No newline at end of file diff --git a/src/omniq/core/scripts/enqueue.lua b/src/omniq/core/scripts/enqueue.lua index 8fcbda7..aa3c56b 100644 --- a/src/omniq/core/scripts/enqueue.lua +++ b/src/omniq/core/scripts/enqueue.lua @@ -26,9 +26,12 @@ local k_job = base .. ":job:" .. job_id local k_delayed = base .. ":delayed" local k_wait = base .. ":wait" local k_has_groups = base .. ":has_groups" - +local k_stats = base .. 
":stats" +local k_queues = "omniq:queues" local is_grouped = (gid ~= nil and gid ~= "") +redis.call("SADD", k_queues, base) + if is_grouped then redis.call("HSET", k_job, "id", job_id, @@ -68,10 +71,21 @@ end if due_ms ~= nil and due_ms > now_ms then redis.call("ZADD", k_delayed, due_ms, job_id) redis.call("HSET", k_job, "state", "delayed", "due_ms", tostring(due_ms)) + redis.call("HINCRBY", k_stats, "delayed", 1) + redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_enqueue_ms", tostring(now_ms) + ) else if is_grouped then local k_gwait = base .. ":g:" .. gid .. ":wait" redis.call("RPUSH", k_gwait, job_id) + redis.call("HINCRBY", k_stats, "group_waiting", 1) + redis.call("HINCRBY", k_stats, "waiting_total", 1) + redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_enqueue_ms", tostring(now_ms) + ) local k_ginflight = base .. ":g:" .. gid .. ":inflight" local inflight = tonumber(redis.call("GET", k_ginflight) or "0") @@ -79,11 +93,20 @@ else local limit = tonumber(redis.call("GET", base .. ":g:" .. gid .. ":limit") or tostring(DEFAULT_GROUP_LIMIT)) if inflight < limit then local k_gready = base .. ":groups:ready" - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + redis.call("HINCRBY", k_stats, "groups_ready", 1) + end end else redis.call("RPUSH", k_wait, job_id) + redis.call("HINCRBY", k_stats, "waiting", 1) + redis.call("HINCRBY", k_stats, "waiting_total", 1) + redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_enqueue_ms", tostring(now_ms) + ) end end -return {"OK", job_id} +return {"OK", job_id} \ No newline at end of file diff --git a/src/omniq/core/scripts/promote_delayed.lua b/src/omniq/core/scripts/promote_delayed.lua index 47df929..089d929 100644 --- a/src/omniq/core/scripts/promote_delayed.lua +++ b/src/omniq/core/scripts/promote_delayed.lua @@ -18,6 +18,9 @@ local k_delayed = base .. 
":delayed" local k_wait = base .. ":wait" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" + local function to_i(v) if v == false or v == nil or v == '' then return 0 end local n = tonumber(v) @@ -25,6 +28,15 @@ local function to_i(v) return math.floor(n) end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(gid) local k_glimit = base .. ":g:" .. gid .. ":limit" local lim = to_i(redis.call("GET", k_glimit)) @@ -35,28 +47,71 @@ end local ids = redis.call("ZRANGEBYSCORE", k_delayed, "-inf", now_ms, "LIMIT", 0, max_promote) local promoted = 0 +local dec_delayed = 0 +local inc_waiting = 0 +local inc_group_waiting = 0 +local inc_waiting_total = 0 +local inc_groups_ready = 0 + for i=1,#ids do local job_id = ids[i] if redis.call("ZREM", k_delayed, job_id) == 1 then local k_job = base .. ":job:" .. job_id redis.call("HSET", k_job, "state", "wait", "updated_ms", tostring(now_ms)) + dec_delayed = dec_delayed - 1 + local gid = redis.call("HGET", k_job, "gid") if gid and gid ~= "" then local k_gwait = base .. ":g:" .. gid .. ":wait" redis.call("RPUSH", k_gwait, job_id) + inc_group_waiting = inc_group_waiting + 1 + inc_waiting_total = inc_waiting_total + 1 + local inflight = to_i(redis.call("GET", base .. ":g:" .. gid .. 
":inflight")) local limit = group_limit_for(gid) if inflight < limit then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + inc_groups_ready = inc_groups_ready + 1 + end end else redis.call("RPUSH", k_wait, job_id) + + inc_waiting = inc_waiting + 1 + inc_waiting_total = inc_waiting_total + 1 end promoted = promoted + 1 end end -return {"OK", tostring(promoted)} +if promoted > 0 then + redis.call("SADD", k_queues, base) + + if dec_delayed ~= 0 then + hincrby_floor0(k_stats, "delayed", dec_delayed) + end + + if inc_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "waiting", inc_waiting) + end + + if inc_group_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "group_waiting", inc_group_waiting) + end + + if inc_waiting_total ~= 0 then + redis.call("HINCRBY", k_stats, "waiting_total", inc_waiting_total) + end + + if inc_groups_ready ~= 0 then + redis.call("HINCRBY", k_stats, "groups_ready", inc_groups_ready) + end + + redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms)) +end + +return {"OK", tostring(promoted)} \ No newline at end of file diff --git a/src/omniq/core/scripts/reap_expired.lua b/src/omniq/core/scripts/reap_expired.lua index ae57315..1f17c01 100644 --- a/src/omniq/core/scripts/reap_expired.lua +++ b/src/omniq/core/scripts/reap_expired.lua @@ -18,6 +18,8 @@ local k_active = base .. ":active" local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -35,6 +37,15 @@ local function dec_floor0(key) return v end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(gid) local k_glimit = base .. 
":g:" .. gid .. ":limit" local lim = to_i(redis.call("GET", k_glimit)) @@ -45,6 +56,11 @@ end local ids = redis.call("ZRANGEBYSCORE", k_active, "-inf", now_ms, "LIMIT", 0, max_reap) local reaped = 0 +local dec_active = 0 +local inc_delayed = 0 +local inc_failed = 0 +local inc_groups_ready = 0 + for i=1,#ids do local job_id = ids[i] @@ -52,6 +68,8 @@ for i=1,#ids do if score and tonumber(score) and tonumber(score) > now_ms then else if redis.call("ZREM", k_active, job_id) == 1 then + dec_active = dec_active - 1 + local k_job = base .. ":job:" .. job_id if redis.call("EXISTS", k_job) == 0 then @@ -66,7 +84,10 @@ for i=1,#ids do local limit = group_limit_for(gid) local k_gwait = base .. ":g:" .. gid .. ":wait" if inflight < limit and to_i(redis.call("LLEN", k_gwait)) > 0 then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + inc_groups_ready = inc_groups_ready + 1 + end end end @@ -83,6 +104,7 @@ for i=1,#ids do "lock_until_ms", "" ) redis.call("LPUSH", k_failed, job_id) + inc_failed = inc_failed + 1 else local due_ms = now_ms + backoff_ms redis.call("HSET", k_job, @@ -93,6 +115,7 @@ for i=1,#ids do "lock_until_ms", "" ) redis.call("ZADD", k_delayed, due_ms, job_id) + inc_delayed = inc_delayed + 1 end reaped = reaped + 1 @@ -101,4 +124,26 @@ for i=1,#ids do end end -return {"OK", tostring(reaped)} +if reaped > 0 then + redis.call("SADD", k_queues, base) + + if dec_active ~= 0 then + hincrby_floor0(k_stats, "active", dec_active) + end + + if inc_delayed ~= 0 then + redis.call("HINCRBY", k_stats, "delayed", inc_delayed) + end + + if inc_failed ~= 0 then + redis.call("HINCRBY", k_stats, "failed", inc_failed) + end + + if inc_groups_ready ~= 0 then + redis.call("HINCRBY", k_stats, "groups_ready", inc_groups_ready) + end + + redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms)) +end + +return {"OK", tostring(reaped)} \ No newline at end of file diff --git 
a/src/omniq/core/scripts/remove_job.lua b/src/omniq/core/scripts/remove_job.lua index 26923c3..d24e7a4 100644 --- a/src/omniq/core/scripts/remove_job.lua +++ b/src/omniq/core/scripts/remove_job.lua @@ -28,6 +28,15 @@ local function dec_floor0(key) return v end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(base, gid) local lim = to_i(redis.call("GET", base .. ":g:" .. gid .. ":limit")) if lim <= 0 then return DEFAULT_GROUP_LIMIT end @@ -43,6 +52,8 @@ local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_completed = base .. ":completed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" if redis.call("EXISTS", k_job) ~= 1 then return {"ERR", "NO_JOB"} @@ -86,6 +97,7 @@ if gid ~= "" then end local removed = 0 +local groups_ready_delta = 0 if lane == "wait" then removed = redis.call("LREM", k_wait, 1, job_id) @@ -104,16 +116,27 @@ elseif lane == "completed" then elseif lane == "gwait" then local k_gwait = base .. ":g:" .. gid .. ":wait" + + local was_ready = (redis.call("ZSCORE", k_gready, gid) ~= false) + removed = redis.call("LREM", k_gwait, 1, job_id) local inflight = to_i(redis.call("GET", base .. ":g:" .. gid .. 
":inflight")) local limit = group_limit_for(base, gid) local qlen = to_i(redis.call("LLEN", k_gwait)) - if qlen > 0 and inflight < limit then - redis.call("ZADD", k_gready, 0, gid) + local should_be_ready = (qlen > 0 and inflight < limit) + + if should_be_ready then + local added = redis.call("ZADD", k_gready, "NX", 0, gid) + if added == 1 then + groups_ready_delta = groups_ready_delta + 1 + end else - redis.call("ZREM", k_gready, gid) + local removed_ready = redis.call("ZREM", k_gready, gid) + if removed_ready == 1 then + groups_ready_delta = groups_ready_delta - 1 + end end end @@ -123,4 +146,30 @@ end redis.call("DEL", k_job) -return {"OK"} +redis.call("SADD", k_queues, base) + +if lane == "wait" then + hincrby_floor0(k_stats, "waiting", -1) + hincrby_floor0(k_stats, "waiting_total", -1) + +elseif lane == "delayed" then + hincrby_floor0(k_stats, "delayed", -1) + +elseif lane == "failed" then + hincrby_floor0(k_stats, "failed", -1) + +elseif lane == "completed" then + hincrby_floor0(k_stats, "completed_kept", -1) + +elseif lane == "gwait" then + hincrby_floor0(k_stats, "group_waiting", -1) + hincrby_floor0(k_stats, "waiting_total", -1) + + if groups_ready_delta ~= 0 then + hincrby_floor0(k_stats, "groups_ready", groups_ready_delta) + end +end + +redis.call("HSET", k_stats, "last_activity_ms", tostring(redis.call("TIME")[1] * 1000)) + +return {"OK"} \ No newline at end of file diff --git a/src/omniq/core/scripts/remove_jobs_batch.lua b/src/omniq/core/scripts/remove_jobs_batch.lua index 584daf2..7e4c93d 100644 --- a/src/omniq/core/scripts/remove_jobs_batch.lua +++ b/src/omniq/core/scripts/remove_jobs_batch.lua @@ -1,6 +1,7 @@ local anchor = KEYS[1] local lane = ARGV[1] or "" local count = tonumber(ARGV[2] or "0") +local now_ms = tonumber(ARGV[3 + count] or "0") local DEFAULT_GROUP_LIMIT = 1 local MAX_BATCH = 100 @@ -29,6 +30,15 @@ local function dec_floor0(key) return v end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", 
key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function group_limit_for(base, gid) local lim = to_i(redis.call("GET", base .. ":g:" .. gid .. ":limit")) if lim <= 0 then return DEFAULT_GROUP_LIMIT end @@ -52,6 +62,8 @@ local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_completed = base .. ":completed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local out = {} @@ -80,6 +92,15 @@ if #ARGV < (2 + count) then return {"ERR", "BAD_ARGS"} end +local dec_waiting = 0 +local dec_group_waiting = 0 +local dec_waiting_total = 0 +local dec_delayed = 0 +local dec_failed = 0 +local dec_completed_kept = 0 +local groups_ready_delta = 0 +local removed_ok = 0 + for i = 1, count do local job_id = ARGV[2 + i] if job_id == nil or job_id == "" then @@ -117,6 +138,9 @@ for i = 1, count do push(job_id, "ERR", "NOT_IN_LANE") else redis.call("DEL", k_job) + dec_waiting = dec_waiting - 1 + dec_waiting_total = dec_waiting_total - 1 + removed_ok = removed_ok + 1 push(job_id, "OK", nil) end @@ -124,9 +148,15 @@ for i = 1, count do if redis.call("ZSCORE", k_delayed, job_id) == false then push(job_id, "ERR", "NOT_IN_LANE") else - redis.call("ZREM", k_delayed, job_id) - redis.call("DEL", k_job) - push(job_id, "OK", nil) + removed = redis.call("ZREM", k_delayed, job_id) + if removed <= 0 then + push(job_id, "ERR", "NOT_IN_LANE") + else + redis.call("DEL", k_job) + dec_delayed = dec_delayed - 1 + removed_ok = removed_ok + 1 + push(job_id, "OK", nil) + end end elseif lane == "failed" then @@ -135,6 +165,8 @@ for i = 1, count do push(job_id, "ERR", "NOT_IN_LANE") else redis.call("DEL", k_job) + dec_failed = dec_failed - 1 + removed_ok = removed_ok + 1 push(job_id, "OK", nil) end @@ -144,6 +176,8 @@ for i = 1, count do push(job_id, "ERR", "NOT_IN_LANE") else redis.call("DEL", k_job) + dec_completed_kept = dec_completed_kept - 1 + removed_ok = 
removed_ok + 1 push(job_id, "OK", nil) end @@ -158,12 +192,21 @@ for i = 1, count do local qlen = to_i(redis.call("LLEN", k_gwait)) if qlen > 0 and inflight < limit then - redis.call("ZADD", k_gready, 0, gid) + local added = redis.call("ZADD", k_gready, "NX", 0, gid) + if added == 1 then + groups_ready_delta = groups_ready_delta + 1 + end else - redis.call("ZREM", k_gready, gid) + local removed_ready = redis.call("ZREM", k_gready, gid) + if removed_ready == 1 then + groups_ready_delta = groups_ready_delta - 1 + end end redis.call("DEL", k_job) + dec_group_waiting = dec_group_waiting - 1 + dec_waiting_total = dec_waiting_total - 1 + removed_ok = removed_ok + 1 push(job_id, "OK", nil) end end @@ -173,4 +216,38 @@ for i = 1, count do end end -return out +if removed_ok > 0 then + redis.call("SADD", k_queues, base) + + if dec_waiting ~= 0 then + hincrby_floor0(k_stats, "waiting", dec_waiting) + end + + if dec_group_waiting ~= 0 then + hincrby_floor0(k_stats, "group_waiting", dec_group_waiting) + end + + if dec_waiting_total ~= 0 then + hincrby_floor0(k_stats, "waiting_total", dec_waiting_total) + end + + if dec_delayed ~= 0 then + hincrby_floor0(k_stats, "delayed", dec_delayed) + end + + if dec_failed ~= 0 then + hincrby_floor0(k_stats, "failed", dec_failed) + end + + if dec_completed_kept ~= 0 then + hincrby_floor0(k_stats, "completed_kept", dec_completed_kept) + end + + if groups_ready_delta ~= 0 then + hincrby_floor0(k_stats, "groups_ready", groups_ready_delta) + end + + redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms)) +end + +return out \ No newline at end of file diff --git a/src/omniq/core/scripts/reserve.lua b/src/omniq/core/scripts/reserve.lua index 8173882..5adb1f3 100644 --- a/src/omniq/core/scripts/reserve.lua +++ b/src/omniq/core/scripts/reserve.lua @@ -19,12 +19,13 @@ end local DEFAULT_GROUP_LIMIT = 1 local MAX_GROUP_POPS = 10 -local k_wait = base .. ":wait" -local k_active = base .. ":active" -local k_gready = base .. 
":groups:ready" -local k_rr = base .. ":lane:rr" - +local k_wait = base .. ":wait" +local k_active = base .. ":active" +local k_gready = base .. ":groups:ready" +local k_rr = base .. ":lane:rr" local k_token_seq = base .. ":lease:seq" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local function to_i(v) if v == false or v == nil or v == '' then return 0 end @@ -33,6 +34,15 @@ local function to_i(v) return math.floor(n) end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local function new_lease_token(job_id) local seq = redis.call("INCR", k_token_seq) return redis.sha1hex(job_id .. ":" .. tostring(now_ms) .. ":" .. tostring(seq)) @@ -62,6 +72,12 @@ local function lease_job(job_id) redis.call("ZADD", k_active, lock_until, job_id) + redis.call("HINCRBY", k_stats, "active", 1) + redis.call("HSET", k_stats, + "last_activity_ms", tostring(now_ms), + "last_reserve_ms", tostring(now_ms) + ) + return {"JOB", job_id, payload, tostring(lock_until), tostring(attempt), gid, lease_token} end @@ -70,6 +86,10 @@ local function try_ungrouped() if not job_id then return nil end + + hincrby_floor0(k_stats, "waiting", -1) + hincrby_floor0(k_stats, "waiting_total", -1) + return lease_job(job_id) end @@ -90,6 +110,8 @@ local function try_grouped() local gid = popped[1] if not gid or gid == "" then else + hincrby_floor0(k_stats, "groups_ready", -1) + local k_gwait = base .. ":g:" .. gid .. ":wait" local k_ginflight = base .. ":g:" .. gid .. 
":inflight" @@ -97,15 +119,24 @@ local function try_grouped() local limit = group_limit_for(gid) if inflight >= limit then - redis.call("ZADD", k_gready, now_ms + 1, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms + 1, gid) + if added == 1 then + redis.call("HINCRBY", k_stats, "groups_ready", 1) + end else local job_id = redis.call("LPOP", k_gwait) if not job_id then else inflight = to_i(redis.call("INCR", k_ginflight)) + hincrby_floor0(k_stats, "group_waiting", -1) + hincrby_floor0(k_stats, "waiting_total", -1) + if inflight < limit and to_i(redis.call("LLEN", k_gwait)) > 0 then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + redis.call("HINCRBY", k_stats, "groups_ready", 1) + end end return lease_job(job_id) @@ -117,6 +148,8 @@ local function try_grouped() return nil end +redis.call("SADD", k_queues, base) + local rr = to_i(redis.call("GET", k_rr)) local res @@ -138,4 +171,4 @@ else redis.call("SET", k_rr, "0") end -return res +return res \ No newline at end of file diff --git a/src/omniq/core/scripts/retry_failed.lua b/src/omniq/core/scripts/retry_failed.lua index 64d0247..e7ec45f 100644 --- a/src/omniq/core/scripts/retry_failed.lua +++ b/src/omniq/core/scripts/retry_failed.lua @@ -19,6 +19,15 @@ local function to_i(v) return math.floor(n) end +local function hincrby_floor0(key, field, delta) + local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local base = derive_base(anchor) local k_job = base .. ":job:" .. job_id @@ -27,6 +36,8 @@ local k_active = base .. ":active" local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_gready = base .. ":groups:ready" +local k_stats = base .. 
":stats" +local k_queues = "omniq:queues" if redis.call("EXISTS", k_job) ~= 1 then return {"ERR", "NO_JOB"} @@ -53,6 +64,12 @@ redis.call("HSET", k_job, local gid = redis.call("HGET", k_job, "gid") or "" +local inc_waiting = 0 +local inc_group_waiting = 0 +local inc_waiting_total = 0 +local dec_failed = -1 +local inc_groups_ready = 0 + if gid ~= "" then local k_gwait = base .. ":g:" .. gid .. ":wait" local k_ginflight = base .. ":g:" .. gid .. ":inflight" @@ -60,15 +77,44 @@ if gid ~= "" then redis.call("RPUSH", k_gwait, job_id) + inc_group_waiting = 1 + inc_waiting_total = 1 + local inflight = to_i(redis.call("GET", k_ginflight)) local limit = to_i(redis.call("GET", k_glimit)) if limit <= 0 then limit = DEFAULT_GROUP_LIMIT end if inflight < limit then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + inc_groups_ready = 1 + end end else redis.call("RPUSH", k_wait, job_id) + inc_waiting = 1 + inc_waiting_total = 1 +end + +redis.call("SADD", k_queues, base) +hincrby_floor0(k_stats, "failed", dec_failed) + +if inc_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "waiting", inc_waiting) end -return {"OK"} +if inc_group_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "group_waiting", inc_group_waiting) +end + +if inc_waiting_total ~= 0 then + redis.call("HINCRBY", k_stats, "waiting_total", inc_waiting_total) +end + +if inc_groups_ready ~= 0 then + redis.call("HINCRBY", k_stats, "groups_ready", inc_groups_ready) +end + +redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms)) + +return {"OK"} \ No newline at end of file diff --git a/src/omniq/core/scripts/retry_failed_batch.lua b/src/omniq/core/scripts/retry_failed_batch.lua index 038ac31..96013aa 100644 --- a/src/omniq/core/scripts/retry_failed_batch.lua +++ b/src/omniq/core/scripts/retry_failed_batch.lua @@ -20,6 +20,15 @@ local function to_i(v) return math.floor(n) end +local function hincrby_floor0(key, field, delta) + 
local v = to_i(redis.call("HINCRBY", key, field, delta)) + if v < 0 then + redis.call("HSET", key, field, "0") + return 0 + end + return v +end + local base = derive_base(anchor) local k_wait = base .. ":wait" @@ -27,6 +36,8 @@ local k_active = base .. ":active" local k_delayed = base .. ":delayed" local k_failed = base .. ":failed" local k_gready = base .. ":groups:ready" +local k_stats = base .. ":stats" +local k_queues = "omniq:queues" local out = {} @@ -50,6 +61,13 @@ if #ARGV < (2 + count) then return {"ERR", "BAD_ARGS"} end +local dec_failed = 0 +local inc_waiting = 0 +local inc_group_waiting = 0 +local inc_waiting_total = 0 +local inc_groups_ready = 0 +local ok_count = 0 + for i = 1, count do local job_id = ARGV[2 + i] if job_id == nil or job_id == "" then @@ -64,7 +82,6 @@ for i = 1, count do if st ~= "failed" then push(job_id, "ERR", "NOT_FAILED") else - -- cleanup any lane remnants (same as single) redis.call("ZREM", k_active, job_id) redis.call("ZREM", k_delayed, job_id) redis.call("LREM", k_wait, 0, job_id) @@ -81,6 +98,8 @@ for i = 1, count do local gid = redis.call("HGET", k_job, "gid") or "" + dec_failed = dec_failed - 1 + if gid ~= "" then local k_gwait = base .. ":g:" .. gid .. ":wait" local k_ginflight = base .. ":g:" .. gid .. 
":inflight" @@ -88,21 +107,56 @@ for i = 1, count do redis.call("RPUSH", k_gwait, job_id) + inc_group_waiting = inc_group_waiting + 1 + inc_waiting_total = inc_waiting_total + 1 + local inflight = to_i(redis.call("GET", k_ginflight)) local limit = to_i(redis.call("GET", k_glimit)) if limit <= 0 then limit = DEFAULT_GROUP_LIMIT end if inflight < limit then - redis.call("ZADD", k_gready, now_ms, gid) + local added = redis.call("ZADD", k_gready, "NX", now_ms, gid) + if added == 1 then + inc_groups_ready = inc_groups_ready + 1 + end end else redis.call("RPUSH", k_wait, job_id) + inc_waiting = inc_waiting + 1 + inc_waiting_total = inc_waiting_total + 1 end + ok_count = ok_count + 1 push(job_id, "OK", nil) end end end end -return out +if ok_count > 0 then + redis.call("SADD", k_queues, base) + + if dec_failed ~= 0 then + hincrby_floor0(k_stats, "failed", dec_failed) + end + + if inc_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "waiting", inc_waiting) + end + + if inc_group_waiting ~= 0 then + redis.call("HINCRBY", k_stats, "group_waiting", inc_group_waiting) + end + + if inc_waiting_total ~= 0 then + redis.call("HINCRBY", k_stats, "waiting_total", inc_waiting_total) + end + + if inc_groups_ready ~= 0 then + redis.call("HINCRBY", k_stats, "groups_ready", inc_groups_ready) + end + + redis.call("HSET", k_stats, "last_activity_ms", tostring(now_ms)) +end + +return out \ No newline at end of file