74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
|
|
import ipaddress
import sys

import requests
|
||
|
|
|
||
|
|
class CloudflareManager:
    """Keep Cloudflare A records pointed at this host's external IP address.

    The configuration object only needs a ``get(key, default)`` method. Two
    keys are read: ``cloudflare.token`` (API bearer token) and
    ``cloudflare.ddns_domains`` (names whose A record should be updated).
    """

    # Seconds to wait for any single HTTP request. Without a timeout,
    # ``requests`` waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self, config):
        """Read the token and domain list from *config* and build the headers."""
        self.token = config.get('cloudflare.token')
        # A key that is present but null would otherwise make update_ddns()
        # fail when iterating; treat it the same as a missing key.
        self.ddns_domains = config.get('cloudflare.ddns_domains', []) or []
        self.api_url = "https://api.cloudflare.com/client/v4"
        self.headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def _request(self, method, path, **kwargs):
        """Call the Cloudflare API and return ``(payload, error)``.

        On success ``payload`` is the decoded JSON dict and ``error`` is None.
        On any failure (transport error, non-JSON body, or a response without
        a truthy ``success`` field) ``payload`` is None and ``error`` is a
        string describing what went wrong.
        """
        try:
            res = requests.request(
                method,
                f"{self.api_url}{path}",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
                **kwargs,
            )
        except requests.RequestException as e:
            return None, str(e)

        try:
            payload = res.json()
        except ValueError:
            # Body was not JSON (e.g. an HTML error page from a proxy).
            return None, res.text

        if not isinstance(payload, dict) or not payload.get('success'):
            return None, res.text
        return payload, None

    def get_external_ip(self):
        """Return this host's external IPv4 address as a string, or None on failure.

        The address is fetched from checkip.amazonaws.com. Failures are
        printed rather than raised.
        """
        try:
            res = requests.get("https://checkip.amazonaws.com",
                               timeout=self.REQUEST_TIMEOUT)
            res.raise_for_status()
            # Validate the body so an error page or other unexpected text is
            # never written into a DNS record. A records hold IPv4 only.
            return str(ipaddress.IPv4Address(res.text.strip()))
        except (requests.RequestException, ValueError) as e:
            print(f"Error fetching external IP: {e}")
            return None

    def get_zone_id(self, domain):
        """Return the id of the zone named *domain*, or None if unavailable.

        None is returned both when no zone matches and when the lookup itself
        fails; in the latter case the error is printed.
        """
        # Pass the name via ``params`` so it is URL-encoded.
        payload, error = self._request("GET", "/zones", params={"name": domain})
        if error is not None:
            print(f"Error looking up zone for {domain}: {error}")
            return None
        zones = payload.get('result') or []
        return zones[0]['id'] if zones else None

    def _update_domain(self, domain, current_ip, force):
        """Update the A record of one domain and return a one-line status message."""
        zone_id = self.get_zone_id(domain)
        if not zone_id:
            return f"[{domain}] Zone not found."

        # Find the A record whose name equals the domain itself.
        payload, error = self._request(
            "GET",
            f"/zones/{zone_id}/dns_records",
            params={"type": "A", "name": domain},
        )
        if error is not None:
            # Keep API failures distinct from a genuinely missing record.
            return f"[{domain}] Failed to list DNS records: {error}"

        records = payload.get('result') or []
        if not records:
            # Missing records are only reported, not created.
            return f"[{domain}] No A record found to update."

        record = records[0]
        if record['content'] == current_ip and not force:
            return f"[{domain}] Already up to date ({current_ip})."

        update_data = {
            "type": "A",
            "name": domain,
            "content": current_ip,
            "ttl": 1,  # Auto
            # Keep the record's current proxy setting.
            "proxied": record.get('proxied', True)
        }
        _, error = self._request(
            "PUT",
            f"/zones/{zone_id}/dns_records/{record['id']}",
            json=update_data,
        )
        if error is not None:
            return f"[{domain}] Update failed: {error}"
        return f"[{domain}] Updated to {current_ip}."

    def update_ddns(self, force=False):
        """Point every configured domain's A record at the current external IP.

        Args:
            force: When true, rewrite the record even if it already holds
                the current IP.

        Returns:
            One status line per domain joined with newlines, or a single
            failure message when the external IP cannot be determined.
        """
        current_ip = self.get_external_ip()
        if not current_ip:
            return "Failed to determine current external IP."

        # Each domain is handled independently, so a failure on one does not
        # stop the remaining domains from being processed.
        return "\n".join(
            self._update_domain(domain, current_ip, force)
            for domain in self.ddns_domains
        )

    def list_domains(self):
        """Return the configured DDNS domain names."""
        return self.ddns_domains
|