Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 10 additions & 148 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod ctx;
pub mod error;
pub(crate) mod event;
mod info;
mod outbound;
mod session_handle;
pub mod session_ref;
mod state;
Expand All @@ -15,14 +16,13 @@ use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, Sleep, sleep, sleep_until};
use tracing::{debug, enabled, error, info, warn};
use tracing::{debug, error, info, warn};

use crate::Application;
use crate::application::{InboundDecision, OutboundDecision};
use crate::config::SessionConfig;
use crate::message::OutboundMessage;
use crate::message::business_reject::BusinessReject;
use crate::message::generate_message;
use crate::message::heartbeat::Heartbeat;
use crate::message::logon::{Logon, ResetSeqNumConfig};
use crate::message::logout::Logout;
Expand All @@ -33,7 +33,6 @@ use crate::message::sequence_reset::SequenceReset;
use crate::message::test_request::TestRequest;
use crate::message::verification::verify_message;
use crate::message::verification_error::{CompIdType, MessageVerificationError};
use crate::message::{is_admin, prepare_message_for_resend};
use crate::session::admin_request::AdminRequest;
use crate::session::ctx::SessionCtx;
use crate::session::error::SessionCreationError;
Expand Down Expand Up @@ -526,8 +525,11 @@ where
self.ctx.store.increment_target_seq_number().await?;
}

self.resend_messages(begin_seq_number, end_seq_number, message)
.await?;
if let Some(writer) = self.state.get_writer() {
outbound::resend_messages(&mut self.ctx, writer, begin_seq_number, end_seq_number)
.await?;
self.reset_heartbeat_timer();
}

Ok(())
}
Expand Down Expand Up @@ -779,90 +781,6 @@ where
};
}

async fn resend_messages(
&mut self,
begin: u64,
end: u64,
_message: &Message,
) -> Result<(), SessionOperationError> {
info!(begin, end, "resending messages as requested");
let messages = self
.ctx
.store
.get_slice(begin as usize, end as usize)
.await?;

let no = messages.len();
debug!(number_of_messages = no, "number of messages");

let mut reset_start: Option<u64> = None;
let mut sequence_number = 0;

for msg in messages {
let mut message = self
.ctx
.message_builder
.build(msg.as_slice())
.into_message()
.ok_or_else(|| {
SessionOperationError::StoredMessageParse(format!(
"failed to build message for raw message: {msg:?}"
))
})?;
sequence_number = get_msg_seq_num(&message);
let message_type: String = message
.header()
.get::<&str>(MSG_TYPE)
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?
.to_string();

if is_admin(&message_type) {
if reset_start.is_none() {
reset_start = Some(sequence_number);
}
continue;
}

if let Some(begin) = reset_start {
let end = sequence_number;
Self::log_skipped_admin_messages(begin, end);
self.send_sequence_reset(begin, end).await?;
reset_start = None;
}

if let Err(e) = prepare_message_for_resend(&mut message) {
error!(
error = e,
"failed to prepare message for resend, sending original"
);
}
self.send_raw(&message_type, message.encode(&self.ctx.message_config)?)
.await;

if enabled!(tracing::Level::DEBUG)
&& let Ok(m) = String::from_utf8(msg.clone())
{
debug!(sequence_number, message = m, "resent message");
}
}

if let Some(begin) = reset_start {
// the final reset if needed
let end = sequence_number;
Self::log_skipped_admin_messages(begin, end);
self.send_sequence_reset(begin, end).await?;
}

Ok(())
}

fn log_skipped_admin_messages(begin: u64, end: u64) {
info!(
begin,
end, "skipped admin message(s) during resend, requesting reset for these"
);
}

fn reset_heartbeat_timer(&mut self) {
self.state
.reset_heartbeat_timer(self.ctx.config.heartbeat_interval);
Expand Down Expand Up @@ -899,67 +817,11 @@ where
&mut self,
message: impl OutboundMessage,
) -> Result<u64, InternalSendError> {
let seq_num = self.ctx.store.next_sender_seq_number();
let msg_type = message.message_type().to_string();
let msg = generate_message(
&self.ctx.config.begin_string,
&self.ctx.config.sender_comp_id,
&self.ctx.config.target_comp_id,
seq_num,
message,
)
.map_err(|e| {
InternalSendError::Persist(crate::store::StoreError::PersistMessage {
sequence_number: seq_num,
source: e.into(),
})
})?;

self.ctx
.store
.increment_sender_seq_number()
.await
.map_err(InternalSendError::SequenceNumber)?;

self.ctx
.store
.add(seq_num, &msg)
.await
.map_err(InternalSendError::Persist)?;

self.send_raw(&msg_type, msg).await;

Ok(seq_num)
}

async fn send_raw(&mut self, message_type: &str, data: Vec<u8>) {
self.state
.send_message(message_type, RawFixMessage::new(data))
.await;
let prepared = self.ctx.prepare_message(message).await?;
self.state.send_message(&msg_type, prepared.raw).await;
self.reset_heartbeat_timer();
}

async fn send_sequence_reset(
&mut self,
begin: u64,
end: u64,
) -> Result<(), SessionOperationError> {
let sequence_reset = SequenceReset {
gap_fill: true,
new_seq_no: end,
};
let raw_message = generate_message(
&self.ctx.config.begin_string,
&self.ctx.config.sender_comp_id,
&self.ctx.config.target_comp_id,
begin,
sequence_reset,
)?;

self.send_raw(SequenceReset::MSG_TYPE, raw_message).await;
debug!(begin, end, "sent reset sequence");

Ok(())
Ok(prepared.seq_num)
}

async fn send_resend_request(
Expand Down
48 changes: 48 additions & 0 deletions crates/hotfix/src/session/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use hotfix_message::MessageBuilder;
use hotfix_message::message::Config as MessageConfig;
use hotfix_store::MessageStore;

use crate::config::SessionConfig;
use crate::message::OutboundMessage;
use crate::message::generate_message;
use crate::message::parser::RawFixMessage;
use crate::session::error::InternalSendError;
use crate::store::StoreError;

pub(crate) struct SessionCtx<A, S> {
pub config: SessionConfig,
Expand All @@ -10,3 +16,45 @@ pub(crate) struct SessionCtx<A, S> {
pub message_builder: MessageBuilder,
pub message_config: MessageConfig,
}

pub(crate) struct PreparedMessage {
pub seq_num: u64,
pub raw: RawFixMessage,
}

impl<A, S: MessageStore> SessionCtx<A, S> {
pub async fn prepare_message(
&mut self,
message: impl OutboundMessage,
) -> Result<PreparedMessage, InternalSendError> {
let seq_num = self.store.next_sender_seq_number();
let msg = generate_message(
&self.config.begin_string,
&self.config.sender_comp_id,
&self.config.target_comp_id,
seq_num,
message,
)
.map_err(|e| {
InternalSendError::Persist(StoreError::PersistMessage {
sequence_number: seq_num,
source: e.into(),
})
})?;

self.store
.increment_sender_seq_number()
.await
.map_err(InternalSendError::SequenceNumber)?;

self.store
.add(seq_num, &msg)
.await
.map_err(InternalSendError::Persist)?;

Ok(PreparedMessage {
seq_num,
raw: RawFixMessage::new(msg),
})
}
}
Loading
Loading