Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ tempfile = "3.20.0"
lazy_static = "1.4.0"
prost = "0.13.1"
dashmap = "6.1.0"
parking_lot = "0.12.5"
indexmap = { version = "2.13.0", features = ["serde"] }

[build-dependencies]
Expand Down
15 changes: 10 additions & 5 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ pub struct BasicAlertFields {
pub severity: Severity,
}

pub type AlertMap = HashMap<Ulid, Box<dyn AlertTrait>>;

#[derive(Debug)]
pub struct Alerts {
pub alerts: RwLock<HashMap<Ulid, Box<dyn AlertTrait>>>,
pub alerts: RwLock<HashMap<String, AlertMap>>,
pub sender: mpsc::Sender<AlertTask>,
}

Expand Down Expand Up @@ -288,7 +290,7 @@ pub struct AlertRequest {
}

impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
pub async fn into(self, tenant_id: Option<String>) -> Result<AlertConfig, AlertError> {
// Validate that other_fields doesn't contain reserved field names
let other_fields = if let Some(mut other_fields) = self.other_fields {
// Limit other_fields to maximum 10 fields
Expand Down Expand Up @@ -316,7 +318,7 @@ impl AlertRequest {

// Validate that all target IDs exist
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
TARGETS.get_target_by_id(id, &tenant_id).await?;
}
let datasets = resolve_stream_names(&self.query)?;

Expand Down Expand Up @@ -369,6 +371,7 @@ impl AlertRequest {
tags: self.tags,
last_triggered_at: None,
other_fields,
tenant_id,
};

Ok(config)
Expand Down Expand Up @@ -399,6 +402,7 @@ pub struct AlertConfig {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -711,6 +715,7 @@ pub struct DailyMTTRStats {
pub struct MTTRHistory {
/// Array of daily MTTR statistics
pub daily_stats: Vec<DailyMTTRStats>,
pub tenant_id: Option<String>,
}

/// Query parameters for MTTR API endpoint
Expand Down Expand Up @@ -883,7 +888,7 @@ impl MetastoreObject for AlertConfig {
}

fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}
}

Expand All @@ -893,6 +898,6 @@ impl MetastoreObject for MTTRHistory {
}

fn get_object_path(&self) -> String {
mttr_json_path().to_string()
mttr_json_path(&self.tenant_id).to_string()
}
}
24 changes: 19 additions & 5 deletions src/alerts/alert_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
fn get_datasets(&self) -> &[String];
fn to_alert_config(&self) -> AlertConfig;
fn clone_box(&self) -> Box<dyn AlertTrait>;
fn get_tenant_id(&self) -> &Option<String>;
}

#[async_trait]
Expand All @@ -86,25 +87,38 @@ pub trait AlertManagerTrait: Send + Sync {
session: SessionKey,
tags: Vec<String>,
) -> Result<Vec<AlertConfig>, AlertError>;
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn get_alert_by_id(
&self,
id: Ulid,
tenant_id: &Option<String>,
) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn update(&self, alert: &dyn AlertTrait);
async fn update_state(
&self,
alert_id: Ulid,
new_state: AlertState,
trigger_notif: Option<String>,
tenant_id: &Option<String>,
) -> Result<(), AlertError>;
async fn update_notification_state(
&self,
alert_id: Ulid,
new_notification_state: NotificationState,
tenant_id: &Option<String>,
) -> Result<(), AlertError>;
async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn get_state(&self, alert_id: Ulid) -> Result<AlertState, AlertError>;
async fn delete(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<(), AlertError>;
async fn get_state(
&self,
alert_id: Ulid,
tenant_id: &Option<String>,
) -> Result<AlertState, AlertError>;
async fn start_task(&self, alert: Box<dyn AlertTrait>) -> Result<(), AlertError>;
async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn list_tags(&self) -> Vec<String>;
async fn get_all_alerts(&self) -> HashMap<Ulid, Box<dyn AlertTrait>>;
async fn list_tags(&self, tenant_id: &Option<String>) -> Vec<String>;
async fn get_all_alerts(
&self,
tenant_id: &Option<String>,
) -> HashMap<Ulid, Box<dyn AlertTrait>>;
}

#[async_trait]
Expand Down
38 changes: 30 additions & 8 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use std::{str::FromStr, time::Duration};

use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tonic::async_trait;
Expand All @@ -41,6 +42,7 @@ use crate::{
query::resolve_stream_names,
rbac::map::SessionKey,
storage::object_storage::alert_json_path,
tenants::TENANT_METADATA,
utils::user_auth_for_query,
};

Expand Down Expand Up @@ -68,11 +70,12 @@ pub struct ThresholdAlert {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

impl MetastoreObject for ThresholdAlert {
fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}

fn get_object_id(&self) -> String {
Expand All @@ -84,7 +87,20 @@ impl MetastoreObject for ThresholdAlert {
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
let time_range = extract_time_range(&self.eval_config)?;
let query_result = execute_alert_query(self.get_query(), &time_range).await?;
let auth = if let Some(tenant) = self.tenant_id.as_ref()
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
{
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("authorization"),
HeaderValue::from_str(&header).unwrap(),
);
Some(map)
} else {
None
};
Comment on lines +90 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid panics when building the Authorization header.

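`HeaderValue::from_str(&header).unwrap()` can panic when the stored auth value is malformed (for example, when it contains characters that are not permitted in an HTTP header value). Convert the failure into a handled error instead of unwrapping.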

🛠️ Proposed fix
-        let auth = if let Some(tenant) = self.tenant_id.as_ref()
-            && let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
-        {
-            let mut map = HeaderMap::new();
-            map.insert(
-                HeaderName::from_static("authorization"),
-                HeaderValue::from_str(&header).unwrap(),
-            );
-            Some(map)
-        } else {
-            None
-        };
+        let auth = if let Some(tenant) = self.tenant_id.as_ref()
+            && let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
+        {
+            let mut map = HeaderMap::new();
+            let value = HeaderValue::from_str(&header)
+                .map_err(|e| AlertError::CustomError(format!("Invalid auth header: {e}")))?;
+            map.insert(HeaderName::from_static("authorization"), value);
+            Some(map)
+        } else {
+            None
+        };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let auth = if let Some(tenant) = self.tenant_id.as_ref()
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
{
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("authorization"),
HeaderValue::from_str(&header).unwrap(),
);
Some(map)
} else {
None
};
let auth = if let Some(tenant) = self.tenant_id.as_ref()
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
{
let mut map = HeaderMap::new();
let value = HeaderValue::from_str(&header)
.map_err(|e| AlertError::CustomError(format!("Invalid auth header: {e}")))?;
map.insert(HeaderName::from_static("authorization"), value);
Some(map)
} else {
None
};
🤖 Prompt for AI Agents
In `src/alerts/alert_types.rs`, around lines 90-101, the code currently calls
`HeaderValue::from_str(&header).unwrap()`, which can panic. Replace the `unwrap` with
proper error handling by matching on, or calling `.map_err()` on, the result of
`HeaderValue::from_str(&header)` inside the auth-building block (the code that
references `self.tenant_id`, `TENANT_METADATA.get_global_query_auth`, `HeaderMap`, and
`HeaderValue::from_str`). Either propagate the error from the surrounding function
(`eval_alert` already returns `Result<Option<String>, AlertError>`, so return an `Err`
with a descriptive message) or handle the parse failure gracefully (log the malformed
header and skip adding the Authorization header by returning `None` for `auth`).
Implement the `match` or `map_err` flow so that no panic can occur.

let query_result =
execute_alert_query(auth, self.get_query(), &time_range, &self.tenant_id).await?;

if query_result.is_simple_query {
// Handle simple queries
Expand Down Expand Up @@ -164,7 +180,7 @@ impl AlertTrait for ThresholdAlert {
"No tables found in query".into(),
));
}
create_streams_for_distributed(tables)
create_streams_for_distributed(tables, &self.tenant_id)
.await
.map_err(|_| AlertError::InvalidAlertQuery("Invalid tables".into()))?;

Expand All @@ -191,7 +207,7 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
Ok(())
}
Expand All @@ -217,12 +233,12 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);
PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -257,13 +273,13 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);

PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;

if let Some(trigger_notif) = trigger_notif
Expand Down Expand Up @@ -337,6 +353,10 @@ impl AlertTrait for ThresholdAlert {
&self.datasets
}

fn get_tenant_id(&self) -> &Option<String> {
&self.tenant_id
}

fn to_alert_config(&self) -> AlertConfig {
let clone = self.clone();
clone.into()
Expand Down Expand Up @@ -414,6 +434,7 @@ impl From<AlertConfig> for ThresholdAlert {
datasets: value.datasets,
last_triggered_at: value.last_triggered_at,
other_fields: value.other_fields,
tenant_id: value.tenant_id,
}
}
}
Expand All @@ -438,6 +459,7 @@ impl From<ThresholdAlert> for AlertConfig {
datasets: val.datasets,
last_triggered_at: val.last_triggered_at,
other_fields: val.other_fields,
tenant_id: val.tenant_id,
}
}
}
Expand Down
44 changes: 32 additions & 12 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use std::{collections::HashMap, fmt::Display};

use actix_web::Either;
use actix_web::{Either, http::header::HeaderMap};
use arrow_array::{Array, Float64Array, Int64Array, RecordBatch};
use datafusion::{
logical_expr::{Literal, LogicalPlan},
Expand Down Expand Up @@ -75,12 +75,14 @@ pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result<TimeRange,

/// Execute the alert query based on the current mode and return structured group results
pub async fn execute_alert_query(
auth_token: Option<HeaderMap>,
query: &str,
time_range: &TimeRange,
tenant_id: &Option<String>,
) -> Result<AlertQueryResult, AlertError> {
match PARSEABLE.options.mode {
Mode::All | Mode::Query => execute_local_query(query, time_range).await,
Mode::Prism => execute_remote_query(query, time_range).await,
Mode::All | Mode::Query => execute_local_query(query, time_range, tenant_id).await,
Mode::Prism => execute_remote_query(auth_token, query, time_range, tenant_id).await,
_ => Err(AlertError::CustomError(format!(
"Unsupported mode '{:?}' for alert evaluation",
PARSEABLE.options.mode
Expand All @@ -92,11 +94,12 @@ pub async fn execute_alert_query(
async fn execute_local_query(
query: &str,
time_range: &TimeRange,
tenant_id: &Option<String>,
) -> Result<AlertQueryResult, AlertError> {
let session_state = QUERY_SESSION.state();
let session_state = QUERY_SESSION.get_ctx().state();

let tables = resolve_stream_names(query)?;
create_streams_for_distributed(tables.clone())
create_streams_for_distributed(tables.clone(), tenant_id)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?;

Expand All @@ -107,7 +110,7 @@ async fn execute_local_query(
filter_tag: None,
};

let (records, _) = execute(query, false)
let (records, _) = execute(query, false, tenant_id)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to execute query: {err}")))?;

Expand All @@ -125,10 +128,12 @@ async fn execute_local_query(

/// Execute alert query remotely (Prism mode)
async fn execute_remote_query(
auth_token: Option<HeaderMap>,
query: &str,
time_range: &TimeRange,
tenant_id: &Option<String>,
) -> Result<AlertQueryResult, AlertError> {
let session_state = QUERY_SESSION.state();
let session_state = QUERY_SESSION.get_ctx().state();
let raw_logical_plan = session_state.create_logical_plan(query).await?;

let query_request = Query {
Expand All @@ -141,7 +146,7 @@ async fn execute_remote_query(
filter_tags: None,
};

let (result_value, _) = send_query_request(&query_request)
let (result_value, _) = send_query_request(auth_token, &query_request, tenant_id)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to send query request: {err}")))?;

Expand Down Expand Up @@ -280,19 +285,34 @@ async fn update_alert_state(
// Now perform the state update
if let Some(msg) = message {
alerts
.update_state(*alert.get_id(), AlertState::Triggered, Some(msg))
.update_state(
*alert.get_id(),
AlertState::Triggered,
Some(msg),
alert.get_tenant_id(),
)
.await
} else if alerts
.get_state(*alert.get_id())
.get_state(*alert.get_id(), alert.get_tenant_id())
.await?
.eq(&AlertState::Triggered)
{
alerts
.update_state(*alert.get_id(), AlertState::NotTriggered, Some("".into()))
.update_state(
*alert.get_id(),
AlertState::NotTriggered,
Some("".into()),
alert.get_tenant_id(),
)
.await
} else {
alerts
.update_state(*alert.get_id(), AlertState::NotTriggered, None)
.update_state(
*alert.get_id(),
AlertState::NotTriggered,
None,
alert.get_tenant_id(),
)
.await
}
}
Expand Down
Loading
Loading