diff --git a/Cargo.toml b/Cargo.toml index 62741ca..fe76f54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,10 @@ rust-version = "1.68" # 1.67 contains an UB we would tri [dependencies] bytes = "1" crc32fast = "1.2" -serde = { version = "1", features = ["derive"] } thiserror = "2" tracing = "0.1.37" +arc-swap = "1.7" +rclite = "0.2" [dev-dependencies] criterion = "0.5" @@ -26,3 +27,7 @@ tempfile = "3" [[bench]] name = "bench" harness = false + +# Uncomment for profiling +# [profile.bench] +# debug=true diff --git a/TODO b/TODO new file mode 100644 index 0000000..39cd85b --- /dev/null +++ b/TODO @@ -0,0 +1,12 @@ +we do not need to record position anymore. Keeping the last record is sufficient. +Of course it would be nicer to store that in a separate way to avoid fragmentation. + +Record refcount + +No GC however means on restart we will often have to read way more than what is actually required. + +We need a way to stop reading the past. + +Truncating a non existing queue. + +just truncate to record position. diff --git a/src/block_read_write.rs b/src/block_read_write.rs index c259283..c4c3f5b 100644 --- a/src/block_read_write.rs +++ b/src/block_read_write.rs @@ -4,14 +4,22 @@ use crate::PersistAction; pub const BLOCK_NUM_BYTES: usize = 32_768; +/// A block read is supposed to be positioned on a block at its initialization. +/// +/// In other words, it is not necessary to call `next_block` a first time +/// before calling `block()`. pub trait BlockRead { + type Session; + + fn start_session(&self) -> Self::Session; + /// Loads the next block. /// If `Ok(true)` is returned, the new block is available through /// `.block()`. /// /// If `Ok(false)` is returned, the end of the `BlockReader` /// has been reached and the content of `block()` could be anything. - fn next_block(&mut self) -> io::Result; + fn next_block(&mut self, read_session: &mut Self::Session) -> io::Result; /// A `BlockReader` is always position on a specific block. /// @@ -25,8 +33,17 @@ pub trait BlockRead { } pub trait BlockWrite { + type Session; + + fn start_write_session(&mut self) -> io::Result; + + fn make_room(&mut self, num_bytes: u64) -> io::Result<()>; + /// Must panic if buf is larger than `num_bytes_remaining_in_block`. - fn write(&mut self, buf: &[u8]) -> io::Result<()>; + /// Not that this trait does not have next_block() method. + /// + /// We automatically go to the next block after the current block has been entirely written. + fn write(&mut self, buf: &[u8], write_session: &mut Self::Session) -> io::Result<()>; /// Persist the data following the `persist_action`. fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>; /// Number of bytes that can be added in the block. @@ -49,7 +66,11 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> { } impl BlockRead for ArrayReader<'_> { - fn next_block(&mut self) -> io::Result { + type Session = (); + + fn start_session(&self) -> Self::Session {} + + fn next_block(&mut self, _session: &mut Self::Session) -> io::Result { if self.data.len() < BLOCK_NUM_BYTES { return Ok(false); } @@ -81,7 +102,14 @@ impl From for Vec { } impl BlockWrite for VecBlockWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<()> { + type Session = (); + fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + // TODO consider just doubling for performance. + let new_len = ceil_to_block(self.cursor + num_bytes as usize); + self.buffer.resize(new_len, 0u8); + Ok(()) + } + fn write(&mut self, buf: &[u8], _session: &mut Self::Session) -> io::Result<()> { assert!(buf.len() <= self.num_bytes_remaining_in_block()); if self.cursor + buf.len() > self.buffer.len() { let new_len = ceil_to_block((self.cursor + buf.len()) * 2 + 1); @@ -99,4 +127,8 @@ impl BlockWrite for VecBlockWriter { fn num_bytes_remaining_in_block(&self) -> usize { BLOCK_NUM_BYTES - (self.cursor % BLOCK_NUM_BYTES) } + + fn start_write_session(&mut self) -> io::Result { + Ok(()) + } } diff --git a/src/error.rs b/src/error.rs index 4e53926..5c18bb7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -91,3 +91,13 @@ pub enum ReadRecordError { #[error("Corruption")] Corruption, } + +#[derive(Error, Debug)] +pub enum HeaderError { + #[error("invalid magic number: found {magic_number}")] + InvalidMagicNumber { magic_number: u32 }, + #[error("invalid checksum")] + InvalidChecksum, + #[error("unsupported version: {version}")] + UnsupportedVersion { version: u32 }, +} diff --git a/src/frame/reader.rs b/src/frame/reader.rs index 7fd7c12..0ffea6b 100644 --- a/src/frame/reader.rs +++ b/src/frame/reader.rs @@ -2,15 +2,14 @@ use std::io; use thiserror::Error; -use crate::frame::{FrameType, FrameWriter, Header, HEADER_LEN}; -use crate::rolling::{RollingReader, RollingWriter}; +use crate::frame::{FrameType, Header, HEADER_LEN}; use crate::{BlockRead, BLOCK_NUM_BYTES}; pub struct FrameReader { - reader: R, + pub(crate) reader: R, /// In block cursor - cursor: usize, + pub(crate) cursor: usize, // The current block is corrupted. block_corrupted: bool, @@ -35,8 +34,8 @@ impl FrameReader { } } - pub fn read(&self) -> &R { - &self.reader + pub fn start_session(&self) -> R::Session { + self.reader.start_session() } // Returns the number of bytes remaining into @@ -47,13 +46,16 @@ impl FrameReader { crate::BLOCK_NUM_BYTES - self.cursor } - fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> { + fn go_to_next_block_if_necessary( + &mut self, + session: &mut R::Session, + ) -> Result<(), ReadFrameError> { let num_bytes_to_end_of_block = self.num_bytes_to_end_of_block(); let need_to_skip_block = self.block_corrupted || num_bytes_to_end_of_block < HEADER_LEN; if !need_to_skip_block { return Ok(()); } - if !self.reader.next_block()? { + if !self.reader.next_block(session)? { return Err(ReadFrameError::NotAvailable); } @@ -79,8 +81,11 @@ impl FrameReader { } // Reads the next frame. - pub fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> { - self.go_to_next_block_if_necessary()?; + pub fn read_frame( + &mut self, + session: &mut R::Session, + ) -> Result<(FrameType, &[u8]), ReadFrameError> { + self.go_to_next_block_if_necessary(session)?; let header = self.get_frame_header()?; self.cursor += HEADER_LEN; if self.cursor + header.len() > BLOCK_NUM_BYTES { @@ -103,11 +108,3 @@ impl FrameReader { Ok((header.frame_type(), frame_payload)) } } - -impl FrameReader { - pub fn into_writer(self) -> io::Result> { - let mut rolling_writer: RollingWriter = self.reader.into_writer()?; - rolling_writer.forward(self.cursor)?; - Ok(FrameWriter::create(rolling_writer)) - } -} diff --git a/src/frame/tests.rs b/src/frame/tests.rs index 1ea8a7b..c66ff27 100644 --- a/src/frame/tests.rs +++ b/src/frame/tests.rs @@ -3,39 +3,42 @@ use std::io; use crate::block_read_write::{ArrayReader, VecBlockWriter}; use crate::frame::header::{FrameType, HEADER_LEN}; use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::{PersistAction, BLOCK_NUM_BYTES}; +use crate::{BlockWrite as _, PersistAction, BLOCK_NUM_BYTES}; #[test] fn test_frame_simple() { let block_writer = { - let wrt: VecBlockWriter = VecBlockWriter::default(); + let mut wrt: VecBlockWriter = VecBlockWriter::default(); + let mut session = wrt.start_write_session().unwrap(); let mut frame_writer = FrameWriter::create(wrt); + frame_writer - .write_frame(FrameType::First, &b"abc"[..]) + .write_frame(FrameType::First, &b"abc"[..], &mut session) .unwrap(); frame_writer - .write_frame(FrameType::Middle, &b"de"[..]) + .write_frame(FrameType::Middle, &b"de"[..], &mut session) .unwrap(); frame_writer - .write_frame(FrameType::Last, &b"fgh"[..]) + .write_frame(FrameType::Last, &b"fgh"[..], &mut session) .unwrap(); frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer() }; let buffer: Vec = block_writer.into(); let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..])); - let read_frame_res = frame_reader.read_frame(); + let mut session = frame_reader.start_session(); + let read_frame_res = frame_reader.read_frame(&mut session); assert_eq!(read_frame_res.unwrap(), (FrameType::First, &b"abc"[..])); assert_eq!( - frame_reader.read_frame().unwrap(), + frame_reader.read_frame(&mut session).unwrap(), (FrameType::Middle, &b"de"[..]) ); assert_eq!( - frame_reader.read_frame().unwrap(), + frame_reader.read_frame(&mut session).unwrap(), (FrameType::Last, &b"fgh"[..]) ); assert!(matches!( - frame_reader.read_frame().unwrap_err(), + frame_reader.read_frame(&mut session).unwrap_err(), ReadFrameError::NotAvailable )); } @@ -44,20 +47,22 @@ fn test_frame_simple() { fn test_frame_corruption_in_payload() -> io::Result<()> { let mut buf: Vec = { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); - frame_writer.write_frame(FrameType::First, &b"abc"[..])?; + let mut session = frame_writer.start_session()?; + frame_writer.write_frame(FrameType::First, &b"abc"[..], &mut session)?; frame_writer.persist(PersistAction::Flush)?; - frame_writer.write_frame(FrameType::Middle, &b"de"[..])?; + frame_writer.write_frame(FrameType::Middle, &b"de"[..], &mut session)?; frame_writer.persist(PersistAction::Flush)?; frame_writer.into_writer().into() }; buf[8] = 0u8; let mut frame_reader = FrameReader::open(ArrayReader::from(&buf[..])); + let mut session = frame_reader.start_session(); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::Corruption) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Ok((FrameType::Middle, b"de")) )); Ok(()) @@ -65,8 +70,11 @@ fn test_frame_corruption_in_payload() -> io::Result<()> { fn repeat_empty_frame_util(repeat: usize) -> Vec { let mut frame_writer = FrameWriter::create(VecBlockWriter::default()); + let mut session = frame_writer.start_session().unwrap(); for _ in 0..repeat { - frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap(); + frame_writer + .write_frame(FrameType::Full, &b""[..], &mut session) + .unwrap(); } frame_writer.persist(PersistAction::Flush).unwrap(); frame_writer.into_writer().into() @@ -77,12 +85,13 @@ fn test_simple_multiple_blocks() -> io::Result<()> { let num_frames = 1 + BLOCK_NUM_BYTES / HEADER_LEN; let buffer = repeat_empty_frame_util(num_frames); let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..])); + let mut session = frame_reader.start_session(); for _ in 0..num_frames { - let read_frame_res = frame_reader.read_frame(); + let read_frame_res = frame_reader.read_frame(&mut session); assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); } assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut &mut &mut &mut &mut &mut &mut &mut session), Err(ReadFrameError::NotAvailable) )); Ok(()) @@ -96,20 +105,21 @@ fn test_multiple_blocks_corruption_on_length() -> io::Result<()> { let mut buffer = repeat_empty_frame_util(num_frames); buffer[2000 * HEADER_LEN + 5] = 255u8; let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..])); + let mut session = frame_reader.start_session(); for _ in 0..2000 { - let read_frame_res = frame_reader.read_frame(); + let read_frame_res = frame_reader.read_frame(&mut session); assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); } assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::Corruption) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Ok((FrameType::Full, &[])) )); assert!(matches!( - frame_reader.read_frame(), + frame_reader.read_frame(&mut session), Err(ReadFrameError::NotAvailable) )); Ok(()) diff --git a/src/frame/writer.rs b/src/frame/writer.rs index 69d23c0..a070a78 100644 --- a/src/frame/writer.rs +++ b/src/frame/writer.rs @@ -1,11 +1,10 @@ use std::io; use crate::frame::{FrameType, Header, HEADER_LEN}; -use crate::rolling::{Directory, RollingWriter}; use crate::{BlockWrite, PersistAction, BLOCK_NUM_BYTES}; pub struct FrameWriter { - wrt: W, + pub(crate) wrt: W, // temporary buffer, not storing anything in particular after any function returns buffer: Box<[u8; BLOCK_NUM_BYTES]>, } @@ -18,21 +17,40 @@ impl FrameWriter { } } + pub fn start_session(&mut self) -> io::Result { + self.wrt.start_write_session() + } + + pub fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + // Framing adds some overhead. We can however compute an upperbound of the amount of room + // that will be needed. The worst case scenario is if we start at the very end of a + // block and the first frame is empty and we end up just writing a header. + const MAX_EFFECTIVE_BLOCK_BYTES: u64 = (BLOCK_NUM_BYTES - HEADER_LEN) as u64; + let num_blocks_upperbound = 1 + num_bytes.div_ceil(MAX_EFFECTIVE_BLOCK_BYTES); + let room_needed_upperbound: u64 = HEADER_LEN as u64 * num_blocks_upperbound + num_bytes; + self.wrt.make_room(room_needed_upperbound) + } + /// Writes a frame. The payload has to be lower than the /// remaining space in the frame as defined /// by `max_writable_frame_length`. - pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> { + pub fn write_frame( + &mut self, + frame_type: FrameType, + payload: &[u8], + session: &mut W::Session, + ) -> io::Result<()> { let num_bytes_remaining_in_block = self.wrt.num_bytes_remaining_in_block(); if num_bytes_remaining_in_block < HEADER_LEN { let zero_bytes = [0u8; HEADER_LEN]; self.wrt - .write(&zero_bytes[..num_bytes_remaining_in_block])?; + .write(&zero_bytes[..num_bytes_remaining_in_block], session)?; } let record_len = HEADER_LEN + payload.len(); let (buffer_header, buffer_record) = self.buffer[..record_len].split_at_mut(HEADER_LEN); buffer_record.copy_from_slice(payload); Header::for_payload(frame_type, payload).serialize(buffer_header); - self.wrt.write(&self.buffer[..record_len])?; + self.wrt.write(&self.buffer[..record_len], session)?; Ok(()) } @@ -65,9 +83,3 @@ impl FrameWriter { self.wrt } } - -impl FrameWriter { - pub fn directory(&mut self) -> &mut Directory { - &mut self.wrt.directory - } -} diff --git a/src/lib.rs b/src/lib.rs index 41ce3f4..deb578a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ use std::borrow::Cow; +use std::fs::File; +use std::io::{self, BufWriter, Cursor}; mod block_read_write; @@ -6,11 +8,12 @@ pub use block_read_write::{BlockRead, BlockWrite, BLOCK_NUM_BYTES}; pub mod error; mod frame; mod mem; + mod multi_record_log; +mod page_directory; mod persist_policy; mod record; mod recordlog; -mod rolling; pub use mem::{QueueSummary, QueuesSummary}; pub use multi_record_log::MultiRecordLog; @@ -39,13 +42,16 @@ pub struct ResourceUsage { pub memory_used_bytes: usize, /// Capacity allocated, a part of which may be unused right now pub memory_allocated_bytes: usize, - /// Disk size used - pub disk_used_bytes: usize, + pub num_pages: u32, + pub num_used_pages: u32, } #[cfg(test)] mod tests; +#[cfg(test)] +mod mockfile; + #[cfg(test)] mod proptests; @@ -65,3 +71,45 @@ impl<'a> Serializable<'a> for &'a str { std::str::from_utf8(buffer).ok() } } + +pub trait FileLikeWrite: io::Write + io::Seek { + fn fsyncdata(&mut self) -> io::Result<()>; + fn set_len(&mut self, num_bytes: u64) -> io::Result<()>; +} + +pub trait FileLike: io::Read + FileLikeWrite {} + +impl FileLikeWrite for BufWriter { + fn fsyncdata(&mut self) -> io::Result<()> { + self.get_mut().fsyncdata() + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.get_mut().set_len(num_bytes) + } +} + +impl FileLikeWrite for File { + fn fsyncdata(&mut self) -> io::Result<()> { + self.sync_data() + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + File::set_len(self, num_bytes) + } +} + +impl FileLike for File {} + +impl FileLikeWrite for Cursor> { + fn fsyncdata(&mut self) -> io::Result<()> { + Ok(()) + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.get_mut().resize(num_bytes as usize, 0u8); + Ok(()) + } +} + +impl FileLike for Cursor> {} diff --git a/src/mem/queue.rs b/src/mem/queue.rs index c9f1e66..705dcdf 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -2,41 +2,35 @@ use std::ops::{Bound, RangeBounds, RangeToInclusive}; use super::rolling_buffer::RollingBuffer; use crate::error::AppendError; -use crate::mem::QueueSummary; -use crate::rolling::FileNumber; +use crate::page_directory::PageRangeRef; use crate::Record; #[derive(Clone)] -struct RecordMeta { +struct RecordMeta { start_offset: usize, // in a vec of RecordMeta, this field should be set only on the last record // which relate to that File. - file_number: Option, + ref_count_handle: Option, position: u64, } -#[derive(Default)] -pub(crate) struct MemQueue { +pub(crate) struct MemQueue { // Concatenated records concatenated_records: RollingBuffer, start_position: u64, - record_metas: Vec, + record_metas: Vec>, + // We make sure to keep the last truncate record because it helps us keeping track of the queue + // position even when the queue is empty. + record_position_ref_count: H, } -impl MemQueue { - pub fn with_next_position(next_position: u64) -> Self { +impl MemQueue { + pub fn with_next_position(next_position: u64, record_position_ref_count: H) -> Self { MemQueue { concatenated_records: RollingBuffer::new(), start_position: next_position, record_metas: Vec::new(), - } - } - - pub fn summary(&self) -> QueueSummary { - QueueSummary { - start: self.start_position(), - end: self.last_position(), - file_number: self.first_file_number(), + record_position_ref_count: record_position_ref_count, } } @@ -44,19 +38,6 @@ impl MemQueue { self.record_metas.is_empty() } - pub(crate) fn first_file_number(&self) -> Option { - let file_number: &FileNumber = self - .record_metas - .iter() - .filter_map(|record_meta| record_meta.file_number.as_ref()) - .next()?; - Some(file_number.file_number()) - } - - pub(crate) fn start_position(&self) -> u64 { - self.start_position - } - /// Returns the position of the last record appended to the queue. pub fn last_position(&self) -> Option { self.next_position().checked_sub(1) @@ -84,7 +65,7 @@ impl MemQueue { /// AppendError if the record is strangely in the past or is too much in the future. pub fn append_record( &mut self, - file_number: &FileNumber, + ref_count_handle: &H, target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { @@ -97,19 +78,19 @@ impl MemQueue { self.start_position = target_position; } - let file_number = if let Some(record_meta) = self.record_metas.last_mut() { - if record_meta.file_number.as_ref() == Some(file_number) { - record_meta.file_number.take().unwrap() + let ref_count_handle = if let Some(record_meta) = self.record_metas.last_mut() { + if record_meta.ref_count_handle.as_ref() == Some(ref_count_handle) { + record_meta.ref_count_handle.take().unwrap() } else { - file_number.clone() + ref_count_handle.clone() } } else { - file_number.clone() + ref_count_handle.clone() }; let record_meta = RecordMeta { start_offset: self.concatenated_records.len(), - file_number: Some(file_number), + ref_count_handle: Some(ref_count_handle), position: target_position, }; self.record_metas.push(record_meta); @@ -164,11 +145,18 @@ impl MemQueue { /// /// If truncating to a future position, make the queue go forward to that position. /// Return the number of record removed. - pub fn truncate_head(&mut self, truncate_range: RangeToInclusive) -> usize { + pub fn truncate_head( + &mut self, + truncate_range: RangeToInclusive, + ref_count_handle: H, + ) -> usize { let truncate_up_to_pos = truncate_range.end; if self.start_position > truncate_up_to_pos { return 0; } + + self.record_position_ref_count = ref_count_handle; + if truncate_up_to_pos + 1 >= self.next_position() { self.start_position = truncate_up_to_pos + 1; self.concatenated_records.clear(); @@ -193,11 +181,11 @@ impl MemQueue { pub fn size(&self) -> usize { self.concatenated_records.len() - + self.record_metas.len() * std::mem::size_of::() + + self.record_metas.len() * std::mem::size_of::>() } pub fn capacity(&self) -> usize { self.concatenated_records.capacity() - + self.record_metas.capacity() * std::mem::size_of::() + + self.record_metas.capacity() * std::mem::size_of::>() } } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index 259fb22..3037c85 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -4,33 +4,38 @@ use std::ops::{RangeBounds, RangeToInclusive}; use tracing::{info, warn}; use crate::error::{AlreadyExists, AppendError, MissingQueue}; -use crate::mem::{MemQueue, QueuesSummary}; -use crate::rolling::FileNumber; +use crate::mem::MemQueue; +use crate::page_directory::PageRangeRef; use crate::Record; -#[derive(Default)] -pub(crate) struct MemQueues { - queues: HashMap, +pub(crate) struct MemQueues { + queues: HashMap>, } -impl MemQueues { + +impl Default for MemQueues { + fn default() -> MemQueues { + MemQueues { + queues: Default::default(), + } + } +} + +impl MemQueues { /// The file number argument is here unused. Its point is just to make sure we /// flushed the file before updating the in memory queue. - pub fn create_queue(&mut self, queue: &str) -> Result<(), AlreadyExists> { - if self.queues.contains_key(queue) { + pub fn create_queue( + &mut self, + queue_id: &str, + ref_count_handle: H, + ) -> Result<(), AlreadyExists> { + if self.queues.contains_key(queue_id) { return Err(AlreadyExists); } - self.queues.insert(queue.to_string(), MemQueue::default()); + let mem_queue = MemQueue::with_next_position(0u64, ref_count_handle); + self.queues.insert(queue_id.to_string(), mem_queue); Ok(()) } - pub fn summary(&self) -> QueuesSummary { - let mut summary = QueuesSummary::default(); - for (queue_name, queue) in &self.queues { - summary.queues.insert(queue_name.clone(), queue.summary()); - } - summary - } - pub fn delete_queue(&mut self, queue: &str) -> Result<(), MissingQueue> { info!(queue = queue, "deleting queue"); if self.queues.remove(queue).is_none() { @@ -41,7 +46,8 @@ impl MemQueues { } /// Returns all sub-queues which are currently empty. - pub fn empty_queues(&mut self) -> impl Iterator + '_ { + #[cfg(test)] + pub fn empty_queues(&mut self) -> impl Iterator)> + '_ { self.queues.iter_mut().filter_map(|(queue, mem_queue)| { if mem_queue.is_empty() { Some((queue.as_str(), mem_queue)) @@ -66,7 +72,7 @@ impl MemQueues { } } - pub(crate) fn get_queue(&self, queue: &str) -> Result<&MemQueue, MissingQueue> { + pub(crate) fn get_queue(&self, queue: &str) -> Result<&MemQueue, MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. self.queues @@ -74,7 +80,7 @@ impl MemQueues { .ok_or_else(|| MissingQueue(queue.to_string())) } - pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { + pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. self.queues @@ -85,12 +91,12 @@ impl MemQueues { pub fn append_record( &mut self, queue: &str, - file_number: &FileNumber, + ref_count_handle: &H, target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { self.get_queue_mut(queue)? - .append_record(file_number, target_position, payload) + .append_record(ref_count_handle, target_position, payload) } pub fn contains_queue(&self, queue: &str) -> bool { @@ -107,7 +113,7 @@ impl MemQueues { /// match, truncate it and make it go forward to the requested position. /// /// This operation is meant only to rebuild the in memory queue from its on-disk state. - pub fn ack_position(&mut self, queue_name: &str, next_position: u64) { + pub fn ack_position(&mut self, queue_name: &str, next_position: u64, ref_count_handle: &H) { if let Some(queue) = self.queues.get(queue_name) { // It is possible for `ack_position` to be called when a queue already exists. // @@ -123,14 +129,14 @@ impl MemQueues { self.queues.remove(queue_name); self.queues.insert( queue_name.to_string(), - MemQueue::with_next_position(next_position), + MemQueue::with_next_position(next_position, ref_count_handle.clone()), ); } } else { // The queue does not exist! Let's create it and set the right `next_position`. self.queues.insert( queue_name.to_string(), - MemQueue::with_next_position(next_position), + MemQueue::with_next_position(next_position, ref_count_handle.clone()), ); } } @@ -154,9 +160,14 @@ impl MemQueues { /// /// If there are no records `<= position`, the method will /// not do anything. - pub fn truncate(&mut self, queue: &str, position: RangeToInclusive) -> Option { + pub fn truncate( + &mut self, + queue: &str, + position: RangeToInclusive, + ref_count_handle: &H, + ) -> Option { if let Ok(queue) = self.get_queue_mut(queue) { - Some(queue.truncate_head(position)) + Some(queue.truncate_head(position, ref_count_handle.clone())) } else { None } diff --git a/src/mem/summary.rs b/src/mem/summary.rs index 71f62b2..2c808bd 100644 --- a/src/mem/summary.rs +++ b/src/mem/summary.rs @@ -1,15 +1,13 @@ use std::collections::BTreeMap; -use serde::Serialize; - -#[derive(Default, Serialize, Debug)] +#[derive(Default, Debug)] pub struct QueueSummary { pub start: u64, pub end: Option, pub file_number: Option, } -#[derive(Default, Serialize)] +#[derive(Default)] pub struct QueuesSummary { pub queues: BTreeMap, } diff --git a/src/mem/tests.rs b/src/mem/tests.rs index 7948657..c0edc59 100644 --- a/src/mem/tests.rs +++ b/src/mem/tests.rs @@ -1,14 +1,15 @@ +use std::sync::Arc; + use super::*; use crate::error::{AlreadyExists, AppendError}; -use crate::rolling::FileNumber; use crate::Record; #[test] fn test_mem_queues_already_exists() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); assert!(matches!( - mem_queues.create_queue("droopy"), + mem_queues.create_queue("droopy", ()), Err(AlreadyExists) )); } @@ -16,33 +17,23 @@ fn test_mem_queues_already_exists() { #[test] fn test_mem_queues() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - mem_queues.create_queue("fable").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); + mem_queues.create_queue("fable", ()).unwrap(); { - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 1, b"happy") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); } { + assert!(mem_queues.append_record("fable", &(), 0, b"maitre").is_ok()); assert!(mem_queues - .append_record("fable", &FileNumber::for_test(1), 0, b"maitre") - .is_ok()); - assert!(mem_queues - .append_record("fable", &FileNumber::for_test(1), 1, b"corbeau") + .append_record("fable", &(), 1, b"corbeau") .is_ok()); } { - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 2, b"tax") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &FileNumber::for_test(1), 3, b"payer") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"tax").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"payer").is_ok()); assert_eq!( mem_queues.range("droopy", 0..).unwrap().next(), Some(Record::new(0, b"hello")) @@ -64,28 +55,18 @@ fn test_mem_queues() { #[test] fn test_mem_queues_truncate() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); + mem_queues.create_queue("droopy", ()).unwrap(); { - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 2, b"tax") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 3, b"payer") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 4, b"!") - .is_ok()); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"tax").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"payer").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 4, b"!").is_ok()); mem_queues - .append_record("droopy", &1.into(), 5, b"payer") + .append_record("droopy", &(), 5, b"payer") .unwrap(); } - mem_queues.truncate("droopy", ..=3); + mem_queues.truncate("droopy", ..=3, &()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!( &droopy[..], @@ -96,18 +77,12 @@ fn test_mem_queues_truncate() { #[test] fn test_mem_queues_skip_advance() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 2, b"happy") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 3, b"happy") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 2, b"happy").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 3, b"happy").is_ok()); assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") + .append_record("droopy", &(), 1, b"happy") .is_err()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!( @@ -134,16 +109,12 @@ fn test_mem_queues_skip_advance() { #[test] fn test_mem_queues_append_in_the_past_yield_error() { - let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); - assert!(mem_queues - .append_record("droopy", &1.into(), 1, b"happy") - .is_ok()); + let mut mem_queues: MemQueues<()> = MemQueues::default(); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); + assert!(mem_queues.append_record("droopy", &(), 1, b"happy").is_ok()); assert!(matches!( - mem_queues.append_record("droopy", &1.into(), 0, b"happy"), + mem_queues.append_record("droopy", &(), 0, b"happy"), Err(AppendError::Past) )); } @@ -151,13 +122,11 @@ fn test_mem_queues_append_in_the_past_yield_error() { #[test] fn test_mem_queues_append_idempotence() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 0, b"hello") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 0, b"hello").is_ok()); assert!(matches!( mem_queues - .append_record("droopy", &1.into(), 0, b"different") + .append_record("droopy", &(), 0, b"different") .unwrap_err(), AppendError::Past )); @@ -168,74 +137,76 @@ fn test_mem_queues_append_idempotence() { #[test] fn test_mem_queues_non_zero_first_el() { let mut mem_queues = MemQueues::default(); - mem_queues.create_queue("droopy").unwrap(); - assert!(mem_queues - .append_record("droopy", &1.into(), 5, b"hello") - .is_ok()); + mem_queues.create_queue("droopy", ()).unwrap(); + assert!(mem_queues.append_record("droopy", &(), 5, b"hello").is_ok()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); assert_eq!(droopy, &[Record::new(5, b"hello")]); } #[test] -fn test_mem_queues_keep_filenum() { +fn test_mem_queues_keep_ref_count() { + let has_been_dropped = |ref_count: &Arc| Arc::strong_count(ref_count) == 1; + let mut mem_queues = MemQueues::default(); - let files = (0..4).map(FileNumber::for_test).collect::>(); + let ref_counts = (0..4).map(|i| Arc::new(i)).collect::>(); - assert!(files.iter().all(FileNumber::can_be_deleted)); + assert!(ref_counts.iter().all(has_been_dropped)); - mem_queues.create_queue("droopy").unwrap(); mem_queues - .append_record("droopy", &files[0], 0, b"hello") + .create_queue("droopy", ref_counts[0].clone()) + .unwrap(); + mem_queues + .append_record("droopy", &ref_counts[0], 0, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[0], 1, b"hello") + .append_record("droopy", &ref_counts[0], 1, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[0], 2, b"hello") + .append_record("droopy", &ref_counts[0], 2, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); mem_queues - .append_record("droopy", &files[1], 3, b"hello") + .append_record("droopy", &ref_counts[1], 3, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); - mem_queues.truncate("droopy", ..=1); + mem_queues.truncate("droopy", ..=1, &ref_counts[1]); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); mem_queues - .append_record("droopy", &files[2], 4, b"hello") + .append_record("droopy", &ref_counts[2], 4, b"hello") .unwrap(); - assert!(!files[0].can_be_deleted()); - assert!(!files[1].can_be_deleted()); - assert!(!files[2].can_be_deleted()); + assert!(!has_been_dropped(&ref_counts[0])); + assert!(!has_been_dropped(&ref_counts[1])); + assert!(!has_been_dropped(&ref_counts[2])); - mem_queues.truncate("droopy", ..=3); + mem_queues.truncate("droopy", ..=3, &ref_counts[2]); - assert!(files[0].can_be_deleted()); - assert!(files[1].can_be_deleted()); - assert!(!files[2].can_be_deleted()); + assert!(has_been_dropped(&ref_counts[0])); + assert!(has_been_dropped(&ref_counts[1])); + assert!(!has_been_dropped(&ref_counts[2])); - mem_queues.truncate("droopy", ..=4); + mem_queues.truncate("droopy", ..=4, &ref_counts[3]); let empty_queues = mem_queues.empty_queues().collect::>(); assert_eq!(empty_queues.len(), 1); assert_eq!(empty_queues[0].0, "droopy"); - mem_queues.ack_position("droopy", 5); + mem_queues.ack_position("droopy", 5, &ref_counts[3]); - assert!(files[2].can_be_deleted()); + assert!(has_been_dropped(&ref_counts[2])); } diff --git a/src/mockfile.rs b/src/mockfile.rs new file mode 100644 index 0000000..62f1ca2 --- /dev/null +++ b/src/mockfile.rs @@ -0,0 +1,74 @@ +use std::io; + +use crate::{FileLike, FileLikeWrite}; + +pub(crate) struct MockFile { + buf: Vec, + cursor: usize, + fsynced: bool, +} + +impl MockFile { + pub fn new() -> MockFile { + MockFile { + buf: Vec::new(), + cursor: 0, + fsynced: false, + } + } + + pub fn len(&self) -> usize { + self.buf.len() + } +} + +impl FileLikeWrite for MockFile { + fn fsyncdata(&mut self) -> io::Result<()> { + self.fsynced = true; + Ok(()) + } + + fn set_len(&mut self, num_bytes: u64) -> io::Result<()> { + self.buf.resize(num_bytes as usize, 0u8); + Ok(()) + } +} + +impl FileLike for MockFile {} + +impl io::Read for MockFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let end = (self.cursor + buf.len()).min(self.buf.len()); + let len = end - self.cursor; + buf[..len].copy_from_slice(&self.buf[self.cursor..end]); + self.cursor = end; + Ok(len) + } +} + +impl io::Write for MockFile { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.cursor + buf.len() > self.buf.len() { + self.buf.resize(self.cursor + buf.len(), 0u8); + } + self.fsynced = false; + self.buf[self.cursor..][..buf.len()].copy_from_slice(buf); + self.cursor += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl io::Seek for MockFile { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + match pos { + io::SeekFrom::Start(offset) => self.cursor = offset as usize, + io::SeekFrom::End(offset) => self.cursor = self.buf.len() as usize + offset as usize, + io::SeekFrom::Current(offset) => self.cursor += offset as usize, + } + Ok(self.cursor as u64) + } +} diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index c8f5bae..29b5ff6 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -3,19 +3,33 @@ use std::ops::{RangeBounds, RangeToInclusive}; use std::path::Path; use bytes::Buf; -use tracing::{debug, event_enabled, info, warn, Level}; +use tracing::{debug, info, warn}; use crate::error::{ AppendError, CreateQueueError, DeleteQueueError, MissingQueue, ReadRecordError, TruncateError, }; -use crate::mem::{MemQueue, QueuesSummary}; +use crate::page_directory::{PageListReader, PageListWriter, PageRangeRef}; use crate::record::{MultiPlexedRecord, MultiRecord}; use crate::recordlog::RecordWriter; -use crate::rolling::RollingWriter; use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage}; +#[derive(Copy, Clone, Debug)] +pub struct Preferences { + pub persist_policy: PersistPolicy, + pub num_bytes: u64, +} + +impl Default for Preferences { + fn default() -> Preferences { + Preferences { + persist_policy: PersistPolicy::Always(PersistAction::Flush), + num_bytes: 10_000_000, + } + } +} + pub struct MultiRecordLog { - record_log_writer: crate::recordlog::RecordWriter, + record_log_writer: crate::recordlog::RecordWriter, in_mem_queues: mem::MemQueues, next_persist: PersistState, // A simple buffer we reuse to avoid allocation. @@ -25,26 +39,30 @@ pub struct MultiRecordLog { impl MultiRecordLog { /// Open the multi record log, flushing after each operation, but not fsyncing. pub fn open(directory_path: &Path) -> Result { - Self::open_with_prefs(directory_path, PersistPolicy::Always(PersistAction::Flush)) - } - - pub fn summary(&self) -> QueuesSummary { - self.in_mem_queues.summary() + Self::open_with_prefs(directory_path, Preferences::default()) } /// Open the multi record log, syncing following the provided policy. pub fn open_with_prefs( directory_path: &Path, - persist_policy: PersistPolicy, + preferences: Preferences, ) -> Result { + let Preferences { + persist_policy, + num_bytes, + } = preferences; // io errors are non-recoverable - let rolling_reader = crate::rolling::RollingReader::open(directory_path)?; - let mut record_reader = crate::recordlog::RecordReader::open(rolling_reader); + // TODO set num pages + let queue_file = directory_path.join(&Path::new("mrecordlog.wal")); + // TODO stop hard coding + let directory = crate::page_directory::Directory::create_or_open(&queue_file, num_bytes)?; + let page_reader = PageListReader::new(directory)?; + let mut record_reader = crate::recordlog::RecordReader::open(page_reader); let mut in_mem_queues = crate::mem::MemQueues::default(); debug!("loading wal"); loop { - let file_number = record_reader.read().current_file().clone(); - let Ok(record) = record_reader.read_record::() else { + let mut session = record_reader.start_session(); + let Ok(record) = record_reader.read_record::(&mut session) else { warn!("Detected corrupted record: some data may have been lost"); continue; }; @@ -56,7 +74,7 @@ impl MultiRecordLog { position, } => { if !in_mem_queues.contains_queue(queue) { - in_mem_queues.ack_position(queue, position); + in_mem_queues.ack_position(queue, position, &session); } for record in records { // if this fails, it means some corruption wasn't detected at a lower @@ -68,7 +86,7 @@ impl MultiRecordLog { // corruption. In that case, maybe we should ack_position() and try // to insert again? in_mem_queues - .append_record(queue, &file_number, position, payload) + .append_record(queue, &session, position, payload) .map_err(|_| ReadRecordError::Corruption)?; } } @@ -76,10 +94,10 @@ impl MultiRecordLog { truncate_range, queue, } => { - in_mem_queues.truncate(queue, truncate_range); + in_mem_queues.truncate(queue, truncate_range, &session); } MultiPlexedRecord::RecordPosition { queue, position } => { - in_mem_queues.ack_position(queue, position); + in_mem_queues.ack_position(queue, position, &session); } MultiPlexedRecord::DeleteQueue { queue, position: _ } => { // can fail if we don't know about the queue getting deleted. It's fine to @@ -92,23 +110,16 @@ impl MultiRecordLog { } } // io errors are non-recoverable - let record_log_writer: RecordWriter = record_reader.into_writer()?; - let mut multi_record_log = MultiRecordLog { + let record_log_writer: RecordWriter = record_reader.into_writer()?; + let multi_record_log = MultiRecordLog { record_log_writer, in_mem_queues, next_persist: persist_policy.into(), multi_record_spare_buffer: Vec::new(), }; - multi_record_log.run_gc_if_necessary()?; Ok(multi_record_log) } - #[cfg(test)] - pub fn list_file_numbers(&self) -> Vec { - let rolling_writer = self.record_log_writer.get_underlying_wrt(); - rolling_writer.list_file_numbers() - } - /// Creates a new queue. /// /// Returns an error if the queue already exists. @@ -117,10 +128,11 @@ impl MultiRecordLog { if self.queue_exists(queue) { return Err(CreateQueueError::AlreadyExists); } + let mut session = self.record_log_writer.start_session()?; let record = MultiPlexedRecord::RecordPosition { queue, position: 0 }; - self.record_log_writer.write_record(record)?; + self.record_log_writer.write_record(record, &mut session)?; self.persist(PersistAction::FlushAndFsync)?; - self.in_mem_queues.create_queue(queue)?; + self.in_mem_queues.create_queue(queue, session)?; Ok(()) } @@ -128,9 +140,9 @@ impl MultiRecordLog { info!(queue = queue, "delete queue"); let position = self.in_mem_queues.next_position(queue)?; let record = MultiPlexedRecord::DeleteQueue { queue, position }; - self.record_log_writer.write_record(record)?; + let mut session = self.record_log_writer.start_session()?; + self.record_log_writer.write_record(record, &mut session)?; self.in_mem_queues.delete_queue(queue)?; - self.run_gc_if_necessary()?; self.persist(PersistAction::FlushAndFsync)?; Ok(()) } @@ -179,7 +191,6 @@ impl MultiRecordLog { } } let position = position_opt.unwrap_or(next_position); - let file_number = self.record_log_writer.current_file().clone(); let mut multi_record_spare_buffer = std::mem::take(&mut self.multi_record_spare_buffer); MultiRecord::serialize(payloads, position, &mut multi_record_spare_buffer); @@ -190,12 +201,15 @@ impl MultiRecordLog { } let records = MultiRecord::new_unchecked(&multi_record_spare_buffer); - let record = MultiPlexedRecord::AppendRecords { + let multi_record = MultiPlexedRecord::AppendRecords { position, queue, records, }; - self.record_log_writer.write_record(record)?; + + let mut session: PageRangeRef = self.record_log_writer.start_session()?; + self.record_log_writer + .write_record(multi_record, &mut session)?; self.persist_on_policy()?; let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; @@ -203,7 +217,7 @@ impl MultiRecordLog { for record in records { // we just serialized it, we know it's valid let (position, payload) = record.unwrap(); - mem_queue.append_record(&file_number, position, payload)?; + mem_queue.append_record(&session, position, payload)?; max_position = position; } @@ -211,25 +225,6 @@ impl MultiRecordLog { Ok(Some(max_position)) } - fn record_empty_queues_position(&mut self) -> io::Result<()> { - let mut has_empty_queues = false; - for (queue_id, queue) in self.in_mem_queues.empty_queues() { - let next_position = queue.next_position(); - let record = MultiPlexedRecord::RecordPosition { - queue: queue_id, - position: next_position, - }; - self.record_log_writer.write_record(record)?; - has_empty_queues = true - } - if has_empty_queues { - // We need to fsync here! We are remove files from the FS - // so we need to make sure our empty queue positions are properly persisted. - self.persist(PersistAction::FlushAndFsync)?; - } - Ok(()) - } - /// Truncates the queue up to a given `position`, included. This method immediately /// truncates the underlying in-memory queue whereas the backing log files are deleted /// asynchronously when they become exclusively composed of deleted records. @@ -245,49 +240,22 @@ impl MultiRecordLog { if !self.queue_exists(queue) { return Err(TruncateError::MissingQueue(queue.to_string())); } + let mut session = self.record_log_writer.start_session()?; + let truncate_record = MultiPlexedRecord::Truncate { + truncate_range, + queue, + }; self.record_log_writer - .write_record(MultiPlexedRecord::Truncate { - truncate_range, - queue, - })?; + .write_record(truncate_record, &mut session)?; let removed_count = self .in_mem_queues - .truncate(queue, truncate_range) + .truncate(queue, truncate_range, &session) .unwrap_or(0); - self.run_gc_if_necessary()?; + // self.run_gc_if_necessary()?; self.persist_on_policy()?; Ok(removed_count) } - fn run_gc_if_necessary(&mut self) -> io::Result<()> { - debug!("run_gc_if_necessary"); - if self - .record_log_writer - .directory() - .has_files_that_can_be_deleted() - { - // We are about to delete files. - // Let's make sure we record the offsets of the empty queues - // so that we don't lose that information after dropping the files. - // - // But first we clone the current file number to make sure that the file that will - // contain the truncate positions it self won't be GC'ed. - let _file_number = self.record_log_writer.current_file().clone(); - self.record_empty_queues_position()?; - self.record_log_writer.directory().gc()?; - } - // only execute the following if we are above the debug level in tokio tracing - if event_enabled!(Level::DEBUG) { - for queue in self.list_queues() { - let queue: &MemQueue = self.in_mem_queues.get_queue(queue).unwrap(); - let first_pos = queue.range(..).next().map(|record| record.position); - let last_pos = queue.last_position(); - debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc"); - } - } - Ok(()) - } - pub fn range( &self, queue: &str, @@ -323,14 +291,17 @@ impl MultiRecordLog { self.in_mem_queues.last_record(queue) } - /// Return the amount of memory and disk space used by mrecordlog. + // Return the amount of memory and disk space used by mrecordlog. pub fn resource_usage(&self) -> ResourceUsage { - let disk_used_bytes = self.record_log_writer.size(); + let page_list_writer = self.record_log_writer.get_underlying_wrt(); + let num_pages = page_list_writer.num_pages(); + let num_used_pages = page_list_writer.num_used_pages(); let (memory_used_bytes, memory_allocated_bytes) = self.in_mem_queues.size(); ResourceUsage { memory_used_bytes, memory_allocated_bytes, - disk_used_bytes, + num_pages, + num_used_pages, } } } diff --git a/src/page_directory/header.rs b/src/page_directory/header.rs new file mode 100644 index 0000000..3ce721a --- /dev/null +++ b/src/page_directory/header.rs @@ -0,0 +1,117 @@ +use std::io; +use std::ops::Range; + +use crate::error::HeaderError; +use crate::page_directory::compute_slot_len; + +const MAGIC_NUMBER: u32 = 1_778_463_742_u32; +const HEADER_SIZE: usize = 4 + // magic number + 4 + // version + 4 + // num_pages + 4; // CRC +const VERSION: u32 = 1; + +fn crc32(data: &[u8]) -> u32 { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(data); + hasher.finalize() +} + +fn to_header_bytes(num_pages: u32) -> [u8; HEADER_SIZE] { + let mut buf = [0u8; HEADER_SIZE]; + buf[0..4].copy_from_slice(&MAGIC_NUMBER.to_le_bytes()); + buf[4..8].copy_from_slice(&VERSION.to_le_bytes()); + buf[8..12].copy_from_slice(&num_pages.to_le_bytes()); + let checksum = crc32(&buf[..12]); + buf[12..16].copy_from_slice(&checksum.to_le_bytes()); + buf +} + +fn from_header_bytes(header_bytes: [u8; HEADER_SIZE]) -> Result { + let magic_number = u32::from_le_bytes(header_bytes[0..4].try_into().unwrap()); + let version = u32::from_le_bytes(header_bytes[4..8].try_into().unwrap()); + let num_pages = u32::from_le_bytes(header_bytes[8..12].try_into().unwrap()); + let checksum = u32::from_le_bytes(header_bytes[12..16].try_into().unwrap()); + let computed_checksum = crc32(&header_bytes[..12]); + if checksum != computed_checksum { + return Err(HeaderError::InvalidChecksum); + } + if magic_number != MAGIC_NUMBER { + return Err(HeaderError::InvalidMagicNumber { magic_number }); + } + if version != VERSION { + return Err(HeaderError::UnsupportedVersion { version }); + } + Ok(num_pages) +} + +pub fn serialize_header(num_pages: u32, wrt: &mut dyn io::Write) -> io::Result { + let header_bytes = to_header_bytes(num_pages); + wrt.write_all(&header_bytes[..])?; + Ok(HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }) +} + +pub fn deserialize_header(read: &mut dyn io::Read) -> io::Result { + let mut header_bytes = [0u8; HEADER_SIZE]; + read.read_exact(&mut header_bytes)?; + let num_pages = from_header_bytes(header_bytes) + .map_err(|header_err| io::Error::new(io::ErrorKind::InvalidData, header_err))?; + Ok(HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }) +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub struct HeaderInfo { + pub header_len: usize, + pub num_pages: u32, +} + +impl HeaderInfo { + pub fn compute_slot_range(&self, epoch_parity: bool) -> Range { + let slot_len = compute_slot_len(self.num_pages); + let start_offset = if epoch_parity { + self.header_len + slot_len + } else { + self.header_len + }; + start_offset..start_offset + slot_len + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_simple_serialize_deserialize_header() { + let mut buf = Vec::new(); + let num_pages = 10; + let expected_header_info = HeaderInfo { + header_len: HEADER_SIZE, + num_pages, + }; + assert_eq!( + serialize_header(num_pages, &mut buf).unwrap(), + expected_header_info + ); + assert_eq!( + deserialize_header(&mut buf.as_slice()).unwrap(), + expected_header_info + ); + } + + #[test] + fn test_serialize_header() { + let mut buf = Vec::new(); + let num_pages = 10; + let header_info = serialize_header(num_pages, &mut buf).unwrap(); + assert_eq!(header_info.header_len, buf.len()); + assert_eq!(header_info.num_pages, num_pages); + } +} diff --git a/src/page_directory/mod.rs b/src/page_directory/mod.rs new file mode 100644 index 0000000..e95f682 --- /dev/null +++ b/src/page_directory/mod.rs @@ -0,0 +1,177 @@ +use std::fs::File; +use std::io; +use std::path::Path; + +mod header; +mod page_list; +mod page_refcounts; +mod reader; +mod writer; + +use page_list::PageList; +pub(crate) use page_refcounts::PageRangeRef; +pub use reader::PageListReader; +pub use writer::PageListWriter; + +use crate::frame::{FrameReader, FrameWriter}; +use crate::recordlog::{RecordReader, RecordWriter}; +use crate::{FileLike, BLOCK_NUM_BYTES}; + +pub const PAGE_SIZE: usize = 1 << 16; // 65,536 bytes +pub const MIN_NUM_PAGES: usize = 2; + +pub type PageId = u32; + +pub struct Directory { + file: F, + page_list: PageList, +} + +enum CreateOrOpen { + Created, + Opened, +} + +pub(crate) fn compute_slot_len(num_pages: u32) -> usize { + 8 + // epoch + 3 * num_pages as usize + // page ids encoded over 3 bytes each + 4 // checksum +} + +fn create_or_open_page_file(path: &Path, num_pages: usize) -> io::Result<(CreateOrOpen, File)> { + let len = num_pages * PAGE_SIZE; + match std::fs::File::create_new(path) { + Ok(file) => { + file.set_len(len as u64)?; + Ok((CreateOrOpen::Created, file)) + } + Err(io_err) if io_err.kind() == io::ErrorKind::AlreadyExists => { + let file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path)?; + Ok((CreateOrOpen::Opened, file)) + } + Err(err) => Err(err), + } +} + +impl Directory { + pub fn create_or_open(path: &Path, len: u64) -> io::Result { + let num_pages = len as usize / PAGE_SIZE; + if num_pages < MIN_NUM_PAGES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "file length is too small. requested {}, should be at least {}", + len, + PAGE_SIZE * MIN_NUM_PAGES + ), + )); + } + let (create_or_open, file) = create_or_open_page_file(path, num_pages)?; + match create_or_open { + CreateOrOpen::Created => Directory::create(num_pages, file), + CreateOrOpen::Opened => Directory::open(num_pages, file), + } + } +} + +impl Directory { + fn create(num_pages: usize, mut file: F) -> io::Result> { + let page_list = PageList::initialize_page_file(num_pages, &mut file)?; + let directory = Directory { file, page_list }; + Ok(directory) + } + + fn open(num_pages: usize, mut file: F) -> io::Result> { + file.seek(io::SeekFrom::Start(0u64))?; + let header = header::deserialize_header(&mut file)?; + if header.num_pages as usize != num_pages { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "number of pages does not match existing file", + )); + } + let page_list = PageList::load(header, &mut file)?; + Ok(Directory { file, page_list }) + } +} + +impl RecordReader { + pub fn into_writer(self) -> io::Result> { + let frame_writer: FrameWriter = self.frame_reader.into_writer()?; + Ok(RecordWriter::from(frame_writer)) + } +} + +impl FrameReader { + pub fn into_writer(self) -> io::Result> { + let offset: u64 = self.reader.block_id as u64 * BLOCK_NUM_BYTES as u64 + self.cursor as u64; + let page_list_writer: PageListWriter = self.reader.into_writer(offset as u64)?; + Ok(FrameWriter::create(page_list_writer)) + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::reader::PageListReader; + use super::*; + use crate::{BlockRead as _, BlockWrite as _, BLOCK_NUM_BYTES}; + + #[test] + fn test_simple_no_write() { + let fake_file = { + let directory = Directory::create(100, Cursor::new(Vec::new())).unwrap(); + let mut page_list_reader = PageListReader::new(directory).unwrap(); + let mut session = page_list_reader.start_session(); + assert_eq!(session.num_pages, 1); + loop { + if !page_list_reader.next_block(&mut session).unwrap() { + break; + } + } + assert_eq!(session.start_page_id, 0u32); + assert_eq!(session.num_pages, 100u16); + page_list_reader.into_writer(0u64).unwrap().into_file() + }; + + let directory = Directory::open(100, fake_file).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + let _writer = page_list_reader.into_writer(0); + } + + #[test] + fn test_simple_write() { + let fake_file0 = Cursor::new(Vec::new()); + let directory = Directory::create(100, fake_file0).unwrap(); + let mut page_list_reader = PageListReader::new(directory).unwrap(); + let mut session = page_list_reader.start_session(); + loop { + if !page_list_reader.next_block(&mut session).unwrap() { + break; + } + } + let fake_file1 = page_list_reader.into_writer(0u64).unwrap().into_file(); + + let directory = Directory::open(100, fake_file1).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + let mut writer = page_list_reader.into_writer(0).unwrap(); + let mut session = writer.start_write_session().unwrap(); + assert_eq!(writer.num_bytes_remaining_in_block(), BLOCK_NUM_BYTES); + writer.write(b"hello", &mut session).unwrap(); + assert_eq!(session.num_pages, 1u16); + assert_eq!( + writer.num_bytes_remaining_in_block(), + BLOCK_NUM_BYTES - b"hello".len() + ); + let fake_file2 = writer.into_file(); + + let directory = Directory::open(100, fake_file2).unwrap(); + let page_list_reader = PageListReader::new(directory).unwrap(); + assert_eq!(&page_list_reader.block()[..5], b"hello"); + } +} diff --git a/src/page_directory/page_list.rs b/src/page_directory/page_list.rs new file mode 100644 index 0000000..c68b200 --- /dev/null +++ b/src/page_directory/page_list.rs @@ -0,0 +1,374 @@ +use std::io::{self, Read, Write}; +use std::ops::Range; +use std::sync::atomic::Ordering; + +use super::header::{self, HeaderInfo}; +use super::{PageId, PAGE_SIZE}; +use crate::page_directory::page_refcounts::PageRegistry; +use crate::page_directory::PageRangeRef; +use crate::{FileLike, FileLikeWrite}; + +pub struct PageList { + header_info: HeaderInfo, + page_registry: rclite::Arc, + epoch: u64, + first_page_offset: u64, +} + +const CHUNK_NUM_PAGES: usize = 4_096; +const CHUNK_BYTES: usize = CHUNK_NUM_PAGES * 3; + +fn write_slot(epoch: u64, page_ids: &[PageId], wrt: &mut impl Write) -> io::Result<()> { + let mut buf: Vec = Vec::with_capacity(CHUNK_BYTES); + let mut hasher = crc32fast::Hasher::new(); + + let epoch_bytes = epoch.to_le_bytes(); + hasher.update(&epoch_bytes); + wrt.write_all(&epoch_bytes)?; + for page_chunk in page_ids.chunks(CHUNK_NUM_PAGES) { + buf.clear(); + for &page_id in page_chunk { + let page_id_bytes = page_id.to_le_bytes(); + buf.extend_from_slice(&page_id_bytes[..3]); + } + if buf.is_empty() { + break; + } + hasher.update(&buf); + wrt.write_all(&buf)?; + if buf.len() < CHUNK_BYTES { + break; + } + } + let digest = hasher.finalize(); + wrt.write_all(&digest.to_le_bytes())?; + Ok(()) +} + +#[derive(Debug)] +struct PageListSerialized { + epoch: u64, + page_ids: Vec, +} + +fn read_slot(num_pages: u32, read: &mut dyn Read) -> io::Result> { + let mut page_ids = Vec::with_capacity(num_pages as usize); + let mut hasher = crc32fast::Hasher::new(); + + let mut epoch_bytes = [0u8; 8]; + read.read_exact(&mut epoch_bytes)?; + hasher.update(&epoch_bytes); + + let epoch = u64::from_le_bytes(epoch_bytes.try_into().unwrap()); + + let mut buff = vec![0u8; CHUNK_BYTES]; + while page_ids.len() < num_pages as usize { + let num_pages_to_read = (num_pages as usize - page_ids.len()).min(CHUNK_NUM_PAGES); + let num_bytes_to_read = num_pages_to_read * 3; + let chunk_buf = &mut buff[..num_bytes_to_read]; + read.read_exact(chunk_buf)?; + hasher.update(chunk_buf); + page_ids.extend(chunk_buf.chunks_exact(3).map(|page_id| { + let mut page_id_bytes = [0u8; 4]; + page_id_bytes[0..3].copy_from_slice(page_id); + u32::from_le_bytes(page_id_bytes) + })); + } + assert_eq!(page_ids.len(), num_pages as usize); + let expected_digest = hasher.finalize(); + + let mut digest_bytes = [0u8; 4]; + read.read_exact(&mut digest_bytes[..])?; + let digest = u32::from_le_bytes(digest_bytes); + + if expected_digest != digest { + return Ok(None); + } + + Ok(Some(PageListSerialized { epoch, page_ids })) +} + +fn next_multiple_of(offset: u64, mult: u64) -> u64 { + let k = offset.div_ceil(mult); + k * mult +} + +impl PageList { + fn new(header_info: HeaderInfo) -> Self { + let page_registry = PageRegistry::new(header_info.num_pages as usize); + let end_of_page_list = header_info.compute_slot_range(true).end as u64; + let first_page_offset = next_multiple_of(end_of_page_list, PAGE_SIZE as u64); + PageList { + header_info, + page_registry: rclite::Arc::new(page_registry), + epoch: 0u64, + first_page_offset, + } + } + + pub(crate) fn initialize_page_file( + num_pages: usize, + file: &mut W, + ) -> io::Result { + file.seek(io::SeekFrom::Start(0))?; + let header_info = header::serialize_header(num_pages as u32, file)?; + let mut page_list = PageList::new(header_info); + let file_num_bytes = page_list.first_page_offset + (num_pages as u64 * PAGE_SIZE as u64); + file.set_len(file_num_bytes)?; + // We call gc once in order to initialize the page_list. + page_list.gc(0u64, file)?; + Ok(page_list) + } + + fn compute_slot_range(&self, epoch_parity: bool) -> Range { + self.header_info.compute_slot_range(epoch_parity) + } + + pub fn load(header_info: HeaderInfo, file: &mut impl FileLike) -> io::Result { + let mut page_list = PageList::new(header_info); + + let first_slot_range = page_list.compute_slot_range(false); + file.seek(io::SeekFrom::Start(first_slot_range.start as u64))?; + let first_page_list = read_slot(page_list.num_pages(), file)?; + + let second_slot_range = page_list.compute_slot_range(true); + file.seek(io::SeekFrom::Start(second_slot_range.start as u64))?; + let second_page_list = read_slot(page_list.num_pages(), file)?; + + let PageListSerialized { epoch, page_ids } = match (first_page_list, second_page_list) { + (None, None) => { + let error_msg = "page list is corrupted"; + return Err(io::Error::new(io::ErrorKind::InvalidData, error_msg)); + } + (None, Some(page_list)) | (Some(page_list), None) => page_list, + (Some(first_page_list), Some(second_page_list)) => { + if first_page_list.epoch > second_page_list.epoch { + first_page_list + } else { + second_page_list + } + } + }; + + page_list.page_registry = rclite::Arc::new(PageRegistry::with_page_ids(page_ids)); + + page_list.epoch = epoch; + + Ok(page_list) + } + + pub fn new_page_range_ref(&self, start_page_id: u32) -> PageRangeRef { + self.new_page_range_ref_with_num_pages(start_page_id, 1u32) + } + + #[inline(always)] + pub fn new_page_range_ref_with_num_pages( + &self, + start_page_id: u32, + num_pages: u32, + ) -> PageRangeRef { + let mut page_range_ref = PageRangeRef { + start_page_id: start_page_id, + num_pages: 0, + page_registry: self.page_registry.clone(), + }; + for _ in 0..num_pages { + page_range_ref.add_page(); + } + page_range_ref + } + + #[inline(always)] + pub fn num_pages(&self) -> u32 { + self.header_info.num_pages + } + + #[inline(always)] + pub fn page_id(&self, page_ord: usize) -> Option { + self.page_registry + .page_index + .load() + .ord_to_id + .get(page_ord) + .copied() + } + + #[inline(always)] + pub fn page_start_offset(&self, page_ord: usize) -> Option { + let page_id = self.page_id(page_ord)?; + Some(self.first_page_offset + page_id as u64 * PAGE_SIZE as u64) + } + + // Runs a gc operation. + // `cursor` is the next offset of the byte to be written. + // + // This method returns the number that have been freed. + pub fn gc(&mut self, cursor: u64, file: &mut impl FileLikeWrite) -> io::Result { + self.epoch += 1; + let written_page_id = cursor.div_ceil(PAGE_SIZE as u64) as usize; + let mut page_ids = self.page_registry.page_index.load().ord_to_id.clone(); + let mut free_pages: Vec = page_ids.drain(written_page_id..).collect(); + let num_free_pages_when_gc_started = free_pages.len(); + drain_filter( + &mut page_ids, + |page_id| { + self.page_registry.ref_counts[page_id as usize].load(Ordering::Acquire) == 0u32 + }, + &mut free_pages, + ); + let num_freed_pages = free_pages.len() - num_free_pages_when_gc_started; + free_pages.sort_unstable(); + page_ids.extend_from_slice(&free_pages); + let slot_range = self.compute_slot_range(self.epoch % 2 != 0); + file.seek(io::SeekFrom::Start(slot_range.start as u64))?; + write_slot(self.epoch, &page_ids, file)?; + file.flush()?; + file.fsyncdata()?; + self.page_registry.update_page_ids_after_gc(page_ids); + Ok(num_freed_pages) + } +} + +fn drain_filter(els: &mut Vec, filter: impl Fn(PageId) -> bool, output: &mut Vec) { + let mut wrt_cursor = 0; + for read_cursor in 0..els.len() { + let page = els[read_cursor]; + if filter(page) { + output.push(page); + } else { + els[wrt_cursor] = page; + wrt_cursor += 1; + } + } + els.truncate(wrt_cursor); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mockfile::MockFile; + + #[test] + fn test_drain_filter() { + let mut output = vec![6u32, 4]; + let mut els: Vec = vec![1u32, 3, 2, 8, 5, 7]; + drain_filter(&mut els, |page_id| page_id % 2 == 0, &mut output); + assert_eq!(&els[..], &[1, 3, 5, 7]); + assert_eq!(&output[..], &[6, 4, 2, 8]); + } + + #[track_caller] + fn test_serialize_page_list_slot_aux(epoch: u64, page_ids: &[u32]) { + let mut buf = Vec::new(); + write_slot(epoch, page_ids, &mut buf).unwrap(); + assert_eq!( + super::super::compute_slot_len(page_ids.len() as u32), + buf.len(), + "serialized len does not match expectation" + ); + let mut arr = &buf[..]; + let page_list_serialized_opt = read_slot(page_ids.len() as u32, &mut arr).unwrap(); + assert!(arr.is_empty(), "data was not entirely read"); + let page_list_serialized = page_list_serialized_opt.unwrap(); + assert_eq!(page_list_serialized.epoch, epoch, "epoch does not match"); + assert_eq!( + &page_list_serialized.page_ids, page_ids, + "page ids do not match" + ); + } + + #[test] + fn test_serialize_page_list_slot() { + test_serialize_page_list_slot_aux(3u64, &[]); + test_serialize_page_list_slot_aux(3u64, &[3u32, 9u32]); + } + + proptest::proptest! { + #[test] + fn test_proptest_serialize_page_list_slot( + epoch in proptest::num::u64::ANY, + page_ids in proptest::collection::vec(0..1u32 << 24, 0..CHUNK_NUM_PAGES * 3) + ) { + test_serialize_page_list_slot_aux(epoch, &page_ids); + } + } + + #[test] + fn test_page_list_gc_simple() { + let mut file = MockFile::new(); + let header_info = HeaderInfo { + header_len: 16, + num_pages: 10u32, + }; + { + let page_list = PageList::initialize_page_file(10, &mut file).unwrap(); + assert_eq!(page_list.epoch, 1); + assert_eq!(page_list.header_info.header_len, 16); + // The header AND the slots should fit in a single page. + assert_eq!(PAGE_SIZE * 11, file.len()); + assert_eq!(page_list.first_page_offset % PAGE_SIZE as u64, 0u64); + } + { + let mut page_list = PageList::load(header_info, &mut file).unwrap(); + + assert_eq!( + &page_list.page_registry.page_index.load().ord_to_id, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + ); + assert_eq!(page_list.epoch, 1); + assert_eq!(page_list.header_info.num_pages, header_info.num_pages); + for i in 0..header_info.num_pages as usize { + assert_eq!( + page_list.page_registry.page_index.load().ord_to_id[i], + i as PageId + ); + } + assert_eq!(page_list.epoch, 1); + page_list.gc(0, &mut file).unwrap(); + assert_eq!(page_list.epoch, 2); + } + { + let mut page_list = PageList::load(header_info, &mut file).unwrap(); + let page_registry = page_list.page_registry.clone(); + assert_eq!(page_list.epoch, 2); + assert_eq!(page_list.header_info.num_pages, header_info.num_pages); + for i in 0..header_info.num_pages as usize { + assert_eq!(page_registry.page_index.load().ord_to_id[i], i as PageId); + } + + let ref_page0 = page_list.new_page_range_ref(0); + let _ref_page1 = page_list.new_page_range_ref(1); + let _ref_page2 = page_list.new_page_range_ref(2); + let _ref_page3 = page_list.new_page_range_ref(3); + drop(ref_page0); + let ref_page4 = page_list.new_page_range_ref(4); + let _ref_page5 = page_list.new_page_range_ref(5); + drop(ref_page4); + page_list.gc(6u64 * PAGE_SIZE as u64, &mut file).unwrap(); + assert_eq!( + &page_registry.page_index.load().ord_to_id, + &[1, 2, 3, 5, 0, 4, 6, 7, 8, 9] + ); + } + } + + #[test] + fn test_page_list_gc_clone() { + let mut file = MockFile::new(); + let mut page_list = PageList::initialize_page_file(5, &mut file).unwrap(); + let page_registry = page_list.page_registry.clone(); + + let ref_page0 = page_list.new_page_range_ref(0); + let ref_page1 = page_list.new_page_range_ref(1); + let _ref_page2 = page_list.new_page_range_ref(2); + let _ref_page1_clone = ref_page1.clone(); + drop(ref_page0); + drop(ref_page1); + + page_list.gc(3u64 * PAGE_SIZE as u64, &mut file).unwrap(); + assert_eq!( + &page_registry.page_index.load().ord_to_id, + &[1, 2, 0, 3, 4,] + ); + } +} diff --git a/src/page_directory/page_refcounts.rs b/src/page_directory/page_refcounts.rs new file mode 100644 index 0000000..4024b4f --- /dev/null +++ b/src/page_directory/page_refcounts.rs @@ -0,0 +1,133 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use arc_swap::ArcSwap; + +use crate::page_directory::PageId; + +pub struct PageIndex { + // This is a page_ord -> page_id mapping. + pub ord_to_id: Vec, + // This is a page_id -> page_ord mapping. + pub id_to_ord: Vec, +} + +impl PageIndex { + pub fn new(num_pages: usize) -> PageIndex { + let ord_to_id: Vec = (0..num_pages as u32).collect(); + let id_to_ord = ord_to_id.clone(); + PageIndex { + ord_to_id, + id_to_ord, + } + } + + pub fn from_page_ids(ord_to_id: Vec) -> PageIndex { + let mut page_index = PageIndex { + ord_to_id, + id_to_ord: Vec::new(), + }; + page_index.rebuild_page_index(); + page_index + } + + fn rebuild_page_index(&mut self) { + self.id_to_ord.resize(self.ord_to_id.len(), 0u32); + for (ord, &page_id) in self.ord_to_id.iter().enumerate() { + self.id_to_ord[page_id as usize] = ord as u32; + } + } +} + +pub struct PageRegistry { + pub ref_counts: Box<[AtomicU32]>, + pub page_index: ArcSwap, +} + +impl PageRegistry { + pub fn with_page_ids(ord_to_id: Vec) -> PageRegistry { + let page_index = PageIndex::from_page_ids(ord_to_id); + PageRegistry::with_page_index(page_index) + } + + pub fn update_page_ids_after_gc(&self, ord_to_id: Vec) { + let page_index = PageIndex::from_page_ids(ord_to_id); + self.page_index.store(Arc::new(page_index)); + } + + pub fn with_page_index(page_index: PageIndex) -> PageRegistry { + let num_pages = page_index.ord_to_id.len(); + let ref_counts: Vec = std::iter::repeat_with(AtomicU32::default) + .take(num_pages) + .collect(); + PageRegistry { + ref_counts: ref_counts.into_boxed_slice(), + page_index: ArcSwap::new(Arc::new(page_index)), + } + } + + pub fn new(num_pages: usize) -> PageRegistry { + let page_index = PageIndex::new(num_pages); + Self::with_page_index(page_index) + } +} + +pub struct PageRangeRef { + pub start_page_id: u32, + pub num_pages: u16, + pub page_registry: rclite::Arc, +} + +impl Eq for PageRangeRef {} + +impl PartialEq for PageRangeRef { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + self.start_page_id == other.start_page_id && self.num_pages == other.num_pages + } +} + +impl PageRangeRef { + // Adds a page to the reference. + pub fn add_page(&mut self) { + let page_id = if self.num_pages == 0u16 { + self.start_page_id + } else { + let page_index = self.page_registry.page_index.load(); + let start_page_ord = page_index.id_to_ord[self.start_page_id as usize]; + let page_ord = start_page_ord + self.num_pages as u32; + page_index.ord_to_id[page_ord as usize] + }; + self.num_pages += 1; + self.page_registry.ref_counts[page_id as usize].fetch_add(1, Ordering::Release); + } +} + +impl Clone for PageRangeRef { + fn clone(&self) -> PageRangeRef { + let mut page_range_ref = PageRangeRef { + start_page_id: self.start_page_id, + num_pages: 0, + page_registry: self.page_registry.clone(), + }; + for _ in 0..self.num_pages { + page_range_ref.add_page(); + } + page_range_ref + } +} + +impl Drop for PageRangeRef { + fn drop(&mut self) { + let page_index = self.page_registry.page_index.load(); + let page_ids = if self.num_pages == 0u16 { + &[self.start_page_id] + } else { + let start_page_ord = page_index.id_to_ord[self.start_page_id as usize]; + &page_index.ord_to_id[start_page_ord as usize..][..self.num_pages as usize] + }; + for &page_id in page_ids { + self.page_registry.ref_counts[page_id as usize].fetch_sub(1, Ordering::AcqRel); + } + } +} diff --git a/src/page_directory/reader.rs b/src/page_directory/reader.rs new file mode 100644 index 0000000..728398d --- /dev/null +++ b/src/page_directory/reader.rs @@ -0,0 +1,93 @@ +use std::fs::File; +use std::io::{self, BufWriter, SeekFrom}; + +use super::page_list::PageList; +use super::writer::PageListWriter; +use super::{Directory, PAGE_SIZE}; +use crate::page_directory::PageRangeRef; +use crate::{BlockRead, FileLike, BLOCK_NUM_BYTES}; + +const NUM_BLOCK_PER_PAGE: usize = PAGE_SIZE / BLOCK_NUM_BYTES; + +pub struct PageListReader { + file: F, + page_list: PageList, + page_buffer: Box<[u8]>, + pub(crate) block_id: usize, + page_id: u32, +} + +impl PageListReader { + pub fn new(page_directory: Directory) -> io::Result { + let Directory { file, page_list } = page_directory; + let mut page_list_reader = PageListReader { + file, + page_list, + page_buffer: vec![0u8; PAGE_SIZE].into_boxed_slice(), + page_id: 0u32, + block_id: 0, + }; + page_list_reader.load_page()?; + Ok(page_list_reader) + } + + fn block_id_within_page(&self) -> usize { + self.block_id % NUM_BLOCK_PER_PAGE + } + + // Loads the current page. + // + // Returns Ok(false) if we have reached the end of the file. + fn load_page(&mut self) -> std::io::Result { + let page_ord = self.block_id / NUM_BLOCK_PER_PAGE; + let Some(page_id) = self.page_list.page_id(page_ord) else { + return Ok(false); + }; + self.page_id = page_id; + let Some(page_start_offset) = self.page_list.page_start_offset(page_ord) else { + return Ok(false); + }; + self.file.seek(SeekFrom::Start(page_start_offset as u64))?; + self.file.read_exact(&mut self.page_buffer[..])?; + Ok(true) + } + + pub fn into_writer(self, num_bytes: u64) -> io::Result> { + let mut page_list_writer = PageListWriter { + page_list: self.page_list, + wrt: BufWriter::with_capacity(BLOCK_NUM_BYTES, self.file), + cursor: num_bytes, + }; + page_list_writer.reposition_write_head()?; + Ok(page_list_writer) + } +} + +impl BlockRead for PageListReader { + type Session = PageRangeRef; + + fn start_session(&self) -> Self::Session { + self.page_list.new_page_range_ref(self.page_id) + } + + fn next_block(&mut self, session: &mut Self::Session) -> std::io::Result { + self.block_id += 1; + let block_id_within_page = self.block_id_within_page(); + if block_id_within_page == 0 { + if self.load_page()? { + session.num_pages += 1u16; + Ok(true) + } else { + Ok(false) + } + } else { + Ok(true) + } + } + + fn block(&self) -> &[u8; crate::BLOCK_NUM_BYTES] { + let start_offset = self.block_id_within_page() * crate::BLOCK_NUM_BYTES; + let block_slice = &self.page_buffer[start_offset..start_offset + crate::BLOCK_NUM_BYTES]; + block_slice.try_into().unwrap() + } +} diff --git a/src/page_directory/readme.txt b/src/page_directory/readme.txt new file mode 100644 index 0000000..e7eb989 --- /dev/null +++ b/src/page_directory/readme.txt @@ -0,0 +1,23 @@ +The page directory is a single file that stores a list of pages. + +These page themselves are used to store the mrecordlog: a WAL common to all indexes. +Because the mrecordlog contains a several queues, it is possible for a single queue to +prevent a truncation at the scale of the mrecordlog. + +With a page system, a single queue lagging will only prevent the recollection of the +subset of pages holding data for that queue. + +The page directory works as follows. All pages have a physical id. +At all point in time, they are all organized in a specific order in which they +are supposed to be read and written in. That order is saved into a header of the file. + +We track the list of pages in use through reference counting. Upon GC, we recompute the right ordering +and written back into the header. + +In order to make writing that page list atomic, the header actually consists of two slots. +We track the number of GC operations that have been executed so far: the GC epoch. + +We alternatively write the first or the second one based on the parity of the epoch. +When reading, we read both slots and select non-corrupted slot that holds the highest epoch. +That way, if a GC operations is interrupted, it will only corrupt the new slot, and the old +will still be readable. diff --git a/src/page_directory/writer.rs b/src/page_directory/writer.rs new file mode 100644 index 0000000..e9b4f80 --- /dev/null +++ b/src/page_directory/writer.rs @@ -0,0 +1,146 @@ +use std::fs::File; +use std::io::{self, BufWriter, Seek as _, SeekFrom, Write}; + +use super::page_list::PageList; +use super::PageId; +use crate::page_directory::{PageRangeRef, PAGE_SIZE}; +use crate::{BlockWrite, FileLike, PersistAction, BLOCK_NUM_BYTES}; + +pub struct PageListWriter { + pub(crate) page_list: PageList, + pub(crate) wrt: BufWriter, + // cursor is the offset at which we are trying to write. + // This is not a physical offset (an offset on disk), but a logical one. + // The page list is here to convert this into an actual physical offset. + pub(crate) cursor: u64, +} + +impl BlockWrite for PageListWriter { + type Session = PageRangeRef; + + #[inline(always)] + fn start_write_session(&mut self) -> io::Result { + let Some(page_id) = self.get_write_page() else { + return Err(std::io::Error::new( + io::ErrorKind::StorageFull, + "All pages are used", + )); + }; + let initial_num_pages = if self.cursor % PAGE_SIZE as u64 == 0 { + // We haven't loaded the page yet. + 0 + } else { + 1 + }; + let page_ref = self + .page_list + .new_page_range_ref_with_num_pages(page_id, initial_num_pages); + Ok(page_ref) + } + + fn write(&mut self, buf: &[u8], page_range_ref: &mut PageRangeRef) -> std::io::Result<()> { + if buf.len() == 0 { + return Ok(()); + } + assert!(buf.len() <= self.num_bytes_remaining_in_block()); + if self.cursor % PAGE_SIZE as u64 == 0 { + // If we are about to run the first write on a page. + // We need to seek into its physical address. + page_range_ref.add_page(); + self.reposition_write_head()?; + } + self.wrt.write_all(buf)?; + self.cursor += buf.len() as u64; + Ok(()) + } + + fn persist(&mut self, persist_action: crate::PersistAction) -> std::io::Result<()> { + match persist_action { + PersistAction::FlushAndFsync => { + self.wrt.flush()?; + self.wrt.get_mut().fsyncdata()?; + Ok(()) + } + PersistAction::Flush => { + // This will flush the buffer of the BufWriter to the underlying OS. + self.wrt.flush() + } + } + } + + #[inline] + fn num_bytes_remaining_in_block(&self) -> usize { + BLOCK_NUM_BYTES - (self.cursor as usize % BLOCK_NUM_BYTES) + } + + fn make_room(&mut self, num_bytes: u64) -> io::Result<()> { + if self.remaining_capacity() >= num_bytes { + return Ok(()); + } + self.gc()?; + if self.remaining_capacity() >= num_bytes { + return Ok(()); + } + let error_msg = format!( + "mrecordlog capacity reached. cursor={}, num_pages={}, requested={num_bytes}", + self.cursor, + self.page_list.num_pages() + ); + Err(io::Error::new(io::ErrorKind::OutOfMemory, error_msg)) + } +} + +impl PageListWriter { + fn gc(&mut self) -> io::Result<()> { + let num_pages_delete = self.page_list.gc(self.cursor, &mut self.wrt)?; + // We need to update the cursor. + self.cursor -= num_pages_delete as u64 * PAGE_SIZE as u64; + self.reposition_write_head()?; + Ok(()) + } + + #[inline(always)] + pub fn num_pages(&self) -> u32 { + self.page_list.num_pages() + } + + pub fn num_used_pages(&self) -> u32 { + self.cursor.div_ceil(PAGE_SIZE as u64) as u32 + } + + // Returns the physical file offset range that corresponds to the current page. + #[inline(always)] + fn get_write_page(&mut self) -> Option { + // if self.cursor == self.page_list.num_pages() as u64 * PAGE_SIZE as u64 { + // self.gc()?; + // } + let page_ord = (self.cursor / PAGE_SIZE as u64) as usize; + self.page_list.page_id(page_ord) + } + + #[inline(always)] + fn remaining_capacity(&self) -> u64 { + let total_capacity = self.page_list.num_pages() as u64 * PAGE_SIZE as u64; + total_capacity - self.cursor + } + + #[cfg(test)] + pub fn into_file(self) -> F { + self.wrt.into_inner().map_err(|_| ()).unwrap() + } + + // Seek into the file to our current write position. + pub fn reposition_write_head(&mut self) -> io::Result<()> { + let page_ord = (self.cursor / PAGE_SIZE as u64) as usize; + let Some(page_start_offset) = self.page_list.page_start_offset(page_ord) else { + return Err(std::io::Error::new( + io::ErrorKind::StorageFull, + "All pages are used", + )); + }; + let offset_within_page = self.cursor - page_ord as u64 * PAGE_SIZE as u64; + self.wrt + .seek(SeekFrom::Start(page_start_offset + offset_within_page))?; + Ok(()) + } +} diff --git a/src/persist_policy.rs b/src/persist_policy.rs index 7a758af..b7b7fdb 100644 --- a/src/persist_policy.rs +++ b/src/persist_policy.rs @@ -32,7 +32,7 @@ impl PersistAction { /// /// The `PersistPolicy` defines the trade-off applied for the second kind of /// operations. -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub enum PersistPolicy { /// Only ensure data is persisted when critical records are written. /// diff --git a/src/proptests.rs b/src/proptests.rs index a33c4f1..8ba2a4e 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -7,20 +7,26 @@ use proptest::prop_oneof; use proptest::strategy::{Just, Strategy}; use tempfile::TempDir; +use crate::multi_record_log::Preferences; use crate::record::{MultiPlexedRecord, MultiRecord}; -use crate::{MultiRecordLog, Record, Serializable}; +use crate::{MultiRecordLog, PersistAction, PersistPolicy, Record, Serializable}; struct PropTestEnv { tempdir: TempDir, record_log: MultiRecordLog, state: HashMap<&'static str, (Range, u64)>, - block_to_write: Vec, + record_to_write: Vec, + preferences: Preferences, } impl PropTestEnv { - pub fn new(block_size: usize) -> Self { + pub fn new(record_len: usize) -> Self { + Self::new_with_prefs(record_len, Preferences::default()) + } + + pub fn new_with_prefs(record_len: usize, preferences: Preferences) -> Self { let tempdir = tempfile::tempdir().unwrap(); - let mut record_log = MultiRecordLog::open(tempdir.path()).unwrap(); + let mut record_log = MultiRecordLog::open_with_prefs(tempdir.path(), preferences).unwrap(); record_log.create_queue("q1").unwrap(); record_log.create_queue("q2").unwrap(); let mut state = HashMap::default(); @@ -30,7 +36,8 @@ impl PropTestEnv { tempdir, record_log, state, - block_to_write: vec![b'A'; block_size], + record_to_write: vec![b'A'; record_len], + preferences, } } @@ -59,7 +66,8 @@ impl PropTestEnv { } pub fn reload(&mut self) { - self.record_log = MultiRecordLog::open(self.tempdir.path()).unwrap(); + self.record_log = + MultiRecordLog::open_with_prefs(self.tempdir.path(), self.preferences).unwrap(); for (queue, (_range, count)) in &self.state { assert_eq!( self.record_log.range(queue, ..).unwrap().count() as u64, @@ -98,7 +106,7 @@ impl PropTestEnv { .append_records( queue, Some(new_pos), - std::iter::repeat(&self.block_to_write[..]).take(count as usize), + std::iter::repeat(&self.record_to_write[..]).take(count as usize), ) .unwrap(); @@ -238,7 +246,13 @@ fn test_scenario_big_records() { }, Reopen, ]; - let mut env = PropTestEnv::new(1 << 26); + let mut env = PropTestEnv::new_with_prefs( + 1 << 26, + Preferences { + persist_policy: PersistPolicy::Always(PersistAction::Flush), + num_bytes: 500_000_000, + }, + ); // 64mb for op in ops { env.apply(op); } diff --git a/src/recordlog/reader.rs b/src/recordlog/reader.rs index c97080c..4c6ed89 100644 --- a/src/recordlog/reader.rs +++ b/src/recordlog/reader.rs @@ -1,13 +1,9 @@ -use std::io; - use crate::error::ReadRecordError; -use crate::frame::{FrameReader, FrameWriter, ReadFrameError}; -use crate::recordlog::RecordWriter; -use crate::rolling::{RollingReader, RollingWriter}; +use crate::frame::{FrameReader, ReadFrameError}; use crate::{BlockRead, Serializable}; pub struct RecordReader { - frame_reader: FrameReader, + pub(crate) frame_reader: FrameReader, record_buffer: Vec, // true if we are in the middle of reading a multifragment record. // This is useful, as it makes it possible to drop a record @@ -25,8 +21,8 @@ impl RecordReader { } } - pub fn read(&self) -> &R { - self.frame_reader.read() + pub fn start_session(&mut self) -> R::Session { + self.frame_reader.start_session() } /// Deserialize a record without actually consuming data. @@ -37,8 +33,9 @@ impl RecordReader { /// Advance cursor and deserialize the next record. pub fn read_record<'a, S: Serializable<'a>>( &'a mut self, + session: &mut R::Session, ) -> Result, ReadRecordError> { - let has_record = self.go_next()?; + let has_record = self.go_next(session)?; if has_record { let record = self.record().ok_or(ReadRecordError::Corruption)?; Ok(Some(record)) @@ -49,9 +46,9 @@ impl RecordReader { // Attempts to position the reader to the next record and return // true or false whether such a record is available or not. - pub fn go_next(&mut self) -> Result { + pub fn go_next(&mut self, session: &mut R::Session) -> Result { loop { - let frame = self.frame_reader.read_frame(); + let frame = self.frame_reader.read_frame(session); match frame { Ok((frame_type, frame_payload)) => { if frame_type.is_first_frame_of_record() { @@ -81,10 +78,3 @@ impl RecordReader { } } } - -impl RecordReader { - pub fn into_writer(self) -> io::Result> { - let frame_writer: FrameWriter = self.frame_reader.into_writer()?; - Ok(RecordWriter::from(frame_writer)) - } -} diff --git a/src/recordlog/tests.rs b/src/recordlog/tests.rs index 59f21df..6cb4adc 100644 --- a/src/recordlog/tests.rs +++ b/src/recordlog/tests.rs @@ -8,30 +8,38 @@ use crate::{PersistAction, BLOCK_NUM_BYTES}; fn test_no_data() { let data = vec![0u8; BLOCK_NUM_BYTES * 4]; let mut reader = RecordReader::open(ArrayReader::from(&data[..])); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + let mut session = reader.start_session(); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_empty_record() { let mut writer = RecordWriter::in_memory(); - writer.write_record("").unwrap(); + let mut session = writer.start_session().unwrap(); + writer.write_record("", &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - assert_eq!(reader.read_record::<&str>().unwrap(), Some("")); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + let mut session = reader.start_session(); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), Some("")); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_simple_record() { let mut writer = RecordWriter::in_memory(); + let mut session = writer.start_session().unwrap(); let record = "hello"; - writer.write_record(record).unwrap(); + writer.write_record(record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - assert!(matches!(reader.read_record::<&str>(), Ok(Some("hello")))); - assert!(matches!(reader.read_record::<&str>(), Ok(None))); + let mut session = reader.start_session(); + assert!(matches!( + reader.read_record::<&str>(&mut session), + Ok(Some("hello")) + )); + assert!(matches!(reader.read_record::<&str>(&mut session), Ok(None))); } fn make_long_entry(len: usize) -> String { @@ -42,13 +50,17 @@ fn make_long_entry(len: usize) -> String { fn test_spans_over_more_than_one_block() { let long_entry: String = make_long_entry(80_000); let mut writer = RecordWriter::in_memory(); - writer.write_record(long_entry.as_str()).unwrap(); + let mut session = writer.start_session().unwrap(); + writer + .write_record(long_entry.as_str(), &mut session) + .unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); - let record_payload: &str = reader.read_record().unwrap().unwrap(); + let mut session = reader.start_session(); + let record_payload: &str = reader.read_record(&mut session).unwrap().unwrap(); assert_eq!(record_payload, &long_entry); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] @@ -58,17 +70,24 @@ fn test_block_requires_padding() { let long_record = make_long_entry(BLOCK_NUM_BYTES - HEADER_LEN - HEADER_LEN - 1 - 8); let short_record = "hello"; let mut writer = RecordWriter::in_memory(); - writer.write_record(long_record.as_str()).unwrap(); - writer.write_record(short_record).unwrap(); + let mut session = writer.start_session().unwrap(); + writer + .write_record(long_record.as_str(), &mut session) + .unwrap(); + writer.write_record(short_record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buffer: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); assert_eq!( - reader.read_record::<&str>().unwrap(), + reader.read_record::<&str>(&mut session).unwrap(), Some(long_record.as_str()) ); - assert_eq!(reader.read_record::<&str>().unwrap(), Some(short_record)); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(short_record) + ); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] @@ -78,45 +97,59 @@ fn test_first_chunk_empty() { let long_record = make_long_entry(BLOCK_NUM_BYTES - HEADER_LEN - HEADER_LEN); let short_record = "hello"; let mut writer = RecordWriter::in_memory(); - writer.write_record(&long_record[..]).unwrap(); - writer.write_record(short_record).unwrap(); + let mut session = writer.start_session().unwrap(); + writer.write_record(&long_record[..], &mut session).unwrap(); + writer.write_record(short_record, &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); let buf: Vec = writer.into_writer().into(); let mut reader = RecordReader::open(ArrayReader::from(&buf[..])); + let mut session = reader.start_session(); assert_eq!( - reader.read_record::<&str>().unwrap(), + reader.read_record::<&str>(&mut session).unwrap(), Some(long_record.as_str()) ); - assert_eq!(reader.read_record::<&str>().unwrap(), Some(short_record)); - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(short_record) + ); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } #[test] fn test_behavior_upon_corruption() { let records: Vec = (0..1_000).map(|i| format!("hello{i}")).collect(); let mut writer = RecordWriter::in_memory(); + let mut session = writer.start_session().unwrap(); for record in &records { - writer.write_record(record.as_str()).unwrap(); + writer.write_record(record.as_str(), &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); let mut buffer: Vec = writer.into_writer().into(); { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); for record in &records { - assert_eq!(reader.read_record::<&str>().unwrap(), Some(record.as_str())); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(record.as_str()) + ); } - assert_eq!(reader.read_record::<&str>().unwrap(), None); + assert_eq!(reader.read_record::<&str>(&mut session).unwrap(), None); } // Introducing a corruption. buffer[1_000] = 3; { let mut reader = RecordReader::open(ArrayReader::from(&buffer[..])); + let mut session = reader.start_session(); for record in &records[0..72] { // bug at i=72 - assert_eq!(reader.read_record::<&str>().unwrap(), Some(record.as_str())); + assert_eq!( + reader.read_record::<&str>(&mut session).unwrap(), + Some(record.as_str()) + ); } assert!(matches!( - reader.read_record::<&str>(), + reader.read_record::<&str>(&mut session), Err(ReadRecordError::Corruption) )); } diff --git a/src/recordlog/writer.rs b/src/recordlog/writer.rs index 0a44d6e..5ac7bc0 100644 --- a/src/recordlog/writer.rs +++ b/src/recordlog/writer.rs @@ -1,12 +1,11 @@ use std::io; use crate::block_read_write::VecBlockWriter; -use crate::frame::{FrameType, FrameWriter}; -use crate::rolling::{Directory, FileNumber, RollingWriter}; -use crate::{BlockWrite, PersistAction, Serializable}; +use crate::frame::{FrameType, FrameWriter, HEADER_LEN}; +use crate::{BlockWrite, PersistAction, Serializable, BLOCK_NUM_BYTES}; pub struct RecordWriter { - frame_writer: FrameWriter, + pub(crate) frame_writer: FrameWriter, buffer: Vec, } @@ -36,6 +35,10 @@ impl RecordWriter { } impl RecordWriter { + pub fn start_session(&mut self) -> io::Result { + self.frame_writer.start_session() + } + /// Writes a record. /// /// Even if this call returns `Ok(())`, at this point the data @@ -44,11 +47,20 @@ impl RecordWriter { /// For instance, the data could be stale in a library level buffer, /// by a writer level buffer, or an application buffer, /// or could not be flushed to disk yet by the OS. - pub fn write_record<'a>(&mut self, record: impl Serializable<'a>) -> io::Result<()> { + pub fn write_record<'a>( + &mut self, + record: impl Serializable<'a>, + session: &mut W::Session, + ) -> io::Result<()> { let mut is_first_frame = true; self.buffer.clear(); record.serialize(&mut self.buffer); let mut payload = &self.buffer[..]; + let room_needed_upperbound: u64 = HEADER_LEN as u64 + + (BLOCK_NUM_BYTES as u64) + * payload.len().div_ceil(BLOCK_NUM_BYTES - HEADER_LEN) as u64; + self.frame_writer.make_room(room_needed_upperbound)?; + loop { let frame_payload_len = self .frame_writer @@ -58,7 +70,8 @@ impl RecordWriter { payload = &payload[frame_payload_len..]; let is_last_frame = payload.is_empty(); let frame_type = frame_type(is_first_frame, is_last_frame); - self.frame_writer.write_frame(frame_type, frame_payload)?; + self.frame_writer + .write_frame(frame_type, frame_payload, session)?; is_first_frame = false; if is_last_frame { break; @@ -77,20 +90,6 @@ impl RecordWriter { } } -impl RecordWriter { - pub fn directory(&mut self) -> &mut Directory { - self.frame_writer.directory() - } - - pub fn current_file(&mut self) -> &FileNumber { - self.get_underlying_wrt().current_file() - } - - pub fn size(&self) -> usize { - self.get_underlying_wrt().size() - } -} - impl RecordWriter { #[cfg(test)] pub fn in_memory() -> Self { diff --git a/src/rolling/directory.rs b/src/rolling/directory.rs index ac64bac..488bfed 100644 --- a/src/rolling/directory.rs +++ b/src/rolling/directory.rs @@ -75,12 +75,12 @@ impl Directory { } /// Get the first still used FileNumber. - pub fn first_file_number(&self) -> &FileNumber { + pub(crate) fn first_file_number(&self) -> &FileNumber { self.files.first() } /// Returns true if some file could be GCed. - pub fn has_files_that_can_be_deleted(&self) -> bool { + pub(crate) fn is_gc_necessary(&self) -> bool { self.files.count() >= 2 && self.files.first().can_be_deleted() } @@ -168,7 +168,13 @@ fn read_block(file: &mut File, block: &mut [u8; BLOCK_NUM_BYTES]) -> io::Result< } impl BlockRead for RollingReader { - fn next_block(&mut self) -> io::Result { + type Session = FileNumber; + + fn start_session(&self) -> Self::Session { + self.current_file().clone() + } + + fn next_block(&mut self, _session: &mut Self::Session) -> io::Result { let success = read_block(&mut self.file, &mut self.block)?; if success { self.block_id += 1; @@ -225,6 +231,8 @@ impl RollingWriter { &self.file_number } + /// Returns number of bytes occupied on the disk by the + /// different files of the directory pub fn size(&self) -> usize { self.directory.files.count() * FILE_NUM_BYTES } @@ -238,7 +246,13 @@ impl RollingWriter { } impl BlockWrite for RollingWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<()> { + type Session = FileNumber; + + fn start_write_session(&mut self) -> io::Result { + Ok(self.current_file().clone()) + } + + fn write(&mut self, buf: &[u8], _session: &mut Self::Session) -> io::Result<()> { if buf.is_empty() { return Ok(()); } diff --git a/src/rolling/mod.rs b/src/rolling/mod.rs index 5cca8d0..9ef27b2 100644 --- a/src/rolling/mod.rs +++ b/src/rolling/mod.rs @@ -1,8 +1,12 @@ mod directory; mod file_number; +use std::io; + pub use self::directory::{Directory, RollingReader, RollingWriter}; pub use self::file_number::{FileNumber, FileTracker}; +use crate::frame::{FrameReader, FrameWriter}; +use crate::recordlog::{RecordReader, RecordWriter}; const FRAME_NUM_BYTES: usize = 1 << 15; @@ -15,3 +19,39 @@ const NUM_BLOCKS_PER_FILE: usize = 4; const FILE_NUM_BYTES: usize = FRAME_NUM_BYTES * NUM_BLOCKS_PER_FILE; #[cfg(test)] mod tests; + +impl FrameReader { + pub fn into_writer(self) -> io::Result> { + let mut rolling_writer: RollingWriter = self.reader.into_writer()?; + rolling_writer.forward(self.cursor)?; + Ok(FrameWriter::create(rolling_writer)) + } +} + +impl FrameWriter { + pub fn directory(&mut self) -> &mut Directory { + &mut self.wrt.directory + } +} + +impl RecordReader { + pub fn into_writer(self) -> io::Result> { + let frame_writer: FrameWriter = self.frame_reader.into_writer()?; + Ok(RecordWriter::from(frame_writer)) + } +} + +// TODO remove me +impl RecordWriter { + pub fn directory(&mut self) -> &mut Directory { + self.frame_writer.directory() + } + + pub fn current_file(&mut self) -> &FileNumber { + self.get_underlying_wrt().current_file() + } + + pub fn size(&self) -> usize { + self.get_underlying_wrt().size() + } +} diff --git a/src/rolling/tests.rs b/src/rolling/tests.rs index 1c017d8..b3b393d 100644 --- a/src/rolling/tests.rs +++ b/src/rolling/tests.rs @@ -9,19 +9,21 @@ fn test_read_write() { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); assert!(&rolling_reader.block().iter().all(|&b| b == 0)); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); buffer.fill(0u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); buffer.fill(1u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); buffer.fill(2u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); writer.persist(PersistAction::Flush).unwrap(); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 0)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 2)); } @@ -32,33 +34,36 @@ fn test_read_write_2nd_block() { { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); for i in 1..=10 { buffer.fill(i); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 2)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 3)); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); for i in 13..=23 { buffer.fill(i); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); assert!(rolling_reader.block().iter().all(|&b| b == 1)); - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == 2)); for i in 13..=23 { - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); assert!(rolling_reader.block().iter().all(|&b| b == i)); } } @@ -72,9 +77,10 @@ fn test_read_truncated() { { let rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); let mut writer: RollingWriter = rolling_reader.into_writer().unwrap(); + let mut session = writer.start_write_session().unwrap(); for i in 0..to_write { buffer.fill(i as u8); - writer.write(&buffer[..]).unwrap(); + writer.write(&buffer[..], &mut session).unwrap(); } writer.persist(PersistAction::Flush).unwrap(); let file_ids = writer.list_file_numbers(); @@ -91,6 +97,7 @@ fn test_read_truncated() { } { let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); for i in 0..to_write { // ignore file 1 as it was corrupted @@ -100,7 +107,10 @@ fn test_read_truncated() { assert!(rolling_reader.block().iter().all(|&b| b == i as u8)); // check we manage to get the next block, except for the last block: there is nothing // after - assert_eq!(rolling_reader.next_block().unwrap(), i != to_write - 1); + assert_eq!( + rolling_reader.next_block(&mut session).unwrap(), + i != to_write - 1 + ); } } } @@ -114,10 +124,11 @@ fn test_directory_single_file() { assert_eq!(first_file.unroll(&directory.files), &[0]); } let mut rolling_reader: RollingReader = RollingReader::open(tmp_dir.path()).unwrap(); + let mut session = rolling_reader.start_session(); for _ in 0..NUM_BLOCKS_PER_FILE - 1 { - assert!(rolling_reader.next_block().unwrap()); + assert!(rolling_reader.next_block(&mut session).unwrap()); } - assert!(!rolling_reader.next_block().unwrap()); + assert!(!rolling_reader.next_block(&mut session).unwrap()); } #[test] @@ -128,9 +139,10 @@ fn test_directory_simple() { .unwrap() .into_writer() .unwrap(); + let mut session = writer.start_write_session().unwrap(); let buf = vec![1u8; FRAME_NUM_BYTES]; for _ in 0..(NUM_BLOCKS_PER_FILE + 1) { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut session).unwrap(); } } { @@ -143,33 +155,33 @@ fn test_directory_simple() { #[test] fn test_directory_truncate() { let tmp_dir = tempfile::tempdir().unwrap(); - let file_0: FileNumber; - let file_1: FileNumber; - let file_2: FileNumber; - let file_3: FileNumber; + let mut file_0: FileNumber; + let mut file_1: FileNumber; + let mut file_2: FileNumber; + let mut file_3: FileNumber; { let reader = RollingReader::open(tmp_dir.path()).unwrap(); - file_0 = reader.current_file().clone(); - assert!(!file_0.can_be_deleted()); let mut writer: RollingWriter = reader.into_writer().unwrap(); + file_0 = writer.start_write_session().unwrap(); + assert!(!file_0.can_be_deleted()); let buf = vec![1u8; FRAME_NUM_BYTES]; assert_eq!(&writer.current_file().unroll(&writer.directory.files), &[0]); for _ in 0..NUM_BLOCKS_PER_FILE + 1 { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_0).unwrap(); } assert_eq!(&writer.list_file_numbers(), &[0, 1]); - file_1 = writer.current_file().clone(); + file_1 = writer.start_write_session().unwrap(); assert_eq!(file_1.file_number(), 1); for _ in 0..NUM_BLOCKS_PER_FILE { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_1).unwrap(); } assert_eq!(&writer.list_file_numbers(), &[0, 1, 2]); - file_2 = writer.current_file().clone(); + file_2 = writer.start_write_session().unwrap(); assert_eq!(file_2.file_number(), 2); for _ in 0..NUM_BLOCKS_PER_FILE { - writer.write(&buf).unwrap(); + writer.write(&buf, &mut file_2).unwrap(); } - file_3 = writer.current_file().clone(); + file_3 = writer.start_write_session().unwrap(); assert_eq!(&writer.list_file_numbers(), &[0, 1, 2, 3]); assert!(!file_0.can_be_deleted()); drop(file_1); diff --git a/src/tests.rs b/src/tests.rs index f1f03a2..3929e2a 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -2,6 +2,8 @@ use std::borrow::Cow; use bytes::Buf; +use crate::multi_record_log::Preferences; +use crate::page_directory::PAGE_SIZE; use crate::{MultiRecordLog, Record}; fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Vec> { @@ -60,7 +62,7 @@ fn test_multi_record_log_simple() { &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -88,7 +90,7 @@ fn test_multi_record_log_chained() { &read_all_records(&multi_record_log, "queue"), &[b"world order".as_slice(), b"nice day".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -111,7 +113,7 @@ fn test_multi_record_log_reopen() { &read_all_records(&multi_record_log, "queue"), &[b"hello".as_slice(), b"happy".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -145,7 +147,7 @@ fn test_multi_record_log() { &read_all_records(&multi_record_log, "queue2"), &[b"maitre".as_slice(), b"corbeau".as_slice()] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -161,7 +163,7 @@ fn test_multi_record_log() { b"bubu".as_slice() ] ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } } @@ -186,12 +188,12 @@ fn test_multi_record_position_known_after_truncate() { .unwrap(), Some(1) ); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", ..=1).unwrap(); - assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + // assert_eq!(&multi_record_log.list_file_numbers(), &[0]); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -318,32 +320,33 @@ fn test_truncate_range_correct_pos() { } } -#[test] -fn test_multi_record_size() { - let tempdir = tempfile::tempdir().unwrap(); - { - let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - assert_eq!(multi_record_log.resource_usage().memory_used_bytes, 0); - assert_eq!(multi_record_log.resource_usage().memory_allocated_bytes, 0); - - multi_record_log.create_queue("queue").unwrap(); - let size_mem_create = multi_record_log.resource_usage(); - assert!(size_mem_create.memory_used_bytes > 0); - assert!(size_mem_create.memory_allocated_bytes >= size_mem_create.memory_used_bytes); - - multi_record_log - .append_record("queue", None, &b"hello"[..]) - .unwrap(); - let size_mem_append = multi_record_log.resource_usage(); - assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); - assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); - assert!(size_mem_append.memory_allocated_bytes >= size_mem_create.memory_allocated_bytes); - - multi_record_log.truncate("queue", ..=0).unwrap(); - let size_mem_truncate = multi_record_log.resource_usage(); - assert!(size_mem_truncate.memory_used_bytes < size_mem_append.memory_used_bytes); - } -} +// #[test] +// fn test_multi_record_size() { +// let tempdir = tempfile::tempdir().unwrap(); +// { +// let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); +// // assert_eq!(multi_record_log.resource_usage().memory_used_bytes, 0); +// // assert_eq!(multi_record_log.resource_usage().memory_allocated_bytes, 0); + +// multi_record_log.create_queue("queue").unwrap(); +// // let size_mem_create = multi_record_log.resource_usage(); +// assert!(size_mem_create.memory_used_bytes > 0); +// assert!(size_mem_create.memory_allocated_bytes >= size_mem_create.memory_used_bytes); + +// multi_record_log +// .append_record("queue", None, &b"hello"[..]) +// .unwrap(); +// // let size_mem_append = multi_record_log.resource_usage(); +// assert!(size_mem_append.memory_used_bytes > size_mem_create.memory_used_bytes); +// assert!(size_mem_append.memory_allocated_bytes >= size_mem_append.memory_used_bytes); +// assert!(size_mem_append.memory_allocated_bytes >= +// size_mem_create.memory_allocated_bytes); + +// multi_record_log.truncate("queue", ..=0).unwrap(); +// // let size_mem_truncate = multi_record_log.resource_usage(); +// assert!(size_mem_truncate.memory_used_bytes < size_mem_append.memory_used_bytes); +// } +// } #[test] fn test_open_corrupted() { @@ -455,3 +458,31 @@ fn test_last_record() { let last_record = multi_record_log.last_record("queue1").unwrap(); assert!(last_record.is_none()); } + +#[test] +fn test_gc() { + let tempdir = tempfile::tempdir().unwrap(); + + let preferences = Preferences { + num_bytes: PAGE_SIZE as u64 * 10, + ..Default::default() + }; + let mut multi_record_log = + MultiRecordLog::open_with_prefs(tempdir.path(), preferences).unwrap(); + multi_record_log.create_queue("queue1").unwrap(); + multi_record_log.create_queue("queue2").unwrap(); + + let payload = vec![b'a'; 5000]; + const N: usize = 100; + for _ in 0..N { + multi_record_log + .append_record("queue1", None, &payload[..]) + .unwrap(); + } + multi_record_log.truncate("queue1", ..=100).unwrap(); + for _ in 0..N { + multi_record_log + .append_record("queue2", None, &payload[..]) + .unwrap(); + } +}