oasis_runtime_sdk/
dispatcher.rs

1//! Transaction dispatcher.
2use std::{
3    collections::{BTreeMap, BTreeSet},
4    convert::TryInto,
5    marker::PhantomData,
6    sync::{atomic::AtomicBool, Arc},
7};
8
9use anyhow::anyhow;
10use slog::error;
11use thiserror::Error;
12
13use oasis_core_runtime::{
14    self,
15    common::crypto::hash::Hash,
16    consensus::{roothash, verifier::Verifier},
17    enclave_rpc::dispatcher::Dispatcher as RpcDispatcher,
18    future::block_on,
19    protocol::{HostInfo, Protocol},
20    transaction::{
21        self,
22        dispatcher::{ExecuteBatchResult, ExecuteTxResult},
23        tags::Tags,
24        types::TxnBatch,
25    },
26    types::{CheckTxMetadata, CheckTxResult},
27};
28
29use crate::{
30    callformat,
31    context::{Context, RuntimeBatchContext},
32    enclave_rpc,
33    error::{Error as _, RuntimeError},
34    event::IntoTags,
35    keymanager::{KeyManagerClient, KeyManagerError},
36    module::{self, BlockHandler, MethodHandler, TransactionHandler},
37    modules,
38    modules::core::API as _,
39    runtime::Runtime,
40    schedule_control::ScheduleControlHost,
41    sender::SenderMeta,
42    state::{self, CurrentState, Mode, TransactionResult, TransactionWithMeta},
43    storage::{self, Prefix},
44    types,
45    types::transaction::{AuthProof, Transaction},
46};
47
48/// Unique module name.
49const MODULE_NAME: &str = "dispatcher";
50
51/// Error emitted by the dispatch process. Note that this indicates an error in the dispatch
52/// process itself and should not be used for any transaction-related errors.
53#[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
54#[sdk_error(abort_self)]
55pub enum Error {
56    #[error("dispatch aborted")]
57    #[sdk_error(code = 1)]
58    Aborted,
59
60    #[error("malformed transaction in batch: {0}")]
61    #[sdk_error(code = 2)]
62    MalformedTransactionInBatch(#[source] anyhow::Error),
63
64    #[error("query aborted: {0}")]
65    #[sdk_error(code = 3)]
66    QueryAborted(String),
67
68    #[error("key manager failure: {0}")]
69    #[sdk_error(code = 4)]
70    KeyManagerFailure(#[from] KeyManagerError),
71
72    #[error("batch out of gas")]
73    #[sdk_error(code = 5)]
74    BatchOutOfGas,
75}
76
77/// Result of dispatching a transaction.
78#[derive(Debug)]
79pub struct DispatchResult {
80    /// Transaction call result.
81    pub result: module::CallResult,
82    /// Transaction tags.
83    pub tags: Tags,
84    /// Transaction priority.
85    pub priority: u64,
86    /// Transaction sender metadata.
87    pub sender_metadata: SenderMeta,
88    /// Call format metadata.
89    pub call_format_metadata: callformat::Metadata,
90}
91
92impl DispatchResult {
93    fn new(
94        result: module::CallResult,
95        tags: Tags,
96        call_format_metadata: callformat::Metadata,
97    ) -> Self {
98        Self {
99            result,
100            tags,
101            priority: 0,
102            sender_metadata: Default::default(),
103            call_format_metadata,
104        }
105    }
106}
107
108impl From<module::CallResult> for DispatchResult {
109    fn from(result: module::CallResult) -> Self {
110        Self::new(result, vec![], callformat::Metadata::Empty)
111    }
112}
113
114/// Additional options for dispatch operations.
115#[derive(Default)]
116pub struct DispatchOptions<'a> {
117    /// Transaction size.
118    pub tx_size: u32,
119    /// Transaction index within the batch.
120    pub tx_index: usize,
121    /// Transaction hash.
122    pub tx_hash: Hash,
123    /// Optionally only allow methods for which the provided authorizer closure returns true.
124    pub method_authorizer: Option<&'a dyn Fn(&str) -> bool>,
125    /// Optionally skip authentication.
126    pub skip_authentication: bool,
127}
128
129/// The runtime dispatcher.
130pub struct Dispatcher<R: Runtime> {
131    host_info: HostInfo,
132    host: Arc<Protocol>,
133    key_manager: Option<Arc<KeyManagerClient>>,
134    consensus_verifier: Arc<dyn Verifier>,
135    schedule_control_host: Arc<dyn ScheduleControlHost>,
136    _runtime: PhantomData<R>,
137}
138
139impl<R: Runtime> Dispatcher<R> {
140    /// Create a new instance of the dispatcher for the given runtime.
141    ///
142    /// Note that the dispatcher is fully static and the constructor is only needed so that the
143    /// instance can be used directly with the dispatcher system provided by Oasis Core.
144    pub(super) fn new(
145        host: Arc<Protocol>,
146        key_manager: Option<Arc<KeyManagerClient>>,
147        consensus_verifier: Arc<dyn Verifier>,
148    ) -> Self {
149        Self {
150            host_info: host.get_host_info(),
151            key_manager,
152            consensus_verifier,
153            schedule_control_host: host.clone(),
154            host,
155            _runtime: PhantomData,
156        }
157    }
158
159    /// Decode a runtime transaction.
160    pub fn decode_tx<C: Context>(
161        ctx: &C,
162        tx: &[u8],
163    ) -> Result<types::transaction::Transaction, modules::core::Error> {
164        // Perform any checks before decoding.
165        R::Modules::approve_raw_tx(ctx, tx)?;
166
167        // Deserialize transaction.
168        let utx: types::transaction::UnverifiedTransaction = cbor::from_slice(tx)
169            .map_err(|e| modules::core::Error::MalformedTransaction(e.into()))?;
170
171        // Perform any checks before signature verification.
172        R::Modules::approve_unverified_tx(ctx, &utx)?;
173
174        match utx.1.as_slice() {
175            [AuthProof::Module(scheme)] => {
176                R::Modules::decode_tx(ctx, scheme, &utx.0)?.ok_or_else(|| {
177                    modules::core::Error::MalformedTransaction(anyhow!(
178                        "module-controlled transaction decoding scheme {} not supported",
179                        scheme
180                    ))
181                })
182            }
183            _ => utx
184                .verify()
185                .map_err(|e| modules::core::Error::MalformedTransaction(e.into())),
186        }
187    }
188
189    /// Run the dispatch steps inside a transaction context. This includes the before call hooks,
190    /// the call itself and after call hooks. The after call hooks are called regardless if the call
191    /// succeeds or not.
192    pub fn dispatch_tx_call<C: Context>(
193        ctx: &C,
194        call: types::transaction::Call,
195        opts: &DispatchOptions<'_>,
196    ) -> (module::CallResult, callformat::Metadata) {
197        let read_only = call.read_only;
198
199        // Dispatch the call.
200        let (result, metadata) = Self::_dispatch_tx_call(ctx, call, opts);
201
202        // Unconditionally call after handle call hook.
203        if let Err(e) = R::Modules::after_handle_call(ctx, &result) {
204            // If the call failed, return the error.
205            return (e.into_call_result(), metadata);
206        };
207
208        // Make sure that a read-only call did not result in any modifications.
209        if read_only && CurrentState::with(|state| state.has_pending_store_updates()) {
210            return (
211                modules::core::Error::ReadOnlyTransaction.into_call_result(),
212                metadata,
213            );
214        }
215
216        (result, metadata)
217    }
218
219    fn _dispatch_tx_call<C: Context>(
220        ctx: &C,
221        call: types::transaction::Call,
222        opts: &DispatchOptions<'_>,
223    ) -> (module::CallResult, callformat::Metadata) {
224        if let Err(e) = R::Modules::before_handle_call(ctx, &call) {
225            return (e.into_call_result(), callformat::Metadata::Empty);
226        }
227
228        // Decode call based on specified call format.
229        let (call, call_format_metadata) = match callformat::decode_call(ctx, call, opts.tx_index) {
230            Ok(Some(result)) => result,
231            Ok(None) => {
232                return (
233                    module::CallResult::Ok(cbor::Value::Simple(cbor::SimpleValue::NullValue)),
234                    callformat::Metadata::Empty,
235                )
236            }
237            Err(err) => return (err.into_call_result(), callformat::Metadata::Empty),
238        };
239
240        // Apply optional method authorization.
241        if let Some(method_authorizer) = opts.method_authorizer {
242            if !method_authorizer(&call.method) {
243                return (
244                    modules::core::Error::Forbidden.into_call_result(),
245                    call_format_metadata,
246                );
247            }
248        }
249
250        if let Err(e) = R::Modules::before_authorized_call_dispatch(ctx, &call) {
251            return (e.into_call_result(), call_format_metadata);
252        }
253
254        let result = match R::Modules::dispatch_call(ctx, &call.method, call.body) {
255            module::DispatchResult::Handled(result) => result,
256            module::DispatchResult::Unhandled(_) => {
257                modules::core::Error::InvalidMethod(call.method).into_call_result()
258            }
259        };
260
261        (result, call_format_metadata)
262    }
263
264    /// Dispatch a runtime transaction in the given context with the provided options.
265    pub fn dispatch_tx_opts<C: Context>(
266        ctx: &C,
267        tx: types::transaction::Transaction,
268        opts: &DispatchOptions<'_>,
269    ) -> Result<DispatchResult, Error> {
270        // Run pre-processing hooks.
271        if !opts.skip_authentication {
272            if let Err(err) = R::Modules::authenticate_tx(ctx, &tx) {
273                return Ok(err.into_call_result().into());
274            }
275        }
276        let tx_auth_info = tx.auth_info.clone();
277        let is_read_only = tx.call.read_only;
278        let call = tx.call.clone(); // TODO: Avoid clone.
279
280        let result = CurrentState::with_transaction_opts(
281            state::Options::new().with_tx(TransactionWithMeta {
282                data: tx,
283                size: opts.tx_size,
284                index: opts.tx_index,
285                hash: opts.tx_hash,
286            }),
287            || {
288                let (result, call_format_metadata) = Self::dispatch_tx_call(ctx, call, opts);
289                if !result.is_success() || is_read_only {
290                    // Retrieve unconditional events.
291                    let events = CurrentState::with(|state| state.take_unconditional_events());
292
293                    return TransactionResult::Rollback(DispatchResult::new(
294                        result,
295                        events.into_tags(),
296                        call_format_metadata,
297                    ));
298                }
299
300                // Load priority.
301                let priority = R::Core::take_priority();
302                // Load sender metadata.
303                let sender_metadata = R::Core::take_sender_meta();
304
305                if CurrentState::with_env(|env| env.is_check_only()) {
306                    TransactionResult::Rollback(DispatchResult {
307                        result,
308                        tags: Vec::new(),
309                        priority,
310                        sender_metadata,
311                        call_format_metadata,
312                    })
313                } else {
314                    // Merge normal and unconditional events.
315                    let tags = CurrentState::with(|state| state.take_all_events().into_tags());
316
317                    TransactionResult::Commit(DispatchResult {
318                        result,
319                        tags,
320                        priority,
321                        sender_metadata,
322                        call_format_metadata,
323                    })
324                }
325            },
326        );
327
328        // Run after dispatch hooks.
329        R::Modules::after_dispatch_tx(ctx, &tx_auth_info, &result.result);
330
331        // Propagate batch aborts.
332        if let module::CallResult::Aborted(err) = result.result {
333            return Err(err);
334        }
335
336        Ok(result)
337    }
338
339    /// Dispatch a runtime transaction in the given context.
340    pub fn dispatch_tx<C: Context>(
341        ctx: &C,
342        tx_size: u32,
343        tx: types::transaction::Transaction,
344        tx_index: usize,
345    ) -> Result<DispatchResult, Error> {
346        Self::dispatch_tx_opts(
347            ctx,
348            tx,
349            &DispatchOptions {
350                tx_size,
351                tx_index,
352                ..Default::default()
353            },
354        )
355    }
356
357    /// Check whether the given transaction is valid.
358    pub fn check_tx<C: Context>(
359        ctx: &C,
360        tx_size: u32,
361        tx: Transaction,
362    ) -> Result<CheckTxResult, Error> {
363        // In case of any panics, treat it as a failed check instead of crashing the runtime.
364        let catch_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
365            Self::dispatch_tx(ctx, tx_size, tx, usize::MAX)
366        }));
367        let dispatch = match catch_result {
368            Ok(dispatch) => dispatch?,
369            Err(panic_err) => {
370                // Convert panics into transaction check failures as it is clearly the fault of a
371                // specific transaction.
372                return Ok(CheckTxResult {
373                    error: RuntimeError {
374                        module: MODULE_NAME.to_string(),
375                        code: 1,
376                        message: format!("transaction check aborted: {panic_err:?}"),
377                    },
378                    meta: None,
379                });
380            }
381        };
382
383        match dispatch.result {
384            module::CallResult::Ok(_) => Ok(CheckTxResult {
385                error: Default::default(),
386                meta: Some(CheckTxMetadata {
387                    priority: dispatch.priority,
388                    sender: dispatch.sender_metadata.id(),
389                    sender_seq: dispatch.sender_metadata.tx_nonce,
390                    sender_state_seq: dispatch.sender_metadata.state_nonce,
391                }),
392            }),
393
394            module::CallResult::Failed {
395                module,
396                code,
397                message,
398            } => Ok(CheckTxResult {
399                error: RuntimeError {
400                    module,
401                    code,
402                    message,
403                },
404                meta: None,
405            }),
406
407            module::CallResult::Aborted(err) => Err(err),
408        }
409    }
410
411    /// Execute the given transaction, returning unserialized results.
412    pub fn execute_tx_opts<C: Context>(
413        ctx: &C,
414        tx: Transaction,
415        opts: &DispatchOptions<'_>,
416    ) -> Result<(types::transaction::CallResult, Tags), Error> {
417        let dispatch_result = Self::dispatch_tx_opts(ctx, tx, opts)?;
418        let output: types::transaction::CallResult = callformat::encode_result(
419            ctx,
420            dispatch_result.result,
421            dispatch_result.call_format_metadata,
422        );
423
424        Ok((output, dispatch_result.tags))
425    }
426
427    /// Execute the given transaction.
428    pub fn execute_tx<C: Context>(
429        ctx: &C,
430        tx_size: u32,
431        tx_hash: Hash,
432        tx: Transaction,
433        tx_index: usize,
434    ) -> Result<ExecuteTxResult, Error> {
435        let (output, tags) = Self::execute_tx_opts(
436            ctx,
437            tx,
438            &DispatchOptions {
439                tx_size,
440                tx_index,
441                tx_hash,
442                ..Default::default()
443            },
444        )?;
445
446        Ok(ExecuteTxResult {
447            output: cbor::to_vec(output),
448            tags,
449        })
450    }
451
452    /// Prefetch prefixes for the given transaction.
453    pub fn prefetch_tx(
454        prefixes: &mut BTreeSet<Prefix>,
455        tx: types::transaction::Transaction,
456    ) -> Result<(), RuntimeError> {
457        match R::Modules::prefetch(prefixes, &tx.call.method, tx.call.body, &tx.auth_info) {
458            module::DispatchResult::Handled(r) => r,
459            module::DispatchResult::Unhandled(_) => Ok(()), // Unimplemented prefetch is allowed.
460        }
461    }
462
463    fn handle_last_round_messages<C: Context>(ctx: &C) -> Result<(), modules::core::Error> {
464        let message_events = ctx.runtime_round_results().messages.clone();
465
466        let mut handlers = CurrentState::with_store(|store| {
467            let store = storage::TypedStore::new(storage::PrefixStore::new(
468                store,
469                &modules::core::MODULE_NAME,
470            ));
471            let handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = store
472                .get(modules::core::state::MESSAGE_HANDLERS)
473                .unwrap_or_default();
474
475            handlers
476        });
477
478        for event in message_events {
479            let handler = handlers
480                .remove(&event.index)
481                .ok_or(modules::core::Error::MessageHandlerMissing(event.index))?;
482            let hook_name = handler.hook_name.clone();
483
484            R::Modules::dispatch_message_result(
485                ctx,
486                &hook_name,
487                types::message::MessageResult {
488                    event,
489                    context: handler.payload,
490                },
491            )
492            .ok_or(modules::core::Error::InvalidMethod(hook_name))?;
493        }
494
495        if !handlers.is_empty() {
496            error!(ctx.get_logger("dispatcher"), "message handler not invoked"; "unhandled" => ?handlers);
497            return Err(modules::core::Error::MessageHandlerNotInvoked);
498        }
499
500        Ok(())
501    }
502
503    fn save_emitted_message_handlers(handlers: Vec<types::message::MessageEventHookInvocation>) {
504        let message_handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = handlers
505            .into_iter()
506            .enumerate()
507            .map(|(idx, h)| (idx as u32, h))
508            .collect();
509
510        CurrentState::with_store(|store| {
511            let mut store = storage::TypedStore::new(storage::PrefixStore::new(
512                store,
513                &modules::core::MODULE_NAME,
514            ));
515            store.insert(modules::core::state::MESSAGE_HANDLERS, message_handlers);
516        });
517    }
518
519    /// Process the given runtime query.
520    pub fn dispatch_query<C: Context>(
521        ctx: &C,
522        method: &str,
523        args: Vec<u8>,
524    ) -> Result<Vec<u8>, RuntimeError> {
525        let args = cbor::from_slice(&args)
526            .map_err(|err| modules::core::Error::InvalidArgument(err.into()))?;
527
528        CurrentState::with_transaction(|| {
529            // Catch any panics that occur during query dispatch.
530            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
531                // Perform state migrations if required.
532                R::migrate(ctx);
533
534                if !R::is_allowed_query(method) || !ctx.is_allowed_query::<R>(method) {
535                    return Err(modules::core::Error::Forbidden.into());
536                }
537
538                R::Modules::dispatch_query(ctx, method, args)
539                    .ok_or_else(|| modules::core::Error::InvalidMethod(method.into()))?
540            }));
541
542            // Always rollback any changes to storage. Note that this is usually a no-op because
543            // Oasis Core would rollback any storage changes related to queries, but this makes it
544            // explicit to ensure this remains the case regardless of upstream changes.
545            TransactionResult::Rollback(result)
546        })
547        .map_err(|err| -> RuntimeError { Error::QueryAborted(format!("{err:?}")).into() })?
548        .map(cbor::to_vec)
549    }
550
551    fn execute_batch_common<F>(
552        &self,
553        mut rt_ctx: transaction::Context<'_>,
554        f: F,
555    ) -> Result<ExecuteBatchResult, RuntimeError>
556    where
557        F: FnOnce(&RuntimeBatchContext<'_, R>) -> Result<Vec<ExecuteTxResult>, RuntimeError>,
558    {
559        // Prepare dispatch context.
560        let key_manager = self
561            .key_manager
562            .as_ref()
563            // NOTE: We are explicitly allowing private key operations during execution.
564            .map(|mgr| mgr.with_private_context());
565        let history = self.consensus_verifier.clone();
566
567        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
568        let ctx = RuntimeBatchContext::<'_, R>::new(
569            &self.host_info,
570            key_manager,
571            rt_ctx.header,
572            rt_ctx.round_results,
573            &rt_ctx.consensus_state,
574            &history,
575            rt_ctx.epoch,
576            rt_ctx.max_messages,
577        );
578
579        CurrentState::enter_opts(state::Options::new().with_mode(Mode::Execute), root, || {
580            // Perform state migrations if required.
581            R::migrate(&ctx);
582
583            // Handle last round message results.
584            Self::handle_last_round_messages(&ctx)?;
585
586            // Run begin block hooks.
587            R::Modules::begin_block(&ctx);
588
589            let results = f(&ctx)?;
590
591            // Run end block hooks.
592            R::Modules::end_block(&ctx);
593
594            // Process any emitted messages and block-level events.
595            let (messages, handlers, block_tags) = CurrentState::with(|state| {
596                let (messages, handlers) = state.take_messages().into_iter().unzip();
597                let block_tags = state.take_all_events().into_tags();
598
599                (messages, handlers, block_tags)
600            });
601            Self::save_emitted_message_handlers(handlers);
602
603            Ok(ExecuteBatchResult {
604                results,
605                messages,
606                block_tags,
607                tx_reject_hashes: vec![],
608                in_msgs_count: 0, // TODO: Support processing incoming messages.
609            })
610        })
611    }
612
613    /// Register EnclaveRPC methods.
614    pub fn register_enclaverpc(&self, rpc: &mut RpcDispatcher)
615    where
616        R: Runtime + Send + Sync + 'static,
617    {
618        enclave_rpc::Wrapper::<R>::wrap(
619            rpc,
620            self.host.clone(),
621            self.host_info.clone(),
622            self.key_manager.clone(),
623            self.consensus_verifier.clone(),
624        );
625    }
626}
627
628impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatcher<R> {
629    fn execute_batch(
630        &self,
631        rt_ctx: transaction::Context<'_>,
632        batch: &TxnBatch,
633        _in_msgs: &[roothash::IncomingMessage],
634    ) -> Result<ExecuteBatchResult, RuntimeError> {
635        self.execute_batch_common(
636            rt_ctx,
637            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
638                // If prefetch limit is set enable prefetch.
639                let prefetch_enabled = R::PREFETCH_LIMIT > 0;
640
641                let mut txs = Vec::with_capacity(batch.len());
642                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
643                for tx in batch.iter() {
644                    let tx_size = tx.len().try_into().map_err(|_| {
645                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
646                    })?;
647                    let tx_hash = Hash::digest_bytes(tx);
648                    // It is an error to include a malformed transaction in a batch. So instead of only
649                    // reporting a failed execution result, we fail the whole batch. This will make the compute
650                    // node vote for failure and the round will fail.
651                    //
652                    // Correct proposers should only include transactions which have passed check_tx.
653                    let tx = Self::decode_tx(ctx, tx)
654                        .map_err(|err| Error::MalformedTransactionInBatch(err.into()))?;
655                    txs.push((tx_size, tx_hash, tx.clone()));
656
657                    if prefetch_enabled {
658                        Self::prefetch_tx(&mut prefixes, tx)?;
659                    }
660                }
661                if prefetch_enabled {
662                    CurrentState::with_store(|store| {
663                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
664                    })
665                }
666
667                // Execute the batch.
668                let mut results = Vec::with_capacity(batch.len());
669                for (index, (tx_size, tx_hash, tx)) in txs.into_iter().enumerate() {
670                    results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, index)?);
671                }
672
673                Ok(results)
674            },
675        )
676    }
677
678    fn schedule_and_execute_batch(
679        &self,
680        rt_ctx: transaction::Context<'_>,
681        batch: &mut TxnBatch,
682        _in_msgs: &[roothash::IncomingMessage],
683    ) -> Result<ExecuteBatchResult, RuntimeError> {
684        let cfg = R::SCHEDULE_CONTROL;
685        let mut tx_reject_hashes = Vec::new();
686
687        let mut result = self.execute_batch_common(
688            rt_ctx,
689            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
690                // Schedule and execute the batch.
691                //
692                // The idea is to keep scheduling transactions as long as we have some space
693                // available in the block as determined by gas use.
694                let mut new_batch = Vec::new();
695                let mut results = Vec::with_capacity(batch.len());
696                let mut requested_batch_len = cfg.initial_batch_size;
697                'batch: loop {
698                    // Remember length of last batch.
699                    let last_batch_len = batch.len();
700                    let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));
701
702                    for raw_tx in batch.drain(..) {
703                        // If we don't have enough gas for processing even the cheapest transaction
704                        // we are done. Same if we reached the runtime-imposed maximum tx count.
705                        let remaining_gas = R::Core::remaining_batch_gas();
706                        if remaining_gas < cfg.min_remaining_gas
707                            || new_batch.len() >= cfg.max_tx_count
708                        {
709                            break 'batch;
710                        }
711
712                        // Decode transaction.
713                        let tx_hash = Hash::digest_bytes(&raw_tx);
714                        let tx = match Self::decode_tx(ctx, &raw_tx) {
715                            Ok(tx) => tx,
716                            Err(_) => {
717                                // Transaction is malformed, make sure it gets removed from the
718                                // queue and don't include it in a block.
719                                tx_reject_hashes.push(tx_hash);
720                                continue;
721                            }
722                        };
723                        let tx_size = raw_tx.len().try_into().unwrap();
724
725                        // If we don't have enough gas remaining to process this transaction, just
726                        // skip it.
727                        if tx.auth_info.fee.gas > remaining_gas {
728                            continue;
729                        }
730                        // Same if we don't have enough consensus message slots.
731                        let remaining_messages = CurrentState::with(|state| {
732                            ctx.max_messages()
733                                .saturating_sub(state.emitted_messages_count() as u32)
734                        });
735                        if tx.auth_info.fee.consensus_messages > remaining_messages {
736                            continue;
737                        }
738
739                        // Determine the current transaction index.
740                        let tx_index = new_batch.len();
741
742                        // First run the transaction in check tx mode in a separate subcontext. If
743                        // that fails, skip and (sometimes) reject transaction.
744                        let skip = CurrentState::with_transaction_opts(
745                            state::Options::new().with_mode(Mode::PreSchedule),
746                            || -> Result<_, Error> {
747                                // First authenticate the transaction to get any nonce related errors.
748                                match R::Modules::authenticate_tx(ctx, &tx) {
749                                    Err(modules::core::Error::FutureNonce) => {
750                                        // Only skip transaction as it may become valid in the future.
751                                        return Ok(true);
752                                    }
753                                    Err(_) => {
754                                        // Skip and reject the transaction.
755                                    }
756                                    Ok(_) => {
757                                        // Run additional checks on the transaction.
758                                        let check_result = Self::dispatch_tx_opts(
759                                            ctx,
760                                            tx.clone(),
761                                            &DispatchOptions {
762                                                tx_size,
763                                                tx_index,
764                                                tx_hash,
765                                                skip_authentication: true, // Already done.
766                                                ..Default::default()
767                                            },
768                                        )?;
769                                        if check_result.result.is_success() {
770                                            // Checks successful, execute transaction as usual.
771                                            return Ok(false);
772                                        }
773                                    }
774                                }
775
776                                // Skip and reject the transaction.
777                                tx_reject_hashes.push(tx_hash);
778                                Ok(true)
779                            },
780                        )?;
781                        if skip {
782                            continue;
783                        }
784
785                        new_batch.push(raw_tx);
786                        results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, tx_index)?);
787                    }
788
789                    // If there's more room in the block and we got the maximum number of
790                    // transactions, request more transactions.
791                    if last_batch_tx_hash.is_some()
792                        && last_batch_len >= requested_batch_len as usize
793                    {
794                        if let Some(fetched_batch) = self
795                            .schedule_control_host
796                            .fetch_tx_batch(last_batch_tx_hash, cfg.batch_size)?
797                        {
798                            *batch = fetched_batch;
799                            requested_batch_len = cfg.batch_size;
800                            continue;
801                        }
802                        // No more transactions, let's just finish.
803                    }
804                    break;
805                }
806
807                // Replace input batch with newly generated batch.
808                *batch = new_batch.into();
809
810                Ok(results)
811            },
812        )?;
813
814        // Include rejected transaction hashes in the final result.
815        result.tx_reject_hashes = tx_reject_hashes;
816
817        Ok(result)
818    }
819
820    fn check_batch(
821        &self,
822        mut rt_ctx: transaction::Context<'_>,
823        batch: &TxnBatch,
824    ) -> Result<Vec<CheckTxResult>, RuntimeError> {
825        // If prefetch limit is set enable prefetch.
826        let prefetch_enabled = R::PREFETCH_LIMIT > 0;
827
828        // Prepare dispatch context.
829        let key_manager = self.key_manager.as_ref().map(|mgr| mgr.with_context());
830        let history = self.consensus_verifier.clone();
831
832        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
833        let ctx = RuntimeBatchContext::<'_, R>::new(
834            &self.host_info,
835            key_manager,
836            rt_ctx.header,
837            rt_ctx.round_results,
838            &rt_ctx.consensus_state,
839            &history,
840            rt_ctx.epoch,
841            rt_ctx.max_messages,
842        );
843
844        CurrentState::enter_opts(
845            state::Options::new().with_mode(state::Mode::Check),
846            root,
847            || {
848                // Perform state migrations if required.
849                R::migrate(&ctx);
850
851                // Prefetch.
852                let mut txs: Vec<Result<_, RuntimeError>> = Vec::with_capacity(batch.len());
853                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
854                for tx in batch.iter() {
855                    let tx_size = tx.len().try_into().map_err(|_| {
856                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
857                    })?;
858                    let res = match Self::decode_tx(&ctx, tx) {
859                        Ok(tx) => {
860                            if prefetch_enabled {
861                                Self::prefetch_tx(&mut prefixes, tx.clone()).map(|_| (tx_size, tx))
862                            } else {
863                                Ok((tx_size, tx))
864                            }
865                        }
866                        Err(err) => Err(err.into()),
867                    };
868                    txs.push(res);
869                }
870                if prefetch_enabled {
871                    CurrentState::with_store(|store| {
872                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
873                    });
874                }
875
876                // Check the batch.
877                let mut results = Vec::with_capacity(batch.len());
878                for tx in txs.into_iter() {
879                    match tx {
880                        Ok((tx_size, tx)) => results.push(Self::check_tx(&ctx, tx_size, tx)?),
881                        Err(err) => results.push(CheckTxResult {
882                            error: err,
883                            meta: None,
884                        }),
885                    }
886                }
887
888                Ok(results)
889            },
890        )
891    }
892
893    fn set_abort_batch_flag(&mut self, _abort_batch: Arc<AtomicBool>) {
894        // TODO: Implement support for graceful batch aborts (oasis-sdk#129).
895    }
896
897    fn query(
898        &self,
899        mut rt_ctx: transaction::Context<'_>,
900        method: &str,
901        args: Vec<u8>,
902    ) -> Result<Vec<u8>, RuntimeError> {
903        // Determine whether the method is allowed to access confidential state and provide an
904        // appropriately scoped instance of the key manager client.
905        let is_confidential_allowed = R::Modules::is_allowed_private_km_query(method)
906            && R::is_allowed_private_km_query(method);
907        if is_confidential_allowed {
908            // Perform consensus layer state integrity verification for any queries that allow
909            // access to confidential state.
910            block_on(self.consensus_verifier.verify_for_query(
911                rt_ctx.consensus_block.clone(),
912                rt_ctx.header.clone(),
913                rt_ctx.epoch,
914            ))?;
915            // Ensure the runtime is still ready to process requests.
916            rt_ctx.protocol.ensure_initialized()?;
917        }
918        let key_manager = self.key_manager.as_ref().map(|mgr| {
919            if is_confidential_allowed {
920                mgr.with_private_context()
921            } else {
922                mgr.with_context()
923            }
924        });
925
926        // Prepare dispatch context.
927        let history = self.consensus_verifier.clone();
928
929        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
930        let ctx = RuntimeBatchContext::<'_, R>::new(
931            &self.host_info,
932            key_manager,
933            rt_ctx.header,
934            rt_ctx.round_results,
935            &rt_ctx.consensus_state,
936            &history,
937            rt_ctx.epoch,
938            rt_ctx.max_messages,
939        );
940
941        CurrentState::enter_opts(
942            state::Options::new()
943                .with_mode(state::Mode::Check)
944                .with_rng_local_entropy(), // Mix in local (private) entropy for queries.
945            root,
946            || Self::dispatch_query(&ctx, method, args),
947        )
948    }
949}
950
951#[cfg(test)]
952mod test {
953    use super::*;
954    use crate::{
955        handler,
956        module::Module,
957        modules::{accounts, core},
958        sdk_derive,
959        state::{CurrentState, Options},
960        storage::Store,
961        testing::{configmap, keys, mock::Mock},
962        types::{token, transaction},
963        Version,
964    };
965    use cbor::Encode as _;
966
967    struct CoreConfig;
968    impl core::Config for CoreConfig {}
969    type Core = core::Module<CoreConfig>;
970    type Accounts = accounts::Module;
971
972    #[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
973    enum AlphabetError {
974        #[error("{0}")]
975        #[sdk_error(transparent, abort)]
976        Core(#[source] core::Error),
977    }
978
979    /// A module with multiple no-op methods; intended for testing routing.
980    struct AlphabetModule;
981
982    #[sdk_derive(Module)]
983    impl AlphabetModule {
984        const NAME: &'static str = "alphabet";
985        const VERSION: u32 = 42;
986        type Error = AlphabetError;
987        type Event = ();
988        type Parameters = ();
989        type Genesis = ();
990
991        #[handler(call = "alphabet.ReadOnly")]
992        fn read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
993            CurrentState::with_store(|store| {
994                let _ = store.get(b"key"); // Read something and ignore result.
995            });
996            Ok(42)
997        }
998
999        #[handler(call = "alphabet.NotReadOnly")]
1000        fn not_read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
1001            CurrentState::with_store(|store| {
1002                store.insert(b"key", b"value");
1003            });
1004            Ok(10)
1005        }
1006
1007        #[handler(call = "alphabet.Aborting")]
1008        fn aborting<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1009            // Use a deeply nested abort to make sure this is handled correctly.
1010            Err(AlphabetError::Core(core::Error::Abort(Error::Aborted)))
1011        }
1012
1013        #[handler(query = "alphabet.Alpha")]
1014        fn alpha<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1015            Ok(())
1016        }
1017
1018        #[handler(query = "alphabet.Omega", expensive)]
1019        fn expensive<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1020            // Nothing actually expensive here. We're just pretending for testing purposes.
1021            Ok(())
1022        }
1023    }
1024
1025    impl module::BlockHandler for AlphabetModule {}
1026    impl module::TransactionHandler for AlphabetModule {}
1027    impl module::InvariantHandler for AlphabetModule {}
1028
1029    struct AlphabetRuntime;
1030
1031    impl Runtime for AlphabetRuntime {
1032        const VERSION: Version = Version::new(0, 0, 0);
1033        type Core = Core;
1034        type Accounts = Accounts;
1035        type Modules = (Core, AlphabetModule);
1036
1037        fn genesis_state() -> <Self::Modules as module::MigrationHandler>::Genesis {
1038            (
1039                core::Genesis {
1040                    parameters: core::Parameters {
1041                        max_batch_gas: u64::MAX,
1042                        max_tx_size: 32 * 1024,
1043                        max_tx_signers: 1,
1044                        max_multisig_signers: 8,
1045                        gas_costs: Default::default(),
1046                        min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
1047                        dynamic_min_gas_price: Default::default(),
1048                    },
1049                },
1050                (),
1051            )
1052        }
1053    }
1054
1055    #[test]
1056    fn test_allowed_queries_defaults() {
1057        let mut mock = Mock::with_local_config(BTreeMap::new());
1058        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1059
1060        Dispatcher::<AlphabetRuntime>::dispatch_query(
1061            &mut ctx,
1062            "alphabet.Alpha",
1063            cbor::to_vec(().into_cbor_value()),
1064        )
1065        .expect("alphabet.Alpha is an inexpensive query, allowed by default");
1066
1067        Dispatcher::<AlphabetRuntime>::dispatch_query(
1068            &mut ctx,
1069            "alphabet.Omega",
1070            cbor::to_vec(().into_cbor_value()),
1071        )
1072        .expect_err("alphabet.Omega is an expensive query, disallowed by default");
1073    }
1074
1075    #[test]
1076    fn test_allowed_queries_custom() {
1077        let local_config = configmap! {
1078            // Allow expensive gas estimation and expensive queries so they can be tested.
1079            "estimate_gas_by_simulating_contracts" => true,
1080            "allowed_queries" => vec![
1081                configmap! {"alphabet.Alpha" => false},
1082                configmap! {"all_expensive" => true},
1083                configmap! {"all" => true}  // should have no effect on Alpha
1084            ],
1085        };
1086        let mut mock = Mock::with_local_config(local_config);
1087        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1088
1089        CurrentState::with_transaction_opts(Options::new().with_mode(state::Mode::Check), || {
1090            Dispatcher::<AlphabetRuntime>::dispatch_query(
1091                &mut ctx,
1092                "alphabet.Alpha",
1093                cbor::to_vec(().into_cbor_value()),
1094            )
1095            .expect_err("alphabet.Alpha is a disallowed query");
1096
1097            Dispatcher::<AlphabetRuntime>::dispatch_query(
1098                &mut ctx,
1099                "alphabet.Omega",
1100                cbor::to_vec(().into_cbor_value()),
1101            )
1102            .expect("alphabet.Omega is an expensive query and expensive queries are allowed");
1103
1104            TransactionResult::Rollback(())
1105        });
1106    }
1107
1108    #[test]
1109    fn test_dispatch_read_only_call() {
1110        let mut mock = Mock::default();
1111        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1112
1113        AlphabetRuntime::migrate(&mut ctx);
1114
1115        let mut tx = transaction::Transaction {
1116            version: 1,
1117            call: transaction::Call {
1118                format: transaction::CallFormat::Plain,
1119                method: "alphabet.ReadOnly".to_owned(),
1120                read_only: true,
1121                ..Default::default()
1122            },
1123            auth_info: transaction::AuthInfo {
1124                signer_info: vec![transaction::SignerInfo::new_sigspec(
1125                    keys::alice::sigspec(),
1126                    0,
1127                )],
1128                fee: transaction::Fee {
1129                    amount: token::BaseUnits::new(0, token::Denomination::NATIVE),
1130                    gas: 1000,
1131                    ..Default::default()
1132                },
1133                ..Default::default()
1134            },
1135        };
1136
1137        // Dispatch read-only transaction.
1138        let dispatch_result =
1139            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0)
1140                .expect("read only method dispatch should work");
1141        let result = dispatch_result.result.unwrap();
1142        let result: u64 = cbor::from_value(result).unwrap();
1143        assert_eq!(result, 42);
1144
1145        // Dispatch read-only transaction of a method that writes.
1146        tx.call.method = "alphabet.NotReadOnly".to_owned();
1147
1148        let dispatch_result = Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx, 0)
1149            .expect("read only method dispatch should work");
1150        match dispatch_result.result {
1151            module::CallResult::Failed {
1152                module,
1153                code,
1154                message,
1155            } => {
1156                assert_eq!(&module, "core");
1157                assert_eq!(code, 25);
1158                assert_eq!(&message, "read-only transaction attempted modifications")
1159            }
1160            _ => panic!("not read only method execution did not fail"),
1161        }
1162    }
1163
1164    #[test]
1165    fn test_dispatch_abort_forwarding() {
1166        let mut mock = Mock::default();
1167        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1168
1169        AlphabetRuntime::migrate(&mut ctx);
1170
1171        let tx = transaction::Transaction {
1172            version: 1,
1173            call: transaction::Call {
1174                format: transaction::CallFormat::Plain,
1175                method: "alphabet.Aborting".to_owned(),
1176                ..Default::default()
1177            },
1178            auth_info: transaction::AuthInfo {
1179                signer_info: vec![transaction::SignerInfo::new_sigspec(
1180                    keys::alice::sigspec(),
1181                    0,
1182                )],
1183                fee: transaction::Fee {
1184                    amount: token::BaseUnits::new(0, token::Denomination::NATIVE),
1185                    gas: 1000,
1186                    ..Default::default()
1187                },
1188                ..Default::default()
1189            },
1190        };
1191
1192        // Dispatch transaction and make sure the abort gets propagated.
1193        let dispatch_result =
1194            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0);
1195        assert!(matches!(dispatch_result, Err(Error::Aborted)));
1196    }
1197}