2026-02-05 11:29:34 +01:00
|
|
|
from .ssh import SSHClient
|
|
|
|
|
|
|
|
|
|
class DNSManager:
|
|
|
|
|
def __init__(self, config):
|
|
|
|
|
self.pve_host = config.get('proxmox.host')
|
|
|
|
|
self.pve_user = config.get('proxmox.user', 'root')
|
|
|
|
|
self.lxc_id = config.get('proxmox.dnsmasq_lxc_id')
|
|
|
|
|
self.ssh_key = config.get('proxmox.ssh_key')
|
|
|
|
|
self.client = SSHClient(self.pve_host, self.pve_user, self.ssh_key)
|
|
|
|
|
|
|
|
|
|
self.hosts_file = "/etc/dnsmasq.d/dynamic-hosts.conf"
|
|
|
|
|
self.dns_file = "/etc/dnsmasq.d/dynamic-dns.conf"
|
|
|
|
|
|
|
|
|
|
def exec_lxc(self, cmd):
|
|
|
|
|
return self.client.run(f"pct exec {self.lxc_id} -- {cmd}")
|
|
|
|
|
|
|
|
|
|
def add_host(self, mac, ip, hostname):
|
|
|
|
|
self.exec_lxc(f"touch {self.hosts_file}")
|
|
|
|
|
# Check for duplicates
|
|
|
|
|
res = self.exec_lxc(f"grep -q -F '{mac}' {self.hosts_file}")
|
|
|
|
|
if res.returncode == 0:
|
|
|
|
|
raise ValueError(f"MAC {mac} already exists")
|
|
|
|
|
|
2026-02-05 11:37:29 +01:00
|
|
|
cmd = f"sh -c \"echo 'dhcp-host={mac},{hostname},{ip}' >> {self.hosts_file}\""
|
|
|
|
|
self.exec_lxc(cmd)
|
2026-02-05 11:29:34 +01:00
|
|
|
self.reload()
|
|
|
|
|
|
|
|
|
|
def remove_host(self, mac):
|
|
|
|
|
self.exec_lxc(f"sed -i '/{mac}/d' {self.hosts_file}")
|
|
|
|
|
self.reload()
|
|
|
|
|
|
|
|
|
|
def add_dns(self, domain, ip):
|
|
|
|
|
self.exec_lxc(f"touch {self.dns_file}")
|
2026-02-05 11:37:29 +01:00
|
|
|
cmd = f"sh -c \"echo 'address=/{domain}/{ip}' >> {self.dns_file}\""
|
|
|
|
|
self.exec_lxc(cmd)
|
2026-02-05 11:29:34 +01:00
|
|
|
self.reload()
|
|
|
|
|
|
|
|
|
|
def remove_dns(self, domain):
|
2026-02-05 11:37:29 +01:00
|
|
|
cmd = f"sh -c \"sed -i '\#address=/{domain}/#d' {self.dns_file}\""
|
|
|
|
|
self.exec_lxc(cmd)
|
2026-02-05 11:29:34 +01:00
|
|
|
self.reload()
|
|
|
|
|
|
|
|
|
|
def reload(self):
|
|
|
|
|
res = self.exec_lxc("dnsmasq --test")
|
|
|
|
|
if res.returncode == 0:
|
|
|
|
|
self.exec_lxc("systemctl reload dnsmasq")
|
|
|
|
|
else:
|
|
|
|
|
raise RuntimeError(f"Dnsmasq config test failed: {res.stderr}")
|
|
|
|
|
|
|
|
|
|
def list(self):
|
|
|
|
|
hosts = self.exec_lxc(f"cat {self.hosts_file}").stdout
|
|
|
|
|
dns = self.exec_lxc(f"cat {self.dns_file}").stdout
|
2026-02-05 11:37:29 +01:00
|
|
|
return {"hosts": hosts, "dns": dns}
|