feat: integrate testinfra for state-based infrastructure verification

This commit is contained in:
Fredrick Amnehagen 2026-02-05 21:24:52 +01:00
parent 069441d3eb
commit e801fe7b8d

View file

@ -2,11 +2,12 @@ import pytest
import subprocess import subprocess
import os import os
import uuid import uuid
import time import testinfra
# Use the bin/infra wrapper for testing # Use the bin/infra wrapper for testing
CLI_BIN = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "bin", "infra")) CLI_BIN = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "bin", "infra"))
CONFIG_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config.yaml")) CONFIG_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config.yaml"))
SSH_KEY = os.path.expanduser("~/.ssh/id_ed25519_no_pass")
def run_infra(cmd, env=None): def run_infra(cmd, env=None):
full_env = os.environ.copy() full_env = os.environ.copy()
@ -15,101 +16,84 @@ def run_infra(cmd, env=None):
full_cmd = [CLI_BIN, "--config", CONFIG_PATH] + cmd full_cmd = [CLI_BIN, "--config", CONFIG_PATH] + cmd
return subprocess.run(full_cmd, capture_output=True, text=True, env=full_env) return subprocess.run(full_cmd, capture_output=True, text=True, env=full_env)
def get_testinfra_host(host_str):
return testinfra.get_host(
f"ssh://root@{host_str}",
ssh_identity_file=SSH_KEY,
ssh_extra_args="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
)
@pytest.fixture @pytest.fixture
def unique_id(): def unique_id():
return str(uuid.uuid4())[:8] return str(uuid.uuid4())[:8]
def test_dns_full_lifecycle(unique_id): def test_dns_full_lifecycle_with_verification(unique_id):
# 1. Programmatically find a free IP
res = run_infra(["ip", "next-free"])
assert res.returncode == 0
ip = res.stdout.strip()
mac = f"aa:bb:cc:dd:ee:{unique_id[:2]}" mac = f"aa:bb:cc:dd:ee:{unique_id[:2]}"
ip = "10.32.70.220" hostname = f"test-infra-{unique_id}"
hostname = f"test-lifecycle-{unique_id}"
domain = f"dns-test-{unique_id}.fe.loopaware.com"
# Add # 2. Add
assert run_infra(["dns", "add-host", mac, ip, hostname]).returncode == 0 res = run_infra(["dns", "add-host", mac, ip, hostname])
assert run_infra(["dns", "add-dns", domain, ip]).returncode == 0 assert res.returncode == 0
# Verify # 3. Infrastructure Verification
res = run_infra(["dns", "list"]) host = get_testinfra_host("10.32.2.1")
assert mac in res.stdout res = host.run(f"pct exec 11209 -- grep {mac} /etc/dnsmasq.d/dynamic-hosts.conf")
assert domain in res.stdout assert res.rc == 0
assert ip in res.stdout
# Cleanup # 4. Cleanup
assert run_infra(["dns", "remove-host", mac]).returncode == 0 assert run_infra(["dns", "remove-host", mac]).returncode == 0
assert run_infra(["dns", "remove-dns", domain]).returncode == 0
def test_cloudflare_lifecycle(unique_id): def test_samba_verification(unique_id):
test_domain = f"test-ddns-{unique_id}.org" username = f"tester_{unique_id}"
# 1. Add to DDNS list
res = run_infra(["cloudflare", "add-ddns", test_domain])
assert res.returncode == 0
# 2. Verify in list
res = run_infra(["cloudflare", "list-ddns"])
assert test_domain in res.stdout
# 3. Remove from list
res = run_infra(["cloudflare", "remove-ddns", test_domain])
assert res.returncode == 0
# 4. Verify gone
res = run_infra(["cloudflare", "list-ddns"])
assert test_domain not in res.stdout
def test_decommission_command_flow(unique_id):
# This tests the command structure and error handling (using non-existent resources)
# We expect it to complete even if individual parts "fail" cleanup
domain = f"ghost-{unique_id}.com"
res = run_infra(["decommission", "--domain", domain])
assert res.returncode == 0
assert "Decommission process complete" in res.stdout
def test_proxmox_template_resolution():
# Verify the alias resolves to something on a known node
res = run_infra(["proxmox", "list-lxcs", "--node", "la-vmh-11"])
assert res.returncode == 0
# The actual resolution happens inside create-lxc, but we can verify the command exists
def test_samba_group_management(unique_id):
username = f"group_test_{unique_id}"
password = "TestPassword123!" password = "TestPassword123!"
group = "xmpp-users"
# Add User & Group Join # 1. Add User
assert run_infra(["samba", "add-user", username, password]).returncode == 0 assert run_infra(["samba", "add-user", username, password]).returncode == 0
assert run_infra(["samba", "add-to-group", group, username]).returncode == 0
def test_database_provisioning(unique_id):
project = f"test_proj_{unique_id}"
res = run_infra(["db", "provision", project])
assert res.returncode == 0
assert project in res.stdout
res = run_infra(["db", "list-dbs"]) # 2. Infrastructure Verification
assert project.lower().replace("-", "_") in res.stdout host = get_testinfra_host("10.32.2.1")
res = host.run(f"pct exec 1113 -- samba-tool user list | grep {username}")
assert res.rc == 0
def test_cert_cli(): def test_database_verification(unique_id):
# 1. List project = f"testinfra_proj_{unique_id}"
res = run_infra(["cert", "list"]) db_name = project.lower().replace("-", "_")
assert res.returncode == 0
assert "loopaware.com.pem" in res.stdout
# 2. Status # 1. Provision
res = run_infra(["cert", "status"]) assert run_infra(["db", "provision", project]).returncode == 0
assert res.returncode == 0
assert "notAfter" in res.stdout # 2. Infrastructure Verification
host = get_testinfra_host("10.32.70.54")
# Use raw string for the regex pipe to avoid syntax warnings
res = host.run(rf"su - postgres -c 'psql -lqt | cut -d \| -f 1 | grep -qw {db_name}'")
assert res.rc == 0
# 3. Resolve def test_cert_verification():
res = run_infra(["cert", "resolve", "loopaware.com"]) # Verify the cert existence on the cert-mgr node
assert res.returncode == 0 host = get_testinfra_host("10.32.2.1")
assert "loopaware.com.pem" in res.stdout res = host.run("pct exec 11215 -- ls /shared-certs/la-infra-san.pem")
assert res.rc == 0
res = run_infra(["cert", "resolve", "wiki.loopaware.com"])
assert res.returncode == 0
assert "loopaware.com.pem" in res.stdout
def test_ip_discovery(): def test_ip_discovery():
res = run_infra(["ip", "next-free"]) res = run_infra(["ip", "next-free"])
assert res.returncode == 0 assert res.returncode == 0
assert "10.32." in res.stdout assert "10.32." in res.stdout
def test_cloudflare_lifecycle(unique_id):
test_domain = f"testinfra-ddns-{unique_id}.org"
assert run_infra(["cloudflare", "add-ddns", test_domain]).returncode == 0
res = run_infra(["cloudflare", "list-ddns"])
assert test_domain in res.stdout
assert run_infra(["cloudflare", "remove-ddns", test_domain]).returncode == 0
def test_decommission_command_flow(unique_id):
domain = f"ghost-{unique_id}.com"
res = run_infra(["decommission", "--domain", domain])
assert res.returncode == 0
assert "Decommission process complete" in res.stdout