Установка туннеля с ГИС ЖКХ под Linux

Задача установить защищенный туннель с ГИС ЖКХ под Astra Linux.

Решение:

Сначала скопируем с токена сертификат в формате pfx. Для этого можно воспользоваться утилитой P12FromGostCSP (Windows). Под Linux тоже как-то можно, но как- нужно гуглить. Мне предоставили уже готовый файл.

Далее нужно установить крипто-про CSP с пакетом stunnel. Зайдя в крипто про, во вкладке «сертификаты» необходимо установить корневые сертификаты с https://my.dom.gosuslugi.ru/ и установить сертификат из файла psk.

Для настройки конфигурационного файла stunnel, нужно из файла формата pfx получить файлы key и crt

crt и pem:

openssl pkcs12 -in file.pfx -clcerts -nokeys -out public.crt
openssl x509 -in public.crt -out public.pem -outform PEM

key:

openssl pkcs12 -in SSK_obezl_3.pfx -nocerts -out private.key

Лично у меня на этом месте выскочила ошибка:

Enter Import Password:
Error outputting keys and certificates
124867336299712:error:06074079:digital envelope routines:EVP_PBE_CipherInit:unknown pbe algorithm:../crypto/evp/evp_pbe.c:95:TYPE=1.2.840.113549.1.12.1.80
124867336299712:error:23077073:PKCS12 routines:PKCS12_pbe_crypt:pkcs12 algor cipherinit error:../crypto/pkcs12/p12_decr.c:41:
124867336299712:error:2306A075:PKCS12 routines:PKCS12_item_decrypt_d2i:pkcs12 pbe crypt error:../crypto/pkcs12/p12_decr.c:94:

Которую я так и не победил, но тем не менее файл сформировался.

Далее необходимо настроить конфигурационный файл туннеля. У меня он получился вида:

pid=/home/vasya/stunnel/stunnel_cli.pid
output=/home/vasya/stunnel/t.log
socket = l:TCP_NODELAY=1
socket = r:TCP_NODELAY=1
debug = 7
[https]
client = yes
accept=localhost:8080
connect = api.dom.gosuslugi.ru:443
cert=/home/vasya/stunnel/public.pem
CAFile=/home/vasya/stunnel/CA-PPAK_2023.pem
key=/home/vasya/stunnel/private.key
verify=0

Файл CA-PPAK_2023.pem взял из архива «ГИС ЖКХ_Интеграция v.14.8.0.2.zip» с документацией скачанного с сайта https://my.dom.gosuslugi.ru/

И пробуем запустить туннель:

opt/cprocsp/sbin/amd64/stunnel_thread /home/vasya/stunnel/stunnel_run.conf

Логи будут писаться в файл t.log. Если всё хорошо, и в логах ошибок нет, то можно попробовать выполнить в браузере запрос вида:

http://127.0.0.1:8080/ext-bus-debt-service/services/DebtAsync

От ГИС ЖКХ придет что-то вроде:

<env:Envelope xmlns:ns6="http://dom.gosuslugi.ru/schema/integration/individual-registry-base/" xmlns:ns5="http://dom.gosuslugi.ru/schema/integration/account-base/" xmlns:ns8="http://dom.gosuslugi.ru/schema/integration/metering-device-base/" xmlns:ns7="http://dom.gosuslugi.ru/schema/integration/nsi-base/" xmlns:ns13="http://dom.gosuslugi.ru/schema/integration/debts/" xmlns:ns9="http://dom.gosuslugi.ru/schema/integration/organizations-registry-base/" xmlns:ns12="http://dom.gosuslugi.ru/schema/integration/bills-base/" xmlns:ns11="http://dom.gosuslugi.ru/schema/integration/payments-base/" xmlns:ns10="http://dom.gosuslugi.ru/schema/integration/organizations-base/" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns4="http://dom.gosuslugi.ru/schema/integration/base/" xmlns:ns3="http://www.w3.org/2000/09/xmldsig#">
   <env:Body>
      <env:Fault>
         <faultcode>env:Server</faultcode>
         <faultstring>AUT011000: Нет активной ИС с данным сертификатом</faultstring>
         <detail>
            <ns4:Fault>
               <ns4:ErrorCode>AUT011000</ns4:ErrorCode>
               <ns4:ErrorMessage>Нет активной ИС с данным сертификатом</ns4:ErrorMessage>
            </ns4:Fault>
         </detail>
      </env:Fault>
   </env:Body>
