Production-ready высокопроизводительный сервис сбора рекламных событий, способный обрабатывать 100K+ RPS с дедупликацией, батчингом и полным мониторингом.
- Архитектура
- Особенности
- Технологический стек
- Быстрый старт
- Конфигурация
- API документация
- Мониторинг
- Производительность
- Разработка
- Развертывание
- Troubleshooting
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │───▶│ FastAPI │───▶│ Redis │───▶│ RQ Workers │
│ │ │ API │ │ Queue │ │ (3 replicas)│
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Prometheus │ │ PostgreSQL │
│ Metrics │ │ (Partitioned)│
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Grafana │
│ Dashboards │
└─────────────┘
- FastAPI API - Асинхронный веб-сервер для приема событий
- Redis - Кэширование для дедупликации + очередь задач
- RQ Workers - Фоновые процессы для батчевой обработки
- PostgreSQL - Партиционированная база данных
- Prometheus - Сбор метрик производительности
- Grafana - Визуализация метрик и алертинг
- 100K+ RPS - Поддержка высоких нагрузок
- Асинхронная обработка - FastAPI + asyncio
- Батчинг - Группировка событий для оптимизации записи
- PostgreSQL COPY - Высокоскоростная вставка данных
- Connection pooling - Эффективное управление соединениями
- Дедупликация - Redis-based с TTL для предотвращения дубликатов
- Rate limiting - 100 RPS на IP с burst поддержкой
- Health checks - Kubernetes-ready endpoints
- Graceful shutdown - Корректное завершение работы
- Error handling - Comprehensive error handling и retry логика
- Prometheus метрики - Полный набор метрик производительности
- Grafana дашборды - Визуализация в реальном времени
- Structured logging - JSON логи с structlog
- Distributed tracing ready - Подготовлен для OpenTelemetry
- Горизонтальное масштабирование - Stateless архитектура
- Партиционирование БД - Автоматические партиции по дате
- Worker scaling - Независимое масштабирование обработчиков
- Redis clustering ready - Поддержка кластера Redis
- Python 3.11+ - Современная версия Python
- FastAPI - Высокопроизводительный async веб-фреймворк
- Pydantic v2 - Валидация данных и сериализация
- asyncio - Асинхронное программирование
- PostgreSQL 15 - Основная база данных с партиционированием
- Redis 7 - Кэширование и очередь задач
- psycopg2 - PostgreSQL адаптер с connection pooling
- RQ (Redis Queue) - Система очередей задач
- RQ Worker - Фоновые процессы обработки
- Prometheus - Сбор метрик
- Grafana - Визуализация и алертинг
- structlog - Структурированное логирование
- Docker + Docker Compose - Контейнеризация
- Kubernetes ready - Готов для Kubernetes
- GitHub Actions - CI/CD пайплайны
- pytest - Тестирование с 95% покрытием
- black - Форматирование кода
- ruff - Быстрый линтер
- mypy - Статическая типизация
- Docker 20.10+
- Docker Compose 2.0+
- Python 3.11+ (для разработки)
# Клонируем репозиторий
git clone https://github.com/your-repo/AdEvents_Collector.git
cd AdEvents_Collector
# Запускаем все сервисы
docker-compose up -d
# Проверяем статус
docker-compose ps
# Просматриваем логи
docker-compose logs -f api worker# Health check
curl http://localhost:8000/health
# Отправка тестового события
curl -X POST http://localhost:8000/events \
-H "Content-Type: application/json" \
-d '{
"event_type": "impression",
"user_id": "user123",
"ad_id": "ad456",
"campaign_id": "camp789",
"publisher_id": "pub001"
}'
# Проверка статистики
curl http://localhost:8000/stats- API: http://localhost:8000
- API Docs: http://localhost:8000/docs
- Grafana: http://localhost:3000 (admin/admin)
- Prometheus: http://localhost:9090
- pgAdmin: http://localhost:5050 (если добавлен в docker-compose)
Создайте файл .env в корне проекта:
# Основные настройки
ENVIRONMENT=production
LOG_LEVEL=INFO
SECRET_KEY=your-super-secret-key-at-least-32-chars
# База данных
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/ad_events
# Redis
REDIS_URL=redis://redis:6379/0
# API настройки
API_HOST=0.0.0.0
API_PORT=8000
API_WORKERS=4
# Батчинг
BATCH_SIZE=1000
MAX_BATCH_SIZE=5000
# Rate limiting
RATE_LIMIT_PER_MINUTE=100
RATE_LIMIT_BURST=200
# Мониторинг
METRICS_ENABLED=true
HEALTH_CHECK_TIMEOUT=30
# Партиционирование
PARTITION_RETENTION_DAYS=30
AUTO_CREATE_PARTITIONS=true# Production .env
ENVIRONMENT=production
DEBUG=false
LOG_LEVEL=WARNING
# Увеличиваем workers для production
API_WORKERS=8
WORKER_REPLICAS=6
# Увеличиваем batch размеры
BATCH_SIZE=2000
MAX_BATCH_SIZE=10000
# Настройки БД для высокой нагрузки
DATABASE_POOL_SIZE=50
DATABASE_MAX_OVERFLOW=100
# Redis настройки
REDIS_MAX_CONNECTIONS=100Отправка одного события
{
"event_type": "impression|click|conversion",
"user_id": "string",
"ad_id": "string",
"campaign_id": "string",
"publisher_id": "string",
"timestamp": "2023-10-31T10:00:00Z",
"device_type": "desktop|mobile|tablet|tv",
"ip_address": "192.168.1.1",
"country": "US",
"bid_price": 0.5,
"revenue": 1.25
}Отправка батча событий (до 5000 событий)
{
"events": [
// массив событий как выше
]
}Проверка здоровья сервиса
{
"status": "healthy",
"timestamp": 1698750000,
"services": {
"redis": "healthy",
"api": "healthy"
}
}Статистика очереди
{
"queue_length": 1250,
"timestamp": 1698750000
}Prometheus метрики
- Single events: 100 запросов/минуту на IP
- Batch events: 10 запросов/минуту на IP
- Health checks: Без ограничений
202- Событие принято в обработку400- Некорректные данные429- Превышен rate limit500- Внутренняя ошибка сервера
ad_events_received_total- Счетчик полученных событийrequest_duration_seconds- Время обработки запросовqueue_length- Длина очереди Redisevents_processed_total- Обработанные событияbatch_processing_time- Время обработки батча
- CPU, Memory, Disk usage (через node-exporter)
- PostgreSQL метрики
- Redis метрики
- RPS и latency
- Error rates
- Rate limiting статистика
- Queue length и processing rate
- Batch processing performance
- Database write performance
- System resources
- Database connections
- Redis memory usage
# Пример алертов
groups:
- name: adv_collector
rules:
- alert: HighErrorRate
expr: rate(ad_events_received_total{status="error"}[5m]) > 0.1
for: 2m
- alert: HighQueueLength
expr: queue_length > 10000
for: 5m
- alert: SlowBatchProcessing
expr: avg(batch_processing_time) > 30
for: 3m- CPU: 8 cores
- RAM: 16GB
- SSD: NVMe
Single events:
- 50K RPS при latency p99 < 100ms
- 100K RPS при latency p99 < 200ms
Batch events (1000 events/batch):
- 1K batches/sec = 1M events/sec
- Latency p99 < 500ms
Database writes:
- 100K inserts/sec через COPY
- 50K inserts/sec через INSERT
- Асинхронная обработка запросов
- Connection pooling для БД и Redis
- Батчинг событий перед записью в БД
- Партиционирование по дате
- Оптимизированные индексы
- PostgreSQL COPY для bulk inserts
- Redis для дедупликации и очередей
- Горизонтальное масштабирование workers
- CDN для статических ресурсов (если нужно)
# Увеличиваем workers
API_WORKERS=16
WORKER_REPLICAS=10
# Увеличиваем batch размеры
BATCH_SIZE=5000
MAX_BATCH_SIZE=20000
# Настройки БД
DATABASE_POOL_SIZE=100
DATABASE_MAX_OVERFLOW=200
# Redis настройки
REDIS_MAX_CONNECTIONS=200# Клонируем репозиторий
git clone https://github.com/your-repo/AdEvents_Collector.git
cd AdEvents_Collector
# Создаем виртуальную среду
python -m venv venv
source venv/bin/activate # Linux/Mac
# или
venv\Scripts\activate # Windows
# Устанавливаем зависимости
pip install -r requirements-dev.txt
# Запускаем локальные сервисы
docker-compose up -d postgres redis prometheus grafana
# Настраиваем pre-commit hooks
pre-commit installfastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.4.0
pydantic-settings>=2.0.0
redis>=5.0.0
rq>=1.15.0
psycopg2-binary>=2.9.0
structlog>=23.2.0
prometheus-client>=0.18.0
slowapi>=0.1.9-r requirements.txt
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
httpx>=0.25.0
black>=23.10.0
ruff>=0.1.0
mypy>=1.6.0
pre-commit>=3.5.0
locust>=2.17.0# Все тесты с покрытием
pytest --cov=src --cov-report=html
# Только unit тесты
pytest tests/unit/
# Только integration тесты
pytest tests/integration/
# Нагрузочные тесты
locust -f locustfile.py --host=http://localhost:8000# Форматирование кода
black src/ tests/
# Линтинг
ruff check src/ tests/
# Проверка типов
mypy src/
# Все проверки одновременно
pre-commit run --all-filesAdEvents_Collector/
├── src/
│ ├── api/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI приложение
│ │ └── models.py # Pydantic модели
│ ├── core/
│ │ ├── __init__.py
│ │ └── settings.py # Конфигурация
│ └── worker/
│ ├── __init__.py
│ ├── tasks.py # RQ задачи
│ └── worker.py # Worker процесс
├── tests/
│ ├── unit/ # Unit тесты
│ ├── integration/ # Integration тесты
│ └── conftest.py # Pytest fixtures
├── prometheus/
│ └── prometheus.yml # Конфиг Prometheus
├── grafana/
│ └── provisioning/ # Grafana конфигурация
├── sql/
│ └── init.sql # Инициализация БД
├── docker-compose.yml # Локальная среда
├── Dockerfile # Образ приложения
├── requirements.txt # Python зависимости
└── locustfile.py # Нагрузочные тесты
# Сборка образа
docker build -t adv-collector:latest .
# Запуск контейнера
docker run -d \
--name adv-collector \
-p 8000:8000 \
-e DATABASE_URL="postgresql://..." \
-e REDIS_URL="redis://..." \
adv-collector:latest# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: adv-collector
spec:
replicas: 6
selector:
matchLabels:
app: adv-collector
template:
metadata:
labels:
app: adv-collector
spec:
containers:
- name: api
image: adv-collector:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: adv-collector-secrets
key: database-url
livenessProbe:
httpGet:
path: /health
port: 8000
readinessProbe:
httpGet:
path: /ready
port: 8000
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"# .github/workflows/deploy.yml
name: Deploy
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run tests
run: |
pytest --cov=src
build:
needs: test
runs-on: ubuntu-latest
steps:
- name: Build and push Docker image
run: |
docker build -t $IMAGE_TAG .
docker push $IMAGE_TAG
deploy:
needs: build
runs-on: ubuntu-latest
steps:
- name: Deploy to Kubernetes
run: |
kubectl set image deployment/adv-collector api=$IMAGE_TAG# monitoring/alertmanager.yml
global:
smtp_smarthost: "localhost:587"
smtp_from: "alerts@yourcompany.com"
route:
group_by: ["alertname"]
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: "team-notifications"
receivers:
- name: "team-notifications"
email_configs:
- to: "team@yourcompany.com"
subject: "AdEvents Collector Alert"# Проверяем метрики
curl http://localhost:8000/metrics | grep request_duration
# Проверяем connection pool
# В логах ищем "connection pool exhausted"
# Решение: увеличиваем pool size
DATABASE_POOL_SIZE=50
DATABASE_MAX_OVERFLOW=100# Проверяем длину очереди
redis-cli llen events_queue
# Проверяем workers
docker-compose ps worker
# Решение: увеличиваем количество workers
docker-compose up -d --scale worker=6# Проверяем партиции
SELECT schemaname, tablename
FROM pg_tables
WHERE tablename LIKE 'ad_events_%';
# Проверяем индексы
SELECT indexname FROM pg_indexes
WHERE tablename LIKE 'ad_events_%';
# Решение: убеждаемся что партиции создаются автоматически
AUTO_CREATE_PARTITIONS=true# Логи API
docker-compose logs -f api
# Логи workers
docker-compose logs -f worker
# Логи с фильтрацией
docker-compose logs api | grep ERROR
# Отладка Redis
redis-cli monitor
# Отладка PostgreSQL
docker-compose exec postgres psql -U postgres -d ad_events# Добавляем в код для профилирования
import cProfile
import pstats
# Профилируем endpoints
@app.middleware("http")
async def profile_middleware(request, call_next):
profiler = cProfile.Profile()
profiler.enable()
response = await call_next(request)
profiler.disable()
# Анализируем результаты
return response-- postgresql.conf для высоких нагрузок
shared_buffers = 4GB
effective_cache_size = 12GB
random_page_cost = 1.1
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100# Backup PostgreSQL
docker-compose exec postgres pg_dump -U postgres ad_events > backup.sql
# Восстановление
docker-compose exec -T postgres psql -U postgres ad_events < backup.sql
# Backup Redis
docker-compose exec redis redis-cli BGSAVEAdEvents Collector - Production-ready сервис для высоконагруженной обработки рекламных событий