Luftdata och visualisering med en Raspberry Pi

Den senaste tiden har jag, som tidigare nämnt, börjat pyssla en del med Luftdata. Som ett litet helgpyssel funderade jag på hur man skulle kunna visualisera själva datan som sensorerna producerar. Min lilla prototyp ser ut så här:

Det hela funkar så här. En passiv infrarödsensor skickar en signal till min Rasberry Pi som är utrustad med en Sense Hat. När en rörelse detekteras så ansluter jag till Luftdatens API och hämtar datan från den sensor som jag har satt upp nära Korsvägen i GBG. Värdena visas sedan på displayen.

All kod finns här.

Jag tänkte att jag gör en liten "walk-through" av koden för er som är intresserade.

#!/usr/bin/python3
from sense_hat import SenseHat
from time import sleep
import requests
import RPi.GPIO as GPIO
import time
from datetime import datetime
import sqlite3
conn = sqlite3.connect('intruders.sqlite3')

Först och främst importerar vi ett antal användbara bibliotek. SenseHat har redan färdiga bibliotek, vi behöver time för att skapa olika inväntningar och tidsstämplar, sedan behövs GPIO för att kommunicera med rörelsedetektorn, och sedan för att spara data (framförallt för att kunna hitta fel) så lägger jag datan i en sqlite3-databas. Databasen konstrueras med följande schema:

sqlite> CREATE TABLE intruders (clocktime TEXT, timestamp TEXT, pm10 TEXT, pm25
TEXT, temp TEXT, humidity TEXT, UNIQUE(clocktime));

För att sedan få in data från rörelsedetektorn behöver vi konfigurera pins.

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(11, GPIO.IN)

Observera. SenseHaten tar egentligen upp alla GPIO-pins på Rasberry Pi. Så man får göra ett litet hack. Jag har helt enkelt förlängt pins för 5v och jord, samt GPIO-pin 11. Dessa pins används ändå inte av SenseHaten så det är bara att köra rakt igenom.

Därefter lite konfiguration av SenseHaten och definiering av ett gäng färger:

sense = SenseHat()
sense.set_rotation(180)

r = [255, 0, 0]
o = [255, 127, 0]
y = [255, 255, 0]
g = [0, 255, 0]
b = [0, 0, 255]
i = [75, 0, 130]
v = [159, 0, 255]
e = [0, 0, 0]

För att hämta data från sensorn så anropar jag Luftdaten.info's API. Följande funktion hämtar data för PM10 och PM2.5 samt tidsstämpel. Om sensorn skulle vara offline så visar SenseHat ett felmeddelande.

def getdata(sensorid):
    PM10 = {}
    PM2 = {}
    Time = ""
    data = requests.get('http://api.luftdaten.info/v1/sensor/' + str(sensorid)
                        + '/')
    jsonobject = data.json()
    if len(jsonobject) == 0:
        sense.show_message("SENSOR OFFLINE!",
                           scroll_speed=0.03, text_colour=r)

    for j in jsonobject:
        Time = j['timestamp']
        PM10[j['sensordatavalues'][0]['value_type']] = j['sensordatavalues'][0]['value']
        PM2[j['sensordatavalues'][1]['value_type']] = j['sensordatavalues'][1]['value']
    return(Time, PM10, PM2)

Mellan varven kan det vara bra att kunna rensa displayen så vi gör en sådan funktion också:

def cleardisplay():
    print("Clearing display...")
    sense.clear()

Sedan till själva huvudfunktionen scrolldisplay() som både hämtar, sorterar och visualiserar datan, samt, slutligen, även sparar den i en databas för framtida analys. Ni får ursäkta komplexiteten, det är inte helt by the book att lägga så här mycket i en funktion, jag vet. Men jag ville även hämta en del data från själva SenseHat, framförallt temperatur och luftfuktighet, främst för att se att själva Raspberry Pi inte blev för varm i sitt litta plastcase.

