106 lines
No EOL
3.5 KiB
Python
106 lines
No EOL
3.5 KiB
Python
import pytest
|
|
import subprocess
|
|
import os
|
|
import uuid
|
|
import time
|
|
|
|
# Use the bin/infra wrapper for testing
|
|
CLI_BIN = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "bin", "infra"))
|
|
CONFIG_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config.yaml"))
|
|
|
|
def run_infra(cmd, env=None):
|
|
full_env = os.environ.copy()
|
|
if env:
|
|
full_env.update(env)
|
|
full_cmd = [CLI_BIN, "--config", CONFIG_PATH] + cmd
|
|
return subprocess.run(full_cmd, capture_output=True, text=True, env=full_env)
|
|
|
|
@pytest.fixture
|
|
def unique_id():
|
|
return str(uuid.uuid4())[:8]
|
|
|
|
def test_dns_full_lifecycle(unique_id):
|
|
mac = f"aa:bb:cc:dd:ee:{unique_id[:2]}"
|
|
ip = "10.32.70.220"
|
|
hostname = f"test-lifecycle-{unique_id}"
|
|
domain = f"dns-test-{unique_id}.fe.loopaware.com"
|
|
|
|
# Add
|
|
assert run_infra(["dns", "add-host", mac, ip, hostname]).returncode == 0
|
|
assert run_infra(["dns", "add-dns", domain, ip]).returncode == 0
|
|
|
|
# Verify
|
|
res = run_infra(["dns", "list"])
|
|
assert mac in res.stdout
|
|
assert domain in res.stdout
|
|
|
|
# Cleanup
|
|
assert run_infra(["dns", "remove-host", mac]).returncode == 0
|
|
assert run_infra(["dns", "remove-dns", domain]).returncode == 0
|
|
|
|
def test_cloudflare_lifecycle(unique_id):
|
|
test_domain = f"test-ddns-{unique_id}.org"
|
|
|
|
# 1. Add to DDNS list
|
|
res = run_infra(["cloudflare", "add-ddns", test_domain])
|
|
assert res.returncode == 0
|
|
|
|
# 2. Verify in list
|
|
res = run_infra(["cloudflare", "list-ddns"])
|
|
assert test_domain in res.stdout
|
|
|
|
# 3. Remove from list
|
|
res = run_infra(["cloudflare", "remove-ddns", test_domain])
|
|
assert res.returncode == 0
|
|
|
|
# 4. Verify gone
|
|
res = run_infra(["cloudflare", "list-ddns"])
|
|
assert test_domain not in res.stdout
|
|
|
|
def test_decommission_command_flow(unique_id):
|
|
# This tests the command structure and error handling (using non-existent resources)
|
|
# We expect it to complete even if individual parts "fail" cleanup
|
|
domain = f"ghost-{unique_id}.com"
|
|
res = run_infra(["decommission", "--domain", domain])
|
|
assert res.returncode == 0
|
|
assert "Decommission process complete" in res.stdout
|
|
|
|
def test_proxmox_template_resolution():
|
|
# Verify the alias resolves to something on a known node
|
|
res = run_infra(["proxmox", "list-lxcs", "--node", "la-vmh-11"])
|
|
assert res.returncode == 0
|
|
# The actual resolution happens inside create-lxc, but we can verify the command exists
|
|
|
|
def test_samba_group_management(unique_id):
|
|
username = f"group_test_{unique_id}"
|
|
password = "TestPassword123!"
|
|
group = "xmpp-users"
|
|
|
|
# Add User & Group Join
|
|
assert run_infra(["samba", "add-user", username, password]).returncode == 0
|
|
assert run_infra(["samba", "add-to-group", group, username]).returncode == 0
|
|
|
|
def test_database_provisioning(unique_id):
|
|
project = f"test_proj_{unique_id}"
|
|
res = run_infra(["db", "provision", project])
|
|
assert res.returncode == 0
|
|
assert project in res.stdout
|
|
|
|
res = run_infra(["db", "list-dbs"])
|
|
assert project.lower().replace("-", "_") in res.stdout
|
|
|
|
def test_cert_cli():
|
|
# 1. List
|
|
res = run_infra(["cert", "list"])
|
|
assert res.returncode == 0
|
|
assert "loopaware.com.pem" in res.stdout
|
|
|
|
# 2. Status
|
|
res = run_infra(["cert", "status"])
|
|
assert res.returncode == 0
|
|
assert "notAfter" in res.stdout
|
|
|
|
def test_ip_discovery():
|
|
res = run_infra(["ip", "next-free"])
|
|
assert res.returncode == 0
|
|
assert "10.32." in res.stdout |