Raspberry Pi Zero WHを使用したSwitchBotデータの収集とグラフ作成

目的

SwitchBot の気温や湿度を Windows 上に表示する。

環境

概要

  • SwitchBot ハブは持っていないため SwitchBot API は使わず Bluetooth(BLE) 接続で端末から直接取得する。
  • データの取得は Raspi zero で一定時間ごとに行い、取得したデータは Google スプレッドシートに記録する。
  • Google スプレッドシートのデータをグラフ化し、デスクトップアプリのような形で表示する。

開発

Raspberry Pi Zero WH の Bluetooth 確認

# hciconfig
ci0:   Type: Primary  Bus: UART
    BD Address: B8:27:EB:7D:69:B4  ACL MTU: 1021:8  SCO MTU: 64:1
    UP RUNNING
    RX bytes:3610 acl:0 sco:0 events:256 errors:0
    TX bytes:33188 acl:0 sco:0 commands:256 errors:0

"UP RUNNING" なので OK

Python 関連のインストール

pip3 に切り替えるタイミングってどこが良いのだろう。

# apt-get update
# apt-get install libbluetooth-dev libglib2.0-dev libboost-python-dev libboost-thread-dev
# apt-get install python3-pip
# apt-get install python3-gattlib
# apt-get install python3.11-venv
# python3 -m venv myenv
# source myenv/bin/activate
# pip3 install bluepy
# pip3 install gspread
# pip3 install oauth2client

PythonBluetooth 検知

SwitchBot アプリから確認できる MAC アドレスが実行結果に含まれていることを確認。 アドレスタイプが random なので電池切れなどの端末再起動で MAC アドレス変わるかも。

参考

SwitchBot 温湿度計の値を取得

どうも温湿度計(液晶あり)の方法では防水温湿度計(液晶なし)はうまくいかないらしい。

参考

SwitchBot 防水温湿度計の値を取得

バッテリー残量、温度、湿度が取得できたのを確認。

参考

PythonGoogle スプレッドシートに書き込み

SwitchBot からデータ取得、スプレッドシートに書き込みまでするプログラム。

#!/usr/bin/python3

import sys
import datetime
import logging

# Switch-Botからデータ取得のためのインポート
import binascii
from bluepy.btle import Scanner, DefaultDelegate, BTLEDisconnectError

# GoogleSpreadSheetに書き込むためのインポート
import requests
import gspread
from oauth2client.service_account import ServiceAccountCredentials

logging.basicConfig(level=logging.ERROR)

class ScanDelegate(DefaultDelegate):
    def __init__(self, addr, name):
        DefaultDelegate.__init__(self)
        self.addr = addr
        self.name = name
        self.processed_devices = set()  # 処理済みのデバイスの初期化

    def handleDiscovery(self, dev, isNewDev, isNewData):
        # アドレスチェック
        if dev.addr != self.addr:
            return

        # 処理済みでないかチェック
        if dev.addr in self.processed_devices:
            return

        # 主処理
        for (adtype, desc, value) in dev.getScanData():
            # 気温、湿度を読み込んでスプレッドシートに登録
            if(adtype == 22):
                servicedata = binascii.unhexlify(value[4:])
                battery = servicedata[2] & 0b01111111

                timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                #print(self.name + "," + timestamp + "," + str(battery) + "," + str(temp) + "," + str(humid))
                self.writeSpreadsheet(self.name, timestamp, battery, temp, humid)

                # 処理したデバイスのアドレスを記録
                self.processed_devices.add(dev.addr)

                return
            elif(adtype == 255):
                madata = binascii.unhexlify(value[16:])
                humid = madata[4] & 0b01111111
                temp = (madata[2] & 0b00001111) / 10 + (madata[3] & 0b01111111)
                isOverZero=(madata[3]&0b10000000)
                if not isOverZero:
                    temp = -temp
                continue
            else :
                continue

    def writeSpreadsheet(self, name, timestamp, battery, temp, humid):
        # スプレッドシート
        key_name = '{ jsonファイルを指定 }'
        book_name = '{ Spreadsheet のファイル名 }'
        sheet_name = '{ Spredsheet のシート名 }'

        # GoogleAPIを通してスプレッドシートに接続
        scope = ['https://spreadsheets.google.com/feeds','https://www.googleapis.com/auth/drive']
        credentials = ServiceAccountCredentials.from_json_keyfile_name(key_name, scope)
        gc = gspread.authorize(credentials)

        # シートを取得
        worksheet = gc.open(book_name).worksheet(sheet_name)

        # A列のデータを配列として取得
        A_COL_ARRAY = worksheet.col_values(1)

        # 最下行インデックスを取得
        LAST_ROW_IDX = len(A_COL_ARRAY)

        # 書き込み
        worksheet.update_cell(LAST_ROW_IDX+1, 1, name)
        worksheet.update_cell(LAST_ROW_IDX+1, 2, timestamp)
        worksheet.update_cell(LAST_ROW_IDX+1, 3, battery)
        worksheet.update_cell(LAST_ROW_IDX+1, 4, temp)
        worksheet.update_cell(LAST_ROW_IDX+1, 5, humid)

