diff --git a/Cargo.lock b/Cargo.lock index c93985b7..8c077769 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,12 +119,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "arc-swap" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" - [[package]] name = "async-stream" version = "0.3.6" @@ -1557,7 +1551,7 @@ dependencies = [ "nextcloud-config-parser", "nextcloud_appinfo", "once_cell", - "parse-display", + "parse-display 0.10.0", "rand 0.8.5", "redis", "reqwest", @@ -1704,7 +1698,18 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" dependencies = [ - "parse-display-derive", + "parse-display-derive 0.9.1", + "regex", + "regex-syntax 0.8.5", +] + +[[package]] +name = "parse-display" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287d8d3ebdce117b8539f59411e4ed9ec226e0a4153c7f55495c6070d68e6f72" +dependencies = [ + "parse-display-derive 0.10.0", "regex", "regex-syntax 0.8.5", ] @@ -1723,6 +1728,20 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "parse-display-derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fc048687be30d79502dea2f623d052f3a074012c6eac41726b7ab17213616b1" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.5", + "structmeta", + "syn 2.0.101", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -1753,7 +1772,7 @@ dependencies = [ "logos", "memchr", "miette", - "parse-display", + "parse-display 0.9.1", "serde", "thiserror 1.0.69", ] @@ -1995,11 +2014,10 @@ dependencies = [ [[package]] name = "redis" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "438a4e5f8e9aa246d6f3666d6978441bf1b37d5f417b50c4dd220be09f5fcc17" +checksum = "0bc1ea653e0b2e097db3ebb5b7f678be339620b8041f66b30a308c1d45d36a7f" dependencies = [ - "arc-swap", "bytes", "cfg-if", "combine", diff --git a/Cargo.toml b/Cargo.toml index 4f45e448..81c57d9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" rust-version = "1.81.0" [dependencies] -redis = { version = "0.30.0", default-features = false, features = ["tokio-comp", "aio", "cluster", "cluster-async", "keep-alive", "tls-rustls", "tokio-rustls-comp", "tls-rustls-webpki-roots", "tls-rustls-insecure"] } +redis = { version = "0.31.0", default-features = false, features = ["tokio-comp", "aio", "cluster", "cluster-async", "keep-alive", "tls-rustls", "tokio-rustls-comp", "tls-rustls-webpki-roots", "tls-rustls-insecure"] } serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.139" thiserror = "2.0.11" @@ -24,7 +24,7 @@ miette = { version = "7.4.0", features = ["fancy"] } smallvec = { version = "1.13.2", features = ["serde"] } reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "json"] } warp-real-ip = "0.2.0" -parse-display = "0.9.1" +parse-display = "0.10.0" rand = { version = "0.8.5", features = ["small_rng"] } ahash = "0.8.11" flexi_logger = { version = "0.29.8", features = ["colors"] } diff --git a/src/event.rs b/src/event.rs index f077c986..2f2199fc 100644 --- a/src/event.rs +++ b/src/event.rs @@ -6,6 +6,7 @@ use crate::metrics::METRICS; use crate::{Redis, Result, UserId}; use parse_display::Display; +use redis::aio::PubSubSink; use redis::Msg; use serde::Deserialize; use serde_json::Value; @@ -153,7 +154,10 @@ impl TryFrom for Event { pub async fn subscribe( client: &Redis, -) -> Result>> { +) -> Result<( + PubSubSink, + impl Stream>, +)> { let mut pubsub = client.pubsub().await?; let channels = [ "notify_storage_update", @@ -172,8 +176,12 @@ pub async fn subscribe( pubsub.subscribe(*channel).await?; } - Ok(pubsub.into_on_message().map(|event| { - METRICS.add_event(); - Event::try_from(event) - })) + let (sink, stream) = pubsub.split(); + Ok(( + sink, + stream.map(|event| { + METRICS.add_event(); + Event::try_from(event) + }), + )) } diff --git a/src/lib.rs b/src/lib.rs index 89fc8b38..24743d7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -418,7 +418,7 @@ pub async fn listen_loop(app: Arc, cancel: oneshot::Receiver<()>) { } pub async fn listen(app: Arc) -> Result<()> { - let mut event_stream = event::subscribe(&app.redis).await?; + let (mut pubsub_sink, mut event_stream) = event::subscribe(&app.redis).await?; let handle = move |event: Event| { // todo: any way to do this without cloning the arc every event (scoped?) @@ -428,6 +428,13 @@ pub async fn listen(app: Arc) -> Result<()> { } }; + let ping_handle = tokio::spawn(async move { + loop { + sleep(Duration::from_secs(15)).await; + let _ = pubsub_sink.ping::<()>().await; + } + }); + while let Some(event) = event_stream.next().await { match event { Ok(event) => { @@ -440,5 +447,7 @@ pub async fn listen(app: Arc) -> Result<()> { Err(e) => log::warn!("{e:#}"), } } + + ping_handle.abort(); Ok(()) }