feat: add cert resolve command and improve discovery logic
This commit is contained in:
parent
6d2dde9a60
commit
069441d3eb
3 changed files with 118 additions and 28 deletions
|
|
@ -1,8 +1,10 @@
|
||||||
from .ssh import SSHClient
|
from .ssh import SSHClient
|
||||||
|
import fnmatch
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
class CertificateManager:
|
class CertificateManager:
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
# Certificate manager is on la-vmh-11 (LXC 11215)
|
|
||||||
node = config.get_node('la-vmh-11')
|
node = config.get_node('la-vmh-11')
|
||||||
if not node:
|
if not node:
|
||||||
raise ValueError("Node 'la-vmh-11' not found in config")
|
raise ValueError("Node 'la-vmh-11' not found in config")
|
||||||
|
|
@ -16,25 +18,50 @@ class CertificateManager:
|
||||||
self.shared_path = "/shared-certs"
|
self.shared_path = "/shared-certs"
|
||||||
|
|
||||||
def exec_cert(self, cmd):
|
def exec_cert(self, cmd):
|
||||||
return self.client.run(f"pct exec {self.lxc_id} -- {cmd}")
|
# Using bash -c to ensure globbing works if needed, though find is better
|
||||||
|
return self.client.run(f"pct exec {self.lxc_id} -- bash -c '{cmd}'")
|
||||||
|
|
||||||
def list_certs(self):
|
def list_certs(self):
|
||||||
res = self.exec_cert(f"ls -lh {self.shared_path}")
|
# Find all .pem files in the root of shared_path
|
||||||
return res.stdout
|
res = self.exec_cert(f"find {self.shared_path} -maxdepth 1 -name \"*.pem\"")
|
||||||
|
files = [os.path.basename(c) for c in res.stdout.strip().split('\n') if c.strip()]
|
||||||
|
return "\n".join(sorted(files))
|
||||||
|
|
||||||
|
def get_sans(self, cert_filename):
|
||||||
|
cmd = f"openssl x509 -text -noout -in {self.shared_path}/{cert_filename}"
|
||||||
|
res = self.exec_cert(cmd)
|
||||||
|
sans = re.findall(r'DNS:([^, \n]+)', res.stdout)
|
||||||
|
return sans
|
||||||
|
|
||||||
|
def resolve_cert_for_domain(self, domain):
|
||||||
|
"""Finds which .pem file covers the given domain"""
|
||||||
|
res = self.exec_cert(f"find {self.shared_path} -maxdepth 1 -name \"*.pem\"")
|
||||||
|
certs = [os.path.basename(c) for c in res.stdout.strip().split('\n') if c.strip()]
|
||||||
|
|
||||||
|
for filename in certs:
|
||||||
|
sans = self.get_sans(filename)
|
||||||
|
for san in sans:
|
||||||
|
# Direct match
|
||||||
|
if domain.lower() == san.lower():
|
||||||
|
return filename
|
||||||
|
# Wildcard match
|
||||||
|
if san.startswith('*.'):
|
||||||
|
# Replace *. with nothing for base domain check, or use fnmatch
|
||||||
|
if fnmatch.fnmatch(domain.lower(), san.lower()):
|
||||||
|
return filename
|
||||||
|
return None
|
||||||
|
|
||||||
def renew(self, force=False):
|
def renew(self, force=False):
|
||||||
script_path = "/root/local-config/infra-cert-mgr/scripts/dynamic-san-manager.sh"
|
script_path = "/root/local-config/infra-cert-mgr/scripts/dynamic-san-manager.sh"
|
||||||
cmd = f"bash {script_path}"
|
cmd = f"bash {script_path}"
|
||||||
if force:
|
if force:
|
||||||
cmd += " --force-update"
|
cmd += " --force-update"
|
||||||
|
|
||||||
res = self.exec_cert(cmd)
|
res = self.exec_cert(cmd)
|
||||||
if res.returncode != 0:
|
if res.returncode != 0:
|
||||||
raise RuntimeError(f"Certificate renewal failed: {res.stderr}")
|
raise RuntimeError(f"Certificate renewal failed: {res.stderr}")
|
||||||
return res.stdout
|
return res.stdout
|
||||||
|
|
||||||
def check_expiry(self):
|
def check_expiry(self):
|
||||||
# Checks expiry of the main wildcard cert
|
|
||||||
cmd = f"openssl x509 -enddate -noout -in {self.shared_path}/loopaware.com.pem"
|
cmd = f"openssl x509 -enddate -noout -in {self.shared_path}/loopaware.com.pem"
|
||||||
res = self.exec_cert(cmd)
|
res = self.exec_cert(cmd)
|
||||||
return res.stdout.strip()
|
return res.stdout.strip()
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ from .cloudflare import CloudflareManager
|
||||||
from .database import DatabaseManager
|
from .database import DatabaseManager
|
||||||
from .cert import CertificateManager
|
from .cert import CertificateManager
|
||||||
import sys
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
@click.option('--config', help='Path to config file')
|
@click.option('--config', help='Path to config file')
|
||||||
|
|
@ -21,30 +22,45 @@ def cli(ctx, config):
|
||||||
click.echo(f"Error: {e}", err=True)
|
click.echo(f"Error: {e}", err=True)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
@cli.group()
|
@cli.command()
|
||||||
def cert():
|
@click.option('--domain', help='Public domain to remove from HAProxy and DNS')
|
||||||
"""Manage SSL/TLS Certificates"""
|
@click.option('--mac', help='MAC address to remove from DHCP')
|
||||||
pass
|
@click.option('--vmid', type=int, help='LXC VMID to destroy')
|
||||||
|
@click.option('--node', help='Proxmox node for the VMID')
|
||||||
@cert.command(name='list')
|
@click.option('--port-name', help='Router port forward section name to delete')
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def cert_list(config):
|
def decommission(config, domain, mac, vmid, node, port_name):
|
||||||
mgr = CertificateManager(config)
|
"""Orchestrated removal of a service from all infrastructure layers"""
|
||||||
click.echo(mgr.list_certs())
|
if domain:
|
||||||
|
click.echo(f"Removing Ingress/DNS for {domain}...")
|
||||||
|
try:
|
||||||
|
IngressManager(config).remove(domain)
|
||||||
|
DNSManager(config).remove_dns(domain)
|
||||||
|
except Exception as e:
|
||||||
|
click.echo(f"Warning: Ingress/DNS cleanup failed: {e}")
|
||||||
|
|
||||||
@cert.command(name='status')
|
if mac:
|
||||||
@click.pass_obj
|
click.echo(f"Removing DHCP reservation for {mac}...")
|
||||||
def cert_status(config):
|
try:
|
||||||
mgr = CertificateManager(config)
|
DNSManager(config).remove_host(mac)
|
||||||
click.echo(f"Main Certificate Expiry: {mgr.check_expiry()}")
|
except Exception as e:
|
||||||
|
click.echo(f"Warning: DHCP cleanup failed: {e}")
|
||||||
|
|
||||||
@cert.command(name='renew')
|
if port_name:
|
||||||
@click.option('--force', is_flag=True, help='Force full SAN discovery and renewal')
|
click.echo(f"Removing Router Port Forward {port_name}...")
|
||||||
@click.pass_obj
|
try:
|
||||||
def cert_renew(config, force):
|
RouterManager(config).remove_forward(port_name)
|
||||||
mgr = CertificateManager(config)
|
except Exception as e:
|
||||||
click.echo("Running dynamic SAN manager...")
|
click.echo(f"Warning: Router cleanup failed: {e}")
|
||||||
click.echo(mgr.renew(force))
|
|
||||||
|
if vmid:
|
||||||
|
click.echo(f"Destroying LXC {vmid} on {node or 'default node'}...")
|
||||||
|
try:
|
||||||
|
ProxmoxManager(config, node).delete_lxc(vmid)
|
||||||
|
except Exception as e:
|
||||||
|
click.echo(f"Warning: Proxmox cleanup failed: {e}")
|
||||||
|
|
||||||
|
click.echo("Decommission process complete.")
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
def db():
|
def db():
|
||||||
|
|
@ -332,6 +348,44 @@ def router_list(config):
|
||||||
for rule in mgr.list():
|
for rule in mgr.list():
|
||||||
click.echo(f"[{rule['section']}] {rule['name']}: {rule['proto']} {rule['port']} -> {rule['dest']}")
|
click.echo(f"[{rule['section']}] {rule['name']}: {rule['proto']} {rule['port']} -> {rule['dest']}")
|
||||||
|
|
||||||
|
@cli.group()
|
||||||
|
def cert():
|
||||||
|
"""Manage SSL/TLS Certificates"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@cert.command(name='list')
|
||||||
|
@click.pass_obj
|
||||||
|
def cert_list(config):
|
||||||
|
mgr = CertificateManager(config)
|
||||||
|
click.echo(mgr.list_certs())
|
||||||
|
|
||||||
|
@cert.command(name='status')
|
||||||
|
@click.pass_obj
|
||||||
|
def cert_status(config):
|
||||||
|
mgr = CertificateManager(config)
|
||||||
|
click.echo(f"Main Certificate Expiry: {mgr.check_expiry()}")
|
||||||
|
|
||||||
|
@cert.command(name='renew')
|
||||||
|
@click.option('--force', is_flag=True, help='Force full SAN discovery and renewal')
|
||||||
|
@click.pass_obj
|
||||||
|
def cert_renew(config, force):
|
||||||
|
mgr = CertificateManager(config)
|
||||||
|
click.echo("Running dynamic SAN manager...")
|
||||||
|
click.echo(mgr.renew(force))
|
||||||
|
|
||||||
|
@cert.command(name='resolve')
|
||||||
|
@click.argument('domain')
|
||||||
|
@click.pass_obj
|
||||||
|
def cert_resolve(config, domain):
|
||||||
|
"""Find the certificate file covering a specific domain"""
|
||||||
|
mgr = CertificateManager(config)
|
||||||
|
cert_file = mgr.resolve_cert_for_domain(domain)
|
||||||
|
if cert_file:
|
||||||
|
click.echo(cert_file)
|
||||||
|
else:
|
||||||
|
click.echo(f"Error: No certificate found covering {domain}", err=True)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
cli(obj={})
|
cli(obj={})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,15 @@ def test_cert_cli():
|
||||||
assert res.returncode == 0
|
assert res.returncode == 0
|
||||||
assert "notAfter" in res.stdout
|
assert "notAfter" in res.stdout
|
||||||
|
|
||||||
|
# 3. Resolve
|
||||||
|
res = run_infra(["cert", "resolve", "loopaware.com"])
|
||||||
|
assert res.returncode == 0
|
||||||
|
assert "loopaware.com.pem" in res.stdout
|
||||||
|
|
||||||
|
res = run_infra(["cert", "resolve", "wiki.loopaware.com"])
|
||||||
|
assert res.returncode == 0
|
||||||
|
assert "loopaware.com.pem" in res.stdout
|
||||||
|
|
||||||
def test_ip_discovery():
|
def test_ip_discovery():
|
||||||
res = run_infra(["ip", "next-free"])
|
res = run_infra(["ip", "next-free"])
|
||||||
assert res.returncode == 0
|
assert res.returncode == 0
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue