Goal: keep a hostname (e.g., home.maksonlee.com
) always pointing to your current WAN IP by calling Cloudflare’s API on a schedule. This post gives you a drop-in Python script and a simple cron schedule.
What you’ll deploy
- Detect current public IPv4
- Auto-discover Zone ID (exact match)
- Find the DNS record by FQDN (accepts
home
orhome.maksonlee.com
) - Create if missing; update only when values change
- TTL = Auto (Cloudflare uses
1
to mean Auto) - Optional Cloudflare proxy (orange cloud) toggle
Prerequisites
- Cloudflare API Token limited to your zone with Zone → DNS → Edit permissions.
- Ubuntu 24.04 Server with Python 3 (default).
python3-requests
andjq
are usually present; verify or install:
# Verify
python3 -c 'import requests; print("requests", requests.__version__)'
jq --version
# (Optional) Install if missing
sudo apt-get update
sudo apt-get install -y python3-requests jq
- Save the script
Create ~/scripts/cf_ddns.py
:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Cloudflare DDNS updater (IPv4 only, A record).
- Detects current public IPv4.
- Finds the exact zone by name (no fallback).
- Creates the DNS record if missing; updates only when necessary.
"""
import sys
import ipaddress
from typing import List, Dict
import requests
# ========= HARD-CODED CONFIG =========
CF_API_TOKEN = "YOUR_CF_API_TOKEN_HERE" # Requires Zone:Zone:Read + Zone:DNS:Edit on the target zone
ZONE_NAME = "maksonlee.com" # e.g., "maksonlee.com"
DNS_NAME = "home.maksonlee.com" # accepts "home" or "home.maksonlee.com"
TTL = 1 # 1 => Cloudflare "Auto" TTL
PROXIED = False # True to enable Cloudflare proxy (orange cloud)
TIMEOUT = 15 # seconds for API calls
# =====================================
API_BASE = "https://api.cloudflare.com/client/v4"
def cf_headers(token: str) -> Dict[str, str]:
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def normalize_fqdn(name: str, zone: str) -> str:
name = (name or "").strip().rstrip(".").lower()
zone = (zone or "").strip().rstrip(".").lower()
if name in ("", "@", zone): # zone apex
return zone
if name.endswith("." + zone):
return name
return f"{name}.{zone}"
def get_public_ipv4() -> str:
"""Return a validated public IPv4 string."""
urls = ["https://api.ipify.org", "https://ipv4.icanhazip.com", "https://ifconfig.me/ip"]
for url in urls:
try:
r = requests.get(url, timeout=6)
ip = (r.text or "").strip()
if ip and ipaddress.ip_address(ip).version == 4:
return ip
except Exception:
pass
raise SystemExit("Failed to determine public IPv4 address from all providers.")
def get_zone_id(zone_name: str, token: str) -> str:
"""Find the EXACT zone id for zone_name, otherwise exit with error."""
url = f"{API_BASE}/zones"
params = {"name": zone_name, "status": "active", "per_page": 50}
r = requests.get(url, headers=cf_headers(token), params=params, timeout=TIMEOUT)
try:
data = r.json()
except Exception:
raise SystemExit(f"Cannot parse Cloudflare zones response (HTTP {r.status_code}).")
if not data.get("success"):
raise SystemExit(f"Failed to list zones. Check token permissions (needs Zone:Zone:Read): {data}")
results = data.get("result") or []
for z in results:
if (z.get("name") or "").lower().rstrip(".") == zone_name.lower().rstrip("."):
return z["id"]
names = [z.get("name") for z in results]
raise SystemExit(f"Exact zone '{zone_name}' not found. Available (filtered) zones: {names}")
def _list_records_page(zone_id: str, token: str, params: Dict) -> Dict:
url = f"{API_BASE}/zones/{zone_id}/dns_records"
r = requests.get(url, headers=cf_headers(token), params=params, timeout=TIMEOUT)
data = r.json()
if not data.get("success"):
raise SystemExit(f"Failed to list DNS records: {data}")
return data
def list_records(zone_id: str, token: str, name_fqdn: str) -> List[Dict]:
# Server-side exact filter by type+name (FQDN)
params = {"type": "A", "name": name_fqdn, "per_page": 100, "page": 1}
data = _list_records_page(zone_id, token, params)
result = data.get("result", [])
if result:
return result
# Fallback: fetch all A records and filter locally by exact FQDN
params = {"type": "A", "per_page": 100, "page": 1}
all_results: List[Dict] = []
while True:
data = _list_records_page(zone_id, token, params)
all_results.extend(data.get("result", []))
info = data.get("result_info", {}) or {}
if info.get("page", 1) >= info.get("total_pages", 1):
break
params["page"] = info["page"] + 1
name_lc = name_fqdn.lower().rstrip(".")
return [r for r in all_results if (r.get("name", "").lower().rstrip(".")) == name_lc]
def create_record(zone_id: str, token: str, name_fqdn: str, ip: str, ttl: int, proxied: bool) -> Dict:
url = f"{API_BASE}/zones/{zone_id}/dns_records"
payload = {"type": "A", "name": name_fqdn, "content": ip, "ttl": ttl, "proxied": proxied}
r = requests.post(url, headers=cf_headers(token), json=payload, timeout=TIMEOUT)
data = r.json()
if not data.get("success"):
# 81058 => identical record already exists
for err in data.get("errors", []):
if err.get("code") == 81058:
raise RuntimeError("CF_DUPLICATE")
raise SystemExit(f"Create record failed: {data}")
return data["result"]
def update_record(zone_id: str, token: str, record_id: str, name_fqdn: str, ip: str, ttl: int, proxied: bool) -> Dict:
url = f"{API_BASE}/zones/{zone_id}/dns_records/{record_id}"
payload = {"type": "A", "name": name_fqdn, "content": ip, "ttl": ttl, "proxied": proxied}
r = requests.put(url, headers=cf_headers(token), json=payload, timeout=TIMEOUT)
data = r.json()
if not data.get("success"):
raise SystemExit(f"Update record failed: {data}")
return data["result"]
def main():
# sanity
for k, v in {"CF_API_TOKEN": CF_API_TOKEN, "ZONE_NAME": ZONE_NAME, "DNS_NAME": DNS_NAME}.items():
if not v:
raise SystemExit(f"Missing required config: {k}")
fqdn = normalize_fqdn(DNS_NAME, ZONE_NAME)
ip = get_public_ipv4()
print(f"Public IP: {ip}")
print(f"FQDN: {fqdn}")
zone_id = get_zone_id(ZONE_NAME, CF_API_TOKEN)
print(f"Zone ID: {zone_id}")
records = list_records(zone_id, CF_API_TOKEN, fqdn)
if not records:
try:
created = create_record(zone_id, CF_API_TOKEN, fqdn, ip, TTL, PROXIED)
print(f"Created A: {created['name']} -> {created['content']} (ttl={created['ttl']}, proxied={created['proxied']})")
return
except RuntimeError as e:
if str(e) == "CF_DUPLICATE":
# Re-list after Cloudflare’s normalization (should now be enumerable with FQDN)
records = list_records(zone_id, CF_API_TOKEN, fqdn)
if not records:
print("Record exists but couldn't be enumerated. Please check the Cloudflare UI or API.")
sys.exit(1)
else:
raise
changed = False
for rec in records:
rid = rec["id"]
cur_ip = rec.get("content")
cur_ttl = rec.get("ttl")
cur_proxied = bool(rec.get("proxied"))
if cur_ip == ip and cur_ttl == TTL and cur_proxied == bool(PROXIED):
print(f"No change needed: {fqdn} -> {cur_ip} (ttl={cur_ttl}, proxied={cur_proxied})")
continue
updated = update_record(zone_id, CF_API_TOKEN, rid, fqdn, ip, TTL, PROXIED)
print(f"Updated A {rid}: {updated['name']} -> {updated['content']} (ttl={updated['ttl']}, proxied={updated['proxied']})")
changed = True
if not changed:
print("No updates were necessary.")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(str(e), file=sys.stderr)
sys.exit(1)
Make it executable and private:
chmod 700 ~/scripts
chmod 700 ~/scripts/cf_ddns.py
- One-shot test
~/scripts/cf_ddns.py
Sample output:
Public IP: 125.228.82.223
FQDN: home.maksonlee.com
Zone ID: b62ff0b4a06fad84bef817fa408bf41f
Updated A 123abc...: home.maksonlee.com -> 125.228.82.223 (ttl=1, proxied=False)
- Schedule it with cron
Run every 5 minutes as user administrator
:
crontab -e
# add:
*/5 * * * * /home/administrator/scripts/cf_ddns.py >/dev/null 2>&1