Introduction
Once you’ve synced ThingsBoard data into Delta Lake, including both entity data from PostgreSQL and time-series data from Cassandra, the next step is to filter and transform this raw data for analysis.
In this guide, we focus on a specific use case:
Process telemetry from devices with the temp-humidity profile and build a clean Delta Lake table that includes only temperature and humidity readings.
Goal
We want to:
- Filter telemetry from devices with the temp-humidity profile and extract only temperature and humidity telemetry
- Join with device and profile metadata
- Normalize timestamps into readable formats
- Create a clean, analysis-ready Delta Lake table
Required Input Tables
Make sure the following Delta Lake tables are already available:
| Table | Source | Description |
|---|---|---|
| thingsboard.ts_kv_cf | Cassandra | Raw telemetry key-values |
| thingsboard.device | PostgreSQL | Device metadata |
| thingsboard.device_profile | PostgreSQL | Device profile/type metadata |
Spark SQL Query
Use CREATE OR REPLACE TABLE to build the processed telemetry table. Unlike a plain CREATE TABLE, it does not fail when the table already exists, so the query can be rerun safely:

    CREATE OR REPLACE TABLE thingsboard.temp_humidity_processed
    USING DELTA
    AS
    SELECT
      t.entity_id AS device_id,
      d.name AS device_name,
      p.name AS profile_name,
      t.ts,
      -- ts holds epoch milliseconds; from_unixtime expects seconds
      from_unixtime(t.ts / 1000) AS ts_datetime,
      to_date(from_unixtime(t.ts / 1000)) AS event_date,
      -- Pivot key-value rows into columns. A numeric reading may be stored in
      -- dbl_v or long_v, so both are read as DOUBLE to keep MAX numeric.
      MAX(CASE WHEN t.key = 'temperature'
               THEN COALESCE(t.dbl_v, CAST(t.long_v AS DOUBLE)) END) AS temperature,
      MAX(CASE WHEN t.key = 'humidity'
               THEN COALESCE(t.dbl_v, CAST(t.long_v AS DOUBLE)) END) AS humidity
    FROM thingsboard.ts_kv_cf t
    JOIN thingsboard.device d
      ON t.entity_id = d.id
    JOIN thingsboard.device_profile p
      ON d.device_profile_id = p.id
    WHERE t.entity_type = 'DEVICE'
      AND p.name = 'temp-humidity'
      AND t.key IN ('temperature', 'humidity')
    GROUP BY t.entity_id, d.name, p.name, t.ts;
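To sanity-check the two timestamp columns by hand, the conversion can be reproduced outside Spark. The following is a minimal Python sketch, not part of the pipeline: `normalize_ts` is a hypothetical helper name, and it assumes the Spark session time zone is UTC (`from_unixtime` formats in the session time zone, so results differ by the offset otherwise).

```python
from datetime import datetime, timezone
from typing import Tuple


def normalize_ts(ts_ms: int) -> Tuple[str, str]:
    """Approximate from_unixtime(ts / 1000) and to_date(...) for one value.

    Assumption: the Spark session time zone is UTC.
    """
    # ThingsBoard stores ts as epoch milliseconds; drop the millisecond part
    # because the formatted output only has whole-second resolution.
    dt = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    ts_datetime = dt.strftime("%Y-%m-%d %H:%M:%S")  # default from_unixtime pattern
    event_date = dt.date().isoformat()
    return ts_datetime, event_date


# 1700000000000 ms is 2023-11-14 22:13:20 UTC
print(normalize_ts(1700000000000))
```

If the values in the Delta table differ from this by a fixed number of hours, the session time zone is the most likely cause.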
Explanation
- Filters telemetry from DEVICE type with the temp-humidity profile
- Extracts only temperature and humidity values
- Joins device and profile metadata
- Converts timestamps to:
  - ts_datetime (human-readable datetime)
  - event_date (for partitioning and aggregation)
- Uses MAX(CASE WHEN ...) to pivot ThingsBoard key-value rows into structured columns
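The pivot step is the least obvious part of the query, so here is a rough plain-Python sketch of what MAX(CASE WHEN ...) combined with GROUP BY does to key-value rows. It is only an illustration of the logic, not how Spark executes it; `pivot_readings` and the sample rows are made up for this example.

```python
from collections import defaultdict

WANTED_KEYS = ("temperature", "humidity")


def pivot_readings(rows):
    """Turn (device_id, ts, key, value) rows into one record per (device_id, ts)."""
    grouped = defaultdict(dict)
    for device_id, ts, key, value in rows:
        if key not in WANTED_KEYS:
            continue  # same effect as the WHERE t.key IN (...) filter
        slot = grouped[(device_id, ts)]  # one slot per GROUP BY group
        # MAX(...) semantics: keep the largest value if a key repeats in a group.
        slot[key] = value if key not in slot else max(slot[key], value)
    return [
        {
            "device_id": device_id,
            "ts": ts,
            # A missing key becomes None, the counterpart of SQL NULL.
            "temperature": cols.get("temperature"),
            "humidity": cols.get("humidity"),
        }
        for (device_id, ts), cols in sorted(grouped.items(), key=lambda kv: kv[0])
    ]


# Hypothetical sample rows, invented for illustration only.
sample = [
    ("dev-1", 1000, "temperature", 21.5),
    ("dev-1", 1000, "humidity", 40.0),
    ("dev-1", 2000, "temperature", 22.0),
    ("dev-1", 2000, "battery", 3.7),
]

for record in pivot_readings(sample):
    print(record)
```

Note that a timestamp with only one of the two keys still produces a row, with the other column left empty. The SQL query behaves the same way, so downstream aggregations should expect NULLs in either column.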
Output Table
The resulting table thingsboard.temp_humidity_processed will have this structure:
| device_id | device_name | profile_name | ts | ts_datetime | event_date | temperature | humidity |
|---|---|---|---|---|---|---|---|
It is:
- Clean and structured
- Ideal for aggregation and reporting
- Ready for BI dashboards or ML pipelines
What’s Next?
With this processed table in place, the next steps could include:
- Creating daily/hourly aggregation views
- Computing device health metrics
- Building dashboards with tools like Superset or Power BI
We’ll cover these in future guides.
