Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ RUN apt-get update && apt-get install -y \
libgirepository-1.0-dev \
ninja-build \
libzmq5-dev \
libjson-c-dev \
libjson-glib-dev \
iputils-ping \
iproute2 \
Expand Down
6 changes: 3 additions & 3 deletions concert/devices/cameras/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def unregister_endpoint(self, endpoint: CommData) -> None:
:type endpoint: concert.helpers.CommData
"""
if endpoint in self._senders:
self._senders[endpoint].close()
await self._senders[endpoint].close()
del self._senders[endpoint]

async def register_endpoint(self, endpoint: CommData) -> None:
Expand All @@ -266,15 +266,15 @@ async def register_endpoint(self, endpoint: CommData) -> None:
if endpoint in self._senders:
raise ValueError("zmq endpoint already in list")

self._senders[endpoint] = ZmqSender(
self._senders[endpoint] = await ZmqSender(
endpoint.server_endpoint,
reliable=endpoint.socket_type == zmq.PUSH,
sndhwm=endpoint.sndhwm
)

async def unregister_all(self) -> None:
for sender in self._senders.values():
sender.close()
await sender.close()
self._senders = {}

@abstractmethod
Expand Down
52 changes: 51 additions & 1 deletion concert/experiments/addons/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,33 @@ async def _get_duration(self, acquisition_name):
raise NotImplementedError


class SampleDetector(Addon):

"""Sample detection addon."""

async def __ainit__(self, experiment, acquisitions=None):
await super().__ainit__(experiment, acquisitions=acquisitions)

def _make_consumers(self, acquisitions):
consumers = {}

for acq in acquisitions:
consumers[acq] = AcquisitionConsumer(
self.stream_detect,
addon=self if self.stream_detect.remote else None,
reliable=False if self.stream_detect.remote else True
)

return consumers

async def detect(self, image):
"""Detect sample in *image*."""
raise NotImplementedError

async def stream_detect(self):
raise NotImplementedError


class ImageWriter(Addon):

"""An addon which writes images to disk.
Expand Down Expand Up @@ -316,6 +343,7 @@ async def __ainit__(self, proxy, experiment, acquisitions=None, do_normalization
self.walker = experiment.walker
self._slice_directory = None
self.viewer = viewer
self._best_slice = None
await super().__ainit__(experiment=experiment, acquisitions=acquisitions)
await self.set_slice_directory(slice_directory)
await self.register_args()
Expand Down Expand Up @@ -433,7 +461,25 @@ async def update_flats(self, *args, **kwargs):
async def get_volume_shape(self):
...

@abstractmethod
async def _get_best_slice_index(self):
...

async def get_best_slice_index(self):
if await self.get_z_parameter() == "center-position-x":
if self._best_slice is None:
self._best_slice = await self._get_best_slice_index()
return self._best_slice
else:
# Nothing to be computed, but let us return a reasonable default
return 0

@abstractmethod
async def reset_manager(self):
...

async def reconstruct(self, *args, **kwargs):
self._best_slice = None
await self._reconstruct(*args, **kwargs)
await self._show_slice()

Expand All @@ -442,12 +488,16 @@ async def rereconstruct(self, slice_directory=None):
"""Rereconstruct cached projections and saved them to *slice_directory*, which is a full
path.
"""
self._best_slice = None
await self._rereconstruct(slice_directory=slice_directory)
await self._show_slice()

async def _show_slice(self):
if self.viewer and len(await self.get_volume_shape()) == 3:
index = len(np.arange(*await self.get_region())) // 2
if await self.get_z_parameter() == "center-position-x":
index = await self.get_best_slice_index()
else:
index = len(np.arange(*await self.get_region())) // 2
await self.viewer.show(await self.get_slice(z=index))
await self.viewer.set_title(await self.experiment.get_current_name())

Expand Down
17 changes: 17 additions & 0 deletions concert/experiments/addons/local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import functools
import multiprocessing
import os
import numpy as np
from multiprocessing.pool import ThreadPool
from concert.coroutines.base import async_generate, background
from concert.coroutines.sinks import Accumulate
from concert.experiments.addons import base
Expand Down Expand Up @@ -127,6 +130,20 @@ async def get_volume_shape(self):
raise RuntimeError("Volume not available yet")
return self._manager.volume.shape

async def _get_best_slice_index(self):
def compute_sag_metric(volume, index):
return np.sum(np.abs(np.gradient(volume[index])))

pool = ThreadPool(processes=max(1, multiprocessing.cpu_count() - 2))
func = functools.partial(compute_sag_metric, self._manager.volume)
result = pool.map(func, np.arange(self._manager.volume.shape[0]))

return np.argmin(result)

async def reset_manager(self):
if self._manager:
self._manager.reset()

async def _reconstruct(self, producer=None, slice_directory=None):
if producer is None:
await self._manager.backproject(async_generate(self._manager.projections))
Expand Down
37 changes: 33 additions & 4 deletions concert/experiments/addons/tango.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,29 @@ async def _teardown(self):
await self._device.reset()


class SampleDetector(TangoMixin, base.SampleDetector):

async def __ainit__(self, experiment, device, endpoint, acquisitions=None):
await TangoMixin.__ainit__(self, device, endpoint)
await base.SampleDetector.__ainit__(self, experiment, acquisitions=acquisitions)

async def detect(self, image):
encoding = f"{image.shape[1]}/{image.shape[0]}/{image.dtype}"
blob = image.tobytes()

result = await self._device.sample_detect((encoding, blob))

return (result[:4], result[4] / 1000)

@TangoMixin.cancel_remote
@remote
async def stream_detect(self):
await self._device.stream_detect()

async def get_maximum_rectangle(self, percentile=10.0):
return await self._device.get_maximum_rectangle(percentile)


class ImageWriter(TangoMixin, base.ImageWriter):

async def __ainit__(self, experiment, endpoint, acquisitions=None):
Expand All @@ -104,20 +127,20 @@ async def __ainit__(self, viewer, endpoint, experiment, acquisitions=None):
self._orig_limits = await viewer.get_limits()

async def connect_endpoint(self):
self._viewer.subscribe(self.endpoint.client_endpoint)
self._viewer.subscribe(self.endpoint.client_endpoint, "image")

async def disconnect_endpoint(self):
self._viewer.unsubscribe()
self._viewer.unsubscribe(self.endpoint.client_endpoint)

@remote
async def consume(self):
try:
if await self._viewer.get_limits() == 'stream':
self._viewer.unsubscribe()
self._viewer.unsubscribe(self.endpoint.client_endpoint)
# Force viewer to update the limits by unsubscribing and re-subscribing after
# setting limits to stream
await self._viewer.set_limits('stream')
self._viewer.subscribe(self.endpoint.client_endpoint)
self._viewer.subscribe(self.endpoint.client_endpoint, "image")
finally:
self._orig_limits = await self._viewer.get_limits()

Expand Down Expand Up @@ -182,6 +205,12 @@ async def update_flats(self):
async def get_volume_shape(self):
return await self._device.get_volume_shape()

async def _get_best_slice_index(self):
return await self._device.get_best_slice_index()

async def reset_manager(self):
await self._device.reset_manager()

@TangoMixin.cancel_remote
async def _reconstruct(self, cached=False, slice_directory=None):
path = ""
Expand Down
10 changes: 7 additions & 3 deletions concert/experiments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ class Consumer:
:param corofunc_kwargs: a list or tuple of *corofunc* keyword arguemnts
:param addon: a :class:`~concert.experiments.addons.Addon` object
"""
def __init__(self, corofunc, corofunc_args=(), corofunc_kwargs=None, addon=None):
def __init__(self, corofunc, corofunc_args=(), corofunc_kwargs=None, addon=None, reliable=True):
self._corofunc = corofunc
self.addon = addon
self.args = corofunc_args
self.reliable = reliable
self.kwargs = {} if corofunc_kwargs is None else corofunc_kwargs

@property
Expand Down Expand Up @@ -173,7 +174,10 @@ async def cancel_and_wait(tasks):

tasks = [start(producer_coro())]
self._producer_task = tasks[0]
tasks += [start(consumer(None)) for consumer in self._consumers]
tasks += [start(consumer(None)) for consumer in self._consumers if consumer.reliable]
unreliable = set(
[start(consumer(None)) for consumer in self._consumers if not consumer.reliable]
)
LOG.debug(
"`%s': starting producer `%s' and consumers %s",
self.name,
Expand Down Expand Up @@ -204,7 +208,7 @@ async def cancel_and_wait(tasks):
# remotes might still be waiting for data, so cancel processing. Processing is
# responsible for stopping the remote processing as well (e.g. call cancel_remote() on
# a Tango addon)!
pending_result = await cancel_and_wait(tasks)
pending_result = await cancel_and_wait(tasks.union(unreliable))
LOG.debug(
"`%s': `%s' during remote processing, results: `%s'",
self.name,
Expand Down
5 changes: 4 additions & 1 deletion concert/ext/cmd/tango.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concert.session.utils import setup_logging, SubCommand

SERVER_NAMES = ['benchmarker', 'reco', 'walker']
SERVER_NAMES = ['benchmarker', 'reco', 'walker', 'sampledetect']


class TangoCommand(SubCommand):
Expand Down Expand Up @@ -58,6 +58,9 @@ def run(self, server: str, port: int, database: bool = False, device: [str, None
if server == "walker":
from concert.ext.tangoservers import walker
server_class = {'class': walker.TangoRemoteWalker}
if server == "sampledetect":
from concert.ext.tangoservers import sampledetect
server_class = {'class': sampledetect.SampleDetect}

setup_logging(server, to_stream=True, filename=logfile, loglevel=loglevel)

Expand Down
Loading