initial commit: dynamic infra tooling cli

This commit is contained in:
Fredrick Amnehagen 2026-02-05 11:29:34 +01:00
commit db843ceec8
9 changed files with 339 additions and 0 deletions

30
infra_cli/config.py Normal file
View file

@ -0,0 +1,30 @@
import os
import yaml
DEFAULT_CONFIG_PATH = os.path.expanduser("~/.config/loopaware/infra-cli.yaml")
class Config:
def __init__(self, config_path=None):
self.path = config_path or os.environ.get("INFRA_CONFIG") or DEFAULT_CONFIG_PATH
self.data = self._load()
def _load(self):
if not os.path.exists(self.path):
# Fallback to local config if exists
if os.path.exists("config.yaml"):
self.path = "config.yaml"
else:
raise FileNotFoundError(f"Config file not found at {self.path}. Please create it based on config.yaml.example")
with open(self.path, 'r') as f:
return yaml.safe_vars(f) if hasattr(yaml, 'safe_vars') else yaml.safe_load(f)
def get(self, key, default=None):
parts = key.split('.')
val = self.data
for part in parts:
if isinstance(val, dict) and part in val:
val = val[part]
else:
return default
return val

50
infra_cli/dns.py Normal file
View file

@ -0,0 +1,50 @@
from .ssh import SSHClient
class DNSManager:
def __init__(self, config):
self.pve_host = config.get('proxmox.host')
self.pve_user = config.get('proxmox.user', 'root')
self.lxc_id = config.get('proxmox.dnsmasq_lxc_id')
self.ssh_key = config.get('proxmox.ssh_key')
self.client = SSHClient(self.pve_host, self.pve_user, self.ssh_key)
self.hosts_file = "/etc/dnsmasq.d/dynamic-hosts.conf"
self.dns_file = "/etc/dnsmasq.d/dynamic-dns.conf"
def exec_lxc(self, cmd):
return self.client.run(f"pct exec {self.lxc_id} -- {cmd}")
def add_host(self, mac, ip, hostname):
self.exec_lxc(f"touch {self.hosts_file}")
# Check for duplicates
res = self.exec_lxc(f"grep -q -F '{mac}' {self.hosts_file}")
if res.returncode == 0:
raise ValueError(f"MAC {mac} already exists")
self.client.run(f"pct exec {self.lxc_id} -- sh -c \"echo 'dhcp-host={mac},{hostname},{ip}' >> {self.hosts_file}\")
self.reload()
def remove_host(self, mac):
self.exec_lxc(f"sed -i '/{mac}/d' {self.hosts_file}")
self.reload()
def add_dns(self, domain, ip):
self.exec_lxc(f"touch {self.dns_file}")
self.client.run(f"pct exec {self.lxc_id} -- sh -c \"echo 'address=/{domain}/{ip}' >> {self.dns_file}\")
self.reload()
def remove_dns(self, domain):
self.client.run(f"pct exec {self.lxc_id} -- sh -c \"sed -i '\#address=/{domain}/#d' {self.dns_file}\")
self.reload()
def reload(self):
res = self.exec_lxc("dnsmasq --test")
if res.returncode == 0:
self.exec_lxc("systemctl reload dnsmasq")
else:
raise RuntimeError(f"Dnsmasq config test failed: {res.stderr}")
def list(self):
hosts = self.exec_lxc(f"cat {self.hosts_file}").stdout
dns = self.exec_lxc(f"cat {self.dns_file}").stdout
return {"hosts": hosts, "dns": dns}

27
infra_cli/ingress.py Normal file
View file

@ -0,0 +1,27 @@
from .ssh import SSHClient
class IngressManager:
def __init__(self, config):
self.host = config.get('haproxy.host')
self.user = config.get('haproxy.user', 'root')
self.ssh_key = config.get('haproxy.ssh_key')
self.client = SSHClient(self.host, self.user, self.ssh_key)
def add(self, domain, ip, port, https=False):
cmd = f"ingress-manager add {domain} {ip} {port}"
if https:
cmd += " --https"
res = self.client.run(cmd)
if res.returncode != 0:
raise RuntimeError(f"Failed to add ingress: {res.stderr}")
return res.stdout
def remove(self, domain):
res = self.client.run(f"ingress-manager remove {domain}")
if res.returncode != 0:
raise RuntimeError(f"Failed to remove ingress: {res.stderr}")
return res.stdout
def list(self):
res = self.client.run("ingress-manager list")
return res.stdout

82
infra_cli/main.py Normal file
View file

@ -0,0 +1,82 @@
import click
from .config import Config
from .dns import DNSManager
from .ingress import IngressManager
from .router import RouterManager
import sys
@click.group()
@click.option('--config', help='Path to config file')
@click.pass_context
def cli(ctx, config):
"""LoopAware Infrastructure Management CLI"""
try:
ctx.obj = Config(config)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
@cli.group()
def dns():
"""Manage DNS and DHCP"""
pass
@dns.command(name='add-host')
@click.argument('mac')
@click.argument('ip')
@click.argument('hostname')
@click.pass_obj
def dns_add_host(config, mac, ip, hostname):
mgr = DNSManager(config)
mgr.add_host(mac, ip, hostname)
click.echo(f"Added host {hostname} ({ip})")
@dns.command(name='remove-host')
@click.argument('mac')
@click.pass_obj
def dns_remove_host(config, mac):
mgr = DNSManager(config)
mgr.remove_host(mac)
click.echo(f"Removed host {mac}")
@dns.command(name='list')
@click.pass_obj
def dns_list(config):
mgr = DNSManager(config)
data = mgr.list()
click.echo(data['hosts'])
click.echo(data['dns'])
@cli.group()
def ingress():
"""Manage HAProxy Ingress"""
pass
@ingress.command(name='add')
@click.argument('domain')
@click.argument('ip')
@click.argument('port', type=int)
@click.option('--https', is_flag=True, help='Target uses HTTPS')
@click.pass_obj
def ingress_add(config, domain, ip, port, https):
mgr = IngressManager(config)
mgr.add(domain, ip, port, https)
click.echo(f"Added ingress for {domain}")
@cli.group()
def router():
"""Manage Router Port Forwards"""
pass
@router.command(name='list')
@click.pass_obj
def router_list(config):
mgr = RouterManager(config)
for rule in mgr.list():
click.echo(f"[{rule['section']}] {rule['name']}: {rule['proto']} {rule['port']} -> {rule['dest']}")
def main():
cli(obj={})
if __name__ == "__main__":
main()

58
infra_cli/router.py Normal file
View file

@ -0,0 +1,58 @@
from .ssh import SSHClient
import os
class RouterManager:
def __init__(self, config):
self.host = config.get('router.host')
self.user = config.get('router.user', 'root')
self.ssh_key = config.get('router.ssh_key')
self.password = os.environ.get("ROUTER_PASS")
self.client = SSHClient(self.host, self.user, self.ssh_key, self.password)
def run_uci(self, cmds):
res = self.client.run(cmds)
if res.returncode != 0:
raise RuntimeError(f"UCI command failed: {res.stderr}")
return res
def add_forward(self, name, proto, ext_port, int_ip, int_port):
config_name = name.lower().replace(" ", "_").replace("-", "_")
cmds = f"uci set firewall.{config_name}=redirect; " \
f"uci set firewall.{config_name}.name='{name}'; " \
f"uci set firewall.{config_name}.proto='{proto}'; " \
f"uci set firewall.{config_name}.src='wan'; " \
f"uci set firewall.{config_name}.dest='lan'; " \
f"uci set firewall.{config_name}.src_dport='{ext_port}'; " \
f"uci set firewall.{config_name}.dest_ip='{int_ip}'; " \
f"uci set firewall.{config_name}.dest_port='{int_port}'; " \
f"uci set firewall.{config_name}.target='DNAT'; " \
f"uci commit firewall; " \
f"/etc/init.d/firewall reload"
self.run_uci(cmds)
def remove_forward(self, section_name):
cmds = f"uci delete firewall.{section_name}; uci commit firewall; /etc/init.d/firewall reload"
self.run_uci(cmds)
def list(self):
# Get sections
res = self.client.run("uci show firewall | grep '=redirect' | cut -d. -f2 | cut -d= -f1 | sort | uniq")
sections = res.stdout.strip().split('\n')
results = []
for section in sections:
if not section: continue
name = self.client.run(f"uci get firewall.{section}.name", capture=True).stdout.strip()
proto = self.client.run(f"uci get firewall.{section}.proto", capture=True).stdout.strip()
port = self.client.run(f"uci get firewall.{section}.src_dport", capture=True).stdout.strip()
dest = self.client.run(f"uci get firewall.{section}.dest_ip", capture=True).stdout.strip()
results.append({
"section": section,
"name": name,
"proto": proto,
"port": port,
"dest": dest
})
return results

43
infra_cli/ssh.py Normal file
View file

@ -0,0 +1,43 @@
import subprocess
import os
class SSHClient:
def __init__(self, host, user="root", key_path=None, password=None):
self.host = host
self.user = user
self.key_path = os.path.expanduser(key_path) if key_path else None
self.password = password
def run(self, cmd, capture=True):
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
if self.key_path:
ssh_cmd += ["-i", self.key_path]
target = f"{self.user}@{self.host}"
if self.password:
full_cmd = ["sshpass", "-p", self.password] + ssh_cmd + [target, cmd]
else:
full_cmd = ssh_cmd + [target, cmd]
result = subprocess.run(
full_cmd,
capture_output=capture,
text=True
)
return result
def scp_to(self, local_path, remote_path):
scp_cmd = ["scp", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
if self.key_path:
scp_cmd += ["-i", self.key_path]
target = f"{self.user}@{self.host}:{remote_path}"
if self.password:
full_cmd = ["sshpass", "-p", self.password] + scp_cmd + [local_path, target]
else:
full_cmd = scp_cmd + [local_path, target]
return subprocess.run(full_cmd, capture_output=True)