Используем нейросеть локально

Задача: установить нейросеть на свой ПК, и пообщаться с ней посредством python. Маленькая задачка, часть более глобальной далее (в следующих частях) — дообучить на своих данных, и сделать скрипт отвечающий на вопросы пользователей.

Статья подготовлена на основе следующих источников:

  • https://habr.com/ru/companies/bothub/articles/1019314/

Решение:

Сначала установим оболочку для нейросетей. Например я остановился (в данный момент) на https://gpt4all.io. Там есть инсталляторы для разных ОС. Я пока остановился для Windows. После установки нужно выбрать модель для скачивания, в зависимости от того какое у вас «железо». В моем случае, т.к. памяти достаточно (64ггб оперативы + 32ггб RTX 3060), я выбрал DeepSeek-R1-Distill-Qwen-14B

После загрузки модели, попробовал создать чат, и в принципе модель бодренько стала мне отвечать. Правда во время генерации ответов все ресурсы ушли в «полочку». Но ответы вполне осмысленные:

Далее попытался написать скрипт на python, который непосредственно может цепляться к установленной модели. Для этого необходимо в настройка gpt4all включить соответствующую опцию:

После этого становится доступен api через обычный http. Точки вызовов следующие:

  • http://localhost:4891/v1/models — получить список установленных моделей
  • http://localhost:4891/v1/models/ — получить настройки выбранной модели
  • http://localhost:4891/v1/completions — генерация текста
  • http://localhost:4891/v1/chat/completions — общение в режиме чата

Небольшая сноска. Наш любимый РКН, 21.03.2026 заблокировал часть адресов, среди которых под раздачу попала и установка пакетов через pip install. Я сиё обошел установкой VPN Amnezia. Далее упоминать об этом не буду, и считаю что способ установки пакетов у вас есть.

Ну а дальше простой скрипт:

import requests

# URL, куда отправляем запрос (если вы установили GPT-4All локально)
url = "http://localhost:4891/v1/chat/completions"

headers = {
    "Content-Type": "application/json",
}

data = {
    "model": "DeepSeek-R1-Distill-Qwen-14B",  # Используйте модель, которую поддерживает GPT-4All
    "messages": [{"role": "user", "content": "Сколько будет 2+2?"}],
    "max_tokens": 500,
    "temperature": 0.28
}

response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
    print(response.json())
else:
    print(response)
    print(f"Ошибка: {response.status_code}")

Результат:

Хм, пользователь спросил «Сколько будет 2+2?» на русском языке. Нужно ответить правильно и понятно.
Проверю: 2 плюс 2 равно 4.

Просто скрипт для организации нагрузочного тестирования сайта

Задача: по-быстрому протестировать на скольки соединениях сайт «ляжет».

Решение: напишем скрипт на python, который в многопоточном режиме будет дергать определенную страницу.

#!/usr/bin/env python3
# encoding: utf-8
import sys
import requests
import threading
import time

BASE_URL = "https://щшощшукащшук.ru"
shreads=100
poz=0
def fetch_post(pz):
   cnt=threading.active_count();
   print(f"- задача {poz}, потоков {cnt}")
   data_to_post = {"erfe	": "-99","erfew":"werferwfer"}
   response = requests.post(f"{BASE_URL}", json=data_to_post)
   print(response.status_code)
   print(response.text)
   return 0

while True:
    if threading.active_count()<shreads:
        poz=poz+1
        t = threading.Thread(target=fetch_post, args=(poz,))
        t.start()

Граббер всех файлов телеграм канала

В свете того, что телеграм начали замедлять, появились опасения, что потеряю в конце концов доступ к телеграм каналу, где собрано много всяких интересных файлов. В связи с чем было принято решение спарсить и скачать всё его содержимое. Написанный скрипт проходит по каждому сообщению канала и скачивает все файлы в папки соответствующие идентифактору сообщения (или идентификатору группы сообщений). В общем получилось что-то вроде:

