Перейти к содержанию

Инструмент 7: "Радист" (WebSockets)

Стек: aiohttp (лучший выбор, так как часто нужно делать и HTTP, и WS запросы в одном приложении) или websockets. Суть: Мы не спрашиваем "Как дела?". Мы подключаемся к "трубе" и слушаем поток данных. Главная сложность: Соединение рвется. Сервер перезагружается, интернет моргает. Твой скрипт должен уметь переподключаться (Reconnect) автоматически и бесконечно.

Где применять: Биржи (Binance, Bybit), букмекерские конторы (Live-ставки), онлайн-игры, чаты.

import asyncio
import aiohttp
import json
import logging
from typing import Dict

# Настраиваем логи, чтобы видеть разрывы связи
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("WSRadio")

# --- 1. CONFIG ---
# Пример публичного вебсокета Binance (Торговые сделки по BTC)
WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"

# Некоторые серверы требуют отправить JSON, чтобы подписаться на данные
# Binance позволяет через URL, но другие требуют 'subscribe' message.
SUBSCRIPTION_PAYLOAD = {
    "method": "SUBSCRIBE",
    "params": ["btcusdt@trade"],
    "id": 1
}

# --- 2. ARCHITECTURE CLASS ---
class WebSocketMonitor:
    def __init__(self, url: str):
        self.url = url
        self.session = None
        self.ws = None
        self.keep_running = True

    async def connect(self):
        """Главный цикл жизни соединения"""
        self.session = aiohttp.ClientSession()

        while self.keep_running:
            try:
                logger.info(f"🔌 Подключение к {self.url}...")

                async with self.session.ws_connect(self.url) as ws:
                    self.ws = ws
                    logger.info("✅ Соединение установлено!")

                    # 1. Если нужно подписаться - шлем сообщение сразу после входа
                    # await ws.send_json(SUBSCRIPTION_PAYLOAD)

                    # 2. Слушаем эфир (Бесконечный цикл, пока сокет открыт)
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            await self.process_message(msg.data)
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            logger.error("💥 Ошибка сокета!")
                            break

                    logger.warning("⚠️ Сервер закрыл соединение.")

            except aiohttp.ClientError as e:
                logger.error(f"💀 Ошибка сети: {e}")
            except Exception as e:
                logger.error(f"💀 Неизвестная ошибка: {e}")

            # --- RECONNECT LOGIC ---
            # Если мы вылетели из цикла async for, значит соединение разорвано.
            # Ждем и пробуем снова.
            logger.info("⏳ Реконнект через 5 секунд...")
            await asyncio.sleep(5)

    async def process_message(self, raw_data: str):
        """Обработка входящего сообщения (Бизнес-логика)"""
        try:
            data = json.loads(raw_data)

            # Фильтруем пинги/понги или служебные сообщения
            if "e" not in data:
                return # Это не сделка, пропускаем

            # Пример парсинга сделки Binance
            trade = {
                "price": float(data['p']),
                "qty": float(data['q']),
                "time": data['T']
            }

            # ТУТ ПИШЕМ В БД ИЛИ ОЧЕРЕДЬ
            print(f"💰 Сделка: ${trade['price']} (Объем: {trade['qty']})")

        except json.JSONDecodeError:
            pass # Бывает приходит мусор

    async def stop(self):
        """Аккуратное завершение"""
        self.keep_running = False
        if self.ws:
            await self.ws.close()
        if self.session:
            await self.session.close()
        logger.info("🛑 Монитор остановлен.")

# --- 3. ENTRY POINT ---
async def main():
    monitor = WebSocketMonitor(WS_URL)

    # Запускаем монитор в фоне
    task = asyncio.create_task(monitor.connect())

    try:
        # Имитируем работу приложения (пусть работает час)
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass
    finally:
        await monitor.stop()
        await task

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Чтобы Ctrl+C не выплевывал страшные трейсбеки
        pass

Куда смотреть:

  1. while self.keep_running: Это бессмертие твоего скрипта. Если интернет пропадет на час, скрипт будет долбиться каждые 5 секунд, пока связь не вернется.
  2. async for msg in ws: Это самый эффективный способ чтения. Он не блокирует поток, пока сообщений нет.
  3. SUBSCRIPTION_PAYLOAD: Часто вебсокетам нужно сказать "Привет, я хочу данные по матчу ID 123". Это делается через ws.send_json() сразу после коннекта.
  4. Heartbeat (Пинг): aiohttp обычно сам обрабатывает пинги (Ping/Pong frames). Но некоторые API требуют слать { "type": "ping" } вручную раз в 30 секунд. Если соединение рвется ровно через минуту — добавь параллельную задачу (asyncio.create_task), которая шлет пинги.