Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@
"providers/documentation/signl4-provider",
"providers/documentation/site24x7-provider",
"providers/documentation/slack-provider",
"providers/documentation/snmp-provider",
"providers/documentation/smtp-provider",
"providers/documentation/snowflake-provider",
"providers/documentation/splunk-provider",
Expand Down
27 changes: 27 additions & 0 deletions docs/providers/documentation/snmp-provider.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
title: "SNMP"
sidebarTitle: "SNMP Provider"
description: "Ingest SNMP traps and notifications into Keep as alerts via HTTP JSON webhooks."
---

## Overview

SNMP traps are typically delivered over UDP with BER encoding. Keep ingests them on the standard **`POST /alerts/event/snmp`** webhook path when your forwarder translates traps into **JSON** (for example `snmptrapd` + shell, SNMPTT, or Telegraf `inputs.snmp_trap`).

Add the **SNMP** provider in Keep, open **Webhook** instructions, and use the generated URL and **`X-API-KEY`** header.

## Payload shape

Send a single trap object, a JSON array of traps, or a batch wrapper:

- `snmp_traps`: array of trap objects
- `traps`: alias array (same semantics)

Supported field aliases include `trapOid` / `snmpTrapOID` for the trap OID and `agentAddress` for the agent address.

Standard **`1.3.6.1.6.3.1.1.5.*`** (`snmpTrapOID` / `coldStart`, `linkDown`, …) notification OIDs map to default severities; you can override with `severity` / `status` when needed.

## Useful links

- [RFC 3416 — SNMPv2 protocol (notification / trap context)](https://www.rfc-editor.org/rfc/rfc3416)
- [Adding a provider (Keep docs)](https://docs.keephq.dev/providers/adding-a-new-provider)
3 changes: 3 additions & 0 deletions keep/providers/snmp_provider/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from keep.providers.snmp_provider.snmp_provider import SnmpProvider

__all__ = ["SnmpProvider"]
244 changes: 244 additions & 0 deletions keep/providers/snmp_provider/snmp_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
"""
SNMP Provider — ingest SNMP trap / notification events into Keep via HTTP webhook.

SNMP traps are classically UDP/BER; Keep's ingestion path is HTTP. The supported
workflow is to forward traps to Keep as JSON (snmptrapd + shell, SNMPTT,
Telegraf `inputs.snmp_trap`, etc.) against POST /event/snmp with a webhook API key.
"""

from __future__ import annotations

import datetime
import json
import logging
import uuid
from typing import Any

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig

logger = logging.getLogger(__name__)

# Standard SNMPv2-MIB snmpTrapOID values under prefix 1.3.6.1.6.3.1.1.5.<n>
_STD_TRAP_SUFFIX_SEVERITY: dict[str, AlertSeverity] = {
"1": AlertSeverity.INFO, # coldStart
"2": AlertSeverity.INFO, # warmStart
"3": AlertSeverity.HIGH, # linkDown
"4": AlertSeverity.INFO, # linkUp
"5": AlertSeverity.WARNING, # authenticationFailure
"6": AlertSeverity.WARNING, # egpNeighborLoss
}


class SnmpProvider(BaseProvider):
"""Receive SNMP trap / inform payloads (as JSON) into Keep."""

PROVIDER_DISPLAY_NAME = "SNMP"
PROVIDER_TAGS = ["alert"]
PROVIDER_CATEGORY = ["Monitoring"]
WEBHOOK_INSTALLATION_REQUIRED = True
FINGERPRINT_FIELDS = ["name", "host", "labels.trap_oid"]

webhook_description = ""
webhook_template = ""
webhook_markdown = """
### SNMP traps → Keep (HTTP)

Keep accepts **JSON** describing one or more traps on `POST {keep_webhook_api_url}` with header `X-API-KEY` / `Authorization: Bearer ...` (same as other webhook providers).

Use any forwarder that can `curl` JSON, for example **`snmptrapd`** with a `traphandle` script, **SNMPTT**, or **Telegraf** `inputs.snmp_trap` + `outputs.http`.

#### Single trap (minimal)

```json
{
"trap_oid": "1.3.6.1.6.3.1.1.5.3",
"agent_address": "192.0.2.10",
"name": "linkDown on eth0",
"message": "Interface eth0 down",
"hostname": "router-01"
}
```

#### Batch

```json
{
"snmp_traps": [
{ "trap_oid": "1.3.6.1.6.3.1.1.5.1", "agent_address": "192.0.2.1", "hostname": "sw1" }
]
}
```

Optional fields: `varbinds` (list of `{oid,type,value}`), `community`, `uptime`, `severity`, `status`, `lastReceived`.

`trap_oid` may also be sent as `trapOid` / `snmpTrapOID` (forwarder-specific).
"""

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
):
super().__init__(context_manager, provider_id, config)

def validate_config(self):
pass

def dispose(self):
pass

@staticmethod
def parse_event_raw_body(raw_body: bytes | dict) -> dict:
if isinstance(raw_body, dict):
return raw_body
if isinstance(raw_body, (bytes, bytearray)):
text = raw_body.decode("utf-8", errors="replace").strip()
if not text:
return {}
try:
parsed: Any = json.loads(text)
except json.JSONDecodeError:
logger.exception("SNMP provider: body is not valid JSON")
raise
if isinstance(parsed, list):
return {"snmp_traps": parsed}
if isinstance(parsed, dict):
return parsed
raise ValueError("SNMP webhook JSON must be an object or array")
raise TypeError(f"Unsupported SNMP event body type: {type(raw_body)}")

@staticmethod
def _normalize_trap_dict(raw: dict[str, Any]) -> dict[str, Any]:
out = dict(raw)
oid = (
out.get("trap_oid")
or out.get("trapOid")
or out.get("snmpTrapOID")
)
if oid:
out["trap_oid"] = oid
agent = (
out.get("agent_address")
or out.get("agentAddress")
or out.get("source_ip")
or out.get("agent-addr")
)
if agent:
out["agent_address"] = agent
host = out.get("hostname") or out.get("sysName") or out.get("host")
if host:
out["hostname"] = host
return out

@staticmethod
def _severity_for_oid(trap_oid: str | None) -> AlertSeverity:
if not trap_oid:
return AlertSeverity.INFO
trap_oid = trap_oid.strip()
prefix = "1.3.6.1.6.3.1.1.5."
if trap_oid.startswith(prefix):
rest = trap_oid[len(prefix):]
suffix = rest.split(".", 1)[0] if rest else ""
return _STD_TRAP_SUFFIX_SEVERITY.get(suffix, AlertSeverity.INFO)
return AlertSeverity.INFO

@staticmethod
def _format_one(event: dict[str, Any]) -> AlertDto:
event = SnmpProvider._normalize_trap_dict(dict(event))
trap_oid = event.get("trap_oid") or "unknown"
agent = event.get("agent_address") or "unknown"
hostname = event.get("hostname") or agent
name = event.get("name") or f"SNMP trap {trap_oid}"
message = event.get("message") or event.get("description")
if not message:
vbs = event.get("varbinds")
if isinstance(vbs, list) and vbs:
message = json.dumps(vbs[:20], default=str)
else:
message = f"Trap {trap_oid} from {agent}"

sev = event.get("severity")
if isinstance(sev, str):
try:
severity = AlertSeverity(sev.lower())
except ValueError:
severity = SnmpProvider._severity_for_oid(trap_oid)
elif isinstance(sev, int) and not isinstance(sev, bool):
try:
severity = AlertSeverity.from_number(sev)
except ValueError:
severity = SnmpProvider._severity_for_oid(trap_oid)
elif isinstance(sev, float):
if sev.is_integer():
try:
severity = AlertSeverity.from_number(int(sev))
except ValueError:
severity = SnmpProvider._severity_for_oid(trap_oid)
else:
severity = SnmpProvider._severity_for_oid(trap_oid)
else:
severity = SnmpProvider._severity_for_oid(trap_oid)

st = event.get("status")
if isinstance(st, str):
try:
status = AlertStatus(st.lower())
except ValueError:
status = AlertStatus.FIRING
else:
status = AlertStatus.FIRING

last = event.get("lastReceived") or event.get("timestamp")
if not last:
last = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()

labels = dict(event.get("labels") or {})
labels.setdefault("trap_oid", trap_oid)
labels.setdefault("agent_address", agent)

alert_id = event.get("id") or str(uuid.uuid4())
fingerprint = event.get("fingerprint")

return AlertDto(
id=alert_id,
name=name,
message=str(message),
description=str(event.get("description") or message),
status=status,
severity=severity,
lastReceived=str(last),
host=hostname,
source=["snmp"],
labels=labels,
pushed=True,
fingerprint=fingerprint,
)

@staticmethod
def _format_alert(
event: dict | list[dict], provider_instance: BaseProvider | None = None
) -> AlertDto | list[AlertDto]:
if isinstance(event, list):
return [SnmpProvider._format_one(dict(x)) for x in event]

if isinstance(event, dict) and "snmp_traps" in event:
traps = event["snmp_traps"]
if not isinstance(traps, list):
raise ValueError("snmp_traps must be a list")
return [SnmpProvider._format_one(dict(x)) for x in traps]

if isinstance(event, dict) and "traps" in event:
traps = event["traps"]
if not isinstance(traps, list):
raise ValueError("traps must be a list")
return [SnmpProvider._format_one(dict(x)) for x in traps]

if not isinstance(event, dict):
raise TypeError("SNMP format_alert expects dict or list[dict]")

return SnmpProvider._format_one(event)


if __name__ == "__main__":
pass
100 changes: 100 additions & 0 deletions tests/providers/snmp_provider/test_snmp_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import json

import pytest

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.providers.snmp_provider.snmp_provider import SnmpProvider


def test_parse_json_bytes_array_wraps():
body = json.dumps([{"trap_oid": "1.2.3", "agent_address": "10.0.0.1"}]).encode()
out = SnmpProvider.parse_event_raw_body(body)
assert out == {"snmp_traps": [{"trap_oid": "1.2.3", "agent_address": "10.0.0.1"}]}


def test_format_alert_single_link_down():
event = {
"trap_oid": "1.3.6.1.6.3.1.1.5.3",
"agent_address": "192.0.2.5",
"hostname": "gw1",
"message": "eth1 down",
}
alert = SnmpProvider._format_alert(event, None)
assert isinstance(alert, AlertDto)
assert "1.3.6.1.6.3.1.1.5.3" in alert.name or alert.labels.get("trap_oid")
assert alert.message == "eth1 down"
assert alert.status == AlertStatus.FIRING
assert alert.severity == AlertSeverity.HIGH
assert alert.host == "gw1"
assert alert.source == ["snmp"]


def test_format_alert_batch():
raw = {
"snmp_traps": [
{"trapOid": "1.3.6.1.6.3.1.1.5.1", "agentAddress": "192.0.2.1"},
{"trap_oid": "1.3.6.1.6.3.1.1.5.4", "agent_address": "192.0.2.2"},
]
}
alerts = SnmpProvider._format_alert(raw, None)
assert isinstance(alerts, list)
assert len(alerts) == 2
assert alerts[0].labels.get("trap_oid") == "1.3.6.1.6.3.1.1.5.1"
assert alerts[1].labels.get("trap_oid") == "1.3.6.1.6.3.1.1.5.4"


def test_format_alert_explicit_severity_string():
event = {
"trap_oid": "1.3.6.1.4.1.99999.0.1",
"agent_address": "10.0.0.2",
"severity": "critical",
"status": "firing",
}
alert = SnmpProvider._format_alert(event, None)
assert alert.severity == AlertSeverity.CRITICAL
assert alert.status == AlertStatus.FIRING


@pytest.mark.parametrize(
"suffix,expected",
[
("1", AlertSeverity.INFO),
("3", AlertSeverity.HIGH),
("4", AlertSeverity.INFO),
],
)
def test_severity_oid_map(suffix, expected):
oid = f"1.3.6.1.6.3.1.1.5.{suffix}"
assert SnmpProvider._severity_for_oid(oid) == expected


def test_severity_oid_enterprise_not_std_mib_prefix():
"""OIDs that end in .5.<n> but are NOT snmpTrapOID under 1.3.6.1.6.3.1.1.5.* must stay INFO."""
assert (
SnmpProvider._severity_for_oid("1.2.3.4.5.3") == AlertSeverity.INFO
)


def test_format_alert_traps_must_be_list():
with pytest.raises(ValueError, match="traps must be a list"):
SnmpProvider._format_alert({"traps": "not-a-list"}, None)


def test_severity_int_out_of_range_falls_back_to_oid():
event = {
"trap_oid": "1.3.6.1.6.3.1.1.5.3",
"agent_address": "10.0.0.1",
"severity": 99,
}
alert = SnmpProvider._format_alert(event, None)
assert alert.severity == AlertSeverity.HIGH


def test_severity_float_integer_json():
event = {
"trap_oid": "1.3.6.1.4.1.1",
"agent_address": "10.0.0.1",
"severity": 4.0,
}
alert = SnmpProvider._format_alert(event, None)
assert alert.severity == AlertSeverity.HIGH
Loading