Skip to content

musecollaboration/AdEvents-Collector

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

AdEvents Collector

Production-ready высокопроизводительный сервис сбора рекламных событий, способный обрабатывать 100K+ RPS с дедупликацией, батчингом и полным мониторингом.

Содержание

Архитектура

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Client    │───▶│   FastAPI   │───▶│    Redis    │───▶│ RQ Workers  │
│             │    │     API     │    │   Queue     │    │ (3 replicas)│
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                           │                                      │
                           ▼                                      ▼
                   ┌─────────────┐                        ┌─────────────┐
                   │ Prometheus  │                        │ PostgreSQL  │
                   │   Metrics   │                        │ (Partitioned)│
                   └─────────────┘                        └─────────────┘
                           │
                           ▼
                   ┌─────────────┐
                   │   Grafana   │
                   │ Dashboards  │
                   └─────────────┘

Компоненты системы

  1. FastAPI API - Асинхронный веб-сервер для приема событий
  2. Redis - Кэширование для дедупликации + очередь задач
  3. RQ Workers - Фоновые процессы для батчевой обработки
  4. PostgreSQL - Партиционированная база данных
  5. Prometheus - Сбор метрик производительности
  6. Grafana - Визуализация метрик и алертинг

Особенности

Производительность

  • 100K+ RPS - Поддержка высоких нагрузок
  • Асинхронная обработка - FastAPI + asyncio
  • Батчинг - Группировка событий для оптимизации записи
  • PostgreSQL COPY - Высокоскоростная вставка данных
  • Connection pooling - Эффективное управление соединениями

Надежность

  • Дедупликация - Redis-based с TTL для предотвращения дубликатов
  • Rate limiting - 100 RPS на IP с burst поддержкой
  • Health checks - Kubernetes-ready endpoints
  • Graceful shutdown - Корректное завершение работы
  • Error handling - Comprehensive error handling и retry логика

Мониторинг

  • Prometheus метрики - Полный набор метрик производительности
  • Grafana дашборды - Визуализация в реальном времени
  • Structured logging - JSON логи с structlog
  • Distributed tracing ready - Подготовлен для OpenTelemetry

Масштабируемость

  • Горизонтальное масштабирование - Stateless архитектура
  • Партиционирование БД - Автоматические партиции по дате
  • Worker scaling - Независимое масштабирование обработчиков
  • Redis clustering ready - Поддержка кластера Redis

Технологический стек

Backend

  • Python 3.11+ - Современная версия Python
  • FastAPI - Высокопроизводительный async веб-фреймворк
  • Pydantic v2 - Валидация данных и сериализация
  • asyncio - Асинхронное программирование

База данных и кэширование

  • PostgreSQL 15 - Основная база данных с партиционированием
  • Redis 7 - Кэширование и очередь задач
  • psycopg2 - PostgreSQL адаптер с connection pooling

Очереди и фоновые задачи

  • RQ (Redis Queue) - Система очередей задач
  • RQ Worker - Фоновые процессы обработки

Мониторинг и логирование

  • Prometheus - Сбор метрик
  • Grafana - Визуализация и алертинг
  • structlog - Структурированное логирование

DevOps и деплой

  • Docker + Docker Compose - Контейнеризация
  • Kubernetes ready - Готов для Kubernetes
  • GitHub Actions - CI/CD пайплайны

Качество кода

  • pytest - Тестирование с 95% покрытием
  • black - Форматирование кода
  • ruff - Быстрый линтер
  • mypy - Статическая типизация

Быстрый старт

Предварительные требования

  • Docker 20.10+
  • Docker Compose 2.0+
  • Python 3.11+ (для разработки)

Запуск в Docker

# Клонируем репозиторий
git clone https://github.com/your-repo/AdEvents_Collector.git
cd AdEvents_Collector

# Запускаем все сервисы
docker-compose up -d

# Проверяем статус
docker-compose ps

