Набор скриптов для «умного дома» на Raspberry PI 3

Так уж получилось, что мой «умный» дом, это фактически небольшая кучка скриптов. Приведу пример основных из них, вдруг кому интересно будет…

Отправка/получение данных по радиоканалу с частотой 433Mhz:

Получение:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

rfdevice = None
# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

logging.basicConfig(level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=22,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
logging.info("Listening for codes on GPIO " + str(args.gpio))
while True:
    if rfdevice.rx_code_timestamp != timestamp:
        timestamp = rfdevice.rx_code_timestamp
        logging.info(str(rfdevice.rx_code) +
                     " [pulselength " + str(rfdevice.rx_pulselength) +
                     ", protocol " + str(rfdevice.rx_proto) + "]")
    time.sleep(0.01)

Отправка:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s',)

parser = argparse.ArgumentParser(description='Sends a decimal code via a 433/315MHz GPIO device')
parser.add_argument('code', metavar='CODE', type=int, help="Decimal code to send")
parser.add_argument('-g', dest='gpio', type=int, default=17, help="GPIO pin (Default: 17)")
parser.add_argument('-p', dest='pulselength', type=int, default=None,help="Pulselength (Default: 350)")
parser.add_argument('-t', dest='protocol', type=int, default=None,help="Protocol (Default: 1)")
parser.add_argument('-l', dest='length', type=int, default=None,help="Codelength (Default: 24)")
parser.add_argument('-r', dest='repeat', type=int, default=10,help="Repeat cycles (Default: 10)")
args = parser.parse_args()

rfdevice = RFDevice(args.gpio)
rfdevice.enable_tx()
rfdevice.tx_repeat = args.repeat

if args.protocol:
    protocol = args.protocol
else:
    protocol = "default"
if args.pulselength:
    pulselength = args.pulselength
else:
    pulselength = "default"
if args.length:
    length = args.length
else:
    length = "default"

logging.info(str(args.code) +
             " [protocol: " + str(protocol) +
             ", pulselength: " + str(pulselength) +
             ", length: " + str(length) +
             ", repeat: " + str(rfdevice.tx_repeat) + "]")

rfdevice.tx_code(args.code, args.protocol, args.pulselength, args.length)
rfdevice.cleanup()

Получение температуры/влажности с датчиков dh11:

#!/usr/bin/python
import Adafruit_DHT
import time
import mysql.connector
from mysql.connector import Error
import json
import os
import fcntl, sys

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

print(config["db"]);


try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

DHT_SENSOR = Adafruit_DHT.DHT11

sql="SELECT * FROM sources WHERE device=1";
if len(sys.argv)==2:
 id=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE id='{id}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  id=myrow3["id"]
  source=myrow3["id"]
  place=myrow3["place"]
  param=json.loads(myrow3["param"].decode("utf-8"))
  pin=param["pin"]
  print(f"-опрашиваю статус датчик {id},pin={pin}")
  cnt=3
  while cnt>0:
    humidity, temperature = Adafruit_DHT.read(DHT_SENSOR, pin)
    if humidity is not None and temperature is not None:
        if humidity<140:
         print("Temp={0:0.1f}C Humidity={1:0.1f}%".format(temperature, humidity))
         sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},1,{temperature},now());";
         cursor = conn.cursor(dictionary=True,buffered=True)
         cursor.execute(sql);
         conn.commit()
         cnt=0
    else:
        print("Sensor failure. Check wiring...");
    time.sleep(3);
    cnt=cnt-1
  myrow3 = cursor2.fetchone()
conn.commit()

exit(-1)

Мигаем светодиом:

#!/usr/bin/python
import RPi.GPIO as GPIO
import time
import fcntl, sys, os
from time import sleep
while  True:
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(23, GPIO.OUT)
        GPIO.output(23, True)
        time.sleep(0.5)
        GPIO.output(23, False)
        time.sleep(0.5)
    finally:
       print("clean up")
       GPIO.cleanup() 

