Квест в консоли на Python. Часть 2

Итак, продолжаем продолжаем писать квест в консоли на языке Python. Первая часть описана здесь. В ней мы реализовали автоматическую загрузку и сохранение состояния прохождения квеста. Сейчас же займемся (начнем по крайне мере) отрисовкой локации, и реакцией на нажатые кнопки. В локации предусмотрим возможность отображения картинки из ASCII. Например json стартовой локации может выглядеть примерно так:

{
  "title": "Стартовая локация",
  "description": "Стартовая локация описание",
  "ascii_art": "                    __            ================================\n         ALCATRAZ  /__\\            ||     ||<(.)>||<(.)>||     || \n       ____________|  |            ||    _||     ||     ||_    || \n       |_|_|_|_|_|_|  |            ||   (__D     ||     C__)   || \n       |_|_|_|_|_|_|__|            ||   (__D     ||     C__)   ||\n      A@\\|_|_|_|_|_|/@@Aa          ||   (__D     ||     C__)   ||\n   aaA@@@@@@@@@@@@@@@@@@@aaaA      ||   (__D     ||     C__)   ||\n  A@@@@@@@@@@@DWB@@@@@@@@@@@@A     ||     ||     ||     ||  dwb||\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  ================================",
  "available_locations": {
    "left": 1,
    "right": 2,
    "forward": 3,
    "back": 4
  }
}

Создадим класс TLocation, при инициалиизации будем передавать в него инициализированный класс player. В переменной класса data — будем хранить загруженную локацию.

class TLocation:
    data = {}
    player={}
    stdscr=curses.initscr()
    scr_size=stdscr.getmaxyx()
    def __init__(self,player):
        self.player=player;
    def load_location(self, location):
        f = open("locations/" + str(location) + ".json", mode='r', encoding='utf-8')
        self.data = json.load(f)
        f.close()

Далее нарисуем верхнее меню, где сообщаем игроку, на какие локации он может перемещаться, и что он держит в руках:

    def top_menu(self):
        loc="Идти: "
        if "left" in self.data["available_locations"]:
            loc=loc+"влево(4) "
        if "right" in self.data["available_locations"]:
            loc=loc+"вправо(6) "
        if "forward" in self.data["available_locations"]:
            loc=loc+"вперед(8) "
        if "back" in self.data["available_locations"]:
            loc=loc+"назад(2) "
        curses.start_color()
        curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(1, 1, loc,curses.color_pair(1))
        curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(1, 1, loc,curses.color_pair(1))
        # что в руках
        hands = "В руках: "
        self.stdscr.addstr(2, 1, "")
        for object in self.player.data["in_hands"]:
            hands=hands+"["+object+"]"
        if len(self.player.data["in_hands"])==0:
            hands = hands+"ничего нет"
        self.stdscr.addstr(2, 1, hands, curses.color_pair(1))

