Инструмент 7: "Радист" (WebSockets)
Стек: aiohttp (лучший выбор, так как часто нужно делать и HTTP, и WS запросы в одном приложении) или websockets.
Суть: Мы не спрашиваем "Как дела?". Мы подключаемся к "трубе" и слушаем поток данных.
Главная сложность: Соединение рвется. Сервер перезагружается, интернет моргает. Твой скрипт должен уметь переподключаться (Reconnect) автоматически и бесконечно.
Где применять: Биржи (Binance, Bybit), букмекерские конторы (Live-ставки), онлайн-игры, чаты.
import asyncio
import aiohttp
import json
import logging
from typing import Dict
# Настраиваем логи, чтобы видеть разрывы связи
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("WSRadio")
# --- 1. CONFIG ---
# Пример публичного вебсокета Binance (Торговые сделки по BTC)
WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
# Некоторые серверы требуют отправить JSON, чтобы подписаться на данные
# Binance позволяет через URL, но другие требуют 'subscribe' message.
SUBSCRIPTION_PAYLOAD = {
"method": "SUBSCRIBE",
"params": ["btcusdt@trade"],
"id": 1
}
# --- 2. ARCHITECTURE CLASS ---
class WebSocketMonitor:
def __init__(self, url: str):
self.url = url
self.session = None
self.ws = None
self.keep_running = True
async def connect(self):
"""Главный цикл жизни соединения"""
self.session = aiohttp.ClientSession()
while self.keep_running:
try:
logger.info(f"🔌 Подключение к {self.url}...")
async with self.session.ws_connect(self.url) as ws:
self.ws = ws
logger.info("✅ Соединение установлено!")
# 1. Если нужно подписаться - шлем сообщение сразу после входа
# await ws.send_json(SUBSCRIPTION_PAYLOAD)
# 2. Слушаем эфир (Бесконечный цикл, пока сокет открыт)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self.process_message(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error("💥 Ошибка сокета!")
break
logger.warning("⚠️ Сервер закрыл соединение.")
except aiohttp.ClientError as e:
logger.error(f"💀 Ошибка сети: {e}")
except Exception as e:
logger.error(f"💀 Неизвестная ошибка: {e}")
# --- RECONNECT LOGIC ---
# Если мы вылетели из цикла async for, значит соединение разорвано.
# Ждем и пробуем снова.
logger.info("⏳ Реконнект через 5 секунд...")
await asyncio.sleep(5)
async def process_message(self, raw_data: str):
"""Обработка входящего сообщения (Бизнес-логика)"""
try:
data = json.loads(raw_data)
# Фильтруем пинги/понги или служебные сообщения
if "e" not in data:
return # Это не сделка, пропускаем
# Пример парсинга сделки Binance
trade = {
"price": float(data['p']),
"qty": float(data['q']),
"time": data['T']
}
# ТУТ ПИШЕМ В БД ИЛИ ОЧЕРЕДЬ
print(f"💰 Сделка: ${trade['price']} (Объем: {trade['qty']})")
except json.JSONDecodeError:
pass # Бывает приходит мусор
async def stop(self):
"""Аккуратное завершение"""
self.keep_running = False
if self.ws:
await self.ws.close()
if self.session:
await self.session.close()
logger.info("🛑 Монитор остановлен.")
# --- 3. ENTRY POINT ---
async def main():
monitor = WebSocketMonitor(WS_URL)
# Запускаем монитор в фоне
task = asyncio.create_task(monitor.connect())
try:
# Имитируем работу приложения (пусть работает час)
await asyncio.sleep(3600)
except asyncio.CancelledError:
pass
finally:
await monitor.stop()
await task
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
# Чтобы Ctrl+C не выплевывал страшные трейсбеки
pass
Куда смотреть:
while self.keep_running: Это бессмертие твоего скрипта. Если интернет пропадет на час, скрипт будет долбиться каждые 5 секунд, пока связь не вернется.async for msg in ws: Это самый эффективный способ чтения. Он не блокирует поток, пока сообщений нет.SUBSCRIPTION_PAYLOAD: Часто вебсокетам нужно сказать "Привет, я хочу данные по матчу ID 123". Это делается черезws.send_json()сразу после коннекта.- Heartbeat (Пинг):
aiohttpобычно сам обрабатывает пинги (Ping/Pong frames). Но некоторые API требуют слать{ "type": "ping" }вручную раз в 30 секунд. Если соединение рвется ровно через минуту — добавь параллельную задачу (asyncio.create_task), которая шлет пинги.