-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsensor.py
More file actions
127 lines (103 loc) · 4.76 KB
/
sensor.py
File metadata and controls
127 lines (103 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""Platform for sensor integration."""
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
from typing import Any
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_HOST, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import HomeAssistant, CoreState
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import slugify
from .const import DOMAIN, CONF_SERVICE
from .coordinator import SshDockerCoordinator, STATE_UNKNOWN
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(hours=24)
async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up the SSH Docker sensor platform for a config entry.

    The integration's entry setup stored an ``SshDockerCoordinator`` under
    this entry's id in ``hass.data``; create the single container-state
    sensor bound to that coordinator and register it with HA.
    """
    coordinator: SshDockerCoordinator = hass.data[DOMAIN][entry.entry_id]
    async_add_entities([DockerContainerSensor(coordinator, entry, hass)])
class DockerContainerSensor(SensorEntity):
    """Sensor representing a Docker container on a remote host.

    All I/O is delegated to the ``SshDockerCoordinator``; this class only
    reflects the coordinator's state in the HA entity model.
    """

    _attr_has_entity_name = True
    _attr_translation_key = "state"
    _attr_should_poll = True

    def __init__(
        self,
        coordinator: SshDockerCoordinator,
        entry: ConfigEntry,
        hass: HomeAssistant,
    ) -> None:
        """Initialize the sensor.

        Args:
            coordinator: Shared coordinator that performs all SSH/docker I/O.
            entry: Config entry this sensor belongs to.
            hass: Home Assistant instance (needed to generate the entity id).
        """
        super().__init__()
        self.coordinator = coordinator
        self.entry = entry
        self.hass = hass
        self._name = entry.data[CONF_NAME]
        # service is the container name used in docker commands; falls back to
        # name for backwards compatibility with entries created before the split.
        self._service = entry.data.get(CONF_SERVICE, self._name)
        self._attr_unique_id = f"{entry.entry_id}_state"
        self.entity_id = generate_entity_id(
            "sensor.ssh_docker_{}", slugify(self._name), hass=hass
        )
        self._attr_device_info = DeviceInfo(
            identifiers={(DOMAIN, entry.entry_id)},
            manufacturer="SSH Docker",
            model="Docker Container",
            name=self._name,
        )

    @property
    def native_value(self) -> str:
        """Return current state, preferring coordinator's pending state when set."""
        # Guard against coordinator.data being None: a DataUpdateCoordinator's
        # ``data`` starts as None until the first successful refresh, and this
        # property can be read before that refresh completes. Without the guard
        # ``None.get(...)`` raises AttributeError instead of yielding "unknown".
        data = self.coordinator.data or {}
        return self.coordinator.pending_state or data.get("state", STATE_UNKNOWN)

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return state attributes sourced from the coordinator."""
        # Same None-guard as native_value: data may not be populated yet.
        return (self.coordinator.data or {}).get("attributes", {})

    async def async_added_to_hass(self) -> None:
        """Register coordinator listener and schedule the first refresh."""
        await super().async_added_to_hass()
        # When coordinator data changes (including pending state transitions),
        # push the new state to HA immediately without waiting for the next poll.
        self.async_on_remove(
            self.coordinator.async_add_listener(self.async_write_ha_state)
        )
        # Show a transitional state so the UI doesn't sit on "unknown" while
        # the first SSH fetch is queued/in-progress.
        self.coordinator.set_pending_state("initializing")
        _host = self.entry.options.get(CONF_HOST, "")
        if self.hass.state == CoreState.running:
            # HA is already up — only one entry initializes at a time, no stagger needed.
            async def _immediate_update():
                await self.async_update_ha_state(force_refresh=True)
            self.hass.async_create_task(_immediate_update())
        else:
            # During startup all entries on the same host race to initialize; stagger
            # them to spread the SSH load across the remote host.
            # NOTE(review): CONF_HOST is read from entry.options here — confirm the
            # host isn't stored in entry.data for older entries, which would make
            # _same_host_count undercount and shrink the stagger window.
            _same_host_count = sum(
                1 for e in self.hass.config_entries.async_entries(DOMAIN)
                if e.options.get(CONF_HOST, "") == _host
            )
            # hash() of a str is salted per interpreter run (PYTHONHASHSEED), so
            # the delay varies between restarts — acceptable, since any spread
            # within [0, count) serves the load-smoothing purpose.
            stagger_secs = abs(hash(self.entry.entry_id)) % max(_same_host_count, 1)

            async def _staggered_update(_event=None):
                if stagger_secs > 0:
                    await asyncio.sleep(stagger_secs)
                await self.async_update_ha_state(force_refresh=True)

            self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _staggered_update)

    async def async_update(self) -> None:
        """Delegate data fetching to the coordinator."""
        await self.coordinator.async_request_refresh()