diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index a4fd749..f96ddae 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -1,17 +1,36 @@ -use crate::threads::{GenServer, GenServerHandle}; +use std::thread::JoinHandle; -use futures::Stream; +use crate::threads::{GenServer, GenServerHandle}; /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// /// Items sent through the stream are required to be wrapped in a Result type. -pub fn spawn_listener(_handle: GenServerHandle, _message_builder: F, _stream: S) +pub fn spawn_listener(mut handle: GenServerHandle, stream: I) -> JoinHandle<()> where - T: GenServer + 'static, - F: Fn(I) -> T::CastMsg + Send + 'static, - I: Send + 'static, - E: std::fmt::Debug + Send + 'static, - S: Unpin + Send + Stream> + 'static, + T: GenServer, + I: IntoIterator, + ::IntoIter: std::marker::Send + 'static, { - unimplemented!("Unsupported function in threads mode") + let mut iter = stream.into_iter(); + let mut cancelation_token = handle.cancellation_token(); + let join_handle = spawned_rt::threads::spawn(move || loop { + match iter.next() { + Some(msg) => match handle.cast(msg) { + Ok(_) => tracing::trace!("Message sent successfully"), + Err(e) => { + tracing::error!("Failed to send message: {e:?}"); + break; + } + }, + None => { + tracing::trace!("Stream finished"); + break; + } + } + if cancelation_token.is_cancelled() { + tracing::trace!("GenServer stopped"); + break; + } + }); + join_handle }