Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ async fn establish_connection<Outbound: OutboundMessage>(
completion_tx: watch::Sender<bool>,
) {
loop {
if session_ref.await_active_session_time().await.is_err() {
warn!("session task terminated when checking active session time");
if session_ref.await_in_schedule().await.is_err() {
warn!("session task terminated when checking schedule");
break;
}

Expand Down
28 changes: 15 additions & 13 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ pub(crate) use crate::session::session_ref::InternalSessionRef;
pub use crate::session::session_ref::InternalSessionRef;
use crate::session::session_ref::OutboundRequest;
use crate::session::state::SessionState;
use crate::session::state::{AwaitingResendTransitionOutcome, TestRequestId};
use crate::session::state::{
AwaitingLogonState, AwaitingLogoutState, AwaitingResendTransitionOutcome, TestRequestId,
};
use crate::session_schedule::{SessionPeriodComparison, SessionSchedule};
use crate::store::MessageStore;
use crate::transport::writer::WriterRef;
Expand Down Expand Up @@ -200,7 +202,7 @@ where
}
}

if let SessionState::AwaitingLogon { .. } = &mut self.state {
if let SessionState::AwaitingLogon(_) = &mut self.state {
// TODO: should this (and all inbound message processing) logic be pushed into the state?
if message_type != Logon::MSG_TYPE {
self.state.disconnect_writer().await;
Expand Down Expand Up @@ -332,11 +334,11 @@ where
}

async fn on_connect(&mut self, writer: WriterRef) -> Result<(), SessionOperationError> {
self.state = SessionState::AwaitingLogon {
self.state = SessionState::AwaitingLogon(AwaitingLogonState {
writer,
logon_sent: false,
logon_timeout: Instant::now() + Duration::from_secs(self.config.logon_timeout),
};
});
self.reset_peer_timer(None);
self.send_logon().await?;

Expand All @@ -345,23 +347,23 @@ where

async fn on_disconnect(&mut self, reason: String) {
match self.state {
SessionState::Active { .. }
| SessionState::AwaitingLogon { .. }
SessionState::Active(_)
| SessionState::AwaitingLogon(_)
| SessionState::AwaitingResend(_) => {
self.state.disconnect_writer().await;
self.state = SessionState::new_disconnected(true, &reason);
}
SessionState::Disconnected { .. } => {
SessionState::Disconnected(_) => {
warn!("disconnect message was received, but the session is already disconnected")
}
SessionState::AwaitingLogout { reconnect, .. } => {
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
self.state = SessionState::new_disconnected(reconnect, &reason);
}
}
}

async fn on_logon(&mut self, message: &Message) -> Result<(), SessionOperationError> {
if let SessionState::AwaitingLogon { writer, .. } = &self.state {
if let SessionState::AwaitingLogon(AwaitingLogonState { writer, .. }) = &self.state {
match self.verify_message(message, true, true) {
Ok(_) => {
// happy logon flow, the session is now active
Expand Down Expand Up @@ -395,7 +397,7 @@ where
// if the session is already disconnected, we have nothing else to do
SessionState::Disconnected(..) => {}
// if we initiated the logout, preserve the reconnect flag
SessionState::AwaitingLogout { reconnect, .. } => {
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
self.state.disconnect_writer().await;
self.state = SessionState::new_disconnected(reconnect, "logout completed");
}
Expand Down Expand Up @@ -1039,8 +1041,8 @@ where
warn!("tried to respond to ShouldReconnect query but the receiver is gone");
}
}
SessionEvent::AwaitingActiveSession(responder) => {
self.state.register_session_awaiter(responder);
SessionEvent::AwaitSchedule(responder) => {
self.state.register_schedule_awaiter(responder);
}
}
}
Expand Down Expand Up @@ -1117,7 +1119,7 @@ where
let is_active = self.schedule.is_active_at(&now);

if is_active {
self.state.notify_session_awaiter();
self.state.notify_schedule_awaiter();
match self
.schedule
.is_same_session_period(&self.store.creation_time(), &now)
Expand Down
18 changes: 9 additions & 9 deletions crates/hotfix/src/session/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ pub enum SessionEvent {
Connected(WriterRef),
/// Ask the session whether we should attempt to reconnect.
ShouldReconnect(oneshot::Sender<bool>),
/// Ask the session to notify us when the session is active.
AwaitingActiveSession(oneshot::Sender<AwaitingActiveSessionResponse>),
/// Ask the session to notify us when the schedule indicates we should connect.
AwaitSchedule(oneshot::Sender<ScheduleResponse>),
}

/// The response sent by the session to AwaitingActiveSession messages.
/// The response sent by the session to AwaitSchedule messages.
///
/// This doesn't include an Inactive variant, as the session won't respond until
/// it's active or in a state that indicates it should just be shut down due to an
/// unrecoverable error.
/// This doesn't include an out-of-schedule variant, as the session won't respond
/// until the schedule indicates we should connect or the session is in a state that
/// indicates it should just be shut down due to an unrecoverable error.
#[derive(Debug, Clone, Copy)]
pub enum AwaitingActiveSessionResponse {
/// The session is now active and ready to connect.
Active,
pub enum ScheduleResponse {
/// The schedule indicates we should connect.
InSchedule,
/// The session should be shut down due to an unrecoverable error.
Shutdown,
}
12 changes: 6 additions & 6 deletions crates/hotfix/src/session/session_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::message::{OutboundMessage, RawFixMessage};
use crate::session::Session;
use crate::session::admin_request::AdminRequest;
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent};
use crate::session::event::{ScheduleResponse, SessionEvent};
use crate::store::MessageStore;
use crate::transport::writer::WriterRef;
use crate::{Application, session};
Expand Down Expand Up @@ -82,15 +82,15 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
Ok(receiver.await?)
}

pub async fn await_active_session_time(&self) -> Result<(), SessionGone> {
debug!("awaiting active session time");
let (sender, receiver) = oneshot::channel::<AwaitingActiveSessionResponse>();
pub async fn await_in_schedule(&self) -> Result<(), SessionGone> {
debug!("awaiting in-schedule time");
let (sender, receiver) = oneshot::channel::<ScheduleResponse>();
self.event_sender
.send(SessionEvent::AwaitingActiveSession(sender))
.send(SessionEvent::AwaitSchedule(sender))
.await?;
receiver.await?;

debug!("resuming connection as session is active");
debug!("resuming connection as schedule is active");
Ok(())
}
}
Expand Down
Loading
Loading