Skip to content
119 changes: 55 additions & 64 deletions src/wyzeapy/services/base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
olive_create_get_payload_irrigation,
olive_create_post_payload_irrigation_stop,
olive_create_post_payload_irrigation_quickrun,
olive_create_get_payload_irrigation_schedule_runs,
)
from ..types import PropertyIDs, Device, DeviceMgmtToggleType
from ..utils import (
Expand Down Expand Up @@ -907,13 +906,13 @@ async def _get_zone_by_device(self, url: str, device: Device) -> Dict[Any, Any]:
payload = olive_create_get_payload_irrigation(device.mac)
signature = olive_create_signature(payload, self._auth_lib.token.access_token)
headers = {
"Accept-Encoding": "gzip",
"User-Agent": "myapp",
"appid": OLIVE_APP_ID,
"appinfo": APP_INFO,
"phoneid": PHONE_ID,
"access_token": self._auth_lib.token.access_token,
"signature2": signature,
'Accept-Encoding': 'gzip',
'User-Agent': 'myapp',
'appid': OLIVE_APP_ID,
'appinfo': APP_INFO,
'phoneid': PHONE_ID,
'access_token': self._auth_lib.token.access_token,
'signature2': signature
}

response_json = await self._auth_lib.get(url, headers=headers, params=payload)
Expand All @@ -922,87 +921,79 @@ async def _get_zone_by_device(self, url: str, device: Device) -> Dict[Any, Any]:

return response_json

async def _stop_running_schedule(
self, url: str, device: Device, action: str
) -> Dict[Any, Any]:
async def _get_schedule_runs(self, url: str, device: Device, limit: int) -> Dict[Any, Any]:
"""Get schedule runs from the irrigation API."""
await self._auth_lib.refresh_if_should()

payload = olive_create_post_payload_irrigation_stop(device.mac, action)
signature = olive_create_signature(
json.dumps(payload, separators=(",", ":")),
self._auth_lib.token.access_token,
)
nonce = str(int(time.time() * 1000))
payload = {
'device_id': device.mac,
'limit': str(limit),
'nonce': nonce
}

# Create signature from the actual payload being sent
signature = olive_create_signature(payload, self._auth_lib.token.access_token)

headers = {
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"User-Agent": "myapp",
"appid": OLIVE_APP_ID,
"appinfo": APP_INFO,
"phoneid": PHONE_ID,
"access_token": self._auth_lib.token.access_token,
"signature2": signature,
'Accept-Encoding': 'gzip',
'User-Agent': 'myapp',
'appid': OLIVE_APP_ID,
'appinfo': APP_INFO,
'phoneid': PHONE_ID,
'access_token': self._auth_lib.token.access_token,
'signature2': signature
}

payload_str = json.dumps(payload, separators=(",", ":"))
response_json = await self._auth_lib.post(
url, headers=headers, data=payload_str
)
response_json = await self._auth_lib.get(url, headers=headers, params=payload)

check_for_errors_iot(self, response_json)

return response_json

async def _start_zone(
self, url: str, device: Device, zone_number: int, duration: int
) -> Dict[Any, Any]:
async def _stop_running_schedule(self, url: str, device: Device, action: str) -> Dict[Any, Any]:
await self._auth_lib.refresh_if_should()

payload = olive_create_post_payload_irrigation_quickrun(
device.mac, zone_number, duration
)
signature = olive_create_signature(
json.dumps(payload, separators=(",", ":")),
self._auth_lib.token.access_token,
)
payload = olive_create_post_payload_irrigation_stop(device.mac, action)
signature = olive_create_signature(json.dumps(payload, separators=(',', ':')),
self._auth_lib.token.access_token)
headers = {
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"User-Agent": "myapp",
"appid": OLIVE_APP_ID,
"appinfo": APP_INFO,
"phoneid": PHONE_ID,
"access_token": self._auth_lib.token.access_token,
"signature2": signature,
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
'User-Agent': 'myapp',
'appid': OLIVE_APP_ID,
'appinfo': APP_INFO,
'phoneid': PHONE_ID,
'access_token': self._auth_lib.token.access_token,
'signature2': signature
}

payload_str = json.dumps(payload, separators=(",", ":"))
response_json = await self._auth_lib.post(
url, headers=headers, data=payload_str
)
payload_str = json.dumps(payload, separators=(',', ':'))
response_json = await self._auth_lib.post(url, headers=headers, data=payload_str)

check_for_errors_iot(self, response_json)

return response_json

async def _get_schedule_runs(
self, url: str, device: Device, limit: int = 2
) -> Dict[Any, Any]:
async def _start_zone(self, url: str, device: Device, zone_number: int, duration: int) -> Dict[Any, Any]:
await self._auth_lib.refresh_if_should()

payload = olive_create_get_payload_irrigation_schedule_runs(device.mac)
payload["limit"] = limit
signature = olive_create_signature(payload, self._auth_lib.token.access_token)
payload = olive_create_post_payload_irrigation_quickrun(device.mac, zone_number, duration)
signature = olive_create_signature(json.dumps(payload, separators=(',', ':')),
self._auth_lib.token.access_token)
headers = {
"Accept-Encoding": "gzip",
"User-Agent": "myapp",
"appid": OLIVE_APP_ID,
"appinfo": APP_INFO,
"phoneid": PHONE_ID,
"access_token": self._auth_lib.token.access_token,
"signature2": signature,
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
'User-Agent': 'myapp',
'appid': OLIVE_APP_ID,
'appinfo': APP_INFO,
'phoneid': PHONE_ID,
'access_token': self._auth_lib.token.access_token,
'signature2': signature
}

response_json = await self._auth_lib.get(url, headers=headers, params=payload)
payload_str = json.dumps(payload, separators=(',', ':'))
response_json = await self._auth_lib.post(url, headers=headers, data=payload_str)

check_for_errors_iot(self, response_json)

Expand Down
Loading