Update notification service

This commit is contained in:
MacRimi
2026-03-26 19:05:11 +01:00
parent d497763e38
commit 839a20df97
5 changed files with 269 additions and 63 deletions

View File

@@ -95,6 +95,7 @@ cp "$SCRIPT_DIR/notification_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo
cp "$SCRIPT_DIR/notification_channels.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_channels.py not found"
cp "$SCRIPT_DIR/notification_templates.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_templates.py not found"
cp "$SCRIPT_DIR/notification_events.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_events.py not found"
cp "$SCRIPT_DIR/startup_grace.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ startup_grace.py not found"
cp "$SCRIPT_DIR/flask_notification_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_notification_routes.py not found"
cp "$SCRIPT_DIR/oci_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ oci_manager.py not found"
cp "$SCRIPT_DIR/flask_oci_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_oci_routes.py not found"

View File

@@ -876,10 +876,8 @@ def _health_collector_loop():
'updates': 'System Updates',
'security': 'Security',
}
# Categories to suppress during startup grace period (transient issues)
_STARTUP_GRACE_CATEGORIES = {'storage', 'vms', 'network', 'services'}
_STARTUP_GRACE_SECONDS = 300 # 5 minutes
_collector_start_time = time.time()
# Import centralized startup grace management
import startup_grace
while True:
try:
@@ -939,8 +937,7 @@ def _health_collector_loop():
# Startup grace period: skip transient issues from categories
# that typically need time to stabilize after boot
in_grace_period = (time.time() - _collector_start_time) < _STARTUP_GRACE_SECONDS
if in_grace_period and cat_key in _STARTUP_GRACE_CATEGORIES:
if startup_grace.should_suppress_category(cat_key):
skip_notification = True
if not skip_notification:

View File

@@ -33,18 +33,16 @@ except ImportError:
# ============================================================================
DEBUG_PERF = False
# Startup grace period: suppress transient issues during boot
# This is set when the module loads (service start)
_MODULE_START_TIME = time.time()
_STARTUP_HEALTH_GRACE_SECONDS = 300 # 5 minutes
# ─── Startup Grace Period ────────────────────────────────────────────────────
# Import centralized startup grace management for consistent behavior
import startup_grace
def _is_startup_health_grace() -> bool:
"""Check if we're within the startup health grace period (5 min).
Used to downgrade transient errors (high latency, storage not ready)
to INFO level during system boot, preventing false CRITICAL alerts.
Uses centralized startup_grace module for consistency across all components.
"""
return (time.time() - _MODULE_START_TIME) < _STARTUP_HEALTH_GRACE_SECONDS
return startup_grace.is_startup_health_grace()
def _perf_log(section: str, elapsed_ms: float):
"""Log performance timing for a section. Only logs if DEBUG_PERF is True."""

View File

