From 6676ddd8d0ac17e94d832e0003a87c6aa673b2f4 Mon Sep 17 00:00:00 2001 From: LeoDiSarli Date: Thu, 19 Feb 2026 09:15:02 -0300 Subject: [PATCH 1/4] improving connections --- chaos/connections.py | 101 +++++++++++++++++++++++++++++++++++++++++ src/connections.py | 101 +++++++++++++++++++++++++++++++++++++++++ src/omniq/_ops.py | 10 ++-- src/omniq/client.py | 34 ++++++++++++++ src/omniq/consumer.py | 99 ++++++++++++++++++++++++++-------------- src/omniq/scripts.py | 21 ++++++++- src/omniq/transport.py | 62 ++++++++++++++++++------- 7 files changed, 373 insertions(+), 55 deletions(-) create mode 100644 chaos/connections.py create mode 100644 src/connections.py diff --git a/chaos/connections.py b/chaos/connections.py new file mode 100644 index 0000000..e1e55f5 --- /dev/null +++ b/chaos/connections.py @@ -0,0 +1,101 @@ +import os +import time +import random +import argparse +import subprocess +import sys + +from omniq.client import OmniqClient + + +def run_publish_churn(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: + for i in range(n): + # create client -> publish -> close (this is the churn case) + uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-pub:{os.getpid()}:{i}") + try: + uq.publish(queue=queue, payload={"i": i, "hello": "world"}) + finally: + uq.close() + + if sleep_ms > 0: + time.sleep(sleep_ms / 1000.0) + + +def run_shared_client_publish(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: + uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-shared:{os.getpid()}") + try: + for i in range(n): + uq.publish(queue=queue, payload={"i": i, "mode": "shared"}) + if sleep_ms > 0: + time.sleep(sleep_ms / 1000.0) + finally: + uq.close() + + +def run_consumer_bounce(host: str, port: int, queue: str, runs: int, alive_s: float) -> None: + """ + Starts a consumer in a subprocess and kills it. + - SIGTERM should exit cleanly (your consumer closes client in finally) + - SIGKILL will not run finally (expected), but connections should disappear after OS/Redis detects it + """ + code = f""" +import time +from omniq.client import OmniqClient + +def handler(ctx): + time.sleep(0.2) + +uq = OmniqClient(host={host!r}, port={port}, client_name="omniq-chaos-consumer-subproc") +uq.consume(queue={queue!r}, handler=handler, verbose=False, drain=True) +""" + for i in range(runs): + p = subprocess.Popen([sys.executable, "-c", code]) + time.sleep(alive_s) + + # mostly SIGTERM; sometimes SIGKILL + if random.random() < 0.85: + p.terminate() + else: + p.kill() + + try: + p.wait(timeout=5) + except subprocess.TimeoutExpired: + p.kill() + p.wait() + + time.sleep(0.25) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--host", default=os.getenv("REDIS_HOST", "omniq-redis")) + ap.add_argument("--port", type=int, default=int(os.getenv("REDIS_PORT", "6379"))) + ap.add_argument("--queue", default=os.getenv("OMNIQ_QUEUE", "chaos")) + ap.add_argument("--n", type=int, default=2000) + ap.add_argument("--sleep-ms", type=int, default=0) + ap.add_argument("--mode", choices=["churn", "shared", "consumer-bounce", "all"], default="all") + args = ap.parse_args() + + print(f"[chaos] host={args.host} port={args.port} queue={args.queue} mode={args.mode}") + + if args.mode in ("churn", "all"): + print("[chaos] publish churn: create->publish->close ...") + run_publish_churn(args.host, args.port, args.queue, args.n, args.sleep_ms) + print("[chaos] publish churn done") + + if args.mode in ("shared", "all"): + print("[chaos] shared client publish ...") + run_shared_client_publish(args.host, args.port, args.queue, args.n, args.sleep_ms) + print("[chaos] shared publish done") + + if args.mode in ("consumer-bounce", "all"): + print("[chaos] consumer bounce (subprocess) ...") + run_consumer_bounce(args.host, args.port, args.queue, runs=30, alive_s=0.8) + print("[chaos] consumer bounce done") + + print("[chaos] done") + + +if __name__ == "__main__": + main() diff --git a/src/connections.py b/src/connections.py new file mode 100644 index 0000000..e1e55f5 --- /dev/null +++ b/src/connections.py @@ -0,0 +1,101 @@ +import os +import time +import random +import argparse +import subprocess +import sys + +from omniq.client import OmniqClient + + +def run_publish_churn(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: + for i in range(n): + # create client -> publish -> close (this is the churn case) + uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-pub:{os.getpid()}:{i}") + try: + uq.publish(queue=queue, payload={"i": i, "hello": "world"}) + finally: + uq.close() + + if sleep_ms > 0: + time.sleep(sleep_ms / 1000.0) + + +def run_shared_client_publish(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: + uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-shared:{os.getpid()}") + try: + for i in range(n): + uq.publish(queue=queue, payload={"i": i, "mode": "shared"}) + if sleep_ms > 0: + time.sleep(sleep_ms / 1000.0) + finally: + uq.close() + + +def run_consumer_bounce(host: str, port: int, queue: str, runs: int, alive_s: float) -> None: + """ + Starts a consumer in a subprocess and kills it. + - SIGTERM should exit cleanly (your consumer closes client in finally) + - SIGKILL will not run finally (expected), but connections should disappear after OS/Redis detects it + """ + code = f""" +import time +from omniq.client import OmniqClient + +def handler(ctx): + time.sleep(0.2) + +uq = OmniqClient(host={host!r}, port={port}, client_name="omniq-chaos-consumer-subproc") +uq.consume(queue={queue!r}, handler=handler, verbose=False, drain=True) +""" + for i in range(runs): + p = subprocess.Popen([sys.executable, "-c", code]) + time.sleep(alive_s) + + # mostly SIGTERM; sometimes SIGKILL + if random.random() < 0.85: + p.terminate() + else: + p.kill() + + try: + p.wait(timeout=5) + except subprocess.TimeoutExpired: + p.kill() + p.wait() + + time.sleep(0.25) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--host", default=os.getenv("REDIS_HOST", "omniq-redis")) + ap.add_argument("--port", type=int, default=int(os.getenv("REDIS_PORT", "6379"))) + ap.add_argument("--queue", default=os.getenv("OMNIQ_QUEUE", "chaos")) + ap.add_argument("--n", type=int, default=2000) + ap.add_argument("--sleep-ms", type=int, default=0) + ap.add_argument("--mode", choices=["churn", "shared", "consumer-bounce", "all"], default="all") + args = ap.parse_args() + + print(f"[chaos] host={args.host} port={args.port} queue={args.queue} mode={args.mode}") + + if args.mode in ("churn", "all"): + print("[chaos] publish churn: create->publish->close ...") + run_publish_churn(args.host, args.port, args.queue, args.n, args.sleep_ms) + print("[chaos] publish churn done") + + if args.mode in ("shared", "all"): + print("[chaos] shared client publish ...") + run_shared_client_publish(args.host, args.port, args.queue, args.n, args.sleep_ms) + print("[chaos] shared publish done") + + if args.mode in ("consumer-bounce", "all"): + print("[chaos] consumer bounce (subprocess) ...") + run_consumer_bounce(args.host, args.port, args.queue, runs=30, alive_s=0.8) + print("[chaos] consumer bounce done") + + print("[chaos] done") + + +if __name__ == "__main__": + main() diff --git a/src/omniq/_ops.py b/src/omniq/_ops.py index 2e8b8fe..2928cca 100644 --- a/src/omniq/_ops.py +++ b/src/omniq/_ops.py @@ -2,7 +2,7 @@ import redis from dataclasses import dataclass -from typing import Optional, Any, List +from typing import Optional, Any, List, ClassVar from threading import Lock from .clock import now_ms @@ -14,7 +14,7 @@ @dataclass class OmniqOps: - _script_lock = Lock() + _script_lock: ClassVar[Lock] = Lock() r: RedisLike scripts: OmniqScripts @@ -29,7 +29,11 @@ def _evalsha_with_noscript_fallback( return self.r.evalsha(sha, numkeys, *keys_and_args) except redis.exceptions.NoScriptError: with self._script_lock: - return self.r.eval(src, numkeys, *keys_and_args) + try: + return self.r.evalsha(sha, numkeys, *keys_and_args) + except redis.exceptions.NoScriptError: + new_sha = self.r.script_load(src) + return self.r.evalsha(new_sha, numkeys, *keys_and_args) def publish( self, diff --git a/src/omniq/client.py b/src/omniq/client.py index 70d7c0c..6a8d949 100644 --- a/src/omniq/client.py +++ b/src/omniq/client.py @@ -7,6 +7,19 @@ from .types import ReserveResult, AckFailResult from .helper import queue_base +def _safe_close_redis(r: Any) -> None: + if r is None: + return + try: + r.close() + return + except Exception: + pass + try: + r.connection_pool.disconnect() + except Exception: + pass + @dataclass class OmniqClient: _ops: OmniqOps @@ -23,7 +36,10 @@ def __init__( password: Optional[str] = None, ssl: bool = False, scripts_dir: Optional[str] = None, + client_name: Optional[str] = None, ): + self._owns_redis = redis is None + if redis is not None: r = redis else: @@ -38,6 +54,12 @@ def __init__( ssl=ssl, ) ) + + if client_name: + try: + r.client_setname(str(client_name)) + except Exception: + pass if scripts_dir is None: scripts_dir = default_scripts_dir() @@ -45,6 +67,18 @@ def __init__( self._ops = OmniqOps(r=r, scripts=scripts) + def close(self) -> None: + if not getattr(self, "_owns_redis", False): + return + r = getattr(self._ops, "r", None) + _safe_close_redis(r) + + def __enter__(self) -> "OmniqClient": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + @staticmethod def queue_base(queue_name: str) -> str: return queue_base(queue_name) diff --git a/src/omniq/consumer.py b/src/omniq/consumer.py index 3429e87..df78252 100644 --- a/src/omniq/consumer.py +++ b/src/omniq/consumer.py @@ -31,36 +31,50 @@ def start_heartbeater( stop_evt = threading.Event() flags: Dict[str, bool] = {"lost": False} + def _lost(msg: str) -> bool: + msg_u = (msg or "").upper() + return ("NOT_ACTIVE" in msg_u) or ("TOKEN_MISMATCH" in msg_u) + def hb_loop(): try: client.heartbeat(queue=queue, job_id=job_id, lease_token=lease_token) except Exception as e: + if stop_evt.is_set(): + return msg = str(e) - if "NOT_ACTIVE" in msg or "TOKEN_MISMATCH" in msg: + if _lost(msg): flags["lost"] = True stop_evt.set() return + time.sleep(min(0.2, max(0.01, float(interval_s)))) - while not stop_evt.wait(interval_s): + while True: + if stop_evt.wait(interval_s): + return try: client.heartbeat(queue=queue, job_id=job_id, lease_token=lease_token) except Exception as e: + if stop_evt.is_set(): + return msg = str(e) - if "NOT_ACTIVE" in msg or "TOKEN_MISMATCH" in msg: + if _lost(msg): flags["lost"] = True stop_evt.set() return + time.sleep(min(0.2, max(0.01, float(interval_s)))) t = threading.Thread(target=hb_loop, daemon=True) t.start() return HeartbeatHandle(stop_evt=stop_evt, flags=flags, thread=t) + def _safe_log(logger: Callable[[str], None], msg: str) -> None: try: logger(msg) except Exception: pass + def _payload_preview(payload: Any, max_len: int = 300) -> str: try: s = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False) @@ -70,6 +84,7 @@ def _payload_preview(payload: Any, max_len: int = 300) -> str: return s[:max_len] + "…" return s + def consume( client: OmniqClient, *, @@ -93,37 +108,35 @@ def consume( ctrl = StopController(stop=False, sigint_count=0) - if stop_on_ctrl_c and threading.current_thread() is threading.main_thread(): - def on_sigterm(signum, _frame): - ctrl.stop = True - if verbose: - _safe_log(logger, f"[consume] SIGTERM received; stopping... queue={queue}") + prev_sigterm = None + prev_sigint = None - signal.signal(signal.SIGTERM, on_sigterm) + try: + if stop_on_ctrl_c and threading.current_thread() is threading.main_thread(): + def on_sigterm(signum, _frame): + ctrl.stop = True + if verbose: + _safe_log(logger, f"[consume] SIGTERM received; stopping... queue={queue}") - if drain: - prev = signal.getsignal(signal.SIGINT) + prev_sigterm = signal.getsignal(signal.SIGTERM) + signal.signal(signal.SIGTERM, on_sigterm) - def on_sigint(signum, frame): - ctrl.sigint_count += 1 - if ctrl.sigint_count >= 2: - if verbose: - _safe_log(logger, f"[consume] SIGINT x2; hard exit now. queue={queue}") - try: - signal.signal(signal.SIGINT, prev if prev else signal.SIG_DFL) - except Exception: - signal.signal(signal.SIGINT, signal.SIG_DFL) - raise KeyboardInterrupt + if drain: + prev_sigint = signal.getsignal(signal.SIGINT) - ctrl.stop = True - if verbose: - _safe_log(logger, f"[consume] Ctrl+C received; draining current job then exiting. queue={queue}") + def on_sigint(signum, frame): + ctrl.sigint_count += 1 + if ctrl.sigint_count >= 2: + if verbose: + _safe_log(logger, f"[consume] SIGINT x2; hard exit now. queue={queue}") + raise KeyboardInterrupt - signal.signal(signal.SIGINT, on_sigint) - else: - pass + ctrl.stop = True + if verbose: + _safe_log(logger, f"[consume] Ctrl+C received; draining current job then exiting. queue={queue}") + + signal.signal(signal.SIGINT, on_sigint) - try: while True: if ctrl.stop: if verbose: @@ -214,8 +227,6 @@ def on_sigint(signum, frame): try: handler(ctx) - hb.stop_evt.set() - if not hb.flags.get("lost", False): try: client.ack_success(queue=queue, job_id=res.job_id, lease_token=res.lease_token) @@ -228,12 +239,9 @@ def on_sigint(signum, frame): except KeyboardInterrupt: if verbose: _safe_log(logger, f"[consume] KeyboardInterrupt; exiting now. queue={queue}") - hb.stop_evt.set() return except Exception as e: - hb.stop_evt.set() - if not hb.flags.get("lost", False): try: err = f"{type(e).__name__}: {e}" @@ -255,7 +263,12 @@ def on_sigint(signum, frame): finally: try: - hb.thread.join(timeout=0.1) + hb.stop_evt.set() + except Exception: + pass + try: + join_timeout = max(0.2, min(2.0, hb_s * 1.5)) + hb.thread.join(timeout=join_timeout) except Exception: pass @@ -268,3 +281,21 @@ def on_sigint(signum, frame): if verbose: _safe_log(logger, f"[consume] KeyboardInterrupt (outer); exiting now. queue={queue}") return + + finally: + if stop_on_ctrl_c and threading.current_thread() is threading.main_thread(): + try: + if prev_sigterm is not None: + signal.signal(signal.SIGTERM, prev_sigterm) + except Exception: + pass + try: + if prev_sigint is not None: + signal.signal(signal.SIGINT, prev_sigint) + except Exception: + pass + + try: + client.close() + except Exception: + pass diff --git a/src/omniq/scripts.py b/src/omniq/scripts.py index 45b9296..3de042d 100644 --- a/src/omniq/scripts.py +++ b/src/omniq/scripts.py @@ -1,15 +1,19 @@ import os from dataclasses import dataclass +from threading import Lock from typing import Protocol + class ScriptLoader(Protocol): def script_load(self, script: str) -> str: ... + @dataclass(frozen=True) class ScriptDef: sha: str src: str + @dataclass(frozen=True) class OmniqScripts: enqueue: ScriptDef @@ -28,11 +32,21 @@ class OmniqScripts: childs_init: ScriptDef child_ack: ScriptDef + def default_scripts_dir() -> str: here = os.path.dirname(__file__) return os.path.join(here, "core", "scripts") +_scripts_cache: dict[str, OmniqScripts] = {} +_scripts_cache_lock = Lock() + + def load_scripts(r: ScriptLoader, scripts_dir: str) -> OmniqScripts: + with _scripts_cache_lock: + cached = _scripts_cache.get(scripts_dir) + if cached is not None: + return cached + def load_one(name: str) -> ScriptDef: path = os.path.join(scripts_dir, name) with open(path, "r", encoding="utf-8") as f: @@ -40,7 +54,7 @@ def load_one(name: str) -> ScriptDef: sha = r.script_load(src) return ScriptDef(sha=sha, src=src) - return OmniqScripts( + scripts = OmniqScripts( enqueue=load_one("enqueue.lua"), reserve=load_one("reserve.lua"), ack_success=load_one("ack_success.lua"), @@ -57,3 +71,8 @@ def load_one(name: str) -> ScriptDef: childs_init=load_one("childs_init.lua"), child_ack=load_one("child_ack.lua"), ) + + with _scripts_cache_lock: + _scripts_cache[scripts_dir] = scripts + + return scripts diff --git a/src/omniq/transport.py b/src/omniq/transport.py index 94414e1..417d920 100644 --- a/src/omniq/transport.py +++ b/src/omniq/transport.py @@ -10,6 +10,7 @@ RedisArg = Union[str, bytes, int, float] + class RedisLike(Protocol): def evalsha(self, sha: str, numkeys: int, *args: RedisArg) -> Any: ... def eval(self, script: str, numkeys: int, *args: RedisArg) -> Any: ... @@ -22,7 +23,7 @@ def zcard(self, key: str) -> int: ... def zrange(self, key: str, start: int, end: int) -> list[Optional[str]]: ... def get(self, key: str) -> Optional[str]: ... def hmget(self, key: str, *fields: str) -> list[Optional[str]]: ... - def zscore(self, key: str, member: str) -> list[Optional[str]]: ... + def zscore(self, key: str, member: str) -> Optional[float]: ... @dataclass(frozen=True) class RedisConnOpts: @@ -33,16 +34,31 @@ class RedisConnOpts: username: Optional[str] = None password: Optional[str] = None ssl: bool = False + socket_timeout: Optional[float] = None socket_connect_timeout: Optional[float] = None + max_connections: Optional[int] = None + health_check_interval: Optional[int] = 30 + socket_keepalive: bool = True + +def _safe_close(client: Any) -> None: + try: + client.close() + return + except Exception: + pass + try: + client.connection_pool.disconnect() + except Exception: + pass + def _looks_like_cluster_error(e: Exception) -> bool: msg = str(e).lower() - return ( "cluster support disabled" in msg or "cluster mode is not enabled" in msg - or "unknown command" in msg and "cluster" in msg + or ("unknown command" in msg and "cluster" in msg) or "this instance has cluster support disabled" in msg or "err this instance has cluster support disabled" in msg or "only (p)subscribe / (p)unsubscribe / ping / quit allowed in this context" in msg @@ -50,28 +66,46 @@ def _looks_like_cluster_error(e: Exception) -> bool: or "ask" in msg ) +def _common_kwargs(opts: RedisConnOpts) -> dict[str, Any]: + kw: dict[str, Any] = { + "decode_responses": True, + "ssl": bool(opts.ssl), + "username": opts.username, + "password": opts.password, + "socket_timeout": opts.socket_timeout, + "socket_connect_timeout": opts.socket_connect_timeout, + "socket_keepalive": bool(opts.socket_keepalive), + } + + if opts.max_connections is not None: + kw["max_connections"] = int(opts.max_connections) + if opts.health_check_interval is not None: + kw["health_check_interval"] = int(opts.health_check_interval) + + return {k: v for k, v in kw.items() if v is not None} + def build_redis_client(opts: RedisConnOpts) -> redis.Redis: + kw = _common_kwargs(opts) + if opts.redis_url: - return redis.Redis.from_url(opts.redis_url, decode_responses=True) + return redis.Redis.from_url(opts.redis_url, **kw) if not opts.host: raise ValueError("RedisConnOpts requires host (or redis_url)") if RedisCluster is not None: + rc = None try: rc = RedisCluster( host=opts.host, port=int(opts.port), - username=opts.username, - password=opts.password, - ssl=bool(opts.ssl), - socket_timeout=opts.socket_timeout, - socket_connect_timeout=opts.socket_connect_timeout, - decode_responses=True, + **kw, ) rc.ping() return rc except Exception as e: + if rc is not None: + _safe_close(rc) if _looks_like_cluster_error(e): pass else: @@ -81,11 +115,5 @@ def build_redis_client(opts: RedisConnOpts) -> redis.Redis: host=opts.host, port=int(opts.port), db=int(opts.db), - username=opts.username, - password=opts.password, - ssl=bool(opts.ssl), - socket_timeout=opts.socket_timeout, - socket_connect_timeout=opts.socket_connect_timeout, - decode_responses=True, + **kw, ) - From f70bc61ed6c8302b8bd00d32b953a2f20671e385 Mon Sep 17 00:00:00 2001 From: LeoDiSarli Date: Thu, 19 Feb 2026 12:42:01 -0300 Subject: [PATCH 2/4] removing test files --- chaos/connections.py | 101 ------------------------------------------- src/connections.py | 101 ------------------------------------------- 2 files changed, 202 deletions(-) delete mode 100644 chaos/connections.py delete mode 100644 src/connections.py diff --git a/chaos/connections.py b/chaos/connections.py deleted file mode 100644 index e1e55f5..0000000 --- a/chaos/connections.py +++ /dev/null @@ -1,101 +0,0 @@ -import os -import time -import random -import argparse -import subprocess -import sys - -from omniq.client import OmniqClient - - -def run_publish_churn(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: - for i in range(n): - # create client -> publish -> close (this is the churn case) - uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-pub:{os.getpid()}:{i}") - try: - uq.publish(queue=queue, payload={"i": i, "hello": "world"}) - finally: - uq.close() - - if sleep_ms > 0: - time.sleep(sleep_ms / 1000.0) - - -def run_shared_client_publish(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: - uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-shared:{os.getpid()}") - try: - for i in range(n): - uq.publish(queue=queue, payload={"i": i, "mode": "shared"}) - if sleep_ms > 0: - time.sleep(sleep_ms / 1000.0) - finally: - uq.close() - - -def run_consumer_bounce(host: str, port: int, queue: str, runs: int, alive_s: float) -> None: - """ - Starts a consumer in a subprocess and kills it. - - SIGTERM should exit cleanly (your consumer closes client in finally) - - SIGKILL will not run finally (expected), but connections should disappear after OS/Redis detects it - """ - code = f""" -import time -from omniq.client import OmniqClient - -def handler(ctx): - time.sleep(0.2) - -uq = OmniqClient(host={host!r}, port={port}, client_name="omniq-chaos-consumer-subproc") -uq.consume(queue={queue!r}, handler=handler, verbose=False, drain=True) -""" - for i in range(runs): - p = subprocess.Popen([sys.executable, "-c", code]) - time.sleep(alive_s) - - # mostly SIGTERM; sometimes SIGKILL - if random.random() < 0.85: - p.terminate() - else: - p.kill() - - try: - p.wait(timeout=5) - except subprocess.TimeoutExpired: - p.kill() - p.wait() - - time.sleep(0.25) - - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--host", default=os.getenv("REDIS_HOST", "omniq-redis")) - ap.add_argument("--port", type=int, default=int(os.getenv("REDIS_PORT", "6379"))) - ap.add_argument("--queue", default=os.getenv("OMNIQ_QUEUE", "chaos")) - ap.add_argument("--n", type=int, default=2000) - ap.add_argument("--sleep-ms", type=int, default=0) - ap.add_argument("--mode", choices=["churn", "shared", "consumer-bounce", "all"], default="all") - args = ap.parse_args() - - print(f"[chaos] host={args.host} port={args.port} queue={args.queue} mode={args.mode}") - - if args.mode in ("churn", "all"): - print("[chaos] publish churn: create->publish->close ...") - run_publish_churn(args.host, args.port, args.queue, args.n, args.sleep_ms) - print("[chaos] publish churn done") - - if args.mode in ("shared", "all"): - print("[chaos] shared client publish ...") - run_shared_client_publish(args.host, args.port, args.queue, args.n, args.sleep_ms) - print("[chaos] shared publish done") - - if args.mode in ("consumer-bounce", "all"): - print("[chaos] consumer bounce (subprocess) ...") - run_consumer_bounce(args.host, args.port, args.queue, runs=30, alive_s=0.8) - print("[chaos] consumer bounce done") - - print("[chaos] done") - - -if __name__ == "__main__": - main() diff --git a/src/connections.py b/src/connections.py deleted file mode 100644 index e1e55f5..0000000 --- a/src/connections.py +++ /dev/null @@ -1,101 +0,0 @@ -import os -import time -import random -import argparse -import subprocess -import sys - -from omniq.client import OmniqClient - - -def run_publish_churn(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: - for i in range(n): - # create client -> publish -> close (this is the churn case) - uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-pub:{os.getpid()}:{i}") - try: - uq.publish(queue=queue, payload={"i": i, "hello": "world"}) - finally: - uq.close() - - if sleep_ms > 0: - time.sleep(sleep_ms / 1000.0) - - -def run_shared_client_publish(host: str, port: int, queue: str, n: int, sleep_ms: int) -> None: - uq = OmniqClient(host=host, port=port, client_name=f"omniq-chaos-shared:{os.getpid()}") - try: - for i in range(n): - uq.publish(queue=queue, payload={"i": i, "mode": "shared"}) - if sleep_ms > 0: - time.sleep(sleep_ms / 1000.0) - finally: - uq.close() - - -def run_consumer_bounce(host: str, port: int, queue: str, runs: int, alive_s: float) -> None: - """ - Starts a consumer in a subprocess and kills it. - - SIGTERM should exit cleanly (your consumer closes client in finally) - - SIGKILL will not run finally (expected), but connections should disappear after OS/Redis detects it - """ - code = f""" -import time -from omniq.client import OmniqClient - -def handler(ctx): - time.sleep(0.2) - -uq = OmniqClient(host={host!r}, port={port}, client_name="omniq-chaos-consumer-subproc") -uq.consume(queue={queue!r}, handler=handler, verbose=False, drain=True) -""" - for i in range(runs): - p = subprocess.Popen([sys.executable, "-c", code]) - time.sleep(alive_s) - - # mostly SIGTERM; sometimes SIGKILL - if random.random() < 0.85: - p.terminate() - else: - p.kill() - - try: - p.wait(timeout=5) - except subprocess.TimeoutExpired: - p.kill() - p.wait() - - time.sleep(0.25) - - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--host", default=os.getenv("REDIS_HOST", "omniq-redis")) - ap.add_argument("--port", type=int, default=int(os.getenv("REDIS_PORT", "6379"))) - ap.add_argument("--queue", default=os.getenv("OMNIQ_QUEUE", "chaos")) - ap.add_argument("--n", type=int, default=2000) - ap.add_argument("--sleep-ms", type=int, default=0) - ap.add_argument("--mode", choices=["churn", "shared", "consumer-bounce", "all"], default="all") - args = ap.parse_args() - - print(f"[chaos] host={args.host} port={args.port} queue={args.queue} mode={args.mode}") - - if args.mode in ("churn", "all"): - print("[chaos] publish churn: create->publish->close ...") - run_publish_churn(args.host, args.port, args.queue, args.n, args.sleep_ms) - print("[chaos] publish churn done") - - if args.mode in ("shared", "all"): - print("[chaos] shared client publish ...") - run_shared_client_publish(args.host, args.port, args.queue, args.n, args.sleep_ms) - print("[chaos] shared publish done") - - if args.mode in ("consumer-bounce", "all"): - print("[chaos] consumer bounce (subprocess) ...") - run_consumer_bounce(args.host, args.port, args.queue, runs=30, alive_s=0.8) - print("[chaos] consumer bounce done") - - print("[chaos] done") - - -if __name__ == "__main__": - main() From 4f17d9a663620e657d2cc5f80e93e6c4c86c457c Mon Sep 17 00:00:00 2001 From: LeoDiSarli Date: Thu, 19 Feb 2026 14:32:39 -0300 Subject: [PATCH 3/4] code review improvements --- src/omniq/_ops.py | 1 - src/omniq/exec.py | 1 - src/omniq/scripts.py | 5 ----- src/omniq/transport.py | 1 - 4 files changed, 8 deletions(-) diff --git a/src/omniq/_ops.py b/src/omniq/_ops.py index 2928cca..9059c62 100644 --- a/src/omniq/_ops.py +++ b/src/omniq/_ops.py @@ -504,7 +504,6 @@ def child_ack(self, *, key: str, child_id: str) -> int: except Exception: return -1 - @staticmethod def paused_backoff_s(poll_interval_s: float) -> float: return max(0.25, float(poll_interval_s) * 10.0) diff --git a/src/omniq/exec.py b/src/omniq/exec.py index 11f6ea2..5bb3df9 100644 --- a/src/omniq/exec.py +++ b/src/omniq/exec.py @@ -3,7 +3,6 @@ from .client import OmniqClient - @dataclass(frozen=True) class Exec: client: OmniqClient diff --git a/src/omniq/scripts.py b/src/omniq/scripts.py index 3de042d..f7830f5 100644 --- a/src/omniq/scripts.py +++ b/src/omniq/scripts.py @@ -3,17 +3,14 @@ from threading import Lock from typing import Protocol - class ScriptLoader(Protocol): def script_load(self, script: str) -> str: ... - @dataclass(frozen=True) class ScriptDef: sha: str src: str - @dataclass(frozen=True) class OmniqScripts: enqueue: ScriptDef @@ -32,7 +29,6 @@ class OmniqScripts: childs_init: ScriptDef child_ack: ScriptDef - def default_scripts_dir() -> str: here = os.path.dirname(__file__) return os.path.join(here, "core", "scripts") @@ -40,7 +36,6 @@ def default_scripts_dir() -> str: _scripts_cache: dict[str, OmniqScripts] = {} _scripts_cache_lock = Lock() - def load_scripts(r: ScriptLoader, scripts_dir: str) -> OmniqScripts: with _scripts_cache_lock: cached = _scripts_cache.get(scripts_dir) diff --git a/src/omniq/transport.py b/src/omniq/transport.py index 417d920..9717ee1 100644 --- a/src/omniq/transport.py +++ b/src/omniq/transport.py @@ -10,7 +10,6 @@ RedisArg = Union[str, bytes, int, float] - class RedisLike(Protocol): def evalsha(self, sha: str, numkeys: int, *args: RedisArg) -> Any: ... def eval(self, script: str, numkeys: int, *args: RedisArg) -> Any: ... From 416ddb482fc84821da54850201598c9b431afc05 Mon Sep 17 00:00:00 2001 From: LeoDiSarli Date: Thu, 19 Feb 2026 14:34:30 -0300 Subject: [PATCH 4/4] code review improvements --- src/omniq/consumer.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/omniq/consumer.py b/src/omniq/consumer.py index df78252..c946e24 100644 --- a/src/omniq/consumer.py +++ b/src/omniq/consumer.py @@ -67,14 +67,12 @@ def hb_loop(): t.start() return HeartbeatHandle(stop_evt=stop_evt, flags=flags, thread=t) - def _safe_log(logger: Callable[[str], None], msg: str) -> None: try: logger(msg) except Exception: pass - def _payload_preview(payload: Any, max_len: int = 300) -> str: try: s = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False) @@ -84,7 +82,6 @@ def _payload_preview(payload: Any, max_len: int = 300) -> str: return s[:max_len] + "…" return s - def consume( client: OmniqClient, *,