Набор скриптов для «умного дома» на Raspberry PI 3
Так уж получилось, что мой «умный» дом, это фактически небольшая кучка скриптов. Приведу пример основных из них, вдруг кому интересно будет…
Отправка/получение данных по радиоканалу с частотой 433Mhz:
Получение:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
#!/usr/bin/env python3 import argparse import signal import sys import time import logging from rpi_rf import RFDevice import fcntl, sys,os fp = open(os.path.realpath(__file__), 'r') try: fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: print('неужели другой мой экземпляр всё ещё работает?') sys.exit(0) rfdevice = None # pylint: disable=unused-argument def exithandler(signal, frame): rfdevice.cleanup() sys.exit(0) logging.basicConfig(level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S', format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', ) parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device') parser.add_argument('-g', dest='gpio', type=int, default=22, help="GPIO pin (Default: 27)") args = parser.parse_args() signal.signal(signal.SIGINT, exithandler) rfdevice = RFDevice(args.gpio) rfdevice.enable_rx() timestamp = None logging.info("Listening for codes on GPIO " + str(args.gpio)) while True: if rfdevice.rx_code_timestamp != timestamp: timestamp = rfdevice.rx_code_timestamp logging.info(str(rfdevice.rx_code) + " [pulselength " + str(rfdevice.rx_pulselength) + ", protocol " + str(rfdevice.rx_proto) + "]") time.sleep(0.01) |
Отправка:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#!/usr/bin/env python3 import argparse import signal import sys import time import logging from rpi_rf import RFDevice import fcntl, sys,os fp = open(os.path.realpath(__file__), 'r') try: fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: print('неужели другой мой экземпляр всё ещё работает?') sys.exit(0) logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S', format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s',) parser = argparse.ArgumentParser(description='Sends a decimal code via a 433/315MHz GPIO device') parser.add_argument('code', metavar='CODE', type=int, help="Decimal code to send") parser.add_argument('-g', dest='gpio', type=int, default=17, help="GPIO pin (Default: 17)") parser.add_argument('-p', dest='pulselength', type=int, default=None,help="Pulselength (Default: 350)") parser.add_argument('-t', dest='protocol', type=int, default=None,help="Protocol (Default: 1)") parser.add_argument('-l', dest='length', type=int, default=None,help="Codelength (Default: 24)") parser.add_argument('-r', dest='repeat', type=int, default=10,help="Repeat cycles (Default: 10)") args = parser.parse_args() rfdevice = RFDevice(args.gpio) rfdevice.enable_tx() rfdevice.tx_repeat = args.repeat if args.protocol: protocol = args.protocol else: protocol = "default" if args.pulselength: pulselength = args.pulselength else: pulselength = "default" if args.length: length = args.length else: length = "default" logging.info(str(args.code) + " [protocol: " + str(protocol) + ", pulselength: " + str(pulselength) + ", length: " + str(length) + ", repeat: " + str(rfdevice.tx_repeat) + "]") rfdevice.tx_code(args.code, args.protocol, args.pulselength, args.length) rfdevice.cleanup() |
Получение температуры/влажности с датчиков dh11:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
#!/usr/bin/python import Adafruit_DHT import time import mysql.connector from mysql.connector import Error import json import os import fcntl, sys fp = open(os.path.realpath(__file__), 'r') try: fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: print('неужели другой мой экземпляр всё ещё работает?') sys.exit(0) dir=os.path.dirname(os.path.abspath(__file__)) with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными config = json.load(f) #загнали все, что получилось в переменную print(config["db"]); try: conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"]) if conn.is_connected(): print('Вроде соеденился!') except Error as e: print(e); exit(0); DHT_SENSOR = Adafruit_DHT.DHT11 sql="SELECT * FROM sources WHERE device=1"; if len(sys.argv)==2: id=sys.argv[1] sql=f"SELECT * FROM sources WHERE id='{id}'"; cursor2=conn.cursor(dictionary=True,buffered=True) cursor2.execute(sql,[]); myrow3 = cursor2.fetchone() while myrow3 is not None: id=myrow3["id"] source=myrow3["id"] place=myrow3["place"] param=json.loads(myrow3["param"].decode("utf-8")) pin=param["pin"] print(f"-опрашиваю статус датчик {id},pin={pin}") cnt=3 while cnt>0: humidity, temperature = Adafruit_DHT.read(DHT_SENSOR, pin) if humidity is not None and temperature is not None: if humidity<140: print("Temp={0:0.1f}C Humidity={1:0.1f}%".format(temperature, humidity)) sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},1,{temperature},now());"; cursor = conn.cursor(dictionary=True,buffered=True) cursor.execute(sql); conn.commit() cnt=0 else: print("Sensor failure. Check wiring..."); time.sleep(3); cnt=cnt-1 myrow3 = cursor2.fetchone() conn.commit() exit(-1) |
Мигаем светодиом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/python import RPi.GPIO as GPIO import time import fcntl, sys, os from time import sleep while True: try: GPIO.setmode(GPIO.BCM) GPIO.setup(23, GPIO.OUT) GPIO.output(23, True) time.sleep(0.5) GPIO.output(23, False) time.sleep(0.5) finally: print("clean up") GPIO.cleanup() |
Получение данных с микротика
Температура:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
$fl = fopen("/tmp/".basename(__FILE__).".lock", "w"); if( ! ( $fl && flock( $fl, LOCK_EX | LOCK_NB ) ) ) { die("--копия скрипта уже запущена!"); }; define('WUO_ROOT', dirname(__FILE__)); require_once WUO_ROOT.'/vendor/autoload.php'; require_once WUO_ROOT.'/../class/Tsql.php'; $config=json_decode(file_get_contents(WUO_ROOT."/../config.json")); var_dump($config); $sqln=new Tsql("mysql","m_data",$config->db->host,$config->db->username,$config->db->password); $config = new \RouterOS\Config([ 'host' => '192.wefrwerfwe1', 'user' => 'ewrfwerfe', 'pass' => 'erwfwerfew', 'port' => 8728, ]); $client = new \RouterOS\Client($config); $res=$client->query('/system/health/print')->read(); $temp=$res[0]["temperature"]; $sql="insert into m_data (place,source,value_type,value,dt) values (10,12,1,'$temp',now())"; $stmt=$sqln->dbh->prepare($sql); $stmt->execute(); |
Уровни WIFI сигналов:
1 2 3 4 5 6 7 |
$client = new \RouterOS\Client($config); $res=$client->query('/interface/wireless/registration-table/print')->read(); var_dump($res); |
WiFi реле Sonoff DIY 3:
Текущий статус:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
#!/usr/bin/python3 import pymysql import requests from urllib3.exceptions import InsecureRequestWarning from mysql.connector import Error import mysql.connector import random import os,fcntl, sys import json sys.path.insert(0, '/root/scripts/includes') import functions fp = open(os.path.realpath(__file__), 'r') try: fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: print('неужели другой мой экземпляр всё ещё работает?') sys.exit(0) dir=os.path.dirname(os.path.abspath(__file__)) with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными config = json.load(f) #загнали все, что получилось в переменную def GetSonoffStatus(ip): ret={"error":True,"signal":0,"switch":True} requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) post_params = '{"deviceid": "","data": {}}' try: response = requests.post(f"http://{ip}:8081/zeroconf/info", data=post_params, verify=False) res = response.json() print(f"пришло:{res}") if type(res['data'])==str: res["data"]=json.loads(res["data"]) if res["data"]["switch"]=="off": ret["switch"]=False if "signalStrength" in res["data"]: ret["signal"]=res["data"]["signalStrength"] ret["error"]=False except Exception as e: print(f"Ошибка:{e}") # соединяемся с БД try: conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"]) if conn.is_connected(): print('Вроде соеденился!') except Error as e: print(e); exit(0); sql="SELECT * FROM sources WHERE device=3"; if len(sys.argv)==2: ip=sys.argv[1] sql=f"SELECT * FROM sources WHERE ip='{ip}'"; cursor2=conn.cursor(dictionary=True,buffered=True) cursor2.execute(sql,[]); myrow3 = cursor2.fetchone() while myrow3 is not None: ip=myrow3["ip"].decode("utf-8") source=myrow3["id"] place=myrow3["place"] deviceid=myrow3["deviceid"].decode("utf-8") print(f"-опрашиваю статус реле {ip}") res=functions.GetSonoffStatus(ip) if res["error"]==False: sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},3,{res['switch']},now())"; cursor = conn.cursor(dictionary=True,buffered=True) cursor.execute(sql); conn.commit() sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},4,{res['signal']},now())"; cursor = conn.cursor(dictionary=True,buffered=True) cursor.execute(sql); conn.commit() myrow3 = cursor2.fetchone() conn.commit() |
Переключение реле:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def SonoffReleSwitch(conn,ip,deviceid,rele): requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) if rele==True: post_params = '{"deviceid": "","data": {"switch":"on"}}' else: post_params = '{"deviceid": "","data": {"switch":"off"}}' # переключаю реле try: response = requests.post(f"http://{ip}:8081/zeroconf/switch", data=post_params, verify=False) res = response.json() print(f"---пришло:{res}") if res["error"]==0: print("- ошибок нет!"); return True return False except requests.exceptions.HTTPError as errh: print("HTTP Error") except requests.exceptions.ReadTimeout as errrt: print("Time out") except requests.exceptions.ConnectionError as conerr: print("Connection error") return False |