Ну и собственно основной код отрисовки локации, включающий бесконечный цикл ожидания нажатий клавиатуры. Предусматриваем переход на другую локацию, выход из игры и сохранение игры.

    def location_view(self, location):
        self.load_location(location)
        self.stdscr.clear()
        self.stdscr.border()
        self.top_menu()  # рисуем верхнее меню
        # название локации
        curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
        x = int((self.scr_size[1] - len(self.data["title"])) / 2)
        self.stdscr.addstr(2, x, self.data["title"],curses.color_pair(2))
        # рисуем рисунок ежели он есть
        y=3
        if "ascii_art" in self.data:
            mass_art=self.data["ascii_art"].split("\n")
            i=0
            while i<len(mass_art):
                x=int((self.scr_size[1]-len(mass_art[i]))/2)
                self.stdscr.addstr(y+i, x, mass_art[i])
                i+=1
            y = y + len(mass_art);
         # Выводим описательную часть
        curses.init_pair(3, curses.COLOR_CYAN, curses.COLOR_BLACK)
        self.stdscr.addstr(y, 1, self.data["description"],curses.color_pair(3))
        # Выводим нижнее меню
        curses.init_pair(4, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(y + 1, 1, "Осмотреться вокруг [v] Применить что в руках [h]", curses.color_pair(4))
        self.stdscr.addstr(y + 2, 1, "Выйти из квеста [q] Сохранить состояние [r]", curses.color_pair(4))
        self.stdscr.refresh()
        while True:
            key=self.stdscr.getch()
            print(key)
            # реализация перехода с локации на локацию
            if key==52 and "left" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["left"]
                self.location_view(self.player.data["location"])
            if key==54 and "right" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["right"]
                self.location_view(self.player.data["location"])
            if key==56 and "forward" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["forward"]
                self.location_view(self.player.data["location"])
            if key==50 and "back" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["back"]
                self.location_view(self.player.data["location"])
            if key == 114:
                self.player.save()
                #self.message("Внимание!","Состояние прохождения завершено. \nФайл находится в папке /saves")
            if key==113:
                curses.reset_shell_mode()
                curses.endwin()
                exit(0)
        print(self.data)

В результате картинка (квест в консоли) на мониторе выглядит уже чуть симпатичнее:

 квест в консоли

Квест в консоли на Python. Часть 1

Дело было вечером, делать было нечего (с). Ну не то чтобы совсем нечего, но выдалась свободное немножко время, поэтому для того чтобы не забыть (да уж чего там, и вспомнить уже) окончательно Python, решил сделать маленький движёк для текстовых квестов с выполняющихся в консоли (квест в консоли).

Сначала определимся что где и как:

  1. Локации будем описывать в формате json
  2. Локации будем складывать в папку locations. Имена файлов — номер локации.
  3. В ходе квеста можно «сохраняться», чтобы была возможность продолжить квест
  4. Сохранения будем хранить в папке saves
  5. Все классы храним в папке classes

В результате у меня получилась такая структура папок и файлов:

квест в консоли

Первым делом нарисую минимальный json стартовой локации:

{
  "title": "Стартовая страница",
  "description": "Стартовая страница описание",
  "available_locations": {
    "left": 1,
    "right": 2,
    "forward": 3,
    "back": 4
  }
}

Т.е. начинаем на стартовой локации (0), доступны переходы в локации 1,2,3 и 4.

Далее реализуем класс игрока, с реализацией функционала сохранения и стадии прохождения квеста:

from datetime import datetime as dt
import json
import os


class TPlayer:
    data = {}

    def __init__(self, name):
        self.data["name"] = name  # Имя пользователя
        self.data["location"] = 0  # текущая локация

    def load(self, filename):
        """
        Загрузить состояние квеста из файла
        :param filename: имя файла из папки saves
        """
        f = open("saves/" + filename, mode='r', encoding='utf-8')
        self.data = json.load(f)
        f.close()

    def list_saves(self):
        """
            Показать доступные сохранения
        """
        files = os.listdir("saves")
        print(files)

    def save(self):
        """
            Сохранить текущее состояние пользователя
        """
        time = dt.now()
        filename = time.strftime("%d_%m_%Y_%H_%M") + ".save"
        print(filename)
        f = open("saves/" + filename, mode='w', encoding='utf-8')
        json.dump(self.data, f)
        f.close()

В главном файле (main.py), реализуем проверку аргументов командной строки и переход к началу квеста:

#!/usr/bin/env python3
# encoding: utf-8
import classes.player as tplayer
import classes.location as tlocation
import sys

player = tplayer.TPlayer("Васян")
location = tlocation.TLocation()


def start_location():
    print(player.data["name"])
    location.location_view(player.data["location"])

if __name__ == '__main__':
    for param in sys.argv:
        if param == "--load":
            if len(sys.argv) == 2:
                print("Ошибка: нет имени файла")
                exit(0)
            player.load(sys.argv[2])
            start_location()
            exit(0)
        if param == "--new":
            exit(0)
        if param == "--list":
            player.list_saves()
            exit(0)
    if len(sys.argv) == 1:
        print("Для запуска квеста необходимо использовать следующие параметры:")
        print("  --load   - загрузить сохранение и начать квест")
        print("  --list   - получить список сохранений")
        print("  --new <имя участника>    - начать квест заново")
        exit()

Разработка «квест в консоли» может быть действительно просто.. Вы можете посмотреть и другие мои статьи посвященные разработке на Python

Python: ошибка установки cv2

При попытке установить:

pip install cv2

Получаем ошибку:

ERROR: Could not find a version that satisfies the requirement cv2 (from versions: none)

А всё просто. Почему то авторы пакета обозвали его в репозитарии pip как opencv-python. Соответственно и устанавливать его нужно так:

pip install opencv-python

А использовать так:

import cv2

Сохранение весов модели нейросети

В продолжении статьи Создаём нейросеть на Python, Итак сеть создали, натренировали. А что делать чтобы решение можно было определять по картинке лето или зима, не обучая каждый раз модель заново? Ну так ведь можно просто сохранить полученные веса нейросети в файл, А затем уже их загружать в случае необходимости.

В Tensotflow уже есть встроенный функционал для сохранения весов в файл. Для этого используется функция вызова кэлбека после прохождения каждого шага обучения. В моём случае промежуточные модели обучения не нужны, потому параметр save_freq делаю равным количеству итераций обучения:

traning_model_save=base_dir+"\save_model\cp.ckpt"
....
EPOCHS = 10
print("- настраиваем кэлбеки для сохранения натренированной модели")
# Создаем колбек для сохранения контрольной точки
cp_callback = tf.keras.callbacks.ModelCheckpoint(traning_model_save,
                                                 save_weights_only=True,
                                                 save_freq=EPOCHS,
                                                 verbose=1)
print("- тренируем модель")
history = model.fit(
    train_data_gen,
    steps_per_epoch=int(np.ceil(total_train / float(BATCH_SIZE))),
    epochs=EPOCHS,
    validation_data=val_data_gen,
    validation_steps=int(np.ceil(total_val / float(BATCH_SIZE))),
    callbacks = [cp_callback]   # вызываем кэлбек после каждого шага обучения
)

Так-же можно сохранять не только веса, но и всю модель целиком:

model.save('model.h5')

Набор скриптов для «умного дома» на Raspberry PI 3

Так уж получилось, что мой «умный» дом, это фактически небольшая кучка скриптов. Приведу пример основных из них, вдруг кому интересно будет…

Отправка/получение данных по радиоканалу с частотой 433Mhz:

Получение:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

rfdevice = None
# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

logging.basicConfig(level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=22,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
logging.info("Listening for codes on GPIO " + str(args.gpio))
while True:
    if rfdevice.rx_code_timestamp != timestamp:
        timestamp = rfdevice.rx_code_timestamp
        logging.info(str(rfdevice.rx_code) +
                     " [pulselength " + str(rfdevice.rx_pulselength) +
                     ", protocol " + str(rfdevice.rx_proto) + "]")
    time.sleep(0.01)

Отправка:

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
from rpi_rf import RFDevice
import fcntl, sys,os

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s',)

parser = argparse.ArgumentParser(description='Sends a decimal code via a 433/315MHz GPIO device')
parser.add_argument('code', metavar='CODE', type=int, help="Decimal code to send")
parser.add_argument('-g', dest='gpio', type=int, default=17, help="GPIO pin (Default: 17)")
parser.add_argument('-p', dest='pulselength', type=int, default=None,help="Pulselength (Default: 350)")
parser.add_argument('-t', dest='protocol', type=int, default=None,help="Protocol (Default: 1)")
parser.add_argument('-l', dest='length', type=int, default=None,help="Codelength (Default: 24)")
parser.add_argument('-r', dest='repeat', type=int, default=10,help="Repeat cycles (Default: 10)")
args = parser.parse_args()

rfdevice = RFDevice(args.gpio)
rfdevice.enable_tx()
rfdevice.tx_repeat = args.repeat

if args.protocol:
    protocol = args.protocol
else:
    protocol = "default"
if args.pulselength:
    pulselength = args.pulselength
else:
    pulselength = "default"
if args.length:
    length = args.length
else:
    length = "default"

logging.info(str(args.code) +
             " [protocol: " + str(protocol) +
             ", pulselength: " + str(pulselength) +
             ", length: " + str(length) +
             ", repeat: " + str(rfdevice.tx_repeat) + "]")

rfdevice.tx_code(args.code, args.protocol, args.pulselength, args.length)
rfdevice.cleanup()

Получение температуры/влажности с датчиков dh11:

#!/usr/bin/python
import Adafruit_DHT
import time
import mysql.connector
from mysql.connector import Error
import json
import os
import fcntl, sys

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)

dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

print(config["db"]);


try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

DHT_SENSOR = Adafruit_DHT.DHT11

sql="SELECT * FROM sources WHERE device=1";
if len(sys.argv)==2:
 id=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE id='{id}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  id=myrow3["id"]
  source=myrow3["id"]
  place=myrow3["place"]
  param=json.loads(myrow3["param"].decode("utf-8"))
  pin=param["pin"]
  print(f"-опрашиваю статус датчик {id},pin={pin}")
  cnt=3
  while cnt>0:
    humidity, temperature = Adafruit_DHT.read(DHT_SENSOR, pin)
    if humidity is not None and temperature is not None:
        if humidity<140:
         print("Temp={0:0.1f}C Humidity={1:0.1f}%".format(temperature, humidity))
         sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},1,{temperature},now());";
         cursor = conn.cursor(dictionary=True,buffered=True)
         cursor.execute(sql);
         conn.commit()
         cnt=0
    else:
        print("Sensor failure. Check wiring...");
    time.sleep(3);
    cnt=cnt-1
  myrow3 = cursor2.fetchone()
