Update scripts

This commit is contained in:
MacRimi
2026-04-12 20:32:34 +02:00
parent 4fa4bbb08b
commit 4843fae0eb
47 changed files with 8313 additions and 3014 deletions
+590
View File
@@ -1784,6 +1784,270 @@ def is_disk_removable(disk_name):
return False
def _is_system_mount(mountpoint):
"""Check if mountpoint is a critical system path (matching bash scripts logic)."""
system_mounts = ('/', '/boot', '/boot/efi', '/efi', '/usr', '/var', '/etc',
'/lib', '/lib64', '/run', '/proc', '/sys')
if mountpoint in system_mounts:
return True
# Also check if it's under these paths
for prefix in ('/usr/', '/var/', '/lib/', '/lib64/'):
if mountpoint.startswith(prefix):
return True
return False
def _get_zfs_root_pool():
"""Get the ZFS pool containing the root filesystem, if any (matches bash _get_zfs_root_pool)."""
try:
result = subprocess.run(['df', '/'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if len(lines) >= 2:
root_fs = lines[1].split()[0]
# A ZFS dataset looks like "rpool/ROOT/pve-1" — not /dev/
if not root_fs.startswith('/dev/') and '/' in root_fs:
return root_fs.split('/')[0]
except Exception:
pass
return None
def _resolve_zfs_entry(entry):
"""
Resolve a ZFS device entry to a base disk name.
Handles: /dev/paths, by-id names, short kernel names (matches bash _resolve_zfs_entry).
"""
path = None
try:
if entry.startswith('/dev/'):
path = os.path.realpath(entry)
elif os.path.exists(f'/dev/disk/by-id/{entry}'):
path = os.path.realpath(f'/dev/disk/by-id/{entry}')
elif os.path.exists(f'/dev/{entry}'):
path = os.path.realpath(f'/dev/{entry}')
if path:
# Get parent disk (base disk without partition number)
result = subprocess.run(
['lsblk', '-no', 'PKNAME', path],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
# If no parent, path is the base disk itself
return os.path.basename(path)
except Exception:
pass
return None
def get_system_disks():
"""
Detect which physical disks are used by the system (Proxmox).
Returns a dict mapping disk names to their system usage info:
{
'sda': {'is_system': True, 'usage': ['root', 'boot']},
'nvme0n1': {'is_system': True, 'usage': ['zfs:rpool']},
}
Detects (matching logic from disk-passthrough.sh, format-disk.sh, disk_host.sh):
- Root filesystem (/) and critical system mounts (/boot, /usr, /var, etc.)
- Boot partition (/boot, /boot/efi)
- Active swap partitions
- ZFS pools (especially root pool - these disks are critical)
- LVM physical volumes (especially 'pve' volume group)
- RAID members (mdadm)
- Disks with Proxmox partition labels
"""
system_disks = {}
def add_usage(disk_name, usage_type):
"""Helper to add a usage type to a disk."""
if not disk_name:
return
# Normalize disk name (strip partition numbers to get base disk)
base_disk = disk_name
if disk_name and disk_name[-1].isdigit():
if 'nvme' in disk_name or 'mmcblk' in disk_name:
# NVMe/eMMC: nvme0n1p1 -> nvme0n1
if 'p' in disk_name:
base_disk = disk_name.rsplit('p', 1)[0]
else:
# SATA/SAS: sda1 -> sda
base_disk = disk_name.rstrip('0123456789')
if base_disk not in system_disks:
system_disks[base_disk] = {'is_system': True, 'usage': []}
if usage_type not in system_disks[base_disk]['usage']:
system_disks[base_disk]['usage'].append(usage_type)
# Get ZFS root pool first (critical - these disks should never be touched)
zfs_root_pool = _get_zfs_root_pool()
try:
# 1. Check mounted filesystems for system-critical mounts
with open('/proc/mounts', 'r') as f:
for line in f:
parts = line.split()
if len(parts) < 2:
continue
device, mountpoint = parts[0], parts[1]
# Skip non-block devices
if not device.startswith('/dev/'):
continue
disk_name = device.replace('/dev/', '').replace('mapper/', '')
# Identify system mountpoints (expanded from bash _is_system_mount)
if _is_system_mount(mountpoint):
if mountpoint == '/':
add_usage(disk_name, 'root')
elif mountpoint.startswith('/boot'):
add_usage(disk_name, 'boot')
else:
add_usage(disk_name, 'system')
except Exception:
pass
try:
# 2. Check active swap partitions
result = subprocess.run(
['swapon', '--noheadings', '--raw', '--show=NAME'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
device = line.strip()
if device.startswith('/dev/'):
disk_name = device.replace('/dev/', '')
add_usage(disk_name, 'swap')
except Exception:
# Fallback to /proc/swaps
try:
with open('/proc/swaps', 'r') as f:
lines = f.readlines()[1:] # Skip header
for line in lines:
parts = line.split()
if len(parts) >= 1:
device = parts[0]
if device.startswith('/dev/'):
disk_name = device.replace('/dev/', '').replace('mapper/', '')
add_usage(disk_name, 'swap')
except Exception:
pass
try:
# 3. Check ZFS pools using zpool list -v -H (matches bash _build_pool_disks)
result = subprocess.run(
['zpool', 'list', '-v', '-H'],
capture_output=True, text=True, timeout=8
)
if result.returncode == 0:
current_pool = None
for line in result.stdout.split('\n'):
if not line.strip():
continue
parts = line.split()
if not parts:
continue
# First column is pool name or device
entry = parts[0]
# Skip metadata entries
if entry in ('-', 'mirror', 'raidz', 'raidz1', 'raidz2', 'raidz3', 'spare', 'log', 'cache'):
continue
# Check if this is a pool name (pools have no leading whitespace)
if not line.startswith('\t') and not line.startswith(' '):
current_pool = entry
continue
# This is a device entry - resolve it
base_disk = _resolve_zfs_entry(entry)
if base_disk:
pool_label = current_pool or 'unknown'
# Mark root pool specially
if zfs_root_pool and pool_label == zfs_root_pool:
add_usage(base_disk, f'zfs:{pool_label} (root)')
else:
add_usage(base_disk, f'zfs:{pool_label}')
except Exception:
pass
try:
# 4. Check LVM physical volumes
result = subprocess.run(
['pvs', '--noheadings', '-o', 'pv_name,vg_name'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 2:
pv_device = parts[0]
vg_name = parts[1]
if pv_device.startswith('/dev/'):
# Resolve to real path
try:
real_path = os.path.realpath(pv_device)
disk_name = os.path.basename(real_path)
except Exception:
disk_name = pv_device.replace('/dev/', '')
# Proxmox typically uses 'pve' volume group
if 'pve' in vg_name.lower():
add_usage(disk_name, f'lvm:{vg_name} (pve)')
else:
add_usage(disk_name, f'lvm:{vg_name}')
except Exception:
pass
try:
# 5. Check active RAID arrays
if os.path.exists('/proc/mdstat'):
with open('/proc/mdstat', 'r') as f:
content = f.read()
if 'active' in content:
# Parse mdstat for active arrays
for line in content.split('\n'):
if 'active raid' in line:
# Extract device names from line like "md0 : active raid1 sda1[0] sdb1[1]"
parts = line.split()
for part in parts:
if '[' in part:
dev = part.split('[')[0]
add_usage(dev, 'raid')
except Exception:
pass
try:
# 6. Check for disks with Proxmox/system partition labels
result = subprocess.run(
['lsblk', '-o', 'NAME,PARTLABEL', '-n', '-l'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
parts = line.split(None, 1)
if len(parts) >= 2:
disk_name = parts[0]
partlabel = parts[1].lower() if len(parts) > 1 else ''
# Proxmox-specific and system partition labels
system_labels = ['pve', 'proxmox', 'bios', 'esp', 'efi', 'boot', 'grub']
if any(label in partlabel for label in system_labels):
add_usage(disk_name, 'system-partition')
except Exception:
pass
return system_disks
def get_storage_info():
"""Get storage and disk information"""
try:
@@ -1802,6 +2066,9 @@ def get_storage_info():
physical_disks = {}
total_disk_size_bytes = 0
# Get system disk information (disks used by Proxmox)
system_disks = get_system_disks()
try:
# List all block devices
result = subprocess.run(['lsblk', '-b', '-d', '-n', '-o', 'NAME,SIZE,TYPE'],
@@ -1843,6 +2110,11 @@ def get_storage_info():
conn_type = get_disk_connection_type(disk_name)
removable = is_disk_removable(disk_name)
# Check if this disk is used by the system
sys_info = system_disks.get(disk_name, {})
is_system_disk = sys_info.get('is_system', False)
system_usage = sys_info.get('usage', [])
physical_disks[disk_name] = {
'name': disk_name,
'size': disk_size_kb, # In KB for formatMemory() in Storage Summary
@@ -1866,6 +2138,8 @@ def get_storage_info():
'ssd_life_left': smart_data.get('ssd_life_left'),
'connection_type': conn_type,
'removable': removable,
'is_system_disk': is_system_disk,
'system_usage': system_usage,
}
except Exception as e:
@@ -6075,6 +6349,322 @@ def api_proxmox_storage():
"""Get Proxmox storage information"""
return jsonify(get_proxmox_storage())
# ─── SMART Disk Testing API ───────────────────────────────────────────────────
SMART_DIR = '/usr/local/share/proxmenux/smart'
def _is_nvme(disk_name):
"""Check if disk is NVMe."""
return disk_name.startswith('nvme')
def _get_smart_json_path(disk_name):
"""Get path to SMART JSON file for a disk."""
return os.path.join(SMART_DIR, f"{disk_name}.json")
def _ensure_smart_tools():
"""Check if SMART tools are installed."""
has_smartctl = shutil.which('smartctl') is not None
has_nvme = shutil.which('nvme') is not None
return {'smartctl': has_smartctl, 'nvme': has_nvme}
def _parse_smart_attributes(output_lines):
"""Parse SMART attributes from smartctl output."""
attributes = []
in_attrs = False
for line in output_lines:
if 'ID#' in line and 'ATTRIBUTE_NAME' in line:
in_attrs = True
continue
if in_attrs:
if not line.strip():
break
parts = line.split()
if len(parts) >= 10 and parts[0].isdigit():
attr_id = int(parts[0])
attr_name = parts[1]
value = int(parts[3]) if parts[3].isdigit() else 0
worst = int(parts[4]) if parts[4].isdigit() else 0
threshold = int(parts[5]) if parts[5].isdigit() else 0
raw_value = parts[9] if len(parts) > 9 else ''
# Determine status
status = 'ok'
if threshold > 0 and value <= threshold:
status = 'critical'
elif threshold > 0 and value <= threshold + 10:
status = 'warning'
attributes.append({
'id': attr_id,
'name': attr_name,
'value': value,
'worst': worst,
'threshold': threshold,
'raw_value': raw_value,
'status': status
})
return attributes
@app.route('/api/storage/smart/<disk_name>', methods=['GET'])
@require_auth
def api_smart_status(disk_name):
"""Get SMART status and data for a specific disk."""
try:
# Validate disk name (security)
if not re.match(r'^[a-zA-Z0-9]+$', disk_name):
return jsonify({'error': 'Invalid disk name'}), 400
device = f'/dev/{disk_name}'
if not os.path.exists(device):
return jsonify({'error': 'Device not found'}), 404
tools = _ensure_smart_tools()
result = {
'status': 'idle',
'tools_installed': tools
}
# Check if tools are available
is_nvme = _is_nvme(disk_name)
if is_nvme and not tools['nvme']:
result['error'] = 'nvme-cli not installed'
return jsonify(result)
if not is_nvme and not tools['smartctl']:
result['error'] = 'smartmontools not installed'
return jsonify(result)
# Check for existing JSON file (from previous test)
json_path = _get_smart_json_path(disk_name)
if os.path.exists(json_path):
try:
with open(json_path, 'r') as f:
saved_data = json.load(f)
result['saved_data'] = saved_data
result['saved_timestamp'] = os.path.getmtime(json_path)
except (json.JSONDecodeError, IOError):
pass
# Get current SMART status
if is_nvme:
# NVMe: Check for running test
proc = subprocess.run(
['nvme', 'self-test-log', device],
capture_output=True, text=True, timeout=10
)
if 'in progress' in proc.stdout.lower():
result['status'] = 'running'
result['test_type'] = 'nvme'
# Get smart-log data
proc = subprocess.run(
['nvme', 'smart-log', device],
capture_output=True, text=True, timeout=10
)
if proc.returncode == 0:
lines = proc.stdout.strip().split('\n')
smart_data = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
smart_data[key.strip().lower().replace(' ', '_')] = value.strip()
result['smart_data'] = smart_data
# Check health
crit_warn = smart_data.get('critical_warning', '0')
result['smart_status'] = 'passed' if crit_warn == '0' else 'warning'
else:
# SATA/SAS: Check for running test
proc = subprocess.run(
['smartctl', '-c', device],
capture_output=True, text=True, timeout=10
)
if 'Self-test routine in progress' in proc.stdout or '% of test remaining' in proc.stdout:
result['status'] = 'running'
# Extract progress percentage
match = re.search(r'(\d+)% of test remaining', proc.stdout)
if match:
result['progress'] = 100 - int(match.group(1))
# Get SMART health
proc = subprocess.run(
['smartctl', '-H', device],
capture_output=True, text=True, timeout=10
)
if 'PASSED' in proc.stdout:
result['smart_status'] = 'passed'
elif 'FAILED' in proc.stdout:
result['smart_status'] = 'failed'
else:
result['smart_status'] = 'unknown'
# Get SMART attributes
proc = subprocess.run(
['smartctl', '-A', device],
capture_output=True, text=True, timeout=10
)
if proc.returncode == 0:
attrs = _parse_smart_attributes(proc.stdout.split('\n'))
result['smart_data'] = {'attributes': attrs}
# Get self-test log for last test result
proc = subprocess.run(
['smartctl', '-l', 'selftest', device],
capture_output=True, text=True, timeout=10
)
if proc.returncode == 0:
lines = proc.stdout.split('\n')
for line in lines:
if line.startswith('# ') or line.startswith('# '):
parts = line.split()
if len(parts) >= 5:
test_type = 'short' if 'Short' in line else 'long' if 'Extended' in line or 'Long' in line else 'unknown'
test_status = 'passed' if 'Completed without error' in line else 'failed'
result['last_test'] = {
'type': test_type,
'status': test_status,
'timestamp': ' '.join(parts[-5:-2]) if len(parts) > 5 else 'unknown'
}
break
return jsonify(result)
except subprocess.TimeoutExpired:
return jsonify({'error': 'Command timeout'}), 504
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/<disk_name>/test', methods=['POST'])
@require_auth
def api_smart_run_test(disk_name):
"""Start a SMART self-test on a disk."""
try:
# Validate disk name (security)
if not re.match(r'^[a-zA-Z0-9]+$', disk_name):
return jsonify({'error': 'Invalid disk name'}), 400
device = f'/dev/{disk_name}'
if not os.path.exists(device):
return jsonify({'error': 'Device not found'}), 404
data = request.get_json() or {}
test_type = data.get('test_type', 'short')
if test_type not in ('short', 'long'):
return jsonify({'error': 'Invalid test type. Use "short" or "long"'}), 400
tools = _ensure_smart_tools()
is_nvme = _is_nvme(disk_name)
# Ensure SMART directory exists
os.makedirs(SMART_DIR, exist_ok=True)
json_path = _get_smart_json_path(disk_name)
if is_nvme:
if not tools['nvme']:
return jsonify({'error': 'nvme-cli not installed'}), 400
# NVMe: self-test-code 1=short, 2=long
code = 1 if test_type == 'short' else 2
proc = subprocess.run(
['nvme', 'device-self-test', device, f'--self-test-code={code}'],
capture_output=True, text=True, timeout=30
)
if proc.returncode != 0:
return jsonify({'error': f'Failed to start test: {proc.stderr}'}), 500
# For long test, start background monitor
if test_type == 'long':
subprocess.Popen(
f'''
while nvme device-self-test {device} --self-test-code=0 2>/dev/null | grep -qi 'in progress'; do
sleep 60
done
nvme smart-log -o json {device} > {json_path} 2>/dev/null
''',
shell=True, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
else:
if not tools['smartctl']:
return jsonify({'error': 'smartmontools not installed'}), 400
test_flag = '-t short' if test_type == 'short' else '-t long'
proc = subprocess.run(
['smartctl'] + test_flag.split() + [device],
capture_output=True, text=True, timeout=30
)
if proc.returncode not in (0, 4): # 4 = test started successfully
return jsonify({'error': f'Failed to start test: {proc.stderr}'}), 500
# For long test, start background monitor
if test_type == 'long':
subprocess.Popen(
f'''
while smartctl -c {device} 2>/dev/null | grep -qiE 'Self-test routine in progress|[1-9][0-9]?% of test remaining'; do
sleep 60
done
smartctl --json=c {device} > {json_path} 2>/dev/null
''',
shell=True, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
return jsonify({
'success': True,
'test_type': test_type,
'device': device,
'message': f'{test_type.capitalize()} test started on {device}'
})
except subprocess.TimeoutExpired:
return jsonify({'error': 'Command timeout'}), 504
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/tools', methods=['GET'])
@require_auth
def api_smart_tools_status():
"""Check if SMART tools are installed."""
tools = _ensure_smart_tools()
return jsonify(tools)
@app.route('/api/storage/smart/tools/install', methods=['POST'])
@require_auth
def api_smart_tools_install():
"""Install SMART tools (smartmontools and nvme-cli)."""
try:
data = request.get_json() or {}
packages = data.get('packages', ['smartmontools', 'nvme-cli'])
results = {}
for pkg in packages:
if pkg not in ('smartmontools', 'nvme-cli'):
results[pkg] = {'success': False, 'error': 'Invalid package name'}
continue
# Update apt cache and install
proc = subprocess.run(
['apt-get', 'install', '-y', pkg],
capture_output=True, text=True, timeout=120
)
results[pkg] = {
'success': proc.returncode == 0,
'output': proc.stdout if proc.returncode == 0 else proc.stderr
}
return jsonify(results)
except subprocess.TimeoutExpired:
return jsonify({'error': 'Installation timeout'}), 504
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── END SMART API ────────────────────────────────────────────────────────────
@app.route('/api/network', methods=['GET'])
@require_auth
def api_network():