Add global and per-handler post-receive hooks to EventBus#26
Add global and per-handler post-receive hooks to EventBus#26Gautham Goli (GauthamGoli) wants to merge 3 commits intomainfrom
Conversation
Introduce a global post_receive_hook on EventBus called after every event handler Add optional post_hook argument to the receive decorator for per-handler hooks Both hooks receive the event instance and success status Enables flexible post-processing like resource cleanup or logging after event handling Backward compatible; existing handlers unaffected if hooks are not set
| self._topic_to_event: dict[str, str] = {} | ||
| self._event_to_topic: dict[str, str] = {} | ||
| self._receivers: set[ReceiverWrappedT] = set() | ||
| self.post_receive_hook: Callable[[Event, bool], None] | None = None |
There was a problem hiding this comment.
What kind of things do you foresee the post_receive_hook doing? Can you give me some examples?
Not sure if this is something the framework should handle because this could get hairy and generate more issues to deal with if people start adding complex post-receive methods.
Since a consumer can receive/process millions of events while its up, we can't shouldn't do much cleanup per event like db connection handling. The post hook event needs to be ultra-fast, and allowed to fail as i see in the handling below. Then is it of any use? Is it plainly to add logging? Can't it be added as the last line of the consumer code itself? What does a separate handler give us?
Not saying NO here, but need to make sure this is a cost/complexity we need to pay forever for all events, for all consumers in the future.
What i do foresee is needing hooks that allow us to restart processes say after X events, or Ygb rss memory just like gunicorn/celery. This helps with memory leaks releasing unused memory back.
We might also need to allow timeouts. Eg. each event is allowed maximum of 30 seconds to do its work. This is just like celery. Because without the timeout, even if the consumer acks it might get ignored due to timeouts on kafkas side where(because kafka things consumer is dead and hasn't responded for 15 mins)
There was a problem hiding this comment.
Sid Mitra (@sidmitra) Aiming to achieve similar functionality as task_postrun signal for celery workers, for resource clean up not logging.
Check out def close_db_connection_after_task(*args, **kwargs): in this related PR https://github.com/Airbase/airbase-backend/pull/29832
Introduce a global post_receive_hook on EventBus called after every event handler
Add optional post_hook argument to the receive decorator for per-handler hooks
Both hooks receive the event instance and success status
Enables flexible post-processing like resource cleanup after event handling
Backward compatible; existing handlers unaffected if hooks are not set