snippets/python/push-status/push_status/monitors.py

57 lines
1.5 KiB
Python

import os
import logging
from typing import Optional
from pathlib import Path
from retry import retry
from diskcache import Cache
from uptime_kuma_api import UptimeKumaApi, MonitorType
from uptime_kuma_api.exceptions import UptimeKumaException
# Base cache directory, following the XDG Base Directory convention: use
# $XDG_CACHE_HOME when it is set to a non-empty value, otherwise fall back to
# ~/.cache.  An empty value is treated like "unset"; handing it to Path()
# would yield "." and silently place the cache in the current working
# directory.
cache_dir = os.environ.get("XDG_CACHE_HOME")
if not cache_dir:
    cache_dir = Path.home() / ".cache"
else:
    cache_dir = Path(cache_dir)
# Module-wide diskcache store, located in the "uptime_kuma" subdirectory of
# the cache directory resolved above.
cache = Cache(directory=cache_dir.joinpath("uptime_kuma"))
# Results are memoized under the name 'monitors' and expire after 24 hours.
# The password is excluded from the cache key both by position (index 2) and
# by keyword name, so it is kept out of the key however the caller passes it.
@cache.memoize('monitors', expire=60*60*24, ignore=(2, 'password'))
@retry(tries=6, exceptions=UptimeKumaException)
def get(url: str, username: str, password: str):
    """Log in to the Uptime Kuma instance at ``url`` and return its monitors.

    The call is attempted up to 6 times when ``UptimeKumaException`` is
    raised; a successful result is then stored in the module-level cache.

    Args:
        url: Address handed to ``UptimeKumaApi``.
        username: Login name; also part of the cache key.
        password: Login password; ignored when building the cache key.

    Returns:
        Whatever ``api.get_monitors()`` returns (callers in this module
        iterate it as a sequence of monitor dicts).

    Raises:
        UptimeKumaException: If every attempt fails.
    """
    logging.debug("Fetching monitors from: %s with user %s...", url, username)
    # The context manager scopes the API connection to this fetch.
    with UptimeKumaApi(url) as api:
        api.login(username, password)
        monitors = api.get_monitors()
    logging.debug("Found %d monitors", len(monitors))
    return monitors
def get_systemd(url, username, password, host, unit) -> Optional[dict]:
    """Find the active push monitor tagged for ``unit`` on ``host``.

    The monitor list is obtained through ``get(url, username, password)``.
    A monitor matches when its type is ``MonitorType.PUSH``, it is active,
    and the values of its ``systemd-unit`` and ``host`` tags equal ``unit``
    and ``host`` respectively.

    Args:
        url: Forwarded to ``get``.
        username: Forwarded to ``get``.
        password: Forwarded to ``get``.
        host: Expected value of the monitor's ``host`` tag.
        unit: Expected value of the monitor's ``systemd-unit`` tag.

    Returns:
        The first matching monitor dict, or ``None`` when nothing matches.
    """
    for monitor in get(url, username, password):
        if monitor.get('type') != MonitorType.PUSH or not monitor.get('active'):
            continue

        # Map tag name -> value, skipping tags whose value is empty or
        # missing.  If a tag name occurs more than once, the last non-empty
        # value wins.  `or []` also covers a 'tags' key that is present but
        # set to None.
        tag_values = {
            tag.get('name'): tag.get('value')
            for tag in monitor.get('tags') or []
            if tag.get('value')
        }

        if tag_values.get('systemd-unit') == unit and tag_values.get('host') == host:
            return monitor

    # Explicit so every exit path of this Optional-returning function
    # returns an expression.
    return None