Custom DDNS for Your Home Lab with Cloudflare (Python + cron)

Goal: keep a hostname (e.g., home.maksonlee.com) always pointing to your current WAN IP by calling Cloudflare’s API on a schedule. This post gives you a drop-in Python script and a simple cron schedule.


What you’ll deploy

  • Detect current public IPv4
  • Auto-discover Zone ID (exact match)
  • Find the DNS record by FQDN (accepts home or home.maksonlee.com)
  • Create if missing; update only when values change
  • TTL = Auto (Cloudflare uses 1 to mean Auto)
  • Optional Cloudflare proxy (orange cloud) toggle

Prerequisites

  • Cloudflare API Token limited to your zone with Zone → Zone → Read and Zone → DNS → Edit permissions (the script looks up the Zone ID by name, which needs Zone Read).
  • Ubuntu 24.04 Server with Python 3 (default).
  • python3-requests and jq are usually present; verify or install:
# Verify
python3 -c 'import requests; print("requests", requests.__version__)'
jq --version

# (Optional) Install if missing
sudo apt-get update
sudo apt-get install -y python3-requests jq

  1. Save the script

Create ~/scripts/cf_ddns.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Cloudflare DDNS updater (IPv4 only, A record).
- Detects current public IPv4.
- Finds the exact zone by name (no fallback).
- Creates the DNS record if missing; updates only when necessary.
"""

import sys
import ipaddress
from typing import List, Dict
import requests

# ========= HARD-CODED CONFIG =========
# Edit these values before the first run. The API token is stored here in
# plain text, so keep this file readable only by its owner.
CF_API_TOKEN = "YOUR_CF_API_TOKEN_HERE"    # Requires Zone:Zone:Read + Zone:DNS:Edit on the target zone
ZONE_NAME    = "maksonlee.com"             # e.g., "maksonlee.com"
DNS_NAME     = "home.maksonlee.com"        # accepts "home" or "home.maksonlee.com"
TTL          = 1                           # 1 => Cloudflare "Auto" TTL
PROXIED      = False                       # True to enable Cloudflare proxy (orange cloud)
TIMEOUT      = 15                          # seconds for Cloudflare API calls (the public-IP lookup uses its own timeout)
# =====================================

# Base URL that every Cloudflare endpoint path in this script is appended to.
API_BASE = "https://api.cloudflare.com/client/v4"


def cf_headers(token: str) -> Dict[str, str]:
    """Build the HTTP headers for an authenticated Cloudflare API request.

    Args:
        token: Cloudflare API token, sent as a bearer credential.

    Returns:
        A new dict holding the ``Authorization`` and ``Content-Type`` headers.
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = "Bearer {}".format(token)
    headers["Content-Type"] = "application/json"
    return headers


def normalize_fqdn(name: str, zone: str) -> str:
    """Expand a record name into a lower-case FQDN inside ``zone``.

    Both inputs are trimmed of surrounding whitespace and trailing dots and
    lower-cased before comparison.

    Args:
        name: Record name; may be relative ("home"), fully qualified
            ("home.example.com"), or "@" / empty / the zone itself for the apex.
        zone: Zone (domain) name the record belongs to.

    Returns:
        The normalized zone for apex names, the normalized name unchanged when
        it already ends with ".<zone>", otherwise "<name>.<zone>".
    """
    def _clean(value):
        return (value or "").strip().rstrip(".").lower()

    host, apex = _clean(name), _clean(zone)

    # Empty, "@" and the bare zone name all denote the zone apex.
    if not host or host == "@" or host == apex:
        return apex

    suffix = "." + apex
    return host if host.endswith(suffix) else host + suffix


