今回は、TOCH社の水質センサーと接続してみます。
CONTENTS
用意するもの
用意するのは次のとおりです。SensingApp のアカウントを用意したら、センサーを登録した後、次の情報をご確認下さい。
- USER ID
- データ送信先URL
- センサー種別
- センサー番号
以上の情報をご用意いただければ、センシングデータをSensingAppに送信してグラフ表示することが可能です。
①と②のUSER IDとデータ送信先URLは、SensingApp画面のユーザー情報で確認できます。
また、③センサー種別と④センサー番号(No.)は、SensingApp画面のセンサー一覧で確認できます。
サンプルコード(Python)
RaspberryPiで動作させるサンプルコードを示します。
# -*- coding: utf-8 -*-
import base64
import json
import urllib.request
from datetime import datetime
import time
import serial
import binascii
import struct
import math
import traceback
from decimal import Decimal
from urllib.error import HTTPError
# 当システムのアクセス情報を変更(** 必ず変更する **)
URL = "https://xxxxx.sensingapp.io/api/data/"
USER_ID = "b0f68403-ff4f-4520-889e-xxxxxxxxxxxx"
ITEM = "xx"
ITEM_NUM = "xx"
# センシング間隔(秒)
INTERVAL_TIME = 60 # 1分ごと
SENSOR_USB = "/dev/ttyUSB0"
# センシングデータをTOCH指示計から取得
def get_data(device):
try:
ser = serial.Serial(
device, baudrate='9600', timeout=None,
stopbits=1, bytesize=8, parity='N'
)
# Connect TOCH Sensors (pH, MLSS, DO)
CMD = bytes.fromhex('0103000000044409')
ser.write(CMD)
time.sleep(1)
data_all = ser.read_all()
data_hex = binascii.hexlify(data_all)
worker_id = int.from_bytes(binascii.a2b_hex(data_hex[0:2]),"little")
func_cd = int.from_bytes(binascii.a2b_hex(data_hex[2:4]),"little")
data_size = int.from_bytes(binascii.a2b_hex(data_hex[4:6]),"little")
data_float = hex_to_float((data_hex[10:14]+data_hex[6:10]).decode())
data = math.floor(data_float * 10 ** 2) / (10 ** 2)
ser.close()
except Exception:
data = 0
return data
def hex_to_float(s):
if s.startswith('0x'):
s = s[2:]
s = s.replace(' ', '')
return struct.unpack('>f', binascii.unhexlify(s))[0]
# センシングデータをサーバに送信
def send_data(data):
obj = {
"user_id": USER_ID,
"item": ITEM,
"item_num": ITEM_NUM,
"regist_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"sensing_data": str(data),
}
json_data = json.dumps(obj).encode("utf-8")
credentials = ('%s:%s' % (user, password))
encoded_credentials = base64.b64encode(credentials.encode('ascii'))
# httpリクエストを準備してPOST
request = urllib.request.Request(
URL, data=json_data, method="POST",
headers={"Content-Type": "application/json", }
)
request.add_header(
'Authorization', 'Basic %s' % encoded_credentials.decode("ascii")
)
try:
with urllib.request.urlopen(request) as response:
response_data = response.read().decode("utf-8")
except Exception:
response_data = 0
return response_data
# 決められた時間間隔でのセンシング
def main():
try:
while True:
# 開始時刻を記録
start_time = time.time()
sensing_data = get_data(SENSOR_USB) # 指定のUSBからのデータを取得する。
if sensing_data != 0 : # 正常に数値を取得したらデータを処理する。
response_data = send_data(sensing_data) # データをクラウドに送信する。
print(response_data) # for debug
# 終了時刻を記録
stop_time = time.time()
# (INTERVAL_TIME - 送信にかかった時間)秒待ってループ
wait = start_time - stop_time + INTERVAL_TIME
if wait > 0:
time.sleep(wait)
# キーボード例外を検出
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
データが表示されなかったら
ダッシュボードでデータが表示されない場合は次の理由が考えられます。
① IoTデバイスがインターネットに接続されていない。
② IoTデバイスでデータ送信するプログラムのパラメーターが正しくない。
- IoTデバイス設定の通信パラメーターをご覧ください。
- 通信が一度でも出来たあとに、登録センサー画面で、該当するセンサーを削除したり、種別を変更すると通信できなくなります。
③ システム全体が不具合を起こしている。
- この場合には画面も見られなくなっている可能性がありますので、弊社にご相談ください。
上記のように一旦、誤ってセンサーを削除したら、IoTデバイスのパラメーターを変更する必要があります。その場合には弊社にご相談ください。