After filtering and transforming telemetry from ThingsBoard into a clean Delta Lake table, you can begin exploring the data interactively using JupyterLab and PySpark.
This guide walks through installing the required packages, connecting to Spark, and performing daily and hourly aggregation, outlier detection, and device comparison with a simple visualization.
Prerequisites
Before starting, make sure:
- Spark with Delta Lake and the Hive Metastore is installed (see "Install Apache Spark with Delta Lake and Hive Metastore on Ubuntu")
- JupyterLab is running on your Spark node (see "Install JupyterLab on Ubuntu 24.04")
- Your Delta Lake table thingsboard.temp_humidity_processed includes: device_id, device_name, ts_datetime (timestamp), event_date, temperature, humidity
- Install Required Packages
At the top of your notebook, install the required packages (pin pyspark to the same version as the Spark installation on the node):
import sys
!{sys.executable} -m pip install --quiet pyspark==3.5.6 pandas matplotlib
- Start SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ThingsBoard IoT Analysis") \
.enableHiveSupport() \
.getOrCreate()
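If the Delta extensions are already set in your spark-defaults.conf, the session above is enough. Otherwise you can enable them explicitly when building the session. A minimal sketch, assuming you have also pip-installed a delta-spark version matching your Spark install:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Build the session with Delta's SQL extension and catalog enabled explicitly.
builder = SparkSession.builder \
    .appName("ThingsBoard IoT Analysis") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport()

# configure_spark_with_delta_pip attaches the matching Delta jars to the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()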
- Daily Aggregation
Aggregate temperature and humidity per device per day:
daily_df = spark.sql("""
SELECT
device_id,
device_name,
event_date,
AVG(temperature) AS avg_temperature,
AVG(humidity) AS avg_humidity
FROM thingsboard.temp_humidity_processed
GROUP BY device_id, device_name, event_date
""")
daily_df.show(10)
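The same aggregation can be written with the DataFrame API instead of SQL, if you prefer method chaining. A sketch over the same table:
from pyspark.sql import functions as F

# Equivalent daily aggregation using the DataFrame API.
daily_df = (
    spark.table("thingsboard.temp_humidity_processed")
    .groupBy("device_id", "device_name", "event_date")
    .agg(
        F.avg("temperature").alias("avg_temperature"),
        F.avg("humidity").alias("avg_humidity"),
    )
)
daily_df.show(10)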
- Hourly Aggregation
Summarize values per device per hour:
hourly_df = spark.sql("""
SELECT
device_id,
device_name,
DATE(ts_datetime) AS event_date,
HOUR(ts_datetime) AS event_hour,
AVG(temperature) AS avg_temperature,
AVG(humidity) AS avg_humidity
FROM thingsboard.temp_humidity_processed
GROUP BY
device_id,
device_name,
DATE(ts_datetime),
HOUR(ts_datetime)
""")
hourly_df.show(10)
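If you would rather keep a single timestamp column (convenient for plotting), DATE_TRUNC collapses each reading to the start of its hour. A sketch:
# One truncated timestamp per hour instead of separate date and hour columns.
hourly_ts_df = spark.sql("""
    SELECT
        device_id,
        device_name,
        DATE_TRUNC('hour', ts_datetime) AS event_hour_ts,
        AVG(temperature) AS avg_temperature,
        AVG(humidity) AS avg_humidity
    FROM thingsboard.temp_humidity_processed
    GROUP BY device_id, device_name, DATE_TRUNC('hour', ts_datetime)
""")
hourly_ts_df.show(10)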
- Outlier Detection
Flag readings outside the expected operating range; the bounds below (10–40 °C, 20–90 % relative humidity) are examples to adjust for your deployment:
outlier_df = spark.sql("""
SELECT *
FROM thingsboard.temp_humidity_processed
WHERE temperature NOT BETWEEN 10 AND 40
OR humidity NOT BETWEEN 20 AND 90
""")
outlier_df.show(10)
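Fixed thresholds are simple, but devices in different environments may have very different baselines. A hedged alternative flags readings more than three standard deviations from each device's own mean; the 3-sigma cutoff is an assumption to tune:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-device statistics computed over an unbounded window.
w = Window.partitionBy("device_id")

scored = (
    spark.table("thingsboard.temp_humidity_processed")
    .withColumn("temp_mean", F.avg("temperature").over(w))
    .withColumn("temp_std", F.stddev("temperature").over(w))
    # z-score of each reading relative to its own device's history
    .withColumn("temp_z", (F.col("temperature") - F.col("temp_mean")) / F.col("temp_std"))
)

# The 3-sigma cutoff is an assumption; tune it for your sensors.
stat_outliers = scored.filter(F.abs(F.col("temp_z")) > 3)
stat_outliers.show(10)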
- Compare Device Behavior
Rank devices by average, minimum, and maximum temperature, plus average humidity:
behavior_df = spark.sql("""
SELECT
device_name,
AVG(temperature) AS avg_temp,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(humidity) AS avg_humid
FROM thingsboard.temp_humidity_processed
GROUP BY device_name
ORDER BY avg_temp DESC
""")
behavior_df.show()
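Averages hide variability; if you also want to see how noisy each device is, a variant of the query above adds the sample standard deviation:
# STDDEV is Spark SQL's sample standard deviation (stddev_samp).
variability_df = spark.sql("""
    SELECT
        device_name,
        STDDEV(temperature) AS std_temp,
        STDDEV(humidity) AS std_humid
    FROM thingsboard.temp_humidity_processed
    GROUP BY device_name
    ORDER BY std_temp DESC
""")
variability_df.show()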
- Visualize
Convert the aggregated result to pandas (safe here, since there is only one row per device) and plot average temperature per device:
import pandas as pd
import matplotlib.pyplot as plt
pandas_df = behavior_df.toPandas()
pandas_df.plot(kind="bar", x="device_name", y="avg_temp", legend=False)
plt.ylabel("Avg Temperature (°C)")
plt.title("Average Temperature per Device")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
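A time-series view of a single device is often more informative than a bar chart. A sketch using the hourly_df from earlier, assuming a device named "sensor-01" (hypothetical; substitute one of your own device names):
# Collect the hourly aggregates for one device; small enough for pandas.
hourly_pd = (
    hourly_df
    .filter("device_name = 'sensor-01'")  # hypothetical device name
    .orderBy("event_date", "event_hour")
    .toPandas()
)

# Build a readable x-axis label like "2024-06-01 13:00".
hourly_pd["hour_label"] = (
    hourly_pd["event_date"].astype(str)
    + " "
    + hourly_pd["event_hour"].astype(str).str.zfill(2)
    + ":00"
)

hourly_pd.plot(x="hour_label", y="avg_temperature", legend=False)
plt.ylabel("Avg Temperature (°C)")
plt.title("Hourly Average Temperature: sensor-01")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()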