def get_public_ipv4() -> str:
    """Return the current public IPv4 address as reported by an external service.

    The providers are queried in order; the first one that answers with a
    successful HTTP status and a body that parses as an IPv4 address wins.
    A failing provider is skipped, but the reason is remembered so that a
    total failure can be diagnosed from the error message.

    Returns:
        The IPv4 address as a dotted-quad string.

    Raises:
        SystemExit: If no provider yields a valid IPv4 address. The message
            lists why each provider was rejected.
    """
    urls = ["https://api.ipify.org", "https://ipv4.icanhazip.com", "https://ifconfig.me/ip"]
    failures: List[str] = []
    for url in urls:
        try:
            r = requests.get(url, timeout=6)
            # An error page must not be mistaken for an address.
            r.raise_for_status()
            ip = (r.text or "").strip()
            if ipaddress.ip_address(ip).version == 4:
                return ip
            failures.append(f"{url}: returned a non-IPv4 address")
        except requests.RequestException as exc:
            # Network error, timeout or HTTP error status: try the next provider.
            failures.append(f"{url}: {exc}")
        except ValueError:
            # Body was not an IP address at all; do not echo it (may be a long HTML page).
            failures.append(f"{url}: response is not an IP address")
    raise SystemExit(
        "Failed to determine public IPv4 address from all providers. Details: "
        + "; ".join(failures)
    )


def get_zone_id(zone_name: str, token: str) -> str:
    """Find the EXACT zone id for zone_name, otherwise exit with error.

    The zone name is normalized (trimmed, trailing dot removed, lower-cased)
    once and that same form is used both for the server-side ``name`` filter
    and for the local exact-match check, so inputs such as "Example.com."
    resolve the same way as "example.com".

    Args:
        zone_name: Name of the zone to look up.
        token: Cloudflare API token used for the request.

    Returns:
        The ``id`` of the active zone whose name equals the normalized
        ``zone_name``.

    Raises:
        SystemExit: If the response is not JSON, the API reports failure, or
            no zone with exactly that name is returned.
    """
    wanted = (zone_name or "").strip().rstrip(".").lower()
    url = f"{API_BASE}/zones"
    params = {"name": wanted, "status": "active", "per_page": 50}
    r = requests.get(url, headers=cf_headers(token), params=params, timeout=TIMEOUT)
    try:
        data = r.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not valid JSON.
        raise SystemExit(f"Cannot parse Cloudflare zones response (HTTP {r.status_code}).") from None

    # A JSON body that is not an object (e.g. null) is treated as a failure too.
    if not isinstance(data, dict) or not data.get("success"):
        raise SystemExit(f"Failed to list zones. Check token permissions (needs Zone:Zone:Read): {data}")

    results = data.get("result") or []
    for z in results:
        if (z.get("name") or "").lower().rstrip(".") == wanted:
            return z["id"]

    names = [z.get("name") for z in results]
    raise SystemExit(f"Exact zone '{zone_name}' not found. Available (filtered) zones: {names}")


def _list_records_page(zone_id: str, token: str, params: Dict) -> Dict:
    """Fetch one page of DNS records for a zone and return the decoded response.

    Args:
        zone_id: Cloudflare zone identifier.
        token: Cloudflare API token used for the request.
        params: Query parameters (filters and paging) sent as-is.

    Returns:
        The full decoded JSON response object (not just its ``result`` list).

    Raises:
        SystemExit: If the response is not JSON or the API reports failure.
    """
    url = f"{API_BASE}/zones/{zone_id}/dns_records"
    r = requests.get(url, headers=cf_headers(token), params=params, timeout=TIMEOUT)
    try:
        data = r.json()
    except ValueError:
        # Same handling as the zone lookup: report the HTTP status instead of
        # leaking a bare JSON decode error.
        raise SystemExit(f"Cannot parse Cloudflare DNS records response (HTTP {r.status_code}).") from None
    if not isinstance(data, dict) or not data.get("success"):
        raise SystemExit(f"Failed to list DNS records: {data}")
    return data


def list_records(zone_id: str, token: str, name_fqdn: str) -> List[Dict]:
    """Return the A records in a zone whose name equals ``name_fqdn``.

    A server-side filter on type and name is tried first. If it returns
    nothing, every A record of the zone is paged through and filtered locally
    by a case-insensitive exact name match (trailing dots ignored).

    Args:
        zone_id: Cloudflare zone identifier.
        token: Cloudflare API token used for the requests.
        name_fqdn: Fully qualified record name to look for.

    Returns:
        A list of record dicts; empty when no matching A record exists.
    """
    # Server-side exact filter by type+name (FQDN)
    params = {"type": "A", "name": name_fqdn, "per_page": 100, "page": 1}
    data = _list_records_page(zone_id, token, params)
    result = data.get("result") or []
    if result:
        return result

    # Fallback: fetch all A records and filter locally by exact FQDN
    params = {"type": "A", "per_page": 100, "page": 1}
    all_results: List[Dict] = []
    while True:
        data = _list_records_page(zone_id, token, params)
        batch = data.get("result") or []
        all_results.extend(batch)
        info = data.get("result_info") or {}
        total_pages = info.get("total_pages") or 1
        # Track the page number locally so a response without
        # result_info.page cannot raise KeyError; an empty page also ends
        # the walk.
        if not batch or params["page"] >= total_pages:
            break
        params["page"] += 1

    name_lc = name_fqdn.lower().rstrip(".")
    return [r for r in all_results if (r.get("name") or "").lower().rstrip(".") == name_lc]


def create_record(zone_id: str, token: str, name_fqdn: str, ip: str, ttl: int, proxied: bool) -> Dict:
    """Create an A record in the zone and return the created record.

    Args:
        zone_id: Cloudflare zone identifier.
        token: Cloudflare API token used for the request.
        name_fqdn: Fully qualified record name.
        ip: IPv4 address to store as the record content.
        ttl: Record TTL sent to the API.
        proxied: Whether the record should be proxied by Cloudflare.

    Returns:
        The ``result`` object of the API response.

    Raises:
        RuntimeError: With the message "CF_DUPLICATE" when the API reports
            error code 81058 (identical record already exists).
        SystemExit: If the response is not JSON or the API reports any other
            failure.
    """
    url = f"{API_BASE}/zones/{zone_id}/dns_records"
    payload = {"type": "A", "name": name_fqdn, "content": ip, "ttl": ttl, "proxied": proxied}
    r = requests.post(url, headers=cf_headers(token), json=payload, timeout=TIMEOUT)
    try:
        data = r.json()
    except ValueError:
        # Report the HTTP status instead of leaking a bare JSON decode error.
        raise SystemExit(f"Cannot parse Cloudflare create-record response (HTTP {r.status_code}).") from None
    if not isinstance(data, dict) or not data.get("success"):
        # 81058 => identical record already exists
        errors = (data.get("errors") or []) if isinstance(data, dict) else []
        if any(err.get("code") == 81058 for err in errors):
            raise RuntimeError("CF_DUPLICATE")
        raise SystemExit(f"Create record failed: {data}")
    return data["result"]


def update_record(zone_id: str, token: str, record_id: str, name_fqdn: str, ip: str, ttl: int, proxied: bool) -> Dict:
    """Overwrite an existing A record via PUT and return the updated record.

    Args:
        zone_id: Cloudflare zone identifier.
        token: Cloudflare API token used for the request.
        record_id: Identifier of the DNS record to replace.
        name_fqdn: Fully qualified record name.
        ip: IPv4 address to store as the record content.
        ttl: Record TTL sent to the API.
        proxied: Whether the record should be proxied by Cloudflare.

    Returns:
        The ``result`` object of the API response.

    Raises:
        SystemExit: If the response is not JSON or the API reports failure.
    """
    url = f"{API_BASE}/zones/{zone_id}/dns_records/{record_id}"
    payload = {"type": "A", "name": name_fqdn, "content": ip, "ttl": ttl, "proxied": proxied}
    r = requests.put(url, headers=cf_headers(token), json=payload, timeout=TIMEOUT)
    try:
        data = r.json()
    except ValueError:
        # Report the HTTP status instead of leaking a bare JSON decode error.
        raise SystemExit(f"Cannot parse Cloudflare update-record response (HTTP {r.status_code}).") from None
    if not isinstance(data, dict) or not data.get("success"):
        raise SystemExit(f"Update record failed: {data}")
    return data["result"]


