Update notification service

This commit is contained in:
MacRimi
2026-03-04 19:11:38 +01:00
parent 66d2a68167
commit 9089035f18
5 changed files with 504 additions and 36 deletions
+262 -1
View File
@@ -150,6 +150,45 @@ class HealthPersistence:
cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_severity ON notification_history(severity)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_nls_ts ON notification_last_sent(last_sent_ts)')
# ── Disk Observations System ──
# Registry of all physical disks seen by the system
cursor.execute('''
CREATE TABLE IF NOT EXISTS disk_registry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_name TEXT NOT NULL,
serial TEXT,
model TEXT,
size_bytes INTEGER,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
removed INTEGER DEFAULT 0,
UNIQUE(device_name, serial)
)
''')
# Observation log: deduplicated error events per disk
cursor.execute('''
CREATE TABLE IF NOT EXISTS disk_observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
disk_registry_id INTEGER NOT NULL,
error_type TEXT NOT NULL,
error_signature TEXT NOT NULL,
first_occurrence TEXT NOT NULL,
last_occurrence TEXT NOT NULL,
occurrence_count INTEGER DEFAULT 1,
raw_message TEXT,
severity TEXT DEFAULT 'warning',
dismissed INTEGER DEFAULT 0,
FOREIGN KEY(disk_registry_id) REFERENCES disk_registry(id),
UNIQUE(disk_registry_id, error_type, error_signature)
)
''')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_disk_serial ON disk_registry(serial)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_disk_device ON disk_registry(device_name)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_obs_disk ON disk_observations(disk_registry_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_obs_dismissed ON disk_observations(dismissed)')
conn.commit()
conn.close()
@@ -519,10 +558,12 @@ class HealthPersistence:
}
child_prefix = CASCADE_PREFIXES.get(error_key)
if child_prefix:
# Only cascade to active (unresolved) child errors.
# Already-resolved/expired entries must NOT be re-surfaced.
cursor.execute('''
UPDATE errors
SET acknowledged = 1, resolved_at = ?, suppression_hours = ?
WHERE error_key LIKE ? AND acknowledged = 0
WHERE error_key LIKE ? AND acknowledged = 0 AND resolved_at IS NULL
''', (now, sup_hours, child_prefix + '%'))
result = {
@@ -1119,5 +1160,225 @@ class HealthPersistence:
print(f"[HealthPersistence] Error recording UNKNOWN persistent: {e}")
# ────────────────────────────────────────────────────────────────
# Disk Observations API
# ────────────────────────────────────────────────────────────────
def register_disk(self, device_name: str, serial: Optional[str] = None,
                  model: Optional[str] = None, size_bytes: Optional[int] = None):
    """Insert a disk into ``disk_registry`` or refresh its existing row.

    Rows are keyed on ``(device_name, serial)``. On a key conflict the row's
    ``last_seen`` is bumped, ``removed`` is reset to 0 (re-activating a disk
    previously marked removed) and ``model`` / ``size_bytes`` are only
    overwritten when the new value is not NULL.

    Args:
        device_name: Device name stored verbatim as the first half of the key.
        serial: Disk serial; a falsy value is stored as the empty string.
        model: Optional model string.
        size_bytes: Optional capacity in bytes.

    Returns:
        None. Any exception is printed and swallowed (best-effort write).
    """
    now = datetime.now().isoformat()
    try:
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            # A missing serial is normalised to '' rather than NULL: SQLite
            # treats NULLs as distinct in UNIQUE constraints, so a NULL serial
            # would never hit the ON CONFLICT branch.
            cursor.execute('''
                INSERT INTO disk_registry (device_name, serial, model, size_bytes, first_seen, last_seen, removed)
                VALUES (?, ?, ?, ?, ?, ?, 0)
                ON CONFLICT(device_name, serial) DO UPDATE SET
                    model = COALESCE(excluded.model, model),
                    size_bytes = COALESCE(excluded.size_bytes, size_bytes),
                    last_seen = excluded.last_seen,
                    removed = 0
            ''', (device_name, serial or '', model, size_bytes, now, now))
            conn.commit()
        finally:
            # Always release the handle, including when the statement fails.
            conn.close()
    except Exception as e:
        print(f"[HealthPersistence] Error registering disk {device_name}: {e}")
def _get_disk_registry_id(self, cursor, device_name: str,
serial: Optional[str] = None) -> Optional[int]:
"""Find disk_registry.id, matching by serial first, then device_name."""
if serial:
cursor.execute(
'SELECT id FROM disk_registry WHERE serial = ? AND serial != "" ORDER BY last_seen DESC LIMIT 1',
(serial,))
row = cursor.fetchone()
if row:
return row[0]
# Fallback: match by device_name (strip /dev/ prefix)
clean_dev = device_name.replace('/dev/', '')
cursor.execute(
'SELECT id FROM disk_registry WHERE device_name = ? ORDER BY last_seen DESC LIMIT 1',
(clean_dev,))
row = cursor.fetchone()
return row[0] if row else None
def record_disk_observation(self, device_name: str, serial: Optional[str],
                            error_type: str, error_signature: str,
                            raw_message: str = '',
                            severity: str = 'warning'):
    """Record a disk error observation, deduplicating repeats.

    The disk is registered via ``register_disk`` if needed, then one row per
    ``(disk, error_type, error_signature)`` is kept in ``disk_observations``.
    A repeat bumps ``occurrence_count``, refreshes ``last_occurrence``,
    clears ``dismissed`` and raises ``severity`` to 'critical' if the new
    observation is critical (it is never lowered). ``raw_message`` keeps the
    value from the first occurrence.

    Args:
        device_name: Device name; any ``/dev/`` substring is removed.
        serial: Disk serial, or None.
        error_type: 'smart_error', 'io_error', 'connection_error'.
        error_signature: Normalized unique string for dedup
            (e.g. 'FailedReadSmartSelfTestLog').
        raw_message: Original message text.
        severity: Severity label for the observation.

    Returns:
        None. Any exception is printed and swallowed.
    """
    now = datetime.now().isoformat()
    try:
        conn = self._get_conn()
        try:
            cursor = conn.cursor()

            # Auto-register the disk if not present
            clean_dev = device_name.replace('/dev/', '')
            self.register_disk(clean_dev, serial)

            disk_id = self._get_disk_registry_id(cursor, clean_dev, serial)
            if not disk_id:
                return

            # Upsert observation: if same (disk, type, signature), bump count + update last_occurrence
            cursor.execute('''
                INSERT INTO disk_observations
                    (disk_registry_id, error_type, error_signature, first_occurrence,
                     last_occurrence, occurrence_count, raw_message, severity, dismissed)
                VALUES (?, ?, ?, ?, ?, 1, ?, ?, 0)
                ON CONFLICT(disk_registry_id, error_type, error_signature) DO UPDATE SET
                    last_occurrence = excluded.last_occurrence,
                    occurrence_count = occurrence_count + 1,
                    severity = CASE WHEN excluded.severity = 'critical' THEN 'critical' ELSE severity END,
                    dismissed = 0
            ''', (disk_id, error_type, error_signature, now, now, raw_message, severity))
            conn.commit()
        finally:
            # Runs on the early return and on errors alike, so the
            # connection is never left open.
            conn.close()
    except Exception as e:
        print(f"[HealthPersistence] Error recording disk observation: {e}")
def get_disk_observations(self, device_name: Optional[str] = None,
                          serial: Optional[str] = None) -> List[Dict[str, Any]]:
    """Get active (non-dismissed) observations for one disk or all disks.

    Args:
        device_name: Optional device name used to select a single disk.
        serial: Optional serial used to select a single disk.

    Returns:
        A list of dicts ordered by ``last_occurrence`` descending, with keys
        id, error_type, error_signature, first_occurrence, last_occurrence,
        occurrence_count, raw_message ('' when NULL), severity,
        dismissed (bool), device_name, serial and model. When a filter is
        given but no registry row matches, or on any error (which is
        printed), an empty list is returned.
    """
    # Only fixed SQL fragments are concatenated; values go through "?".
    select_sql = '''
        SELECT o.id, o.error_type, o.error_signature,
               o.first_occurrence, o.last_occurrence,
               o.occurrence_count, o.raw_message, o.severity, o.dismissed,
               d.device_name, d.serial, d.model
        FROM disk_observations o
        JOIN disk_registry d ON o.disk_registry_id = d.id
    '''
    order_sql = ' ORDER BY o.last_occurrence DESC'
    try:
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            if device_name or serial:
                disk_id = self._get_disk_registry_id(
                    cursor, device_name or '', serial)
                if not disk_id:
                    return []
                where_sql = ' WHERE o.disk_registry_id = ? AND o.dismissed = 0'
                params = (disk_id,)
            else:
                where_sql = ' WHERE o.dismissed = 0'
                params = ()
            cursor.execute(select_sql + where_sql + order_sql, params)
            rows = cursor.fetchall()
        finally:
            # Close on every path: early return, success and failure.
            conn.close()

        return [{
            'id': r[0],
            'error_type': r[1],
            'error_signature': r[2],
            'first_occurrence': r[3],
            'last_occurrence': r[4],
            'occurrence_count': r[5],
            'raw_message': r[6] or '',
            'severity': r[7],
            'dismissed': bool(r[8]),
            'device_name': r[9],
            'serial': r[10],
            'model': r[11],
        } for r in rows]
    except Exception as e:
        print(f"[HealthPersistence] Error getting observations: {e}")
        return []
def get_disks_observation_counts(self) -> Dict[str, int]:
    """Return active (non-dismissed) observation counts per registered disk.

    Each registry row with at least one active observation contributes a
    ``device_name`` key and, when its serial is non-empty, a
    ``'serial:<serial>'`` key for cross-device matching.

    Returns:
        Mapping of key to count; an empty dict on any error (which is
        printed).
    """
    try:
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT d.device_name, d.serial, COUNT(o.id) as cnt
                FROM disk_observations o
                JOIN disk_registry d ON o.disk_registry_id = d.id
                WHERE o.dismissed = 0
                GROUP BY d.id
            ''')
            rows = cursor.fetchall()
        finally:
            # Release the connection even if the query fails.
            conn.close()

        result = {}
        # NOTE(review): registry rows sharing a device_name or serial
        # overwrite each other's entry here rather than being summed;
        # confirm this is intended.
        for device_name, serial, cnt in rows:
            result[device_name] = cnt
            if serial:
                result[f'serial:{serial}'] = cnt
        return result
    except Exception as e:
        print(f"[HealthPersistence] Error getting observation counts: {e}")
        return {}
def dismiss_disk_observation(self, observation_id: int):
    """Mark a single observation as dismissed.

    Args:
        observation_id: ``disk_observations.id`` of the row to dismiss.
            An id that matches no row is a no-op.

    Returns:
        None. Any exception is printed and swallowed.
    """
    try:
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(
                'UPDATE disk_observations SET dismissed = 1 WHERE id = ?',
                (observation_id,))
            conn.commit()
        finally:
            # Release the connection even if the update fails.
            conn.close()
    except Exception as e:
        print(f"[HealthPersistence] Error dismissing observation: {e}")
def cleanup_stale_observations(self, max_age_days: int = 30):
    """Auto-dismiss observations not seen in ``max_age_days``.

    Active rows whose ``last_occurrence`` sorts before the ISO timestamp of
    "now minus max_age_days" are flagged ``dismissed = 1``; nothing is
    deleted.

    Args:
        max_age_days: Age threshold in days.

    Returns:
        None. Any exception is printed and swallowed.
    """
    try:
        from datetime import timedelta
        # Timestamps are stored as ISO strings, so the cutoff is compared
        # as text.
        cutoff = (datetime.now() - timedelta(days=max_age_days)).isoformat()
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE disk_observations
                SET dismissed = 1
                WHERE dismissed = 0 AND last_occurrence < ?
            ''', (cutoff,))
            conn.commit()
        finally:
            # Release the connection even if the update fails.
            conn.close()
    except Exception as e:
        print(f"[HealthPersistence] Error cleaning stale observations: {e}")
