#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ProxMenux OCI Manager Manages deployment and lifecycle of OCI container applications using Proxmox VE 9.1+ native LXC from OCI images functionality. Usage: - As library: import oci_manager; oci_manager.deploy_app(...) - As CLI: python oci_manager.py deploy --app-id secure-gateway --config '{...}' """ import base64 import json import logging import os import re import secrets import shutil import subprocess import time from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # Note: We use a simple XOR-based encryption for local token storage # This avoids the cryptography dependency which has Python version compatibility issues # (PyO3 modules compiled for specific Python versions cause ImportError on different versions) ENCRYPTION_AVAILABLE = False # Logging logger = logging.getLogger("proxmenux.oci") # ================================================================= # Paths # ================================================================= OCI_BASE_DIR = "/usr/local/share/proxmenux/oci" CATALOG_FILE = os.path.join(OCI_BASE_DIR, "catalog.json") INSTALLED_FILE = os.path.join(OCI_BASE_DIR, "installed.json") INSTANCES_DIR = os.path.join(OCI_BASE_DIR, "instances") # Source catalog from Scripts (bundled with ProxMenux) SCRIPTS_CATALOG = "/usr/local/share/proxmenux/scripts/oci/catalog.json" DEV_SCRIPTS_CATALOG = os.path.join(os.path.dirname(__file__), "..", "..", "Scripts", "oci", "catalog.json") # Encryption key file ENCRYPTION_KEY_FILE = os.path.join(OCI_BASE_DIR, ".encryption_key") # Default storage for templates DEFAULT_STORAGE = "local" # Default storage for rootfs (will be auto-detected) DEFAULT_ROOTFS_STORAGE = "local-lvm" # VMID range for OCI containers (9000-9999 to avoid conflicts) OCI_VMID_START = 9000 OCI_VMID_END = 9999 # ================================================================= # Encryption Functions for Sensitive Data # ================================================================= def _get_or_create_encryption_key() -> bytes: """Get or create the encryption key for sensitive data.""" if os.path.exists(ENCRYPTION_KEY_FILE): with open(ENCRYPTION_KEY_FILE, 'rb') as f: return f.read() # Generate a 32-byte random key key = secrets.token_bytes(32) os.makedirs(os.path.dirname(ENCRYPTION_KEY_FILE), exist_ok=True) with open(ENCRYPTION_KEY_FILE, 'wb') as f: f.write(key) os.chmod(ENCRYPTION_KEY_FILE, 0o600) return key def encrypt_sensitive_value(value: str) -> str: """ Encrypt a sensitive value using XOR with a random key. Returns base64-encoded string with 'ENC:' prefix. Note: Uses XOR encryption which is sufficient for local token storage and avoids cryptography library compatibility issues across Python versions. """ if not value: return value key = _get_or_create_encryption_key() value_bytes = value.encode() encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes)) return "ENC:" + base64.b64encode(encrypted).decode() def decrypt_sensitive_value(encrypted: str) -> str: """Decrypt a sensitive value.""" if not encrypted or not encrypted.startswith("ENC:"): return encrypted encrypted_data = encrypted[4:] key = _get_or_create_encryption_key() try: encrypted_bytes = base64.b64decode(encrypted_data) decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes)) return decrypted.decode() except Exception as e: logger.error(f"Failed to decrypt value: {e}") return encrypted def encrypt_config_sensitive_fields(config: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]: """Encrypt sensitive fields in config based on schema.""" encrypted_config = config.copy() for field_name, field_schema in schema.items(): if field_schema.get("sensitive") and field_name in encrypted_config: value = encrypted_config[field_name] if value and not str(value).startswith("ENC:"): encrypted_config[field_name] = encrypt_sensitive_value(str(value)) return encrypted_config def decrypt_config_sensitive_fields(config: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]: """Decrypt sensitive fields in config based on schema.""" decrypted_config = config.copy() for field_name, field_schema in schema.items(): if field_schema.get("sensitive") and field_name in decrypted_config: value = decrypted_config[field_name] if value and str(value).startswith("ENC:"): decrypted_config[field_name] = decrypt_sensitive_value(str(value)) return decrypted_config # ================================================================= # Directory Management # ================================================================= def ensure_oci_directories(): """Ensure OCI directories exist and catalog is available.""" os.makedirs(OCI_BASE_DIR, exist_ok=True) os.makedirs(INSTANCES_DIR, exist_ok=True) if not os.path.exists(CATALOG_FILE): if os.path.exists(SCRIPTS_CATALOG): shutil.copy2(SCRIPTS_CATALOG, CATALOG_FILE) elif os.path.exists(DEV_SCRIPTS_CATALOG): shutil.copy2(DEV_SCRIPTS_CATALOG, CATALOG_FILE) if not os.path.exists(INSTALLED_FILE): with open(INSTALLED_FILE, 'w') as f: json.dump({"version": "1.0.0", "instances": {}}, f, indent=2) # ================================================================= # Proxmox VE Detection and Compatibility # ================================================================= def check_proxmox_version() -> Dict[str, Any]: """Check Proxmox VE version and OCI support.""" result = { "is_proxmox": False, "version": None, "oci_support": False, "error": None } try: # Check if pveversion exists if not shutil.which("pveversion"): result["error"] = "Not running on Proxmox VE" return result proc = subprocess.run( ["pveversion"], capture_output=True, text=True, timeout=5 ) if proc.returncode != 0: result["error"] = "Failed to get Proxmox version" return result # Parse version: "pve-manager/9.1-2/abc123..." version_str = proc.stdout.strip() result["is_proxmox"] = True match = re.search(r'pve-manager/(\d+)\.(\d+)', version_str) if match: major = int(match.group(1)) minor = int(match.group(2)) result["version"] = f"{major}.{minor}" # OCI support requires Proxmox VE 9.1+ result["oci_support"] = (major > 9) or (major == 9 and minor >= 1) # Also check if skopeo is available (required for OCI) if result["oci_support"] and not shutil.which("skopeo"): result["oci_support"] = False result["error"] = "skopeo not found. Install with: apt install skopeo" if not result["oci_support"] and not result.get("error"): result["error"] = f"OCI support requires Proxmox VE 9.1+, found {result['version']}" except Exception as e: result["error"] = str(e) return result def _run_pve_cmd(cmd: List[str], timeout: int = 60) -> Tuple[int, str, str]: """Run a Proxmox VE command.""" try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return proc.returncode, proc.stdout.strip(), proc.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Command timed out" except Exception as e: return -1, "", str(e) # ================================================================= # VMID Management # ================================================================= def _get_next_vmid() -> int: """Get next available VMID in OCI range (9000-9999).""" try: # Get list of all VMIDs in use rc, out, _ = _run_pve_cmd(["pvesh", "get", "/cluster/resources", "--type", "vm", "--output-format", "json"]) if rc != 0: # Fallback: try pct list rc, out, _ = _run_pve_cmd(["pct", "list"]) used_vmids = set() if rc == 0: for line in out.splitlines()[1:]: # Skip header parts = line.split() if parts: try: used_vmids.add(int(parts[0])) except ValueError: pass else: resources = json.loads(out) used_vmids = {r.get("vmid") for r in resources if r.get("vmid")} # Find first available in OCI range for vmid in range(OCI_VMID_START, OCI_VMID_END + 1): if vmid not in used_vmids: return vmid raise RuntimeError("No available VMID in OCI range (9000-9999)") except Exception as e: logger.error(f"Failed to get next VMID: {e}") # Return a high number and hope for the best return OCI_VMID_START + int(time.time()) % 1000 def _get_vmid_for_app(app_id: str) -> Optional[int]: """Get the VMID for an installed app.""" installed = _load_installed() instance = installed.get("instances", {}).get(app_id) return instance.get("vmid") if instance else None def get_available_storages() -> List[Dict[str, Any]]: """Get list of storages available for LXC rootfs. Returns storages that support 'images' or 'rootdir' content types, which are required for LXC container disks. """ storages = [] try: # Use pvesm status to get all storages (parse text output for compatibility) rc, out, err = _run_pve_cmd(["pvesm", "status"]) if rc != 0: logger.error(f"Failed to get storage status: {err}") return storages # Parse text output: Name Type Status Total Used Available % # Example: local-lvm lvmthin active 464031744 15491072 448540672 3.34% lines = out.strip().split('\n') for line in lines[1:]: # Skip header parts = line.split() if len(parts) < 4: continue storage_name = parts[0] storage_type = parts[1] status = parts[2] # Skip if storage is not active if status != "active": continue # Get capacity info total_bytes = int(parts[3]) * 1024 if len(parts) > 3 and parts[3].isdigit() else 0 used_bytes = int(parts[4]) * 1024 if len(parts) > 4 and parts[4].isdigit() else 0 avail_bytes = int(parts[5]) * 1024 if len(parts) > 5 and parts[5].isdigit() else total_bytes - used_bytes # Check if storage supports rootdir/images content (for LXC) # Read from /etc/pve/storage.cfg directly for compatibility content = "" try: with open("/etc/pve/storage.cfg", "r") as f: cfg_content = f.read() # Find the storage section import re pattern = rf'{storage_type}:\s*{re.escape(storage_name)}\s*\n((?:\s+[^\n]+\n)*)' match = re.search(pattern, cfg_content) if match: section = match.group(1) content_match = re.search(r'content\s+(\S+)', section) if content_match: content = content_match.group(1) except Exception: # Fallback: assume block storages support images if storage_type in ("lvm", "lvmthin", "zfspool", "rbd"): content = "images,rootdir" elif storage_type in ("dir", "nfs", "cifs", "btrfs"): content = "rootdir,images" # Storage must support "images" or "rootdir" for LXC rootfs if "images" in content or "rootdir" in content: # Determine recommendation priority # Prefer: zfspool > lvmthin > btrfs > lvm > others priority = 99 if storage_type == "zfspool": priority = 1 elif storage_type == "lvmthin": priority = 2 elif storage_type == "btrfs": priority = 3 elif storage_type == "lvm": priority = 4 elif storage_type in ("dir", "nfs", "cifs"): priority = 10 storages.append({ "name": storage_name, "type": storage_type, "total": total_bytes, "used": used_bytes, "avail": avail_bytes, "active": True, "enabled": True, "priority": priority, "recommended": False # Will be set after sorting }) # Sort by priority (lower = better), then by available space storages.sort(key=lambda s: (s.get("priority", 99), -s.get("avail", 0))) # Mark the first one as recommended if storages: storages[0]["recommended"] = True except Exception as e: logger.error(f"Failed to get available storages: {e}") return storages def _download_alpine_template(storage: str = DEFAULT_STORAGE) -> bool: """Download the latest Alpine LXC template using pveam.""" print("[*] Downloading Alpine Linux template...") logger.info("Downloading Alpine template via pveam") # Update template list first rc, out, err = _run_pve_cmd(["pveam", "update"], timeout=60) if rc != 0: logger.warning(f"Failed to update template list: {err}") # Get available Alpine templates rc, out, err = _run_pve_cmd(["pveam", "available", "--section", "system"], timeout=30) if rc != 0: logger.error(f"Failed to list available templates: {err}") return False # Find latest Alpine template alpine_template = None for line in out.strip().split('\n'): if 'alpine-' in line.lower(): parts = line.split() if len(parts) >= 2: alpine_template = parts[1] # Template name is usually second column if not alpine_template: logger.error("No Alpine template found in available templates") return False # Download the template print(f"[*] Downloading template: {alpine_template}") rc, out, err = _run_pve_cmd(["pveam", "download", storage, alpine_template], timeout=300) if rc != 0: logger.error(f"Failed to download template: {err}") return False print("[+] Alpine template downloaded successfully") logger.info(f"Successfully downloaded Alpine template: {alpine_template}") return True def _find_alpine_template(storage: str = DEFAULT_STORAGE, auto_download: bool = True) -> Optional[str]: """Find an available Alpine LXC template, optionally downloading if not found.""" template_dir = "/var/lib/vz/template/cache" # Try to get correct path from storage config rc, out, _ = _run_pve_cmd(["pvesm", "path", f"{storage}:vztmpl/test"]) if rc == 0 and out.strip(): template_dir = os.path.dirname(out.strip()) # Look for Alpine templates try: templates = os.listdir(template_dir) alpine_templates = [t for t in templates if t.startswith("alpine-") and t.endswith((".tar.xz", ".tar.gz", ".tar"))] if alpine_templates: # Sort to get latest version alpine_templates.sort(reverse=True) return f"{storage}:vztmpl/{alpine_templates[0]}" except Exception as e: logger.error(f"Failed to find Alpine template: {e}") # Template not found - try to download it automatically if auto_download: logger.info("Alpine template not found, attempting to download...") if _download_alpine_template(storage): # Try to find it again after download return _find_alpine_template(storage, auto_download=False) return None def _install_packages_in_lxc(vmid: int, packages: List[str], install_method: str = "apk") -> bool: """Install packages inside an LXC container.""" if not packages: return True print(f"[*] Installing packages: {', '.join(packages)}") if install_method == "apk": # Alpine Linux cmd = ["pct", "exec", str(vmid), "--", "apk", "add"] + packages elif install_method == "apt": # Debian/Ubuntu _run_pve_cmd(["pct", "exec", str(vmid), "--", "apt-get", "update"], timeout=120) cmd = ["pct", "exec", str(vmid), "--", "apt-get", "install", "-y"] + packages else: logger.error(f"Unknown install method: {install_method}") return False rc, out, err = _run_pve_cmd(cmd, timeout=300) if rc != 0: logger.error(f"Failed to install packages: {err}") return False print(f"[OK] Packages installed") return True def _enable_services_in_lxc(vmid: int, services: List[str], install_method: str = "apk") -> bool: """Enable and start services inside an LXC container.""" if not services: return True print(f"[*] Enabling services: {', '.join(services)}") for service in services: if install_method == "apk": # Alpine uses OpenRC _run_pve_cmd(["pct", "exec", str(vmid), "--", "rc-update", "add", service], timeout=30) _run_pve_cmd(["pct", "exec", str(vmid), "--", "service", service, "start"], timeout=30) else: # Debian/Ubuntu uses systemd _run_pve_cmd(["pct", "exec", str(vmid), "--", "systemctl", "enable", service], timeout=30) _run_pve_cmd(["pct", "exec", str(vmid), "--", "systemctl", "start", service], timeout=30) print(f"[OK] Services enabled") return True def _configure_tailscale(vmid: int, config: Dict[str, Any]) -> bool: """Configure Tailscale inside the container.""" auth_key = config.get("auth_key", "") if not auth_key: logger.warning("No auth_key provided for Tailscale") return False print(f"[*] Configuring Tailscale...") # Build tailscale up command ts_cmd = ["tailscale", "up", f"--authkey={auth_key}"] hostname = config.get("hostname") if hostname: ts_cmd.append(f"--hostname={hostname}") advertise_routes = config.get("advertise_routes") if advertise_routes: if isinstance(advertise_routes, list): advertise_routes = ",".join(advertise_routes) ts_cmd.append(f"--advertise-routes={advertise_routes}") if config.get("exit_node"): ts_cmd.append("--advertise-exit-node") if config.get("accept_routes"): ts_cmd.append("--accept-routes") # Run tailscale up rc, out, err = _run_pve_cmd(["pct", "exec", str(vmid), "--"] + ts_cmd, timeout=60) if rc != 0: logger.error(f"Tailscale configuration failed: {err}") print(f"[!] Tailscale error: {err}") return False print(f"[OK] Tailscale configured") return True # ================================================================= # OCI Image Management # ================================================================= def pull_oci_image(image: str, tag: str = "latest", storage: str = DEFAULT_STORAGE) -> Dict[str, Any]: """ Pull an OCI image from a registry and store as LXC template. Uses Proxmox's pvesh API to download OCI images (same as GUI). Args: image: Image name (e.g., "docker.io/tailscale/tailscale") tag: Image tag (e.g., "stable") storage: Proxmox storage to save template Returns: Dict with success status and template path """ result = { "success": False, "message": "", "template": None } # Check Proxmox OCI support pve_info = check_proxmox_version() if not pve_info["oci_support"]: result["message"] = pve_info.get("error", "OCI not supported") return result # Normalize image name - ensure full registry path if not image.startswith(("docker.io/", "ghcr.io/", "quay.io/", "registry.")): image = f"docker.io/{image}" # For docker.io, library images need explicit library/ prefix parts = image.split("/") if parts[0] == "docker.io" and len(parts) == 2: image = f"docker.io/library/{parts[1]}" full_ref = f"{image}:{tag}" logger.info(f"Pulling OCI image: {full_ref}") print(f"[*] Pulling OCI image: {full_ref}") # Create a safe filename from the image reference # e.g., docker.io/tailscale/tailscale:stable -> tailscale-tailscale-stable.tar # Note: Use .tar extension (not .tar.zst) - skopeo creates uncompressed tar filename = image.replace("docker.io/", "").replace("ghcr.io/", "").replace("library/", "").replace("/", "-") filename = f"{filename}-{tag}.tar" # Get hostname for API hostname = os.uname().nodename # Use Proxmox's pvesh API to download the OCI image # This is exactly what the GUI does rc, out, err = _run_pve_cmd([ "pvesh", "create", f"/nodes/{hostname}/storage/{storage}/download-url", "--content", "vztmpl", "--filename", filename, "--url", f"docker://{full_ref}" ], timeout=600) if rc != 0: # Fallback: try direct skopeo if pvesh API fails logger.warning(f"pvesh download failed: {err}, trying skopeo fallback") if not shutil.which("skopeo"): result["message"] = f"Failed to pull image via API: {err}" return result # Get template directory template_dir = "/var/lib/vz/template/cache" rc2, out2, _ = _run_pve_cmd(["pvesm", "path", f"{storage}:vztmpl/test"]) if rc2 == 0 and out2.strip(): template_dir = os.path.dirname(out2.strip()) template_path = os.path.join(template_dir, filename) # Use skopeo with docker-archive format (works with multi-layer images in Proxmox 9.1) # Note: oci-archive fails with multi-layer images, docker-archive works try: proc = subprocess.run( ["skopeo", "copy", "--override-os", "linux", "--override-arch", "amd64", f"docker://{full_ref}", f"docker-archive:{template_path}"], capture_output=True, text=True, timeout=600 ) if proc.returncode != 0: result["message"] = f"Failed to pull image: {proc.stderr}" logger.error(f"skopeo copy failed: {proc.stderr}") return result except subprocess.TimeoutExpired: result["message"] = "Image pull timed out after 10 minutes" return result except Exception as e: result["message"] = f"Failed to pull image: {e}" logger.error(f"Pull failed: {e}") return result # Template was created via API or skopeo result["success"] = True result["template"] = f"{storage}:vztmpl/{filename}" result["message"] = "Image pulled successfully" print(f"[OK] Image pulled: {result['template']}") return result # ================================================================= # Catalog Management # ================================================================= def load_catalog() -> Dict[str, Any]: """Load the OCI app catalog.""" ensure_oci_directories() for path in [CATALOG_FILE, SCRIPTS_CATALOG, DEV_SCRIPTS_CATALOG]: if os.path.exists(path): try: with open(path, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Failed to load catalog from {path}: {e}") return {"version": "1.0.0", "apps": {}} def get_app_definition(app_id: str) -> Optional[Dict[str, Any]]: """Get the definition for a specific app.""" catalog = load_catalog() return catalog.get("apps", {}).get(app_id) def list_available_apps() -> List[Dict[str, Any]]: """List all available apps from the catalog.""" catalog = load_catalog() apps = [] for app_id, app_def in catalog.get("apps", {}).items(): apps.append({ "id": app_id, "name": app_def.get("name", app_id), "short_name": app_def.get("short_name", app_def.get("name", app_id)), "category": app_def.get("category", "uncategorized"), "subcategory": app_def.get("subcategory", ""), "icon": app_def.get("icon", "box"), "color": app_def.get("color", "#6366F1"), "summary": app_def.get("summary", ""), "installed": is_installed(app_id) }) return apps # ================================================================= # Installed Apps Management # ================================================================= def _load_installed() -> Dict[str, Any]: """Load the installed apps registry.""" ensure_oci_directories() if not os.path.exists(INSTALLED_FILE): return {"version": "1.0.0", "instances": {}} try: with open(INSTALLED_FILE, 'r') as f: return json.load(f) except Exception: return {"version": "1.0.0", "instances": {}} def _save_installed(data: Dict[str, Any]) -> bool: """Save the installed apps registry.""" try: with open(INSTALLED_FILE, 'w') as f: json.dump(data, f, indent=2) return True except Exception as e: logger.error(f"Failed to save installed: {e}") return False def is_installed(app_id: str) -> bool: """Check if an app is installed.""" installed = _load_installed() return app_id in installed.get("instances", {}) def list_installed_apps() -> List[Dict[str, Any]]: """List all installed apps with their status.""" installed = _load_installed() apps = [] for app_id, instance in installed.get("instances", {}).items(): status = get_app_status(app_id) apps.append({ "id": app_id, "instance_name": instance.get("instance_name", app_id), "vmid": instance.get("vmid"), "installed_at": instance.get("installed_at"), "installed_by": instance.get("installed_by", "unknown"), "status": status }) return apps def get_installed_app(app_id: str) -> Optional[Dict[str, Any]]: """Get details of an installed app.""" installed = _load_installed() instance = installed.get("instances", {}).get(app_id) if not instance: return None instance["status"] = get_app_status(app_id) return instance # ================================================================= # Container Status # ================================================================= def get_app_status(app_id: str) -> Dict[str, Any]: """Get the current status of an app's LXC container.""" result = { "state": "not_installed", "health": "unknown", "uptime_seconds": 0, "last_check": datetime.now().isoformat() } if not is_installed(app_id): return result vmid = _get_vmid_for_app(app_id) if not vmid: result["state"] = "error" result["health"] = "unknown" return result # Get LXC status using pct rc, out, _ = _run_pve_cmd(["pct", "status", str(vmid)]) if rc != 0: result["state"] = "error" result["health"] = "unhealthy" return result # Parse status: "status: running" or "status: stopped" if "running" in out.lower(): result["state"] = "running" result["health"] = "healthy" # Get uptime rc, out, _ = _run_pve_cmd(["pct", "exec", str(vmid), "--", "cat", "/proc/uptime"]) if rc == 0: try: uptime = float(out.split()[0]) result["uptime_seconds"] = int(uptime) except: pass elif "stopped" in out.lower(): result["state"] = "stopped" result["health"] = "stopped" else: result["state"] = "unknown" return result # ================================================================= # Deployment # ================================================================= def deploy_app(app_id: str, config: Dict[str, Any], installed_by: str = "web") -> Dict[str, Any]: """ Deploy an OCI app as a Proxmox LXC container. Args: app_id: ID of the app from the catalog config: User configuration values (includes 'storage' for rootfs location) installed_by: Source of installation ('web' or 'cli') Returns: Dict with success status and details """ result = { "success": False, "message": "", "app_id": app_id, "vmid": None } # Note: We don't require PVE 9.1+ here because secure-gateway uses standard LXC, # not OCI containers. OCI support will be checked when deploying actual OCI apps. # Get app definition app_def = get_app_definition(app_id) if not app_def: result["message"] = f"App '{app_id}' not found in catalog" return result # Check if already installed if is_installed(app_id): result["message"] = f"App '{app_id}' is already installed" return result container_def = app_def.get("container", {}) container_type = container_def.get("type", "oci") # Get storage for rootfs - from config or auto-detect rootfs_storage = config.get("storage") if not rootfs_storage: # Auto-detect available storage available = get_available_storages() if available: # Use the first one (should be recommended) rootfs_storage = available[0]["name"] logger.info(f"Auto-detected rootfs storage: {rootfs_storage}") else: rootfs_storage = DEFAULT_ROOTFS_STORAGE logger.warning(f"No storage detected, using default: {rootfs_storage}") # Get next available VMID vmid = _get_next_vmid() result["vmid"] = vmid hostname = config.get("hostname", f"proxmenux-{app_id}") logger.info(f"Deploying {app_id} as LXC {vmid}") print(f"[*] Deploying {app_id} as LXC container (VMID: {vmid})") # Determine deployment method: LXC traditional or OCI image if container_type == "lxc": # Use traditional LXC with Alpine template + package installation # This is more reliable than OCI images which have bugs in Proxmox 9.1 template = _find_alpine_template() if not template: result["message"] = "Alpine template not found. Please download it from CT Templates." return result print(f"[*] Using LXC template: {template}") use_oci = False else: # OCI image deployment (may have issues with multi-layer images) image = container_def.get("image", "") if not image: result["message"] = "No container image specified in app definition" return result if ":" in image: image_name, tag = image.rsplit(":", 1) else: image_name, tag = image, "latest" print(f"[*] Pulling OCI image: {image}") pull_result = pull_oci_image(image_name, tag) if not pull_result["success"]: result["message"] = pull_result["message"] return result template = pull_result["template"] use_oci = True # Step 2: Create LXC container print(f"[*] Creating LXC container...") pct_cmd = [ "pct", "create", str(vmid), template, "--hostname", hostname, "--memory", str(container_def.get("memory", 512)), "--cores", str(container_def.get("cores", 1)), "--rootfs", f"{rootfs_storage}:{container_def.get('disk_size', 4)}", "--unprivileged", "0" if container_def.get("privileged") else "1", "--onboot", "1" ] # Add ostype for OCI containers if use_oci: pct_cmd.extend(["--ostype", "unmanaged"]) # Add features (nesting, etc.) features = container_def.get("features", []) if features: pct_cmd.extend(["--features", ",".join(features)]) # Network configuration - use simple bridge with DHCP pct_cmd.extend(["--net0", "name=eth0,bridge=vmbr0,ip=dhcp"]) # Run pct create rc, out, err = _run_pve_cmd(pct_cmd, timeout=120) if rc != 0: result["message"] = f"Failed to create container: {err}" logger.error(f"pct create failed: {err}") return result # Step 3: Apply extra LXC configuration (for unprivileged containers) lxc_config = container_def.get("lxc_config", []) if lxc_config: conf_file = f"/etc/pve/lxc/{vmid}.conf" try: with open(conf_file, 'a') as f: f.write("\n# ProxMenux OCI extra config\n") for config_line in lxc_config: f.write(f"{config_line}\n") logger.info(f"Applied extra LXC config to {conf_file}") except Exception as e: logger.warning(f"Could not apply extra LXC config: {e}") # Step 3.1: Set HTML description for Proxmox Notes panel html_desc = _generate_html_description(app_def, container_def, hostname) if html_desc: rc, _, err = _run_pve_cmd(["pct", "set", str(vmid), "-description", html_desc]) if rc == 0: logger.info(f"Set HTML description for container {vmid}") else: logger.warning(f"Could not set description: {err}") # Step 4: Configure environment variables (only for OCI containers) if use_oci: env_vars = [] for env in container_def.get("environment", []): env_name = env.get("name", "") env_value = env.get("value", "") if env_value.startswith("$"): config_key = env_value[1:] env_value = config.get(config_key, env.get("default", "")) if env_name and env_value: env_vars.append(f"{env_name}={env_value}") if env_vars: for env in env_vars: _run_pve_cmd(["pct", "set", str(vmid), f"--lxc.environment", env]) # Step 5: Enable IP forwarding if needed (for VPN containers) if container_def.get("requires_ip_forward"): _enable_host_ip_forwarding() # Step 6: Start the container print(f"[*] Starting container...") rc, _, err = _run_pve_cmd(["pct", "start", str(vmid)]) if rc != 0: result["message"] = f"Container created but failed to start: {err}" logger.error(f"pct start failed: {err}") return result # Step 7: For LXC containers, install packages and configure if not use_oci: packages = container_def.get("packages", []) install_method = container_def.get("install_method", "apk") if packages: if not _install_packages_in_lxc(vmid, packages, install_method): result["message"] = "Container created but package installation failed" return result services = container_def.get("services", []) if services: _enable_services_in_lxc(vmid, services, install_method) # Special handling for Tailscale if "tailscale" in packages: _configure_tailscale(vmid, config) # Step 8: Save instance data instance_dir = os.path.join(INSTANCES_DIR, app_id) os.makedirs(instance_dir, exist_ok=True) config_file = os.path.join(instance_dir, "config.json") config_schema = app_def.get("config_schema", {}) encrypted_config = encrypt_config_sensitive_fields(config, config_schema) with open(config_file, 'w') as f: json.dump({ "app_id": app_id, "vmid": vmid, "created_at": datetime.now().isoformat(), "values": encrypted_config }, f, indent=2) os.chmod(config_file, 0o600) # Update installed registry installed = _load_installed() installed["instances"][app_id] = { "app_id": app_id, "vmid": vmid, "instance_name": hostname, "installed_at": datetime.now().isoformat(), "installed_by": installed_by, "template": template, "container_type": container_type } _save_installed(installed) result["success"] = True result["message"] = f"App deployed successfully as LXC {vmid}" print(f"[OK] Container {vmid} ({hostname}) deployed successfully!") return result def _generate_html_description(app_def: Dict, container_def: Dict, hostname: str) -> str: """ Generate HTML description for Proxmox Notes panel. Tries to use the shared description_templates module if available, otherwise falls back to inline implementation. """ try: # Try to import the shared templates module import sys templates_path = Path("/usr/local/share/proxmenux/scripts/oci") if templates_path.exists(): sys.path.insert(0, str(templates_path)) from description_templates import generate_description return generate_description(app_def, container_def, hostname) except ImportError: pass # Fallback: inline implementation import urllib.parse app_name = app_def.get("name", "ProxMenux App") app_subtitle = app_def.get("subtitle", app_def.get("short_name", "")) app_color = app_def.get("color", "#0EA5E9") app_icon_type = app_def.get("icon_type", "default") doc_url = app_def.get("documentation_url", "https://macrimi.github.io/ProxMenux/") code_url = app_def.get("code_url", "https://github.com/MacRimi/ProxMenux") installer_url = app_def.get("installer_url", "") # Generate icon based on type if app_icon_type == "shield": icon_svg = f"""""" else: icon_svg = f"""""" icon_data = "data:image/svg+xml," + urllib.parse.quote(icon_svg) # Build badges badges = [ f"Docs", f"Code", ] if installer_url: badges.append(f"Installer") badges.append("Ko-fi") badges_html = "\n".join(badges) html_desc = f"""
ProxMenux Logo

