サンプルプログラム

サンプルプログラム(TOCH社センサー)

 今回は、TOCH社の水質センサーと接続してみます。

用意するもの

 用意するのは次のとおりです。SensingApp のアカウントを用意したら、センサーを登録した後、次の情報をご確認下さい。

  1. USER ID
  2. データ送信先URL
  3. センサー種別
  4. センサー番号

 以上の情報をご用意いただければ、センシングデータをSensingAppに送信してグラフ表示することが可能です。

①と②のUSER IDとデータ送信先URLは、SensingApp画面のユーザー情報で確認できます。

 また、③センサー種別と④センサー番号(No.)は、SensingApp画面のセンサー一覧で確認できます。

サンプルコード(Python)

RaspberryPiで動作させるサンプルコードを示します。

# -*- coding: utf-8 -*-
import base64
import json
import urllib.request
from datetime import datetime

import time
import serial
import binascii
import struct
import math
import traceback

from decimal import Decimal
from urllib.error import HTTPError

# 当システムのアクセス情報を変更(** 必ず変更する **)
URL = "https://xxxxx.sensingapp.io/api/data/"
USER_ID = "b0f68403-ff4f-4520-889e-xxxxxxxxxxxx"
ITEM = "xx"
ITEM_NUM = "xx"

# センシング間隔(秒)
INTERVAL_TIME = 60    # 1分ごと

SENSOR_USB = "/dev/ttyUSB0" 

# センシングデータをTOCH指示計から取得
def get_data(device):
    try:
        ser = serial.Serial(
            device, baudrate='9600', timeout=None, 
            stopbits=1, bytesize=8, parity='N'
        )

        # Connect TOCH Sensors (pH, MLSS, DO)
        CMD = bytes.fromhex('0103000000044409')

        ser.write(CMD)
        time.sleep(1)
        data_all = ser.read_all()

        data_hex = binascii.hexlify(data_all)

        worker_id = int.from_bytes(binascii.a2b_hex(data_hex[0:2]),"little")
        func_cd = int.from_bytes(binascii.a2b_hex(data_hex[2:4]),"little")
        data_size = int.from_bytes(binascii.a2b_hex(data_hex[4:6]),"little")

        data_float = hex_to_float((data_hex[10:14]+data_hex[6:10]).decode())
        data = math.floor(data_float * 10 ** 2) / (10 ** 2)

        ser.close()

    except Exception:
        data = 0

    return data


def hex_to_float(s):
    if s.startswith('0x'):
        s = s[2:]
    s = s.replace(' ', '')
    return struct.unpack('>f', binascii.unhexlify(s))[0]


# センシングデータをサーバに送信
def send_data(data):

    obj = {
        "user_id": USER_ID,
        "item": ITEM,
        "item_num": ITEM_NUM,
        "regist_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "sensing_data": str(data),
    }
    json_data = json.dumps(obj).encode("utf-8")

    credentials = ('%s:%s' % (user, password))
    encoded_credentials = base64.b64encode(credentials.encode('ascii'))

    # httpリクエストを準備してPOST
    request = urllib.request.Request(
        URL, data=json_data, method="POST", 
        headers={"Content-Type": "application/json", }
    )
    request.add_header(
        'Authorization', 'Basic %s' % encoded_credentials.decode("ascii")
    )

    try:
        with urllib.request.urlopen(request) as response:
            response_data = response.read().decode("utf-8")

    except Exception:
        response_data = 0

    return response_data


# 決められた時間間隔でのセンシング
def main():

    try:
        while True:
            # 開始時刻を記録
            start_time = time.time()

            sensing_data = get_data(SENSOR_USB) # 指定のUSBからのデータを取得する。

            if sensing_data != 0 : # 正常に数値を取得したらデータを処理する。
                response_data = send_data(sensing_data)  # データをクラウドに送信する。
                print(response_data) # for debug
                # 終了時刻を記録
                stop_time = time.time()
                # (INTERVAL_TIME - 送信にかかった時間)秒待ってループ
                wait = start_time - stop_time + INTERVAL_TIME
                if wait > 0:
                    time.sleep(wait)

            # キーボード例外を検出
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()


データが表示されなかったら

 ダッシュボードでデータが表示されない場合は次の理由が考えられます。

① IoTデバイスがインターネットに接続されていない。

② IoTデバイスでデータ送信するプログラムのパラメーターが正しくない。

  • IoTデバイス設定の通信パラメーターをご覧ください。
  • 通信が一度でも出来たあとに、登録センサー画面で、該当するセンサーを削除したり、種別を変更すると通信できなくなります。

③ システム全体が不具合を起こしている。

  • この場合には画面も見られなくなっている可能性がありますので、弊社にご相談ください。

 上記のように一旦、誤ってセンサーを削除したら、IoTデバイスのパラメーターを変更する必要があります。その場合には弊社にご相談ください。