Квест в консоли на Python. Часть 1

Дело было вечером, делать было нечего (с). Ну не то чтобы совсем нечего, но выдалась свободное немножко время, поэтому для того чтобы не забыть (да уж чего там, и вспомнить уже) окончательно Python, решил сделать маленький движёк для текстовых квестов с выполняющихся в консоли (квест в консоли).

Сначала определимся что где и как:

  1. Локации будем описывать в формате json
  2. Локации будем складывать в папку locations. Имена файлов — номер локации.
  3. В ходе квеста можно «сохраняться», чтобы была возможность продолжить квест
  4. Сохранения будем хранить в папке saves
  5. Все классы храним в папке classes

В результате у меня получилась такая структура папок и файлов:

квест в консоли

Первым делом нарисую минимальный json стартовой локации:

{
  "title": "Стартовая страница",
  "description": "Стартовая страница описание",
  "available_locations": {
    "left": 1,
    "right": 2,
    "forward": 3,
    "back": 4
  }
}

Т.е. начинаем на стартовой локации (0), доступны переходы в локации 1,2,3 и 4.

Далее реализуем класс игрока, с реализацией функционала сохранения и стадии прохождения квеста:

from datetime import datetime as dt
import json
import os


class TPlayer:
    data = {}

    def __init__(self, name):
        self.data["name"] = name  # Имя пользователя
        self.data["location"] = 0  # текущая локация

    def load(self, filename):
        """
        Загрузить состояние квеста из файла
        :param filename: имя файла из папки saves
        """
        f = open("saves/" + filename, mode='r', encoding='utf-8')
        self.data = json.load(f)
        f.close()

    def list_saves(self):
        """
            Показать доступные сохранения
        """
        files = os.listdir("saves")
        print(files)

    def save(self):
        """
            Сохранить текущее состояние пользователя
        """
        time = dt.now()
        filename = time.strftime("%d_%m_%Y_%H_%M") + ".save"
        print(filename)
        f = open("saves/" + filename, mode='w', encoding='utf-8')
        json.dump(self.data, f)
        f.close()

В главном файле (main.py), реализуем проверку аргументов командной строки и переход к началу квеста:

#!/usr/bin/env python3
# encoding: utf-8
import classes.player as tplayer
import classes.location as tlocation
import sys

player = tplayer.TPlayer("Васян")
location = tlocation.TLocation()


def start_location():
    print(player.data["name"])
    location.location_view(player.data["location"])

if __name__ == '__main__':
    for param in sys.argv:
        if param == "--load":
            if len(sys.argv) == 2:
                print("Ошибка: нет имени файла")
                exit(0)
            player.load(sys.argv[2])
            start_location()
            exit(0)
        if param == "--new":
            exit(0)
        if param == "--list":
            player.list_saves()
            exit(0)
    if len(sys.argv) == 1:
        print("Для запуска квеста необходимо использовать следующие параметры:")
        print("  --load   - загрузить сохранение и начать квест")
        print("  --list   - получить список сохранений")
        print("  --new <имя участника>    - начать квест заново")
        exit()

Разработка «квест в консоли» может быть действительно просто.. Вы можете посмотреть и другие мои статьи посвященные разработке на Python

Python: ошибка установки cv2

При попытке установить:

pip install cv2

Получаем ошибку:

ERROR: Could not find a version that satisfies the requirement cv2 (from versions: none)

А всё просто. Почему то авторы пакета обозвали его в репозитарии pip как opencv-python. Соответственно и устанавливать его нужно так:

pip install opencv-python

А использовать так:

import cv2

Сохранение весов модели нейросети

В продолжении статьи Создаём нейросеть на Python, Итак сеть создали, натренировали. А что делать чтобы решение можно было определять по картинке лето или зима, не обучая каждый раз модель заново? Ну так ведь можно просто сохранить полученные веса нейросети в файл, А затем уже их загружать в случае необходимости.

В Tensotflow уже есть встроенный функционал для сохранения весов в файл. Для этого используется функция вызова кэлбека после прохождения каждого шага обучения. В моём случае промежуточные модели обучения не нужны, потому параметр save_freq делаю равным количеству итераций обучения:

traning_model_save=base_dir+"\save_model\cp.ckpt"
....
EPOCHS = 10
print("- настраиваем кэлбеки для сохранения натренированной модели")
# Создаем колбек для сохранения контрольной точки
cp_callback = tf.keras.callbacks.ModelCheckpoint(traning_model_save,
                                                 save_weights_only=True,
                                                 save_freq=EPOCHS,
                                                 verbose=1)