</env:Envelope>

Тут всё просто — открываем заявку на сайте ГИС ЖКХ и добавляем сертификат.

P,S. Отладку запросов далее можно делать в утилите soapui, Как? Тема отдельной будущей статьи, когда буду разбираться с запросами

nginx: отключить доступ к сайту по ip

Для того чтобы отключить доступ к сайту по url вида https://ip достаточно вверху конфигурации добавить строчки вида:

  listen 443 default_server;
  listen [::]:443 default_server;
  server_name _;
    
  ssl_certificate /etc/nginx/ssl/fullchain.pem;
  ssl_certificate_key /etc/nginx/ssl/cert.key;
        
    if ($host !~* ^(vasya.ru|www.vasya.ru)$ ) {
                return 444;
        }    
    return      444;
}

Вместо return 444, можно указать любую ошибку. Ну или сделать редирект на основной сайт, указав:

return 301 http://YOUR.DOMAIN;

Блокировка повторного запуска скрипта на Python

Обычно для того чтобы исключить повторный запуск скрипта на python используют классический способ:

import os,fcntl
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

И он хорошо срабатывает при периодическом запуске например из крона скрипта с какойто периодичностью, для проверки «а закончилась ли работа предыдущего запуска?». Но дело в том, что иногда пишут скрипт которые работает с разным функционалом в зависимости от параметров запуска. Тогда в этом случае вполне допустим запуск скрипта, но с другим параметром. Для того чтобы обойти этот момент, я стал использовать несколько другой способ. А именно: опрашиваю список запущенных в текущий момент процессов и смотрю параметры их запуска. Если нахожу совпадение, то выхожу. Если нет — позволяю скрипту работать дальше. Вышло примерно так:

import psutil
def AYouRun(script_param):
    pids=psutil.pids()
    cnt=0
    for pid in pids:
        p = psutil.Process(pid)
        res=p.cmdline()
        if script_param in res:
            cnt=cnt+1
    if cnt>1:
        print(f"--скрипт с параметром {script_param} уже запущен")
        exit(-1)
    return True
..
..
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--telegram-news":
            if AYouRun(param):
                Insert2Log("Запущен мониторинг телеграм новостей",1)
                client = TelegramClient("parser_data", global_config["telegram_api_id"], global_config["telegram_api_hash"])
..
..

Парсинг новостей групп VK

В продолжение предыдущей статьи, появилась необходимость парсить так-же и новости в социальной сети vk с проверкой на наличие стоп-слов. Для этого воспользовался модулем vk на python. Так-же понадобится токен доступа полученный на https://vk.com/apps?act=manage

В итоге код получился примерно следующий:

#!/usr/bin/env python3
# encoding: utf-8
import vk
import json
import funcs

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)


api = vk.API(access_token=config_data["vk_api_token"],v='5.131')

for group in config_data["groups"]:
    chan_data = funcs.get_chan_json(group)
    skeep_after = chan_data["las_id"]
    wall_content = api.wall.get(domain=group, count=config_data["limit"])
    poz = 0
    for message in wall_content["items"]:
        #print(message)
        if poz == 0:
            chan_data["las_id"] = message["id"]
            funcs.save_chan_json(group, chan_data)
        if skeep_after == message["id"]:
            print("Все новости уже прочитаны...")
            break
        for word in config_data["alert_words"]:
          if word in message["text"]:
              print(f"--нашли слово {word}")
              funcs.SendMailVK(config_data,group, word, message)
              print(message)
        poz = poz + 1
print("all done..");

По сути код очень простой — получаем через API VK все последние новости из каждой группы. Если в тексте новости находим стоп-слово, то отправляем соответствующее письмо. Так-же использую дополнительный файл функций, которые далее использую во всех парсерах:

#!/usr/bin/env python3
#encoding: utf-8
import json
from datetime import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

