Блокировка повторного запуска скрипта на Python

Обычно для того чтобы исключить повторный запуск скрипта на python используют классический способ:

import os,fcntl
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

И он хорошо срабатывает при периодическом запуске например из крона скрипта с какойто периодичностью, для проверки «а закончилась ли работа предыдущего запуска?». Но дело в том, что иногда пишут скрипт которые работает с разным функционалом в зависимости от параметров запуска. Тогда в этом случае вполне допустим запуск скрипта, но с другим параметром. Для того чтобы обойти этот момент, я стал использовать несколько другой способ. А именно: опрашиваю список запущенных в текущий момент процессов и смотрю параметры их запуска. Если нахожу совпадение, то выхожу. Если нет — позволяю скрипту работать дальше. Вышло примерно так:

import psutil
def AYouRun(script_param):
    pids=psutil.pids()
    cnt=0
    for pid in pids:
        p = psutil.Process(pid)
        res=p.cmdline()
        if script_param in res:
            cnt=cnt+1
    if cnt>1:
        print(f"--скрипт с параметром {script_param} уже запущен")
        exit(-1)
    return True
..
..
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--telegram-news":
            if AYouRun(param):
                Insert2Log("Запущен мониторинг телеграм новостей",1)
                client = TelegramClient("parser_data", global_config["telegram_api_id"], global_config["telegram_api_hash"])
..
..

Парсинг новостей групп VK

В продолжение предыдущей статьи, появилась необходимость парсить так-же и новости в социальной сети vk с проверкой на наличие стоп-слов. Для этого воспользовался модулем vk на python. Так-же понадобится токен доступа полученный на https://vk.com/apps?act=manage

В итоге код получился примерно следующий:

#!/usr/bin/env python3
# encoding: utf-8
import vk
import json
import funcs

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)


api = vk.API(access_token=config_data["vk_api_token"],v='5.131')

for group in config_data["groups"]:
    chan_data = funcs.get_chan_json(group)
    skeep_after = chan_data["las_id"]
    wall_content = api.wall.get(domain=group, count=config_data["limit"])
    poz = 0
    for message in wall_content["items"]:
        #print(message)
        if poz == 0:
            chan_data["las_id"] = message["id"]
            funcs.save_chan_json(group, chan_data)
        if skeep_after == message["id"]:
            print("Все новости уже прочитаны...")
            break
        for word in config_data["alert_words"]:
          if word in message["text"]:
              print(f"--нашли слово {word}")
              funcs.SendMailVK(config_data,group, word, message)
              print(message)
        poz = poz + 1
print("all done..");

По сути код очень простой — получаем через API VK все последние новости из каждой группы. Если в тексте новости находим стоп-слово, то отправляем соответствующее письмо. Так-же использую дополнительный файл функций, которые далее использую во всех парсерах:

#!/usr/bin/env python3
#encoding: utf-8
import json
from datetime import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