conn.commit()

exit(-1)

Мигаем светодиом:

#!/usr/bin/python
import RPi.GPIO as GPIO
import time
import fcntl, sys, os
from time import sleep
while  True:
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(23, GPIO.OUT)
        GPIO.output(23, True)
        time.sleep(0.5)
        GPIO.output(23, False)
        time.sleep(0.5)
    finally:
       print("clean up")
       GPIO.cleanup() 

Получение данных с микротика

Температура:

$fl = fopen("/tmp/".basename(__FILE__).".lock", "w");
    if( ! ( $fl && flock( $fl, LOCK_EX | LOCK_NB ) ) ) {
    die("--копия скрипта уже запущена!");
};


define('WUO_ROOT', dirname(__FILE__));
require_once WUO_ROOT.'/vendor/autoload.php';
require_once WUO_ROOT.'/../class/Tsql.php';

$config=json_decode(file_get_contents(WUO_ROOT."/../config.json"));
var_dump($config);


$sqln=new Tsql("mysql","m_data",$config->db->host,$config->db->username,$config->db->password);


$config = new \RouterOS\Config([
    'host' => '192.wefrwerfwe1',
    'user' => 'ewrfwerfe',
    'pass' => 'erwfwerfew',
    'port' => 8728,
]);
$client = new \RouterOS\Client($config);