Получение данных с микротика

Температура:

$fl = fopen("/tmp/".basename(__FILE__).".lock", "w");
    if( ! ( $fl && flock( $fl, LOCK_EX | LOCK_NB ) ) ) {
    die("--копия скрипта уже запущена!");
};


define('WUO_ROOT', dirname(__FILE__));
require_once WUO_ROOT.'/vendor/autoload.php';
require_once WUO_ROOT.'/../class/Tsql.php';

$config=json_decode(file_get_contents(WUO_ROOT."/../config.json"));
var_dump($config);


$sqln=new Tsql("mysql","m_data",$config->db->host,$config->db->username,$config->db->password);


$config = new \RouterOS\Config([
    'host' => '192.wefrwerfwe1',
    'user' => 'ewrfwerfe',
    'pass' => 'erwfwerfew',
    'port' => 8728,
]);
$client = new \RouterOS\Client($config);

$res=$client->query('/system/health/print')->read();

$temp=$res[0]["temperature"];

 $sql="insert into m_data (place,source,value_type,value,dt) values (10,12,1,'$temp',now())";
 $stmt=$sqln->dbh->prepare($sql);
 $stmt->execute();

Уровни WIFI сигналов:

$client = new \RouterOS\Client($config);

$res=$client->query('/interface/wireless/registration-table/print')->read();

var_dump($res);

WiFi реле Sonoff DIY 3:

Текущий статус:

#!/usr/bin/python3
import pymysql
import requests
from urllib3.exceptions import InsecureRequestWarning
from mysql.connector import Error
import mysql.connector
import random
import os,fcntl, sys
import json

sys.path.insert(0, '/root/scripts/includes')
import functions

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)


dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

def GetSonoffStatus(ip):
 ret={"error":True,"signal":0,"switch":True}

 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 post_params = '{"deviceid": "","data": {}}'

 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/info", data=post_params, verify=False)
    res = response.json()
    print(f"пришло:{res}")

    if type(res['data'])==str:
      res["data"]=json.loads(res["data"])

    if res["data"]["switch"]=="off":
       ret["switch"]=False

    if "signalStrength" in res["data"]:
       ret["signal"]=res["data"]["signalStrength"]
    ret["error"]=False

 except Exception as e:
    print(f"Ошибка:{e}")


# соединяемся с БД
try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

sql="SELECT * FROM sources WHERE device=3";
if len(sys.argv)==2:
 ip=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE ip='{ip}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  ip=myrow3["ip"].decode("utf-8")
  source=myrow3["id"]
  place=myrow3["place"]
  deviceid=myrow3["deviceid"].decode("utf-8")
  print(f"-опрашиваю статус реле {ip}")
  res=functions.GetSonoffStatus(ip)
  if res["error"]==False:
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},3,{res['switch']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},4,{res['signal']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
  myrow3 = cursor2.fetchone()
conn.commit()

Переключение реле:

def SonoffReleSwitch(conn,ip,deviceid,rele):
 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 if rele==True:
   post_params = '{"deviceid": "","data": {"switch":"on"}}'
 else:
     post_params = '{"deviceid": "","data": {"switch":"off"}}'
 # переключаю реле
 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/switch", data=post_params, verify=False)
    res = response.json()
    print(f"---пришло:{res}")
    if res["error"]==0:
       print("- ошибок нет!");
       return True
    return False
 except requests.exceptions.HTTPError as errh:
  print("HTTP Error")
 except requests.exceptions.ReadTimeout as errrt:
  print("Time out")
 except requests.exceptions.ConnectionError as conerr:
  print("Connection error")
 return False

Python GPIO: This channel is already in use, continuing anyway

Такая ошибка чаще всего возникает, если пытаетесь включить/выключить ногу работа с которой уже ранее была начата.

Решение: после того как с пином поработали, необходимо сбросить работу с пинами. Как например делаю это я при работе с реле.

Включаем реле:

#!/usr/bin/python
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setup(9, GPIO.OUT)
GPIO.output(9, False)

Выключаем реле:

#!/usr/bin/python
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setup(9, GPIO.OUT)
GPIO.output(9, True)
GPIO.cleanup()

Пишем игру Питон на Python ;). Часть 3