# Просматриваем логи
docker-compose logs -f api worker

Проверка работоспособности

# Health check
curl http://localhost:8000/health

# Отправка тестового события
curl -X POST http://localhost:8000/events \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "impression",
    "user_id": "user123",
    "ad_id": "ad456",
    "campaign_id": "camp789",
    "publisher_id": "pub001"
  }'

# Проверка статистики
curl http://localhost:8000/stats

Доступ к сервисам

Конфигурация

Переменные окружения

Создайте файл .env в корне проекта:

# Основные настройки
ENVIRONMENT=production
LOG_LEVEL=INFO
SECRET_KEY=your-super-secret-key-at-least-32-chars

# База данных
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/ad_events

# Redis
REDIS_URL=redis://redis:6379/0

# API настройки
API_HOST=0.0.0.0
API_PORT=8000
API_WORKERS=4

# Батчинг
BATCH_SIZE=1000
MAX_BATCH_SIZE=5000

# Rate limiting
RATE_LIMIT_PER_MINUTE=100
RATE_LIMIT_BURST=200

# Мониторинг
METRICS_ENABLED=true
HEALTH_CHECK_TIMEOUT=30

# Партиционирование
PARTITION_RETENTION_DAYS=30
AUTO_CREATE_PARTITIONS=true

Конфигурация для production

# Production .env
ENVIRONMENT=production
DEBUG=false
LOG_LEVEL=WARNING

# Увеличиваем workers для production
API_WORKERS=8
WORKER_REPLICAS=6

# Увеличиваем batch размеры
BATCH_SIZE=2000
MAX_BATCH_SIZE=10000

# Настройки БД для высокой нагрузки
DATABASE_POOL_SIZE=50
DATABASE_MAX_OVERFLOW=100

# Redis настройки
REDIS_MAX_CONNECTIONS=100

API документация

Эндпоинты

POST /events

Отправка одного события

{
  "event_type": "impression|click|conversion",
  "user_id": "string",
  "ad_id": "string",
  "campaign_id": "string",
  "publisher_id": "string",
  "timestamp": "2023-10-31T10:00:00Z",
  "device_type": "desktop|mobile|tablet|tv",
  "ip_address": "192.168.1.1",
  "country": "US",
  "bid_price": 0.5,
  "revenue": 1.25
}

POST /events/batch

Отправка батча событий (до 5000 событий)

{
  "events": [
    // массив событий как выше
  ]
}

GET /health

Проверка здоровья сервиса

{
  "status": "healthy",
  "timestamp": 1698750000,
  "services": {
    "redis": "healthy",
    "api": "healthy"
  }
}

GET /stats

Статистика очереди

{
  "queue_length": 1250,
  "timestamp": 1698750000
}

GET /metrics

Prometheus метрики

Rate Limits

  • Single events: 100 запросов/минуту на IP
  • Batch events: 10 запросов/минуту на IP
  • Health checks: Без ограничений

Коды ответов

  • 202 - Событие принято в обработку
  • 400 - Некорректные данные
  • 429 - Превышен rate limit
  • 500 - Внутренняя ошибка сервера

Мониторинг

Prometheus метрики

Основные метрики

  • ad_events_received_total - Счетчик полученных событий
  • request_duration_seconds - Время обработки запросов
  • queue_length - Длина очереди Redis
  • events_processed_total - Обработанные события
  • batch_processing_time - Время обработки батча

Системные метрики

  • CPU, Memory, Disk usage (через node-exporter)
  • PostgreSQL метрики
  • Redis метрики

Grafana дашборды

API Dashboard

  • RPS и latency
  • Error rates
  • Rate limiting статистика

Processing Dashboard

  • Queue length и processing rate
  • Batch processing performance
  • Database write performance

Infrastructure Dashboard

  • System resources
  • Database connections
  • Redis memory usage

Алерты

