From 7254dad32d1c911ce5eb0693a19b565856469b73 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Fri, 21 Nov 2025 16:29:34 +0100 Subject: [PATCH 01/15] Make zmq connection an AsyncObject which allows us to wait for stopping receiver's subscription, which in turn enables us to have unreliable tango processing servers (unreliable=zmq.PUB/SUB) --- concert/devices/cameras/base.py | 6 +- concert/ext/tangoservers/base.py | 78 +++++++++++++-- concert/ext/tangoservers/reco.py | 4 +- concert/ext/viewers.py | 9 +- concert/networking/base.py | 115 ++++++++++++++++------ concert/storage.py | 2 +- concert/tests/unit/devices/test_camera.py | 4 +- concert/tests/unit/test_networking.py | 65 ++++++------ 8 files changed, 200 insertions(+), 83 deletions(-) diff --git a/concert/devices/cameras/base.py b/concert/devices/cameras/base.py index e1d887388..d35384643 100644 --- a/concert/devices/cameras/base.py +++ b/concert/devices/cameras/base.py @@ -253,7 +253,7 @@ async def unregister_endpoint(self, endpoint: CommData) -> None: :type endpoint: concert.helpers.CommData """ if endpoint in self._senders: - self._senders[endpoint].close() + await self._senders[endpoint].close() del self._senders[endpoint] async def register_endpoint(self, endpoint: CommData) -> None: @@ -266,7 +266,7 @@ async def register_endpoint(self, endpoint: CommData) -> None: if endpoint in self._senders: raise ValueError("zmq endpoint already in list") - self._senders[endpoint] = ZmqSender( + self._senders[endpoint] = await ZmqSender( endpoint.server_endpoint, reliable=endpoint.socket_type == zmq.PUSH, sndhwm=endpoint.sndhwm @@ -274,7 +274,7 @@ async def register_endpoint(self, endpoint: CommData) -> None: async def unregister_all(self) -> None: for sender in self._senders.values(): - sender.close() + await sender.close() self._senders = {} @abstractmethod diff --git a/concert/ext/tangoservers/base.py b/concert/ext/tangoservers/base.py index 2bd72c3a2..437a826ac 100644 --- a/concert/ext/tangoservers/base.py +++ b/concert/ext/tangoservers/base.py @@ -20,9 +20,27 @@ class TangoRemoteProcessing(Device, metaclass=DeviceMeta): fset="set_endpoint" ) + receiver_reliable = attribute( + label="Is ZMQ receiver reliable or not", + dtype=bool, + access=AttrWriteType.READ_WRITE, + fget="get_receiver_reliable", + fset="set_receiver_reliable" + ) + + receiver_rcvhwm = attribute( + label="Receive high water mark for receiver", + dtype=int, + access=AttrWriteType.READ_WRITE, + fget="get_receiver_rcvhwm", + fset="set_receiver_rcvhwm" + ) + def __init__(self, cl, name): - self._endpoint = None - self._receiver = ZmqReceiver() + self._endpoint = "" + self._receiver_reliable = True + self._receiver_rcvhm = 0 + self._receiver = None self._task = None super().__init__(cl, name) @@ -32,24 +50,36 @@ async def init_device(self): await super().init_device() if self._task and not self._task.done(): self.debug_stream("Cancelling task: %s", self._task.cancel()) - if self._receiver.endpoint: - self._receiver.connect(self._receiver.endpoint) + if self._endpoint: + await self._create_and_connect_receiver() self.set_state(tango.DevState.STANDBY) + @DebugIt() + async def _create_and_connect_receiver(self): + if not self._receiver: + self._receiver = await ZmqReceiver( + endpoint=self._endpoint, + reliable=self._receiver_reliable, + rcvhwm=self._receiver_rcvhm + ) + + await self._receiver.connect(self._endpoint) + @DebugIt() @command() async def connect_endpoint(self): """Connect to the zmq endpoint.""" if not self._endpoint: raise RuntimeError("Endpoint not set") - self._receiver.connect(self._endpoint) + await self._create_and_connect_receiver() @DebugIt() @command() async def disconnect_endpoint(self): """Disconnect from the zmq endpoint.""" if self._receiver: - self._receiver.close() + await self._receiver.close() + self._receiver = None @DebugIt() @command() @@ -57,22 +87,46 @@ async def reset_connection(self): """Stop receiving data forever.""" if not self._endpoint: raise RuntimeError('Endpoint not set') - self._receiver.close() - self._receiver.connect(self._endpoint) + await self._receiver.close() + self._receiver = None + await self._create_and_connect_receiver() @InfoIt() async def get_endpoint(self): """Get current endpoint.""" - return self._receiver.endpoint if self._receiver.endpoint else '' + return self._endpoint @InfoIt(show_args=True) async def set_endpoint(self, endpoint): """Set endpoint.""" if self._task and not self._task.done(): raise RuntimeError("Endpoint cannot be set while streaming") - self._receiver.connect(endpoint) self._endpoint = endpoint + @InfoIt() + async def get_receiver_reliable(self): + """Get if ZMQ receiver is reliable or not.""" + return self._receiver_reliable + + @InfoIt(show_args=True) + async def set_receiver_reliable(self, reliable): + """Set if ZMQ receiver is reliable or not.""" + if self._task and not self._task.done(): + raise RuntimeError("Receiver options cannot be set while streaming") + self._receiver_reliable = reliable + + @InfoIt() + async def get_receiver_rcvhwm(self): + """Get receiver high water mark.""" + return self._receiver_rcvhm + + @InfoIt(show_args=True) + async def set_receiver_rcvhwm(self, rcvhwm): + """Set receiver high water mark.""" + if self._task and not self._task.done(): + raise RuntimeError("Receiver options cannot be set while streaming") + self._receiver_rcvhm = rcvhwm + async def _process_stream(self, consumer_coro): """Process the data stream by *consumer_coro* and handle state.""" def callback(task): @@ -93,6 +147,10 @@ def callback(task): if self._task and not self._task.done(): raise RuntimeError("Previous stream still running") + + if not self._receiver: + await self._create_and_connect_receiver() + self._task = start(consumer_coro) self._task.add_done_callback(callback) self.set_state(tango.DevState.RUNNING) diff --git a/concert/ext/tangoservers/reco.py b/concert/ext/tangoservers/reco.py index 62e0731e5..d4e9bb34f 100644 --- a/concert/ext/tangoservers/reco.py +++ b/concert/ext/tangoservers/reco.py @@ -151,8 +151,8 @@ async def setup_walker(self, args): bytes_per_file=2 ** 40, ) if self._sender: - self._sender.close() - self._sender = ZmqSender(endpoint=f"{protocol}://*:{port}", reliable=True) + await self._sender.close() + self._sender = await ZmqSender(endpoint=f"{protocol}://*:{port}", reliable=True) async def _reconstruct(self, cached=False, slice_directory=""): if cached: diff --git a/concert/ext/viewers.py b/concert/ext/viewers.py index c82098d5a..68a1d06e0 100644 --- a/concert/ext/viewers.py +++ b/concert/ext/viewers.py @@ -409,7 +409,9 @@ def subscribe(self, address): self._loop = asyncio.get_event_loop() if self._receiver: self.unsubscribe() - self._receiver = ZmqReceiver(endpoint=address, reliable=False, rcvhwm=1) + self._receiver = self._loop.run_until_complete( + ZmqReceiver(endpoint=address, reliable=False, rcvhwm=1) + ) def recv_array(self): available = self._loop.run_until_complete(self._receiver.is_message_available()) @@ -425,8 +427,11 @@ def recv_array(self): return available def unsubscribe(self, arg): + if not self._loop: + return + if self._receiver: - self._receiver.close() + self._loop.run_until_complete(self._receiver.close()) self._receiver = None diff --git a/concert/networking/base.py b/concert/networking/base.py index fbd9a17c2..d6f6b8855 100644 --- a/concert/networking/base.py +++ b/concert/networking/base.py @@ -7,6 +7,7 @@ import time import zmq import zmq.asyncio +from concert.base import AsyncObject from concert.quantities import q from concert.config import AIODEBUG, PERFDEBUG from concert.helpers import ImageWithMetadata @@ -130,6 +131,11 @@ def zmq_create_image_metadata(image): return result +async def zmq_receive_json(socket): + """Receive a dictionary from a zmq *socket*.""" + return await socket.recv_json() + + async def zmq_receive_image(socket): """Receive image data from a zmq *socket*.""" if socket is None: @@ -154,6 +160,13 @@ async def zmq_receive_image(socket): return (metadata, array) +async def zmq_send_json(socket, metadata): + try: + await socket.send_json(metadata, zmq.NOBLOCK) + except zmq.Again: + LOG.debug('No listeners or queue full on %s', socket.get(zmq.LAST_ENDPOINT)) + + async def zmq_send_image(socket, image, metadata=None): """Send *image* over zmq *socket*. If *metadata* is None, it is created. If *image* is None, {'end': True} is sent in metadata and no actual payload is sent. @@ -193,7 +206,7 @@ def zmq_setup_sending_socket(context, endpoint, reliable, sndhwm): return socket -class ZmqBase(abc.ABC): +class ZmqBase(AsyncObject, abc.ABC): """ Base for sending/receiving zmq image streams. @@ -207,7 +220,7 @@ class ZmqBase(abc.ABC): is approximate. """ - def __init__(self, endpoint=None, reliable=True, polling_timeout=100 * q.ms, timeout=None): + async def __ainit__(self, endpoint=None, reliable=True, polling_timeout=100 * q.ms, timeout=None): self._endpoint = endpoint self._poller = zmq.asyncio.Poller() self._polling_timeout = polling_timeout @@ -216,14 +229,14 @@ def __init__(self, endpoint=None, reliable=True, polling_timeout=100 * q.ms, tim self._context = zmq.asyncio.Context() self._socket = None if endpoint: - self.connect(endpoint) + await self.connect(endpoint) @property def endpoint(self): """endpoint in form transport://address""" return self._endpoint - def connect(self, endpoint): + async def connect(self, endpoint): """Connect to an *endpoint* which must not be None. If it's the same one as the one we are connected to now nothing happens, otherwise current socket is disconnected and the new connection is made. @@ -236,24 +249,24 @@ def connect(self, endpoint): # We are connected to the desired endpoint already return # New endpoint specified, first disconnect - self.close() + await self.close() self._endpoint = endpoint self._setup_socket() - def close(self): + async def close(self): """Close the socket.""" if self._socket: self._socket.close() self._socket = None - def __enter__(self): + async def __aenter__(self): LOG.log(AIODEBUG, 'ZMQ Socket connection enter') return self - def __exit__(self, exc_type, exc, tb): + async def __aexit__(self, exc_type, exc, tb): LOG.log(AIODEBUG, 'ZMQ Socket connection exit') - self.close() + await self.close() @abstractmethod def _setup_socket(self): @@ -275,7 +288,7 @@ class ZmqSender(ZmqBase): is approximate. """ - def __init__( + async def __ainit__( self, endpoint=None, reliable=True, @@ -284,7 +297,7 @@ def __init__( timeout=None ): self._sndhwm = sndhwm - super().__init__( + await super().__ainit__( endpoint=endpoint, reliable=reliable, polling_timeout=polling_timeout, @@ -301,10 +314,10 @@ def _setup_socket(self): ) self._poller.register(self._socket, zmq.POLLOUT) - def close(self): + async def close(self): if self._socket: self._poller.unregister(self._socket) - super().close() + await super().close() async def is_consumer_available(self, polling_timeout=None): """Wait on the socket *polling_timeout* and if a consumer is available return True, False @@ -318,8 +331,8 @@ async def is_consumer_available(self, polling_timeout=None): return self._socket in sockets and sockets[self._socket] == zmq.POLLOUT - async def send_image(self, image): - """Send *image*.""" + async def _send_payload(self, payload, send_func): + """Send *payload*.""" timeout = self._timeout.to(q.s).magnitude if self._timeout else None num_tries = 0 st = time.perf_counter() @@ -328,7 +341,7 @@ async def send_image(self, image): if not self._socket: raise NetworkingError("Cannot send over a closed socket.") if await self.is_consumer_available(): - await zmq_send_image(self._socket, image) + await send_func(self._socket, payload) break else: num_tries += 1 @@ -336,6 +349,14 @@ async def send_image(self, image): if timeout and time.perf_counter() - st > timeout: raise TimeoutError("Sending timeout exceeded") + async def send_image(self, image): + """Send *image*.""" + await self._send_payload(image, zmq_send_image) + + async def send_json(self, metadata): + """Send *metadata*, which is a dictionary.""" + await self._send_payload(metadata, zmq_send_json) + class ZmqReceiver(ZmqBase): @@ -351,7 +372,7 @@ class ZmqReceiver(ZmqBase): is approximate. """ - def __init__( + async def __ainit__( self, endpoint=None, reliable=True, @@ -360,8 +381,9 @@ def __init__( timeout=None ): self._rcvhwm = rcvhwm + self._stopped = None self._request_stop = False - super().__init__( + await super().__ainit__( endpoint=endpoint, reliable=reliable, polling_timeout=polling_timeout, @@ -379,14 +401,28 @@ def _setup_socket(self): self._socket.connect(self._endpoint) self._poller.register(self._socket, zmq.POLLIN) - def stop(self): - """Stop receiving data.""" + async def stop(self): + """ + Stop receiving data. Never await this in an async for loop like this:: + async for _ in receiver.subscribe(): + await receiver.stop() + + becuase it will lead to deadlock. + """ + if not self._stopped: + # Not subscribed + return + self._request_stop = True + await self._stopped.wait() - def close(self): + async def close(self): + """Stop receiving and close the socket.""" + # Stop receiving data + await self.stop() if self._socket: self._poller.unregister(self._socket) - super().close() + await super().close() async def is_message_available(self, polling_timeout=None): """Wait on the socket *polling_timeout* and if an image is available return True, False @@ -400,8 +436,8 @@ async def is_message_available(self, polling_timeout=None): return self._socket in sockets and sockets[self._socket] == zmq.POLLIN - async def receive_image(self): - """Receive image.""" + async def _receive_payload(self, recv_func): + """Receive payload with *recv_func*.""" timeout = self._timeout.to(q.s).magnitude if self._timeout else None num_tries = 0 st = time.perf_counter() @@ -411,19 +447,35 @@ async def receive_image(self): raise NetworkingError("Cannot receive over a closed socket.") if await self.is_message_available(): # There is something to consume - return await zmq_receive_image(self._socket) + return await recv_func(self._socket) elif self._request_stop: - return (None, None) + return None else: num_tries += 1 if timeout and time.perf_counter() - st > timeout: raise TimeoutError("Receiving timeout exceeded") + async def receive_image(self): + """Receive *image* with metadata.""" + result = await self._receive_payload(zmq_receive_image) + + if result is None: + result = (None, None) + + return result + + async def receive_json(self): + """Receive metadata as a dictionary.""" + return await self._receive_payload(zmq_receive_json) + async def subscribe(self, return_metadata=False): """Receive images.""" - i = 0 - finished = False + if self._stopped: + self._stopped.clear() + else: + self._stopped = asyncio.Event() + i = 0 try: while True: metadata, image = await self.receive_image() @@ -457,6 +509,7 @@ async def subscribe(self, return_metadata=False): flush_i += 1 if flush_i: LOG.debug("Flushed %d messages", flush_i) + self._stopped.set() class ZmqBroadcaster(ZmqReceiver): @@ -471,8 +524,8 @@ class ZmqBroadcaster(ZmqReceiver): high water mark (use 1 for always getting the newest image, only applicable for non-reliable case) """ - def __init__(self, endpoint, broadcast_endpoints, polling_timeout=100 * q.ms): - super().__init__(endpoint, polling_timeout=polling_timeout) + async def __ainit__(self, endpoint, broadcast_endpoints, polling_timeout=100 * q.ms): + await super().__ainit__(endpoint, polling_timeout=polling_timeout) self._broadcast_sockets = set([]) self._poller_out = zmq.asyncio.Poller() self._finished = None @@ -559,7 +612,7 @@ async def shutdown(self): if self._finished and not self._finished.is_set(): # We have been started and are not finished - self.stop() + await self.stop() # Wait for the forwarding to finish gracefully await self._finished.wait() diff --git a/concert/storage.py b/concert/storage.py index 19d6faf38..c02e6aaa1 100644 --- a/concert/storage.py +++ b/concert/storage.py @@ -587,7 +587,7 @@ async def _create_writer(self, producer: AsyncIterable[ArrayLike], try: f = self.device.write_sequence("") - with ZmqSender( + async with await ZmqSender( self._commdata.server_endpoint, reliable=self._commdata.socket_type == zmq.PUSH, sndhwm=self._commdata.sndhwm diff --git a/concert/tests/unit/devices/test_camera.py b/concert/tests/unit/devices/test_camera.py index 449c92dae..ada0ce5fb 100644 --- a/concert/tests/unit/devices/test_camera.py +++ b/concert/tests/unit/devices/test_camera.py @@ -101,14 +101,14 @@ async def grab(): await self.camera.register_endpoint( CommData("localhost", 8991 + i, "tcp", zmq.PUSH, 0) ) - receiver = ZmqReceiver(endpoint=f"tcp://localhost:{8991+i}") + receiver = await ZmqReceiver(endpoint=f"tcp://localhost:{8991+i}") self.camera.set_mirror(mirrored) self.camera.set_rotate(rotated) async with self.camera.recording(): await self.camera.grab_send(1) (metadata, image) = await receiver.receive_image() - receiver.close() + await receiver.close() i += 1 meta = {"mirror": mirrored, "rotate": rotated} np.testing.assert_equal( diff --git a/concert/tests/unit/test_networking.py b/concert/tests/unit/test_networking.py index 4774c2c0d..c5e0066cf 100644 --- a/concert/tests/unit/test_networking.py +++ b/concert/tests/unit/test_networking.py @@ -18,29 +18,29 @@ SERVER = "tcp://*:9999" -def setup_broadcaster(): - sender = ZmqSender(endpoint="tcp://*:19997") +async def setup_broadcaster(): + sender = await ZmqSender(endpoint="tcp://*:19997") senders = (("tcp://*:19998", True, None), ("tcp://*:19999", False, 1)) - broadcast = ZmqBroadcaster("tcp://localhost:19997", senders) + broadcast = await ZmqBroadcaster("tcp://localhost:19997", senders) # Receivers must be created after broadcast servers, otherwise first image would be lost. This # however does not happen at normal runtime, only during tests. - receiver_1 = ZmqReceiver(endpoint="tcp://localhost:19998", reliable=True) - receiver_2 = ZmqReceiver(endpoint="tcp://localhost:19999", reliable=False, rcvhwm=1) + receiver_1 = await ZmqReceiver(endpoint="tcp://localhost:19998", reliable=True) + receiver_2 = await ZmqReceiver(endpoint="tcp://localhost:19999", reliable=False, rcvhwm=1) return (sender, broadcast, receiver_1, receiver_2) class TestZmq(TestCase): - def setUp(self): + async def asyncSetUp(self): super(TestZmq, self).setUp() self.context = zmq.asyncio.Context() - self.sender = ZmqSender(endpoint=SERVER) - self.receiver = ZmqReceiver(endpoint=CLIENT) + self.sender = await ZmqSender(endpoint=SERVER) + self.receiver = await ZmqReceiver(endpoint=CLIENT) self.image = np.ones((12, 16), dtype="uint16") - def tearDown(self): - self.sender.close() - self.receiver.close() + async def asyncTearDown(self): + await self.sender.close() + await self.receiver.close() def test_zmq_create_image_metadata(self): # numpy @@ -58,28 +58,28 @@ def test_zmq_create_image_metadata(self): # End self.assertEqual(zmq_create_image_metadata(None), {"end": True}) - def test_connect(self): + async def test_connect(self): # re-connection must work - self.receiver.connect(CLIENT) - self.sender.connect(SERVER) + await self.receiver.connect(CLIENT) + await self.sender.connect(SERVER) async def test_close(self): - self.sender.close() + await self.sender.close() with self.assertRaises(NetworkingError): await self.sender.send_image(self.image) async def test_contextmanager(self): - self.sender.close() - with ZmqSender(SERVER) as sender: + await self.sender.close() + async with await ZmqSender(SERVER) as sender: pass with self.assertRaises(NetworkingError): await sender.send_image(self.image) - def test_sndhwm(self): - self.sender.close() + async def test_sndhwm(self): + await self.sender.close() with self.assertRaises(ValueError): - sender = ZmqSender(endpoint=SERVER, sndhwm=-1) + sender = await ZmqSender(endpoint=SERVER, sndhwm=-1) async def test_is_message_available(self): self.assertFalse(await self.receiver.is_message_available(polling_timeout=10 * q.ms)) @@ -93,11 +93,11 @@ async def test_send_receive(self): async def test_publish_subscribe(self): # Make new ones - self.sender.close() - self.receiver.close() + await self.sender.close() + await self.receiver.close() - sender = ZmqSender(endpoint=SERVER, reliable=False, sndhwm=1) - receiver = ZmqReceiver(endpoint=CLIENT, reliable=False, rcvhwm=1) + sender = await ZmqSender(endpoint=SERVER, reliable=False, sndhwm=1) + receiver = await ZmqReceiver(endpoint=CLIENT, reliable=False, rcvhwm=1) # Start ahead to make sure we catch the image f = start(receiver.receive_image()) await start(sender.send_image(self.image)) @@ -113,25 +113,26 @@ async def test_subscribe(self): pass # Stop requested - await start(self.sender.send_image(self.image)) - await start(self.sender.send_image(self.image)) + for i in range(10): + await start(self.sender.send_image(self.image)) await start(self.sender.send_image(None)) i = 0 async for _ in self.receiver.subscribe(return_metadata=False): i += 1 - self.receiver.stop() + f = start(self.receiver.stop()) - self.assertEqual(i, 1) + await f + self.assertLessEqual(1, i) async def test_broadcast_immediate_shutdown(self): - sender, broadcast, receiver_1, receiver_2 = setup_broadcaster() + sender, broadcast, receiver_1, receiver_2 = await setup_broadcaster() f = start(broadcast.serve()) await broadcast.shutdown() await asyncio.wait_for(f, 1) async def test_broadcast(self): - sender, broadcast, receiver_1, receiver_2 = setup_broadcaster() + sender, broadcast, receiver_1, receiver_2 = await setup_broadcaster() f = start(broadcast.serve()) # Start ahead to make sure we catch the image f_sub = start(receiver_2.receive_image()) @@ -145,11 +146,11 @@ async def test_broadcast(self): await f async def test_receiver_timeout(self): - receiver = ZmqReceiver(endpoint=CLIENT, reliable=True, timeout=0.3 * q.s) + receiver = await ZmqReceiver(endpoint=CLIENT, reliable=True, timeout=0.3 * q.s) with self.assertRaises(TimeoutError): await receiver.receive_image() async def test_sender_timeout(self): - sender = ZmqSender(endpoint="tcp://*:19999", timeout=0.3 * q.s) + sender = await ZmqSender(endpoint="tcp://*:19999", timeout=0.3 * q.s) with self.assertRaises(TimeoutError): await sender.send_image(self.image) From 90d6416f0bbc58d31825bde220d69b43e7b70d0a Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Fri, 21 Nov 2025 16:32:24 +0100 Subject: [PATCH 02/15] addons: properly shutdown unreliable consumers unreliable=zmq.PUB/SUB --- concert/experiments/base.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/concert/experiments/base.py b/concert/experiments/base.py index 3ab52a79a..ade49b5e2 100644 --- a/concert/experiments/base.py +++ b/concert/experiments/base.py @@ -41,10 +41,11 @@ class Consumer: :param corofunc_kwargs: a list or tuple of *corofunc* keyword arguemnts :param addon: a :class:`~concert.experiments.addons.Addon` object """ - def __init__(self, corofunc, corofunc_args=(), corofunc_kwargs=None, addon=None): + def __init__(self, corofunc, corofunc_args=(), corofunc_kwargs=None, addon=None, reliable=True): self._corofunc = corofunc self.addon = addon self.args = corofunc_args + self.reliable = reliable self.kwargs = {} if corofunc_kwargs is None else corofunc_kwargs @property @@ -173,7 +174,10 @@ async def cancel_and_wait(tasks): tasks = [start(producer_coro())] self._producer_task = tasks[0] - tasks += [start(consumer(None)) for consumer in self._consumers] + tasks += [start(consumer(None)) for consumer in self._consumers if consumer.reliable] + unreliable = set( + [start(consumer(None)) for consumer in self._consumers if not consumer.reliable] + ) LOG.debug( "`%s': starting producer `%s' and consumers %s", self.name, @@ -204,7 +208,7 @@ async def cancel_and_wait(tasks): # remotes might still be waiting for data, so cancel processing. Processing is # responsible for stopping the remote processing as well (e.g. call cancel_remote() on # a Tango addon)! - pending_result = await cancel_and_wait(tasks) + pending_result = await cancel_and_wait(tasks.union(unreliable)) LOG.debug( "`%s': `%s' during remote processing, results: `%s'", self.name, From 278438a49c8fdd207caf647d0bcc441d630bbd96 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Fri, 21 Nov 2025 16:33:18 +0100 Subject: [PATCH 03/15] Viewer: enable metadata subscription E.g. sample's bounding box is broadcasted as metadata. --- concert/experiments/addons/tango.py | 8 +- concert/ext/viewers.py | 113 +++++++++++++++++++++------- 2 files changed, 89 insertions(+), 32 deletions(-) diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index 41185f3a5..1639010f5 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -104,20 +104,20 @@ async def __ainit__(self, viewer, endpoint, experiment, acquisitions=None): self._orig_limits = await viewer.get_limits() async def connect_endpoint(self): - self._viewer.subscribe(self.endpoint.client_endpoint) + self._viewer.subscribe(self.endpoint.client_endpoint, "image") async def disconnect_endpoint(self): - self._viewer.unsubscribe() + self._viewer.unsubscribe(self.endpoint.client_endpoint) @remote async def consume(self): try: if await self._viewer.get_limits() == 'stream': - self._viewer.unsubscribe() + self._viewer.unsubscribe(self.endpoint.client_endpoint) # Force viewer to update the limits by unsubscribing and re-subscribing after # setting limits to stream await self._viewer.set_limits('stream') - self._viewer.subscribe(self.endpoint.client_endpoint) + self._viewer.subscribe(self.endpoint.client_endpoint, "image") finally: self._orig_limits = await self._viewer.get_limits() diff --git a/concert/ext/viewers.py b/concert/ext/viewers.py index 68a1d06e0..10d66cf92 100644 --- a/concert/ext/viewers.py +++ b/concert/ext/viewers.py @@ -12,6 +12,7 @@ from concert.base import Parameterizable, Parameter from concert.coroutines.base import background, run_in_executor from concert.quantities import q +from concert.helpers import ImageWithMetadata LOG = logging.getLogger(__name__) @@ -142,13 +143,15 @@ def resume(self): """Resume the viewer.""" self._paused = False - def subscribe(self, address): + def subscribe(self, address, content): + if content not in ["image", "metadata"]: + raise ValueError("Content must be one of `image', `metadata'") self._ensure_updater_runs() - self._queue.put(('subscribe', address)) + self._queue.put(('subscribe', (address, content))) - def unsubscribe(self): + def unsubscribe(self, address): if self._proc and self._proc.is_alive(): - self._queue.put(('unsubscribe', None)) + self._queue.put(('unsubscribe', address)) def _ensure_updater_runs(self): """ @@ -333,6 +336,12 @@ def _make_updater(self): return _PyQtGraphUpdater(self._queue, limits=self._limits, title=self._title, show_refresh_rate=self._show_refresh_rate) + async def draw_rectangle(self, bbox): + self._queue.put(("sample-bbox", bbox)) + + async def clear_rectangle(self): + self._queue.put(("clear-bbox", None)) + class PyplotImageViewer(ImageViewerBase): @@ -394,7 +403,7 @@ class _ImageUpdaterBase(abc.ABC): """Image updated base.""" def __init__(self): - self._receiver = None + self._receivers = {} self._loop = None self.commands = { 'subscribe': self.subscribe, @@ -402,37 +411,57 @@ def __init__(self): 'title': self.change_title } - def subscribe(self, address): + def subscribe(self, data): import asyncio from concert.networking.base import ZmqReceiver + + endpoint, content = data + if not self._loop: self._loop = asyncio.get_event_loop() - if self._receiver: - self.unsubscribe() - self._receiver = self._loop.run_until_complete( - ZmqReceiver(endpoint=address, reliable=False, rcvhwm=1) + if endpoint in self._receivers: + self.unsubscribe(endpoint) + # Less polling time useful when there are multiple receivers + self._receivers[endpoint] = self._loop.run_until_complete( + ZmqReceiver(endpoint=endpoint, reliable=False, rcvhwm=1, polling_timeout=10 * q.ms) ) + # Tag receiver to receive specific content + self._receivers[endpoint].content = content def recv_array(self): - available = self._loop.run_until_complete(self._receiver.is_message_available()) - - if available: - meta, image = self._loop.run_until_complete(self._receiver.receive_image()) - if image is None: - # No actual image data available - available = False - else: - self.commands['image'](image) - - return available - - def unsubscribe(self, arg): + any_available = False + + for receiver in list(self._receivers.values()): + available = self._loop.run_until_complete(receiver.is_message_available()) + + if available: + if receiver.content == "image": + meta, image = self._loop.run_until_complete(receiver.receive_image()) + if image is None: + # No actual image data available + available = False + else: + self.commands['image'](image) + if receiver.content == "metadata": + meta = self._loop.run_until_complete(receiver.receive_json()) + for key, value in meta.items(): + if key in self.commands: + self.commands[key](value) + + if available: + any_available = True + + return any_available + + def unsubscribe(self, endpoint): if not self._loop: + # We are not subscribed return - if self._receiver: - self._loop.run_until_complete(self._receiver.close()) - self._receiver = None + if endpoint in self._receivers: + self._loop.run_until_complete(self._receivers[endpoint].close()) + + del self._receivers[endpoint] class _PyQtGraphUpdater(_ImageUpdaterBase): @@ -448,6 +477,7 @@ def __init__(self, queue: mp.Queue, limits: str = 'stream', title: str = "", self.show_refresh_rate = show_refresh_rate self.text = None self.plot = None + self.rect = None # main graphics window self.view = None self.last_text_time = time.perf_counter() @@ -457,6 +487,8 @@ def __init__(self, queue: mp.Queue, limits: str = 'stream', title: str = "", 'image': self.proces_image, 'clim': self.update_limits, 'show-fps': self.toggle_show_refresh_rate, + 'sample-bbox': self.process_bbox, + 'clear-bbox': self.clear_bbox, } ) @@ -468,7 +500,7 @@ def process(self): except Empty: # Taking orders has priority, but if no order is available on the queu then receive an # image if subscribed - if self._receiver: + if self._receivers: self.recv_array() def update_all(self, image): @@ -560,6 +592,31 @@ def toggle_show_refresh_rate(self, value): self.view.removeItem(self.text) self.text = None + def process_bbox(self, bbox): + if not self.view: + # No image displayed yet + return + + x_0 = bbox[0] + y_0 = bbox[1] + width = bbox[2] - bbox[0] + height = bbox[3] - bbox[1] + + if self.rect is None: + import pyqtgraph as pg + from pyqtgraph.Qt import QtWidgets + + self.rect = QtWidgets.QGraphicsRectItem(x_0, y_0, width, height) + self.rect.setPen(pg.mkPen('#5050D3', width=2)) + self.view.addItem(self.rect) + else: + self.rect.setRect(x_0, y_0, width, height) + + def clear_bbox(self, arg): + if self.view and self.rect: + self.view.removeItem(self.rect) + self.rect = None + def change_title(self, title): self.title = title if self.view: @@ -802,7 +859,7 @@ def __init__(self, queue: mp.Queue, imshow_kwargs: dict, limits: str = 'stream', def on_empty(self): """Try to process image from a socket.""" - if self._receiver: + if self._receivers: return self.recv_array() return False From 1d4b85e6ffddb1761b8941c1d83614e647e054e9 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Fri, 21 Nov 2025 16:34:14 +0100 Subject: [PATCH 04/15] Add SampleDetector addon --- concert/experiments/addons/base.py | 23 +++ concert/experiments/addons/tango.py | 15 ++ concert/ext/cmd/tango.py | 5 +- .../ext/tangoservers/bin/TangoSampleDetect | 8 + concert/ext/tangoservers/sampledetect.py | 176 ++++++++++++++++++ 5 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 concert/ext/tangoservers/bin/TangoSampleDetect create mode 100644 concert/ext/tangoservers/sampledetect.py diff --git a/concert/experiments/addons/base.py b/concert/experiments/addons/base.py index 16cde3411..7ff6ae1e9 100644 --- a/concert/experiments/addons/base.py +++ b/concert/experiments/addons/base.py @@ -114,6 +114,29 @@ async def _get_duration(self, acquisition_name): raise NotImplementedError +class SampleDetector(Addon): + + """Sample detection addon.""" + + async def __ainit__(self, experiment, acquisitions=None): + await super().__ainit__(experiment, acquisitions=acquisitions) + + def _make_consumers(self, acquisitions): + consumers = {} + + for acq in acquisitions: + consumers[acq] = AcquisitionConsumer( + self.stream_detect, + addon=self if self.stream_detect.remote else None, + reliable=False if self.stream_detect.remote else True + ) + + return consumers + + async def stream_detect(self): + raise NotImplementedError + + class ImageWriter(Addon): """An addon which writes images to disk. diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index 1639010f5..19e9128ba 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -81,6 +81,21 @@ async def _teardown(self): await self._device.reset() +class SampleDetector(TangoMixin, base.SampleDetector): + + async def __ainit__(self, experiment, device, endpoint, acquisitions=None): + await TangoMixin.__ainit__(self, device, endpoint) + await base.SampleDetector.__ainit__(self, experiment, acquisitions=acquisitions) + + @TangoMixin.cancel_remote + @remote + async def stream_detect(self): + await self._device.stream_detect() + + async def get_maximum_rectangle(self): + return await self._device.get_maximum_rectangle() + + class ImageWriter(TangoMixin, base.ImageWriter): async def __ainit__(self, experiment, endpoint, acquisitions=None): diff --git a/concert/ext/cmd/tango.py b/concert/ext/cmd/tango.py index eaf2d348b..a4ae9d334 100644 --- a/concert/ext/cmd/tango.py +++ b/concert/ext/cmd/tango.py @@ -1,6 +1,6 @@ from concert.session.utils import setup_logging, SubCommand -SERVER_NAMES = ['benchmarker', 'reco', 'walker'] +SERVER_NAMES = ['benchmarker', 'reco', 'walker', 'sampledetect'] class TangoCommand(SubCommand): @@ -58,6 +58,9 @@ def run(self, server: str, port: int, database: bool = False, device: [str, None if server == "walker": from concert.ext.tangoservers import walker server_class = {'class': walker.TangoRemoteWalker} + if server == "sampledetect": + from concert.ext.tangoservers import sampledetect + server_class = {'class': sampledetect.SampleDetect} setup_logging(server, to_stream=True, filename=logfile, loglevel=loglevel) diff --git a/concert/ext/tangoservers/bin/TangoSampleDetect b/concert/ext/tangoservers/bin/TangoSampleDetect new file mode 100644 index 000000000..c1e83d88f --- /dev/null +++ b/concert/ext/tangoservers/bin/TangoSampleDetect @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 + +from concert.ext.tangoservers.sampledetect import SampleDetect +from tango import GreenMode + + +if __name__ == '__main__': + SampleDetect.run_server(green_mode=GreenMode.Asyncio) diff --git a/concert/ext/tangoservers/sampledetect.py b/concert/ext/tangoservers/sampledetect.py new file mode 100644 index 000000000..ce4bc8787 --- /dev/null +++ b/concert/ext/tangoservers/sampledetect.py @@ -0,0 +1,176 @@ +""" +Tango server for benchmarking zmq transfers. +""" +import numpy as np +import torch +import sys +from tango import CmdArgType, DebugIt, InfoIt +from tango.server import attribute, AttrWriteType, command +from .base import TangoRemoteProcessing +from concert.networking.base import ZmqSender +try: + from ultralytics import YOLO +except ImportError: + print("You must install ultralytics to use sample detection", file=sys.stderr) + + +class SampleDetect(TangoRemoteProcessing): + """ + Device server for elmo_main.py-controller based + """ + + model_path = attribute( + label="AI model path for sample detection", + dtype=str, + access=AttrWriteType.READ_WRITE, + fget="get_model_path", + fset="set_model_path" + ) + + min_confidence = attribute( + label="Minimum confidence for succesful detection [0 - 1]", + dtype=float, + access=AttrWriteType.READ_WRITE, + fget="get_min_confidence", + fset="set_min_confidence" + ) + + sending_port = attribute( + label="Port over which detected images will be sent", + dtype=int, + access=AttrWriteType.READ_WRITE, + fget="get_sending_port", + fset="set_sending_port" + ) + + async def init_device(self): + await super().init_device() + self._model_path = "" + self._model = None + self._min_confidence = 0.25 + self._sender = None + self._sending_port = 0 + + @InfoIt() + async def get_model_path(self): + """Get current model path.""" + return self._model_path + + @InfoIt(show_args=True) + async def set_model_path(self, path): + """Set model path.""" + self._model_path = path + self._model = YOLO(path) + + @InfoIt() + async def get_min_confidence(self): + """Get minimum confidence for detection.""" + return self._min_confidence + + @InfoIt(show_args=True) + async def set_min_confidence(self, confidence): + """Set minimum confidence for detection.""" + self._min_confidence = confidence + + @InfoIt() + async def get_sending_port(self): + """Get port over which images will be forwarded.""" + return self._sending_port + + @InfoIt(show_args=True) + async def set_sending_port(self, port): + """Set port over which images will be forwarded.""" + self._sending_port = port + self._sender = await ZmqSender(endpoint=f"tcp://*:{port}", reliable=False, sndhwm=1) + + @DebugIt() + @command() + async def stop_sending(self): + if self._sender: + await self._sender.close() + + @command(dtype_out=bool) + def is_cuda_available(self): + return torch.cuda.is_available() + + @DebugIt(show_args=True) + @command(dtype_in=CmdArgType.DevEncoded, dtype_out=[int, int, int, int]) + def sample_detect(self, data): + encoding, image = data + width, height, dtype = encoding.split("/") + image = np.frombuffer(image, dtype=dtype).reshape(int(height), int(width)) + bbox = self._sample_detect(image) + if bbox is None: + bbox = [0, 0, 0, 0] + + return bbox + + def _sample_detect(self, image): + device = "cuda:0" if torch.cuda.is_available() else "cpu" + result = self._model.predict( + source=_grayscale_to_rgb(image), max_det=1, conf=self._min_confidence, device=device + ) + if len(result[0]): + result = result[0].boxes[0] + box = result.xyxy[0].detach().cpu().numpy() + # Round x0 and y0 down and x1, y1 up + bbox = [ + int(np.floor(box[0])), + int(np.floor(box[1])), + int(np.ceil(box[2])), + int(np.ceil(box[3])) + ] + confidence = result.conf.detach().cpu().numpy()[0] + else: + bbox = None + confidence = 0 + + self.debug_stream("sample at: %s, confidence: %.3f", bbox, confidence) + + return bbox + + async def _stream_detect(self): + self._bboxes = [] + last = None + + async for image in self._receiver.subscribe(): + bbox = self._sample_detect(image) + if bbox is None: + bbox = [0, 0, 0, 0] + else: + self._bboxes.append(bbox) + if self._sender and last != bbox: + print("send", bbox) + await self._sender.send_json({"sample-bbox": bbox}) + last = bbox + + @DebugIt() + @command() + async def stream_detect(self): + await self._process_stream(self._stream_detect()) + + @DebugIt(show_ret=True) + @command(dtype_out=[int, int, int, int]) + async def get_maximum_rectangle(self): + if self._bboxes == []: + bbox = [0, 0, 0, 0] + else: + bboxes = np.array(self._bboxes) + bbox = [ + np.min(bboxes[:, 0]), + np.min(bboxes[:, 1]), + np.max(bboxes[:, 2]), + np.max(bboxes[:, 3]), + ] + + return bbox + + +def _grayscale_to_rgb(image, percentile=0.1): + lower = np.percentile(image, percentile) + upper = np.percentile(image, 100 - percentile) + + img_clipped = np.clip(image, lower, upper) + img_8bit = (((img_clipped - lower) / (upper - lower)) * 255).astype(np.uint8) + + return np.dstack((img_8bit,) * 3) From adca7d64c9bd404124d538209f259fbdbf2b4571 Mon Sep 17 00:00:00 2001 From: Chandan Sarkar Date: Tue, 25 Nov 2025 12:03:57 +0100 Subject: [PATCH 05/15] Include missing build dep --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index 20a67b364..66eb1c80f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,6 +21,7 @@ RUN apt-get update && apt-get install -y \ libgirepository-1.0-dev \ ninja-build \ libzmq5-dev \ + libjson-c-dev \ libjson-glib-dev \ iputils-ping \ iproute2 \ From 855d468ad26a8ef76bea0cf9bd61384067235e36 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Wed, 26 Nov 2025 11:51:21 +0100 Subject: [PATCH 06/15] sampledetect: return confidence --- concert/ext/tangoservers/sampledetect.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/concert/ext/tangoservers/sampledetect.py b/concert/ext/tangoservers/sampledetect.py index ce4bc8787..79876de3e 100644 --- a/concert/ext/tangoservers/sampledetect.py +++ b/concert/ext/tangoservers/sampledetect.py @@ -94,16 +94,23 @@ def is_cuda_available(self): return torch.cuda.is_available() @DebugIt(show_args=True) - @command(dtype_in=CmdArgType.DevEncoded, dtype_out=[int, int, int, int]) + @command(dtype_in=CmdArgType.DevEncoded, dtype_out=[int, int, int, int, int]) def sample_detect(self, data): + """ + Detect sample in image *data*. Returns the bounding box and confidence in one list: + [x0, y0, x1, y1, confidence]. Returned confidence is the original confidence from interval + [0, 1] multiplied by 1000 and converted to int. + """ encoding, image = data width, height, dtype = encoding.split("/") image = np.frombuffer(image, dtype=dtype).reshape(int(height), int(width)) - bbox = self._sample_detect(image) + bbox, confidence = self._sample_detect(image) if bbox is None: - bbox = [0, 0, 0, 0] + result = [0, 0, 0, 0, 0] + else: + result = bbox + [int(np.round(confidence * 1000))] - return bbox + return result def _sample_detect(self, image): device = "cuda:0" if torch.cuda.is_available() else "cpu" @@ -127,14 +134,14 @@ def _sample_detect(self, image): self.debug_stream("sample at: %s, confidence: %.3f", bbox, confidence) - return bbox + return (bbox, confidence) async def _stream_detect(self): self._bboxes = [] last = None async for image in self._receiver.subscribe(): - bbox = self._sample_detect(image) + bbox, confidence = self._sample_detect(image) if bbox is None: bbox = [0, 0, 0, 0] else: From cbf5d9d1fc4d92465f353b42505737d7fb48a26e Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:46:24 +0100 Subject: [PATCH 07/15] addons: reco: Enable finding best slice online and resetting the manager. --- concert/experiments/addons/base.py | 13 ++++++++++++- concert/experiments/addons/local.py | 17 +++++++++++++++++ concert/experiments/addons/tango.py | 6 ++++++ concert/ext/tangoservers/reco.py | 21 ++++++++++++++++++++- 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/concert/experiments/addons/base.py b/concert/experiments/addons/base.py index 7ff6ae1e9..c205aebcc 100644 --- a/concert/experiments/addons/base.py +++ b/concert/experiments/addons/base.py @@ -456,6 +456,14 @@ async def update_flats(self, *args, **kwargs): async def get_volume_shape(self): ... + @abstractmethod + async def get_best_slice_index(self): + ... + + @abstractmethod + async def reset_manager(self): + ... + async def reconstruct(self, *args, **kwargs): await self._reconstruct(*args, **kwargs) await self._show_slice() @@ -470,7 +478,10 @@ async def rereconstruct(self, slice_directory=None): async def _show_slice(self): if self.viewer and len(await self.get_volume_shape()) == 3: - index = len(np.arange(*await self.get_region())) // 2 + if await self.get_z_parameter() == "center-position-x": + index = await self.get_best_slice_index() + else: + index = len(np.arange(*await self.get_region())) // 2 await self.viewer.show(await self.get_slice(z=index)) await self.viewer.set_title(await self.experiment.get_current_name()) diff --git a/concert/experiments/addons/local.py b/concert/experiments/addons/local.py index 638820c38..e4e77647e 100644 --- a/concert/experiments/addons/local.py +++ b/concert/experiments/addons/local.py @@ -1,5 +1,8 @@ +import functools +import multiprocessing import os import numpy as np +from multiprocessing.pool import ThreadPool from concert.coroutines.base import async_generate, background from concert.coroutines.sinks import Accumulate from concert.experiments.addons import base @@ -127,6 +130,20 @@ async def get_volume_shape(self): raise RuntimeError("Volume not available yet") return self._manager.volume.shape + async def get_best_slice_index(self): + def compute_sag_metric(volume, index): + return np.sum(np.abs(np.gradient(volume[index]))) + + pool = ThreadPool(processes=max(1, multiprocessing.cpu_count() - 2)) + func = functools.partial(compute_sag_metric, self._manager.volume) + result = pool.map(func, np.arange(self._manager.volume.shape[0])) + + return np.argmin(result) + + async def reset_manager(self): + if self._manager: + self._manager.reset() + async def _reconstruct(self, producer=None, slice_directory=None): if producer is None: await self._manager.backproject(async_generate(self._manager.projections)) diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index 19e9128ba..f16974ed2 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -197,6 +197,12 @@ async def update_flats(self): async def get_volume_shape(self): return await self._device.get_volume_shape() + async def get_best_slice_index(self): + return await self._device.get_best_slice_index() + + async def reset_manager(self): + await self._device.reset_manager() + @TangoMixin.cancel_remote async def _reconstruct(self, cached=False, slice_directory=None): path = "" diff --git a/concert/ext/tangoservers/reco.py b/concert/ext/tangoservers/reco.py index d4e9bb34f..453fe3083 100644 --- a/concert/ext/tangoservers/reco.py +++ b/concert/ext/tangoservers/reco.py @@ -1,4 +1,6 @@ """Tango device for online 3D reconstruction from zmq image stream.""" +import functools +import multiprocessing import numpy as np from tango import DebugIt, CmdArgType, PipeWriteType from tango.server import command, pipe @@ -7,6 +9,7 @@ from concert.ext.ufo import GeneralBackprojectManager, LocalGeneralBackprojectArgs from concert.networking.base import get_tango_device, ZmqSender from concert.storage import RemoteDirectoryWalker +from multiprocessing.pool import ThreadPool from ...config import DISTRIBUTED_TANGO_TIMEOUT MAX_DIM = 100000 @@ -108,7 +111,7 @@ async def init_device(self): @DebugIt() @command() async def reset_manager(self): - return await self._manager.reset() + return self._manager.reset() @DebugIt() @command(dtype_out=str) @@ -171,6 +174,18 @@ async def _reconstruct(self, cached=False, slice_directory=""): async def reconstruct(self, slice_directory): await self._reconstruct(cached=False, slice_directory=slice_directory) + @DebugIt() + @command(dtype_out=int) + async def get_best_slice_index(self): + if self._manager.volume is None: + raise RuntimeError("Volume is empty") + + pool = ThreadPool(processes=max(1, multiprocessing.cpu_count() - 2)) + func = functools.partial(compute_sag_metric, self._manager.volume) + result = pool.map(func, np.arange(self._manager.volume.shape[0])) + + return np.argmin(result) + @DebugIt(show_args=True) @command(dtype_in=str) async def rereconstruct(self, slice_directory): @@ -259,3 +274,7 @@ async def get_parameters(self): result.append(doc) return result + + +def compute_sag_metric(volume, index): + return np.sum(np.abs(np.gradient(volume[index]))) From 5c355f013b9e65fd194cb399d41709bd6fe1410f Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:48:04 +0100 Subject: [PATCH 08/15] addons: sampledetect: add detect method for convenience. --- concert/experiments/addons/base.py | 4 ++++ concert/experiments/addons/tango.py | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/concert/experiments/addons/base.py b/concert/experiments/addons/base.py index c205aebcc..2edb4de71 100644 --- a/concert/experiments/addons/base.py +++ b/concert/experiments/addons/base.py @@ -133,6 +133,10 @@ def _make_consumers(self, acquisitions): return consumers + async def detect(self, image): + """Detect sample in *image*.""" + raise NotImplementedError + async def stream_detect(self): raise NotImplementedError diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index f16974ed2..4d792911a 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -87,6 +87,14 @@ async def __ainit__(self, experiment, device, endpoint, acquisitions=None): await TangoMixin.__ainit__(self, device, endpoint) await base.SampleDetector.__ainit__(self, experiment, acquisitions=acquisitions) + async def detect(self, image): + encoding = f"{image.shape[1]}/{image.shape[0]}/{image.dtype}" + blob = image.tobytes() + + result = await self._device.sample_detect((encoding, blob)) + + return (result[:4], result[4] / 1000) + @TangoMixin.cancel_remote @remote async def stream_detect(self): From 48be4cc1c1ab1dc3642c99d646c0ff7acdf815b9 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:48:56 +0100 Subject: [PATCH 09/15] imageprocessing: add image conversion to different formats. --- concert/ext/tangoservers/sampledetect.py | 16 ++++--------- concert/imageprocessing.py | 29 ++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/concert/ext/tangoservers/sampledetect.py b/concert/ext/tangoservers/sampledetect.py index 79876de3e..28328263e 100644 --- a/concert/ext/tangoservers/sampledetect.py +++ b/concert/ext/tangoservers/sampledetect.py @@ -7,6 +7,7 @@ from tango import CmdArgType, DebugIt, InfoIt from tango.server import attribute, AttrWriteType, command from .base import TangoRemoteProcessing +from concert.imageprocessing import convert_image_to_nbit from concert.networking.base import ZmqSender try: from ultralytics import YOLO @@ -115,7 +116,10 @@ def sample_detect(self, data): def _sample_detect(self, image): device = "cuda:0" if torch.cuda.is_available() else "cpu" result = self._model.predict( - source=_grayscale_to_rgb(image), max_det=1, conf=self._min_confidence, device=device + source=convert_image_to_nbit(image, num_bits=8, num_channels=3), + max_det=1, + conf=self._min_confidence, + device=device ) if len(result[0]): result = result[0].boxes[0] @@ -171,13 +175,3 @@ async def get_maximum_rectangle(self): ] return bbox - - -def _grayscale_to_rgb(image, percentile=0.1): - lower = np.percentile(image, percentile) - upper = np.percentile(image, 100 - percentile) - - img_clipped = np.clip(image, lower, upper) - img_8bit = (((img_clipped - lower) / (upper - lower)) * 255).astype(np.uint8) - - return np.dstack((img_8bit,) * 3) diff --git a/concert/imageprocessing.py b/concert/imageprocessing.py index 434fc21e5..f6b0fddb3 100644 --- a/concert/imageprocessing.py +++ b/concert/imageprocessing.py @@ -473,3 +473,32 @@ def filter_low_frequencies(data, fwhm=32.): fltr = 1 - np.exp(- x ** 2 / (2 * f_sigma ** 2)) return np.fft.ifft(np.fft.fft(data) * fltr).real + mean + + +def convert_image_to_nbit(image, num_bits=8, percentile=0.1, num_channels=1): + """Convert *image* from any data type to an n-bit image. If *num_bits* <= 8, the output data + type is np.uint8, if it is <= 16, it is np.uint16, if <= 32, it is np.uint32, if <= 64, it is + np.uint64. Set black point to the gray value coresponding to *percentile* and set white point + to the gray value corresponding to 100 - *percenpercentile*. *num_channels* specifies how many + channels will the output have, which is the last dimension. If *num_channels* is 1, than the + output shape is (height, width), if it is more than one it is (height, width, *num_channels*). + You can create an RGB image by specifying num_bits=8 and num_channels=3, which leads to output + shape (height, width, 3). + """ + if num_bits <= 0 or num_bits > 64: + raise ValueError("num_bits must be in the range [1, 64]") + elif num_bits <= 8: + dtype = np.uint8 + elif num_bits <= 16: + dtype = np.uint16 + elif num_bits <= 32: + dtype = np.uint32 + elif dtype <= 64: + dtype = np.uint64 + + lower, upper = np.percentile(image, (percentile, 100 - percentile)) + + img_clipped = np.clip(image, lower, upper) + converted = (((img_clipped - lower) / (upper - lower)) * (2 ** num_bits - 1)).astype(dtype) + + return np.dstack((converted,) * num_channels) if num_channels > 1 else converted From 57fdc2661b8700cf6332a3850368fc756f80835e Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:50:17 +0100 Subject: [PATCH 10/15] ufo: backproject manager resets shape --- concert/ext/ufo.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/concert/ext/ufo.py b/concert/ext/ufo.py index 669f106ee..e47eadfd7 100644 --- a/concert/ext/ufo.py +++ b/concert/ext/ufo.py @@ -513,6 +513,8 @@ class GeneralBackprojectManager(Parameterizable): async def __ainit__(self, args, average_normalization=True, regions=None, copy_inputs=False): await super().__ainit__() self.args = args + self._orig_width = args.width + self._orig_height = args.height self.regions = regions self.copy_inputs = copy_inputs self.projections = None @@ -822,6 +824,8 @@ def reset(self): self._flats_condition.done = False self._processing_task = None self._num_received_projections = self._num_processed_projections = 0 + self.args.width = self._orig_width + self.args.height = self._orig_height @background async def update_darks(self, producer): From e376fa5a2c12b6944c484f4bc6b712db79818dec Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:50:48 +0100 Subject: [PATCH 11/15] storage: add write_image foo --- concert/ext/tangoservers/walker.py | 19 +++++++++++++++++++ concert/storage.py | 23 +++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/concert/ext/tangoservers/walker.py b/concert/ext/tangoservers/walker.py index 94eb0ee67..9a5e898c1 100644 --- a/concert/ext/tangoservers/walker.py +++ b/concert/ext/tangoservers/walker.py @@ -4,6 +4,7 @@ Implements a device server for file system traversal at remote host. """ import logging +import numpy as np import os from typing import Type, Dict, Tuple from tango import DebugIt, DevState, CmdArgType @@ -14,6 +15,7 @@ from concert.storage import StorageError from concert.storage import DirectoryWalker from concert.ext.tangoservers.base import TangoRemoteProcessing +from concert.writers import TiffWriter class TangoRemoteWalker(TangoRemoteProcessing): @@ -166,6 +168,23 @@ def ascend(self) -> None: def exists(self, paths: str) -> bool: return os.path.exists(os.path.join(self._current, *paths)) + @DebugIt(show_args=True) + @command(dtype_in=CmdArgType.DevEncoded) + async def write_image(self, data): + encoding, image = data + name, width, height, num_channels, dtype = encoding.split(":") + print(name, width, height, dtype) + num_channels = int(num_channels) + shape = (int(height), int(width)) + if num_channels > 1: + shape += (num_channels,) + image = np.frombuffer(image, dtype=dtype).reshape(shape) + writer = TiffWriter(os.path.join(self._current, name), 0) + try: + writer.write(image) + finally: + writer.close() + @DebugIt(show_args=True) @command(dtype_in=str) async def write_sequence(self, name): diff --git a/concert/storage.py b/concert/storage.py index c02e6aaa1..f22d7881a 100644 --- a/concert/storage.py +++ b/concert/storage.py @@ -284,6 +284,11 @@ async def create_writer(self, if name: await self.ascend() + @abstractmethod + async def write_image(self, image, filename): + """Write *image* to *filename*.""" + ... + @background async def write(self, producer: AsyncIterable[ArrayLike], @@ -362,6 +367,9 @@ async def register_logger(self, logger_name: str, log_level: int, """Provides a no-op logging handler as a placeholder""" return NoOpLoggingHandler() + async def write_image(self, image, filename): + pass + class DirectoryWalker(Walker): """ @@ -437,6 +445,13 @@ async def _create_writer(self, producer: AsyncIterable[ArrayLike], self._bytes_per_file ) + async def write_image(self, image, filename): + writer = TiffWriter(os.path.join(self._current, filename), 0) + try: + writer.write(image) + finally: + writer.close() + def _dset_exists(self, dsetname: str) -> bool: """Check if *dsetname* exists on the current level.""" if not re.match('.*{.*}.*', dsetname): @@ -601,6 +616,14 @@ async def _create_writer(self, producer: AsyncIterable[ArrayLike], await self.set_endpoint(old_endpoint) await self.device.write_attribute("dsetname", old_dsetname) + @background + async def write_image(self, image, filename): + num_channels = 1 if image.ndim == 2 else image.shape[2] + meta = f"{filename}:{image.shape[1]}:{image.shape[0]}:{num_channels}:{image.dtype}" + blob = image.tobytes() + + await self.device.write_image((meta, blob)) + @background async def write_sequence(self, name: Optional[str] = "") -> None: """ From 5b61834b1e99ee44fe5c862d5bdc69861e0815b2 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 27 Nov 2025 17:59:38 +0100 Subject: [PATCH 12/15] SQUASH: Remove forgotten print --- concert/ext/tangoservers/walker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/concert/ext/tangoservers/walker.py b/concert/ext/tangoservers/walker.py index 9a5e898c1..a81f23285 100644 --- a/concert/ext/tangoservers/walker.py +++ b/concert/ext/tangoservers/walker.py @@ -173,7 +173,6 @@ def exists(self, paths: str) -> bool: async def write_image(self, data): encoding, image = data name, width, height, num_channels, dtype = encoding.split(":") - print(name, width, height, dtype) num_channels = int(num_channels) shape = (int(height), int(width)) if num_channels > 1: From 76dd89dc00018990e74c7642650a9a0fb5a8bd1a Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Fri, 28 Nov 2025 11:40:08 +0100 Subject: [PATCH 13/15] SQUASH: cache best slice --- concert/experiments/addons/base.py | 14 +++++++++++++- concert/experiments/addons/local.py | 2 +- concert/experiments/addons/tango.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/concert/experiments/addons/base.py b/concert/experiments/addons/base.py index 2edb4de71..21b8bdf49 100644 --- a/concert/experiments/addons/base.py +++ b/concert/experiments/addons/base.py @@ -343,6 +343,7 @@ async def __ainit__(self, proxy, experiment, acquisitions=None, do_normalization self.walker = experiment.walker self._slice_directory = None self.viewer = viewer + self._best_slice = None await super().__ainit__(experiment=experiment, acquisitions=acquisitions) await self.set_slice_directory(slice_directory) await self.register_args() @@ -461,14 +462,24 @@ async def get_volume_shape(self): ... @abstractmethod - async def get_best_slice_index(self): + async def _get_best_slice_index(self): ... + async def get_best_slice_index(self): + if await self.get_z_parameter() == "center-position-x": + if self._best_slice is None: + self._best_slice = await self._get_best_slice_index() + return self._best_slice + else: + # Nothing to be computed, but let us return a reasonable default + return 0 + @abstractmethod async def reset_manager(self): ... async def reconstruct(self, *args, **kwargs): + self._best_slice = None await self._reconstruct(*args, **kwargs) await self._show_slice() @@ -477,6 +488,7 @@ async def rereconstruct(self, slice_directory=None): """Rereconstruct cached projections and saved them to *slice_directory*, which is a full path. """ + self._best_slice = None await self._rereconstruct(slice_directory=slice_directory) await self._show_slice() diff --git a/concert/experiments/addons/local.py b/concert/experiments/addons/local.py index e4e77647e..0c802a7e6 100644 --- a/concert/experiments/addons/local.py +++ b/concert/experiments/addons/local.py @@ -130,7 +130,7 @@ async def get_volume_shape(self): raise RuntimeError("Volume not available yet") return self._manager.volume.shape - async def get_best_slice_index(self): + async def _get_best_slice_index(self): def compute_sag_metric(volume, index): return np.sum(np.abs(np.gradient(volume[index]))) diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index 4d792911a..82ea93a77 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -205,7 +205,7 @@ async def update_flats(self): async def get_volume_shape(self): return await self._device.get_volume_shape() - async def get_best_slice_index(self): + async def _get_best_slice_index(self): return await self._device.get_best_slice_index() async def reset_manager(self): From f581492aa119847f1b4ee7a9f6ff96fba69ce761 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Tue, 2 Dec 2025 16:49:28 +0100 Subject: [PATCH 14/15] SQUASH: Percentile detect --- concert/ext/tangoservers/sampledetect.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/concert/ext/tangoservers/sampledetect.py b/concert/ext/tangoservers/sampledetect.py index 28328263e..87568f5fe 100644 --- a/concert/ext/tangoservers/sampledetect.py +++ b/concert/ext/tangoservers/sampledetect.py @@ -51,6 +51,7 @@ async def init_device(self): self._min_confidence = 0.25 self._sender = None self._sending_port = 0 + self._bboxes = [] @InfoIt() async def get_model_path(self): @@ -151,7 +152,6 @@ async def _stream_detect(self): else: self._bboxes.append(bbox) if self._sender and last != bbox: - print("send", bbox) await self._sender.send_json({"sample-bbox": bbox}) last = bbox @@ -161,17 +161,18 @@ async def stream_detect(self): await self._process_stream(self._stream_detect()) @DebugIt(show_ret=True) - @command(dtype_out=[int, int, int, int]) - async def get_maximum_rectangle(self): + @command(dtype_in=float, dtype_out=[int, int, int, int]) + async def get_maximum_rectangle(self, percentile): if self._bboxes == []: bbox = [0, 0, 0, 0] else: bboxes = np.array(self._bboxes) + minmax = np.percentile(bboxes, (percentile, 100 - percentile), axis=0) bbox = [ - np.min(bboxes[:, 0]), - np.min(bboxes[:, 1]), - np.max(bboxes[:, 2]), - np.max(bboxes[:, 3]), + int(minmax[0, 0]), + int(minmax[0, 1]), + int(minmax[1, 2]), + int(minmax[1, 3]), ] return bbox From e1d2bce35d5303cc108accde8477df1bea85fe24 Mon Sep 17 00:00:00 2001 From: Tomas Farago Date: Thu, 4 Dec 2025 09:39:52 +0100 Subject: [PATCH 15/15] SQUASH: SampleDetector takes percentile --- concert/experiments/addons/tango.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py index 82ea93a77..6ccb9d5b6 100644 --- a/concert/experiments/addons/tango.py +++ b/concert/experiments/addons/tango.py @@ -100,8 +100,8 @@ async def detect(self, image): async def stream_detect(self): await self._device.stream_detect() - async def get_maximum_rectangle(self): - return await self._device.get_maximum_rectangle() + async def get_maximum_rectangle(self, percentile=10.0): + return await self._device.get_maximum_rectangle(percentile) class ImageWriter(TangoMixin, base.ImageWriter):