2026-02-05 20:36:15 +01:00
|
|
|
import fnmatch
import os
import re
import shlex

from .ssh import SSHClient
|
2026-02-05 20:36:15 +01:00
|
|
|
|
|
|
|
|
class CertificateManager:
    """Inspect and renew certificates stored inside an LXC container.

    Every operation is executed over SSH on the configured node by running
    ``pct exec <lxc_id> -- bash -c <command>`` through ``self.client``.
    The object returned by ``self.client.run`` is expected to expose
    ``stdout``, ``stderr`` and ``returncode`` attributes.
    """

    DEFAULT_NODE = "la-vmh-11"
    DEFAULT_LXC_ID = "11215"
    DEFAULT_SHARED_PATH = "/shared-certs"
    DEFAULT_CERT = "loopaware.com.pem"
    RENEW_SCRIPT = "/root/local-config/infra-cert-mgr/scripts/dynamic-san-manager.sh"

    # Captures each DNS entry from the `openssl x509 -text` output.
    _SAN_PATTERN = re.compile(r'DNS:([^, \n]+)')

    def __init__(self, config, *, node_name=DEFAULT_NODE,
                 lxc_id=DEFAULT_LXC_ID, shared_path=DEFAULT_SHARED_PATH):
        """Build the SSH client used to reach the certificate container.

        Args:
            config: Configuration object providing ``get_node(name)`` and
                ``get(key, default)``.
            node_name: Name of the node looked up via ``config.get_node``.
            lxc_id: Container id passed to ``pct exec``.
            shared_path: Directory inside the container holding the
                ``.pem`` files.

        Raises:
            ValueError: If ``config.get_node`` returns a falsy value for
                ``node_name``.
        """
        node = config.get_node(node_name)
        if not node:
            raise ValueError(f"Node '{node_name}' not found in config")

        self.host = node['host']
        self.password = node.get('pass')
        self.user = config.get('proxmox.user', 'root')
        self.ssh_key = config.get('proxmox.ssh_key_path')
        self.client = SSHClient(self.host, self.user, self.ssh_key, self.password)
        self.lxc_id = lxc_id
        self.shared_path = shared_path

    def exec_cert(self, cmd):
        """Run ``cmd`` through ``bash -c`` inside the container.

        The command is passed to bash as a single shell-quoted argument, so
        globbing and quoting inside ``cmd`` are interpreted by the inner
        bash, and a single quote in ``cmd`` can no longer terminate the
        outer quoting.

        Args:
            cmd: Shell command line to execute inside the container.

        Returns:
            Whatever ``self.client.run`` returns for the wrapped command.
        """
        return self.client.run(
            f"pct exec {self.lxc_id} -- bash -c {shlex.quote(cmd)}"
        )

    def _cert_path(self, cert_filename):
        """Return the shell-quoted path of ``cert_filename`` in the shared dir."""
        return shlex.quote(f"{self.shared_path}/{cert_filename}")

    def _cert_filenames(self):
        """Return base names of the ``.pem`` files directly in the shared dir.

        Names are returned in the order ``find`` printed them; blank lines
        and surrounding whitespace (including a stray ``\\r``) are dropped.
        """
        res = self.exec_cert(
            f"find {shlex.quote(self.shared_path)} -maxdepth 1 -name \"*.pem\""
        )
        lines = (line.strip() for line in res.stdout.splitlines())
        return [os.path.basename(line) for line in lines if line]

    @staticmethod
    def _san_covers(domain, san):
        """Tell whether the SAN entry ``san`` covers ``domain``.

        Comparison is case-insensitive. A ``*.`` wildcard stands for exactly
        one left-most label, so ``*.example.com`` covers ``www.example.com``
        but neither ``example.com`` nor ``a.b.example.com``.
        """
        domain = domain.lower()
        san = san.lower()
        if domain == san:
            return True
        if san.startswith('*.'):
            label, dot, parent = domain.partition('.')
            return bool(label) and dot == '.' and parent == san[2:]
        return False

    def list_certs(self):
        """Return the sorted ``.pem`` file names, one per line, as a string."""
        return "\n".join(sorted(self._cert_filenames()))

    def get_sans(self, cert_filename):
        """Return the DNS subject alternative names of a certificate.

        Args:
            cert_filename: File name relative to ``self.shared_path``.

        Returns:
            List of DNS names found in the ``openssl x509 -text`` output;
            empty when none are found.
        """
        res = self.exec_cert(
            f"openssl x509 -text -noout -in {self._cert_path(cert_filename)}"
        )
        return self._SAN_PATTERN.findall(res.stdout)

    def resolve_cert_for_domain(self, domain):
        """Find which ``.pem`` file covers the given domain.

        Certificates are examined in the order ``find`` lists them and the
        first one with a matching SAN (exact or single-label wildcard) wins.

        Args:
            domain: Host name to look up (case-insensitive).

        Returns:
            The matching certificate file name, or ``None`` if no
            certificate covers ``domain``.
        """
        for filename in self._cert_filenames():
            if any(self._san_covers(domain, san) for san in self.get_sans(filename)):
                return filename
        return None

    def renew(self, force=False):
        """Run the renewal script inside the container.

        Args:
            force: When true, ``--force-update`` is appended to the script
                invocation.

        Returns:
            The script's standard output.

        Raises:
            RuntimeError: If the script exits with a non-zero return code;
                the message carries the script's standard error.
        """
        cmd = f"bash {self.RENEW_SCRIPT}"
        if force:
            cmd += " --force-update"

        res = self.exec_cert(cmd)
        if res.returncode != 0:
            raise RuntimeError(f"Certificate renewal failed: {res.stderr}")
        return res.stdout

    def check_expiry(self, cert_filename=DEFAULT_CERT):
        """Return the ``openssl x509 -enddate`` output for a certificate.

        Args:
            cert_filename: File name relative to ``self.shared_path``;
                defaults to ``loopaware.com.pem``.

        Returns:
            The command's standard output with surrounding whitespace
            removed.
        """
        res = self.exec_cert(
            f"openssl x509 -enddate -noout -in {self._cert_path(cert_filename)}"
        )
        return res.stdout.strip()
|