From 069441d3ebda581323a7c3550b58a655a0acec69 Mon Sep 17 00:00:00 2001 From: Fredrick Amnehagen Date: Thu, 5 Feb 2026 21:04:08 +0100 Subject: [PATCH] feat: add cert resolve command and improve discovery logic --- infra_cli/cert.py | 39 ++++++++++++++++--- infra_cli/main.py | 98 ++++++++++++++++++++++++++++++++++++----------- tests/test_cli.py | 9 +++++ 3 files changed, 118 insertions(+), 28 deletions(-) diff --git a/infra_cli/cert.py b/infra_cli/cert.py index 1dead17..80f4473 100644 --- a/infra_cli/cert.py +++ b/infra_cli/cert.py @@ -1,8 +1,10 @@ from .ssh import SSHClient +import fnmatch +import os +import re class CertificateManager: def __init__(self, config): - # Certificate manager is on la-vmh-11 (LXC 11215) node = config.get_node('la-vmh-11') if not node: raise ValueError("Node 'la-vmh-11' not found in config") @@ -16,25 +18,50 @@ class CertificateManager: self.shared_path = "/shared-certs" def exec_cert(self, cmd): - return self.client.run(f"pct exec {self.lxc_id} -- {cmd}") + # Using bash -c to ensure globbing works if needed, though find is better + return self.client.run(f"pct exec {self.lxc_id} -- bash -c '{cmd}'") def list_certs(self): - res = self.exec_cert(f"ls -lh {self.shared_path}") - return res.stdout + # Find all .pem files in the root of shared_path + res = self.exec_cert(f"find {self.shared_path} -maxdepth 1 -name \"*.pem\"") + files = [os.path.basename(c) for c in res.stdout.strip().split('\n') if c.strip()] + return "\n".join(sorted(files)) + + def get_sans(self, cert_filename): + cmd = f"openssl x509 -text -noout -in {self.shared_path}/{cert_filename}" + res = self.exec_cert(cmd) + sans = re.findall(r'DNS:([^, \n]+)', res.stdout) + return sans + + def resolve_cert_for_domain(self, domain): + """Finds which .pem file covers the given domain""" + res = self.exec_cert(f"find {self.shared_path} -maxdepth 1 -name \"*.pem\"") + certs = [os.path.basename(c) for c in res.stdout.strip().split('\n') if c.strip()] + + for filename in certs: + sans = self.get_sans(filename) + for san in sans: + # Direct match + if domain.lower() == san.lower(): + return filename + # Wildcard match + if san.startswith('*.'): + # Replace *. with nothing for base domain check, or use fnmatch + if fnmatch.fnmatch(domain.lower(), san.lower()): + return filename + return None def renew(self, force=False): script_path = "/root/local-config/infra-cert-mgr/scripts/dynamic-san-manager.sh" cmd = f"bash {script_path}" if force: cmd += " --force-update" - res = self.exec_cert(cmd) if res.returncode != 0: raise RuntimeError(f"Certificate renewal failed: {res.stderr}") return res.stdout def check_expiry(self): - # Checks expiry of the main wildcard cert cmd = f"openssl x509 -enddate -noout -in {self.shared_path}/loopaware.com.pem" res = self.exec_cert(cmd) return res.stdout.strip() diff --git a/infra_cli/main.py b/infra_cli/main.py index 6683bf9..a34dc2c 100644 --- a/infra_cli/main.py +++ b/infra_cli/main.py @@ -9,6 +9,7 @@ from .cloudflare import CloudflareManager from .database import DatabaseManager from .cert import CertificateManager import sys +import os @click.group() @click.option('--config', help='Path to config file') @@ -21,30 +22,45 @@ def cli(ctx, config): click.echo(f"Error: {e}", err=True) sys.exit(1) -@cli.group() -def cert(): - """Manage SSL/TLS Certificates""" - pass - -@cert.command(name='list') +@cli.command() +@click.option('--domain', help='Public domain to remove from HAProxy and DNS') +@click.option('--mac', help='MAC address to remove from DHCP') +@click.option('--vmid', type=int, help='LXC VMID to destroy') +@click.option('--node', help='Proxmox node for the VMID') +@click.option('--port-name', help='Router port forward section name to delete') @click.pass_obj -def cert_list(config): - mgr = CertificateManager(config) - click.echo(mgr.list_certs()) +def decommission(config, domain, mac, vmid, node, port_name): + """Orchestrated removal of a service from all infrastructure layers""" + if domain: + click.echo(f"Removing Ingress/DNS for {domain}...") + try: + IngressManager(config).remove(domain) + DNSManager(config).remove_dns(domain) + except Exception as e: + click.echo(f"Warning: Ingress/DNS cleanup failed: {e}") -@cert.command(name='status') -@click.pass_obj -def cert_status(config): - mgr = CertificateManager(config) - click.echo(f"Main Certificate Expiry: {mgr.check_expiry()}") + if mac: + click.echo(f"Removing DHCP reservation for {mac}...") + try: + DNSManager(config).remove_host(mac) + except Exception as e: + click.echo(f"Warning: DHCP cleanup failed: {e}") -@cert.command(name='renew') -@click.option('--force', is_flag=True, help='Force full SAN discovery and renewal') -@click.pass_obj -def cert_renew(config, force): - mgr = CertificateManager(config) - click.echo("Running dynamic SAN manager...") - click.echo(mgr.renew(force)) + if port_name: + click.echo(f"Removing Router Port Forward {port_name}...") + try: + RouterManager(config).remove_forward(port_name) + except Exception as e: + click.echo(f"Warning: Router cleanup failed: {e}") + + if vmid: + click.echo(f"Destroying LXC {vmid} on {node or 'default node'}...") + try: + ProxmoxManager(config, node).delete_lxc(vmid) + except Exception as e: + click.echo(f"Warning: Proxmox cleanup failed: {e}") + + click.echo("Decommission process complete.") @cli.group() def db(): @@ -332,8 +348,46 @@ def router_list(config): for rule in mgr.list(): click.echo(f"[{rule['section']}] {rule['name']}: {rule['proto']} {rule['port']} -> {rule['dest']}") +@cli.group() +def cert(): + """Manage SSL/TLS Certificates""" + pass + +@cert.command(name='list') +@click.pass_obj +def cert_list(config): + mgr = CertificateManager(config) + click.echo(mgr.list_certs()) + +@cert.command(name='status') +@click.pass_obj +def cert_status(config): + mgr = CertificateManager(config) + click.echo(f"Main Certificate Expiry: {mgr.check_expiry()}") + +@cert.command(name='renew') +@click.option('--force', is_flag=True, help='Force full SAN discovery and renewal') +@click.pass_obj +def cert_renew(config, force): + mgr = CertificateManager(config) + click.echo("Running dynamic SAN manager...") + click.echo(mgr.renew(force)) + +@cert.command(name='resolve') +@click.argument('domain') +@click.pass_obj +def cert_resolve(config, domain): + """Find the certificate file covering a specific domain""" + mgr = CertificateManager(config) + cert_file = mgr.resolve_cert_for_domain(domain) + if cert_file: + click.echo(cert_file) + else: + click.echo(f"Error: No certificate found covering {domain}", err=True) + sys.exit(1) + def main(): cli(obj={}) if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index 26eee73..2c4456a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -100,6 +100,15 @@ def test_cert_cli(): assert res.returncode == 0 assert "notAfter" in res.stdout + # 3. Resolve + res = run_infra(["cert", "resolve", "loopaware.com"]) + assert res.returncode == 0 + assert "loopaware.com.pem" in res.stdout + + res = run_infra(["cert", "resolve", "wiki.loopaware.com"]) + assert res.returncode == 0 + assert "loopaware.com.pem" in res.stdout + def test_ip_discovery(): res = run_infra(["ip", "next-free"]) assert res.returncode == 0