記憶力が無い

プログラミングと室内園芸と何か

Raspberry Pi Pico W から Prometheus にメトリクスを送信する

気が付いたら今年の前半戦も終盤に...

はじめに

blog.ttk1.net

以前こちらの記事で Raspberry Pi Zero と環境センサーで室内の温度・湿度・気圧を計測し Grafana Cloud に連携する話を書きました。

同様のことを Raspberry Pi Pico W で実現したいと考えているのですが、そこで問題になるのがどうやってメトリクスを Prometheus に送信するかです。

今回はその問題の解決として作成した Raspberry Pi Pico W から Prometheus にメトリクスを送信するためのライブラリとその使用例にいて書こうと思います。

Remote Write エンドポイント

上の記事では Grafana Agent がメトリクスの Prometheus への送信周りをよしなにやってくれていましたが、Raspberry Pi Pico W ではそのような便利なものは使えません。

また、Send Metrics to Grafana Cloud | Grafana Cloud documentation に Grafana Cloud の Prometheus にメトリクスを送信する方法がいくつか記載されていますが、いずれも Raspberry Pi Pico W 単体では利用できない方法になります。

そこで Prometheus の Remote Write エンドポイントに直接メトリクスを送信することを考えてみます。

Remote Write の仕様はこちらにあります:

prometheus.io

仕様自体は簡単なものなのですが、データを送信する際に Protocol Buffers 形式でシリアライズする必要があること、Snappy で圧縮する必要があることが問題になります。

いずれも Python 3 だとめぼしいライブラリがあるのですが、MicroPython (※) だと使えそうなものは見つかりませんでした。
※ Raspberry Pi Pico では C/C++ と Python の SDK が用意されていますが、筆者は C/C++ を普段使わないので Python (MicroPython) で実装する前提になってます

prometheus_remote_write_payload

ということで自作したものがこちらになります:

github.com

Protocol Buffers のシリアライズは自前で実装しました。

# Protocol Buffers のシリアライズ部分のコード
    def get_payload(self) -> bytes:
        payload = b""
        for timeseries in self.timeseries:
            timeseries_data = b""
            for label in timeseries.labels:
                label_data = (1 << 3 | 2).to_bytes(1, "little")
                label_data += data_with_length(label.name.encode("utf-8"))
                label_data += (2 << 3 | 2).to_bytes(1, "little")
                label_data += data_with_length(label.value.encode("utf-8"))
                timeseries_data += (1 << 3 | 2).to_bytes(1, "little")
                timeseries_data += data_with_length(label_data)
            for sample in timeseries.samples:
                sample_data = (1 << 3 | 1).to_bytes(1, "little")
                sample_data += struct.pack("<d", sample.value)
                sample_data += (2 << 3 | 0).to_bytes(1, "little")
                sample_data += int_to_varint(sample.timestamp)
                timeseries_data += (2 << 3 | 2).to_bytes(1, "little")
                timeseries_data += data_with_length(sample_data)
            payload += (1 << 3 | 2).to_bytes(1, "little")
            payload += data_with_length(timeseries_data)
        return no_compress_snappy(payload)

実装は以下の記事が参考になりました:

engineering.mercari.com

Snappy は圧縮は行わずに元のデータを全てリテラル(非圧縮部分)にすることで フォーマット だけ合わせるようにしてます。 あんまりよくないですが、実装大変そうなのと送るデータ量も多くないので手を抜いてます。

# Snappy データの作成部分のコード
def no_compress_snappy(data: bytes):
    snappy = int_to_varint(len(data))
    for chunk in [data[i : i + 60] for i in range(0, len(data), 60)]:
        chunk_len = len(chunk)
        snappy += ((chunk_len - 1) << 2).to_bytes(1, "little")
        snappy += chunk
    return snappy

使い方

mip 用に package.json を配置しているので次のコードを実行することでインストールできます:

# Wi-Fi 接続必要です
>>> import mip
>>> mip.install("github:ttk1/prometheus_remote_write_payload")

README にも記載していますが、以下のようなコードで Prometheus にメトリクスを送信できます:

import utime
import network
import urequests
import ntptime

from prometheus_remote_write_payload import PrometheusRemoteWritePayload