Продолжаем начатое. Добавим в игру увеличение скорости если питон ест, и подсчет очков. Добавим переменные:

...
speed=500;          # начальная скорость питона
increase_speed=10;  # размер увеличения скорости при удачной еде
score=0;            # набранные очки в результате игры
score_color=(100,100,50)  # цвет очков
...

Интересный момент работы с таймером. По умолчанию time.time() отдает данные в секундах целые числа, и доли секунд в цифрах после запятой. Для того чтобы соответственно удобно считать миллисекунды, значения нужно умножать на 1000. Изменим код:

start_time=time.time()*1000
...
while running:
    if (time.time()*1000-start_time)>speed:
        start_time=time.time()*1000
        Draw_foods(screen)
        Draw_python(screen)
...

А теперь изменим функцию Draw_python, добавив отрисовку количества очков, а так-же увеличение скорости и количества очков при поедании:

...
global score,speed
..
        for element in foods:
            if [element[0],element[1]] == head_coor:
                del_tail=False
                foods.remove(element)
                speed=speed-increase_speed  # увеличиваем скорость питона
                score=score+element[0]      # увеличиваем очки в зависимости от типа сожранного
...
 # Рисую набранные очки
    pygame.draw.rect(screen,(0,0,0),(width_field,0,width_field+300,100))
    img = font.render(f'Очки: {score}', True, score_color)
    screen.blit(img, (width_field+10, 20))
...

Добавим еще штрих — сделаем реакцию на столкновение со стенами поля, а именно чтоб питон переходил на левый край при столкновении с правым и т.д.,

...        if (head_coor[0]>size_x):
            head_coor[0]=1;
        if (head_coor[1]>size_y):
            head_coor[1]=1;
        if (head_coor[0]<0):
            head_coor[0]=size_x;
        if (head_coor[1]<0):
            head_coor[1]=size_y;
...

Теперь не хватает последнего: проигрыша в случае столкновения головы с хвостом. Объявим глобальный цикл в котором будет крутится игра, в нём разместим цикл игры. При проигрыше выводим вопрос «Вы проиграли хотите еще (Y/N)» Если игрок выбирает Y, то из глобального цикла не выходим, иначе покидаем глобальный цикл.

gloop=True
while gloop: # глобальный цикл
    pygame.init()
    ...
    font = pygame.font.SysFont(None, 24)
    while running: # цикл игры
        if (time.time()*1000-start_time)>speed:
            start_time=time.time()*1000
...
 # здесь оказываемся когда проиграли...
    pygame.draw.rect(screen,(200,100,5),(50,50,width_field-50,100))
    img = font.render(f'Вы проиграли. Сыграем еще партию? (Y/N)', True, score_color)
    screen.blit(img, (100, 80))
    pygame.display.flip()
    pygame.display.update()
    running = True
    while running:
        for event in pygame.event.get():
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_y:
                    moved_direction_x = 0
                    moved_direction_y = 0
                    speed=500
                    score=0
                    gloop=True
                    running=False
                if event.key == pygame.K_n:
                    gloop=False
                    running = False
print("нормально вышли")

Окончательную версию игры можно скачать здесь

Пишем игру Питон на Python ;). Часть 2

В предыдущей части, мы научили питона двигаться. Теперь чуть усложним задачу: раскидаем по игровому полю еду, и в том случае если питон её съедает, то питон растёт. В результате получится что-то вроде:

Что делаем сначала? А сначала инициируем массив с едой, где будет храниться координаты еды, и её тип. В зависимости от типа еды, ёё цвет будет разным. Так-же определим массив с доступными цветами еды, и количество еды доступной на поле одномоментно:

.

foods=[]    # массив координат с едой
max_foods=3 # сколько еды может быть одновременно на поле
color_food=[(100,200,150),(200,100,150),(140,20,150)]

Далее напишем функцию, которая генерирует распределение еды на поле случайным образом. При генерации проверяется, совпадение координат с телом питона — нужно не допустить генерацию «внутри» питона. Сюда же поместим и отрисовку сгенерированной еды:

# Генерируем и отрисовываем еду для питона
def Draw_foods(screen):
    for i in range(max_foods-len(foods)): # массив с едой всегда должен быть "полон"
        while True: # цикл генерации еды. Генерируем до тех пор пока сгенерированные координаты не устроят нас
            x=random.randint(0,size_x)+1
            y=random.randint(0,size_y)+1
            food_type = random.randint(0, len(color_food) - 1)
            if [x,y] not in python_body:
                break;
        foods.insert(0,[x,y,food_type])
    print(f"Нагенерировали еды: {foods}, теперь её нарисуем");
    for element in foods:
        coors=[element[0]*step_x-step_x/2,element[1]*step_y-step_y/2]
        pygame.draw.circle(screen,color_food[element[2]],coors,radius/2)

Далее чуть изменим функцию отрисовки движения питона Draw_python. А именно добавим проверку совпадения координат головы питона с координатами еды. Если совпадение есть — то еду удаляем, а хвост удлиняем (т.е. фактически не удаляем при движении)

 #проверяем: если голова совпадает с какойто едой, то еду удаляем, а жопу не удяляем
        del_tail = True
        for element in foods:
            if [element[0],element[1]] == head_coor:
                del_tail=False
                foods.remove(element)

        if del_tail==True:
            # зарисовываю черным жопу
            element=python_body[len(python_body) - 1]
            coors=[element[0]*step_x-step_x/2,element[1]*step_y-step_y/2]
            pygame.draw.circle(screen,(0,0,0),coors,radius)
            # удаляю последний элемент хвоста
            python_body.pop(-1)
            print(f"удалили жопу:{python_body}")

Код результат, можно скачать здесь

Пишем игру Питон на Python ;). Часть 1

В рамках компании по попытке обучения одного товарища программированию начал писать классическую игру Питон. С правилами: питон двигается по полю, если он что-то ест, то растёт. Если натыкается на стену или на самого себя — игрок проигрывает.

В этой части обучающей статьи, мы научимся вырисовывать питона и управлять им с клавиатуры. Результатом будет что-то вроде:

Для отрисовки графики и получения событий нажатия клавиш будем использовать библиотеку pygame

Итак начнем. Сначала зададим настроечные переменные для игры, как то размер поля, цвета и т.п.:

# объявляем переменные
FPS = 60
width_field=800 # размер поля по ширине
height_field=600 # размер поля по высоте
size_x=20   # количество клеток по ширине
size_y=20   # количество клеток по высоте
color_grid=(0,200,100)  # цвет линий ячеек
fill_grid=(0,0,0)       # цвет игровогого поля
color_python=(50,200,100) # цвет тела питона
step_x=round(width_field/size_x,0)  # шаг клетки по ширине
step_y=round(height_field/size_y,0) # шаг клетки по высоте
python_body=[[round(size_x/2,0),round(size_y/2,0)],[round((size_x-2)/2,0),round(size_y/2,0)],[round((size_x-4)/2,0),round(size_y/2,0)]] # массив тела питона
moved_direction_x=0 # текущее направление движения питона по оси x
moved_direction_y=0 # текущее направление движения питона по оси y

Затем создадим окно с указанными размерами , закрасим его чёрным цветом, и нарисуем сетку:

