Граббер всех файлов телеграм канала

В свете того, что телеграм начали замедлять, появились опасения, что потеряю в конце концов доступ к телеграм каналу, где собрано много всяких интересных файлов. В связи с чем было принято решение спарсить и скачать всё его содержимое. Написанный скрипт проходит по каждому сообщению канала и скачивает все файлы в папки соответствующие идентифактору сообщения (или идентификатору группы сообщений). В общем получилось что-то вроде:

#!/usr/bin/env python3
# encoding: utf-8
import sys
import os,fcntl
import funcs
from telethon import TelegramClient
import requests
import configparser

## Защита от дубля запуска скрипта
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

config = configparser.ConfigParser()
config.read("settings.ini")

print(config["Telegram"]["telegram_api_id"])

async def SaveFile(dir_name,message):
     if os.path.isdir(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}")==False:
       os.mkdir(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}")
     with open(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/message.txt", 'w') as file:
        file.write(message.message)
     print(f"Вложения: {message.media}")
     if hasattr(message.media,"document"):
       if message.media.document != False:
         file_name=message.media.document.attributes[0].file_name
         print(f"-вложенный файл {file_name}")
         print("--сохраняю..")
         if os.path.isfile(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/{file_name}")==False:
           await client.download_media(message.media, file=os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/{file_name}")
           print("--ок..")
         else:
           print("--уже скачивали!")
     if hasattr(message.media,"video"):
       if message.media.video!=False:
          print(f"-вложенное видео")

async def tele_news_parse():
    dp = await client.get_entity(config["Telegram"]["stl_chanel"])
    poz=int(config["Telegram"]["current_message_id"])
    print(f"-start at {poz}")
    async for message in client.iter_messages(dp,limit=10,offset_id=poz):
        print("---------------------------------------------------------")
        if hasattr(message, "message"):
            if message.message!=None:
                print(f"Сообщение: {message}")
                if message.grouped_id:
                   print("-its album!")
                   target_group_id = message.grouped_id
                   search_ids = range(message.id - 20, message.id + 21)
                   posts = await client.get_messages(config["Telegram"]["stl_chanel"], ids=list(search_ids))
                   media_group = []
                   for post in posts:
                     if post is not None and post.grouped_id == target_group_id and post.media is not None:
                      media_group.append(post)
                   for message in media_group:
                      print(f"--из альбома: {message}")
                      await SaveFile(target_group_id,message)
                else:
                  print("-its one file!")
                  await SaveFile(message.id,message)


    with open('settings.ini', 'w') as configfile:
        poz+=10
        print(f"-new poz at {poz}")
        config["Telegram"]["current_message_id"]=str(poz)
        config.write(configfile)

if __name__ == '__main__':
    client = TelegramClient("parser_data", config["Telegram"]["telegram_api_id"], config["Telegram"]["telegram_api_hash"])
    client.start()
    client.loop.run_until_complete(tele_news_parse())

Блокировка повторного запуска скрипта на Python

Обычно для того чтобы исключить повторный запуск скрипта на python используют классический способ:

import os,fcntl
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

И он хорошо срабатывает при периодическом запуске например из крона скрипта с какойто периодичностью, для проверки «а закончилась ли работа предыдущего запуска?». Но дело в том, что иногда пишут скрипт которые работает с разным функционалом в зависимости от параметров запуска. Тогда в этом случае вполне допустим запуск скрипта, но с другим параметром. Для того чтобы обойти этот момент, я стал использовать несколько другой способ. А именно: опрашиваю список запущенных в текущий момент процессов и смотрю параметры их запуска. Если нахожу совпадение, то выхожу. Если нет — позволяю скрипту работать дальше. Вышло примерно так:

import psutil
def AYouRun(script_param):
    pids=psutil.pids()
    cnt=0
    for pid in pids:
        p = psutil.Process(pid)
        res=p.cmdline()
        if script_param in res:
            cnt=cnt+1
    if cnt>1:
        print(f"--скрипт с параметром {script_param} уже запущен")
        exit(-1)
    return True
..
..
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--telegram-news":
            if AYouRun(param):
                Insert2Log("Запущен мониторинг телеграм новостей",1)
                client = TelegramClient("parser_data", global_config["telegram_api_id"], global_config["telegram_api_hash"])
..
..

Парсинг новостей групп VK

В продолжение предыдущей статьи, появилась необходимость парсить так-же и новости в социальной сети vk с проверкой на наличие стоп-слов. Для этого воспользовался модулем vk на python. Так-же понадобится токен доступа полученный на https://vk.com/apps?act=manage

В итоге код получился примерно следующий:

#!/usr/bin/env python3
# encoding: utf-8
import vk
import json
import funcs

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)


api = vk.API(access_token=config_data["vk_api_token"],v='5.131')

for group in config_data["groups"]:
    chan_data = funcs.get_chan_json(group)
    skeep_after = chan_data["las_id"]
    wall_content = api.wall.get(domain=group, count=config_data["limit"])
    poz = 0
    for message in wall_content["items"]:
        #print(message)
        if poz == 0:
            chan_data["las_id"] = message["id"]
            funcs.save_chan_json(group, chan_data)
        if skeep_after == message["id"]:
            print("Все новости уже прочитаны...")
            break
        for word in config_data["alert_words"]:
          if word in message["text"]:
              print(f"--нашли слово {word}")
              funcs.SendMailVK(config_data,group, word, message)
              print(message)
        poz = poz + 1
print("all done..");

По сути код очень простой — получаем через API VK все последние новости из каждой группы. Если в тексте новости находим стоп-слово, то отправляем соответствующее письмо. Так-же использую дополнительный файл функций, которые далее использую во всех парсерах:

#!/usr/bin/env python3
#encoding: utf-8
import json
from datetime import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

def save_chan_json(chan,chan_data):
    f = open('saves/'+chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open('saves/'+chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;


def SendMail(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://vc.com/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailVK(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на группе {chan} в VK"
    msg.add_header('Content-Type', 'text/html')
    message["text"]=message["text"].replace(word,"<strong>"+word+"</strong>")
    dt_pub=datetime.utcfromtimestamp(message["date"]).strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Группа: <a href='https://vc.com/{chan}'>https://vk.com/{chan}</a>, опубликовано {dt_pub}<hr/>"+message["text"])

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailNews(config_data,url,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на сайте {url}"
    msg.add_header('Content-Type', 'text/html')
    if message.get("href")!=None:
     message.string="<a href='"+message["href"]+"'>"+message.string.replace(word,"<strong>"+word+"</strong>")+"</a>"
    else:
        message.string = message.string.replace(word,"<strong>" + word + "</strong>")
    msg.set_payload(message.string)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

Парсинг телеграм каналов

Задача: необходимо просматривать несколько новостных телеграм каналов, и в случае обнаружения в новости неких стоп-слов, высылать уведомление на электронную почту.

К сожалению воспользоваться для решения этой задачи API для работы с ботами не получится, т.к. такого функционала просто нет. Выходом может служить — воспользоваться одним из многочисленных клиентов Телеграм, реализованых на PHP, Python, JavaScript (NodeJS) и т.д. В моём случае — воспользуюсь python и библиотекой telethon. К ней довольно толковая документация, в том числе и на русском

Итак, для начала нужно зайти на ресурс https://my.telegram.org/apps и получить api_id и api_hash, для того чтобы библиотека смогла создать соединение. Далее создам файл с настройками вида:

{
  "chanels": [
    "rus_now_news","-1001237513492"
  ],
  "limit": 200,
  "alert_words": [
    "кусь","мейнкун","бабки","лапландия","рабочие","песель-акробат"
  ],
  "api_id": "12435245235",
  "api_hash": "екыпукерпенуркенрке",
  "notify_email": "екпукеп@укепукеуке.ru",
  "email_login": "уепукеп-куепукеп@кепукеп.ru",
  "email_from": "уекпукеп@кепукепук.ru",
  "from_password": "укацука!укауцка",
  "smtp_server": "уцкацука-owa.уцкацука.ru",
  "smtp_port":587
}

В нём перечисляем каналы которые мониторим и стоп слова, которые ловим. Алгоритм работы скрипта:

  • соединяемся с сервером телеграм
  • получаем список последних новостей канала
  • если ID новости уже смотрели, пропускаем его
  • если в тексте новости нашли стоп слово — отправляем уведомление на почту

А вот и сам скрипт:

#!/usr/bin/env python3
# encoding: utf-8
import asyncio
import json
import sys
import re
from telethon import TelegramClient
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)
client = TelegramClient("parser_data", config_data["api_id"], config_data["api_hash"])
async def TConnect():
    await client.start()
async def ListChanels():
    async for dialog in client.iter_dialogs():
        print(dialog.name, 'has ID', dialog.id)

def save_chan_json(chan,chan_data):
    f = open(chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open(chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;

def SendMail(chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://t.me/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()
async def main():
    print("-start")
    await TConnect()
    #await ListChanels()
    for chan in config_data["chanels"]:
        chan_data=get_chan_json(chan)
        skeep_after = chan_data["las_id"]
        if "-" in chan:
          dp = await client.get_entity(int(chan))
        else:
          dp = await client.get_entity(chan)
        poz=0
        async for message in client.iter_messages(dp,limit=config_data["limit"]):
           if poz==0:
               chan_data["las_id"]=message.id
               save_chan_json(chan, chan_data)
           if skeep_after==message.id:
               print("Все новости уже прочитаны...")
               break
           print(f"-смотрим message_id:{message.id}")
           for word in config_data["alert_words"]:
               if word in message.text:
                   print(f"--нашли слово {word}")
                   SendMail(chan, word, message)
                   print(message)
           #print(message.id, message.text)
           poz=poz+1
    print("all done..");
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--list_chanels":
            TConnect()
            ListChanels()
    with client:
        client.loop.run_until_complete(main())

Опрос клавиатуры при помощи curses

Опрос клавиатуры при помощи curses несколько запутан. Во первых есть способ получить одиночное нажатие кнопки при помощи:

res=stdscr.getkey()

В этом случае в res попадёт непосредственно сам символ в виде строки.

Второй способ — использование :

res=stdscr.getch()

В этом случае в res попадет число вида int — код нажатой кнопки. Однако оба способа не работают, когда нужно получить данные о нажатой специальной клавиши, типа стрелок, F1..F12 и т.д. Если необходимо их получить, то нужно сначала включить данную возможность:

stdscr.keypad(True)

Зачем так сделано, загадка. Почему нельзя получить эти данные сразу?

Так-же есть способ отключить ожидание нажатия кнопки, при помощи:

curses.cbreak()
Опрос клавиатуры при помощи curses

Итог: задача «Опрос клавиатуры при помощи curses» выполнена. Можете почитать и другие заметки посвященные Python. Официальная документация на curses здесь

1 2 3 13