from .ssh import SSHClient class DNSManager: def __init__(self, config): # DNS is on la-vmh-11 (dnsmasq_lxc_id) node = config.get_node('la-vmh-11') if not node: raise ValueError("Node 'la-vmh-11' not found in config") self.pve_host = node['host'] self.pve_user = config.get('proxmox.user', 'root') self.lxc_id = config.get('proxmox.dnsmasq_lxc_id') self.ssh_key = config.get('proxmox.ssh_key_path') self.client = SSHClient(self.pve_host, self.pve_user, self.ssh_key) self.hosts_file = "/etc/dnsmasq.d/dynamic-hosts.conf" self.dns_file = "/etc/dnsmasq.d/dynamic-dns.conf" def exec_lxc(self, cmd): return self.client.run(f"pct exec {self.lxc_id} -- {cmd}") def add_host(self, mac, ip, hostname): self.exec_lxc(f"touch {self.hosts_file}") # Check for duplicates res = self.exec_lxc(f"grep -q -F '{mac}' {self.hosts_file}") if res.returncode == 0: raise ValueError(f"MAC {mac} already exists") cmd = f"sh -c \"echo 'dhcp-host={mac},{hostname},{ip}' >> {self.hosts_file}\"" self.exec_lxc(cmd) self.reload() def remove_host(self, mac): self.exec_lxc(f"sed -i '/{mac}/d' {self.hosts_file}") self.reload() def add_dns(self, domain, ip): self.exec_lxc(f"touch {self.dns_file}") cmd = f"sh -c \"echo 'address=/{domain}/{ip}' >> {self.dns_file}\"" self.exec_lxc(cmd) self.reload() def remove_dns(self, domain): cmd = f"sh -c \"sed -i '\#address=/{domain}/#d' {self.dns_file}\"" self.exec_lxc(cmd) self.reload() def reload(self): res = self.exec_lxc("dnsmasq --test") if res.returncode == 0: self.exec_lxc("systemctl reload dnsmasq") else: raise RuntimeError(f"Dnsmasq config test failed: {res.stderr}") def list(self): hosts = self.exec_lxc(f"cat {self.hosts_file}").stdout dns = self.exec_lxc(f"cat {self.dns_file}").stdout return {"hosts": hosts, "dns": dns} def get_free_ips(self, start_subnet=70, end_subnet=80): """Finds free IPs in the range 10.32.[70-80].1-254 by checking both static and dynamic leases""" # 1. Get all static IPs from dhcp-hosts.conf and dynamic-hosts.conf static_configs = self.exec_lxc(f"cat /etc/dnsmasq.d/dhcp-hosts.conf {self.hosts_file} 2>/dev/null").stdout import re used_ips = set(re.findall(r'10\.32\.[0-9]{1,3}\.[0-9]{1,3}', static_configs)) # 2. Get all active dynamic leases leases = self.exec_lxc("cat /var/lib/misc/dnsmasq.leases 2>/dev/null").stdout used_ips.update(set(re.findall(r'10\.32\.[0-9]{1,3}\.[0-9]{1,3}', leases))) # 3. Find first available in the expanded agent range free_ips = [] for subnet_idx in range(start_subnet, end_subnet + 1): for host_idx in range(1, 255): candidate = f"10.32.{subnet_idx}.{host_idx}" if candidate not in used_ips: free_ips.append(candidate) if len(free_ips) >= 10: # Return top 10 return free_ips return free_ips