oasis_runtime_sdk/
dispatcher.rs

1//! Transaction dispatcher.
2use std::{
3    collections::{BTreeMap, BTreeSet},
4    convert::TryInto,
5    marker::PhantomData,
6    sync::{atomic::AtomicBool, Arc},
7};
8
9use anyhow::anyhow;
10use slog::error;
11use thiserror::Error;
12
13use oasis_core_runtime::{
14    self,
15    common::crypto::hash::Hash,
16    consensus::{roothash, verifier::Verifier},
17    enclave_rpc::dispatcher::Dispatcher as RpcDispatcher,
18    future::block_on,
19    protocol::{HostInfo, Protocol},
20    transaction::{
21        self,
22        dispatcher::{ExecuteBatchResult, ExecuteTxResult},
23        tags::Tags,
24        types::TxnBatch,
25    },
26    types::{CheckTxMetadata, CheckTxResult},
27};
28
29use crate::{
30    callformat,
31    context::{Context, RuntimeBatchContext},
32    enclave_rpc,
33    error::{Error as _, RuntimeError},
34    event::IntoTags,
35    keymanager::{KeyManagerClient, KeyManagerError},
36    module::{self, BlockHandler, MethodHandler, TransactionHandler},
37    modules,
38    modules::core::API as _,
39    runtime::Runtime,
40    schedule_control::ScheduleControlHost,
41    sender::SenderMeta,
42    state::{self, CurrentState, Mode, TransactionResult, TransactionWithMeta},
43    storage::{self, Prefix},
44    types,
45    types::transaction::{AuthProof, Transaction},
46};
47
48/// Unique module name.
49const MODULE_NAME: &str = "dispatcher";
50
51/// Error emitted by the dispatch process. Note that this indicates an error in the dispatch
52/// process itself and should not be used for any transaction-related errors.
53#[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
54#[sdk_error(abort_self)]
55pub enum Error {
56    #[error("dispatch aborted")]
57    #[sdk_error(code = 1)]
58    Aborted,
59
60    #[error("malformed transaction in batch: {0}")]
61    #[sdk_error(code = 2)]
62    MalformedTransactionInBatch(#[source] anyhow::Error),
63
64    #[error("query aborted: {0}")]
65    #[sdk_error(code = 3)]
66    QueryAborted(String),
67
68    #[error("key manager failure: {0}")]
69    #[sdk_error(code = 4)]
70    KeyManagerFailure(#[from] KeyManagerError),
71
72    #[error("batch out of gas")]
73    #[sdk_error(code = 5)]
74    BatchOutOfGas,
75}
76
77/// Result of dispatching a transaction.
78#[derive(Debug)]
79pub struct DispatchResult {
80    /// Transaction call result.
81    pub result: module::CallResult,
82    /// Transaction tags.
83    pub tags: Tags,
84    /// Transaction priority.
85    pub priority: u64,
86    /// Transaction sender metadata.
87    pub sender_metadata: SenderMeta,
88    /// Call format metadata.
89    pub call_format_metadata: callformat::Metadata,
90}
91
92impl DispatchResult {
93    fn new(
94        result: module::CallResult,
95        tags: Tags,
96        call_format_metadata: callformat::Metadata,
97    ) -> Self {
98        Self {
99            result,
100            tags,
101            priority: 0,
102            sender_metadata: Default::default(),
103            call_format_metadata,
104        }
105    }
106}
107
108impl From<module::CallResult> for DispatchResult {
109    fn from(result: module::CallResult) -> Self {
110        Self::new(result, vec![], callformat::Metadata::Empty)
111    }
112}
113
114/// Additional options for dispatch operations.
115#[derive(Default)]
116pub struct DispatchOptions<'a> {
117    /// Transaction size.
118    pub tx_size: u32,
119    /// Transaction index within the batch.
120    pub tx_index: usize,
121    /// Transaction hash.
122    pub tx_hash: Hash,
123    /// Optionally only allow methods for which the provided authorizer closure returns true.
124    pub method_authorizer: Option<&'a dyn Fn(&str) -> bool>,
125    /// Optionally skip authentication.
126    pub skip_authentication: bool,
127}
128
129/// The runtime dispatcher.
130pub struct Dispatcher<R: Runtime> {
131    host_info: HostInfo,
132    host: Arc<Protocol>,
133    key_manager: Option<Arc<KeyManagerClient>>,
134    consensus_verifier: Arc<dyn Verifier>,
135    schedule_control_host: Arc<dyn ScheduleControlHost>,
136    _runtime: PhantomData<R>,
137}
138
139impl<R: Runtime> Dispatcher<R> {
140    /// Create a new instance of the dispatcher for the given runtime.
141    ///
142    /// Note that the dispatcher is fully static and the constructor is only needed so that the
143    /// instance can be used directly with the dispatcher system provided by Oasis Core.
144    pub(super) fn new(
145        host: Arc<Protocol>,
146        key_manager: Option<Arc<KeyManagerClient>>,
147        consensus_verifier: Arc<dyn Verifier>,
148    ) -> Self {
149        Self {
150            host_info: host.get_host_info(),
151            key_manager,
152            consensus_verifier,
153            schedule_control_host: host.clone(),
154            host,
155            _runtime: PhantomData,
156        }
157    }
158
159    /// Decode a runtime transaction.
160    pub fn decode_tx<C: Context>(
161        ctx: &C,
162        tx: &[u8],
163    ) -> Result<types::transaction::Transaction, modules::core::Error> {
164        // Perform any checks before decoding.
165        R::Modules::approve_raw_tx(ctx, tx)?;
166
167        // Deserialize transaction.
168        let utx: types::transaction::UnverifiedTransaction = cbor::from_slice(tx)
169            .map_err(|e| modules::core::Error::MalformedTransaction(e.into()))?;
170
171        // Perform any checks before signature verification.
172        R::Modules::approve_unverified_tx(ctx, &utx)?;
173
174        match utx.1.as_slice() {
175            [AuthProof::Module(scheme)] => {
176                R::Modules::decode_tx(ctx, scheme, &utx.0)?.ok_or_else(|| {
177                    modules::core::Error::MalformedTransaction(anyhow!(
178                        "module-controlled transaction decoding scheme {} not supported",
179                        scheme
180                    ))
181                })
182            }
183            _ => utx
184                .verify()
185                .map_err(|e| modules::core::Error::MalformedTransaction(e.into())),
186        }
187    }
188
189    /// Run the dispatch steps inside a transaction context. This includes the before call hooks,
190    /// the call itself and after call hooks. The after call hooks are called regardless if the call
191    /// succeeds or not.
192    pub fn dispatch_tx_call<C: Context>(
193        ctx: &C,
194        call: types::transaction::Call,
195        opts: &DispatchOptions<'_>,
196    ) -> (module::CallResult, callformat::Metadata) {
197        let read_only = call.read_only;
198
199        // Dispatch the call.
200        let (result, metadata) = Self::_dispatch_tx_call(ctx, call, opts);
201
202        // Unconditionally call after handle call hook.
203        let result = match R::Modules::after_handle_call(ctx, result) {
204            Ok(result) => result,
205            Err(e) => {
206                // If the call failed, return the error.
207                return (e.into_call_result(), metadata);
208            }
209        };
210
211        // Make sure that a read-only call did not result in any modifications.
212        if read_only && CurrentState::with(|state| state.has_pending_store_updates()) {
213            return (
214                modules::core::Error::ReadOnlyTransaction.into_call_result(),
215                metadata,
216            );
217        }
218
219        (result, metadata)
220    }
221
222    fn _dispatch_tx_call<C: Context>(
223        ctx: &C,
224        call: types::transaction::Call,
225        opts: &DispatchOptions<'_>,
226    ) -> (module::CallResult, callformat::Metadata) {
227        if let Err(e) = R::Modules::before_handle_call(ctx, &call) {
228            return (e.into_call_result(), callformat::Metadata::Empty);
229        }
230
231        // Decode call based on specified call format.
232        let (call, call_format_metadata) = match callformat::decode_call(ctx, call, opts.tx_index) {
233            Ok(Some(result)) => result,
234            Ok(None) => {
235                return (
236                    module::CallResult::Ok(cbor::Value::Simple(cbor::SimpleValue::NullValue)),
237                    callformat::Metadata::Empty,
238                )
239            }
240            Err(err) => return (err.into_call_result(), callformat::Metadata::Empty),
241        };
242
243        // Apply optional method authorization.
244        if let Some(method_authorizer) = opts.method_authorizer {
245            if !method_authorizer(&call.method) {
246                return (
247                    modules::core::Error::Forbidden.into_call_result(),
248                    call_format_metadata,
249                );
250            }
251        }
252
253        if let Err(e) = R::Modules::before_authorized_call_dispatch(ctx, &call) {
254            return (e.into_call_result(), call_format_metadata);
255        }
256
257        let result = match R::Modules::dispatch_call(ctx, &call.method, call.body) {
258            module::DispatchResult::Handled(result) => result,
259            module::DispatchResult::Unhandled(_) => {
260                modules::core::Error::InvalidMethod(call.method).into_call_result()
261            }
262        };
263
264        (result, call_format_metadata)
265    }
266
267    /// Dispatch a runtime transaction in the given context with the provided options.
268    pub fn dispatch_tx_opts<C: Context>(
269        ctx: &C,
270        tx: types::transaction::Transaction,
271        opts: &DispatchOptions<'_>,
272    ) -> Result<DispatchResult, Error> {
273        // Run pre-processing hooks.
274        if !opts.skip_authentication {
275            if let Err(err) = R::Modules::authenticate_tx(ctx, &tx) {
276                return Ok(err.into_call_result().into());
277            }
278        }
279        let tx_auth_info = tx.auth_info.clone();
280        let is_read_only = tx.call.read_only;
281        let call = tx.call.clone(); // TODO: Avoid clone.
282
283        let result = CurrentState::with_transaction_opts(
284            state::Options::new().with_tx(TransactionWithMeta {
285                data: tx,
286                size: opts.tx_size,
287                index: opts.tx_index,
288                hash: opts.tx_hash,
289            }),
290            || {
291                let (result, call_format_metadata) = Self::dispatch_tx_call(ctx, call, opts);
292                if !result.is_success() || is_read_only {
293                    // Retrieve unconditional events.
294                    let events = CurrentState::with(|state| state.take_unconditional_events());
295
296                    return TransactionResult::Rollback(DispatchResult::new(
297                        result,
298                        events.into_tags(),
299                        call_format_metadata,
300                    ));
301                }
302
303                // Load priority.
304                let priority = R::Core::take_priority();
305                // Load sender metadata.
306                let sender_metadata = R::Core::take_sender_meta();
307
308                if CurrentState::with_env(|env| env.is_check_only()) {
309                    TransactionResult::Rollback(DispatchResult {
310                        result,
311                        tags: Vec::new(),
312                        priority,
313                        sender_metadata,
314                        call_format_metadata,
315                    })
316                } else {
317                    // Merge normal and unconditional events.
318                    let tags = CurrentState::with(|state| state.take_all_events().into_tags());
319
320                    TransactionResult::Commit(DispatchResult {
321                        result,
322                        tags,
323                        priority,
324                        sender_metadata,
325                        call_format_metadata,
326                    })
327                }
328            },
329        );
330
331        // Run after dispatch hooks.
332        R::Modules::after_dispatch_tx(ctx, &tx_auth_info, &result.result);
333
334        // Propagate batch aborts.
335        if let module::CallResult::Aborted(err) = result.result {
336            return Err(err);
337        }
338
339        Ok(result)
340    }
341
342    /// Dispatch a runtime transaction in the given context.
343    pub fn dispatch_tx<C: Context>(
344        ctx: &C,
345        tx_size: u32,
346        tx: types::transaction::Transaction,
347        tx_index: usize,
348    ) -> Result<DispatchResult, Error> {
349        Self::dispatch_tx_opts(
350            ctx,
351            tx,
352            &DispatchOptions {
353                tx_size,
354                tx_index,
355                ..Default::default()
356            },
357        )
358    }
359
360    /// Check whether the given transaction is valid.
361    pub fn check_tx<C: Context>(
362        ctx: &C,
363        tx_size: u32,
364        tx: Transaction,
365    ) -> Result<CheckTxResult, Error> {
366        // In case of any panics, treat it as a failed check instead of crashing the runtime.
367        let catch_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
368            Self::dispatch_tx(ctx, tx_size, tx, usize::MAX)
369        }));
370        let dispatch = match catch_result {
371            Ok(dispatch) => dispatch?,
372            Err(panic_err) => {
373                // Convert panics into transaction check failures as it is clearly the fault of a
374                // specific transaction.
375                return Ok(CheckTxResult {
376                    error: RuntimeError {
377                        module: MODULE_NAME.to_string(),
378                        code: 1,
379                        message: format!("transaction check aborted: {panic_err:?}"),
380                    },
381                    meta: None,
382                });
383            }
384        };
385
386        match dispatch.result {
387            module::CallResult::Ok(_) => Ok(CheckTxResult {
388                error: Default::default(),
389                meta: Some(CheckTxMetadata {
390                    priority: dispatch.priority,
391                    sender: dispatch.sender_metadata.id(),
392                    sender_seq: dispatch.sender_metadata.tx_nonce,
393                    sender_state_seq: dispatch.sender_metadata.state_nonce,
394                }),
395            }),
396
397            module::CallResult::Failed {
398                module,
399                code,
400                message,
401            } => Ok(CheckTxResult {
402                error: RuntimeError {
403                    module,
404                    code,
405                    message,
406                },
407                meta: None,
408            }),
409
410            module::CallResult::Aborted(err) => Err(err),
411        }
412    }
413
414    /// Execute the given transaction, returning unserialized results.
415    pub fn execute_tx_opts<C: Context>(
416        ctx: &C,
417        tx: Transaction,
418        opts: &DispatchOptions<'_>,
419    ) -> Result<(types::transaction::CallResult, Tags), Error> {
420        let dispatch_result = Self::dispatch_tx_opts(ctx, tx, opts)?;
421        let output: types::transaction::CallResult = callformat::encode_result(
422            ctx,
423            dispatch_result.result,
424            dispatch_result.call_format_metadata,
425        );
426
427        Ok((output, dispatch_result.tags))
428    }
429
430    /// Execute the given transaction.
431    pub fn execute_tx<C: Context>(
432        ctx: &C,
433        tx_size: u32,
434        tx_hash: Hash,
435        tx: Transaction,
436        tx_index: usize,
437    ) -> Result<ExecuteTxResult, Error> {
438        let (output, tags) = Self::execute_tx_opts(
439            ctx,
440            tx,
441            &DispatchOptions {
442                tx_size,
443                tx_index,
444                tx_hash,
445                ..Default::default()
446            },
447        )?;
448
449        Ok(ExecuteTxResult {
450            output: cbor::to_vec(output),
451            tags,
452        })
453    }
454
455    /// Prefetch prefixes for the given transaction.
456    pub fn prefetch_tx(
457        prefixes: &mut BTreeSet<Prefix>,
458        tx: types::transaction::Transaction,
459    ) -> Result<(), RuntimeError> {
460        match R::Modules::prefetch(prefixes, &tx.call.method, tx.call.body, &tx.auth_info) {
461            module::DispatchResult::Handled(r) => r,
462            module::DispatchResult::Unhandled(_) => Ok(()), // Unimplemented prefetch is allowed.
463        }
464    }
465
466    fn handle_last_round_messages<C: Context>(ctx: &C) -> Result<(), modules::core::Error> {
467        let message_events = ctx.runtime_round_results().messages.clone();
468
469        let mut handlers = CurrentState::with_store(|store| {
470            let store = storage::TypedStore::new(storage::PrefixStore::new(
471                store,
472                &modules::core::MODULE_NAME,
473            ));
474            let handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = store
475                .get(modules::core::state::MESSAGE_HANDLERS)
476                .unwrap_or_default();
477
478            handlers
479        });
480
481        for event in message_events {
482            let handler = handlers
483                .remove(&event.index)
484                .ok_or(modules::core::Error::MessageHandlerMissing(event.index))?;
485            let hook_name = handler.hook_name.clone();
486
487            R::Modules::dispatch_message_result(
488                ctx,
489                &hook_name,
490                types::message::MessageResult {
491                    event,
492                    context: handler.payload,
493                },
494            )
495            .ok_or(modules::core::Error::InvalidMethod(hook_name))?;
496        }
497
498        if !handlers.is_empty() {
499            error!(ctx.get_logger("dispatcher"), "message handler not invoked"; "unhandled" => ?handlers);
500            return Err(modules::core::Error::MessageHandlerNotInvoked);
501        }
502
503        Ok(())
504    }
505
506    fn save_emitted_message_handlers(handlers: Vec<types::message::MessageEventHookInvocation>) {
507        let message_handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = handlers
508            .into_iter()
509            .enumerate()
510            .map(|(idx, h)| (idx as u32, h))
511            .collect();
512
513        CurrentState::with_store(|store| {
514            let mut store = storage::TypedStore::new(storage::PrefixStore::new(
515                store,
516                &modules::core::MODULE_NAME,
517            ));
518            store.insert(modules::core::state::MESSAGE_HANDLERS, message_handlers);
519        });
520    }
521
522    /// Process the given runtime query.
523    pub fn dispatch_query<C: Context>(
524        ctx: &C,
525        method: &str,
526        args: Vec<u8>,
527    ) -> Result<Vec<u8>, RuntimeError> {
528        let args = cbor::from_slice(&args)
529            .map_err(|err| modules::core::Error::InvalidArgument(err.into()))?;
530
531        CurrentState::with_transaction(|| {
532            // Catch any panics that occur during query dispatch.
533            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
534                // Perform state migrations if required.
535                R::migrate(ctx);
536
537                if !R::is_allowed_query(method) || !ctx.is_allowed_query::<R>(method) {
538                    return Err(modules::core::Error::Forbidden.into());
539                }
540
541                R::Modules::dispatch_query(ctx, method, args)
542                    .ok_or_else(|| modules::core::Error::InvalidMethod(method.into()))?
543            }));
544
545            // Always rollback any changes to storage. Note that this is usually a no-op because
546            // Oasis Core would rollback any storage changes related to queries, but this makes it
547            // explicit to ensure this remains the case regardless of upstream changes.
548            TransactionResult::Rollback(result)
549        })
550        .map_err(|err| -> RuntimeError { Error::QueryAborted(format!("{err:?}")).into() })?
551        .map(cbor::to_vec)
552    }
553
554    fn execute_batch_common<F>(
555        &self,
556        mut rt_ctx: transaction::Context<'_>,
557        f: F,
558    ) -> Result<ExecuteBatchResult, RuntimeError>
559    where
560        F: FnOnce(&RuntimeBatchContext<'_, R>) -> Result<Vec<ExecuteTxResult>, RuntimeError>,
561    {
562        // Prepare dispatch context.
563        let key_manager = self
564            .key_manager
565            .as_ref()
566            // NOTE: We are explicitly allowing private key operations during execution.
567            .map(|mgr| mgr.with_private_context());
568        let history = self.consensus_verifier.clone();
569
570        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
571        let ctx = RuntimeBatchContext::<'_, R>::new(
572            &self.host_info,
573            key_manager,
574            rt_ctx.header,
575            rt_ctx.round_results,
576            &rt_ctx.consensus_state,
577            &history,
578            rt_ctx.epoch,
579            rt_ctx.max_messages,
580        );
581
582        CurrentState::enter_opts(state::Options::new().with_mode(Mode::Execute), root, || {
583            // Perform state migrations if required.
584            R::migrate(&ctx);
585
586            // Handle last round message results.
587            Self::handle_last_round_messages(&ctx)?;
588
589            // Run begin block hooks.
590            R::Modules::begin_block(&ctx);
591
592            let results = f(&ctx)?;
593
594            // Run end block hooks.
595            R::Modules::end_block(&ctx);
596
597            // Process any emitted messages and block-level events.
598            let (messages, handlers, block_tags) = CurrentState::with(|state| {
599                let (messages, handlers) = state.take_messages().into_iter().unzip();
600                let block_tags = state.take_all_events().into_tags();
601
602                (messages, handlers, block_tags)
603            });
604            Self::save_emitted_message_handlers(handlers);
605
606            Ok(ExecuteBatchResult {
607                results,
608                messages,
609                block_tags,
610                tx_reject_hashes: vec![],
611                in_msgs_count: 0, // TODO: Support processing incoming messages.
612            })
613        })
614    }
615
616    /// Register EnclaveRPC methods.
617    pub fn register_enclaverpc(&self, rpc: &mut RpcDispatcher)
618    where
619        R: Runtime + Send + Sync + 'static,
620    {
621        enclave_rpc::Wrapper::<R>::wrap(
622            rpc,
623            self.host.clone(),
624            self.host_info.clone(),
625            self.key_manager.clone(),
626            self.consensus_verifier.clone(),
627        );
628    }
629}
630
631impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatcher<R> {
632    fn execute_batch(
633        &self,
634        rt_ctx: transaction::Context<'_>,
635        batch: &TxnBatch,
636        _in_msgs: &[roothash::IncomingMessage],
637    ) -> Result<ExecuteBatchResult, RuntimeError> {
638        self.execute_batch_common(
639            rt_ctx,
640            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
641                // If prefetch limit is set enable prefetch.
642                let prefetch_enabled = R::PREFETCH_LIMIT > 0;
643
644                let mut txs = Vec::with_capacity(batch.len());
645                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
646                for tx in batch.iter() {
647                    let tx_size = tx.len().try_into().map_err(|_| {
648                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
649                    })?;
650                    let tx_hash = Hash::digest_bytes(tx);
651                    // It is an error to include a malformed transaction in a batch. So instead of only
652                    // reporting a failed execution result, we fail the whole batch. This will make the compute
653                    // node vote for failure and the round will fail.
654                    //
655                    // Correct proposers should only include transactions which have passed check_tx.
656                    let tx = Self::decode_tx(ctx, tx)
657                        .map_err(|err| Error::MalformedTransactionInBatch(err.into()))?;
658                    txs.push((tx_size, tx_hash, tx.clone()));
659
660                    if prefetch_enabled {
661                        Self::prefetch_tx(&mut prefixes, tx)?;
662                    }
663                }
664                if prefetch_enabled {
665                    CurrentState::with_store(|store| {
666                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
667                    })
668                }
669
670                // Execute the batch.
671                let mut results = Vec::with_capacity(batch.len());
672                for (index, (tx_size, tx_hash, tx)) in txs.into_iter().enumerate() {
673                    results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, index)?);
674                }
675
676                Ok(results)
677            },
678        )
679    }
680
681    fn schedule_and_execute_batch(
682        &self,
683        rt_ctx: transaction::Context<'_>,
684        batch: &mut TxnBatch,
685        _in_msgs: &[roothash::IncomingMessage],
686    ) -> Result<ExecuteBatchResult, RuntimeError> {
687        let cfg = R::SCHEDULE_CONTROL;
688        let mut tx_reject_hashes = Vec::new();
689
690        let mut result = self.execute_batch_common(
691            rt_ctx,
692            |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
693                // Schedule and execute the batch.
694                //
695                // The idea is to keep scheduling transactions as long as we have some space
696                // available in the block as determined by gas use.
697                let mut new_batch = Vec::new();
698                let mut results = Vec::with_capacity(batch.len());
699                let mut requested_batch_len = cfg.initial_batch_size;
700                'batch: loop {
701                    // Remember length of last batch.
702                    let last_batch_len = batch.len();
703                    let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));
704
705                    for raw_tx in batch.drain(..) {
706                        // If we don't have enough gas for processing even the cheapest transaction
707                        // we are done. Same if we reached the runtime-imposed maximum tx count.
708                        let remaining_gas = R::Core::remaining_batch_gas();
709                        if remaining_gas < cfg.min_remaining_gas
710                            || new_batch.len() >= cfg.max_tx_count
711                        {
712                            break 'batch;
713                        }
714
715                        // Decode transaction.
716                        let tx_hash = Hash::digest_bytes(&raw_tx);
717                        let tx = match Self::decode_tx(ctx, &raw_tx) {
718                            Ok(tx) => tx,
719                            Err(_) => {
720                                // Transaction is malformed, make sure it gets removed from the
721                                // queue and don't include it in a block.
722                                tx_reject_hashes.push(tx_hash);
723                                continue;
724                            }
725                        };
726                        let tx_size = raw_tx.len().try_into().unwrap();
727
728                        // If we don't have enough gas remaining to process this transaction, just
729                        // skip it.
730                        if tx.auth_info.fee.gas > remaining_gas {
731                            continue;
732                        }
733                        // Same if we don't have enough consensus message slots.
734                        let remaining_messages = CurrentState::with(|state| {
735                            ctx.max_messages()
736                                .saturating_sub(state.emitted_messages_count() as u32)
737                        });
738                        if tx.auth_info.fee.consensus_messages > remaining_messages {
739                            continue;
740                        }
741
742                        // Determine the current transaction index.
743                        let tx_index = new_batch.len();
744
745                        // First run the transaction in check tx mode in a separate subcontext. If
746                        // that fails, skip and (sometimes) reject transaction.
747                        let skip = CurrentState::with_transaction_opts(
748                            state::Options::new().with_mode(Mode::PreSchedule),
749                            || -> Result<_, Error> {
750                                // First authenticate the transaction to get any nonce related errors.
751                                match R::Modules::authenticate_tx(ctx, &tx) {
752                                    Err(modules::core::Error::FutureNonce) => {
753                                        // Only skip transaction as it may become valid in the future.
754                                        return Ok(true);
755                                    }
756                                    Err(_) => {
757                                        // Skip and reject the transaction.
758                                    }
759                                    Ok(_) => {
760                                        // Run additional checks on the transaction.
761                                        let check_result = Self::dispatch_tx_opts(
762                                            ctx,
763                                            tx.clone(),
764                                            &DispatchOptions {
765                                                tx_size,
766                                                tx_index,
767                                                tx_hash,
768                                                skip_authentication: true, // Already done.
769                                                ..Default::default()
770                                            },
771                                        )?;
772                                        if check_result.result.is_success() {
773                                            // Checks successful, execute transaction as usual.
774                                            return Ok(false);
775                                        }
776                                    }
777                                }
778
779                                // Skip and reject the transaction.
780                                tx_reject_hashes.push(tx_hash);
781                                Ok(true)
782                            },
783                        )?;
784                        if skip {
785                            continue;
786                        }
787
788                        new_batch.push(raw_tx);
789                        results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, tx_index)?);
790                    }
791
792                    // If there's more room in the block and we got the maximum number of
793                    // transactions, request more transactions.
794                    if last_batch_tx_hash.is_some()
795                        && last_batch_len >= requested_batch_len as usize
796                    {
797                        if let Some(fetched_batch) = self
798                            .schedule_control_host
799                            .fetch_tx_batch(last_batch_tx_hash, cfg.batch_size)?
800                        {
801                            *batch = fetched_batch;
802                            requested_batch_len = cfg.batch_size;
803                            continue;
804                        }
805                        // No more transactions, let's just finish.
806                    }
807                    break;
808                }
809
810                // Replace input batch with newly generated batch.
811                *batch = new_batch.into();
812
813                Ok(results)
814            },
815        )?;
816
817        // Include rejected transaction hashes in the final result.
818        result.tx_reject_hashes = tx_reject_hashes;
819
820        Ok(result)
821    }
822
823    fn check_batch(
824        &self,
825        mut rt_ctx: transaction::Context<'_>,
826        batch: &TxnBatch,
827    ) -> Result<Vec<CheckTxResult>, RuntimeError> {
828        // If prefetch limit is set enable prefetch.
829        let prefetch_enabled = R::PREFETCH_LIMIT > 0;
830
831        // Prepare dispatch context.
832        let key_manager = self.key_manager.as_ref().map(|mgr| mgr.with_context());
833        let history = self.consensus_verifier.clone();
834
835        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
836        let ctx = RuntimeBatchContext::<'_, R>::new(
837            &self.host_info,
838            key_manager,
839            rt_ctx.header,
840            rt_ctx.round_results,
841            &rt_ctx.consensus_state,
842            &history,
843            rt_ctx.epoch,
844            rt_ctx.max_messages,
845        );
846
847        CurrentState::enter_opts(
848            state::Options::new().with_mode(state::Mode::Check),
849            root,
850            || {
851                // Perform state migrations if required.
852                R::migrate(&ctx);
853
854                // Prefetch.
855                let mut txs: Vec<Result<_, RuntimeError>> = Vec::with_capacity(batch.len());
856                let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
857                for tx in batch.iter() {
858                    let tx_size = tx.len().try_into().map_err(|_| {
859                        Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
860                    })?;
861                    let res = match Self::decode_tx(&ctx, tx) {
862                        Ok(tx) => {
863                            if prefetch_enabled {
864                                Self::prefetch_tx(&mut prefixes, tx.clone()).map(|_| (tx_size, tx))
865                            } else {
866                                Ok((tx_size, tx))
867                            }
868                        }
869                        Err(err) => Err(err.into()),
870                    };
871                    txs.push(res);
872                }
873                if prefetch_enabled {
874                    CurrentState::with_store(|store| {
875                        store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
876                    });
877                }
878
879                // Check the batch.
880                let mut results = Vec::with_capacity(batch.len());
881                for tx in txs.into_iter() {
882                    match tx {
883                        Ok((tx_size, tx)) => results.push(Self::check_tx(&ctx, tx_size, tx)?),
884                        Err(err) => results.push(CheckTxResult {
885                            error: err,
886                            meta: None,
887                        }),
888                    }
889                }
890
891                Ok(results)
892            },
893        )
894    }
895
896    fn set_abort_batch_flag(&mut self, _abort_batch: Arc<AtomicBool>) {
897        // TODO: Implement support for graceful batch aborts (oasis-sdk#129).
898    }
899
900    fn query(
901        &self,
902        mut rt_ctx: transaction::Context<'_>,
903        method: &str,
904        args: Vec<u8>,
905    ) -> Result<Vec<u8>, RuntimeError> {
906        // Determine whether the method is allowed to access confidential state and provide an
907        // appropriately scoped instance of the key manager client.
908        let is_confidential_allowed = R::Modules::is_allowed_private_km_query(method)
909            && R::is_allowed_private_km_query(method);
910        if is_confidential_allowed {
911            // Perform consensus layer state integrity verification for any queries that allow
912            // access to confidential state.
913            block_on(self.consensus_verifier.verify_for_query(
914                rt_ctx.consensus_block.clone(),
915                rt_ctx.header.clone(),
916                rt_ctx.epoch,
917            ))?;
918            // Ensure the runtime is still ready to process requests.
919            rt_ctx.protocol.ensure_initialized()?;
920        }
921        let key_manager = self.key_manager.as_ref().map(|mgr| {
922            if is_confidential_allowed {
923                mgr.with_private_context()
924            } else {
925                mgr.with_context()
926            }
927        });
928
929        // Prepare dispatch context.
930        let history = self.consensus_verifier.clone();
931
932        let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
933        let ctx = RuntimeBatchContext::<'_, R>::new(
934            &self.host_info,
935            key_manager,
936            rt_ctx.header,
937            rt_ctx.round_results,
938            &rt_ctx.consensus_state,
939            &history,
940            rt_ctx.epoch,
941            rt_ctx.max_messages,
942        );
943
944        CurrentState::enter_opts(
945            state::Options::new()
946                .with_mode(state::Mode::Check)
947                .with_rng_local_entropy(), // Mix in local (private) entropy for queries.
948            root,
949            || Self::dispatch_query(&ctx, method, args),
950        )
951    }
952}
953
954#[cfg(test)]
955mod test {
956    use super::*;
957    use crate::{
958        handler,
959        module::Module,
960        modules::{accounts, core},
961        sdk_derive,
962        state::{CurrentState, Options},
963        storage::Store,
964        testing::{configmap, keys, mock::Mock},
965        types::{token, transaction},
966        Version,
967    };
968    use cbor::Encode as _;
969
970    struct CoreConfig;
971    impl core::Config for CoreConfig {}
972    type Core = core::Module<CoreConfig>;
973    type Accounts = accounts::Module;
974
975    #[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
976    enum AlphabetError {
977        #[error("{0}")]
978        #[sdk_error(transparent, abort)]
979        Core(#[source] core::Error),
980    }
981
982    /// A module with multiple no-op methods; intended for testing routing.
983    struct AlphabetModule;
984
985    #[sdk_derive(Module)]
986    impl AlphabetModule {
987        const NAME: &'static str = "alphabet";
988        const VERSION: u32 = 42;
989        type Error = AlphabetError;
990        type Event = ();
991        type Parameters = ();
992        type Genesis = ();
993
994        #[handler(call = "alphabet.ReadOnly")]
995        fn read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
996            CurrentState::with_store(|store| {
997                let _ = store.get(b"key"); // Read something and ignore result.
998            });
999            Ok(42)
1000        }
1001
1002        #[handler(call = "alphabet.NotReadOnly")]
1003        fn not_read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
1004            CurrentState::with_store(|store| {
1005                store.insert(b"key", b"value");
1006            });
1007            Ok(10)
1008        }
1009
1010        #[handler(call = "alphabet.Aborting")]
1011        fn aborting<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1012            // Use a deeply nested abort to make sure this is handled correctly.
1013            Err(AlphabetError::Core(core::Error::Abort(Error::Aborted)))
1014        }
1015
1016        #[handler(query = "alphabet.Alpha")]
1017        fn alpha<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1018            Ok(())
1019        }
1020
1021        #[handler(query = "alphabet.Omega", expensive)]
1022        fn expensive<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1023            // Nothing actually expensive here. We're just pretending for testing purposes.
1024            Ok(())
1025        }
1026    }
1027
1028    impl module::BlockHandler for AlphabetModule {}
1029    impl module::TransactionHandler for AlphabetModule {}
1030    impl module::InvariantHandler for AlphabetModule {}
1031
1032    struct AlphabetRuntime;
1033
1034    impl Runtime for AlphabetRuntime {
1035        const VERSION: Version = Version::new(0, 0, 0);
1036        type Core = Core;
1037        type Accounts = Accounts;
1038        type Modules = (Core, AlphabetModule);
1039
1040        fn genesis_state() -> <Self::Modules as module::MigrationHandler>::Genesis {
1041            (
1042                core::Genesis {
1043                    parameters: core::Parameters {
1044                        max_batch_gas: u64::MAX,
1045                        max_tx_size: 32 * 1024,
1046                        max_tx_signers: 1,
1047                        max_multisig_signers: 8,
1048                        gas_costs: Default::default(),
1049                        min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
1050                        dynamic_min_gas_price: Default::default(),
1051                    },
1052                },
1053                (),
1054            )
1055        }
1056    }
1057
1058    #[test]
1059    fn test_allowed_queries_defaults() {
1060        let mut mock = Mock::with_local_config(BTreeMap::new());
1061        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1062
1063        Dispatcher::<AlphabetRuntime>::dispatch_query(
1064            &mut ctx,
1065            "alphabet.Alpha",
1066            cbor::to_vec(().into_cbor_value()),
1067        )
1068        .expect("alphabet.Alpha is an inexpensive query, allowed by default");
1069
1070        Dispatcher::<AlphabetRuntime>::dispatch_query(
1071            &mut ctx,
1072            "alphabet.Omega",
1073            cbor::to_vec(().into_cbor_value()),
1074        )
1075        .expect_err("alphabet.Omega is an expensive query, disallowed by default");
1076    }
1077
1078    #[test]
1079    fn test_allowed_queries_custom() {
1080        let local_config = configmap! {
1081            // Allow expensive gas estimation and expensive queries so they can be tested.
1082            "estimate_gas_by_simulating_contracts" => true,
1083            "allowed_queries" => vec![
1084                configmap! {"alphabet.Alpha" => false},
1085                configmap! {"all_expensive" => true},
1086                configmap! {"all" => true}  // should have no effect on Alpha
1087            ],
1088        };
1089        let mut mock = Mock::with_local_config(local_config);
1090        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1091
1092        CurrentState::with_transaction_opts(Options::new().with_mode(state::Mode::Check), || {
1093            Dispatcher::<AlphabetRuntime>::dispatch_query(
1094                &mut ctx,
1095                "alphabet.Alpha",
1096                cbor::to_vec(().into_cbor_value()),
1097            )
1098            .expect_err("alphabet.Alpha is a disallowed query");
1099
1100            Dispatcher::<AlphabetRuntime>::dispatch_query(
1101                &mut ctx,
1102                "alphabet.Omega",
1103                cbor::to_vec(().into_cbor_value()),
1104            )
1105            .expect("alphabet.Omega is an expensive query and expensive queries are allowed");
1106
1107            TransactionResult::Rollback(())
1108        });
1109    }
1110
1111    #[test]
1112    fn test_dispatch_read_only_call() {
1113        let mut mock = Mock::default();
1114        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1115
1116        AlphabetRuntime::migrate(&mut ctx);
1117
1118        let mut tx = transaction::Transaction {
1119            version: 1,
1120            call: transaction::Call {
1121                format: transaction::CallFormat::Plain,
1122                method: "alphabet.ReadOnly".to_owned(),
1123                read_only: true,
1124                ..Default::default()
1125            },
1126            auth_info: transaction::AuthInfo {
1127                signer_info: vec![transaction::SignerInfo::new_sigspec(
1128                    keys::alice::sigspec(),
1129                    0,
1130                )],
1131                fee: transaction::Fee {
1132                    amount: token::BaseUnits::new(0, token::Denomination::NATIVE),
1133                    gas: 1000,
1134                    ..Default::default()
1135                },
1136                ..Default::default()
1137            },
1138        };
1139
1140        // Dispatch read-only transaction.
1141        let dispatch_result =
1142            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0)
1143                .expect("read only method dispatch should work");
1144        let result = dispatch_result.result.unwrap();
1145        let result: u64 = cbor::from_value(result).unwrap();
1146        assert_eq!(result, 42);
1147
1148        // Dispatch read-only transaction of a method that writes.
1149        tx.call.method = "alphabet.NotReadOnly".to_owned();
1150
1151        let dispatch_result = Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx, 0)
1152            .expect("read only method dispatch should work");
1153        match dispatch_result.result {
1154            module::CallResult::Failed {
1155                module,
1156                code,
1157                message,
1158            } => {
1159                assert_eq!(&module, "core");
1160                assert_eq!(code, 25);
1161                assert_eq!(&message, "read-only transaction attempted modifications")
1162            }
1163            _ => panic!("not read only method execution did not fail"),
1164        }
1165    }
1166
1167    #[test]
1168    fn test_dispatch_abort_forwarding() {
1169        let mut mock = Mock::default();
1170        let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);
1171
1172        AlphabetRuntime::migrate(&mut ctx);
1173
1174        let tx = transaction::Transaction {
1175            version: 1,
1176            call: transaction::Call {
1177                format: transaction::CallFormat::Plain,
1178                method: "alphabet.Aborting".to_owned(),
1179                ..Default::default()
1180            },
1181            auth_info: transaction::AuthInfo {
1182                signer_info: vec![transaction::SignerInfo::new_sigspec(
1183                    keys::alice::sigspec(),
1184                    0,
1185                )],
1186                fee: transaction::Fee {
1187                    amount: token::BaseUnits::new(0, token::Denomination::NATIVE),
1188                    gas: 1000,
1189                    ..Default::default()
1190                },
1191                ..Default::default()
1192            },
1193        };
1194
1195        // Dispatch transaction and make sure the abort gets propagated.
1196        let dispatch_result =
1197            Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0);
1198        assert!(matches!(dispatch_result, Err(Error::Aborted)));
1199    }
1200}