def mark_removed_disks(self, active_device_names: List[str]):
    """Mark disks not in ``active_device_names`` as removed.

    Every ``disk_registry`` row that is not already removed and whose
    ``device_name`` is absent from the given names gets ``removed = 1``.
    Names are compared verbatim.

    Args:
        active_device_names: Device names currently present. Any iterable
            is accepted. An empty collection is a no-op (presumably so an
            empty scan result does not flag every disk — confirm with
            callers).

    Returns:
        None. Any exception is printed and swallowed.
    """
    try:
        # Materialise once: sqlite3 needs a sequence for positional
        # parameters, and the names are counted as well as bound.
        names = list(active_device_names)
        if not names:
            return
        placeholders = ','.join('?' for _ in names)
        conn = self._get_conn()
        try:
            cursor = conn.cursor()
            # Only the "?" placeholder list is interpolated; the names
            # themselves are bound as parameters.
            cursor.execute(f'''
                UPDATE disk_registry SET removed = 1
                WHERE device_name NOT IN ({placeholders}) AND removed = 0
            ''', names)
            conn.commit()
        finally:
            # Release the connection even if the update fails.
            conn.close()
    except Exception as e:
        print(f"[HealthPersistence] Error marking removed disks: {e}")
# Global instance: a single HealthPersistence object built with its default
# constructor arguments (presumably the shared handle other modules import —
# confirm against callers).
health_persistence = HealthPersistence()