$res=$client->query('/system/health/print')->read();

$temp=$res[0]["temperature"];

 $sql="insert into m_data (place,source,value_type,value,dt) values (10,12,1,'$temp',now())";
 $stmt=$sqln->dbh->prepare($sql);
 $stmt->execute();

Уровни WIFI сигналов:

$client = new \RouterOS\Client($config);

$res=$client->query('/interface/wireless/registration-table/print')->read();

var_dump($res);

WiFi реле Sonoff DIY 3:

Текущий статус:

#!/usr/bin/python3
import pymysql
import requests
from urllib3.exceptions import InsecureRequestWarning
from mysql.connector import Error
import mysql.connector
import random
import os,fcntl, sys
import json

sys.path.insert(0, '/root/scripts/includes')
import functions

fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    print('неужели другой мой экземпляр всё ещё работает?')
    sys.exit(0)


dir=os.path.dirname(os.path.abspath(__file__))
with open(dir+'/../config.json', 'r', encoding='utf-8') as f: #открыли файл с данными
    config = json.load(f) #загнали все, что получилось в переменную

def GetSonoffStatus(ip):
 ret={"error":True,"signal":0,"switch":True}

 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 post_params = '{"deviceid": "","data": {}}'

 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/info", data=post_params, verify=False)
    res = response.json()
    print(f"пришло:{res}")

    if type(res['data'])==str:
      res["data"]=json.loads(res["data"])

    if res["data"]["switch"]=="off":
       ret["switch"]=False

    if "signalStrength" in res["data"]:
       ret["signal"]=res["data"]["signalStrength"]
    ret["error"]=False

 except Exception as e:
    print(f"Ошибка:{e}")


# соединяемся с БД
try:
    conn = mysql.connector.connect(host=config["db"]["host"],database=config["db"]["database"],user=config["db"]["username"],password=config["db"]["password"])
    if conn.is_connected(): print('Вроде соеденился!')
except Error as e:
    print(e);
    exit(0);

sql="SELECT * FROM sources WHERE device=3";
if len(sys.argv)==2:
 ip=sys.argv[1]
 sql=f"SELECT * FROM sources WHERE ip='{ip}'";

cursor2=conn.cursor(dictionary=True,buffered=True)
cursor2.execute(sql,[]);
myrow3 = cursor2.fetchone()
while myrow3 is not None:
  ip=myrow3["ip"].decode("utf-8")
  source=myrow3["id"]
  place=myrow3["place"]
  deviceid=myrow3["deviceid"].decode("utf-8")
  print(f"-опрашиваю статус реле {ip}")
  res=functions.GetSonoffStatus(ip)
  if res["error"]==False:
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},3,{res['switch']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
    sql=f"insert into m_data (place,source,value_type,value,dt) values ({place},{source},4,{res['signal']},now())";
    cursor = conn.cursor(dictionary=True,buffered=True)
    cursor.execute(sql);
    conn.commit()
  myrow3 = cursor2.fetchone()
conn.commit()

Переключение реле:

def SonoffReleSwitch(conn,ip,deviceid,rele):
 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 if rele==True:
   post_params = '{"deviceid": "","data": {"switch":"on"}}'
 else:
     post_params = '{"deviceid": "","data": {"switch":"off"}}'
 # переключаю реле
 try:
    response = requests.post(f"http://{ip}:8081/zeroconf/switch", data=post_params, verify=False)
    res = response.json()
    print(f"---пришло:{res}")
    if res["error"]==0:
       print("- ошибок нет!");
       return True
    return False
 except requests.exceptions.HTTPError as errh:
  print("HTTP Error")
 except requests.exceptions.ReadTimeout as errrt:
  print("Time out")
 except requests.exceptions.ConnectionError as conerr:
  print("Connection error")
 return False

1 2 3 4 13