{app_name}

Created with ProxMenux

Icon {app_name}
{app_subtitle}

{badges_html}

Hostname: {hostname}

""" return html_desc def _enable_host_ip_forwarding() -> bool: """Enable IP forwarding on the Proxmox host.""" logger.info("Enabling IP forwarding on host") print("[*] Enabling IP forwarding...") try: # Enable IPv4 forwarding with open("/proc/sys/net/ipv4/ip_forward", 'w') as f: f.write('1') # Enable IPv6 forwarding ipv6_path = "/proc/sys/net/ipv6/conf/all/forwarding" if os.path.exists(ipv6_path): with open(ipv6_path, 'w') as f: f.write('1') # Make persistent sysctl_d = "/etc/sysctl.d/99-proxmenux-ip-forward.conf" with open(sysctl_d, 'w') as f: f.write("net.ipv4.ip_forward = 1\n") f.write("net.ipv6.conf.all.forwarding = 1\n") subprocess.run(["sysctl", "-p", sysctl_d], capture_output=True) return True except Exception as e: logger.warning(f"Could not enable IP forwarding: {e}") return False # ================================================================= # Container Control # ================================================================= def start_app(app_id: str) -> Dict[str, Any]: """Start an app's LXC container.""" result = {"success": False, "message": ""} vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App '{app_id}' not found" return result rc, _, err = _run_pve_cmd(["pct", "start", str(vmid)]) if rc == 0: result["success"] = True result["message"] = f"Container {vmid} started" else: result["message"] = f"Failed to start: {err}" return result def stop_app(app_id: str) -> Dict[str, Any]: """Stop an app's LXC container.""" result = {"success": False, "message": ""} vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App '{app_id}' not found" return result rc, _, err = _run_pve_cmd(["pct", "stop", str(vmid)]) if rc == 0: result["success"] = True result["message"] = f"Container {vmid} stopped" else: result["message"] = f"Failed to stop: {err}" return result def restart_app(app_id: str) -> Dict[str, Any]: """Restart an app's LXC container.""" result = {"success": False, "message": ""} vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App '{app_id}' not found" return result rc, _, err = _run_pve_cmd(["pct", "reboot", str(vmid)]) if rc == 0: result["success"] = True result["message"] = f"Container {vmid} restarted" else: result["message"] = f"Failed to restart: {err}" return result def remove_app(app_id: str, remove_data: bool = True) -> Dict[str, Any]: """Remove an app's LXC container.""" result = {"success": False, "message": ""} vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App '{app_id}' not found" return result logger.info(f"Removing app {app_id} (VMID: {vmid})") # Stop if running _run_pve_cmd(["pct", "stop", str(vmid)]) time.sleep(2) # Destroy container rc, _, err = _run_pve_cmd(["pct", "destroy", str(vmid), "--purge"]) if rc != 0: result["message"] = f"Failed to destroy container: {err}" return result # Remove from installed registry installed = _load_installed() if app_id in installed.get("instances", {}): del installed["instances"][app_id] _save_installed(installed) # Remove instance data if remove_data: instance_dir = os.path.join(INSTANCES_DIR, app_id) if os.path.exists(instance_dir): shutil.rmtree(instance_dir) result["success"] = True result["message"] = f"App {app_id} removed successfully" return result def get_app_logs(app_id: str, lines: int = 100) -> Dict[str, Any]: """Get logs from an app's LXC container.""" result = {"success": False, "message": "", "logs": ""} vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App '{app_id}' not found" return result # Try to get logs from inside container rc, out, err = _run_pve_cmd([ "pct", "exec", str(vmid), "--", "tail", "-n", str(lines), "/var/log/messages" ]) if rc != 0: # Try journalctl rc, out, err = _run_pve_cmd([ "pct", "exec", str(vmid), "--", "journalctl", "-n", str(lines), "--no-pager" ]) if rc == 0: result["success"] = True result["logs"] = out else: result["message"] = f"Failed to get logs: {err}" result["logs"] = err return result # ================================================================= # Network Detection # ================================================================= def detect_host_networks() -> List[Dict[str, Any]]: """Detect available networks on the Proxmox host.""" networks = [] try: # Get bridges from Proxmox rc, out, _ = _run_pve_cmd(["pvesh", "get", "/nodes/localhost/network", "--output-format", "json"]) if rc == 0: ifaces = json.loads(out) for iface in ifaces: if iface.get("type") == "bridge": networks.append({ "interface": iface.get("iface", ""), "type": "bridge", "address": iface.get("address", ""), "cidr": iface.get("cidr", ""), "recommended": True }) except Exception as e: logger.error(f"Failed to detect networks: {e}") return networks # ================================================================= # Network Detection # ================================================================= def detect_networks() -> List[Dict[str, str]]: """ Detect available network interfaces and their subnets. Used for suggesting routes to advertise via Tailscale. Returns: List of dicts with interface name and subnet. """ networks = [] try: # Use ip command to get interfaces and their addresses proc = subprocess.run( ["ip", "-j", "addr", "show"], capture_output=True, text=True ) if proc.returncode == 0: interfaces = json.loads(proc.stdout) for iface in interfaces: name = iface.get("ifname", "") # Skip loopback and virtual interfaces if name in ("lo", "docker0") or name.startswith(("veth", "br-", "tap", "fwbr", "fwpr")): continue # Get IPv4 addresses for addr_info in iface.get("addr_info", []): if addr_info.get("family") == "inet": ip = addr_info.get("local", "") prefix = addr_info.get("prefixlen", 24) if ip and not ip.startswith("127."): # Calculate network address ip_parts = ip.split(".") if len(ip_parts) == 4: # Simple network calculation if prefix >= 24: network = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.0/{prefix}" elif prefix >= 16: network = f"{ip_parts[0]}.{ip_parts[1]}.0.0/{prefix}" else: network = f"{ip_parts[0]}.0.0.0/{prefix}" networks.append({ "interface": name, "subnet": network, "ip": ip }) except Exception as e: logger.error(f"Failed to detect networks: {e}") return networks # ================================================================= # Update Auth Key (for Tailscale re-authentication) # ================================================================= def update_auth_key(app_id: str, auth_key: str) -> Dict[str, Any]: """Update the Tailscale auth key for a running gateway.""" result = {"success": False, "message": "", "app_id": app_id} # Get VMID for the app vmid = _get_vmid_for_app(app_id) if not vmid: result["message"] = f"App {app_id} not found or not installed" return result # Check if container is running status = get_app_status(app_id) if status.get("state") != "running": result["message"] = "Container must be running to update auth key" return result logger.info(f"Updating auth key for {app_id} (VMID: {vmid})") print(f"[*] Updating auth key for {app_id}...") # Run tailscale logout first to clear existing state print(f"[*] Logging out of Tailscale...") _run_pve_cmd(["pct", "exec", str(vmid), "--", "tailscale", "logout"], timeout=30) # Wait a moment for logout to complete import time time.sleep(2) # Run tailscale up with new auth key print(f"[*] Authenticating with new key...") # Load saved config to get original settings config_file = os.path.join(INSTANCES_DIR, app_id, "config.json") config = {} if os.path.exists(config_file): try: with open(config_file) as f: saved_config = json.load(f) config = saved_config.get("values", {}) except: pass # Build tailscale up command ts_cmd = ["tailscale", "up", f"--authkey={auth_key}"] hostname = config.get("hostname") if hostname: ts_cmd.append(f"--hostname={hostname}") advertise_routes = config.get("advertise_routes") if advertise_routes: if isinstance(advertise_routes, list): advertise_routes = ",".join(advertise_routes) ts_cmd.append(f"--advertise-routes={advertise_routes}") if config.get("exit_node"): ts_cmd.append("--advertise-exit-node") if config.get("accept_routes"): ts_cmd.append("--accept-routes") rc, out, err = _run_pve_cmd(["pct", "exec", str(vmid), "--"] + ts_cmd, timeout=60) if rc != 0: logger.error(f"Failed to update auth key: {err}") result["message"] = f"Failed to authenticate: {err}" return result print(f"[OK] Auth key updated successfully") result["success"] = True result["message"] = "Auth key updated successfully" return result # ================================================================= # Runtime Detection (for backward compatibility) # ================================================================= def detect_runtime() -> Dict[str, Any]: """Check if Proxmox OCI support is available.""" pve_info = check_proxmox_version() return { "available": pve_info["oci_support"], "runtime": "proxmox-lxc" if pve_info["oci_support"] else None, "version": pve_info.get("version"), "path": shutil.which("pct"), "error": pve_info.get("error") } # ================================================================= # CLI Interface # ================================================================= if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="ProxMenux OCI Manager") subparsers = parser.add_subparsers(dest="command", help="Commands") # Deploy command deploy_parser = subparsers.add_parser("deploy", help="Deploy an app") deploy_parser.add_argument("--app-id", required=True, help="App ID") deploy_parser.add_argument("--config", default="{}", help="JSON config") # Status command status_parser = subparsers.add_parser("status", help="Get app status") status_parser.add_argument("--app-id", required=True, help="App ID") # Control commands start_parser = subparsers.add_parser("start", help="Start app") start_parser.add_argument("--app-id", required=True) stop_parser = subparsers.add_parser("stop", help="Stop app") stop_parser.add_argument("--app-id", required=True) remove_parser = subparsers.add_parser("remove", help="Remove app") remove_parser.add_argument("--app-id", required=True) # List commands subparsers.add_parser("list", help="List available apps") subparsers.add_parser("installed", help="List installed apps") subparsers.add_parser("runtime", help="Check runtime") args = parser.parse_args() if args.command == "deploy": config = json.loads(args.config) result = deploy_app(args.app_id, config, "cli") print(json.dumps(result, indent=2)) elif args.command == "status": print(json.dumps(get_app_status(args.app_id), indent=2)) elif args.command == "start": print(json.dumps(start_app(args.app_id), indent=2)) elif args.command == "stop": print(json.dumps(stop_app(args.app_id), indent=2)) elif args.command == "remove": print(json.dumps(remove_app(args.app_id), indent=2)) elif args.command == "list": print(json.dumps(list_available_apps(), indent=2)) elif args.command == "installed": print(json.dumps(list_installed_apps(), indent=2)) elif args.command == "runtime": print(json.dumps(detect_runtime(), indent=2)) else: parser.print_help()