Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ magicblock-committor-program = { path = "./magicblock-committor-program", featur
magicblock-committor-service = { path = "./magicblock-committor-service" }
magicblock-config = { path = "./magicblock-config" }
magicblock-core = { path = "./magicblock-core" }
magicblock-delegation-program = { git = "https://github.com/magicblock-labs/delegation-program.git", rev = "e8d03936", features = [
magicblock-delegation-program = { git = "https://github.com/magicblock-labs/delegation-program.git", rev = "ea1f2f916268132248fe8d5de5f07d76765dd937", features = [
"no-entrypoint",
] }
magicblock-geyser-plugin = { path = "./magicblock-geyser-plugin" }
Expand Down
10 changes: 5 additions & 5 deletions magicblock-committor-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ IntentExecutor - responsible for execution of Intent. Calls **TransactionPrepar
TransactionPreparator - is an entity that handles all of the above "Transaction preparation" calling **TaskBuilderV1**, **TaskStrategist**, **DeliveryPreparator** and then assempling it all and passing to **MessageExecutor**

## DeliveryPreparator
After our **BaseTask**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
After our **Task**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix spelling error.

"eveything" should be "everything".

Apply this diff:

-After our **Task**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
+After our **Task**s are ready we need to prepare everything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
After our **Task**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
After our **Task**s are ready we need to prepare everything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers
🧰 Tools
🪛 LanguageTool

[grammar] ~26-~26: Ensure spelling is correct
Context: ... Tasks are ready we need to prepare eveything for their successful execution. **Deliv...

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🤖 Prompt for AI Agents
In magicblock-committor-service/README.md around line 26, fix the spelling
mistake "eveything" to "everything" in the sentence so it reads: "After our
**Task**s are ready we need to prepare everything for their successful
execution. **DeliveryPreparator** - handles ALTs and commit buffers".


## TaskBuilder
First, lets build atomic tasks from scheduled message/intent.

High level: TaskBuilder responsible for creating BaseTasks(to be renamed...) from ScheduledBaseIntent(to be renamed...).
High level: TaskBuilder responsible for creating Tasks(to be renamed...) from ScheduledBaseIntent(to be renamed...).
Details: To do that is requires additional information from DelegationMetadata, it is provided **CommitIdFetcher**

### BaseTask
High level: BaseTask - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action.
### Task
High level: Task - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action.

Details: There's to implementation of BaseTask: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits
Details: There's to implementation of Task: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits

### TaskInfoFetcher
High level: for account to be accepted by `dlp` it needs to have incremental commit ids. TaskInfoFetcher provides a user with the correct ids/nonces for set of committees
Expand Down
6 changes: 3 additions & 3 deletions magicblock-committor-service/src/intent_executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use solana_transaction_error::TransactionError;
use crate::{
tasks::{
task_builder::TaskBuilderError, task_strategist::TaskStrategistError,
BaseTask, TaskType,
Task, TaskType,
},
transaction_preparator::error::TransactionPreparatorError,
};
Expand Down Expand Up @@ -174,7 +174,7 @@ impl TransactionStrategyExecutionError {
pub fn try_from_transaction_error(
err: TransactionError,
signature: Option<Signature>,
tasks: &[Box<dyn BaseTask>],
tasks: &[Task],
) -> Result<Self, TransactionError> {
// There's always 2 budget instructions in front
const OFFSET: u8 = 2;
Expand Down Expand Up @@ -256,7 +256,7 @@ impl metrics::LabelValue for TransactionStrategyExecutionError {
}

pub(crate) struct IntentTransactionErrorMapper<'a> {
pub tasks: &'a [Box<dyn BaseTask>],
pub tasks: &'a [Task],
}
impl TransactionErrorMapper for IntentTransactionErrorMapper<'_> {
type ExecutionError = TransactionStrategyExecutionError;
Expand Down
49 changes: 42 additions & 7 deletions magicblock-committor-service/src/intent_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use null_task_info_fetcher::*;
use solana_pubkey::Pubkey;
use solana_rpc_client_api::config::RpcTransactionConfig;
use solana_signature::Signature;
use solana_signer::Signer;
use solana_signer::{Signer, SignerError};
use solana_transaction::versioned::VersionedTransaction;

use crate::{
Expand All @@ -53,10 +53,11 @@ use crate::{
tasks::{
task_builder::{TaskBuilderError, TaskBuilderImpl, TasksBuilder},
task_strategist::{
StrategyExecutionMode, TaskStrategist, TransactionStrategy,
StrategyExecutionMode, TaskStrategist, TaskStrategistError,
TransactionStrategy,
},
task_visitors::utility_visitor::TaskVisitorUtils,
BaseTask, TaskType,
Task, TaskType,
},
transaction_preparator::{
delivery_preparator::BufferExecutionError,
Expand Down Expand Up @@ -145,6 +146,40 @@ where
}
}

/// Checks if it is possible to unite Commit & Finalize stages in 1 transaction
/// Returns corresponding `TransactionStrategy` if possible, otherwise `None`
fn try_unite_tasks<P: IntentPersister>(
commit_tasks: &[Task],
finalize_task: &[Task],
authority: &Pubkey,
persister: &Option<P>,
) -> Result<Option<TransactionStrategy>, SignerError> {
const MAX_UNITED_TASKS_LEN: usize = 22;

// We can unite in 1 tx a lot of commits
// but then there's a possibility of hitting CPI limit, aka
// MaxInstructionTraceLengthExceeded error.
// So we limit tasks len with 22 total tasks
// In case this fails as well, it will be retried with TwoStage approach
// on retry, once retries are introduced
if commit_tasks.len() + finalize_task.len() > MAX_UNITED_TASKS_LEN {
return Ok(None);
}

// Clone tasks since strategies applied to united case maybe suboptimal for regular one
let mut commit_tasks = commit_tasks.to_owned();
let finalize_task = finalize_task.to_owned();

// Unite tasks to attempt running as single tx
commit_tasks.extend(finalize_task);
match TaskStrategist::build_strategy(commit_tasks, authority, persister)
{
Ok(strategy) => Ok(Some(strategy)),
Err(TaskStrategistError::FailedToFitError) => Ok(None),
Err(TaskStrategistError::SignerError(err)) => Err(err),
}
}

async fn execute_inner<P: IntentPersister>(
&mut self,
base_intent: ScheduledBaseIntent,
Expand All @@ -169,7 +204,7 @@ where
Some(value) => value,
None => {
// Build tasks for commit stage
let commit_tasks = TaskBuilderImpl::commit_tasks(
let commit_tasks = TaskBuilderImpl::create_commit_tasks(
&self.task_info_fetcher,
&base_intent,
persister,
Expand All @@ -194,7 +229,7 @@ where

// Build tasks for commit & finalize stages
let (commit_tasks, finalize_tasks) = {
let commit_tasks_fut = TaskBuilderImpl::commit_tasks(
let commit_tasks_fut = TaskBuilderImpl::create_commit_tasks(
&self.task_info_fetcher,
&base_intent,
persister,
Expand Down Expand Up @@ -641,7 +676,7 @@ where
async fn execute_message_with_retries(
&self,
prepared_message: VersionedMessage,
tasks: &[Box<dyn BaseTask>],
tasks: &[Task],
) -> IntentExecutorResult<Signature, TransactionStrategyExecutionError>
{
struct IntentErrorMapper<TxMap> {
Expand Down Expand Up @@ -874,7 +909,7 @@ mod tests {
let intent = create_test_intent(0, &pubkey);

let info_fetcher = Arc::new(MockInfoFetcher);
let commit_task = TaskBuilderImpl::commit_tasks(
let commit_task = TaskBuilderImpl::create_commit_tasks(
&info_fetcher,
&intent,
&None::<IntentPersisterImpl>,
Expand Down
215 changes: 215 additions & 0 deletions magicblock-committor-service/src/tasks/buffer_lifecycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use magicblock_committor_program::{
instruction_builder::{
close_buffer::{create_close_ix, CreateCloseIxArgs},
init_buffer::{create_init_ix, CreateInitIxArgs},
realloc_buffer::{
create_realloc_buffer_ixs, CreateReallocBufferIxArgs,
},
write_buffer::{create_write_ix, CreateWriteIxArgs},
},
pdas, ChangesetChunks, Chunks,
};
use magicblock_program::magic_scheduled_base_intent::CommittedAccount;
use solana_account::Account;
use solana_instruction::Instruction;
use solana_pubkey::Pubkey;

use crate::consts::MAX_WRITE_CHUNK_SIZE;

#[derive(Debug, Clone)]
pub struct BufferLifecycle {
// TODO (snawaz): rename
// PreparationTask -> CreateBufferTask
// CleanupTask -> DestroyBufferTask
pub preparation: PreparationTask,
pub cleanup: CleanupTask,
}

impl BufferLifecycle {
pub fn new(
commit_id: u64,
account: &CommittedAccount,
base_account: Option<&Account>,
) -> BufferLifecycle {
let data = if let Some(base_account) = base_account {
dlp::compute_diff(&base_account.data, &account.account.data)
.to_vec()
} else {
account.account.data.clone()
};

BufferLifecycle {
preparation: PreparationTask {
commit_id,
pubkey: account.pubkey,
chunks: Chunks::from_data_length(
data.len(),
MAX_WRITE_CHUNK_SIZE,
),
state_or_diff: data,
},
cleanup: CleanupTask {
pubkey: account.pubkey,
commit_id,
},
}
}
}

#[derive(Clone, Debug)]
pub struct PreparationTask {
pub commit_id: u64,
pub pubkey: Pubkey,
pub chunks: Chunks,
pub state_or_diff: Vec<u8>,
}

impl PreparationTask {
/// Returns initialization [`Instruction`]
pub fn instruction(&self, authority: &Pubkey) -> Instruction {
// // SAFETY: as object_length internally uses only already allocated or static buffers,
// // and we don't use any fs writers, so the only error that may occur here is of kind
// // OutOfMemory or WriteZero. This is impossible due to:
// // Chunks::new panics if its size exceeds MAX_ACCOUNT_ALLOC_PER_INSTRUCTION_SIZE or 10_240
// // https://github.com/near/borsh-rs/blob/f1b75a6b50740bfb6231b7d0b1bd93ea58ca5452/borsh/src/ser/helpers.rs#L59
let chunks_account_size =
borsh::object_length(&self.chunks).unwrap() as u64;
let buffer_account_size = self.state_or_diff.len() as u64;

let (instruction, _, _) = create_init_ix(CreateInitIxArgs {
authority: *authority,
pubkey: self.pubkey,
chunks_account_size,
buffer_account_size,
commit_id: self.commit_id,
chunk_count: self.chunks.count(),
chunk_size: self.chunks.chunk_size(),
});

instruction
}

/// Returns compute units required for realloc instruction
pub fn init_compute_units(&self) -> u32 {
12_000
}

/// Returns realloc instruction required for Buffer preparation
#[allow(clippy::let_and_return)]
pub fn realloc_instructions(&self, authority: &Pubkey) -> Vec<Instruction> {
let buffer_account_size = self.state_or_diff.len() as u64;
let realloc_instructions =
create_realloc_buffer_ixs(CreateReallocBufferIxArgs {
authority: *authority,
pubkey: self.pubkey,
buffer_account_size,
commit_id: self.commit_id,
});

realloc_instructions
}

/// Returns compute units required for realloc instruction
pub fn realloc_compute_units(&self) -> u32 {
6_000
}

/// Returns realloc instruction required for Buffer preparation
#[allow(clippy::let_and_return)]
pub fn write_instructions(&self, authority: &Pubkey) -> Vec<Instruction> {
let chunks_iter =
ChangesetChunks::new(&self.chunks, self.chunks.chunk_size())
.iter(&self.state_or_diff);
let write_instructions = chunks_iter
.map(|chunk| {
create_write_ix(CreateWriteIxArgs {
authority: *authority,
pubkey: self.pubkey,
offset: chunk.offset,
data_chunk: chunk.data_chunk,
commit_id: self.commit_id,
})
})
.collect::<Vec<_>>();

write_instructions
}

pub fn write_compute_units(&self, bytes_count: usize) -> u32 {
const PER_BYTE: u32 = 3;

u32::try_from(bytes_count)
.ok()
.and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE))
.unwrap_or(u32::MAX)
}
Comment on lines +117 to +145
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

write_compute_units saturates to u32::MAX on large inputs—document this as an intentional cap

write_compute_units does a careful overflow-safe multiplication and then falls back to u32::MAX:

const PER_BYTE: u32 = 3;

u32::try_from(bytes_count)
    .ok()
    .and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE))
    .unwrap_or(u32::MAX)

This is a sensible pattern, but the semantics (“if the length doesn’t fit in u32 or the product overflows, treat it as max CU”) aren’t obvious from call sites. A brief comment or docstring note that this acts as a saturating upper bound would make it clear that:

  • Extremely large buffers intentionally force a maximal CU estimate, and
  • Callers should treat u32::MAX as “don’t even try to pack this into a normal TX”.

No code change needed; just making the saturation behavior explicit will help future readers.

🤖 Prompt for AI Agents
In magicblock-committor-service/src/tasks/buffer_lifecycle.rs around lines 117
to 145, document that write_compute_units intentionally saturates to u32::MAX
when the input bytes_count cannot be converted to u32 or when the multiplication
would overflow; add a brief comment or docstring above write_compute_units
stating that it returns a per-byte estimate (PER_BYTE = 3) and treats
conversion/overflow failures as an intentional cap (u32::MAX) so callers know to
treat that value as “maximal/unsuitable for normal TX packing.”


pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey {
pdas::chunks_pda(
authority,
&self.pubkey,
self.commit_id.to_le_bytes().as_slice(),
)
.0
}

pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey {
pdas::buffer_pda(
authority,
&self.pubkey,
self.commit_id.to_le_bytes().as_slice(),
)
.0
}

pub fn cleanup_task(&self) -> CleanupTask {
CleanupTask {
pubkey: self.pubkey,
commit_id: self.commit_id,
}
}
}

#[derive(Clone, Debug)]
pub struct CleanupTask {
pub pubkey: Pubkey,
pub commit_id: u64,
}

impl CleanupTask {
pub fn instruction(&self, authority: &Pubkey) -> Instruction {
create_close_ix(CreateCloseIxArgs {
authority: *authority,
pubkey: self.pubkey,
commit_id: self.commit_id,
})
}

/// Returns compute units required to execute [`CleanupTask`]
pub fn compute_units(&self) -> u32 {
30_000
}

/// Returns a number of [`CleanupTask`]s that is possible to fit in single
pub const fn max_tx_fit_count_with_budget() -> usize {
8
}

pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey {
pdas::chunks_pda(
authority,
&self.pubkey,
self.commit_id.to_le_bytes().as_slice(),
)
.0
}

pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey {
pdas::buffer_pda(
authority,
&self.pubkey,
self.commit_id.to_le_bytes().as_slice(),
)
.0
}
}
Loading