Introduction
Once you’ve synced ThingsBoard data into Delta Lake, including both entity data from PostgreSQL and time-series data from Cassandra, the next step is to filter and transform this raw data for analysis.
In this guide, we focus on a specific use case:
Process telemetry from devices with the temp-humidity
profile and build a clean Delta Lake table that includes only temperature
and humidity
readings.
Goal
We want to:
- Filter telemetry from devices with the
temp-humidity
profile and extract onlytemperature
andhumidity
telemetry - Join with device and profile metadata
- Normalize timestamps into readable formats
- Create a clean, analysis-ready Delta Lake table
Required Input Tables
Make sure the following Delta Lake tables are already available:
Table | Source | Description |
---|---|---|
thingsboard.ts_kv_cf | Cassandra | Raw telemetry key-values |
thingsboard.device | PostgreSQL | Device metadata |
thingsboard.device_profile | PostgreSQL | Device profile/type metadata |
Spark SQL Query
Use CREATE TABLE
to build the processed telemetry table and avoid overwrite errors:
CREATE TABLE thingsboard.temp_humidity_processed
USING DELTA
AS
SELECT
t.entity_id AS device_id,
d.name AS device_name,
p.name AS profile_name,
t.ts,
from_unixtime(t.ts / 1000) AS ts_datetime,
to_date(from_unixtime(t.ts / 1000)) AS event_date,
MAX(CASE WHEN t.key = 'temperature' THEN
COALESCE(CAST(t.dbl_v AS STRING), CAST(t.long_v AS STRING)) END) AS temperature,
MAX(CASE WHEN t.key = 'humidity' THEN
COALESCE(CAST(t.dbl_v AS STRING), CAST(t.long_v AS STRING)) END) AS humidity
FROM thingsboard.ts_kv_cf t
JOIN thingsboard.device d
ON t.entity_id = d.id
JOIN thingsboard.device_profile p
ON d.device_profile_id = p.id
WHERE t.entity_type = 'DEVICE'
AND p.name = 'temp-humidity'
AND t.key IN ('temperature', 'humidity')
GROUP BY t.entity_id, d.name, p.name, t.ts;
Explanation
- Filters telemetry from
DEVICE
type with thetemp-humidity
profile - Extracts only
temperature
andhumidity
values - Joins device and profile metadata
- Converts timestamps to:
ts_datetime
(human-readable datetime)event_date
(for partitioning and aggregation) - Uses
MAX(CASE WHEN ...)
to pivot ThingsBoard key-value rows into structured columns
Output Table
The resulting table thingsboard.temp_humidity_processed
will have this structure:
device_id | device_name | profile_name | ts | ts_datetime | event_date | temperature | humidity |
---|
It is:
- Clean and structured
- Ideal for aggregation and reporting
- Ready for BI dashboards or ML pipelines
What’s Next?
With this processed table in place, the next steps could include:
- Creating daily/hourly aggregation views
- Computing device health metrics
- Building dashboards with tools like Superset or Power BI
We’ll cover these in future guides.