In this post, I will show how to run a Python-based stock data collector that publishes U.S. stock market data into ThingsBoard.
The project is available on GitHub:
https://github.com/maksonlee/stock-iot-collector
This collector is designed to treat stock symbols like IoT devices. Instead of collecting temperature, humidity, or sensor values, it collects daily stock OHLCV data and publishes each stock as a ThingsBoard virtual device.
For example:
stock.AAPL
stock.MSFT
stock.TSLA
The data flow looks like this:
Polygon / Massive API
↓
stock-iot-collector
↓ MQTT Gateway API
ThingsBoard Gateway Device
↓
stock.AAPL
stock.MSFT
stock.TSLA
One important clarification: this is not a real-time trading data pipeline.
By default, the collector uses Polygon/Massive daily End-of-Day data. That means each stock gets one OHLCV bar per U.S. market day. The price value sent to ThingsBoard is the daily close price.
The collector runs once per hour by default, but this does not mean it receives hourly stock prices. The hourly schedule is mainly used to retry and synchronize the previous U.S. market weekday’s daily data.
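The "previous U.S. market weekday" logic can be sketched roughly as follows. This is a minimal illustration, not the code from the repository: the function name `previous_market_weekday` is hypothetical, and the sketch only skips Saturdays and Sundays, so exchange holidays are not handled.

```python
from datetime import date, timedelta


def previous_market_weekday(today: date, days_ago: int = 1) -> date:
    """Step back `days_ago` weekdays from `today`, skipping weekends.

    Hypothetical helper: exchange holidays are NOT considered here.
    """
    if days_ago < 1:
        raise ValueError("days_ago must be at least 1")
    current = today
    remaining = days_ago
    while remaining > 0:
        current -= timedelta(days=1)
        if current.weekday() < 5:  # Monday=0 ... Friday=4
            remaining -= 1
    return current
```

Because the target date does not change between hourly runs on the same day, running this every hour simply retries the same daily bar until it is available.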
Environment Used in This Post
This post is based on my existing ThingsBoard CE Kubernetes environment, which was deployed in this previous guide:
The environment is a 3-node bare-metal Kubernetes cluster running on Ubuntu 24.04.
The base environment includes:
Kubernetes v1.34
Ubuntu Server 24.04
containerd
Calico
kube-vip
MetalLB
Traefik
cert-manager
Ceph RBD
ThingsBoard CE microservices
PostgreSQL
Cassandra
Kafka
Zookeeper
Valkey
ThingsBoard CE is deployed in microservices mode with a hybrid database setup:
PostgreSQL for entities
Cassandra for time-series data
Kafka, Zookeeper, and Valkey for messaging and caching
The ThingsBoard UI is exposed through Traefik:
https://tb.maksonlee.com
MQTT over TLS is exposed separately through MetalLB:
mqtts://mqtt.maksonlee.com:8883
The MQTT path does not go through Traefik. Instead, port 8883 is exposed directly to the tb-mqtt-transport service through a LoadBalancer service.
This keeps MQTT as a raw TCP path and allows TLS and X.509 client certificate authentication to be handled by the ThingsBoard MQTT transport itself.
This stock collector reuses that existing ThingsBoard MQTT over TLS endpoint. It does not require changing the main ThingsBoard deployment. It only adds a new Kubernetes deployment that publishes stock telemetry into ThingsBoard through the MQTT Gateway API.
What This Project Does
The default behavior is:
Collect all U.S. stocks
Use daily End-of-Day OHLCV data
Use close price as the main price telemetry
Run once per hour
Publish each stock as a ThingsBoard virtual device
Use MQTT over TLS with X.509 client certificate authentication
Each stock is published under a ThingsBoard gateway device.
For example, the ticker AAPL becomes:
stock.AAPL
The telemetry includes values such as:
open
high
low
close
price
volume
vwap
transaction_count
bar_timestamp_ms
The price field is the same as the daily close price.
The ThingsBoard telemetry timestamp uses the market bar timestamp from Polygon/Massive, not the time when the collector publishes the message.
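As a rough sketch of that mapping, the function below turns one daily bar into a ThingsBoard telemetry record. The function name `bar_to_telemetry` is hypothetical, and the short input keys (`T`, `o`, `h`, `l`, `c`, `v`, `vw`, `n`, `t`) are an assumption based on the Polygon aggregate response format; check the provider documentation before relying on them.

```python
def bar_to_telemetry(bar: dict) -> dict:
    """Convert one daily OHLCV bar into a ThingsBoard telemetry record.

    Assumed input keys: T=ticker, o/h/l/c=prices, v=volume,
    vw=VWAP (optional), n=transaction count (optional), t=bar timestamp in ms.
    """
    values = {
        "symbol": bar["T"],
        "open": bar["o"],
        "high": bar["h"],
        "low": bar["l"],
        "close": bar["c"],
        "price": bar["c"],  # price mirrors the daily close
        "volume": bar["v"],
        "bar_timestamp_ms": bar["t"],
    }
    # Some bars may omit these fields, so copy them only when present.
    if "vw" in bar:
        values["vwap"] = bar["vw"]
    if "n" in bar:
        values["transaction_count"] = bar["n"]
    # The telemetry timestamp is the bar time, not the publish time.
    return {"ts": bar["t"], "values": values}
```

Using the bar timestamp as `ts` means that re-publishing the same bar on a later hourly run writes to the same point in time instead of creating a new data point.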
Why Use ThingsBoard for Stock Data?
ThingsBoard is usually used for IoT telemetry, but the model also works well for stock market monitoring.
A stock can be treated like a virtual device:
Device: stock.AAPL
Telemetry:
price
open
high
low
close
volume
This makes it possible to use ThingsBoard dashboards, alarms, rule chains, and historical telemetry storage for stock data.
For example, you could create dashboards for:
Daily close price
Daily volume
Price changes
Market watchlists
Stock alerts
This project is not intended for trading execution. It is a delayed telemetry ingestion pipeline for monitoring, dashboards, and alerts.
Project Layout
The project structure is simple:
stock-iot-collector/
├── app/
│   ├── main.py
│   ├── collector.py
│   ├── config.py
│   ├── providers/
│   │   └── polygon.py
│   ├── sinks/
│   │   └── thingsboard.py
│   └── utils/
│       └── time_utils.py
├── config/
│   └── stocks.yaml
├── docker/
│   └── Dockerfile
├── k8s/
│   ├── namespace.yaml
│   ├── configmap.yaml
│   ├── secret.example.yaml
│   └── deployment.yaml
├── requirements.txt
└── README.md
The important parts are:
app/providers/polygon.py
This handles data collection from Polygon/Massive.
app/sinks/thingsboard.py
This handles publishing telemetry to ThingsBoard through the MQTT Gateway API.
config/stocks.yaml
This controls which stocks should be collected.
k8s/
This contains Kubernetes deployment files.
Default Stock Selection
By default, the collector uses:
stocks:
- symbol: "*"
This means it collects all U.S. stocks returned by the Polygon/Massive grouped daily endpoint.
If you only want a smaller watchlist, you can change it to something like this:
stocks:
- symbol: AAPL
- symbol: MSFT
- symbol: NVDA
For this setup, I use the default all-stock mode.
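The wildcard and watchlist behavior can be illustrated with a small sketch. It assumes the YAML file has already been parsed into a dictionary, and the function name `select_symbols` is hypothetical rather than taken from the repository.

```python
def select_symbols(config: dict, available: list[str]) -> list[str]:
    """Pick tickers to publish from the tickers the provider returned.

    `config` is the parsed stocks.yaml content, for example
    {"stocks": [{"symbol": "*"}]} or {"stocks": [{"symbol": "AAPL"}]}.
    """
    wanted = [str(entry["symbol"]).upper() for entry in config.get("stocks", [])]
    if "*" in wanted:
        # Wildcard: keep every ticker in the provider response.
        return sorted(available)
    available_set = set(available)
    # Watchlist: keep configured tickers that actually have data.
    return [symbol for symbol in wanted if symbol in available_set]
```

With a watchlist, tickers that are missing from the provider response are silently skipped in this sketch; a real collector might prefer to log them.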
Required Environment Variables
The required environment variables are:
POLYGON_API_KEY
THINGSBOARD_MQTT_HOST
THINGSBOARD_MQTT_CA_CERT
THINGSBOARD_MQTT_CLIENT_CERT
THINGSBOARD_MQTT_CLIENT_KEY
These are required because the collector needs to connect to Polygon/Massive and also publish telemetry to ThingsBoard using MQTT over TLS with X.509 authentication.
Optional Environment Variables
The collector also supports optional environment variables:
| Variable | Default | Description |
|---|---|---|
| STOCK_CONFIG_PATH | ./config/stocks.yaml locally, otherwise /etc/stock-iot-collector/stocks.yaml | Stock config path |
| POLYGON_TIMESPAN | day | day for daily bars or minute for minute bars |
| DAILY_MARKET_DAYS_AGO | 1 | Collect previous N U.S. market weekdays |
| POLL_INTERVAL_SECONDS | 3600 | How often the collector runs |
| PUBLISH_CHUNK_SIZE | 50 | Number of virtual devices per MQTT gateway telemetry message |
| DELAY_MINUTES | 20 | Mainly used by minute mode |
| THINGSBOARD_MQTT_PORT | 8883 | MQTT TLS port |
| THINGSBOARD_MQTT_CLIENT_ID | stock-collector-01 | MQTT client ID |
| MQTT_KEEPALIVE_SECONDS | 60 | MQTT keepalive |
For most deployments, the default values are enough.
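To show how required and optional variables fit together, here is a minimal sketch of a settings loader. The names `Settings`, `REQUIRED_VARS`, and `load_settings` are hypothetical; the variable names and defaults come from the lists above, and only a subset of the optional variables is included for brevity.

```python
import os
from dataclasses import dataclass

REQUIRED_VARS = (
    "POLYGON_API_KEY",
    "THINGSBOARD_MQTT_HOST",
    "THINGSBOARD_MQTT_CA_CERT",
    "THINGSBOARD_MQTT_CLIENT_CERT",
    "THINGSBOARD_MQTT_CLIENT_KEY",
)


@dataclass(frozen=True)
class Settings:
    polygon_timespan: str
    poll_interval_seconds: int
    publish_chunk_size: int
    mqtt_port: int
    mqtt_client_id: str


def load_settings(env=None) -> Settings:
    """Validate required variables and apply the documented defaults."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return Settings(
        polygon_timespan=env.get("POLYGON_TIMESPAN", "day"),
        poll_interval_seconds=int(env.get("POLL_INTERVAL_SECONDS", "3600")),
        publish_chunk_size=int(env.get("PUBLISH_CHUNK_SIZE", "50")),
        mqtt_port=int(env.get("THINGSBOARD_MQTT_PORT", "8883")),
        mqtt_client_id=env.get("THINGSBOARD_MQTT_CLIENT_ID", "stock-collector-01"),
    )
```

Failing fast on a missing variable gives a clearer error at startup than a TLS or HTTP failure later in the run.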
If ThingsBoard is under pressure when publishing all U.S. stocks, lower the chunk size:
PUBLISH_CHUNK_SIZE=10
ThingsBoard Requirements
This post assumes you already have ThingsBoard CE running on Kubernetes.
In my case, I use the 3-node bare-metal Kubernetes ThingsBoard CE environment from the previous post. The important endpoint for this collector is the MQTT over TLS endpoint:
mqtt.maksonlee.com:8883
The collector connects to this endpoint using MQTT over TLS with X.509 client certificate authentication.
Before running the collector, create one ThingsBoard gateway device.
Example:
Device name: stock-collector-01
Is gateway: enabled
Credentials: X.509 certificate
The collector publishes to the ThingsBoard MQTT Gateway API topic:
v1/gateway/telemetry
Example payload:
{
  "stock.AAPL": [
    {
      "ts": 1777302300000,
      "values": {
        "symbol": "AAPL",
        "open": 173.2,
        "high": 173.5,
        "low": 173.1,
        "close": 173.4,
        "price": 173.4,
        "volume": 1200345,
        "source": "polygon",
        "aggregate_timespan": "day",
        "bar_timestamp_ms": 1777320000000
      }
    }
  ]
}
The gateway certificate and private key must match the ThingsBoard gateway device credentials.
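A payload in this shape can be assembled with a few lines of Python. This is only a sketch: `device_name` and `build_gateway_payload` are hypothetical helper names, and the input is assumed to be a mapping from ticker to an already-built telemetry record with `ts` and `values` keys.

```python
import json


def device_name(symbol: str) -> str:
    """Map a ticker such as AAPL to the virtual device name stock.AAPL."""
    return f"stock.{symbol}"


def build_gateway_payload(records: dict) -> str:
    """Serialize ticker -> telemetry record into one gateway telemetry message.

    Each device name maps to a list of records, matching the example payload.
    """
    payload = {device_name(symbol): [record] for symbol, record in records.items()}
    return json.dumps(payload, separators=(",", ":"))
```

The resulting string is what would be published to `v1/gateway/telemetry`; ThingsBoard creates the virtual devices under the gateway as their names first appear.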
Local Python Test
Clone the repository:
git clone https://github.com/maksonlee/stock-iot-collector.git
cd stock-iot-collector
Create a Python virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Install dependencies:
pip install -r requirements.txt
Set the required environment variables:
export POLYGON_API_KEY="your_polygon_api_key"
export THINGSBOARD_MQTT_HOST="mqtt.maksonlee.com"
export THINGSBOARD_MQTT_CA_CERT="/path/to/ca.crt"
export THINGSBOARD_MQTT_CLIENT_CERT="/path/to/client.crt"
export THINGSBOARD_MQTT_CLIENT_KEY="/path/to/client.key"
Run the collector:
python -m app.main
If the connection is successful, you should see logs similar to:
Loaded ... stock(s), polygon_timespan=day
MQTT connection accepted: Success
Connected to ThingsBoard MQTT host=...
Published telemetry for ... stock(s)
Sleeping for 3600 second(s)
Build the Docker Image
Build the Docker image:
docker build -t stock-iot-collector:0.1.0 -f docker/Dockerfile .
Tag the image for Docker Hub:
docker tag stock-iot-collector:0.1.0 cdlee/stock-iot-collector:0.1.0
Push it:
docker push cdlee/stock-iot-collector:0.1.0
Run with Docker
Example Docker run command:
docker run --rm \
-e POLYGON_API_KEY="your_polygon_api_key" \
-e THINGSBOARD_MQTT_HOST="mqtt.maksonlee.com" \
-e THINGSBOARD_MQTT_CA_CERT="/etc/stock-iot-collector/certs/ca.crt" \
-e THINGSBOARD_MQTT_CLIENT_CERT="/etc/stock-iot-collector/certs/client.crt" \
-e THINGSBOARD_MQTT_CLIENT_KEY="/etc/stock-iot-collector/certs/client.key" \
-v "$PWD/config/stocks.yaml:/etc/stock-iot-collector/stocks.yaml:ro" \
-v "/path/to/certs:/etc/stock-iot-collector/certs:ro" \
cdlee/stock-iot-collector:0.1.0
The certificate directory should contain:
ca.crt
client.crt
client.key
If your CA file is named differently, adjust the environment variable accordingly.
Kubernetes Deployment
The repository includes Kubernetes manifests under:
k8s/
The deployment uses the Docker Hub image:
image: cdlee/stock-iot-collector:0.1.0
If you publish a newer image tag later, update k8s/deployment.yaml.
- Copy the Project to the Kubernetes Machine
On your Kubernetes admin node, copy or clone the project:
git clone https://github.com/maksonlee/stock-iot-collector.git
cd stock-iot-collector
In my environment, I run the commands from the Kubernetes admin node where kubectl is already configured.
- Create the Namespace
Apply the namespace manifest:
kubectl apply -f k8s/namespace.yaml
The namespace is:
stock
- Create the Kubernetes Secret
The collector needs:
Polygon/Massive API key
MQTT CA certificate
MQTT client certificate
MQTT client private key
Set your API key first:
export POLYGON_API_KEY="your_polygon_api_key"
Create or update the secret:
kubectl create secret generic stock-iot-collector-secret \
-n stock \
--from-literal=POLYGON_API_KEY="$POLYGON_API_KEY" \
--from-file=ca.crt=full-ca.crt \
--from-file=client.crt=client.crt \
--from-file=client.key=client.key \
--dry-run=client -o yaml | kubectl apply -f -
Expected local files:
full-ca.crt
client.crt
client.key
In this example, full-ca.crt is stored in the Kubernetes secret as ca.crt.
The client certificate and key must match the ThingsBoard gateway device credentials.
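Before creating the secret, it can help to confirm locally that the certificate files load and that the private key belongs to the client certificate. The sketch below uses only Python's standard `ssl` module; the function name `check_mqtt_credentials` is hypothetical. It checks the local files only and cannot tell whether ThingsBoard has the same certificate registered on the gateway device.

```python
import ssl


def check_mqtt_credentials(ca_cert: str, client_cert: str, client_key: str):
    """Return (ok, message) after trying to load the TLS files.

    load_cert_chain raises ssl.SSLError when the private key does not
    match the certificate, and OSError when a file cannot be read.
    """
    try:
        context = ssl.create_default_context(cafile=ca_cert)
        context.load_cert_chain(certfile=client_cert, keyfile=client_key)
    except (ssl.SSLError, OSError) as exc:
        return False, str(exc)
    return True, "ok"
```

For example, `check_mqtt_credentials("full-ca.crt", "client.crt", "client.key")` could be run in the directory that holds the three files before running `kubectl create secret`.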
- Configure the Collector
Edit:
k8s/configmap.yaml
Set your ThingsBoard MQTT host:
THINGSBOARD_MQTT_HOST: "mqtt.maksonlee.com"
The default stock configuration collects all U.S. stocks:
stocks.yaml: |
  stocks:
    - symbol: "*"
If ThingsBoard is under pressure, lower the publish chunk size:
PUBLISH_CHUNK_SIZE: "10"
The default value is:
PUBLISH_CHUNK_SIZE: "50"
Use 10 if your ThingsBoard or MQTT transport cannot handle larger chunks reliably.
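The effect of the chunk size can be sketched as follows: the full set of virtual devices is split into groups, and each group becomes one gateway telemetry message. The function name `chunked` is hypothetical and this is an illustration of the idea, not the repository code.

```python
from typing import Iterator


def chunked(devices: dict, size: int) -> Iterator[dict]:
    """Yield dictionaries of at most `size` devices, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    names = list(devices)
    for start in range(0, len(names), size):
        yield {name: devices[name] for name in names[start:start + size]}
```

A smaller chunk size means more MQTT messages per run, each with a smaller payload, which is easier on the MQTT transport at the cost of a longer publish phase.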
- Deploy
Apply the ConfigMap:
kubectl apply -f k8s/configmap.yaml
Apply the Deployment:
kubectl apply -f k8s/deployment.yaml
Check the pod:
kubectl get pods -n stock
Watch the logs:
kubectl logs -n stock -l app=stock-iot-collector -f
Healthy logs should look similar to:
Loaded ... stock(s), polygon_timespan=day
MQTT connection accepted: Success
Connected to ThingsBoard MQTT host=...
Collecting target_time_utc=...
Built grouped daily payload for ... stock(s)
Published telemetry for ... stock(s)
Sleeping for 3600 second(s)