# Расчерчиваем поле
def Draw_grid_field(screen):
 xx=0;
 for x in range(size_x):
   pygame.draw.line(screen,color_grid,(xx,0),(xx,height_field))
   xx=xx+step_x;
 yy=0;
 for y in range(size_y):
   pygame.draw.line(screen,color_grid,(0,yy),(width_field,yy))
   yy=yy+step_y;

# Да начнётся игра..
pygame.init()
screen = pygame.display.set_mode((width_field, height_field))
screen.fill(fill_grid)
Draw_grid_field(screen)

Далее нам нужно создать бесконечный цикл, внутри которого будем ловить события нажатия стрелочек клавиатуры, и задавать вектор движения головы питона:

running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_LEFT:
                moved_direction_x=-1
                moved_direction_y=0
            if event.key == pygame.K_RIGHT:
                moved_direction_x = 1
                moved_direction_y = 0
            if event.key == pygame.K_UP:
                moved_direction_y = -1
                moved_direction_x=0
            if event.key == pygame.K_DOWN:
                moved_direction_x=0
                moved_direction_y = 1
        if event.type == pygame.QUIT:
           running = False
    pygame.display.flip()
    pygame.display.update()
    clock.tick(FPS)
pygame.quit()
print("нормально вышли")

Если мы сейчас в этот цикл вставим событие отрисовки самого питона, то он будет двигаться с дикой скоростью. Ограничим её одной клеткой в секунду, используя измерение времени. В начале цикла замерим время начала игры, и внутри цикла, если видим, что прошла секунда — отрисовываем питона, для этого чуть изменим код:

start_time=time.time()
running = True
while running:
    if (time.time()-start_time)>1:
        start_time=time.time()
        Draw_python(screen)
    for event in pygame.event.get():
        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_LEFT:
                moved_direction_x=-1
                moved_direction_y=0
            if event.key == pygame.K_RIGHT:
                moved_direction_x = 1
                moved_direction_y = 0
            if event.key == pygame.K_UP:
                moved_direction_y = -1
                moved_direction_x=0
            if event.key == pygame.K_DOWN:
                moved_direction_x=0
                moved_direction_y = 1
        if event.type == pygame.QUIT:
           running = False
    pygame.display.flip()
    pygame.display.update()
    clock.tick(FPS)
pygame.quit()
print("нормально вышли")

Функции отрисовки питона (Draw_python) пока нет. Создадим её.. В начале функции посчитаем радиус кружка тела. Потом если вектор направления задан, то голову передвинем на вектор, а жопу питона удалим. Таким образом создадим видимость движения питона:

# рисуем питона
def Draw_python(screen):
    radius = (round(step_x / 2)-1) if round(step_x / 2) < round(step_y / 2) else (round(step_y / 2)-1)  # выбираем радиус по ширине или высоте шага, смотря что меньше, чтоб вписаться в прямоугольник
    if moved_direction_x!=0 or moved_direction_y!=0:
        print(f"Пришли:{python_body}")
        head_coor = python_body[0].copy()
        head_coor[0] = head_coor[0] + moved_direction_x;
        head_coor[1] = head_coor[1] + moved_direction_y;
        print(f"-тело{python_body},голова: {head_coor}");
        python_body.insert(0, head_coor) # передвигаю голову
        print(f"Передвинули голову:{python_body}")

        # удаляю жопу
        element=python_body[len(python_body) - 1]
        coors=[element[0]*step_x-step_x/2,element[1]*step_y-step_y/2]
        pygame.draw.circle(screen,(0,0,0),coors,radius)

        python_body.pop(-1)  # удаляю последний элемент хвоста
        print(f"удалили жопу:{python_body}")

    # рисую тушку
    for element in python_body:
        coors=[element[0]*step_x-step_x/2,element[1]*step_y-step_y/2]
        pygame.draw.circle(screen,color_python,coors,radius)

Скачать результат

1 2 3 4 5 13