def save_chan_json(chan,chan_data):
    f = open('saves/'+chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open('saves/'+chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;


def SendMail(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://vc.com/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailVK(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на группе {chan} в VK"
    msg.add_header('Content-Type', 'text/html')
    message["text"]=message["text"].replace(word,"<strong>"+word+"</strong>")
    dt_pub=datetime.utcfromtimestamp(message["date"]).strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Группа: <a href='https://vc.com/{chan}'>https://vk.com/{chan}</a>, опубликовано {dt_pub}<hr/>"+message["text"])

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailNews(config_data,url,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на сайте {url}"
    msg.add_header('Content-Type', 'text/html')
    if message.get("href")!=None:
     message.string="<a href='"+message["href"]+"'>"+message.string.replace(word,"<strong>"+word+"</strong>")+"</a>"
    else:
        message.string = message.string.replace(word,"<strong>" + word + "</strong>")
    msg.set_payload(message.string)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

Парсинг телеграм каналов

Задача: необходимо просматривать несколько новостных телеграм каналов, и в случае обнаружения в новости неких стоп-слов, высылать уведомление на электронную почту.

К сожалению воспользоваться для решения этой задачи API для работы с ботами не получится, т.к. такого функционала просто нет. Выходом может служить — воспользоваться одним из многочисленных клиентов Телеграм, реализованых на PHP, Python, JavaScript (NodeJS) и т.д. В моём случае — воспользуюсь python и библиотекой telethon. К ней довольно толковая документация, в том числе и на русском

Итак, для начала нужно зайти на ресурс https://my.telegram.org/apps и получить api_id и api_hash, для того чтобы библиотека смогла создать соединение. Далее создам файл с настройками вида:

{
  "chanels": [
    "rus_now_news","-1001237513492"
  ],
  "limit": 200,
  "alert_words": [
    "кусь","мейнкун","бабки","лапландия","рабочие","песель-акробат"
  ],
  "api_id": "12435245235",
  "api_hash": "екыпукерпенуркенрке",
  "notify_email": "екпукеп@укепукеуке.ru",
  "email_login": "уепукеп-куепукеп@кепукеп.ru",
  "email_from": "уекпукеп@кепукепук.ru",
  "from_password": "укацука!укауцка",
  "smtp_server": "уцкацука-owa.уцкацука.ru",
  "smtp_port":587
}

В нём перечисляем каналы которые мониторим и стоп слова, которые ловим. Алгоритм работы скрипта:

  • соединяемся с сервером телеграм
  • получаем список последних новостей канала
  • если ID новости уже смотрели, пропускаем его
  • если в тексте новости нашли стоп слово — отправляем уведомление на почту

А вот и сам скрипт:

#!/usr/bin/env python3
# encoding: utf-8
import asyncio
import json
import sys
import re
from telethon import TelegramClient
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)
client = TelegramClient("parser_data", config_data["api_id"], config_data["api_hash"])
async def TConnect():
    await client.start()
async def ListChanels():
    async for dialog in client.iter_dialogs():
        print(dialog.name, 'has ID', dialog.id)

def save_chan_json(chan,chan_data):
    f = open(chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open(chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;

def SendMail(chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://t.me/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()
async def main():
    print("-start")
    await TConnect()
    #await ListChanels()
    for chan in config_data["chanels"]:
        chan_data=get_chan_json(chan)
        skeep_after = chan_data["las_id"]
        if "-" in chan:
          dp = await client.get_entity(int(chan))
        else:
          dp = await client.get_entity(chan)
        poz=0
        async for message in client.iter_messages(dp,limit=config_data["limit"]):
           if poz==0:
               chan_data["las_id"]=message.id
               save_chan_json(chan, chan_data)
           if skeep_after==message.id:
               print("Все новости уже прочитаны...")
               break
           print(f"-смотрим message_id:{message.id}")
           for word in config_data["alert_words"]:
               if word in message.text:
                   print(f"--нашли слово {word}")
                   SendMail(chan, word, message)
                   print(message)
           #print(message.id, message.text)
           poz=poz+1
    print("all done..");
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--list_chanels":
            TConnect()
            ListChanels()
    with client:
        client.loop.run_until_complete(main())

1 13 14 15 16 17 301