Filter and Transform ThingsBoard IoT Data for Analysis Using Spark SQL

Introduction

Once you’ve synced ThingsBoard data into Delta Lake, including both entity data from PostgreSQL and time-series data from Cassandra, the next step is to filter and transform this raw data for analysis.

In this guide, we focus on a specific use case:
Process telemetry from devices with the temp-humidity profile and build a clean Delta Lake table that includes only temperature and humidity readings.


Goal

We want to:

  • Filter telemetry to devices with the temp-humidity profile and keep only the temperature and humidity keys
  • Join with device and profile metadata
  • Convert epoch-millisecond timestamps into a readable datetime and a calendar date
  • Create a clean, analysis-ready Delta Lake table

Required Input Tables

Make sure the following Delta Lake tables are already available:

Table                         Source        Description
thingsboard.ts_kv_cf          Cassandra     Raw telemetry key-value rows
thingsboard.device            PostgreSQL    Device metadata
thingsboard.device_profile    PostgreSQL    Device profile (type) metadata
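Each row of ts_kv_cf holds a single key for a single device at a single timestamp. ThingsBoard stores the value in one of several typed columns, and the Spark SQL query in the next section reads only the two numeric ones, dbl_v and long_v, through COALESCE. The plain-Python sketch below models that layout so the coalescing step is easy to see. The TsKvRow class and its sample values are hypothetical. Only the column names entity_type, entity_id, key, ts, dbl_v and long_v come from the query.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TsKvRow:
    """Hypothetical, cut-down stand-in for one row of thingsboard.ts_kv_cf."""
    entity_type: str
    entity_id: str
    key: str
    ts: int                        # epoch milliseconds
    long_v: Optional[int] = None   # set when the value is an integer
    dbl_v: Optional[float] = None  # set when the value is a floating-point number


def numeric_value(row: TsKvRow) -> Optional[float]:
    """Mirror COALESCE(dbl_v, long_v): prefer the double, fall back to the integer."""
    if row.dbl_v is not None:
        return row.dbl_v
    if row.long_v is not None:
        return float(row.long_v)
    return None


rows = [
    TsKvRow("DEVICE", "dev-1", "temperature", 1700000000000, dbl_v=21.5),
    TsKvRow("DEVICE", "dev-1", "humidity", 1700000000000, long_v=40),
    # A non-numeric key: its value would live in another typed column.
    TsKvRow("DEVICE", "dev-1", "firmware", 1700000000000),
]

for r in rows:
    print(r.key, numeric_value(r))
```

The same reading can arrive as an integer or a double depending on how the device formats its payload. Coalescing both columns keeps the query from silently dropping integer-valued readings.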

Spark SQL Query

Use CREATE OR REPLACE TABLE to build the processed telemetry table. A plain CREATE TABLE fails if the table already exists. CREATE OR REPLACE lets you re-run the job to rebuild it. The query also keeps the readings numeric (DOUBLE) so they can be aggregated directly, and it converts ts with timestamp_millis (Spark 3.1 or later):

CREATE OR REPLACE TABLE thingsboard.temp_humidity_processed
USING DELTA
AS
SELECT
  t.entity_id AS device_id,
  d.name AS device_name,
  p.name AS profile_name,
  t.ts,
  -- ts is epoch milliseconds; timestamp_millis returns a TIMESTAMP
  timestamp_millis(t.ts) AS ts_datetime,
  to_date(timestamp_millis(t.ts)) AS event_date,

  -- Numeric values arrive in dbl_v or long_v; keep the result numeric
  MAX(CASE WHEN t.key = 'temperature' THEN 
      COALESCE(t.dbl_v, CAST(t.long_v AS DOUBLE)) END) AS temperature,

  MAX(CASE WHEN t.key = 'humidity' THEN 
      COALESCE(t.dbl_v, CAST(t.long_v AS DOUBLE)) END) AS humidity

FROM thingsboard.ts_kv_cf t
JOIN thingsboard.device d
  ON t.entity_id = d.id
JOIN thingsboard.device_profile p
  ON d.device_profile_id = p.id
WHERE t.entity_type = 'DEVICE'
  AND p.name = 'temp-humidity'
  AND t.key IN ('temperature', 'humidity')
GROUP BY t.entity_id, d.name, p.name, t.ts;
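You can see what the joins and the MAX(CASE WHEN ...) pivot produce without a Spark cluster. The sketch below runs the same query shape against an in-memory SQLite database through Python's standard sqlite3 module. It is a stand-in, not Spark, and it rests on several assumptions:

- The three tables are cut down to the columns the query uses.
- The sample rows are invented.
- SQLite's datetime()/date() with 'unixepoch' replace the Spark timestamp functions and always render UTC.
- Values are coalesced into REAL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Minimal stand-ins for the three Delta tables (only the columns the query needs).
conn.executescript("""
CREATE TABLE device_profile (id TEXT PRIMARY KEY, name TEXT);
CREATE TABLE device (id TEXT PRIMARY KEY, name TEXT, device_profile_id TEXT);
CREATE TABLE ts_kv_cf (
    entity_type TEXT, entity_id TEXT, key TEXT, ts INTEGER,
    long_v INTEGER, dbl_v REAL
);

INSERT INTO device_profile VALUES ('p1', 'temp-humidity'), ('p2', 'other');
INSERT INTO device VALUES ('d1', 'sensor-a', 'p1'), ('d2', 'meter-b', 'p2');

INSERT INTO ts_kv_cf VALUES
  ('DEVICE', 'd1', 'temperature', 1700000000000, NULL, 21.5),
  ('DEVICE', 'd1', 'humidity',    1700000000000, 40,   NULL),
  ('DEVICE', 'd1', 'battery',     1700000000000, 87,   NULL),
  ('DEVICE', 'd1', 'temperature', 1700000060000, NULL, 21.7),
  ('DEVICE', 'd2', 'temperature', 1700000000000, NULL, 30.0);
""")

# Same filter / join / pivot shape as the Spark SQL query above.
rows = conn.execute("""
SELECT
  t.entity_id AS device_id,
  d.name      AS device_name,
  p.name      AS profile_name,
  t.ts,
  datetime(t.ts / 1000, 'unixepoch') AS ts_datetime,  -- UTC in SQLite
  date(t.ts / 1000, 'unixepoch')     AS event_date,
  MAX(CASE WHEN t.key = 'temperature'
           THEN COALESCE(t.dbl_v, CAST(t.long_v AS REAL)) END) AS temperature,
  MAX(CASE WHEN t.key = 'humidity'
           THEN COALESCE(t.dbl_v, CAST(t.long_v AS REAL)) END) AS humidity
FROM ts_kv_cf t
JOIN device d         ON t.entity_id = d.id
JOIN device_profile p ON d.device_profile_id = p.id
WHERE t.entity_type = 'DEVICE'
  AND p.name = 'temp-humidity'
  AND t.key IN ('temperature', 'humidity')
GROUP BY t.entity_id, d.name, p.name, t.ts
ORDER BY t.ts
""").fetchall()

for row in rows:
    print(row)
```

The battery key and the device on the other profile are filtered out, leaving two rows for sensor-a. The second timestamp carries only a temperature reading, so its humidity is NULL (None in Python). The pivot always behaves this way when the two keys are reported at different timestamps.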

Explanation

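  • Keeps only rows whose entity_type is DEVICE and whose device uses the temp-humidity profile
  • Keeps only the temperature and humidity keys
  • Joins device and profile metadata to attach readable device and profile names
  • Converts the epoch-millisecond ts column to:
    ts_datetime (human-readable datetime)
    event_date (a date, for partitioning and daily aggregation)
    Both are derived in the Spark session time zone (spark.sql.session.timeZone), so a reading near midnight can land on a different event_date under a different setting.
  • Uses MAX(CASE WHEN ...) to pivot ThingsBoard key-value rows into columns. Each (device, ts) group normally holds at most one temperature row and one humidity row. MAX therefore just returns that value and skips the NULLs produced for the other key. If a device reports the two keys at different timestamps, each timestamp becomes its own row with NULL in the missing column.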
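The timestamp conversion is easy to check outside Spark. This sketch reproduces it with Python's standard datetime module. The sample value and the fixed UTC+08:00 offset are arbitrary choices for illustration. In Spark the zone comes from the session configuration rather than from a function argument. The sketch shows that the same epoch-millisecond reading can fall on different calendar dates in different zones.

```python
from datetime import datetime, timedelta, timezone


def to_datetime(ts_ms: int, tz: timezone) -> datetime:
    """Convert a ThingsBoard epoch-millisecond timestamp to an aware datetime in tz."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=tz)


def to_event_date(ts_ms: int, tz: timezone) -> str:
    """Calendar date of the reading in tz, analogous to event_date."""
    return to_datetime(ts_ms, tz).date().isoformat()


ts = 1700000000000  # 2023-11-14 22:13:20 UTC
utc = timezone.utc
utc_plus_8 = timezone(timedelta(hours=8))  # hypothetical fixed-offset zone

print(to_datetime(ts, utc).isoformat())  # 2023-11-14T22:13:20+00:00
print(to_event_date(ts, utc))            # 2023-11-14
print(to_event_date(ts, utc_plus_8))     # 2023-11-15
```

If event_date must be stable across jobs, pin the session time zone (for example to UTC) before building the table.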

Output Table

The resulting table thingsboard.temp_humidity_processed will have this structure:

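  • device_id: ThingsBoard device ID (entity_id from ts_kv_cf)
  • device_name: device name from thingsboard.device
  • profile_name: device profile name (always temp-humidity here)
  • ts: original timestamp in epoch milliseconds
  • ts_datetime: ts as a human-readable datetime
  • event_date: calendar date of the reading
  • temperature: temperature reading (NULL if none was reported at that ts)
  • humidity: humidity reading (NULL if none was reported at that ts)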

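It has one row per device and timestamp, with temperature and humidity as columns instead of separate key-value rows. As a result, it is:

  • Easy to filter and group by device, profile, or date
  • Ready for aggregation and reporting
  • Usable as a source for BI dashboards or ML pipelines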
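As a quick illustration of why this shape is easy to aggregate, the sketch below computes a per-device daily average over rows laid out like thingsboard.temp_humidity_processed. It makes three simplifying assumptions:

- It uses an in-memory SQLite table instead of Spark.
- It keeps only a subset of the columns.
- The rows are invented.

AVG skips the NULLs the pivot leaves when a key is missing at a timestamp.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical subset of the processed table's columns.
conn.execute("""
CREATE TABLE temp_humidity_processed (
    device_id TEXT, device_name TEXT, event_date TEXT,
    temperature REAL, humidity REAL
)""")
conn.executemany(
    "INSERT INTO temp_humidity_processed VALUES (?, ?, ?, ?, ?)",
    [
        ("d1", "sensor-a", "2023-11-14", 21.0, 40.0),
        ("d1", "sensor-a", "2023-11-14", 23.0, None),  # humidity missing at this ts
        ("d1", "sensor-a", "2023-11-15", 20.0, 44.0),
    ],
)

# One row per device and day; AVG ignores NULL humidity values.
daily = conn.execute("""
SELECT device_id, event_date,
       AVG(temperature) AS avg_temperature,
       AVG(humidity)    AS avg_humidity,
       COUNT(*)         AS readings
FROM temp_humidity_processed
GROUP BY device_id, event_date
ORDER BY device_id, event_date
""").fetchall()

for row in daily:
    print(row)
# ('d1', '2023-11-14', 22.0, 40.0, 2)
# ('d1', '2023-11-15', 20.0, 44.0, 1)
```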

What’s Next?

With this processed table in place, the next steps could include:

  • Creating daily/hourly aggregation views
  • Computing device health metrics
  • Building dashboards with tools like Superset or Power BI

We’ll cover these in future guides.
