Перейти к содержанию

Тема 12: Архитектура парсера: Как организовать код (Producer/Consumer), чтобы парсить миллионы страниц и не терять данные при сбое?

Большинство новичков пишет парсеры так:

# ПУТЬ СМЕРТНИКА
urls = [...] # Список на миллион ссылок в памяти
results = []

for url in urls:
    html = get(url)   # Ждем сеть
    data = parse(html) # Ждем проц
    results.append(data) # Ждем пока память переполнится

save(results) # Если скрипт упадет на 999 999 строке — ты потерял всё.

Это линейная блокирующая архитектура.

  1. Память: Список urls жрет RAM. Список results жрет RAM.
  2. Скорость: Процессор простаивает, пока качает сеть. Сеть простаивает, пока работает процессор.
  3. Отказоустойчивость: Ноль. Ошибка = краш = потеря всего.

1. Архитектура: Конвейер (Producer / Consumer)

Представь завод.

  • Producer (Поставщик): Чувак, который кидает детали на ленту конвейера. Он быстрый (просто генерирует ссылки).
  • Queue (Очередь): Сама лента конвейера. Буфер, где лежат задачи.
  • Consumers (Воркеры/Потребители): 50 китайцев, которые хватают детали с ленты и собирают Айфоны. Если один китаец заболел (ошибка), остальные 49 работают.

В чем кайф:

  1. Асинхронность: Мы запускаем 1 продюсера и 50 воркеров одновременно.
  2. Backpressure (Контроль давления): Очередь не дает воркерам захлебнуться. Они берут ровно столько, сколько могут переварить.
  3. Изоляция: Сбор данных и их обработка разделены.

2. Реализация на asyncio (Золотой Стандарт)

В Python есть встроенный asyncio.Queue. Это идеальный инструмент для одного сервера.

Вот шаблон боевого парсера:

import asyncio
import httpx
from random import randint

# Очередь задач (ссылок)
queue = asyncio.Queue()

# --- 1. PRODUCER (Генератор ссылок) ---
async def producer():
    print("👨‍🏭 Продюсер: Начинаю генерировать ссылки...")
    for i in range(100):
        # В реальности тут может быть чтение из БД, генерация ID, пагинация
        url = f"<https://api.site.com/items/{i}>"
        await queue.put(url) # Кладем в очередь

    print("👨‍🏭 Продюсер: Все ссылки в очереди.")

# --- 2. CONSUMER (Воркер) ---
async def worker(worker_id: int):
    print(f"🤖 Воркер {worker_id}: Готов к труду")

    async with httpx.AsyncClient() as client:
        while True:
            # Берем задачу из очереди.
            # Если очередь пуста - воркер спит и ждет.
            url = await queue.get()

            try:
                # --- ТУТ ВСЯ РАБОТА ---
                # print(f"🤖 Воркер {worker_id} качает {url}")
                await asyncio.sleep(randint(1, 3)) # Имитация запроса
                # data = parse(response)
                # save_to_db(data) # Пишем сразу! Не копим в памяти!
                # -----------------------
            except Exception as e:
                print(f"🔥 Ошибка на {url}: {e}")
                # Важно: можно вернуть url обратно в очередь или записать в лог ошибок
            finally:
                # Сообщаем очереди, что задача выполнена
                queue.task_done()

# --- 3. ORCHESTRATOR (Главная функция) ---
async def main():
    # Запускаем продюсера (он наполнит очередь)
    prod_task = asyncio.create_task(producer())

    # Запускаем 5 воркеров (они начнут разгребать очередь)
    workers = [asyncio.create_task(worker(i)) for i in range(5)]

    # Ждем, пока продюсер закончит накидывать задачи
    await prod_task

    # Ждем, пока очередь опустеет (воркеры всё доделают)
    print("⏳ Ждем завершения задач в очереди...")
    await queue.join()

    # Отменяем воркеров (они в бесконечном цикле while True)
    for w in workers:
        w.cancel()

    print("✅ Работа окончена!")

if __name__ == "__main__":
    asyncio.run(main())

3. Почему это архитектура Сеньора?

  1. Сохранение на лету: Внутри worker мы вызываем функцию сохранения (save_to_db или запись в CSV) для каждого товара отдельно.
    • Результат: Если выключат свет на 500-й ссылке, у тебя в базе будет лежать 499 товаров. Ты ничего не потеряешь.
  2. Масштабируемость: Хочешь быстрее? Поменяй range(5) на range(50). Код менять не надо.
  3. Graceful Shutdown: queue.join() гарантирует, что скрипт не выйдет, пока последняя ссылка не будет обработана.

⚠️ Level Up: Распределенная очередь (Celery / RabbitMQ)

Если asyncio.Queue работает в памяти одного скрипта. Если скрипт упадет — очередь пропадет. Для очень больших систем (миллионы страниц, разные серверы) вместо внутренней очереди используют внешние брокеры: RabbitMQ, Redis, Kafka. Но для 99% задач (до 10 млн страниц) asyncio.Queue + сохранение в БД достаточно.

Резюме: Никаких списков results = []. Есть Очередь. Есть Воркеры. Воркер взял -> Сделал -> Сохранил -> Забыл. Только так.

Блок 4 (Архитектура) закрыт. Переходим к финальному и критически важному блоку — Блок 5: Логистика и Доступ.