def scan(addr, name):
    scanner = Scanner().withDelegate(ScanDelegate(addr, name))
    try:
        # 指定秒BLEのスキャンを行う
        scanner.scan(20)
    except BTLEDisconnectError as e:
        pass
    except Exception as e:
        logging.error("Error during scanning: %s", e)

def main():
    scan("xx:xx:xx:xx:xx:x1", "名称1")
    scan("xx:xx:xx:xx:xx:x2", "名称2")
    scan("xx:xx:xx:xx:xx:x3", "名称3")

if __name__ == "__main__":
    main()

参考

cron 設定

python(pip) を cron で設定するは面倒らしいので、実行部分は shell で実装。ついでにログをファイル出力。

#!/bin/bash
source /hoge/myenv/bin/activate
python /hoge/switchbot_meter.py >> /hoge/output.log 2>> /hoge/error.log

crontab で 15 分毎の設定。

*/15 * * * * /hoge/run.sh

グラフの作成

Google スプレッドシートのデータを容易に扱える Looker Studio を使って折れ線グラフなどを作成。

15 分ごとの更新設定をしてるけどうまくいかず。

デスクトップ表示

Chrome の機能を使ってウェブサイトを疑似的にアプリ化。

参考

最後に

おおよその目的は達成できて満足。15 分ごとの更新ができていないが、raspi 上で php を動かしているのでそちらでグラフ表示&自動更新を考えている。(横軸が変わる場合に自動更新ができないというのをどこかで見た)

また、何も考えずデータの登録をしてしまったが、グラフ表示のことも考えてデータ登録を考えた方が良さそう。正規化大事。

  • 現在

    場所 タイムスタンプ バッテリー(%) 気温(℃) 湿度(%)
    ベランダ 2024-03-20 19:00:12 96 7.7 51
    ベビールーム 2024-03-20 19:00:30 92 22.7 33
    寝室 2024-03-20 19:00:48 100 13.8 44
  • 改善後(想定)

    場所 タイプ タイムスタンプ
    ベランダ バッテリー(%) 96 2024/3/20 19:00
    ベランダ 気温(℃) 7.7 2024/3/20 19:00
    ベランダ 湿度(%) 51 2024/3/20 19:00
    ベビールーム バッテリー(%) 92 2024/3/20 19:00
    ベビールーム 気温(℃) 22.7 2024/3/20 19:00
    ベビールーム 湿度(%) 33 2024/3/20 19:00
    寝室 バッテリー(%) 100 2024/3/20 19:00
    寝室 気温(℃) 13.8 2024/3/20 19:00
    寝室 湿度(%) 44 2024/3/20 19:00