def scrolldisplay(sensorid, clocktime):
    currentdata = getdata(sensorid)
    print("current data: " + str(currentdata))
    Time = currentdata[0]
    PM10 = float(currentdata[1]['P1'])
    PM2 = float(currentdata[2]['P2'])
    if PM10 > 49:
        pm10colour = r
    elif PM10 > 24:
        pm10colour = y
    else:
        pm10colour = g
    print("PM10 " + str(PM10))
    sense.show_message("PM10 " + str(PM10), text_colour=pm10colour)
    if PM2 > 24:
        pm2colour = r
    elif PM2 > 10:
        pm2colour = y
    else:
        pm2colour = g
    print("PM2.5 " + str(PM2))
    sense.show_message("PM2.5 " + str(PM2), text_colour=pm2colour)
    temp = sense.get_temperature()
    print("Temp: " + str(temp))
    if temp > 30:
        tempcolour = r
    elif temp > 24:
        tempcolour = y
    else:
        tempcolour = b
    # sense.show_message("Temp: %s C" % round(temp, 2), text_colour=tempcolour)
    humidity = sense.get_humidity()
    print("Luftfuktighetet:" + str(humidity))
    # sense.show_message("Fukt: %s %%rH" % round(humidity, 2), text_colour=v)
    sense.show_message(Time[10:], text_colour=g)
    sense.show_message(" Luftdata.se", text_colour=g)
    conn.execute("INSERT INTO intruders (clocktime, timestamp, pm10, pm25, temp, humidity) VALUES (?, ?, ?, ?, ?, ?);", (str(clocktime), Time[10:], str(PM10), str(PM2), str(temp), str(humidity)))
    print("Writing to database... SUCCESS.")
    conn.commit()

Nån gång i framtiden ska jag skriva om den där funktionen så att den blir lite överskådligare.

Nu över till en liten startup-funktion som testar så att Raspberry Pi har internetkontakt, annars ger den ett felmeddelande:

def startup(sensorid):
    print("Booting...")
    sense.show_message("Booting...", scroll_speed=0.03, text_colour=r)
    # test for internet connection
    if len(getdata(sensorid)) == 3:
        sense.show_message("Network OK", scroll_speed=0.03, text_colour=g)
    else:
        sense.show_message("Network Error", scroll_speed=0.03, text_colour=r)

Sist men inte minst den funktion som bevakar rörelsesensorn och triggar funktionen scrolldisplay() ovan om den detekterar rörelse. Dessutom hämtar den systemklockans tid för loggning.

def runPIR(sensorid):
    # This function is used for infrared motion detection as user input.
    print("Waiting for intruders to trigger measurement...")
    sense.show_message("Detecting movement... ", scroll_speed=0.03, text_colour=r)
    while True:
        i = GPIO.input(11)
        if i == 0:  # When output from motion sensor is LOW
            #print("No intruders", i)
            sleep(5)
        elif i == 1:  # When output from motion sensor is HIGH
            print("-" * 40)
            print("INTRUDER DETECTED!!!", i)
            clocktime = str(datetime.now())
            print("Timestamp for detection: ", clocktime)
            for i in range(3):
                sense.clear(b)
                sleep(0.1)
                sense.clear()
                sleep(0.1)
            cleardisplay()
            scrolldisplay(sensorid, clocktime)

Utöver detta kör jag sedan ett litet skript som heter run.py som importerar all kod ovan och sedan körs i screen på Rasbperry Pi, vilket gör att man kan se programmets status även över SSH.

import luftdata
import sys

luftdata.startup(10136)

try:
    while True:
        luftdata.runPIR(10136)

except KeyboardInterrupt:
    luftdata.cleardisplay()
sys.exit()

Först kollar skriptet så att Raspberryn har nätkontakt. Sedan kör den rörelsedetektorn kontinuerligt. Och, såklart, ange sensorns id-nummer (10136).