#!/usr/bin/env python3
# encoding: utf-8
import sys
import os,fcntl
import funcs
from telethon import TelegramClient
import requests
import configparser

## Защита от дубля запуска скрипта
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

config = configparser.ConfigParser()
config.read("settings.ini")

print(config["Telegram"]["telegram_api_id"])

async def SaveFile(dir_name,message):
     if os.path.isdir(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}")==False:
       os.mkdir(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}")
     with open(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/message.txt", 'w') as file:
        file.write(message.message)
     print(f"Вложения: {message.media}")
     if hasattr(message.media,"document"):
       if message.media.document != False:
         file_name=message.media.document.attributes[0].file_name
         print(f"-вложенный файл {file_name}")
         print("--сохраняю..")
         if os.path.isfile(os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/{file_name}")==False:
           await client.download_media(message.media, file=os.path.dirname(os.path.abspath(__file__))+f"/files/{dir_name}/{file_name}")
           print("--ок..")
         else:
           print("--уже скачивали!")
     if hasattr(message.media,"video"):
       if message.media.video!=False:
          print(f"-вложенное видео")

async def tele_news_parse():
    dp = await client.get_entity(config["Telegram"]["stl_chanel"])
    poz=int(config["Telegram"]["current_message_id"])
    print(f"-start at {poz}")
    async for message in client.iter_messages(dp,limit=10,offset_id=poz):
        print("---------------------------------------------------------")
        if hasattr(message, "message"):
            if message.message!=None:
                print(f"Сообщение: {message}")
                if message.grouped_id:
                   print("-its album!")
                   target_group_id = message.grouped_id
                   search_ids = range(message.id - 20, message.id + 21)
                   posts = await client.get_messages(config["Telegram"]["stl_chanel"], ids=list(search_ids))
                   media_group = []
                   for post in posts:
                     if post is not None and post.grouped_id == target_group_id and post.media is not None:
                      media_group.append(post)
                   for message in media_group:
                      print(f"--из альбома: {message}")
                      await SaveFile(target_group_id,message)
                else:
                  print("-its one file!")
                  await SaveFile(message.id,message)


    with open('settings.ini', 'w') as configfile:
        poz+=10
        print(f"-new poz at {poz}")
        config["Telegram"]["current_message_id"]=str(poz)
        config.write(configfile)

if __name__ == '__main__':
    client = TelegramClient("parser_data", config["Telegram"]["telegram_api_id"], config["Telegram"]["telegram_api_hash"])
    client.start()
    client.loop.run_until_complete(tele_news_parse())

Блокировка повторного запуска скрипта на Python

Обычно для того чтобы исключить повторный запуск скрипта на python используют классический способ:

import os,fcntl
fp = open(os.path.realpath(__file__), 'r')
try:
    fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    sys.exit(0)

И он хорошо срабатывает при периодическом запуске например из крона скрипта с какойто периодичностью, для проверки «а закончилась ли работа предыдущего запуска?». Но дело в том, что иногда пишут скрипт которые работает с разным функционалом в зависимости от параметров запуска. Тогда в этом случае вполне допустим запуск скрипта, но с другим параметром. Для того чтобы обойти этот момент, я стал использовать несколько другой способ. А именно: опрашиваю список запущенных в текущий момент процессов и смотрю параметры их запуска. Если нахожу совпадение, то выхожу. Если нет — позволяю скрипту работать дальше. Вышло примерно так:

import psutil
def AYouRun(script_param):
    pids=psutil.pids()
    cnt=0
    for pid in pids:
        p = psutil.Process(pid)
        res=p.cmdline()
        if script_param in res:
            cnt=cnt+1
    if cnt>1:
        print(f"--скрипт с параметром {script_param} уже запущен")
        exit(-1)
    return True
..
..
if __name__ == '__main__':
    for param in sys.argv:
        if param == "--telegram-news":
            if AYouRun(param):
                Insert2Log("Запущен мониторинг телеграм новостей",1)
                client = TelegramClient("parser_data", global_config["telegram_api_id"], global_config["telegram_api_hash"])
..
..

Парсинг новостей групп VK

В продолжение предыдущей статьи, появилась необходимость парсить так-же и новости в социальной сети vk с проверкой на наличие стоп-слов. Для этого воспользовался модулем vk на python. Так-же понадобится токен доступа полученный на https://vk.com/apps?act=manage

В итоге код получился примерно следующий:

#!/usr/bin/env python3
# encoding: utf-8
import vk
import json
import funcs

with open('config.json', 'r') as file:
    config_data = json.load(file)
    print(config_data)


api = vk.API(access_token=config_data["vk_api_token"],v='5.131')

for group in config_data["groups"]:
    chan_data = funcs.get_chan_json(group)
    skeep_after = chan_data["las_id"]
    wall_content = api.wall.get(domain=group, count=config_data["limit"])
    poz = 0
    for message in wall_content["items"]:
        #print(message)
        if poz == 0:
            chan_data["las_id"] = message["id"]
            funcs.save_chan_json(group, chan_data)
        if skeep_after == message["id"]:
            print("Все новости уже прочитаны...")
            break
        for word in config_data["alert_words"]:
          if word in message["text"]:
              print(f"--нашли слово {word}")
              funcs.SendMailVK(config_data,group, word, message)
              print(message)
        poz = poz + 1
print("all done..");

По сути код очень простой — получаем через API VK все последние новости из каждой группы. Если в тексте новости находим стоп-слово, то отправляем соответствующее письмо. Так-же использую дополнительный файл функций, которые далее использую во всех парсерах:

#!/usr/bin/env python3
#encoding: utf-8
import json
from datetime import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.functions.channels import GetMessagesRequest
from telethon.tl.functions.messages import GetHistoryRequest, ReadHistoryRequest

def save_chan_json(chan,chan_data):
    f = open('saves/'+chan + '.json', "w+")
    json.dump(chan_data, f)
    f.close()

def get_chan_json(chan):
    # узнаём какое последнее сообщение прочитали на канале?
    chan_data = {}
    chan_data["las_id"] = 0
    try:
        with open('saves/'+chan + '.json', 'r') as file:
            chan_data = json.load(file)
            print(chan_data)
            return chan_data
    except:
        save_chan_json(chan, chan_data)
        return chan_data
    return chan_data;


def SendMail(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на канале {chan} в Телеграм"
    msg.add_header('Content-Type', 'text/html')
    message.text=message.text.replace(word,"<strong>"+word+"</strong>")
    dt_pub=message.date.strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Канал: <a href='https://vc.com/{chan}'>https://t.me/{chan}</a>, опубликовано {dt_pub}<hr/>"+message.text)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailVK(config_data,chan,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на группе {chan} в VK"
    msg.add_header('Content-Type', 'text/html')
    message["text"]=message["text"].replace(word,"<strong>"+word+"</strong>")
    dt_pub=datetime.utcfromtimestamp(message["date"]).strftime('%d-%m-%Y %H:%M:%S')
    msg.set_payload(f"Группа: <a href='https://vc.com/{chan}'>https://vk.com/{chan}</a>, опубликовано {dt_pub}<hr/>"+message["text"])

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

def SendMailNews(config_data,url,word,message):
    msg = MIMEMultipart()
    msg['Subject'] = f"Найдено слово '{word}' в новости на сайте {url}"
    msg.add_header('Content-Type', 'text/html')
    if message.get("href")!=None:
     message.string="<a href='"+message["href"]+"'>"+message.string.replace(word,"<strong>"+word+"</strong>")+"</a>"
    else:
        message.string = message.string.replace(word,"<strong>" + word + "</strong>")
    msg.set_payload(message.string)

    smtpObj = smtplib.SMTP(config_data["smtp_server"], config_data["smtp_port"])
    smtpObj.starttls()
    smtpObj.login(config_data["email_login"], config_data["from_password"])
    smtpObj.sendmail(config_data["email_from"], config_data["notify_email"], msg.as_string().encode('utf-8'))
    smtpObj.quit()

1 2 3 14