@@ -28,75 +28,51 @@ from pathlib import Path
# ─── Shared State for Cross-Watcher Coordination ──────────────────
# ─── Startup Grace Period ────────────────────────────────────────────────────
# Import centralized startup grace management
# This provides a single source of truth for all grace period logic
import startup_grace
class _SharedState:
"""Module-level state shared between all watchers.
"""Wrapper around centralized startup_grace module for backwards compatibility.
Used to coordinate behavior when host-level events affect VM/CT events:
- Suppress vm_stop/ct_stop during host shutdown (they're expected)
- Aggregate vm_start/ct_start during startup into single message
Two separate grace periods:
- startup_vm_grace: Time to aggregate VM/CT starts (3 min)
- startup_health_grace: Time to suppress transient health errors (5 min)
All grace period logic is now in startup_grace.py for consistency across:
- notification_events.py (this file)
- health_monitor.py
- flask_server.py
"""
def __init__(self):
self._lock = threading.Lock()
self._shutdown_time: float = 0 # timestamp when shutdown was detected
self._shutdown_grace = 120 # suppress VM/CT stops for 2 minutes after shutdown detected
self._startup_time: float = time.time() # when module was loaded (service start)
self._startup_vm_grace = 180 # aggregate VM/CT starts for 3 minutes after startup
self._startup_health_grace = 300 # suppress health warnings for 5 minutes after startup
self._startup_vms: list = [] # [(vmid, vmname, 'vm'|'ct'), ...]
self._startup_aggregated = False # have we already sent the aggregated message?
def mark_shutdown(self):
"""Called when system_shutdown or system_reboot is detected."""
with self._lock:
self._shutdown_time = time.time()
startup_grace.mark_shutdown()
def is_host_shutting_down(self) -> bool:
"""Check if we're within the shutdown grace period."""
with self._lock:
if self._shutdown_time == 0:
return False
return (time.time() - self._shutdown_time) < self._shutdown_grace
return startup_grace.is_host_shutting_down()
def is_startup_period(self) -> bool:
"""Check if we're within the startup VM aggregation period (3 min)."""
with self._lock:
return (time.time() - self._startup_time) < self._startup_vm_grace
return startup_grace.is_startup_vm_period()
def is_startup_health_grace(self) -> bool:
"""Check if we're within the startup health grace period (5 min).
Used by PollingCollector to suppress transient health warnings
(QMP timeout, storage not ready, high latency, etc.) during system boot.
"""
with self._lock:
return (time.time() - self._startup_time) < self._startup_health_grace
"""Check if we're within the startup health grace period (5 min)."""
return startup_grace.is_startup_health_grace()
def add_startup_vm(self, vmid: str, vmname: str, vm_type: str):
"""Record a VM/CT start during startup period for later aggregation."""
with self._lock:
self._startup_vms.append((vmid, vmname, vm_type))
startup_grace.add_startup_vm(vmid, vmname, vm_type)
def get_and_clear_startup_vms(self) -> list:
"""Get all recorded startup VMs and clear the list."""
with self._lock:
vms = self._startup_vms.copy()
self._startup_vms = []
self._startup_aggregated = True
return vms
return startup_grace.get_and_clear_startup_vms()
def has_startup_vms(self) -> bool:
"""Check if there are any startup VMs recorded."""
with self._lock:
return len(self._startup_vms) > 0
return startup_grace.has_startup_vms()
def was_startup_aggregated(self) -> bool:
"""Check if startup aggregation already happened."""
with self._lock:
return self._startup_aggregated
return startup_grace.was_startup_aggregated()
# Global shared state instance
@@ -1806,8 +1782,7 @@ class PollingCollector:
# ── Main loop ──────────────────────────────────────────────
# Categories where transient errors are suppressed during startup grace period.
# PBS storage, NFS mounts, VMs with qemu-guest-agent need time after boot.
STARTUP_GRACE_CATEGORIES = {'storage', 'vms', 'network', 'pve_services'}
# Now using centralized startup_grace module for consistency.
def _poll_loop(self):
"""Main polling loop."""
@@ -1918,11 +1893,9 @@ class PollingCollector:
# Startup grace period: ignore transient errors from categories that
# typically need time to stabilize after boot (storage, VMs, network).
# PBS storage, NFS mounts, VMs with qemu-guest-agent need time to connect.
# Uses the shared state so grace period is consistent across all watchers.
if _shared_state.is_startup_health_grace():
if category in self.STARTUP_GRACE_CATEGORIES:
# Still within grace period for this category - skip notification
continue
# Uses centralized startup_grace module for consistency.
if startup_grace.should_suppress_category(category):
continue
# On first poll, seed _last_notified for all existing errors so we
# don't re-notify old persistent errors that were already sent before

View File

@@ -0,0 +1,237 @@
"""
Centralized Startup Grace Period Management
This module provides a single source of truth for startup grace period logic.
During system boot, various transient issues occur (high latency, storage not ready,
QMP timeouts, etc.) that shouldn't trigger notifications or critical alerts.
Grace Periods:
- VM/CT aggregation: 3 minutes - Aggregate multiple VM/CT starts into one notification
- Health suppression: 5 minutes - Suppress transient health warnings/errors
- Shutdown suppression: 2 minutes - Suppress VM/CT stops during system shutdown
Categories suppressed during startup:
- storage: NFS/CIFS mounts may take time to become available
- vms: VMs may have QMP timeouts or startup delays
- network: Latency spikes during boot are normal
- services: PVE services may take time to fully initialize
"""
import time
import threading
from typing import Set, List, Tuple, Optional
# ─── Configuration ───────────────────────────────────────────────────────────
# Grace period durations (seconds)
STARTUP_VM_GRACE_SECONDS = 180      # 3 minutes for VM/CT start aggregation
STARTUP_HEALTH_GRACE_SECONDS = 300  # 5 minutes for health warning suppression
SHUTDOWN_GRACE_SECONDS = 120        # 2 minutes for VM/CT stop suppression

# Categories to suppress during startup grace period.
# These categories typically have transient issues during boot.
#
# Before centralization the two callers used different keys for the service
# category: the health collector loop matched 'services' while the polling
# collector matched 'pve_services'. Both spellings are listed so neither caller
# loses its startup suppression.
STARTUP_GRACE_CATEGORIES: Set[str] = {
    'storage',       # NFS/CIFS mounts may take time
    'vms',           # VMs may have QMP timeouts
    'network',       # Latency spikes during boot
    'services',      # PVE services initialization
    'pve_services',  # Same category under the key the polling collector used
}
# ─── Singleton State ─────────────────────────────────────────────────────────
class _StartupGraceState:
    """
    Thread-safe singleton managing all startup/shutdown grace period state.

    The instance is created when the module loads (service start); that moment
    is the reference point for every "are we still starting up?" check.

    Elapsed time is measured with ``time.monotonic()`` rather than the wall
    clock, so a clock adjustment shortly after boot (for example an NTP step)
    can neither cut a grace period short nor stretch it.
    """

    _instance: Optional['_StartupGraceState'] = None
    _init_lock = threading.Lock()

    def __new__(cls) -> '_StartupGraceState':
        # Cheap unlocked check first; re-check under the lock so only one
        # instance is ever created even if two threads race here.
        if cls._instance is None:
            with cls._init_lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every `_StartupGraceState()` call, including the
        # ones that return the existing instance. Doing the flag check under
        # the creation lock keeps a concurrent second call from resetting the
        # startup reference or swapping out the state lock.
        with self._init_lock:
            if self._initialized:
                return

            self._lock = threading.Lock()

            # Startup reference = when the service started (module load time),
            # taken from the monotonic clock.
            self._startup_time: float = time.monotonic()

            # Shutdown tracking; None means no shutdown has been marked.
            self._shutdown_time: Optional[float] = None

            # VM/CT aggregation during startup
            self._startup_vms: List[Tuple[str, str, str]] = []  # [(vmid, vmname, 'vm'|'ct'), ...]
            self._startup_aggregated: bool = False

            self._initialized = True

    def _elapsed_since_startup(self) -> float:
        """Return monotonic seconds elapsed since the instance was initialized."""
        with self._lock:
            return time.monotonic() - self._startup_time

    # ─── Startup Period Checks ───────────────────────────────────────────────

    def is_startup_vm_period(self) -> bool:
        """
        Check if we're within the VM/CT start aggregation period (3 min).

        During this period, individual VM/CT start notifications are collected
        and later sent as a single aggregated notification.
        """
        return self._elapsed_since_startup() < STARTUP_VM_GRACE_SECONDS

    def is_startup_health_grace(self) -> bool:
        """
        Check if we're within the health suppression period (5 min).

        During this period:
        - Transient health warnings (latency, storage, etc.) are suppressed
        - CRITICAL/WARNING may be downgraded to INFO for certain categories
        - Health degradation notifications are skipped for grace categories
        """
        return self._elapsed_since_startup() < STARTUP_HEALTH_GRACE_SECONDS

    def should_suppress_category(self, category: str) -> bool:
        """
        Check if notifications for a category should be suppressed.

        Args:
            category: Health category name (e.g., 'network', 'storage', 'vms').
                Matching is case-insensitive.

        Returns:
            True if we're in grace period AND category is in
            STARTUP_GRACE_CATEGORIES. A missing or non-string category is
            never suppressed.
        """
        # Callers pass whatever key the health data carries; treat None or a
        # non-string as "not a grace category" instead of raising on .lower().
        if not isinstance(category, str):
            return False
        if category.lower() not in STARTUP_GRACE_CATEGORIES:
            return False
        return self.is_startup_health_grace()

    def get_startup_elapsed(self) -> float:
        """Get seconds elapsed since service startup."""
        return self._elapsed_since_startup()

    # ─── Shutdown Tracking ───────────────────────────────────────────────────

    def mark_shutdown(self):
        """
        Called when system_shutdown or system_reboot is detected.

        After this, VM/CT stop notifications will be suppressed for the
        shutdown grace period (expected stops during system shutdown).
        """
        with self._lock:
            self._shutdown_time = time.monotonic()

    def is_host_shutting_down(self) -> bool:
        """
        Check if we're within the shutdown grace period.

        During this period, VM/CT stop events are expected and should not
        generate notifications. Returns False if no shutdown was ever marked.
        """
        with self._lock:
            marked_at = self._shutdown_time
        if marked_at is None:
            return False
        return (time.monotonic() - marked_at) < SHUTDOWN_GRACE_SECONDS

    # ─── VM/CT Start Aggregation ─────────────────────────────────────────────

    def add_startup_vm(self, vmid: str, vmname: str, vm_type: str):
        """
        Record a VM/CT start during startup period for later aggregation.

        Args:
            vmid: VM/CT ID
            vmname: VM/CT name
            vm_type: 'vm' or 'ct'
        """
        with self._lock:
            self._startup_vms.append((vmid, vmname, vm_type))

    def get_and_clear_startup_vms(self) -> List[Tuple[str, str, str]]:
        """
        Get all recorded startup VMs and clear the list.

        Should be called once after the VM aggregation grace period ends
        to get all VMs that started during boot for a single notification.
        Calling it also marks startup aggregation as done.

        Returns:
            List of (vmid, vmname, vm_type) tuples
        """
        with self._lock:
            # Hand the collected list to the caller and start a fresh one.
            vms, self._startup_vms = self._startup_vms, []
            self._startup_aggregated = True
            return vms

    def has_startup_vms(self) -> bool:
        """Check if there are any startup VMs recorded."""
        with self._lock:
            return bool(self._startup_vms)

    def was_startup_aggregated(self) -> bool:
        """Check if startup aggregation has already been processed."""
        with self._lock:
            return self._startup_aggregated
# ─── Module-level convenience functions ──────────────────────────────────────
# Global singleton instance, created once when this module is first imported.
# The moment it is constructed is the "service start" reference used by every
# grace-period check; the module-level functions in this file delegate to it.
_state = _StartupGraceState()
def is_startup_vm_period() -> bool:
    """Report whether the VM/CT start aggregation window (3 min) is still open.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    in_window = _state.is_startup_vm_period()
    return in_window
def is_startup_health_grace() -> bool:
    """Report whether the health suppression window (5 min) is still open.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    in_grace = _state.is_startup_health_grace()
    return in_grace
def should_suppress_category(category: str) -> bool:
    """Tell whether notifications for ``category`` are suppressed during startup.

    Args:
        category: Health category name, passed unchanged to the shared
            ``_state`` singleton, which makes the decision.

    Returns:
        Whatever the singleton's ``should_suppress_category`` returns.
    """
    suppress = _state.should_suppress_category(category)
    return suppress
def get_startup_elapsed() -> float:
    """Return the number of seconds elapsed since service startup.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    elapsed_seconds = _state.get_startup_elapsed()
    return elapsed_seconds
def mark_shutdown():
    """Record that a system shutdown/reboot has been detected.

    Forwards to the shared ``_state`` singleton; nothing is returned.
    """
    state = _state
    state.mark_shutdown()
def is_host_shutting_down() -> bool:
    """Report whether the shutdown grace period is currently active.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    shutting_down = _state.is_host_shutting_down()
    return shutting_down
def add_startup_vm(vmid: str, vmname: str, vm_type: str):
    """Register a VM/CT start seen during the startup period for aggregation.

    Args:
        vmid: VM/CT ID.
        vmname: VM/CT name.
        vm_type: Guest type label, forwarded as given.

    The three values are handed to the shared ``_state`` singleton; nothing is
    returned.
    """
    state = _state
    state.add_startup_vm(vmid, vmname, vm_type)
def get_and_clear_startup_vms() -> List[Tuple[str, str, str]]:
    """Fetch every recorded startup VM/CT and empty the recorded list.

    Returns:
        The list produced by the shared ``_state`` singleton's
        ``get_and_clear_startup_vms``.
    """
    collected = _state.get_and_clear_startup_vms()
    return collected
def has_startup_vms() -> bool:
    """Report whether any startup VM/CT entries are currently recorded.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    any_recorded = _state.has_startup_vms()
    return any_recorded
def was_startup_aggregated() -> bool:
    """Report whether startup aggregation has already been processed.

    Module-level shortcut that forwards to the shared ``_state`` singleton.
    """
    already_done = _state.was_startup_aggregated()
    return already_done
# ─── For backwards compatibility ─────────────────────────────────────────────
# Expose constants for external use.
# GRACE_CATEGORIES is a second name for the same set object as
# STARTUP_GRACE_CATEGORIES (not a copy), so a change made through either name
# is visible through both.
GRACE_CATEGORIES = STARTUP_GRACE_CATEGORIES