# Пример алертов
groups:
  - name: adv_collector
    rules:
      - alert: HighErrorRate
        expr: rate(ad_events_received_total{status="error"}[5m]) > 0.1
        for: 2m

      - alert: HighQueueLength
        expr: queue_length > 10000
        for: 5m

      - alert: SlowBatchProcessing
        expr: avg(batch_processing_time) > 30
        for: 3m

Производительность

Бенчмарки

Тестовая среда

  • CPU: 8 cores
  • RAM: 16GB
  • SSD: NVMe

Результаты нагрузочного тестирования

Single events:
  - 50K RPS при latency p99 < 100ms
  - 100K RPS при latency p99 < 200ms

Batch events (1000 events/batch):
  - 1K batches/sec = 1M events/sec
  - Latency p99 < 500ms

Database writes:
  - 100K inserts/sec через COPY
  - 50K inserts/sec через INSERT

Оптимизации

Application level

  • Асинхронная обработка запросов
  • Connection pooling для БД и Redis
  • Батчинг событий перед записью в БД

Database level

  • Партиционирование по дате
  • Оптимизированные индексы
  • PostgreSQL COPY для bulk inserts

Infrastructure level

  • Redis для дедупликации и очередей
  • Горизонтальное масштабирование workers
  • CDN для статических ресурсов (если нужно)

Настройка для высоких нагрузок

# Увеличиваем workers
API_WORKERS=16
WORKER_REPLICAS=10

# Увеличиваем batch размеры
BATCH_SIZE=5000
MAX_BATCH_SIZE=20000

# Настройки БД
DATABASE_POOL_SIZE=100
DATABASE_MAX_OVERFLOW=200

# Redis настройки
REDIS_MAX_CONNECTIONS=200

Разработка

Настройка среды разработки

# Клонируем репозиторий
git clone https://github.com/your-repo/AdEvents_Collector.git
cd AdEvents_Collector

# Создаем виртуальную среду
python -m venv venv
source venv/bin/activate  # Linux/Mac
# или
venv\Scripts\activate     # Windows

# Устанавливаем зависимости
pip install -r requirements-dev.txt

# Запускаем локальные сервисы
docker-compose up -d postgres redis prometheus grafana

# Настраиваем pre-commit hooks
pre-commit install

Зависимости

Основные (requirements.txt)

fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.4.0
pydantic-settings>=2.0.0
redis>=5.0.0
rq>=1.15.0
psycopg2-binary>=2.9.0
structlog>=23.2.0
prometheus-client>=0.18.0
slowapi>=0.1.9

Разработка (requirements-dev.txt)

-r requirements.txt
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
httpx>=0.25.0
black>=23.10.0
ruff>=0.1.0
mypy>=1.6.0
pre-commit>=3.5.0
locust>=2.17.0

Запуск тестов

# Все тесты с покрытием
pytest --cov=src --cov-report=html

# Только unit тесты
pytest tests/unit/

# Только integration тесты
pytest tests/integration/

# Нагрузочные тесты
locust -f locustfile.py --host=http://localhost:8000

Линтинг и форматирование

# Форматирование кода
black src/ tests/

# Линтинг
ruff check src/ tests/

# Проверка типов
mypy src/

# Все проверки одновременно
pre-commit run --all-files

Структура проекта

AdEvents_Collector/
├── src/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── main.py          # FastAPI приложение
│   │   └── models.py        # Pydantic модели
│   ├── core/
│   │   ├── __init__.py
│   │   └── settings.py      # Конфигурация
│   └── worker/
│       ├── __init__.py
│       ├── tasks.py         # RQ задачи
│       └── worker.py        # Worker процесс
├── tests/
│   ├── unit/               # Unit тесты
│   ├── integration/        # Integration тесты
│   └── conftest.py        # Pytest fixtures
├── prometheus/
│   └── prometheus.yml     # Конфиг Prometheus
├── grafana/
│   └── provisioning/      # Grafana конфигурация
├── sql/
│   └── init.sql          # Инициализация БД
├── docker-compose.yml    # Локальная среда
├── Dockerfile           # Образ приложения
├── requirements.txt     # Python зависимости
└── locustfile.py       # Нагрузочные тесты