def main():
    """Run one DDNS pass: resolve the public IP and sync the A record(s) to it.

    Steps: validate the config constants, compute the FQDN, look up the public
    IPv4 and the zone id, then either create the record (when none exists) or
    update every existing record whose content, TTL or proxied flag differs
    from the configured values. Progress is printed to stdout.

    Raises:
        SystemExit: On missing config, or when a duplicate is reported on
            create but the record still cannot be listed (exit status 1).
    """
    # Every required setting must be non-empty before any network call.
    required = (
        ("CF_API_TOKEN", CF_API_TOKEN),
        ("ZONE_NAME", ZONE_NAME),
        ("DNS_NAME", DNS_NAME),
    )
    for key, value in required:
        if not value:
            raise SystemExit(f"Missing required config: {key}")

    fqdn = normalize_fqdn(DNS_NAME, ZONE_NAME)
    ip = get_public_ipv4()
    print(f"Public IP: {ip}")
    print(f"FQDN: {fqdn}")

    zone_id = get_zone_id(ZONE_NAME, CF_API_TOKEN)
    print(f"Zone ID: {zone_id}")

    records = list_records(zone_id, CF_API_TOKEN, fqdn)

    if not records:
        try:
            created = create_record(zone_id, CF_API_TOKEN, fqdn, ip, TTL, PROXIED)
        except RuntimeError as exc:
            # Only the duplicate sentinel is handled here; anything else propagates.
            if str(exc) != "CF_DUPLICATE":
                raise
            # The API says the record exists, so list again and fall through
            # to the update loop below.
            records = list_records(zone_id, CF_API_TOKEN, fqdn)
            if not records:
                print("Record exists but couldn't be enumerated. Please check the Cloudflare UI or API.")
                sys.exit(1)
        else:
            print(f"Created A: {created['name']} -> {created['content']} (ttl={created['ttl']}, proxied={created['proxied']})")
            return

    want_proxied = bool(PROXIED)
    updated_any = False
    for rec in records:
        rid = rec["id"]
        cur_ip, cur_ttl = rec.get("content"), rec.get("ttl")
        cur_proxied = bool(rec.get("proxied"))

        in_sync = (cur_ip, cur_ttl, cur_proxied) == (ip, TTL, want_proxied)
        if in_sync:
            print(f"No change needed: {fqdn} -> {cur_ip} (ttl={cur_ttl}, proxied={cur_proxied})")
            continue

        updated = update_record(zone_id, CF_API_TOKEN, rid, fqdn, ip, TTL, PROXIED)
        print(f"Updated A {rid}: {updated['name']} -> {updated['content']} (ttl={updated['ttl']}, proxied={updated['proxied']})")
        updated_any = True

    if not updated_any:
        print("No updates were necessary.")


if __name__ == "__main__":
    # Script entry point: report any ordinary exception as a one-line message
    # on stderr and exit with status 1 instead of printing a traceback.
    try:
        main()
    except Exception as exc:
        sys.stderr.write(f"{exc}\n")
        sys.exit(1)

Make it executable and private:

chmod 700 ~/scripts
chmod 700 ~/scripts/cf_ddns.py

  2. One-shot test
~/scripts/cf_ddns.py

Sample output:

Public IP: 125.228.82.223
FQDN: home.maksonlee.com
Zone ID: b62ff0b4a06fad84bef817fa408bf41f
Updated A 123abc...: home.maksonlee.com -> 125.228.82.223 (ttl=1, proxied=False)

  3. Schedule it with cron

Run every 5 minutes as user administrator:

crontab -e
# add:
*/5 * * * * /home/administrator/scripts/cf_ddns.py >/dev/null 2>&1

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top