def save_chan_json(chan,chan_data):
    f = open('saves/'+chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open('saves/'+chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;


def SendMail(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://vc.com/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailVK(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на группе {chan} в VK"
    msg.add_header('Content-Type', 'text/html')
    message["text"]=message["text"].replace(word,"<strong>"+word+"</strong>")
    dt_pub=datetime.utcfromtimestamp(message["date"]).strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Группа: <a href='https://vc.com/{chan}'>https://vk.com/{chan}</a>, опубликовано {dt_pub}<hr/>"+message["text"])

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailNews(config_data,url,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на сайте {url}"
    msg.add_header('Content-Type', 'text/html')
    if message.get("href")!=None:
     message.string="<a href='"+message["href"]+"'>"+message.string.replace(word,"<strong>"+word+"</strong>")+"</a>"
    else:
        message.string = message.string.replace(word,"<strong>" + word + "</strong>")
    msg.set_payload(message.string)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

Парсинг телеграм каналов

Задача: необходимо просматривать несколько новостных телеграм каналов, и в случае обнаружения в новости неких стоп-слов, высылать уведомление на электронную почту.

К сожалению воспользоваться для решения этой задачи API для работы с ботами не получится, т.к. такого функционала просто нет. Выходом может служить — воспользоваться одним из многочисленных клиентов Телеграм, реализованых на PHP, Python, JavaScript (NodeJS) и т.д. В моём случае — воспользуюсь python и библиотекой telethon. К ней довольно толковая документация, в том числе и на русском

Итак, для начала нужно зайти на ресурс https://my.telegram.org/apps и получить api_id и api_hash, для того чтобы библиотека смогла создать соединение. Далее создам файл с настройками вида:

{
  "chanels": [
    "rus_now_news","-1001237513492"
  ],
  "limit": 200,
  "alert_words": [
    "кусь","мейнкун","бабки","лапландия","рабочие","песель-акробат"
  ],
  "api_id": "12435245235",
  "api_hash": "екыпукерпенуркенрке",
  "notify_email": "екпукеп@укепукеуке.ru",
  "email_login": "уепукеп-куепукеп@кепукеп.ru",
  "email_from": "уекпукеп@кепукепук.ru",
  "from_password": "укацука!укауцка",
  "smtp_server": "уцкацука-owa.уцкацука.ru",
  "smtp_port":587
}

В нём перечисляем каналы которые мониторим и стоп слова, которые ловим. Алгоритм работы скрипта:

  • соединяемся с сервером телеграм
  • получаем список последних новостей канала
  • если ID новости уже смотрели, пропускаем его
  • если в тексте новости нашли стоп слово — отправляем уведомление на почту

А вот и сам скрипт:

#!/usr/bin/env python3
# encoding: utf-8
import asyncio
import json
import sys
import re
from telethon import TelegramClient
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)
client = TelegramClient("parser_data", config_data["api_id"], config_data["api_hash"])
async def TConnect():
    await client.start()
async def ListChanels():
    async for dialog in client.iter_dialogs():
        print(dialog.name, 'has ID', dialog.id)

def save_chan_json(chan,chan_data):
    f = open(chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open(chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;

def SendMail(chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://t.me/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()
async def main():
    print("-start")
    await TConnect()
    #await ListChanels()
    for chan in config_data["chanels"]:
        chan_data=get_chan_json(chan)
        skeep_after = chan_data["las_id"]
        if "-" in chan:
          dp = await client.get_entity(int(chan))
        else:
          dp = await client.get_entity(chan)
        poz=0
        async for message in client.iter_messages(dp,limit=config_data["limit"]):
           if poz==0:
               chan_data["las_id"]=message.id
               save_chan_json(chan, chan_data)
           if skeep_after==message.id:
               print("Все новости уже прочитаны...")
               break
           print(f"-смотрим message_id:{message.id}")
           for word in config_data["alert_words"]:
               if word in message.text:
                   print(f"--нашли слово {word}")
                   SendMail(chan, word, message)
                   print(message)
           #print(message.id, message.text)
           poz=poz+1
    print("all done..");
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--list_chanels":
            TConnect()
            ListChanels()
    with client:
        client.loop.run_until_complete(main())

Опрос клавиатуры при помощи curses

Опрос клавиатуры при помощи curses несколько запутан. Во первых есть способ получить одиночное нажатие кнопки при помощи:

res=stdscr.getkey()

В этом случае в res попадёт непосредственно сам символ в виде строки.

Второй способ — использование :

res=stdscr.getch()

В этом случае в res попадет число вида int — код нажатой кнопки. Однако оба способа не работают, когда нужно получить данные о нажатой специальной клавиши, типа стрелок, F1..F12 и т.д. Если необходимо их получить, то нужно сначала включить данную возможность:

stdscr.keypad(True)

Зачем так сделано, загадка. Почему нельзя получить эти данные сразу?

Так-же есть способ отключить ожидание нажатия кнопки, при помощи:

curses.cbreak()
Опрос клавиатуры при помощи curses

Итог: задача «Опрос клавиатуры при помощи curses» выполнена. Можете почитать и другие заметки посвященные Python. Официальная документация на curses здесь

Квест в консоли на Python. Часть 2

Итак, продолжаем продолжаем писать квест в консоли на языке Python. Первая часть описана здесь. В ней мы реализовали автоматическую загрузку и сохранение состояния прохождения квеста. Сейчас же займемся (начнем по крайне мере) отрисовкой локации, и реакцией на нажатые кнопки. В локации предусмотрим возможность отображения картинки из ASCII. Например json стартовой локации может выглядеть примерно так:

{
  "title": "Стартовая локация",
  "description": "Стартовая локация описание",
  "ascii_art": "                    __            ================================\n         ALCATRAZ  /__\\            ||     ||<(.)>||<(.)>||     || \n       ____________|  |            ||    _||     ||     ||_    || \n       |_|_|_|_|_|_|  |            ||   (__D     ||     C__)   || \n       |_|_|_|_|_|_|__|            ||   (__D     ||     C__)   ||\n      A@\\|_|_|_|_|_|/@@Aa          ||   (__D     ||     C__)   ||\n   aaA@@@@@@@@@@@@@@@@@@@aaaA      ||   (__D     ||     C__)   ||\n  A@@@@@@@@@@@DWB@@@@@@@@@@@@A     ||     ||     ||     ||  dwb||\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  ================================",
  "available_locations": {
    "left": 1,
    "right": 2,
    "forward": 3,
    "back": 4
  }
}

Создадим класс TLocation, при инициалиизации будем передавать в него инициализированный класс player. В переменной класса data — будем хранить загруженную локацию.

class TLocation:
    data = {}
    player={}
    stdscr=curses.initscr()
    scr_size=stdscr.getmaxyx()
    def __init__(self,player):
        self.player=player;
    def load_location(self, location):
        f = open("locations/" + str(location) + ".json", mode='r', encoding='utf-8')
        self.data = json.load(f)
        f.close()

Далее нарисуем верхнее меню, где сообщаем игроку, на какие локации он может перемещаться, и что он держит в руках:

    def top_menu(self):
        loc="Идти: "
        if "left" in self.data["available_locations"]:
            loc=loc+"влево(4) "
        if "right" in self.data["available_locations"]:
            loc=loc+"вправо(6) "
        if "forward" in self.data["available_locations"]:
            loc=loc+"вперед(8) "
        if "back" in self.data["available_locations"]:
            loc=loc+"назад(2) "
        curses.start_color()
        curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(1, 1, loc,curses.color_pair(1))
        curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(1, 1, loc,curses.color_pair(1))
        # что в руках
        hands = "В руках: "
        self.stdscr.addstr(2, 1, "")
        for object in self.player.data["in_hands"]:
            hands=hands+"["+object+"]"
        if len(self.player.data["in_hands"])==0:
            hands = hands+"ничего нет"
        self.stdscr.addstr(2, 1, hands, curses.color_pair(1))

Ну и собственно основной код отрисовки локации, включающий бесконечный цикл ожидания нажатий клавиатуры. Предусматриваем переход на другую локацию, выход из игры и сохранение игры.

    def location_view(self, location):
        self.load_location(location)
        self.stdscr.clear()
        self.stdscr.border()
        self.top_menu()  # рисуем верхнее меню
        # название локации
        curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
        x = int((self.scr_size[1] - len(self.data["title"])) / 2)
        self.stdscr.addstr(2, x, self.data["title"],curses.color_pair(2))
        # рисуем рисунок ежели он есть
        y=3
        if "ascii_art" in self.data:
            mass_art=self.data["ascii_art"].split("\n")
            i=0
            while i<len(mass_art):
                x=int((self.scr_size[1]-len(mass_art[i]))/2)
                self.stdscr.addstr(y+i, x, mass_art[i])
                i+=1
            y = y + len(mass_art);
         # Выводим описательную часть
        curses.init_pair(3, curses.COLOR_CYAN, curses.COLOR_BLACK)
        self.stdscr.addstr(y, 1, self.data["description"],curses.color_pair(3))
        # Выводим нижнее меню
        curses.init_pair(4, curses.COLOR_RED, curses.COLOR_BLACK)
        self.stdscr.addstr(y + 1, 1, "Осмотреться вокруг [v] Применить что в руках [h]", curses.color_pair(4))
        self.stdscr.addstr(y + 2, 1, "Выйти из квеста [q] Сохранить состояние [r]", curses.color_pair(4))
        self.stdscr.refresh()
        while True:
            key=self.stdscr.getch()
            print(key)
            # реализация перехода с локации на локацию
            if key==52 and "left" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["left"]
                self.location_view(self.player.data["location"])
            if key==54 and "right" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["right"]
                self.location_view(self.player.data["location"])
            if key==56 and "forward" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["forward"]
                self.location_view(self.player.data["location"])
            if key==50 and "back" in self.data["available_locations"]:
                self.player.data["location"]=self.data["available_locations"]["back"]
                self.location_view(self.player.data["location"])
            if key == 114:
                self.player.save()
                #self.message("Внимание!","Состояние прохождения завершено. \nФайл находится в папке /saves")
            if key==113:
                curses.reset_shell_mode()
                curses.endwin()
                exit(0)
        print(self.data)

В результате картинка (квест в консоли) на мониторе выглядит уже чуть симпатичнее:

 квест в консоли
1 2 3 13