
Add v1 of concept inventory, to be curated and extended. #692

Open
SimonHeybrock wants to merge 3 commits into main from concept-inventory

Conversation

@SimonHeybrock

Mostly generated overview of concepts/topics to cover.

Document how "latest value wins" semantics at every stage (Kafka queue,
cached plotter state, dirty flags) provide implicit backpressure handling.
Slow sessions gracefully degrade to lower frame rates without blocking
other sessions or the data pipeline.
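The "latest value wins" pattern described above can be sketched as a small cell with a dirty flag; the class and method names here are illustrative, not the actual ESSlivedata API:

```python
# Hypothetical sketch of "latest value wins" with a dirty flag. A fast
# producer overwrites freely; a slow consumer simply skips intermediate
# values instead of queueing them, which is the implicit backpressure.
import threading


class LatestValueCell:
    """Stores only the most recent value for one stream."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._dirty = False

    def publish(self, value):
        # Producer side: unconditionally overwrite and mark dirty.
        with self._lock:
            self._value = value
            self._dirty = True

    def poll(self):
        # Consumer side: return the newest value once, or None if unchanged.
        with self._lock:
            if not self._dirty:
                return None
            self._dirty = False
            return self._value


cell = LatestValueCell()
for frame in range(100):  # producer far outpaces the consumer
    cell.publish(frame)
latest = cell.poll()  # only the newest frame survives; older ones were dropped
again = cell.poll()   # nothing new since the last poll
```

A slow dashboard session polling such a cell naturally degrades to a lower frame rate without ever blocking the producer.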

Cross-reference from D3 (dirty-flag polling) to the new entry.

Prompt: Find out how the dashboard handles backpressure [...] Can we add some info in #692?
Follow-up: Don't comment on the PR. Checkout the branch in a worktree, change the .md there.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@nvaytet nvaytet self-assigned this Feb 27, 2026

@nvaytet left a comment


Generally very useful stuff. You mentioned before that every time you tried to make some documentation describing how livedata works, it became out of date a few days later.
Do we have the same issue here?


### A1. Service Topology and Data Flow

The system is a pipeline of independent processes communicating via Kafka. Upstream (raw instrument data) flows through four processing services (monitor_data, detector_data, data_reduction, timeseries) which publish results to downstream topics. The dashboard consumes downstream data for visualization and publishes commands/ROI definitions back upstream. Each processing service is a separate OS process with its own OrchestratingProcessor.

Easier to review/comment with one sentence per line 🙏


### A1. Service Topology and Data Flow

The system is a pipeline of independent processes communicating via Kafka. Upstream (raw instrument data) flows through four processing services (monitor_data, detector_data, data_reduction, timeseries) which publish results to downstream topics. The dashboard consumes downstream data for visualization and publishes commands/ROI definitions back upstream. Each processing service is a separate OS process with its own OrchestratingProcessor.

Suggested change
The system is a pipeline of independent processes communicating via Kafka. Upstream (raw instrument data) flows through four processing services (monitor_data, detector_data, data_reduction, timeseries) which publish results to downstream topics. The dashboard consumes downstream data for visualization and publishes commands/ROI definitions back upstream. Each processing service is a separate OS process with its own OrchestratingProcessor.
The system is a pipeline of independent processes communicating via Kafka. Upstream (raw instrument) data flows through four processing services (monitor_data, detector_data, data_reduction, timeseries) which publish results to downstream topics. The dashboard consumes downstream data for visualization and publishes commands/ROI definitions back upstream. Each processing service is a separate OS process with its own OrchestratingProcessor.


### A2. Timestamps and the Time Model

All data timestamps are nanoseconds since epoch (UTC), carried in `Message.timestamp`. They originate from the instrument's event formation unit (EFU) or slow-control system, not from wall clocks. The `SimpleMessageBatcher` uses data timestamps as its clock: batches close only when a message with a future timestamp arrives. Wall clock is used only for heartbeats (2s), metrics logging (30s), and sleep durations. Time coordinates (`start_time`, `end_time`) are added to workflow outputs for lag calculation in the dashboard.
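The data-timestamp-as-clock idea can be illustrated with a toy batcher. The name `SimpleMessageBatcher` and the close-on-future-timestamp rule come from the text; the fixed batch length and everything else here are illustrative assumptions:

```python
# Toy sketch of batching driven by data timestamps instead of wall clock:
# a batch closes only when a message with a timestamp past the batch end
# arrives. Assumes a hypothetical fixed 1 s batch length.
from dataclasses import dataclass, field
from typing import Optional

NS_PER_S = 1_000_000_000


@dataclass
class Message:
    timestamp: int  # nanoseconds since epoch (UTC), from the EFU/slow control
    payload: object


@dataclass
class BatcherSketch:
    batch_length_ns: int = NS_PER_S
    _batch_end: Optional[int] = None
    _pending: list = field(default_factory=list)

    def add(self, msg: Message):
        """Return a closed batch when a 'future' message arrives, else None."""
        if self._batch_end is None:
            self._batch_end = msg.timestamp + self.batch_length_ns
        if msg.timestamp >= self._batch_end:
            closed, self._pending = self._pending, [msg]
            self._batch_end = msg.timestamp + self.batch_length_ns
            return closed
        self._pending.append(msg)
        return None


b = BatcherSketch()
b.add(Message(0, 'a'))                     # opens the batch
b.add(Message(NS_PER_S // 2, 'b'))         # still inside the batch window
closed = b.add(Message(2 * NS_PER_S, 'c')) # future timestamp closes the batch
```

Note that with no incoming messages, no batch ever closes: the batcher's clock simply stops, which is exactly what "data timestamps as clock" implies.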

heartbeats (2s), metrics logging (30s)

What do the durations in brackets represent here?


### A6. Two-Phase Workflow Registration

Workflow registration is split into lightweight spec registration (Phase 1: happens at import time, registers `WorkflowSpec` with metadata, params models, and output templates) and heavy factory attachment (Phase 2: happens at runtime via `load_factories()`, imports scientific libraries and attaches factory callables). This avoids slow imports of `essdiffraction`, `essspectroscopy`, etc. at startup.
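The two-phase split can be sketched as follows. `WorkflowSpec` and `load_factories()` are named in the text; the registry structure and the stand-in import are illustrative assumptions:

```python
# Sketch of two-phase workflow registration: a cheap spec at import time,
# a heavy factory attached later at runtime.
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class WorkflowSpec:
    name: str
    description: str
    factory: Optional[Callable] = None  # attached in phase 2


REGISTRY: dict = {}


def register_spec(name: str, description: str) -> None:
    """Phase 1: runs at import time; no scientific packages imported."""
    REGISTRY[name] = WorkflowSpec(name, description)


def load_factories() -> None:
    """Phase 2: runs at service startup; pays the heavy import cost once."""
    import math  # stand-in for a heavy package such as essdiffraction
    REGISTRY['powder'].factory = lambda x: math.sqrt(x)


register_spec('powder', 'Powder diffraction reduction')
spec_only = REGISTRY['powder'].factory  # None: spec is known, nothing loaded
load_factories()
result = REGISTRY['powder'].factory(4.0)
```

The dashboard can thus list and configure workflows from specs alone, without any service having imported the underlying reduction packages.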

This avoids slow imports of essdiffraction, essspectroscopy, etc. at startup.

Are the imports really that slow? I remember we talked about using lazy_loader in all packages at some point?


### B1. Service-Processor-Workflow Pattern

Each backend service is a `Service` wrapping a single `Processor` (always `OrchestratingProcessor` in production). The processor orchestrates the full pipeline: message separation (commands vs data), batching, preprocessing, job routing, result computation, and publishing. The `Service` manages the OS-level lifecycle (signal handling, threading, graceful shutdown).
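A minimal sketch of the Service-wraps-Processor split; the class names come from the text, while the method bodies are illustrative assumptions:

```python
# Service owns the OS-level lifecycle; Processor owns the data logic
# (message separation, batching, computation, publishing).
class Processor:
    """Data side: separate commands from data and process a batch."""

    def process(self, messages):
        commands = [m for m in messages if m.get('kind') == 'command']
        data = [m for m in messages if m.get('kind') == 'data']
        return {'commands': len(commands), 'data': len(data)}


class Service:
    """Lifecycle side: start/stop around a single processor."""

    def __init__(self, processor):
        self._processor = processor
        self._running = False

    def start(self):
        # Real code would install signal handlers and start threads here.
        self._running = True

    def step(self, messages):
        if self._running:
            return self._processor.process(messages)
        return None

    def stop(self):
        # Graceful-shutdown point: flush and release resources.
        self._running = False


svc = Service(Processor())
svc.start()
result = svc.step([{'kind': 'command'}, {'kind': 'data'}, {'kind': 'data'}])
svc.stop()
```

Keeping the lifecycle concerns in `Service` means the processor can be unit-tested without threads or signals.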

Don't know how difficult it would be to keep up to date, but could it be useful to also add links to either where these classes are defined, or to the API reference docs, where we could click on Service to see more info?


### B3. Accumulators and Preprocessing

Accumulators (`Accumulator[T, U]`) are per-stream stateful objects that transform and accumulate raw messages into workflow-consumable form. A `PreprocessorFactory` creates the right accumulator for each `StreamId`. Key accumulators: `Cumulative` (sums DataArrays), `CollectTOA` (concatenates event arrays), `ToNXevent_data` (builds NeXus event format), `ToNXlog` (builds NeXus log format with doubling-capacity buffer), `LatestValueHandler` (keeps latest value only).
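The `Accumulator[T, U]` shape can be sketched with two of the accumulators named above; real signatures in ESSlivedata may differ, and plain floats stand in for DataArrays:

```python
# Illustrative per-stream accumulators: one sums, one keeps only the latest.
from typing import Generic, Optional, TypeVar

T = TypeVar('T')
U = TypeVar('U')


class Accumulator(Generic[T, U]):
    """Stateful transform from raw messages (T) to workflow input (U)."""

    def add(self, timestamp: int, data: T) -> None: ...

    def get(self) -> U: ...


class Cumulative(Accumulator[float, float]):
    """Sums incoming values (stand-in for summing DataArrays)."""

    def __init__(self):
        self._total = 0.0

    def add(self, timestamp, data):
        self._total += data

    def get(self):
        return self._total


class LatestValueHandler(Accumulator[float, Optional[float]]):
    """Keeps only the most recent value."""

    def __init__(self):
        self._latest = None

    def add(self, timestamp, data):
        self._latest = data

    def get(self):
        return self._latest


cum, latest = Cumulative(), LatestValueHandler()
for t, v in enumerate([1.0, 2.0, 3.0]):
    cum.add(t, v)
    latest.add(t, v)
```

The same feed thus yields 6.0 from `Cumulative` but only 3.0 from `LatestValueHandler`, which is the whole point of choosing accumulators per `StreamId`.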

Suggested change
Accumulators (`Accumulator[T, U]`) are per-stream stateful objects that transform and accumulate raw messages into workflow-consumable form. A `PreprocessorFactory` creates the right accumulator for each `StreamId`. Key accumulators: `Cumulative` (sums DataArrays), `CollectTOA` (concatenates event arrays), `ToNXevent_data` (builds NeXus event format), `ToNXlog` (builds NeXus log format with doubling-capacity buffer), `LatestValueHandler` (keeps latest value only).
Accumulators (`Accumulator[T, U]`) are per-stream stateful objects that transform and accumulate raw messages into a workflow-consumable form. A `PreprocessorFactory` creates the right accumulator for each `StreamId`. Key accumulators: `Cumulative` (sums DataArrays), `CollectTOA` (concatenates event arrays), `ToNXevent_data` (builds NeXus event format), `ToNXlog` (builds NeXus log format with doubling-capacity buffer), `LatestValueHandler` (keeps latest value only).


### B3. Accumulators and Preprocessing

Accumulators (`Accumulator[T, U]`) are per-stream stateful objects that transform and accumulate raw messages into workflow-consumable form. A `PreprocessorFactory` creates the right accumulator for each `StreamId`. Key accumulators: `Cumulative` (sums DataArrays), `CollectTOA` (concatenates event arrays), `ToNXevent_data` (builds NeXus event format), `ToNXlog` (builds NeXus log format with doubling-capacity buffer), `LatestValueHandler` (keeps latest value only).

Clickable links to the accumulators would also be useful here.


The `f144_attribute_registry` on `Instrument` maps log stream source names to metadata (Pydantic `LogDataAttributes`: unit, dtype, optional `value_min`/`value_max`). Only log streams present in this registry are preprocessed; unknown sources are dropped. This controls which slow-control parameters are available in the system.
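The registry-gated filtering can be sketched like this; the field names follow the text, but a plain dataclass stands in for the Pydantic model and the lookup logic is an assumption:

```python
# Hypothetical sketch of registry-gated preprocessing of f144 log streams:
# only sources present in the registry are kept, everything else is dropped.
from dataclasses import dataclass
from typing import Optional


@dataclass
class LogDataAttributes:
    # Stand-in for the Pydantic model named in the text.
    unit: str
    dtype: str
    value_min: Optional[float] = None
    value_max: Optional[float] = None


f144_attribute_registry = {
    'sample_temperature': LogDataAttributes(unit='K', dtype='float64',
                                            value_min=0.0),
}


def preprocess_log(source_name, value):
    attrs = f144_attribute_registry.get(source_name)
    if attrs is None:
        return None  # unknown sources are dropped by the registry gate
    return {'value': value, 'unit': attrs.unit}


known = preprocess_log('sample_temperature', 293.0)
unknown = preprocess_log('unknown_motor', 1.0)
```

The registry is therefore the single place that decides which slow-control parameters exist from the system's point of view.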

### G9. Backpressure and Graceful Degradation

Maybe I missed it further up, but a short description of how we handle cases where one of the services goes down?
For example a plotter is relying on a job to get new data, but the job goes down. Does it crash the plotter? Does the job try to restart automatically?

@SimonHeybrock
Member Author

Generally very useful stuff. You mentioned before that every time you tried to make some documentation describing how livedata works, it became out of date a few days later. Do we have the same issue here?

This was mainly meant as an overview of topics we might want to cover in the "seminars" about ESSlivedata, not as something to read linearly. And (I hope) the difference is that while in the past we wrote detailed architecture docs, here we focus on concepts, which (hopefully) change less frequently — and if they do, the change has to be very deliberate.