print("- тренируем модель")
history = model.fit(
    train_data_gen,
    steps_per_epoch=int(np.ceil(total_train / float(BATCH_SIZE))),
    epochs=EPOCHS,
    validation_data=val_data_gen,
    validation_steps=int(np.ceil(total_val / float(BATCH_SIZE))),
    callbacks = [cp_callback]   # вызываем кэлбек после каждого шага обучения
)

Так-же можно сохранять не только веса, но и всю модель целиком:

model.save('model.h5')

Набор скриптов для «умного дома» на Raspberry PI 3

Так уж получилось, что мой «умный» дом, это фактически небольшая кучка скриптов. Приведу пример основных из них, вдруг кому интересно будет…

Отправка/получение данных по радиоканалу с частотой 433Mhz:

Получение:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

rfdevice = None
# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

logging.basicConfig(level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=22,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
logging.info("Listening for codes on GPIO " + str(args.gpio))
while True:
    if rfdevice.rx_code_timestamp != timestamp:
        timestamp = rfdevice.rx_code_timestamp
        logging.info(str(rfdevice.rx_code) +
                     " [pulselength " + str(rfdevice.rx_pulselength) +
                     ", protocol " + str(rfdevice.rx_proto) + "]")
    time.sleep(0.01)

Отправка:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s',)

parser = argparse.ArgumentParser(description='Sends a decimal code via a 433/315MHz GPIO device')
parser.add_argument('code', metavar='CODE', type=int, help="Decimal code to send")
parser.add_argument('-g', dest='gpio', type=int, default=17, help="GPIO pin (Default: 17)")
parser.add_argument('-p', dest='pulselength', type=int, default=None,help="Pulselength (Default: 350)")
parser.add_argument('-t', dest='protocol', type=int, default=None,help="Protocol (Default: 1)")
parser.add_argument('-l', dest='length', type=int, default=None,help="Codelength (Default: 24)")
parser.add_argument('-r', dest='repeat', type=int, default=10,help="Repeat cycles (Default: 10)")
args = parser.parse_args()

rfdevice = RFDevice(args.gpio)
rfdevice.enable_tx()
rfdevice.tx_repeat = args.repeat

if args.protocol:
    protocol = args.protocol
else:
    protocol = "default"
if args.pulselength:
    pulselength = args.pulselength
else:
    pulselength = "default"
if args.length:
    length = args.length
else:
    length = "default"

logging.info(str(args.code) +
             " [protocol: " + str(protocol) +
             ", pulselength: " + str(pulselength) +
             ", length: " + str(length) +
             ", repeat: " + str(rfdevice.tx_repeat) + "]")

rfdevice.tx_code(args.code, args.protocol, args.pulselength, args.length)
rfdevice.cleanup()

Получение температуры/влажности с датчиков dh11:

#!/usr/bin/python
import Adafruit_DHT
import time
import mysql.connector
from mysql.connector import Error
import json
import os
import fcntl, sys

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

print(config["db"]);


try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

DHT_SENSOR = Adafruit_DHT.DHT11

sql="SELECT * FROM sources WHERE device=1";
if len(sys.argv)==2:
 id=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE id='{id}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  id=myrow3["id"]
  source=myrow3["id"]
  place=myrow3["place"]
  param=json.loads(myrow3["param"].decode("utf-8"))
  pin=param["pin"]
  print(f"-опрашиваю статус датчик {id},pin={pin}")
  cnt=3
  while cnt>0:
    humidity, temperature = Adafruit_DHT.read(DHT_SENSOR, pin)
    if humidity is not None and temperature is not None:
        if humidity<140:
         print("Temp={0:0.1f}C Humidity={1:0.1f}%".format(temperature, humidity))
         sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},1,{temperature},now());";
         cursor = conn.cursor(dictionary=True,buffered=True)
         cursor.execute(sql);
         conn.commit()
         cnt=0
    else:
        print("Sensor failure. Check wiring...");
    time.sleep(3);
    cnt=cnt-1
  myrow3 = cursor2.fetchone()
conn.commit()

exit(-1)

Мигаем светодиом:

#!/usr/bin/python
import RPi.GPIO as GPIO
import time
import fcntl, sys, os
from time import sleep
while  True:
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(23, GPIO.OUT)
        GPIO.output(23, True)
        time.sleep(0.5)
        GPIO.output(23, False)
        time.sleep(0.5)
    finally:
       print("clean up")
       GPIO.cleanup() 

Получение данных с микротика

Температура:

$fl = fopen("/tmp/".basename(__FILE__).".lock", "w");
    if( ! ( $fl && flock( $fl, LOCK_EX | LOCK_NB ) ) ) {
    die("--копия скрипта уже запущена!");
};


define('WUO_ROOT', dirname(__FILE__));
require_once WUO_ROOT.'/vendor/autoload.php';
require_once WUO_ROOT.'/../class/Tsql.php';

$config=json_decode(file_get_contents(WUO_ROOT."/../config.json"));
var_dump($config);


$sqln=new Tsql("mysql","m_data",$config->db->host,$config->db->username,$config->db->password);


$config = new \RouterOS\Config([
    'host' => '192.wefrwerfwe1',
    'user' => 'ewrfwerfe',
    'pass' => 'erwfwerfew',
    'port' => 8728,
]);
$client = new \RouterOS\Client($config);

$res=$client->query('/system/health/print')->read();

$temp=$res[0]["temperature"];

 $sql="insert into m_data (place,source,value_type,value,dt) values (10,12,1,'$temp',now())";
 $stmt=$sqln->dbh->prepare($sql);
 $stmt->execute();

Уровни WIFI сигналов:

$client = new \RouterOS\Client($config);

$res=$client->query('/interface/wireless/registration-table/print')->read();

var_dump($res);

WiFi реле Sonoff DIY 3:

Текущий статус:

#!/usr/bin/python3
import pymysql
import requests
from urllib3.exceptions import InsecureRequestWarning
from mysql.connector import Error
import mysql.connector
import random
import os,fcntl, sys
import json

sys.path.insert(0, '/root/scripts/includes')
import functions

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)


dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

def GetSonoffStatus(ip):
 ret={"error":True,"signal":0,"switch":True}

 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 post_params = '{"deviceid": "","data": {}}'

 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/info", data=post_params, verify=False)
    res = response.json()
    print(f"пришло:{res}")

    if type(res['data'])==str:
      res["data"]=json.loads(res["data"])

    if res["data"]["switch"]=="off":
       ret["switch"]=False

    if "signalStrength" in res["data"]:
       ret["signal"]=res["data"]["signalStrength"]
    ret["error"]=False

 except Exception as e:
    print(f"Ошибка:{e}")


# соединяемся с БД
try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

sql="SELECT * FROM sources WHERE device=3";
if len(sys.argv)==2:
 ip=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE ip='{ip}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  ip=myrow3["ip"].decode("utf-8")
  source=myrow3["id"]
  place=myrow3["place"]
  deviceid=myrow3["deviceid"].decode("utf-8")
  print(f"-опрашиваю статус реле {ip}")
  res=functions.GetSonoffStatus(ip)
  if res["error"]==False:
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},3,{res['switch']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},4,{res['signal']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
  myrow3 = cursor2.fetchone()
conn.commit()

Переключение реле:

def SonoffReleSwitch(conn,ip,deviceid,rele):
 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 if rele==True:
   post_params = '{"deviceid": "","data": {"switch":"on"}}'
 else:
     post_params = '{"deviceid": "","data": {"switch":"off"}}'
 # переключаю реле
 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/switch", data=post_params, verify=False)
    res = response.json()
    print(f"---пришло:{res}")
    if res["error"]==0:
       print("- ошибок нет!");
       return True
    return False
 except requests.exceptions.HTTPError as errh:
  print("HTTP Error")
 except requests.exceptions.ReadTimeout as errrt:
  print("Time out")
 except requests.exceptions.ConnectionError as conerr:
  print("Connection error")
 return False

Python GPIO: This channel is already in use, continuing anyway

Такая ошибка чаще всего возникает, если пытаетесь включить/выключить ногу работа с которой уже ранее была начата.

Решение: после того как с пином поработали, необходимо сбросить работу с пинами. Как например делаю это я при работе с реле.

Включаем реле:

#!/usr/bin/python
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setup(9, GPIO.OUT)
GPIO.output(9, False)

Выключаем реле:

#!/usr/bin/python
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setup(9, GPIO.OUT)
GPIO.output(9, True)
GPIO.cleanup()
1 2 3 4 13