mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-04-28 22:20:38 +00:00
Update notification service
This commit is contained in:
@@ -566,13 +566,228 @@ def _temperature_collector_loop():
|
||||
cleanup_counter = 0
|
||||
while True:
|
||||
_record_temperature()
|
||||
_record_latency() # Also record latency in the same loop
|
||||
cleanup_counter += 1
|
||||
if cleanup_counter >= 60: # Every 60 iterations = 60 minutes
|
||||
_cleanup_old_temperature_data()
|
||||
_cleanup_old_latency_data()
|
||||
cleanup_counter = 0
|
||||
time.sleep(60)
|
||||
|
||||
|
||||
# ── Latency History (SQLite) ──────────────────────────────────────────────────
|
||||
# Stores network latency readings every 60s in the same database as temperature.
|
||||
# Supports multiple targets (gateway, cloudflare, google).
|
||||
# Retention: 7 days max, cleaned up every hour.
|
||||
|
||||
# Known latency probe targets. 'gateway' is resolved at measurement time
# (get_current_latency() calls _get_default_gateway() for it); the other
# entries are fixed public DNS resolvers.
LATENCY_TARGETS = {
    'gateway': None,  # Auto-detect default gateway
    'cloudflare': '1.1.1.1',
    'google': '8.8.8.8',
}
|
||||
|
||||
def _get_default_gateway():
|
||||
"""Get the default gateway IP address."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['ip', 'route', 'show', 'default'],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
# Parse: "default via 192.168.1.1 dev eth0"
|
||||
parts = result.stdout.strip().split()
|
||||
if 'via' in parts:
|
||||
idx = parts.index('via')
|
||||
if idx + 1 < len(parts):
|
||||
return parts[idx + 1]
|
||||
except Exception:
|
||||
pass
|
||||
return '192.168.1.1' # Fallback
|
||||
|
||||
def init_latency_db():
    """Create the latency_history table and its lookup index if missing.

    Uses the same SQLite database as the temperature history. Returns True
    on success, False if schema creation failed (error is printed).
    """
    try:
        db = _get_temp_db()
        db.execute("""
            CREATE TABLE IF NOT EXISTS latency_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp INTEGER NOT NULL,
                target TEXT NOT NULL,
                latency_avg REAL,
                latency_min REAL,
                latency_max REAL,
                packet_loss REAL DEFAULT 0
            )
        """)
        # Index supports the WHERE timestamp >= ? AND target = ? queries
        db.execute("""
            CREATE INDEX IF NOT EXISTS idx_latency_timestamp_target
            ON latency_history(timestamp, target)
        """)
        db.commit()
        db.close()
    except Exception as e:
        print(f"[ProxMenux] Latency DB init failed: {e}")
        return False
    return True
|
||||
|
||||
def _measure_latency(target_ip: str) -> dict:
|
||||
"""Ping a target and return latency stats."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['ping', '-c', '3', '-W', '2', target_ip],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
latencies = []
|
||||
for line in result.stdout.split('\n'):
|
||||
if 'time=' in line:
|
||||
try:
|
||||
latency_str = line.split('time=')[1].split()[0]
|
||||
latencies.append(float(latency_str))
|
||||
except:
|
||||
pass
|
||||
|
||||
if latencies:
|
||||
return {
|
||||
'success': True,
|
||||
'avg': round(sum(latencies) / len(latencies), 1),
|
||||
'min': round(min(latencies), 1),
|
||||
'max': round(max(latencies), 1),
|
||||
'packet_loss': round((3 - len(latencies)) / 3 * 100, 1)
|
||||
}
|
||||
|
||||
# Ping failed - 100% packet loss
|
||||
return {'success': False, 'avg': None, 'min': None, 'max': None, 'packet_loss': 100.0}
|
||||
except Exception:
|
||||
return {'success': False, 'avg': None, 'min': None, 'max': None, 'packet_loss': 100.0}
|
||||
|
||||
def _record_latency():
    """Measure latency to the default gateway and persist one reading.

    Best-effort: any failure (DB unavailable, subprocess error) is swallowed
    so the collector loop keeps running; the next cycle retries.
    """
    try:
        gw_ip = _get_default_gateway()
        sample = _measure_latency(gw_ip)

        db = _get_temp_db()
        db.execute(
            """INSERT INTO latency_history
               (timestamp, target, latency_avg, latency_min, latency_max, packet_loss)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (int(time.time()), 'gateway', sample['avg'], sample['min'], sample['max'], sample['packet_loss'])
        )
        db.commit()
        db.close()
    except Exception:
        pass  # Best-effort recording; retried on the next 60s cycle
|
||||
|
||||
def _cleanup_old_latency_data():
    """Delete latency rows older than the 7-day retention window."""
    try:
        week_ago = int(time.time()) - 7 * 24 * 3600
        db = _get_temp_db()
        db.execute("DELETE FROM latency_history WHERE timestamp < ?", (week_ago,))
        db.commit()
        db.close()
    except Exception:
        pass  # Best-effort cleanup; runs again next hour
|
||||
|
||||
def get_latency_history(target='gateway', timeframe='hour'):
    """Get latency history for *target*, downsampled for longer timeframes.

    Args:
        target: Row tag to query ('gateway', 'cloudflare', 'google').
        timeframe: One of 'hour', '6hour', 'day', '3day', 'week'; any other
            value falls back to 'hour'.

    Returns:
        {'data': [...], 'stats': {...}, 'target': target}. Each data point
        carries timestamp/value/min/max/packet_loss; stats summarize the
        series (min/max/avg/current). On any error, empty data with zeroed
        stats is returned instead of raising.
    """
    # timeframe -> (lookback seconds, bucket seconds); bucket None = raw points
    windows = {
        'hour': (3600, None),          # All points (~60)
        '6hour': (6 * 3600, 300),      # 5 min avg
        'day': (86400, 600),           # 10 min avg
        '3day': (3 * 86400, 1800),     # 30 min avg
        'week': (7 * 86400, 3600),     # 1h avg
    }
    lookback, interval = windows.get(timeframe, windows['hour'])

    try:
        since = int(time.time()) - lookback
        conn = _get_temp_db()

        if interval is None:
            cursor = conn.execute(
                """SELECT timestamp, latency_avg, latency_min, latency_max, packet_loss
                   FROM latency_history
                   WHERE timestamp >= ? AND target = ?
                   ORDER BY timestamp ASC""",
                (since, target)
            )
        else:
            # Bucket timestamps via integer division, aggregate per bucket
            cursor = conn.execute(
                """SELECT (timestamp / ?) * ? as bucket,
                          ROUND(AVG(latency_avg), 1) as avg_val,
                          ROUND(MIN(latency_min), 1) as min_val,
                          ROUND(MAX(latency_max), 1) as max_val,
                          ROUND(AVG(packet_loss), 1) as avg_loss
                   FROM latency_history
                   WHERE timestamp >= ? AND target = ?
                   GROUP BY bucket
                   ORDER BY bucket ASC""",
                (interval, interval, since, target)
            )
        rows = cursor.fetchall()
        conn.close()

        # Both queries yield the same 5-column shape; skip rows with NULL avg.
        data = [
            {"timestamp": r[0], "value": r[1], "min": r[2], "max": r[3], "packet_loss": r[4]}
            for r in rows if r[1] is not None
        ]

        # Compute summary stats (data already excludes None values)
        stats = {"min": 0, "max": 0, "avg": 0, "current": 0}
        values = [d["value"] for d in data]
        if values:
            mins = [d["min"] for d in data if d["min"] is not None]
            maxs = [d["max"] for d in data if d["max"] is not None]
            stats = {
                "min": round(min(mins) if mins else min(values), 1),
                "max": round(max(maxs) if maxs else max(values), 1),
                "avg": round(sum(values) / len(values), 1),
                "current": values[-1],
            }

        return {"data": data, "stats": stats, "target": target}
    except Exception:
        return {"data": [], "stats": {"min": 0, "max": 0, "avg": 0, "current": 0}, "target": target}
|
||||
|
||||
def get_current_latency(target='gateway'):
    """Get the most recent latency measurement for a target.

    Args:
        target: 'gateway' (resolved via _get_default_gateway), a known
            alias from LATENCY_TARGETS, or a raw IP/hostname.

    Returns:
        dict with target, target_ip, latency_avg/min/max, packet_loss and
        a 'status' of 'ok' (<100ms avg), 'warning' (ping ok but slow) or
        'error' (ping failed or an exception occurred). The error path
        returns the same keys as the success path so callers can rely on
        the payload shape.
    """
    try:
        # If gateway, resolve to actual IP
        if target == 'gateway':
            target_ip = _get_default_gateway()
        else:
            # Known alias -> IP; otherwise treat the value as a raw IP/host
            target_ip = LATENCY_TARGETS.get(target, target)

        stats = _measure_latency(target_ip)

        # Explicit None check: a measured 0.0 average must not fall through
        # to 'warning' just because it is falsy.
        if stats['success'] and stats['avg'] is not None and stats['avg'] < 100:
            status = 'ok'
        elif stats['success']:
            status = 'warning'
        else:
            status = 'error'

        return {
            'target': target,
            'target_ip': target_ip,
            'latency_avg': stats['avg'],
            'latency_min': stats['min'],
            'latency_max': stats['max'],
            'packet_loss': stats['packet_loss'],
            'status': status
        }
    except Exception:
        # Mirror the success payload shape (extra keys are backward compatible)
        return {'target': target, 'target_ip': None, 'latency_avg': None,
                'latency_min': None, 'latency_max': None,
                'packet_loss': None, 'status': 'error'}
|
||||
|
||||
|
||||
def _health_collector_loop():
|
||||
"""Background thread: run full health checks every 5 minutes.
|
||||
Keeps the health cache always fresh and records events/errors in the DB.
|
||||
@@ -621,9 +836,22 @@ def _health_collector_loop():
|
||||
# Compare each category's current status to previous cycle.
|
||||
# Notify when a category DEGRADES (OK->WARNING, WARNING->CRITICAL, etc.)
|
||||
# Include the detailed 'reason' so the user knows exactly what triggered it.
|
||||
#
|
||||
# IMPORTANT: Some health categories map to specific notification toggles:
|
||||
# - network + latency issue -> 'network_latency' toggle
|
||||
# - network + connectivity issue -> 'network_down' toggle
|
||||
# If the specific toggle is disabled, skip that notification.
|
||||
details = result.get('details', {})
|
||||
degraded = []
|
||||
|
||||
# Map health categories to specific event types for toggle checks
|
||||
_CATEGORY_EVENT_MAP = {
|
||||
# (category, reason_contains) -> event_type to check
|
||||
('network', 'latency'): 'network_latency',
|
||||
('network', 'connectivity'): 'network_down',
|
||||
('network', 'unreachable'): 'network_down',
|
||||
}
|
||||
|
||||
for cat_key, cat_data in details.items():
|
||||
cur_status = cat_data.get('status', 'OK')
|
||||
prev_status = _prev_statuses.get(cat_key, 'OK')
|
||||
@@ -632,12 +860,23 @@ def _health_collector_loop():
|
||||
|
||||
if cur_rank > prev_rank and cur_rank >= 2: # WARNING or CRITICAL
|
||||
reason = cat_data.get('reason', f'{cat_key} status changed to {cur_status}')
|
||||
reason_lower = reason.lower()
|
||||
cat_name = _CAT_NAMES.get(cat_key, cat_key)
|
||||
degraded.append({
|
||||
'category': cat_name,
|
||||
'status': cur_status,
|
||||
'reason': reason,
|
||||
})
|
||||
|
||||
# Check if this specific notification type is enabled
|
||||
skip_notification = False
|
||||
for (map_cat, map_keyword), event_type in _CATEGORY_EVENT_MAP.items():
|
||||
if cat_key == map_cat and map_keyword in reason_lower:
|
||||
if not notification_manager.is_event_enabled(event_type):
|
||||
skip_notification = True
|
||||
break
|
||||
|
||||
if not skip_notification:
|
||||
degraded.append({
|
||||
'category': cat_name,
|
||||
'status': cur_status,
|
||||
'reason': reason,
|
||||
})
|
||||
|
||||
_prev_statuses[cat_key] = cur_status
|
||||
|
||||
@@ -5438,6 +5677,44 @@ def api_temperature_history():
|
||||
return jsonify({'data': [], 'stats': {'min': 0, 'max': 0, 'avg': 0, 'current': 0}}), 500
|
||||
|
||||
|
||||
@app.route('/api/network/latency/history', methods=['GET'])
@require_auth
def api_latency_history():
    """Serve latency history for charts.

    Query params:
        target: gateway (default), cloudflare, google
        timeframe: hour, 6hour, day, 3day, week
    """
    try:
        target = request.args.get('target', 'gateway')
        timeframe = request.args.get('timeframe', 'hour')
        # Whitelist both params; fall back to defaults on anything unknown
        if target not in ('gateway', 'cloudflare', 'google'):
            target = 'gateway'
        if timeframe not in ('hour', '6hour', 'day', '3day', 'week'):
            timeframe = 'hour'
        return jsonify(get_latency_history(target, timeframe))
    except Exception:
        return jsonify({'data': [], 'stats': {'min': 0, 'max': 0, 'avg': 0, 'current': 0}, 'target': 'gateway'}), 500
|
||||
|
||||
|
||||
@app.route('/api/network/latency/current', methods=['GET'])
@require_auth
def api_latency_current():
    """Get current latency measurement for a target.

    Query params:
        target: gateway (default), cloudflare, google, or custom IP
    """
    # Bind a default before the try: the except handler references `target`,
    # which would be unbound (UnboundLocalError) if the first line raised.
    target = 'gateway'
    try:
        target = request.args.get('target', 'gateway')
        result = get_current_latency(target)
        return jsonify(result)
    except Exception:
        return jsonify({'target': target, 'latency_avg': None, 'status': 'error'}), 500
|
||||
|
||||
|
||||
@app.route('/api/storage', methods=['GET'])
|
||||
@require_auth
|
||||
def api_storage():
|
||||
@@ -7382,17 +7659,18 @@ if __name__ == '__main__':
|
||||
except Exception as e:
|
||||
print(f"[ProxMenux] journald check skipped: {e}")
|
||||
|
||||
# ── Temperature history collector ──
|
||||
# Initialize SQLite DB and start background thread to record CPU temp every 60s
|
||||
if init_temperature_db():
|
||||
# Record initial reading immediately
|
||||
# ── Temperature & Latency history collector ──
|
||||
# Initialize SQLite DB and start background thread to record CPU temp + latency every 60s
|
||||
if init_temperature_db() and init_latency_db():
|
||||
# Record initial readings immediately
|
||||
_record_temperature()
|
||||
# Start background collector thread
|
||||
_record_latency()
|
||||
# Start background collector thread (handles both temp and latency)
|
||||
temp_thread = threading.Thread(target=_temperature_collector_loop, daemon=True)
|
||||
temp_thread.start()
|
||||
print("[ProxMenux] Temperature history collector started (60s interval, 30d retention)")
|
||||
print("[ProxMenux] Temperature & Latency history collector started (60s interval)")
|
||||
else:
|
||||
print("[ProxMenux] Temperature history disabled (DB init failed)")
|
||||
print("[ProxMenux] Temperature/Latency history disabled (DB init failed)")
|
||||
|
||||
# ── Background Health Monitor ──
|
||||
# Run full health checks every 5 min, keeping cache fresh and recording events for notifications
|
||||
|
||||
@@ -2006,7 +2006,11 @@ class HealthMonitor:
|
||||
return {'status': 'UNKNOWN', 'reason': f'Network check unavailable: {str(e)}', 'checks': {}}
|
||||
|
||||
def _check_network_latency(self) -> Optional[Dict[str, Any]]:
|
||||
"""Check network latency to 1.1.1.1 (cached)"""
|
||||
"""Check network latency to 1.1.1.1 using 3 consecutive pings.
|
||||
|
||||
Uses 3 pings to avoid false positives from transient network spikes.
|
||||
Reports the average latency and only warns if all 3 exceed threshold.
|
||||
"""
|
||||
cache_key = 'network_latency'
|
||||
current_time = time.time()
|
||||
|
||||
@@ -2015,42 +2019,60 @@ class HealthMonitor:
|
||||
return self.cached_results.get(cache_key)
|
||||
|
||||
try:
|
||||
# Use 3 pings to get reliable latency measurement
|
||||
result = subprocess.run(
|
||||
['ping', '-c', '1', '-W', '1', '1.1.1.1'],
|
||||
['ping', '-c', '3', '-W', '2', '1.1.1.1'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=self.NETWORK_TIMEOUT
|
||||
timeout=self.NETWORK_TIMEOUT + 6 # Allow time for 3 pings
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Parse individual ping times
|
||||
latencies = []
|
||||
for line in result.stdout.split('\n'):
|
||||
if 'time=' in line:
|
||||
try:
|
||||
latency_str = line.split('time=')[1].split()[0]
|
||||
latency = float(latency_str)
|
||||
|
||||
if latency > self.NETWORK_LATENCY_CRITICAL:
|
||||
status = 'CRITICAL'
|
||||
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms'
|
||||
elif latency > self.NETWORK_LATENCY_WARNING:
|
||||
status = 'WARNING'
|
||||
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms'
|
||||
else:
|
||||
status = 'OK'
|
||||
reason = None
|
||||
|
||||
latency_result = {
|
||||
'status': status,
|
||||
'latency_ms': round(latency, 1)
|
||||
}
|
||||
if reason:
|
||||
latency_result['reason'] = reason
|
||||
|
||||
self.cached_results[cache_key] = latency_result
|
||||
self.last_check_times[cache_key] = current_time
|
||||
return latency_result
|
||||
latencies.append(float(latency_str))
|
||||
except:
|
||||
pass
|
||||
|
||||
if latencies:
|
||||
# Calculate average latency
|
||||
avg_latency = sum(latencies) / len(latencies)
|
||||
max_latency = max(latencies)
|
||||
min_latency = min(latencies)
|
||||
|
||||
# Count how many pings exceeded thresholds
|
||||
critical_count = sum(1 for l in latencies if l > self.NETWORK_LATENCY_CRITICAL)
|
||||
warning_count = sum(1 for l in latencies if l > self.NETWORK_LATENCY_WARNING)
|
||||
|
||||
# Only report WARNING/CRITICAL if majority of pings exceed threshold
|
||||
# This prevents false positives from single transient spikes
|
||||
if critical_count >= 2: # 2 or more of 3 pings are critical
|
||||
status = 'CRITICAL'
|
||||
reason = f'Latency {avg_latency:.1f}ms avg >{self.NETWORK_LATENCY_CRITICAL}ms (min:{min_latency:.0f} max:{max_latency:.0f})'
|
||||
elif warning_count >= 2: # 2 or more of 3 pings exceed warning
|
||||
status = 'WARNING'
|
||||
reason = f'Latency {avg_latency:.1f}ms avg >{self.NETWORK_LATENCY_WARNING}ms (min:{min_latency:.0f} max:{max_latency:.0f})'
|
||||
else:
|
||||
status = 'OK'
|
||||
reason = None
|
||||
|
||||
latency_result = {
|
||||
'status': status,
|
||||
'latency_ms': round(avg_latency, 1),
|
||||
'latency_min': round(min_latency, 1),
|
||||
'latency_max': round(max_latency, 1),
|
||||
'samples': len(latencies),
|
||||
}
|
||||
if reason:
|
||||
latency_result['reason'] = reason
|
||||
|
||||
self.cached_results[cache_key] = latency_result
|
||||
self.last_check_times[cache_key] = current_time
|
||||
return latency_result
|
||||
|
||||
# If ping failed (timeout, unreachable) - distinguish the reason
|
||||
stderr_lower = (result.stderr or '').lower() if hasattr(result, 'stderr') else ''
|
||||
|
||||
@@ -902,10 +902,21 @@ class NotificationManager:
|
||||
def send_notification(self, event_type: str, severity: str,
|
||||
title: str, message: str,
|
||||
data: Optional[Dict] = None,
|
||||
source: str = 'api') -> Dict[str, Any]:
|
||||
source: str = 'api',
|
||||
skip_toggle_check: bool = False) -> Dict[str, Any]:
|
||||
"""Send a notification directly (bypasses queue and cooldown).
|
||||
|
||||
Used by CLI and API for explicit sends.
|
||||
|
||||
Args:
|
||||
event_type: Type of event (must match TEMPLATES key)
|
||||
severity: INFO, WARNING, CRITICAL
|
||||
title: Notification title
|
||||
message: Notification body
|
||||
data: Extra data for template rendering
|
||||
source: Origin of notification (api, cli, health_monitor, etc.)
|
||||
skip_toggle_check: If True, send even if event toggle is disabled.
|
||||
Use for 'custom' or 'other' events that should always send.
|
||||
"""
|
||||
if not self._channels:
|
||||
self._load_config()
|
||||
@@ -917,6 +928,17 @@ class NotificationManager:
|
||||
'channels_sent': [],
|
||||
}
|
||||
|
||||
# Check if this event type is enabled (unless explicitly skipped)
|
||||
# 'custom' and 'other' events always send (used for manual/script notifications)
|
||||
if not skip_toggle_check and event_type not in ('custom', 'other'):
|
||||
if not self.is_event_enabled(event_type):
|
||||
return {
|
||||
'success': False,
|
||||
'error': f'Event type "{event_type}" is disabled in notification settings',
|
||||
'channels_sent': [],
|
||||
'skipped': True,
|
||||
}
|
||||
|
||||
# Render template if available
|
||||
if event_type in TEMPLATES and not message:
|
||||
rendered = render_template(event_type, data or {})
|
||||
@@ -1076,6 +1098,54 @@ class NotificationManager:
|
||||
|
||||
return {'success': True, 'enabled': enabled}
|
||||
|
||||
def is_event_enabled(self, event_type: str) -> bool:
|
||||
"""Check if a specific event type is enabled for notifications.
|
||||
|
||||
Returns True if ANY active channel has this event enabled.
|
||||
Returns False only if ALL channels have explicitly disabled this event.
|
||||
Used by callers like health_polling_thread to skip notifications
|
||||
for disabled events.
|
||||
|
||||
The UI stores toggles per-channel as '{channel}.event.{event_type}'.
|
||||
We check all configured channels - if any has it enabled, return True.
|
||||
"""
|
||||
if not self._config:
|
||||
self._load_config()
|
||||
|
||||
# Get template info for default state
|
||||
tmpl = TEMPLATES.get(event_type, {})
|
||||
default_enabled = 'true' if tmpl.get('default_enabled', True) else 'false'
|
||||
event_group = tmpl.get('group', 'other')
|
||||
|
||||
# Check each configured channel
|
||||
# A channel is "active" if it has .enabled = true
|
||||
channel_types = ['telegram', 'gotify', 'discord', 'email', 'ntfy', 'pushover', 'slack']
|
||||
active_channels = []
|
||||
|
||||
for ch_name in channel_types:
|
||||
if self._config.get(f'{ch_name}.enabled', 'false') == 'true':
|
||||
active_channels.append(ch_name)
|
||||
|
||||
# If no channels are configured, consider events as "disabled"
|
||||
# (no point generating notifications with no destination)
|
||||
if not active_channels:
|
||||
return False
|
||||
|
||||
# Check if ANY active channel has this event enabled
|
||||
for ch_name in active_channels:
|
||||
# First check category toggle for this channel
|
||||
ch_group_key = f'{ch_name}.events.{event_group}'
|
||||
if self._config.get(ch_group_key, 'true') == 'false':
|
||||
continue # Category disabled for this channel
|
||||
|
||||
# Then check event-specific toggle
|
||||
ch_event_key = f'{ch_name}.event.{event_type}'
|
||||
if self._config.get(ch_event_key, default_enabled) == 'true':
|
||||
return True # At least one channel has it enabled
|
||||
|
||||
# All active channels have this event disabled
|
||||
return False
|
||||
|
||||
def list_channels(self) -> Dict[str, Any]:
|
||||
"""List all channel types with their configuration status."""
|
||||
if not self._config:
|
||||
|
||||
Reference in New Issue
Block a user