Тема 12: Архитектура парсера: Как организовать код (Producer/Consumer), чтобы парсить миллионы страниц и не терять данные при сбое?
Большинство новичков пишет парсеры так:
# ПУТЬ СМЕРТНИКА
urls = [...] # Список на миллион ссылок в памяти
results = []
for url in urls:
html = get(url) # Ждем сеть
data = parse(html) # Ждем проц
results.append(data) # Ждем пока память переполнится
save(results) # Если скрипт упадет на 999 999 строке — ты потерял всё.
Это линейная блокирующая архитектура.
- Память: Список
urlsжрет RAM. Списокresultsжрет RAM. - Скорость: Процессор простаивает, пока качает сеть. Сеть простаивает, пока работает процессор.
- Отказоустойчивость: Ноль. Ошибка = краш = потеря всего.
1. Архитектура: Конвейер (Producer / Consumer)
Представь завод.
- Producer (Поставщик): Чувак, который кидает детали на ленту конвейера. Он быстрый (просто генерирует ссылки).
- Queue (Очередь): Сама лента конвейера. Буфер, где лежат задачи.
- Consumers (Воркеры/Потребители): 50 китайцев, которые хватают детали с ленты и собирают Айфоны. Если один китаец заболел (ошибка), остальные 49 работают.
В чем кайф:
- Асинхронность: Мы запускаем 1 продюсера и 50 воркеров одновременно.
- Backpressure (Контроль давления): Очередь не дает воркерам захлебнуться. Они берут ровно столько, сколько могут переварить.
- Изоляция: Сбор данных и их обработка разделены.
2. Реализация на asyncio (Золотой Стандарт)
В Python есть встроенный asyncio.Queue. Это идеальный инструмент для одного сервера.
Вот шаблон боевого парсера:
import asyncio
import httpx
from random import randint
# Очередь задач (ссылок)
queue = asyncio.Queue()
# --- 1. PRODUCER (Генератор ссылок) ---
async def producer():
print("👨🏭 Продюсер: Начинаю генерировать ссылки...")
for i in range(100):
# В реальности тут может быть чтение из БД, генерация ID, пагинация
url = f"<https://api.site.com/items/{i}>"
await queue.put(url) # Кладем в очередь
print("👨🏭 Продюсер: Все ссылки в очереди.")
# --- 2. CONSUMER (Воркер) ---
async def worker(worker_id: int):
print(f"🤖 Воркер {worker_id}: Готов к труду")
async with httpx.AsyncClient() as client:
while True:
# Берем задачу из очереди.
# Если очередь пуста - воркер спит и ждет.
url = await queue.get()
try:
# --- ТУТ ВСЯ РАБОТА ---
# print(f"🤖 Воркер {worker_id} качает {url}")
await asyncio.sleep(randint(1, 3)) # Имитация запроса
# data = parse(response)
# save_to_db(data) # Пишем сразу! Не копим в памяти!
# -----------------------
except Exception as e:
print(f"🔥 Ошибка на {url}: {e}")
# Важно: можно вернуть url обратно в очередь или записать в лог ошибок
finally:
# Сообщаем очереди, что задача выполнена
queue.task_done()
# --- 3. ORCHESTRATOR (Главная функция) ---
async def main():
# Запускаем продюсера (он наполнит очередь)
prod_task = asyncio.create_task(producer())
# Запускаем 5 воркеров (они начнут разгребать очередь)
workers = [asyncio.create_task(worker(i)) for i in range(5)]
# Ждем, пока продюсер закончит накидывать задачи
await prod_task
# Ждем, пока очередь опустеет (воркеры всё доделают)
print("⏳ Ждем завершения задач в очереди...")
await queue.join()
# Отменяем воркеров (они в бесконечном цикле while True)
for w in workers:
w.cancel()
print("✅ Работа окончена!")
if __name__ == "__main__":
asyncio.run(main())
3. Почему это архитектура Сеньора?
- Сохранение на лету: Внутри
workerмы вызываем функцию сохранения (save_to_dbили запись в CSV) для каждого товара отдельно.- Результат: Если выключат свет на 500-й ссылке, у тебя в базе будет лежать 499 товаров. Ты ничего не потеряешь.
- Масштабируемость: Хочешь быстрее? Поменяй
range(5)наrange(50). Код менять не надо. - Graceful Shutdown:
queue.join()гарантирует, что скрипт не выйдет, пока последняя ссылка не будет обработана.
⚠️ Level Up: Распределенная очередь (Celery / RabbitMQ)
Если asyncio.Queue работает в памяти одного скрипта. Если скрипт упадет — очередь пропадет.
Для очень больших систем (миллионы страниц, разные серверы) вместо внутренней очереди используют внешние брокеры: RabbitMQ, Redis, Kafka.
Но для 99% задач (до 10 млн страниц) asyncio.Queue + сохранение в БД достаточно.
Резюме:
Никаких списков results = [].
Есть Очередь. Есть Воркеры. Воркер взял -> Сделал -> Сохранил -> Забыл.
Только так.
Блок 4 (Архитектура) закрыт. Переходим к финальному и критически важному блоку — Блок 5: Логистика и Доступ.