diff --git a/Cargo.lock b/Cargo.lock index c657502e9..c3193c54f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3106,25 +3106,20 @@ dependencies = [ "bincode", "chrono", "futures-util", - "guinea", "log", "magicblock-config", "magicblock-core", "magicblock-ledger", - "magicblock-magic-program-api", "magicblock-program", "rusqlite", - "solana-account", "solana-instruction", "solana-message", - "solana-program", "solana-pubkey", - "solana-pubsub-client", + "solana-rpc-client", "solana-rpc-client-api", "solana-signature", "solana-transaction", "solana-transaction-error", - "test-kit", "thiserror 1.0.69", "tokio", "tokio-util", diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 4de681a46..4534cccd3 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -285,7 +285,7 @@ impl MagicValidator { let task_scheduler = TaskSchedulerService::new( &task_scheduler_db_path, &config.task_scheduler, - dispatch.transaction_scheduler.clone(), + config.listen.http(), dispatch .tasks_service .take() diff --git a/magicblock-config/src/types/network.rs b/magicblock-config/src/types/network.rs index d3bdedec0..344049519 100644 --- a/magicblock-config/src/types/network.rs +++ b/magicblock-config/src/types/network.rs @@ -21,6 +21,30 @@ impl Default for BindAddress { } } +impl BindAddress { + fn as_connect_addr(&self) -> SocketAddr { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + + match self.0.ip() { + IpAddr::V4(ip) if ip.is_unspecified() => { + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), self.0.port()) + } + IpAddr::V6(ip) if ip.is_unspecified() => { + SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), self.0.port()) + } + _ => self.0, + } + } + + pub fn http(&self) -> String { + format!("http://{}", self.as_connect_addr()) + } + + pub fn websocket(&self) -> String { + format!("ws://{}", self.as_connect_addr()) + } +} + /// A connection to one or more remote clusters (e.g., "devnet"). #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] #[serde(rename_all = "kebab-case", untagged)] diff --git a/magicblock-task-scheduler/Cargo.toml b/magicblock-task-scheduler/Cargo.toml index d70eae05b..dd1f6a8bb 100644 --- a/magicblock-task-scheduler/Cargo.toml +++ b/magicblock-task-scheduler/Cargo.toml @@ -19,9 +19,8 @@ magicblock-program = { workspace = true } rusqlite = { workspace = true } solana-instruction = { workspace = true } solana-message = { workspace = true } -solana-program = { workspace = true } solana-pubkey = { workspace = true } -solana-pubsub-client = { workspace = true } +solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-signature = { workspace = true } solana-transaction = { workspace = true } @@ -29,9 +28,3 @@ solana-transaction-error = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true, features = ["time"] } - -[dev-dependencies] -magicblock-magic-program-api = { workspace = true } -test-kit = { workspace = true } -guinea = { workspace = true } -solana-account = { workspace = true } diff --git a/magicblock-task-scheduler/src/db.rs b/magicblock-task-scheduler/src/db.rs index aeb328615..5ec23d6a9 100644 --- a/magicblock-task-scheduler/src/db.rs +++ b/magicblock-task-scheduler/src/db.rs @@ -7,7 +7,7 @@ use solana_instruction::Instruction; use solana_pubkey::Pubkey; use tokio::sync::Mutex; -use crate::errors::TaskSchedulerError; +use crate::errors::TaskSchedulerResult; /// Represents a task in the database /// Uses i64 for all timestamps and IDs to avoid overflows @@ -65,7 +65,7 @@ impl SchedulerDatabase { path.join("task_scheduler.sqlite") } - pub fn new>(path: P) -> Result { + pub fn new>(path: P) -> TaskSchedulerResult { let conn = Connection::open(path)?; // Create tables @@ -108,10 +108,7 @@ impl SchedulerDatabase { }) } - pub async fn insert_task( - &self, - task: &DbTask, - ) -> Result<(), TaskSchedulerError> { + pub async fn insert_task(&self, task: &DbTask) -> TaskSchedulerResult<()> { let instructions_bin = bincode::serialize(&task.instructions)?; let authority_str = task.authority.to_string(); let now = Utc::now().timestamp_millis(); @@ -139,7 +136,7 @@ impl SchedulerDatabase { &self, task_id: i64, last_execution: i64, - ) -> Result<(), TaskSchedulerError> { + ) -> TaskSchedulerResult<()> { let now = Utc::now().timestamp_millis(); self.conn.lock().await.execute( @@ -158,7 +155,7 @@ impl SchedulerDatabase { &self, task_id: i64, error: String, - ) -> Result<(), TaskSchedulerError> { + ) -> TaskSchedulerResult<()> { self.conn.lock().await.execute( "INSERT INTO failed_scheduling (timestamp, task_id, error) VALUES (?, ?, ?)", params![Utc::now().timestamp_millis(), task_id, error], @@ -171,7 +168,7 @@ impl SchedulerDatabase { &self, task_id: i64, error: String, - ) -> Result<(), TaskSchedulerError> { + ) -> TaskSchedulerResult<()> { self.conn.lock().await.execute( "INSERT INTO failed_tasks (timestamp, task_id, error) VALUES (?, ?, ?)", params![Utc::now().timestamp_millis(), task_id, error], @@ -183,7 +180,7 @@ impl SchedulerDatabase { pub async fn unschedule_task( &self, task_id: i64, - ) -> Result<(), TaskSchedulerError> { + ) -> TaskSchedulerResult<()> { self.conn.lock().await.execute( "UPDATE tasks SET executions_left = 0 WHERE id = ?", [task_id], @@ -192,10 +189,7 @@ impl SchedulerDatabase { Ok(()) } - pub async fn remove_task( - &self, - task_id: i64, - ) -> Result<(), TaskSchedulerError> { + pub async fn remove_task(&self, task_id: i64) -> TaskSchedulerResult<()> { self.conn .lock() .await @@ -207,7 +201,7 @@ impl SchedulerDatabase { pub async fn get_task( &self, task_id: i64, - ) -> Result, TaskSchedulerError> { + ) -> TaskSchedulerResult> { let db = self.conn.lock().await; let mut stmt = db.prepare( "SELECT id, instructions, authority, execution_interval_millis, executions_left, last_execution_millis @@ -244,7 +238,7 @@ impl SchedulerDatabase { Ok(rows.next().transpose()?) } - pub async fn get_tasks(&self) -> Result, TaskSchedulerError> { + pub async fn get_tasks(&self) -> TaskSchedulerResult> { let db = self.conn.lock().await; let mut stmt = db.prepare( "SELECT id, instructions, authority, execution_interval_millis, executions_left, last_execution_millis @@ -286,7 +280,7 @@ impl SchedulerDatabase { Ok(tasks) } - pub async fn get_task_ids(&self) -> Result, TaskSchedulerError> { + pub async fn get_task_ids(&self) -> TaskSchedulerResult> { let db = self.conn.lock().await; let mut stmt = db.prepare( "SELECT id @@ -300,7 +294,7 @@ impl SchedulerDatabase { pub async fn get_failed_schedulings( &self, - ) -> Result, TaskSchedulerError> { + ) -> TaskSchedulerResult> { let db = self.conn.lock().await; let mut stmt = db.prepare( "SELECT * @@ -321,7 +315,7 @@ impl SchedulerDatabase { pub async fn get_failed_tasks( &self, - ) -> Result, TaskSchedulerError> { + ) -> TaskSchedulerResult> { let db = self.conn.lock().await; let mut stmt = db.prepare( "SELECT * diff --git a/magicblock-task-scheduler/src/errors.rs b/magicblock-task-scheduler/src/errors.rs index d7ab79a50..7e070699e 100644 --- a/magicblock-task-scheduler/src/errors.rs +++ b/magicblock-task-scheduler/src/errors.rs @@ -7,47 +7,15 @@ pub enum TaskSchedulerError { #[error(transparent)] DatabaseConnection(#[from] rusqlite::Error), - #[error(transparent)] - Pubsub( - Box< - solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError, - >, - ), - #[error(transparent)] Bincode(#[from] bincode::Error), - #[error("Task not found: {0}")] - TaskNotFound(i64), - #[error(transparent)] - Transaction(#[from] solana_transaction_error::TransactionError), - - #[error("Task context not found")] - TaskContextNotFound, + Rpc(#[from] Box), #[error(transparent)] Io(#[from] std::io::Error), - #[error("Failed to process some context requests: {0:?}")] - SchedulingRequests(Vec), - - #[error("Failed to serialize task context: {0:?}")] - ContextSerialization(Vec), - - #[error("Failed to deserialize task context: {0:?}")] - ContextDeserialization(Vec), - #[error("Task {0} already exists and is owned by {1}, not {2}")] UnauthorizedReplacing(i64, String, String), } - -impl From - for TaskSchedulerError -{ - fn from( - e: solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError, - ) -> Self { - Self::Pubsub(Box::new(e)) - } -} diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 1d8ad3211..3334d0738 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -7,9 +7,7 @@ use std::{ use futures_util::StreamExt; use log::*; use magicblock_config::config::TaskSchedulerConfig; -use magicblock_core::link::transactions::{ - ScheduledTasksRx, TransactionSchedulerHandle, -}; +use magicblock_core::link::transactions::ScheduledTasksRx; use magicblock_ledger::LatestBlock; use magicblock_program::{ args::{CancelTaskRequest, TaskRequest}, @@ -18,6 +16,7 @@ use magicblock_program::{ }; use solana_instruction::Instruction; use solana_message::Message; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_signature::Signature; use solana_transaction::Transaction; use tokio::{select, task::JoinHandle, time::Duration}; @@ -34,8 +33,8 @@ use crate::{ pub struct TaskSchedulerService { /// Database for persisting tasks db: SchedulerDatabase, - /// Used to send transactions for execution - tx_scheduler: TransactionSchedulerHandle, + /// RPC client used to send transactions + rpc_client: RpcClient, /// Used to receive scheduled tasks from the transaction executor scheduled_tasks: ScheduledTasksRx, /// Provides latest blockhash for signing transactions @@ -52,7 +51,7 @@ pub struct TaskSchedulerService { enum ProcessingOutcome { Success, - Recoverable(TaskSchedulerError), + Recoverable(Box), } // SAFETY: TaskSchedulerService is moved into a single Tokio task in `start()` and never cloned. @@ -65,11 +64,11 @@ impl TaskSchedulerService { pub fn new( path: &Path, config: &TaskSchedulerConfig, - tx_scheduler: TransactionSchedulerHandle, + rpc_url: String, scheduled_tasks: ScheduledTasksRx, block: LatestBlock, token: CancellationToken, - ) -> Result { + ) -> TaskSchedulerResult { if config.reset { match std::fs::remove_file(path) { Ok(_) => {} @@ -87,7 +86,7 @@ impl TaskSchedulerService { let db = SchedulerDatabase::new(path)?; Ok(Self { db, - tx_scheduler, + rpc_client: RpcClient::new(rpc_url), scheduled_tasks, block, task_queue: DelayQueue::new(), @@ -139,7 +138,7 @@ impl TaskSchedulerService { schedule_request.id, e ); - return Ok(ProcessingOutcome::Recoverable(e)); + return Ok(ProcessingOutcome::Recoverable(Box::new(e))); } } TaskRequest::Cancel(cancel_request) => { @@ -157,7 +156,7 @@ impl TaskSchedulerService { cancel_request.task_id, e ); - return Ok(ProcessingOutcome::Recoverable(e)); + return Ok(ProcessingOutcome::Recoverable(Box::new(e))); } } }; @@ -321,8 +320,10 @@ impl TaskSchedulerService { blockhash, ); - let sig = tx.signatures[0]; - self.tx_scheduler.execute(tx).await?; - Ok(sig) + Ok(self + .rpc_client + .send_transaction(&tx) + .await + .map_err(Box::new)?) } } diff --git a/magicblock-task-scheduler/tests/service.rs b/magicblock-task-scheduler/tests/service.rs deleted file mode 100644 index 601f8274b..000000000 --- a/magicblock-task-scheduler/tests/service.rs +++ /dev/null @@ -1,219 +0,0 @@ -use std::time::Duration; - -use guinea::GuineaInstruction; -use magicblock_config::config::TaskSchedulerConfig; -use magicblock_program::{ - args::ScheduleTaskArgs, - validator::{init_validator_authority_if_needed, validator_authority_id}, -}; -use magicblock_task_scheduler::{ - errors::TaskSchedulerResult, SchedulerDatabase, TaskSchedulerError, - TaskSchedulerService, -}; -use solana_account::ReadableAccount; -use solana_program::{ - instruction::{AccountMeta, Instruction}, - native_token::LAMPORTS_PER_SOL, -}; -use test_kit::{ExecutionTestEnv, Signer}; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; - -type SetupResult = TaskSchedulerResult<( - ExecutionTestEnv, - CancellationToken, - JoinHandle>, -)>; - -async fn setup() -> SetupResult { - let mut env = ExecutionTestEnv::new(); - - init_validator_authority_if_needed(env.payer.insecure_clone()); - // NOTE: validator authority is unique for all tests in this file, but the payer changes for each test - // Airdrop some SOL to the validator authority, which is used to pay task fees - env.fund_account(validator_authority_id(), LAMPORTS_PER_SOL); - - let token = CancellationToken::new(); - let task_scheduler_db_path = SchedulerDatabase::path( - env.ledger - .ledger_path() - .parent() - .expect("ledger_path didn't have a parent, should never happen"), - ); - let handle = TaskSchedulerService::new( - &task_scheduler_db_path, - &TaskSchedulerConfig::default(), - env.transaction_scheduler.clone(), - env.dispatch - .tasks_service - .take() - .expect("Tasks service should be initialized"), - env.ledger.latest_block().clone(), - token.clone(), - )? - .start() - .await?; - - Ok((env, token, handle)) -} - -#[tokio::test] -pub async fn test_schedule_task() -> TaskSchedulerResult<()> { - let (env, token, handle) = setup().await?; - - let account = - env.create_account_with_config(LAMPORTS_PER_SOL, 1, guinea::ID); - - // Schedule a task - let ix = Instruction::new_with_bincode( - guinea::ID, - &GuineaInstruction::ScheduleTask(ScheduleTaskArgs { - task_id: 1, - execution_interval_millis: 10, - iterations: 1, - instructions: vec![Instruction::new_with_bincode( - guinea::ID, - &GuineaInstruction::Increment, - vec![AccountMeta::new(account.pubkey(), false)], - )], - }), - vec![ - AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), - AccountMeta::new(env.payer.pubkey(), true), - AccountMeta::new(account.pubkey(), false), - ], - ); - let txn = env.build_transaction(&[ix]); - let result = env.execute_transaction(txn).await; - assert!( - result.is_ok(), - "failed to execute schedule task transaction: {:?}", - result - ); - - // Wait until the task scheduler actually mutates the account (with an upper bound to avoid hangs) - tokio::time::timeout(Duration::from_secs(1), async { - loop { - if env.get_account(account.pubkey()).data().first() == Some(&1) { - break; - } - tokio::time::sleep(Duration::from_millis(20)).await; - } - }) - .await - .expect("task scheduler never incremented the account within 1s"); - - token.cancel(); - handle.await.expect("task service join handle failed")?; - - Ok(()) -} - -#[tokio::test] -pub async fn test_cancel_task() -> TaskSchedulerResult<()> { - let (env, token, handle) = setup().await?; - - let account = - env.create_account_with_config(LAMPORTS_PER_SOL, 1, guinea::ID); - - // Schedule a task - let task_id = 2; - let interval = 100; - let ix = Instruction::new_with_bincode( - guinea::ID, - &GuineaInstruction::ScheduleTask(ScheduleTaskArgs { - task_id, - execution_interval_millis: interval, - iterations: 100, - instructions: vec![Instruction::new_with_bincode( - guinea::ID, - &GuineaInstruction::Increment, - vec![AccountMeta::new(account.pubkey(), false)], - )], - }), - vec![ - AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), - AccountMeta::new(env.payer.pubkey(), true), - AccountMeta::new(account.pubkey(), false), - ], - ); - let txn = env.build_transaction(&[ix]); - let result = env.execute_transaction(txn).await; - assert!( - result.is_ok(), - "failed to execute schedule task transaction: {:?}", - result - ); - - // Wait until we actually observe at least five executions - let executed_before_cancel = tokio::time::timeout( - Duration::from_millis(10 * interval as u64), - async { - loop { - if let Some(value) = - env.get_account(account.pubkey()).data().first() - { - if *value >= 5 { - break *value; - } - } - tokio::time::sleep(Duration::from_millis(20)).await; - } - }, - ) - .await - .expect("task scheduler never reached five executions within 10 intervals"); - - // Cancel the task - let ix = Instruction::new_with_bincode( - guinea::ID, - &GuineaInstruction::CancelTask(task_id), - vec![ - AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), - AccountMeta::new(env.payer.pubkey(), true), - ], - ); - let txn = env.build_transaction(&[ix]); - let result = env.execute_transaction(txn).await; - assert!( - result.is_ok(), - "failed to execute cancel task transaction: {:?}", - result - ); - - // Wait for the cancel to be processed - tokio::time::sleep(Duration::from_millis(interval as u64)).await; - - let value_at_cancel = env - .get_account(account.pubkey()) - .data() - .first() - .copied() - .unwrap_or_default(); - assert!( - value_at_cancel >= executed_before_cancel, - "unexpected: value at cancellation ({}) < value when 5 executions were observed ({})", - value_at_cancel, - executed_before_cancel - ); - - // Ensure the scheduler stops issuing executions after cancellation - tokio::time::sleep(Duration::from_millis(2 * interval as u64)).await; - - let value_after_cancel = env - .get_account(account.pubkey()) - .data() - .first() - .copied() - .unwrap_or_default(); - - assert_eq!( - value_after_cancel, value_at_cancel, - "task scheduler kept executing after cancellation" - ); - - token.cancel(); - handle.await.expect("task service join handle failed")?; - - Ok(()) -} diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 65bb664be..c456a8262 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -3505,9 +3505,8 @@ dependencies = [ "rusqlite", "solana-instruction", "solana-message", - "solana-program", "solana-pubkey", - "solana-pubsub-client", + "solana-rpc-client", "solana-rpc-client-api", "solana-signature", "solana-transaction", diff --git a/test-integration/programs/schedulecommit/src/api.rs b/test-integration/programs/schedulecommit/src/api.rs index adecc662b..8e1d84543 100644 --- a/test-integration/programs/schedulecommit/src/api.rs +++ b/test-integration/programs/schedulecommit/src/api.rs @@ -65,7 +65,7 @@ pub fn delegate_account_cpi_instruction( let args = DelegateCpiArgs { valid_until: i64::MAX, - commit_frequency_ms: 1_000_000_000, + commit_frequency_ms: 0, validator, player, };