# Wi-Fi に接続
ssid = "ssid"
password = "password for ssid"
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
utime.sleep(5)
if wlan.status() != 3:
    raise Exception("network connection failed")

# メトリクスにタイムスタンプを指定する必要があるので
# 時刻を同期する
ntptime.settime()

# Prometheus にメトリクスを送信
prometheus = PrometheusRemoteWritePayload()
prometheus.add_data(
    "test_test", {"instance": "test_micropython"}, 987.654, int(utime.time() * 1000)
)
prometheus_remote_write_endpoint = "https://prometheus-remote-write-endpoint"
prometheus_remote_write_endpoint_basic_auth = ("user", "password")
urequests.post(
    prometheus_remote_write_endpoint,
    data=prometheus.get_payload(),
    auth=prometheus_remote_write_endpoint_basic_auth,
)

使用例

環境センサーで室内の温度・湿度・気圧を計測しそのデータを Grafana Cloud の Prometheus に送信するようにしてみます。 センサーは BME680 モジュール を使用します。

BME680 モジュールからのデータの読み取りは同じく自作の以下のライブラリを使用します。

github.com

こちらも prometheus_remote_write_payload と同様 mip でインストールできます:

>>> mip.install("github:ttk1/bme680-py")

認証情報などの設定は config.py として別に作成しておきます:

# 値はそれぞれの環境に応じて設定してください
wifi_ssid = ""
wifi_password = ""
prometheus_endpoint = ""
prometheus_user = ""
prometheus_password = ""
instance = ""

main.py は次のように作成します:

import utime
import network
import urequests
import ntptime
import machine

from bme680 import BME680
from prometheus_remote_write_payload import PrometheusRemoteWritePayload

import config


def sleep(seconds: int, wdt: machine.WDT):
    while seconds - 5 > 0:
        wdt.feed()
        utime.sleep(5)
        seconds -= 5
    wdt.feed()
    utime.sleep(seconds)
    wdt.feed()


def main(wdt: machine.WDT):
    ssid = config.wifi_ssid
    password = config.wifi_password
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)
    sleep(5, wdt)
    if wlan.status() != 3:
        raise Exception("network connection failed")
    ntptime.settime()

    prometheus_remote_write_endpoint = config.prometheus_endpoint
    prometheus_remote_write_endpoint_basic_auth = (
        config.prometheus_user,
        config.prometheus_password,
    )
    device = BME680()

    while True:
        device.measure()
        timestamp = int(utime.time() * 1000)
        labels = {"instance": config.instance}
        prometheus = PrometheusRemoteWritePayload()
        prometheus.add_data(
            "home_env_temp",
            labels,
            device.temp,
            timestamp,
        )
        prometheus.add_data(
            "home_env_press",
            labels,
            device.press / 100,
            timestamp,
        )
        prometheus.add_data(
            "home_env_hum",
            labels,
            device.hum,
            timestamp,
        )
        urequests.post(
            prometheus_remote_write_endpoint,
            data=prometheus.get_payload(),
            auth=prometheus_remote_write_endpoint_basic_auth,
        )
        sleep(10, wdt)


if __name__ == "__main__":
    wdt = machine.WDT(timeout=8 * 1000)
    while True:
        try:
            main(wdt)
        except Exception as e:
            sleep(60, wdt)

メトリクスの送信が終わったら 10 秒スリープし、次の送信に続くようになっています。

ネットワーク通信の不具合などで処理が止まってしまわないように machine.WDT(timeout=8 * 1000)ウォッチドッグタイマー を設定しています。 この設定では 8 秒以内の間隔で wdt.feed() を呼び出し続ける必要があります。 そのため sleep(seconds: int, wdt: machine.WDT) 関数を作成し長い時間のスリープを行っています。

実行結果

Grafana Cloud 上で送信したデータを確認できました。 特に Snappy の部分がうまく実装できてる自信がなかったのですが、問題なさそうでホッとしました。

まとめ

  • Raspberry Pi Pico W だけで Grafana Cloud の Prometheus にメトリクスが送信できた!
  • MicroPython はいいぞ!
  • Raspberry Pi Pico はいいぞ!
Copyright © 2017 ttk1