End-to-End: Stream Cassandra 5.0.5 CDC → Kafka 4.0 (SASL/SSL) → Spark 3.5.6 → Delta Lake

This post shows a clean, reproducible path to stream ThingsBoard data stored in Cassandra 5.0.5 into Kafka 4.0 using Debezium (Cassandra), and then land it in Delta Lake with Spark 3.5.6.

Stack

  • Cassandra 5.0.5 (ThingsBoard time-series storage, CDC enabled)
  • Debezium Cassandra connector 3.2.1.Final (standalone)
  • Kafka 4.0 (SASL/SSL)
  • Spark 3.5.6 + Delta Lake 3.3.2 (bronze/silver)

Architecture: Cassandra CDC → Debezium Cassandra Connector (standalone) → Kafka (SASL/SSL) → Spark Structured Streaming → Delta Lake (bronze/silver)


Why this architecture?

  • Minimal load on Cassandra (streaming): change events come from CDC commit logs. (Note: the initial snapshot performs a one-time table read; you can tune or disable it.)
  • At-least-once delivery to Kafka: Debezium preserves per-partition ordering and retries safely; use Delta MERGE to dedupe for exactly-once-like results (a minimal sketch follows this list).
  • Decoupled compute: Spark only needs the Kafka source and Delta sink—keeps the pipeline simple and future-proof for Spark 4.x.
  • Schema-on-write in Delta: enforced schema, optional evolution, and efficient MERGE upserts for fast analytics.
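
Because delivery to Kafka is at-least-once, a replayed change event can reach Spark more than once. The sketch below shows the MERGE-based dedupe idea on its own (the path and key columns are hypothetical, for illustration; the full pipeline later wraps the same pattern in a merge_upsert helper):

from delta.tables import DeltaTable

def upsert_events(spark, events_df, path="/delta/example", keys=("entity_id", "key", "ts")):
    # Match incoming rows against the target table on the primary key:
    # a replayed event updates in place (idempotent), a new event is inserted.
    target = DeltaTable.forPath(spark, path)
    cond = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    (target.alias("t")
           .merge(events_df.alias("s"), cond)
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())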

Note on Spark versions (why CDC + Debezium + Kafka, not Spark Cassandra Connector)

We use Spark 3.5.6 here. The CDC → Debezium → Kafka approach keeps Spark code on standard connectors (Kafka + Delta) that are updated quickly and are the easiest path to Spark 4.x. The Spark Cassandra Connector is tied closely to specific Spark releases and historically lags major versions, which can block upgrades.


  1. Enable Cassandra CDC
  • Node-level CDC

Ensure global CDC is enabled and the raw directory exists:

# /etc/cassandra/cassandra.yaml
cdc_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
cdc_total_space: 4096MiB
cdc_free_space_check_interval: 250ms
cdc_block_writes: true

Create the directories (if needed) and set permissions for the cassandra user:

sudo mkdir -p /var/lib/cassandra/cdc_raw /var/lib/cassandra/cdc_relocate
sudo chown -R cassandra:cassandra /var/lib/cassandra

Restart Cassandra if you changed the YAML.
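
If you prefer to confirm the node-level settings programmatically rather than re-reading the YAML, Cassandra 4.0+ exposes its live configuration through the system_views.settings virtual table. A quick sketch with the Python cassandra-driver (assumes pip install cassandra-driver and a node listening on 127.0.0.1:9042):

from cassandra.cluster import Cluster

# Read the effective CDC settings from the virtual settings table.
cluster = Cluster(["127.0.0.1"], port=9042)
session = cluster.connect()

for name in ("cdc_enabled", "cdc_raw_directory", "cdc_block_writes"):
    row = session.execute(
        "SELECT value FROM system_views.settings WHERE name = %s", (name,)
    ).one()
    print(f"{name} = {row.value if row else '<not found>'}")

cluster.shutdown()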

  • Table-level CDC (ThingsBoard tables)

Turn on CDC for the two ThingsBoard tables (skip any table where cdc is already true):

-- ts_kv_cf: time-series key/value data
ALTER TABLE thingsboard.ts_kv_cf WITH cdc = true;

-- ts_kv_partitions_cf: partition tracking
ALTER TABLE thingsboard.ts_kv_partitions_cf WITH cdc = true;

Verify:

DESCRIBE TABLE thingsboard.ts_kv_cf;
DESCRIBE TABLE thingsboard.ts_kv_partitions_cf;
-- Look for:  AND cdc = true
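
The same check can be scripted: the cdc flag set by ALTER TABLE shows up in system_schema.tables. A small sketch with the Python cassandra-driver (the driver install and the localhost contact point are assumptions):

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

# cdc is a per-table boolean column in system_schema.tables.
for table in ("ts_kv_cf", "ts_kv_partitions_cf"):
    row = session.execute(
        "SELECT cdc FROM system_schema.tables "
        "WHERE keyspace_name = 'thingsboard' AND table_name = %s",
        (table,),
    ).one()
    print(f"{table}: cdc={row.cdc if row else 'table not found'}")

cluster.shutdown()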

  2. Debezium Cassandra (standalone) on Java 17
  • Layout
sudo mkdir -p /opt/debezium/conf /opt/debezium/state
sudo chown -R cassandra:cassandra /opt/debezium

Download the Debezium Cassandra connector fat JAR and copy it to /opt/debezium/, e.g.:

curl -OL https://repo1.maven.org/maven2/io/debezium/debezium-connector-cassandra-5/3.2.1.Final/debezium-connector-cassandra-5-3.2.1.Final-jar-with-dependencies.jar
sudo cp debezium-connector-cassandra-5-3.2.1.Final-jar-with-dependencies.jar /opt/debezium/
  • JVM module flags (Java 17 + Cassandra 5)

Create /opt/debezium/jvm17-debezium.options (this avoids editing Cassandra’s config):

# JPMS exports (needed by Cassandra 5 on Java 17)
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
--add-exports=jdk.management/com.sun.management.internal=ALL-UNNAMED

# JPMS opens (reflection access)
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED

# Required because we’re not using Cassandra’s launcher
-Dcassandra.storagedir=/var/lib/cassandra
  • Debezium connector config

Save /opt/debezium/conf/cassandra.properties:

# --- Identity / health
connector.name=tb_cass
http.port=8000

# --- Cassandra
cassandra.config=/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042

# --- Commit log processing (4.x+ near real-time)
commit.log.real.time.processing.enabled=true
commit.log.marked.complete.poll.interval.ms=1000

# Move processed logs out of cdc_raw, then delete them
commit.log.relocation.dir=/var/lib/cassandra/cdc_relocate
commit.log.post.processing.enabled=true
commit.log.relocation.dir.poll.interval.ms=10000
commit.log.transfer.class=io.debezium.connector.cassandra.BlackHoleCommitLogTransfer

# Align ordering with Cassandra partition key
event.order.guarantee.mode=PARTITION_VALUES
latest.commit.log.only=false

# --- Kafka producer (your SASL_SSL)
kafka.producer.bootstrap.servers=kafka.maksonlee.com:9093
kafka.producer.security.protocol=SASL_SSL
kafka.producer.sasl.mechanism=PLAIN
kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="password1";
kafka.producer.ssl.endpoint.identification.algorithm=https

# Producer safety
kafka.producer.acks=all
kafka.producer.enable.idempotence=true
kafka.producer.max.in.flight.requests.per.connection=5
kafka.producer.retries=2147483647

# --- Topics & offsets
topic.prefix=tb
offset.backing.store.dir=/opt/debezium/state

# --- Snapshot then stream
snapshot.mode=INITIAL
snapshot.consistency=LOCAL_QUORUM

# --- JSON output (Spark-friendly; no Schema Registry)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

After saving the file, re-apply ownership so the cassandra user can read it:

sudo chown -R cassandra:cassandra /opt/debezium
  • Start Debezium (as cassandra user)
sudo -u cassandra bash -lc '
  cd /opt/debezium
  JVM_FLAGS="$(grep -vE "^\s*(#|$)" /opt/debezium/jvm17-debezium.options | tr "\n" " ")"
  exec java -Xms256m -Xmx1g $JVM_FLAGS \
    -jar /opt/debezium/debezium-connector-cassandra-5-3.2.1.Final-jar-with-dependencies.jar \
    /opt/debezium/conf/cassandra.properties
'

You should see Debezium connect to Cassandra, detect CDC tables, and begin producing to Kafka.
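
Before wiring up the streaming job, you can spot-check the topic with a one-off batch read from Kafka. This sketch reuses the same SASL_SSL settings and assumes pyspark is launched with the spark-sql-kafka package from the spark-defaults.conf shown later:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("peek-cdc-topic").getOrCreate()

jaas = ('org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="user1" password="password1";')

# Bounded batch read: earliest..latest offsets at the time the query runs.
df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", "kafka.maksonlee.com:9093")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config", jaas)
      .option("subscribe", "tb.thingsboard.ts_kv_cf")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load())

df.selectExpr("CAST(value AS STRING) AS value").show(5, truncate=False)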


  3. Kafka 4.0 → Spark 3.5.6 → Delta Lake streaming job

We’ll persist bronze (raw JSON by topic) and silver (lightly parsed) tables.

  • Prepare the Delta directory
sudo mkdir /delta
sudo chown spark:spark /delta
  • Create the streaming job
sudo su - spark
vi cdc_to_delta.py

Paste this:

import os, time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, get_json_object
from pyspark.sql.types import *
from delta.tables import DeltaTable

# === Kafka security config (yours) ===
conf = {
    "bootstrap.servers": "kafka.maksonlee.com:9093",
    "security.protocol": "sasl_ssl",  # will be uppercased
    "sasl.mechanism": "PLAIN",
    "sasl.username": "user1",
    "sasl.password": "password1",
    # Optional TLS extras...
}

# Topics
TOPIC_CF  = "tb.thingsboard.ts_kv_cf"
TOPIC_PCF = "tb.thingsboard.ts_kv_partitions_cf"

# Output paths
BRONZE_CF_PATH  = "/delta/bronze/tb_ts_kv_cf"
BRONZE_PCF_PATH = "/delta/bronze/tb_ts_kv_partitions_cf"
SILVER_CF_PATH  = "/delta/silver/tb_ts_kv_cf"
SILVER_PCF_PATH = "/delta/silver/tb_ts_kv_partitions_cf"

# Single streaming checkpoint
DRIVER_CP = "/delta/_checkpoints/driver/cdc_fanout_upsert"

# ---------- Tuning ----------
COALESCE_N       = int(os.getenv("COALESCE_N", "1"))
SLEEP_SECS       = int(os.getenv("AVAILABLE_NOW_PAUSE_SECS", "10"))
TRIGGER_MODE     = os.getenv("TRIGGER_MODE", "availableNow").lower()  # 'availableNow' or 'processing'
TRIGGER_INTERVAL = os.getenv("TRIGGER_INTERVAL", "10 seconds")
MAX_OFFSETS      = os.getenv("MAX_OFFSETS_PER_TRIGGER", "").strip()
# ----------------------------

# Pre-create Silver schemas (so path queries won’t 404)
SILVER_CF_SCHEMA = StructType([
    StructField("entity_type", StringType(), True),
    StructField("entity_id",  StringType(), True),  # timeuuid as text
    StructField("key",         StringType(), True),
    StructField("partition",   LongType(),   True),
    StructField("ts",          LongType(),   True),
    StructField("str_v",       StringType(), True),
    StructField("long_v",      LongType(),   True),
    StructField("dbl_v",       DoubleType(), True),
    StructField("bool_v",      BooleanType(),True),
    StructField("json_v",      StringType(), True),
])

SILVER_PCF_SCHEMA = StructType([
    StructField("entity_type", StringType(), True),
    StructField("entity_id",   StringType(), True),
    StructField("key",         StringType(), True),
    StructField("partition",   LongType(),   True),
])

def build_kafka_options(conf_: dict, topics_csv: str) -> dict:
    opts = {
        "kafka.bootstrap.servers": conf_["bootstrap.servers"],
        "subscribe": topics_csv,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    }
    if MAX_OFFSETS:
        opts["maxOffsetsPerTrigger"] = MAX_OFFSETS
    if "security.protocol" in conf_:
        opts["kafka.security.protocol"] = conf_["security.protocol"].upper()
    if "sasl.mechanism" in conf_:
        opts["kafka.sasl.mechanism"] = conf_["sasl.mechanism"]
    if conf_.get("sasl.username") and conf_.get("sasl.password"):
        jaas = (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{conf_["sasl.username"]}" password="{conf_["sasl.password"]}";'
        )
        opts["kafka.sasl.jaas.config"] = jaas
    for k in ("ssl.truststore.location","ssl.truststore.password",
              "ssl.keystore.location","ssl.keystore.password",
              "ssl.key.password","ssl.endpoint.identification.algorithm"):
        if k in conf_:
            opts["kafka." + k] = conf_[k]
    return opts

def ensure_delta_table(spark: SparkSession, path: str, schema: StructType):
    if not DeltaTable.isDeltaTable(spark, path):
        spark.createDataFrame([], schema).write.format("delta").mode("overwrite").save(path)

# ---------- JSON extraction helpers (no wrapper leakage) ----------
# Debezium Cassandra fields often look like:
#   "str_v": {"value": "foo", "deletion_ts": null, "set": true}
# or for null:
#   "str_v": {"value": null, "deletion_ts": null, "set": true}
# or sometimes raw scalar without wrapper.
#
# Rule:
#   if $.field.value is NOT NULL      -> use it
#   else if wrapper exists (has .set) -> return NULL
#   else                              -> use raw $.field (legacy / no wrapper)
def av_as(after_json_col, key: str, cast_type: "DataType | None"):  # optional pyspark DataType
    v      = get_json_object(after_json_col, f"$.{key}.value")
    hasSet = get_json_object(after_json_col, f"$.{key}.set")
    raw    = get_json_object(after_json_col, f"$.{key}")
    # choose value; if wrapper present but value is null -> NULL; else fallback to raw
    chosen = F.when(v.isNotNull(), v) \
              .when(hasSet.isNotNull(), F.lit(None)) \
              .otherwise(raw)
    return chosen.cast(cast_type) if cast_type is not None else chosen

# ----- Silver builders -----
def build_silver_cf_df(bronze_one_topic_df):
    after_json = F.coalesce(
        get_json_object(col("value"), "$.after"),
        get_json_object(col("value"), "$.payload.after")
    ).alias("after_json")

    parsed = bronze_one_topic_df.select("topic", "timestamp", "value", after_json)

    a = lambda k, t=None: av_as(col("after_json"), k, t)

    silver = (
        parsed
        .where(col("after_json").isNotNull())
        .select(
            a("entity_type", StringType()).alias("entity_type"),
            a("entity_id",  StringType()).alias("entity_id"),
            a("key",        StringType()).alias("key"),
            a("partition",  LongType()).alias("partition"),
            a("ts",         LongType()).alias("ts"),
            a("str_v",      StringType()).alias("str_v"),
            a("long_v",     LongType()).alias("long_v"),
            a("dbl_v",      DoubleType()).alias("dbl_v"),
            a("bool_v",     BooleanType()).alias("bool_v"),
            a("json_v",     StringType()).alias("json_v"),
        )
        .dropDuplicates(["entity_type","entity_id","key","partition","ts"])
    )
    return silver

def build_silver_pcf_df(bronze_one_topic_df):
    after_json = F.coalesce(
        get_json_object(col("value"), "$.after"),
        get_json_object(col("value"), "$.payload.after")
    ).alias("after_json")

    parsed = bronze_one_topic_df.select("topic", "timestamp", "value", after_json)
    a = lambda k, t=None: av_as(col("after_json"), k, t)

    silver = (
        parsed
        .where(col("after_json").isNotNull())
        .select(
            a("entity_type", StringType()).alias("entity_type"),
            a("entity_id",  StringType()).alias("entity_id"),
            a("key",        StringType()).alias("key"),
            a("partition",  LongType()).alias("partition"),
        )
        .dropDuplicates(["entity_type","entity_id","key","partition"])
    )
    return silver

# ----- MERGE helper (UPSERT) -----
def merge_upsert(df, path, key_cols):
    if df.rdd.isEmpty():
        return
    target = DeltaTable.forPath(spark, path)
    cond = " AND ".join([f"t.{c}=s.{c}" for c in key_cols])
    (target.alias("t")
           .merge(df.alias("s"), cond)
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

# ----- foreachBatch: Bronze append + Silver UPSERT -----
def process_batch(batch_df, batch_id):
    try:
        cnt = batch_df.count()
        print(f"[batch {batch_id}] rows in Kafka batch: {cnt}")
        if cnt == 0:
            return
    except Exception:
        pass

    bronze = batch_df.selectExpr(
        "CAST(topic AS STRING) AS topic",
        "CAST(value AS STRING) AS value",
        "timestamp"
    )

    bronze_cf  = bronze.where(col("topic") == TOPIC_CF)
    bronze_pcf = bronze.where(col("topic") == TOPIC_PCF)

    # Bronze append
    if bronze_cf.head(1):
        (bronze_cf.coalesce(COALESCE_N)
                  .write.format("delta").mode("append")
                  .save(BRONZE_CF_PATH))
    if bronze_pcf.head(1):
        (bronze_pcf.coalesce(COALESCE_N)
                    .write.format("delta").mode("append")
                    .save(BRONZE_PCF_PATH))

    # Silver views from this micro-batch
    silver_cf_df  = build_silver_cf_df(bronze_cf)   if bronze_cf.head(1)  else None
    silver_pcf_df = build_silver_pcf_df(bronze_pcf) if bronze_pcf.head(1) else None

    # UPSERT (keys match Cassandra PKs)
    if silver_cf_df is not None and not silver_cf_df.rdd.isEmpty():
        merge_upsert(silver_cf_df,  SILVER_CF_PATH,
                     key_cols=["entity_type","entity_id","key","partition","ts"])
    if silver_pcf_df is not None and not silver_pcf_df.rdd.isEmpty():
        merge_upsert(silver_pcf_df, SILVER_PCF_PATH,
                     key_cols=["entity_type","entity_id","key","partition"])

if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("cassandra-cdc-to-delta (single-query fanout, UPSERT silver)")
        .config("spark.sql.shuffle.partitions", "2")
        .getOrCreate()
    )

    # Pre-create Silver tables so path queries work before first batch
    ensure_delta_table(spark, SILVER_CF_PATH,  SILVER_CF_SCHEMA)
    ensure_delta_table(spark, SILVER_PCF_PATH, SILVER_PCF_SCHEMA)

    kafka_opts = build_kafka_options(conf, f"{TOPIC_CF},{TOPIC_PCF}")
    raw = (spark.readStream.format("kafka").options(**kafka_opts).load())

    print(f"[config] trigger={TRIGGER_MODE} "
          f"{TRIGGER_INTERVAL if TRIGGER_MODE=='processing' else ''} "
          f"coalesce_n={COALESCE_N} sleep={SLEEP_SECS}s "
          f"maxOffsetsPerTrigger={MAX_OFFSETS or 'ALL'}")

    try:
        if TRIGGER_MODE == "processing":
            (raw.writeStream
                 .queryName("cdc_fanout_upsert")
                 .trigger(processingTime=TRIGGER_INTERVAL)
                 .foreachBatch(process_batch)
                 .option("checkpointLocation", DRIVER_CP)
                 .start()
                 .awaitTermination())
        else:
            while True:
                q = (raw.writeStream
                        .queryName("cdc_fanout_upsert")
                        .trigger(availableNow=True)
                        .foreachBatch(process_batch)
                        .option("checkpointLocation", DRIVER_CP)
                        .start())
                q.awaitTermination()
                print(f"[cycle] drained. sleeping {SLEEP_SECS}s…")
                time.sleep(SLEEP_SECS)
    except KeyboardInterrupt:
        print("Interrupted, stopping…")
    finally:
        for q in list(spark.streams.active):
            try: q.stop()
            except Exception: pass
        spark.stop()
  • Required spark-defaults.conf
spark.sql.warehouse.dir /opt/spark/warehouse
spark.jars.packages com.github.jnr:jnr-posix:3.1.20,io.delta:delta-spark_2.12:3.3.2,io.delta:delta-storage:3.3.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
  • Run it
spark-submit --master spark://spark.maksonlee.com:7077 \
  --deploy-mode client \
  cdc_to_delta.py

  4. Verify the pipeline
  • Bronze & Silver row counts (Delta Lake)
spark-sql -e "SELECT 'bronze_cf'  AS t, COUNT(*) c FROM delta.\`/delta/bronze/tb_ts_kv_cf\`
UNION ALL SELECT 'bronze_pcf', COUNT(*) FROM delta.\`/delta/bronze/tb_ts_kv_partitions_cf\`
UNION ALL SELECT 'silver_cf',  COUNT(*) FROM delta.\`/delta/silver/tb_ts_kv_cf\`
UNION ALL SELECT 'silver_pcf', COUNT(*) FROM delta.\`/delta/silver/tb_ts_kv_partitions_cf\`"

Sample output:

bronze_cf       2236
bronze_pcf      26
silver_cf       1438
silver_pcf      13
Time taken: 12.86 seconds, Fetched 4 row(s)

Interpretation: Bronze is append-only (every change record), so it’s larger. Silver is deduplicated/upserted and should match the current Cassandra row counts.
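
If you want more than row counts, a quick way to confirm the dedupe is to check that the silver primary key is unique. A pyspark sketch (it assumes the Delta packages from spark-defaults.conf are on the session):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("verify-silver-dedupe").getOrCreate()

silver = spark.read.format("delta").load("/delta/silver/tb_ts_kv_cf")
keys = ["entity_type", "entity_id", "key", "partition", "ts"]

# Any key appearing more than once means the MERGE upsert is not doing its job.
dupes = silver.groupBy(*keys).count().where(F.col("count") > 1)

print("silver rows    :", silver.count())
print("duplicate keys :", dupes.count())   # expected: 0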

  • Cross-check with Cassandra
cqlsh:thingsboard> select count(*) from thingsboard.ts_kv_cf;

Sample output:

 count
-------
  1438

(1 rows)

Cassandra ts_kv_cf count matches silver_cf (1438), confirming the upsert logic.

  • Peek at Silver rows
spark-sql -e "SELECT * FROM delta.\`/delta/silver/tb_ts_kv_cf\` LIMIT 5"

Sample output (the CDC cell wrappers have already been flattened into plain columns by the av_as helper):

API_USAGE_STATE 9a0145c0-369f-11f0-98b8-2ddc48f3ce2c    ruleEngineExecutionCount        1754006400000   1754006400000   NULL    2294    NULL    NULL    NULL
API_USAGE_STATE bf03e990-369f-11f0-9653-d7a62aea0f5a    ruleEngineExecutionCount        1754006400000   1754006400000   NULL    2294    NULL    NULL    NULL
API_USAGE_STATE 9a0145c0-369f-11f0-98b8-2ddc48f3ce2c    inactiveDevicesCountHourly      1754006400000   1755270000000   NULL    10      NULL    NULL    NULL
API_USAGE_STATE 9a0145c0-369f-11f0-98b8-2ddc48f3ce2c    ruleEngineExecutionCountHourly  1754006400000   1755270000000   NULL    4       NULL    NULL    NULL
API_USAGE_STATE bf03e990-369f-11f0-9653-d7a62aea0f5a    ruleEngineExecutionCountHourly  1754006400000   1755270000000   NULL    4       NULL    NULL    NULL
Time taken: 8.794 seconds, Fetched 5 row(s)
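
As a quick usage example, the silver table can already answer "latest telemetry per entity and key" style questions. An illustrative pyspark sketch (not part of the pipeline):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("latest-telemetry").getOrCreate()

silver = spark.read.format("delta").load("/delta/silver/tb_ts_kv_cf")

# Keep only the newest row per (entity_id, key).
w = Window.partitionBy("entity_id", "key").orderBy(F.col("ts").desc())
latest = (silver.withColumn("rn", F.row_number().over(w))
                .where("rn = 1")
                .drop("rn"))

latest.select("entity_id", "key", "ts", "long_v", "dbl_v", "str_v", "bool_v") \
      .show(10, truncate=False)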
