mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-04-28 06:00:40 +00:00
Update notification service
This commit is contained in:
@@ -686,6 +686,16 @@ def proxmox_webhook():
|
|||||||
else:
|
else:
|
||||||
return _reject(400, 'empty_payload', 400)
|
return _reject(400, 'empty_payload', 400)
|
||||||
|
|
||||||
|
# DEBUG: Log full webhook payload to file for analysis
|
||||||
|
import json as _json
|
||||||
|
try:
|
||||||
|
with open('/tmp/proxmenux_webhook_payload.log', 'a') as _f:
|
||||||
|
_f.write(f"\n{'='*60}\n{time.strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||||
|
_f.write(_json.dumps(payload, indent=2, default=str, ensure_ascii=False))
|
||||||
|
_f.write('\n')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
result = notification_manager.process_webhook(payload)
|
result = notification_manager.process_webhook(payload)
|
||||||
# Always return 200 to PVE -- a non-200 makes PVE report the webhook as broken.
|
# Always return 200 to PVE -- a non-200 makes PVE report the webhook as broken.
|
||||||
# The 'accepted' field in the JSON body indicates actual processing status.
|
# The 'accepted' field in the JSON body indicates actual processing status.
|
||||||
|
|||||||
@@ -941,9 +941,6 @@ class ProxmoxHookWatcher:
|
|||||||
def __init__(self, event_queue: Queue):
|
def __init__(self, event_queue: Queue):
|
||||||
self._queue = event_queue
|
self._queue = event_queue
|
||||||
self._hostname = _hostname()
|
self._hostname = _hostname()
|
||||||
# Shared dict: TaskWatcher._webhook_delivered points here.
|
|
||||||
# Records (event_type:entity_id -> timestamp) for cross-source dedup.
|
|
||||||
self._delivered: Dict[str, float] = {}
|
|
||||||
|
|
||||||
def process_webhook(self, payload: dict) -> dict:
|
def process_webhook(self, payload: dict) -> dict:
|
||||||
"""Process an incoming Proxmox webhook payload.
|
"""Process an incoming Proxmox webhook payload.
|
||||||
@@ -1026,15 +1023,18 @@ class ProxmoxHookWatcher:
|
|||||||
if dur_m:
|
if dur_m:
|
||||||
data['duration'] = dur_m.group(1).strip()
|
data['duration'] = dur_m.group(1).strip()
|
||||||
|
|
||||||
# Record in the shared delivered dict so TaskWatcher can dedup.
|
# Record this event for cross-source dedup.
|
||||||
# TaskWatcher._webhook_delivered points at self._delivered (same object).
|
# TaskWatcher checks this dict before emitting backup/replication
|
||||||
self._delivered[f"{event_type}:{entity_id}"] = time.time()
|
# events so it yields to the richer webhook data.
|
||||||
# Prune old entries periodically
|
import time
|
||||||
|
if not hasattr(self, '_delivered'):
|
||||||
|
self._delivered = {}
|
||||||
|
dedup_key = f"{event_type}:{entity_id}"
|
||||||
|
self._delivered[dedup_key] = time.time()
|
||||||
|
# Cleanup old entries
|
||||||
if len(self._delivered) > 200:
|
if len(self._delivered) > 200:
|
||||||
cutoff = time.time() - 300
|
cutoff = time.time() - 300
|
||||||
to_del = [k for k, v in self._delivered.items() if v < cutoff]
|
self._delivered = {k: v for k, v in self._delivered.items() if v > cutoff}
|
||||||
for k in to_del:
|
|
||||||
del self._delivered[k]
|
|
||||||
|
|
||||||
event = NotificationEvent(
|
event = NotificationEvent(
|
||||||
event_type=event_type,
|
event_type=event_type,
|
||||||
|
|||||||
@@ -419,14 +419,6 @@ class NotificationManager:
|
|||||||
self._task_watcher = TaskWatcher(self._event_queue)
|
self._task_watcher = TaskWatcher(self._event_queue)
|
||||||
self._polling_collector = PollingCollector(self._event_queue)
|
self._polling_collector = PollingCollector(self._event_queue)
|
||||||
|
|
||||||
# Create hook_watcher eagerly so we can wire the cross-source dedup.
|
|
||||||
# TaskWatcher._webhook_delivered points at ProxmoxHookWatcher._delivered
|
|
||||||
# so TaskWatcher can skip backup/replication events the webhook already
|
|
||||||
# delivered with richer data (full logs, sizes, durations).
|
|
||||||
if not self._hook_watcher:
|
|
||||||
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
|
|
||||||
self._task_watcher._webhook_delivered = self._hook_watcher._delivered
|
|
||||||
|
|
||||||
self._journal_watcher.start()
|
self._journal_watcher.start()
|
||||||
self._task_watcher.start()
|
self._task_watcher.start()
|
||||||
self._polling_collector.start()
|
self._polling_collector.start()
|
||||||
@@ -874,6 +866,10 @@ class NotificationManager:
|
|||||||
"""Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher."""
|
"""Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher."""
|
||||||
if not self._hook_watcher:
|
if not self._hook_watcher:
|
||||||
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
|
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
|
||||||
|
# Share the webhook's delivery record with TaskWatcher
|
||||||
|
# so tasks can yield to richer webhook data for backup/replication.
|
||||||
|
if self._task_watcher:
|
||||||
|
self._task_watcher._webhook_delivered = self._hook_watcher._delivered
|
||||||
return self._hook_watcher.process_webhook(payload)
|
return self._hook_watcher.process_webhook(payload)
|
||||||
|
|
||||||
def get_webhook_secret(self) -> str:
|
def get_webhook_secret(self) -> str:
|
||||||
|
|||||||
@@ -302,12 +302,6 @@ TEMPLATES = {
|
|||||||
'group': 'system',
|
'group': 'system',
|
||||||
'default_enabled': True,
|
'default_enabled': True,
|
||||||
},
|
},
|
||||||
'system_mail': {
|
|
||||||
'title': '{hostname}: System mail notification',
|
|
||||||
'body': '{reason}',
|
|
||||||
'group': 'system',
|
|
||||||
'default_enabled': True,
|
|
||||||
},
|
|
||||||
'service_fail': {
|
'service_fail': {
|
||||||
'title': '{hostname}: Service failed - {service_name}',
|
'title': '{hostname}: Service failed - {service_name}',
|
||||||
'body': 'Service {service_name} has failed.\n{reason}',
|
'body': 'Service {service_name} has failed.\n{reason}',
|
||||||
|
|||||||
Reference in New Issue
Block a user