気が付いたら今年の前半戦も終盤に...
はじめに
以前こちらの記事で Raspberry Pi Zero と環境センサーで室内の温度・湿度・気圧を計測し Grafana Cloud に連携する話を書きました。
同様のことを Raspberry Pi Pico W で実現したいと考えているのですが、そこで問題になるのがどうやってメトリクスを Prometheus に送信するかです。
今回はその問題の解決として作成した Raspberry Pi Pico W から Prometheus にメトリクスを送信するためのライブラリとその使用例にいて書こうと思います。
Remote Write エンドポイント
上の記事では Grafana Agent がメトリクスの Prometheus への送信周りをよしなにやってくれていましたが、Raspberry Pi Pico W ではそのような便利なものは使えません。
また、Send Metrics to Grafana Cloud | Grafana Cloud documentation に Grafana Cloud の Prometheus にメトリクスを送信する方法がいくつか記載されていますが、いずれも Raspberry Pi Pico W 単体では利用できない方法になります。
そこで Prometheus の Remote Write エンドポイントに直接メトリクスを送信することを考えてみます。
Remote Write の仕様はこちらにあります:
仕様自体は簡単なものなのですが、データを送信する際に Protocol Buffers 形式でシリアライズする必要があること、Snappy で圧縮する必要があることが問題になります。
いずれも Python 3 だとめぼしいライブラリがあるのですが、MicroPython (※) だと使えそうなものは見つかりませんでした。
※ Raspberry Pi Pico では C/C++ と Python の SDK が用意されていますが、筆者は C/C++ を普段使わないので Python (MicroPython) で実装する前提になってます
prometheus_remote_write_payload
ということで自作したものがこちらになります:
Protocol Buffers のシリアライズは自前で実装しました。
# Protocol Buffers のシリアライズ部分のコード def get_payload(self) -> bytes: payload = b"" for timeseries in self.timeseries: timeseries_data = b"" for label in timeseries.labels: label_data = (1 << 3 | 2).to_bytes(1, "little") label_data += data_with_length(label.name.encode("utf-8")) label_data += (2 << 3 | 2).to_bytes(1, "little") label_data += data_with_length(label.value.encode("utf-8")) timeseries_data += (1 << 3 | 2).to_bytes(1, "little") timeseries_data += data_with_length(label_data) for sample in timeseries.samples: sample_data = (1 << 3 | 1).to_bytes(1, "little") sample_data += struct.pack("<d", sample.value) sample_data += (2 << 3 | 0).to_bytes(1, "little") sample_data += int_to_varint(sample.timestamp) timeseries_data += (2 << 3 | 2).to_bytes(1, "little") timeseries_data += data_with_length(sample_data) payload += (1 << 3 | 2).to_bytes(1, "little") payload += data_with_length(timeseries_data) return no_compress_snappy(payload)
実装は以下の記事が参考になりました:
Snappy は圧縮は行わずに元のデータを全てリテラル(非圧縮部分)にすることで フォーマット だけ合わせるようにしてます。 あんまりよくないですが、実装大変そうなのと送るデータ量も多くないので手を抜いてます。
# Snappy データの作成部分のコード def no_compress_snappy(data: bytes): snappy = int_to_varint(len(data)) for chunk in [data[i : i + 60] for i in range(0, len(data), 60)]: chunk_len = len(chunk) snappy += ((chunk_len - 1) << 2).to_bytes(1, "little") snappy += chunk return snappy
使い方
mip 用に package.json を配置しているので次のコードを実行することでインストールできます:
# Wi-Fi 接続必要です >>> import mip >>> mip.install("github:ttk1/prometheus_remote_write_payload")
README にも記載していますが、以下のようなコードで Prometheus にメトリクスを送信できます:
import utime import network import urequests import ntptime from prometheus_remote_write_payload import PrometheusRemoteWritePayload # Wi-Fi に接続 ssid = "ssid" password = "password for ssid" wlan = network.WLAN(network.STA_IF) wlan.active(True) wlan.connect(ssid, password) utime.sleep(5) if wlan.status() != 3: raise Exception("network connection failed") # メトリクスにタイムスタンプを指定する必要があるので # 時刻を同期する ntptime.settime() # Prometheus にメトリクスを送信 prometheus = PrometheusRemoteWritePayload() prometheus.add_data( "test_test", {"instance": "test_micropython"}, 987.654, int(utime.time() * 1000) ) prometheus_remote_write_endpoint = "https://prometheus-remote-write-endpoint" prometheus_remote_write_endpoint_basic_auth = ("user", "password") urequests.post( prometheus_remote_write_endpoint, data=prometheus.get_payload(), auth=prometheus_remote_write_endpoint_basic_auth, )
使用例
環境センサーで室内の温度・湿度・気圧を計測しそのデータを Grafana Cloud の Prometheus に送信するようにしてみます。 センサーは BME680 モジュール を使用します。
BME680 モジュールからのデータの読み取りは同じく自作の以下のライブラリを使用します。
こちらも prometheus_remote_write_payload と同様 mip でインストールできます:
>>> mip.install("github:ttk1/bme680-py")
認証情報などの設定は config.py として別に作成しておきます:
# 値はそれぞれの環境に応じて設定してください wifi_ssid = "" wifi_password = "" prometheus_endpoint = "" prometheus_user = "" prometheus_password = "" instance = ""
main.py は次のように作成します:
import utime import network import urequests import ntptime import machine from bme680 import BME680 from prometheus_remote_write_payload import PrometheusRemoteWritePayload import config def sleep(seconds: int, wdt: machine.WDT): while seconds - 5 > 0: wdt.feed() utime.sleep(5) seconds -= 5 wdt.feed() utime.sleep(seconds) wdt.feed() def main(wdt: machine.WDT): ssid = config.wifi_ssid password = config.wifi_password wlan = network.WLAN(network.STA_IF) wlan.active(True) wlan.connect(ssid, password) sleep(5, wdt) if wlan.status() != 3: raise Exception("network connection failed") ntptime.settime() prometheus_remote_write_endpoint = config.prometheus_endpoint prometheus_remote_write_endpoint_basic_auth = ( config.prometheus_user, config.prometheus_password, ) device = BME680() while True: device.measure() timestamp = int(utime.time() * 1000) labels = {"instance": config.instance} prometheus = PrometheusRemoteWritePayload() prometheus.add_data( "home_env_temp", labels, device.temp, timestamp, ) prometheus.add_data( "home_env_press", labels, device.press / 100, timestamp, ) prometheus.add_data( "home_env_hum", labels, device.hum, timestamp, ) urequests.post( prometheus_remote_write_endpoint, data=prometheus.get_payload(), auth=prometheus_remote_write_endpoint_basic_auth, ) sleep(10, wdt) if __name__ == "__main__": wdt = machine.WDT(timeout=8 * 1000) while True: try: main(wdt) except Exception as e: sleep(60, wdt)
メトリクスの送信が終わったら 10 秒スリープし、次の送信に続くようになっています。
ネットワーク通信の不具合などで処理が止まってしまわないように
machine.WDT(timeout=8 * 1000)
で ウォッチドッグタイマー を設定しています。
この設定では 8 秒以内の間隔で wdt.feed()
を呼び出し続ける必要があります。
そのため sleep(seconds: int, wdt: machine.WDT)
関数を作成し長い時間のスリープを行っています。
実行結果
Grafana Cloud 上で送信したデータを確認できました。 特に Snappy の部分がうまく実装できてる自信がなかったのですが、問題なさそうでホッとしました。
まとめ
- Raspberry Pi Pico W だけで Grafana Cloud の Prometheus にメトリクスが送信できた!
- MicroPython はいいぞ!
- Raspberry Pi Pico はいいぞ!