oasis_runtime_sdk/
dispatcher.rs

1//! Transaction dispatcher.
2use std::{
3    collections::{BTreeMap, BTreeSet},
4    convert::TryInto,
5    marker::PhantomData,
6    sync::{atomic::AtomicBool, Arc},
7};
8
9use anyhow::anyhow;
10use slog::error;
11use thiserror::Error;
12
13use oasis_core_runtime::{
14    self,
15    common::crypto::hash::Hash,
16    consensus::{roothash, verifier::Verifier},
17    enclave_rpc::dispatcher::Dispatcher as RpcDispatcher,
18    future::block_on,
19    protocol::{HostInfo, Protocol},
20    transaction::{
21        self,
22        dispatcher::{ExecuteBatchResult, ExecuteTxResult},
23        tags::Tags,
24        types::TxnBatch,
25    },
26    types::{CheckTxMetadata, CheckTxResult},
27};
28
29use crate::{
30    callformat,
31    context::{Context, RuntimeBatchContext},
32    enclave_rpc,
33    error::{Error as _, RuntimeError},
34    event::IntoTags,
35    keymanager::{KeyManagerClient, KeyManagerError},
36    module::{self, BlockHandler, MethodHandler, TransactionHandler},
37    modules,
38    modules::core::API as _,
39    runtime::Runtime,
40    schedule_control::ScheduleControlHost,
41    sender::SenderMeta,
42    state::{self, CurrentState, Mode, TransactionResult, TransactionWithMeta},
43    storage::{self, Prefix},
44    types,
45    types::transaction::{AuthProof, Transaction},
46};
47
48/// Unique module name.
49const MODULE_NAME: &str = "dispatcher";
50
51/// Error emitted by the dispatch process. Note that this indicates an error in the dispatch
52/// process itself and should not be used for any transaction-related errors.
53#[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
54#[sdk_error(abort_self)]
55pub enum Error {
56    #[error("dispatch aborted")]
57    #[sdk_error(code = 1)]
58    Aborted,
59
60    #[error("malformed transaction in batch: {0}")]
61    #[sdk_error(code = 2)]
62    MalformedTransactionInBatch(#[source] anyhow::Error),
63
64    #[error("query aborted: {0}")]
65    #[sdk_error(code = 3)]
66    QueryAborted(String),
67
68    #[error("key manager failure: {0}")]
69    #[sdk_error(code = 4)]
70    KeyManagerFailure(#[from] KeyManagerError),
71
72    #[error("batch out of gas")]
73    #[sdk_error(code = 5)]
74    BatchOutOfGas,
75}
76
77/// Result of dispatching a transaction.
78#[derive(Debug)]
79pub struct DispatchResult {
80    /// Transaction call result.
81    pub result: module::CallResult,
82    /// Transaction tags.
83    pub tags: Tags,
84    /// Transaction priority.
85    pub priority: u64,
86    /// Transaction sender metadata.
87    pub sender_metadata: SenderMeta,
88    /// Call format metadata.
89    pub call_format_metadata: callformat::Metadata,
90}
91
92impl DispatchResult {
93    fn new(
94        result: module::CallResult,
95        tags: Tags,
96        call_format_metadata: callformat::Metadata,
97    ) -> Self {
98        Self {
99            result,
100            tags,
101            priority: 0,
102            sender_metadata: Default::default(),
103            call_format_metadata,
104        }
105    }
106}
107
108impl From<module::CallResult> for DispatchResult {
109    fn from(result: module::CallResult) -> Self {
110        Self::new(result, vec![], callformat::Metadata::Empty)
111    }
112}
113
114/// Additional options for dispatch operations.
115#[derive(Default)]
116pub struct DispatchOptions<'a> {
117    /// Transaction size.
118    pub tx_size: u32,
119    /// Transaction index within the batch.
120    pub tx_index: usize,
121    /// Transaction hash.
122    pub tx_hash: Hash,
123    /// Optionally only allow methods for which the provided authorizer closure returns true.
124    pub method_authorizer: Option<&'a dyn Fn(&str) -> bool>,
125    /// Optionally skip authentication.
126    pub skip_authentication: bool,
127}
128
129/// The runtime dispatcher.
130pub struct Dispatcher<R: Runtime> {
131    host_info: HostInfo,
132    host: Arc<Protocol>,
133    key_manager: Option<Arc<KeyManagerClient>>,
134    consensus_verifier: Arc<dyn Verifier>,
135    schedule_control_host: Arc<dyn ScheduleControlHost>,
136    _runtime: PhantomData<R>,
137}
138
139impl<R: Runtime> Dispatcher<R> {
140    /// Create a new instance of the dispatcher for the given runtime.
141    ///
142    /// Note that the dispatcher is fully static and the constructor is only needed so that the
143    /// instance can be used directly with the dispatcher system provided by Oasis Core.
144    pub(super) fn new(
145        host: Arc<Protocol>,
146        key_manager: Option<Arc<KeyManagerClient>>,
147        consensus_verifier: Arc<dyn Verifier>,
148    ) -> Self {
149        Self {
150            host_info: host.get_host_info(),
151            key_manager,
152            consensus_verifier,
153            schedule_control_host: host.clone(),
154            host,
155            _runtime: PhantomData,
156        }
157    }
158
159    /// Decode a runtime transaction.
160    pub fn decode_tx<C: Context>(
161        ctx: &C,
162        tx: &[u8],
163    ) -> Result<types::transaction::Transaction, modules::core::Error> {
164        // Perform any checks before decoding.
165        R::Modules::approve_raw_tx(ctx, tx)?;
166
167        // Deserialize transaction.
168        let utx: types::transaction::UnverifiedTransaction = cbor::from_slice(tx)
169            .map_err(|e| modules::core::Error::MalformedTransaction(e.into()))?;
170
171        // Perform any checks before signature verification.
172        R::Modules::approve_unverified_tx(ctx, &utx)?;
173
174        match utx.1.as_slice() {
175            [AuthProof::Module(scheme)] => {
176                R::Modules::decode_tx(ctx, scheme, &utx.0)?.ok_or_else(|| {
177                    modules::core::Error::MalformedTransaction(anyhow!(
178                        "module-controlled transaction decoding scheme {scheme} not supported"
179                    ))
180                })
181            }
182            _ => utx
183                .verify()
184                .map_err(|e| modules::core::Error::MalformedTransaction(e.into())),
185        }
186    }
187
188    /// Run the dispatch steps inside a transaction context. This includes the before call hooks,
189    /// the call itself and after call hooks. The after call hooks are called regardless if the call
190    /// succeeds or not.
191    pub fn dispatch_tx_call<C: Context>(
192        ctx: &C,
193        call: types::transaction::Call,
194        opts: &DispatchOptions<'_>,
195    ) -> (module::CallResult, callformat::Metadata) {
196        let read_only = call.read_only;
197
198        // Dispatch the call.
199        let (result, metadata) = Self::_dispatch_tx_call(ctx, call, opts);
200
201        // Unconditionally call after handle call hook.
202        if let Err(e) = R::Modules::after_handle_call(ctx, &result) {
203            // If the call failed, return the error.
204            return (e.into_call_result(), metadata);
205        };
206
207        // Make sure that a read-only call did not result in any modifications.
208        if read_only && CurrentState::with(|state| state.has_pending_store_updates()) {
209            return (
210                modules::core::Error::ReadOnlyTransaction.into_call_result(),
211                metadata,
212            );
213        }
214
215        (result, metadata)
216    }
217
218    fn _dispatch_tx_call<C: Context>(
219        ctx: &C,
220        call: types::transaction::Call,
221        opts: &DispatchOptions<'_>,
222    ) -> (module::CallResult, callformat::Metadata) {
223        if let Err(e) = R::Modules::before_handle_call(ctx, &call) {
224            return (e.into_call_result(), callformat::Metadata::Empty);
225        }
226
227        // Decode call based on specified call format.
228        let (call, call_format_metadata) = match callformat::decode_call(ctx, call, opts.tx_index) {
229            Ok(Some(result)) => result,
230            Ok(None) => {
231                return (
232                    module::CallResult::Ok(cbor::Value::Simple(cbor::SimpleValue::NullValue)),
233                    callformat::Metadata::Empty,
234                )
235            }
236            Err(err) => return (err.into_call_result(), callformat::Metadata::Empty),
237        };
238
239        // Apply optional method authorization.
240        if let Some(method_authorizer) = opts.method_authorizer {
241            if !method_authorizer(&call.method) {
242                return (
243                    modules::core::Error::Forbidden.into_call_result(),
244                    call_format_metadata,
245                );
246            }
247        }
248
249        if let Err(e) = R::Modules::before_authorized_call_dispatch(ctx, &call) {
250            return (e.into_call_result(), call_format_metadata);
251        }
252
253        let result = match R::Modules::dispatch_call(ctx, &call.method, call.body) {
254            module::DispatchResult::Handled(result) => result,
255            module::DispatchResult::Unhandled(_) => {
256                modules::core::Error::InvalidMethod(call.method).into_call_result()
257            }
258        };
259
260        (result, call_format_metadata)
261    }
262
263    /// Dispatch a runtime transaction in the given context with the provided options.
264    pub fn dispatch_tx_opts<C: Context>(
265        ctx: &C,
266        tx: types::transaction::Transaction,
267        opts: &DispatchOptions<'_>,
268    ) -> Result<DispatchResult, Error> {
269        // Run pre-processing hooks.
270        if !opts.skip_authentication {
271            if let Err(err) = R::Modules::authenticate_tx(ctx, &tx) {
272                return Ok(err.into_call_result().into());
273            }
274        }
275        let tx_auth_info = tx.auth_info.clone();
276        let is_read_only = tx.call.read_only;
277        let call = tx.call.clone(); // TODO: Avoid clone.
278
279        let result = CurrentState::with_transaction_opts(
280            state::Options::new().with_tx(TransactionWithMeta {
281                data: tx,
282                size: opts.tx_size,
283                index: opts.tx_index,
284                hash: opts.tx_hash,
285            }),
286            || {
287                let (result, call_format_metadata) = Self::dispatch_tx_call(ctx, call, opts);
288                if !result.is_success() || is_read_only {
289                    // Retrieve unconditional events.
290                    let events = CurrentState::with(|state| state.take_unconditional_events());
291
292                    return TransactionResult::Rollback(DispatchResult::new(
293                        result,
294                        events.into_tags(),
295                        call_format_metadata,
296                    ));
297                }
298
299                // Load priority.
300                let priority = R::Core::take_priority();
301                // Load sender metadata.
302                let sender_metadata = R::Core::take_sender_meta();
303
304                if CurrentState::with_env(|env| env.is_check_only()) {
305                    TransactionResult::Rollback(DispatchResult {
306                        result,
307                        tags: Vec::new(),
308                        priority,
309                        sender_metadata,
310                        call_format_metadata,
311                    })
312                } else {
313                    // Merge normal and unconditional events.
314                    let tags = CurrentState::with(|state| state.take_all_events().into_tags());
315
316                    TransactionResult::Commit(DispatchResult {
317                        result,
318                        tags,
319                        priority,
320                        sender_metadata,
321                        call_format_metadata,
322                    })
323                }
324            },
325        );
326
327        // Run after dispatch hooks.
328        R::Modules::after_dispatch_tx(ctx, &tx_auth_info, &result.result);
329
330        // Propagate batch aborts.
331        if let module::CallResult::Aborted(err) = result.result {
332            return Err(err);
333        }
334
335        Ok(result)
336    }
337
338    /// Dispatch a runtime transaction in the given context.
339    pub fn dispatch_tx<C: Context>(
340        ctx: &C,
341        tx_size: u32,
342        tx: types::transaction::Transaction,
343        tx_index: usize,
344    ) -> Result<DispatchResult, Error> {
345        Self::dispatch_tx_opts(
346            ctx,
347            tx,
348            &DispatchOptions {
349                tx_size,
350                tx_index,
351                ..Default::default()
352            },
353        )
354    }
355
356    /// Check whether the given transaction is valid.
357    pub fn check_tx<C: Context>(
358        ctx: &C,
359        tx_size: u32,
360        tx: Transaction,
361    ) -> Result<CheckTxResult, Error> {
362        // In case of any panics, treat it as a failed check instead of crashing the runtime.
363        let catch_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
364            Self::dispatch_tx(ctx, tx_size, tx, usize::MAX)
365        }));
366        let dispatch = match catch_result {
367            Ok(dispatch) => dispatch?,
368            Err(panic_err) => {
369                // Convert panics into transaction check failures as it is clearly the fault of a
370                // specific transaction.
371                return Ok(CheckTxResult {
372                    error: RuntimeError {
373                        module: MODULE_NAME.to_string(),
374                        code: 1,
375                        message: format!("transaction check aborted: {panic_err:?}"),
376                    },
377                    meta: None,
378                });
379            }
380        };
381
382        match dispatch.result {
383            module::CallResult::Ok(_) => Ok(CheckTxResult {
384                error: Default::default(),
385                meta: Some(CheckTxMetadata {
386                    priority: dispatch.priority,
387                    sender: dispatch.sender_metadata.id(),
388                    sender_seq: dispatch.sender_metadata.tx_nonce,
389                    sender_state_seq: dispatch.sender_metadata.state_nonce,
390                }),
391            }),
392
393            module::CallResult::Failed {
394                module,
395                code,
396                message,
397            } => Ok(CheckTxResult {
398                error: RuntimeError {
399                    module,
400                    code,
401                    message,
402                },
403                meta: None,
404            }),
405
406            module::CallResult::Aborted(err) => Err(err),
407        }
408    }
409
410    /// Execute the given transaction, returning unserialized results.
411    pub fn execute_tx_opts<C: Context>(
412        ctx: &C,
413        tx: Transaction,
414        opts: &DispatchOptions<'_>,
415    ) -> Result<(types::transaction::CallResult, Tags), Error> {
416        let dispatch_result = Self::dispatch_tx_opts(ctx, tx, opts)?;
417        let output: types::transaction::CallResult = callformat::encode_result(
418            ctx,
419            dispatch_result.result,
420            dispatch_result.call_format_metadata,
421        );
422
423        Ok((output, dispatch_result.tags))
424    }
425
426    /// Execute the given transaction.
427    pub fn execute_tx<C: Context>(
428        ctx: &C,
429        tx_size: u32,
430        tx_hash: Hash,
431        tx: Transaction,
432        tx_index: usize,
433    ) -> Result<ExecuteTxResult, Error> {
434        let (output, tags) = Self::execute_tx_opts(
435            ctx,
436            tx,
437            &DispatchOptions {
438                tx_size,
439                tx_index,
440                tx_hash,
441                ..Default::default()
442            },
443        )?;
444
445        Ok(ExecuteTxResult {
446            output: cbor::to_vec(output),
447            tags,
448        })
449    }
450
451    /// Prefetch prefixes for the given transaction.
452    pub fn prefetch_tx(
453        prefixes: &mut BTreeSet<Prefix>,
454        tx: types::transaction::Transaction,
455    ) -> Result<(), RuntimeError> {
456        match R::Modules::prefetch(prefixes, &tx.call.method, tx.call.body, &tx.auth_info) {
457            module::DispatchResult::Handled(r) => r,
458            module::DispatchResult::Unhandled(_) => Ok(()), // Unimplemented prefetch is allowed.
459        }
460    }
461
462    fn handle_last_round_messages<C: Context>(ctx: &C) -> Result<(), modules::core::Error> {
463        let message_events = ctx.runtime_round_results().messages.clone();
464
465        let mut handlers = CurrentState::with_store(|store| {
466            let store = storage::TypedStore::new(storage::PrefixStore::new(
467                store,
468                &modules::core::MODULE_NAME,
469            ));
470            let handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = store
471                .get(modules::core::state::MESSAGE_HANDLERS)
472                .unwrap_or_default();
473
474            handlers
475        });
476
477        for event in message_events {
478            let handler = handlers
479                .remove(&event.index)
480                .ok_or(modules::core::Error::MessageHandlerMissing(event.index))?;
481            let hook_name = handler.hook_name.clone();
482
483            R::Modules::dispatch_message_result(
484                ctx,
485                &hook_name,
486                types::message::MessageResult {
487                    event,
488                    context: handler.payload,
489                },
490            )
491            .ok_or(modules::core::Error::InvalidMethod(hook_name))?;
492        }
493
494        if !handlers.is_empty() {
495            error!(ctx.get_logger("dispatcher"), "message handler not invoked"; "unhandled" => ?handlers);
496            return Err(modules::core::Error::MessageHandlerNotInvoked);
497        }
498
499        Ok(())
500    }
501
502    fn save_emitted_message_handlers(handlers: Vec<types::message::MessageEventHookInvocation>) {
503        let message_handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = handlers
504            .into_iter()
505            .enumerate()
506            .map(|(idx, h)| (idx as u32, h))
507            .collect();
508
509        CurrentState::with_store(|store| {
510            let mut store = storage::TypedStore::new(storage::PrefixStore::new(
511                store,
512                &modules::core::MODULE_NAME,
513            ));
514            store.insert(modules::core::state::MESSAGE_HANDLERS, message_handlers);
515        });
516    }
517
518    /// Process the given runtime query.
519    pub fn dispatch_query<C: Context>(
520        ctx: &C,
521        method: &str,
522        args: Vec<u8>,
523    ) -> Result<Vec<u8>, RuntimeError> {
524        let args = cbor::from_slice(&args)
525            .map_err(|err| modules::core::Error::InvalidArgument(err.into()))?;
526
527        CurrentState::with_transaction(|| {
528            // Catch any panics that occur during query dispatch.
529            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
530                // Perform state migrations if required.
531                R::migrate(ctx);
532
533                if !R::is_allowed_query(method) || !ctx.is_allowed_query::<R>(method) {
534                    return Err(modules::core::Error::Forbidden.into());
535                }
536
537                R::Modules::dispatch_query(ctx, method, args)
538                    .ok_or_else(|| modules::core::Error::InvalidMethod(method.into()))?
539            }));
540
541            // Always rollback any changes to storage. Note that this is usually a no-op because
542            // Oasis Core would rollback any storage changes related to queries, but this makes it
543            // explicit to ensure this remains the case regardless of upstream changes.
544            TransactionResult::Rollback(result)
545        })
546        .map_err(|err| -> RuntimeError { Error::QueryAborted(format!("{err:?}")).into() })?
547        .map(cbor::to_vec)
548    }
549
550    fn execute_batch_common<F>(
551        &self,
552        mut rt_ctx: transaction::Context<'_>,
553        f: F,
554    ) -> Result<ExecuteBatchResult, RuntimeError>
555    where
556        F: FnOnce(&RuntimeBatchContext<'_, R>) -> Result<Vec<ExecuteTxResult>, RuntimeError>,
557    {
558        // Prepare dispatch context.
559        let key_manager = self
560            .key_manager
561            .as_ref()
562            // NOTE: We are explicitly allowing private key operations during execution.
563            .map(|mgr| mgr.with_private_context());
564        let history = self.consensus_verifier.clone();
565
566        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
567        let ctx = RuntimeBatchContext::<'_, R>::new(
568            &self.host_info,
569            key_manager,
570            rt_ctx.header,
571            rt_ctx.round_results,
572            &rt_ctx.consensus_state,
573            &history,
574            rt_ctx.epoch,
575            rt_ctx.max_messages,
576        );
577
578        CurrentState::enter_opts(state::Options::new().with_mode(Mode::Execute), root, || {
579            // Perform state migrations if required.
580            R::migrate(&ctx);
581
582            // Handle last round message results.
583            Self::handle_last_round_messages(&ctx)?;
584
585            // Run begin block hooks.
586            R::Modules::begin_block(&ctx);
587
588            let results = f(&ctx)?;
589
590            // Run end block hooks.
591            R::Modules::end_block(&ctx);
592
593            // Process any emitted messages and block-level events.
594            let (messages, handlers, block_tags) = CurrentState::with(|state| {
595                let (messages, handlers) = state.take_messages().into_iter().unzip();
596                let block_tags = state.take_all_events().into_tags();
597
598                (messages, handlers, block_tags)
599            });
600            Self::save_emitted_message_handlers(handlers);
601
602            Ok(ExecuteBatchResult {
603                results,
604                messages,
605                block_tags,
606                tx_reject_hashes: vec![],
607                in_msgs_count: 0, // TODO: Support processing incoming messages.
608            })
609        })
610    }
611
612    /// Register EnclaveRPC methods.
613    pub fn register_enclaverpc(&self, rpc: &mut RpcDispatcher)
614    where
615        R: Runtime + Send + Sync + 'static,
616    {
617        enclave_rpc::Wrapper::<R>::wrap(
618            rpc,
619            self.host.clone(),
620            self.host_info.clone(),
621            self.key_manager.clone(),
622            self.consensus_verifier.clone(),
623        );
624    }
625}
626
627impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatcher<R> {
628    fn execute_batch(
629        &self,
630        rt_ctx: transaction::Context<'_>,
631        batch: &TxnBatch,
632        _in_msgs: &[roothash::IncomingMessage],
633    ) -> Result<ExecuteBatchResult, RuntimeError> {
634        self.execute_batch_common(
635            rt_ctx,
636            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
637                // If prefetch limit is set enable prefetch.
638                let prefetch_enabled = R::PREFETCH_LIMIT > 0;
639
640                let mut txs = Vec::with_capacity(batch.len());
641                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
642                for tx in batch.iter() {
643                    let tx_size = tx.len().try_into().map_err(|_| {
644                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
645                    })?;
646                    let tx_hash = Hash::digest_bytes(tx);
647                    // It is an error to include a malformed transaction in a batch. So instead of only
648                    // reporting a failed execution result, we fail the whole batch. This will make the compute
649                    // node vote for failure and the round will fail.
650                    //
651                    // Correct proposers should only include transactions which have passed check_tx.
652                    let tx = Self::decode_tx(ctx, tx)
653                        .map_err(|err| Error::MalformedTransactionInBatch(err.into()))?;
654                    txs.push((tx_size, tx_hash, tx.clone()));
655
656                    if prefetch_enabled {
657                        Self::prefetch_tx(&mut prefixes, tx)?;
658                    }
659                }
660                if prefetch_enabled {
661                    CurrentState::with_store(|store| {
662                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
663                    })
664                }
665
666                // Execute the batch.
667                let mut results = Vec::with_capacity(batch.len());
668                for (index, (tx_size, tx_hash, tx)) in txs.into_iter().enumerate() {
669                    results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, index)?);
670                }
671
672                Ok(results)
673            },
674        )
675    }
676
677    fn schedule_and_execute_batch(
678        &self,
679        rt_ctx: transaction::Context<'_>,
680        batch: &mut TxnBatch,
681        _in_msgs: &[roothash::IncomingMessage],
682    ) -> Result<ExecuteBatchResult, RuntimeError> {
683        let cfg = R::SCHEDULE_CONTROL;
684        let mut tx_reject_hashes = Vec::new();
685
686        let mut result = self.execute_batch_common(
687            rt_ctx,
688            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
689                // Schedule and execute the batch.
690                //
691                // The idea is to keep scheduling transactions as long as we have some space
692                // available in the block as determined by gas use.
693                let mut new_batch = Vec::new();
694                let mut results = Vec::with_capacity(batch.len());
695                let mut requested_batch_len = cfg.initial_batch_size;
696                'batch: loop {
697                    // Remember length of last batch.
698                    let last_batch_len = batch.len();
699                    let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));
700
701                    for raw_tx in batch.drain(..) {
702                        // If we don't have enough gas for processing even the cheapest transaction
703                        // we are done. Same if we reached the runtime-imposed maximum tx count.
704                        let remaining_gas = R::Core::remaining_batch_gas();
705                        if remaining_gas < cfg.min_remaining_gas
706                            || new_batch.len() >= cfg.max_tx_count
707                        {
708                            break 'batch;
709                        }
710
711                        // Decode transaction.
712                        let tx_hash = Hash::digest_bytes(&raw_tx);
713                        let tx = match Self::decode_tx(ctx, &raw_tx) {
714                            Ok(tx) => tx,
715                            Err(_) => {
716                                // Transaction is malformed, make sure it gets removed from the
717                                // queue and don't include it in a block.
718                                tx_reject_hashes.push(tx_hash);
719                                continue;
720                            }
721                        };
722                        let tx_size = raw_tx.len().try_into().unwrap();
723
724                        // If we don't have enough gas remaining to process this transaction, just
725                        // skip it.
726                        if tx.auth_info.fee.gas > remaining_gas {
727                            continue;
728                        }
729                        // Same if we don't have enough consensus message slots.
730                        let remaining_messages = CurrentState::with(|state| {
731                            ctx.max_messages()
732                                .saturating_sub(state.emitted_messages_count() as u32)
733                        });
734                        if tx.auth_info.fee.consensus_messages > remaining_messages {
735                            continue;
736                        }
737
738                        // Determine the current transaction index.
739                        let tx_index = new_batch.len();
740
741                        // First run the transaction in check tx mode in a separate subcontext. If
742                        // that fails, skip and (sometimes) reject transaction.
743                        let skip = CurrentState::with_transaction_opts(
744                            state::Options::new().with_mode(Mode::PreSchedule),
745                            || -> Result<_, Error> {
746                                // First authenticate the transaction to get any nonce related errors.
747                                match R::Modules::authenticate_tx(ctx, &tx) {
748                                    Err(modules::core::Error::FutureNonce) => {
749                                        // Only skip transaction as it may become valid in the future.
750                                        return Ok(true);
751                                    }
752                                    Err(_) => {
753                                        // Skip and reject the transaction.
754                                        tx_reject_hashes.push(tx_hash);
755                                        return Ok(true);
756                                    }
757                                    Ok(_) => {}
758                                }
759
760                                // Run additional checks on the transaction.
761                                let check_result = Self::dispatch_tx_opts(
762                                    ctx,
763                                    tx.clone(),
764                                    &DispatchOptions {
765                                        tx_size,
766                                        tx_index,
767                                        tx_hash,
768                                        skip_authentication: true, // Already done.
769                                        ..Default::default()
770                                    },
771                                )?;
772
773                                if !check_result.result.is_success() {
774                                    // Skip and reject the transaction.
775                                    tx_reject_hashes.push(tx_hash);
776                                    return Ok(true);
777                                }
778
779                                // Checks successful, execute transaction as usual.
780                                Ok(false)
781                            },
782                        )?;
783                        if skip {
784                            continue;
785                        }
786
787                        new_batch.push(raw_tx);
788                        results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, tx_index)?);
789                    }
790
791                    // If there's more room in the block and we got the maximum number of
792                    // transactions, request more transactions.
793                    if last_batch_tx_hash.is_some()
794                        && last_batch_len >= requested_batch_len as usize
795                    {
796                        if let Some(fetched_batch) = self
797                            .schedule_control_host
798                            .fetch_tx_batch(last_batch_tx_hash, cfg.batch_size)?
799                        {
800                            *batch = fetched_batch;
801                            requested_batch_len = cfg.batch_size;
802                            continue;
803                        }
804                        // No more transactions, let's just finish.
805                    }
806                    break;
807                }
808
809                // Replace input batch with newly generated batch.
810                *batch = new_batch.into();
811
812                Ok(results)
813            },
814        )?;
815
816        // Include rejected transaction hashes in the final result.
817        result.tx_reject_hashes = tx_reject_hashes;
818
819        Ok(result)
820    }
821
822    fn check_batch(
823        &self,
824        mut rt_ctx: transaction::Context<'_>,
825        batch: &TxnBatch,
826    ) -> Result<Vec<CheckTxResult>, RuntimeError> {
827        // If prefetch limit is set enable prefetch.
828        let prefetch_enabled = R::PREFETCH_LIMIT > 0;
829
830        // Prepare dispatch context.
831        let key_manager = self.key_manager.as_ref().map(|mgr| mgr.with_context());
832        let history = self.consensus_verifier.clone();
833
834        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
835        let ctx = RuntimeBatchContext::<'_, R>::new(
836            &self.host_info,
837            key_manager,
838            rt_ctx.header,
839            rt_ctx.round_results,
840            &rt_ctx.consensus_state,
841            &history,
842            rt_ctx.epoch,
843            rt_ctx.max_messages,
844        );
845
846        CurrentState::enter_opts(
847            state::Options::new().with_mode(state::Mode::Check),
848            root,
849            || {
850                // Perform state migrations if required.
851                R::migrate(&ctx);
852
853                // Prefetch.
854                let mut txs: Vec<Result<_, RuntimeError>> = Vec::with_capacity(batch.len());
855                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
856                for tx in batch.iter() {
857                    let tx_size = tx.len().try_into().map_err(|_| {
858                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
859                    })?;
860                    let res = match Self::decode_tx(&ctx, tx) {
861                        Ok(tx) => {
862                            if prefetch_enabled {
863                                Self::prefetch_tx(&mut prefixes, tx.clone()).map(|_| (tx_size, tx))
864                            } else {
865                                Ok((tx_size, tx))
866                            }
867                        }
868                        Err(err) => Err(err.into()),
869                    };
870                    txs.push(res);
871                }
872                if prefetch_enabled {
873                    CurrentState::with_store(|store| {
874                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
875                    });
876                }
877
878                // Check the batch.
879                let mut results = Vec::with_capacity(batch.len());
880                for tx in txs.into_iter() {
881                    match tx {
882                        Ok((tx_size, tx)) => results.push(Self::check_tx(&ctx, tx_size, tx)?),
883                        Err(err) => results.push(CheckTxResult {
884                            error: err,
885                            meta: None,
886                        }),
887                    }
888                }
889
890                Ok(results)
891            },
892        )
893    }
894
895    fn set_abort_batch_flag(&mut self, _abort_batch: Arc<AtomicBool>) {
896        // TODO: Implement support for graceful batch aborts (oasis-sdk#129).
897    }
898
899    fn query(
900        &self,
901        mut rt_ctx: transaction::Context<'_>,
902        method: &str,
903        args: Vec<u8>,
904    ) -> Result<Vec<u8>, RuntimeError> {
905        // Determine whether the method is allowed to access confidential state and provide an
906        // appropriately scoped instance of the key manager client.
907        let is_confidential_allowed = R::Modules::is_allowed_private_km_query(method)
908            && R::is_allowed_private_km_query(method);
909        if is_confidential_allowed {
910            // Perform consensus layer state integrity verification for any queries that allow
911            // access to confidential state.
912            block_on(self.consensus_verifier.verify_for_query(
913                rt_ctx.consensus_block.clone(),
914                rt_ctx.header.clone(),
915                rt_ctx.epoch,
916            ))?;
917            // Ensure the runtime is still ready to process requests.
918            rt_ctx.protocol.ensure_initialized()?;
919        }
920        let key_manager = self.key_manager.as_ref().map(|mgr| {
921            if is_confidential_allowed {
922                mgr.with_private_context()
923            } else {
924                mgr.with_context()
925            }
926        });
927
928        // Prepare dispatch context.
929        let history = self.consensus_verifier.clone();
930
931        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
932        let ctx = RuntimeBatchContext::<'_, R>::new(
933            &self.host_info,
934            key_manager,
935            rt_ctx.header,
936            rt_ctx.round_results,
937            &rt_ctx.consensus_state,
938            &history,
939            rt_ctx.epoch,
940            rt_ctx.max_messages,
941        );
942
943        CurrentState::enter_opts(
944            state::Options::new()
945                .with_mode(state::Mode::Check)
946                .with_rng_local_entropy(), // Mix in local (private) entropy for queries.
947            root,
948            || Self::dispatch_query(&ctx, method, args),
949        )
950    }
951}
952
953#[cfg(test)]
954mod test {
955    use super::*;
956    use crate::{
957        handler,
958        module::Module,
959        modules::{accounts, core},
960        sdk_derive,
961        state::{CurrentState, Options},
962        storage::Store,
963        testing::{configmap, keys, mock::Mock},
964        types::{token, transaction},
965        Version,
966    };
967    use cbor::Encode as _;
968
969    struct CoreConfig;
970    impl core::Config for CoreConfig {}
971    type Core = core::Module<CoreConfig>;
972    type Accounts = accounts::Module;
973
974    #[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
975    enum AlphabetError {
976        #[error("{0}")]
977        #[sdk_error(transparent, abort)]
978        Core(#[source] core::Error),
979    }
980
981    /// A module with multiple no-op methods; intended for testing routing.
982    struct AlphabetModule;
983
984    #[sdk_derive(Module)]
985    impl AlphabetModule {
986        const NAME: &'static str = "alphabet";
987        const VERSION: u32 = 42;
988        type Error = AlphabetError;
989        type Event = ();
990        type Parameters = ();
991        type Genesis = ();
992
993        #[handler(call = "alphabet.ReadOnly")]
994        fn read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
995            CurrentState::with_store(|store| {
996                let _ = store.get(b"key"); // Read something and ignore result.
997            });
998            Ok(42)
999        }
1000
1001        #[handler(call = "alphabet.NotReadOnly")]
1002        fn not_read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
1003            CurrentState::with_store(|store| {
1004                store.insert(b"key", b"value");
1005            });
1006            Ok(10)
1007        }
1008
1009        #[handler(call = "alphabet.Aborting")]
1010        fn aborting<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1011            // Use a deeply nested abort to make sure this is handled correctly.
1012            Err(AlphabetError::Core(core::Error::Abort(Error::Aborted)))
1013        }
1014
1015        #[handler(query = "alphabet.Alpha")]
1016        fn alpha<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1017            Ok(())
1018        }
1019
1020        #[handler(query = "alphabet.Omega", expensive)]
1021        fn expensive<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1022            // Nothing actually expensive here. We're just pretending for testing purposes.
1023            Ok(())
1024        }
1025    }
1026
1027    impl module::BlockHandler for AlphabetModule {}
1028    impl module::TransactionHandler for AlphabetModule {}
1029    impl module::InvariantHandler for AlphabetModule {}
1030
1031    struct AlphabetRuntime;
1032
1033    impl Runtime for AlphabetRuntime {
1034        const VERSION: Version = Version::new(0, 0, 0);
1035        type Core = Core;
1036        type Accounts = Accounts;
1037        type Modules = (Core, AlphabetModule);
1038
1039        fn genesis_state() -> <Self::Modules as module::MigrationHandler>::Genesis {
1040            (
1041                core::Genesis {
1042                    parameters: core::Parameters {
1043                        max_batch_gas: u64::MAX,
1044                        max_tx_size: 32 * 1024,
1045                        max_tx_signers: 1,
1046                        max_multisig_signers: 8,
1047                        gas_costs: Default::default(),
1048                        min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
1049                        dynamic_min_gas_price: Default::default(),
1050                    },
1051                },
1052                (),
1053            )
1054        }
1055    }
1056
1057    #[test]
1058    fn test_allowed_queries_defaults() {
1059        let mut mock = Mock::with_local_config(BTreeMap::new());
1060        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1061
1062        Dispatcher::<AlphabetRuntime>::dispatch_query(
1063            &mut ctx,
1064            "alphabet.Alpha",
1065            cbor::to_vec(().into_cbor_value()),
1066        )
1067        .expect("alphabet.Alpha is an inexpensive query, allowed by default");
1068
1069        Dispatcher::<AlphabetRuntime>::dispatch_query(
1070            &mut ctx,
1071            "alphabet.Omega",
1072            cbor::to_vec(().into_cbor_value()),
1073        )
1074        .expect_err("alphabet.Omega is an expensive query, disallowed by default");
1075    }
1076
1077    #[test]
1078    fn test_allowed_queries_custom() {
1079        let local_config = configmap! {
1080            // Allow expensive gas estimation and expensive queries so they can be tested.
1081            "estimate_gas_by_simulating_contracts" => true,
1082            "allowed_queries" => vec![
1083                configmap! {"alphabet.Alpha" => false},
1084                configmap! {"all_expensive" => true},
1085                configmap! {"all" => true}  // should have no effect on Alpha
1086            ],
1087        };
1088        let mut mock = Mock::with_local_config(local_config);
1089        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1090
1091        CurrentState::with_transaction_opts(Options::new().with_mode(state::Mode::Check), || {
1092            Dispatcher::<AlphabetRuntime>::dispatch_query(
1093                &mut ctx,
1094                "alphabet.Alpha",
1095                cbor::to_vec(().into_cbor_value()),
1096            )
1097            .expect_err("alphabet.Alpha is a disallowed query");
1098
1099            Dispatcher::<AlphabetRuntime>::dispatch_query(
1100                &mut ctx,
1101                "alphabet.Omega",
1102                cbor::to_vec(().into_cbor_value()),
1103            )
1104            .expect("alphabet.Omega is an expensive query and expensive queries are allowed");
1105
1106            TransactionResult::Rollback(())
1107        });
1108    }
1109
1110    #[test]
1111    fn test_dispatch_read_only_call() {
1112        let mut mock = Mock::default();
1113        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1114
1115        AlphabetRuntime::migrate(&mut ctx);
1116
1117        let mut tx = transaction::Transaction {
1118            version: 1,
1119            call: transaction::Call {
1120                format: transaction::CallFormat::Plain,
1121                method: "alphabet.ReadOnly".to_owned(),
1122                read_only: true,
1123                ..Default::default()
1124            },
1125            auth_info: transaction::AuthInfo {
1126                signer_info: vec![transaction::SignerInfo::new_sigspec(
1127                    keys::alice::sigspec(),
1128                    0,
1129                )],
1130                fee: transaction::Fee {
1131                    amount: token::BaseUnits::default(),
1132                    gas: 1000,
1133                    ..Default::default()
1134                },
1135                ..Default::default()
1136            },
1137        };
1138
1139        // Dispatch read-only transaction.
1140        let dispatch_result =
1141            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0)
1142                .expect("read only method dispatch should work");
1143        let result = dispatch_result.result.unwrap();
1144        let result: u64 = cbor::from_value(result).unwrap();
1145        assert_eq!(result, 42);
1146
1147        // Dispatch read-only transaction of a method that writes.
1148        tx.call.method = "alphabet.NotReadOnly".to_owned();
1149
1150        let dispatch_result = Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx, 0)
1151            .expect("read only method dispatch should work");
1152        match dispatch_result.result {
1153            module::CallResult::Failed {
1154                module,
1155                code,
1156                message,
1157            } => {
1158                assert_eq!(&module, "core");
1159                assert_eq!(code, 25);
1160                assert_eq!(&message, "read-only transaction attempted modifications")
1161            }
1162            _ => panic!("not read only method execution did not fail"),
1163        }
1164    }
1165
1166    #[test]
1167    fn test_dispatch_abort_forwarding() {
1168        let mut mock = Mock::default();
1169        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1170
1171        AlphabetRuntime::migrate(&mut ctx);
1172
1173        let tx = transaction::Transaction {
1174            version: 1,
1175            call: transaction::Call {
1176                format: transaction::CallFormat::Plain,
1177                method: "alphabet.Aborting".to_owned(),
1178                ..Default::default()
1179            },
1180            auth_info: transaction::AuthInfo {
1181                signer_info: vec![transaction::SignerInfo::new_sigspec(
1182                    keys::alice::sigspec(),
1183                    0,
1184                )],
1185                fee: transaction::Fee {
1186                    amount: token::BaseUnits::default(),
1187                    gas: 1000,
1188                    ..Default::default()
1189                },
1190                ..Default::default()
1191            },
1192        };
1193
1194        // Dispatch transaction and make sure the abort gets propagated.
1195        let dispatch_result =
1196            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0);
1197        assert!(matches!(dispatch_result, Err(Error::Aborted)));
1198    }
1199}