Files
ProxMenux/AppImage/scripts/proxmox_storage_monitor.py
2026-03-25 23:45:34 +01:00

246 lines
8.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProxMenux - Proxmox Storage Monitor
Monitors configured Proxmox storages and tracks unavailable storages
"""
import json
import subprocess
import socket
import time
from typing import Dict, List, Any, Optional
class ProxmoxStorageMonitor:
"""Monitor Proxmox storage configuration and status"""
# Cache TTL: 177 seconds (~3 min) - offset to avoid sync with other processes
_CACHE_TTL = 177
def __init__(self):
self.configured_storages: Dict[str, Dict[str, Any]] = {}
self._node_name_cache = {'name': None, 'time': 0}
self._storage_status_cache = {'data': None, 'time': 0}
self._config_cache_time = 0 # Track when config was last loaded
self._load_configured_storages()
def _get_node_name(self) -> str:
"""Get current Proxmox node name (cached)"""
current_time = time.time()
cache = self._node_name_cache
# Return cached result if fresh
if cache['name'] and (current_time - cache['time']) < self._CACHE_TTL:
return cache['name']
try:
result = subprocess.run(
['pvesh', 'get', '/nodes', '--output-format', 'json'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
nodes = json.loads(result.stdout)
hostname = socket.gethostname()
for node in nodes:
if node.get('node') == hostname:
cache['name'] = hostname
cache['time'] = current_time
return hostname
if nodes:
name = nodes[0].get('node', hostname)
cache['name'] = name
cache['time'] = current_time
return name
return socket.gethostname()
except Exception:
return socket.gethostname()
def _load_configured_storages(self) -> None:
"""Load configured storages from Proxmox configuration"""
try:
local_node = self._get_node_name()
# Read storage configuration from pvesh
result = subprocess.run(
['pvesh', 'get', '/storage', '--output-format', 'json'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode != 0:
return
storages = json.loads(result.stdout)
for storage in storages:
storage_id = storage.get('storage')
if not storage_id:
continue
# Check if storage is enabled for this node
nodes = storage.get('nodes')
if nodes and local_node not in nodes.split(','):
continue
disabled = storage.get('disable', 0)
if disabled == 1:
continue
self.configured_storages[storage_id] = {
'name': storage_id,
'type': storage.get('type', 'unknown'),
'content': storage.get('content', ''),
'path': storage.get('path', ''),
'enabled': True
}
except Exception:
pass
def get_storage_status(self) -> Dict[str, List[Dict[str, Any]]]:
"""
Get storage status, including unavailable storages (cached)
Returns:
{
'available': [...],
'unavailable': [...]
}
"""
current_time = time.time()
cache = self._storage_status_cache
# Return cached result if fresh
if cache['data'] and (current_time - cache['time']) < self._CACHE_TTL:
return cache['data']
try:
local_node = self._get_node_name()
# Get current storage status from pvesh
result = subprocess.run(
['pvesh', 'get', '/cluster/resources', '--type', 'storage', '--output-format', 'json'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
return {'available': [], 'unavailable': list(self.configured_storages.values())}
resources = json.loads(result.stdout)
# Track which configured storages are available
available_storages = []
unavailable_storages = []
seen_storage_names = set()
for resource in resources:
node = resource.get('node', '')
# Filter only local node storages
if node != local_node:
continue
name = resource.get('storage', 'unknown')
seen_storage_names.add(name)
storage_type = resource.get('plugintype', 'unknown')
status = resource.get('status', 'unknown')
try:
total = int(resource.get('maxdisk', 0))
used = int(resource.get('disk', 0))
available = total - used if total > 0 else 0
except (ValueError, TypeError):
total = 0
used = 0
available = 0
# Calculate percentage
percent = (used / total * 100) if total > 0 else 0.0
# Convert bytes to GB
total_gb = round(total / (1024**3), 2)
used_gb = round(used / (1024**3), 2)
available_gb = round(available / (1024**3), 2)
storage_info = {
'name': name,
'type': storage_type,
'total': total_gb,
'used': used_gb,
'available': available_gb,
'percent': round(percent, 2),
'node': node
}
# Check if storage is available
if total == 0 or status.lower() != "available":
storage_info['status'] = 'error'
storage_info['status_detail'] = 'unavailable' if total == 0 else status
unavailable_storages.append(storage_info)
else:
storage_info['status'] = 'active'
available_storages.append(storage_info)
# Check for configured storages that are completely missing
for storage_name, storage_config in self.configured_storages.items():
if storage_name not in seen_storage_names:
unavailable_storages.append({
'name': storage_name,
'type': storage_config['type'],
'status': 'error',
'status_detail': 'not_found',
'total': 0,
'used': 0,
'available': 0,
'percent': 0,
'node': local_node
})
result_data = {
'available': available_storages,
'unavailable': unavailable_storages
}
# Cache the result
cache['data'] = result_data
cache['time'] = current_time
return result_data
except Exception:
return {
'available': [],
'unavailable': list(self.configured_storages.values())
}
def get_unavailable_count(self) -> int:
"""Get count of unavailable storages"""
status = self.get_storage_status()
return len(status['unavailable'])
def reload_configuration(self, force: bool = False) -> None:
"""Reload storage configuration from Proxmox (cached)
Args:
force: If True, bypass cache and force reload
"""
current_time = time.time()
# Skip reload if cache is still fresh (unless forced)
if not force and (current_time - self._config_cache_time) < self._CACHE_TTL:
return
self.configured_storages.clear()
self._load_configured_storages()
self._config_cache_time = current_time
# Global instance
proxmox_storage_monitor = ProxmoxStorageMonitor()