Развертывание

Docker

# Сборка образа
docker build -t adv-collector:latest .

# Запуск контейнера
docker run -d \
  --name adv-collector \
  -p 8000:8000 \
  -e DATABASE_URL="postgresql://..." \
  -e REDIS_URL="redis://..." \
  adv-collector:latest

Kubernetes

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: adv-collector
spec:
  replicas: 6
  selector:
    matchLabels:
      app: adv-collector
  template:
    metadata:
      labels:
        app: adv-collector
    spec:
      containers:
        - name: api
          image: adv-collector:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: adv-collector-secrets
                  key: database-url
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"

CI/CD Pipeline

# .github/workflows/deploy.yml
name: Deploy
on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        run: |
          pytest --cov=src

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Build and push Docker image
        run: |
          docker build -t $IMAGE_TAG .
          docker push $IMAGE_TAG

  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/adv-collector api=$IMAGE_TAG

Мониторинг в production

# monitoring/alertmanager.yml
global:
  smtp_smarthost: "localhost:587"
  smtp_from: "alerts@yourcompany.com"

route:
  group_by: ["alertname"]
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: "team-notifications"

receivers:
  - name: "team-notifications"
    email_configs:
      - to: "team@yourcompany.com"
        subject: "AdEvents Collector Alert"

Troubleshooting

Частые проблемы

Высокая latency API

# Проверяем метрики
curl http://localhost:8000/metrics | grep request_duration

# Проверяем connection pool
# В логах ищем "connection pool exhausted"

# Решение: увеличиваем pool size
DATABASE_POOL_SIZE=50
DATABASE_MAX_OVERFLOW=100

Растет очередь в Redis

# Проверяем длину очереди
redis-cli llen events_queue

# Проверяем workers
docker-compose ps worker

# Решение: увеличиваем количество workers
docker-compose up -d --scale worker=6

Медленные записи в БД

# Проверяем партиции
SELECT schemaname, tablename
FROM pg_tables
WHERE tablename LIKE 'ad_events_%';

# Проверяем индексы
SELECT indexname FROM pg_indexes
WHERE tablename LIKE 'ad_events_%';

# Решение: убеждаемся что партиции создаются автоматически
AUTO_CREATE_PARTITIONS=true

Логи и отладка

# Логи API
docker-compose logs -f api

# Логи workers
docker-compose logs -f worker

# Логи с фильтрацией
docker-compose logs api | grep ERROR

# Отладка Redis
redis-cli monitor

# Отладка PostgreSQL
docker-compose exec postgres psql -U postgres -d ad_events

Производительность

Профилирование

# Добавляем в код для профилирования
import cProfile
import pstats

# Профилируем endpoints
@app.middleware("http")
async def profile_middleware(request, call_next):
    profiler = cProfile.Profile()
    profiler.enable()
    response = await call_next(request)
    profiler.disable()
    # Анализируем результаты
    return response

Настройка PostgreSQL

-- postgresql.conf для высоких нагрузок
shared_buffers = 4GB
effective_cache_size = 12GB
random_page_cost = 1.1
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100

Backup и восстановление

# Backup PostgreSQL
docker-compose exec postgres pg_dump -U postgres ad_events > backup.sql

# Восстановление
docker-compose exec -T postgres psql -U postgres ad_events < backup.sql

# Backup Redis
docker-compose exec redis redis-cli BGSAVE

AdEvents Collector - Production-ready сервис для высоконагруженной обработки рекламных событий

About

Production-ready микросервис для сбора и обработки рекламных событий (impression, click, conversion) с поддержкой 100,000+ RPS. Сервис построен на современном стеке технологий с фокусом на производительность, масштабируемость и надежность.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors