From bffa6ceaf46ba6264bf3f8fa1b6736cf85b684a4 Mon Sep 17 00:00:00 2001 From: Jyrki Gadinger Date: Fri, 24 Apr 2026 12:17:13 +0200 Subject: [PATCH] fix: avoid too many concurrent db queries when cache becomes invalid In large installations it's more likely to process multiple storage_update events at the same time. When the storage mapping cache becomes invalid each of these events would try to query the database to update the cache. Apart from causing additional load on the database this would also exhaust sqlx's internal connection pool when more event handling threads are running than there are available database connections -- in this case the `Failed to query database: pool timed out while waiting for an open connection` message is logged. This commit changes how the storage mapping cache is updated once it becomes invalid by trying to reduce the chance of multiple threads querying the database at once. Other callers should then still retrieve the old cached mapping during the time the cache is updated. Signed-off-by: Jyrki Gadinger --- src/storage_mapping.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/storage_mapping.rs b/src/storage_mapping.rs index c094e565..9eb9c725 100644 --- a/src/storage_mapping.rs +++ b/src/storage_mapping.rs @@ -13,7 +13,7 @@ use log::debug; use rand::{thread_rng, Rng}; use sqlx::any::AnyConnectOptions; use sqlx::{query_as, Any, AnyPool, FromRow}; -use std::time::Instant; +use std::{sync::RwLock, time::Instant}; use tokio::time::Duration; #[derive(Debug, Clone, FromRow)] @@ -27,6 +27,7 @@ pub struct UserStorageAccess { struct CachedAccess { access: Vec, valid_till: Instant, + updating: RwLock, } impl CachedAccess { @@ -36,11 +37,18 @@ impl CachedAccess { access, valid_till: Instant::now() + Duration::from_millis(rng.gen_range((4 * 60 * 1000)..(5 * 60 * 1000))), + updating: RwLock::new(false), } } pub fn is_valid(&self) -> bool { - self.valid_till > Instant::now() + self.valid_till > Instant::now() || self.updating.try_read().is_ok_and(|value| *value) + } + + pub fn prepare_update(&self, value: bool) { + if let Ok(mut updating) = self.updating.try_write() { + *updating = value + } } } @@ -71,14 +79,27 @@ impl StorageMapping { &self, storage: u32, ) -> Result, DatabaseError> { - if let Some(cached) = self.cache.get(&storage).filter(|cached| cached.is_valid()) { - Ok(cached) - } else { - let users = self.load_storage_mapping(storage).await?; + if let Some(cached) = self.cache.get(&storage) { + if cached.is_valid() { + return Ok(cached); + } - self.cache.insert(storage, CachedAccess::new(users)); - Ok(self.cache.get(&storage).unwrap()) + cached.prepare_update(true); + let users = self + .load_storage_mapping(storage) + .await + .inspect_err(|_| cached.prepare_update(false))?; + + drop(cached); + let cached = CachedAccess::new(users); + self.cache.insert(storage, cached); + return Ok(self.cache.get(&storage).unwrap()); } + + let users = self.load_storage_mapping(storage).await?; + + self.cache.insert(storage, CachedAccess::new(users)); + Ok(self.cache.get(&storage).unwrap()) } pub async fn get_users_for_storage_path(