1use std::{
3 collections::{BTreeMap, BTreeSet},
4 convert::TryInto,
5 marker::PhantomData,
6 sync::{atomic::AtomicBool, Arc},
7};
8
9use anyhow::anyhow;
10use slog::error;
11use thiserror::Error;
12
13use oasis_core_runtime::{
14 self,
15 common::crypto::hash::Hash,
16 consensus::{roothash, verifier::Verifier},
17 enclave_rpc::dispatcher::Dispatcher as RpcDispatcher,
18 future::block_on,
19 protocol::{HostInfo, Protocol},
20 transaction::{
21 self,
22 dispatcher::{ExecuteBatchResult, ExecuteTxResult},
23 tags::Tags,
24 types::TxnBatch,
25 },
26 types::{CheckTxMetadata, CheckTxResult},
27};
28
29use crate::{
30 callformat,
31 context::{Context, RuntimeBatchContext},
32 enclave_rpc,
33 error::{Error as _, RuntimeError},
34 event::IntoTags,
35 keymanager::{KeyManagerClient, KeyManagerError},
36 module::{self, BlockHandler, MethodHandler, TransactionHandler},
37 modules,
38 modules::core::API as _,
39 runtime::Runtime,
40 schedule_control::ScheduleControlHost,
41 sender::SenderMeta,
42 state::{self, CurrentState, Mode, TransactionResult, TransactionWithMeta},
43 storage::{self, Prefix},
44 types,
45 types::transaction::{AuthProof, Transaction},
46};
47
/// Module name under which dispatcher-originated errors are reported.
const MODULE_NAME: &str = "dispatcher";
50
51#[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
54#[sdk_error(abort_self)]
55pub enum Error {
56 #[error("dispatch aborted")]
57 #[sdk_error(code = 1)]
58 Aborted,
59
60 #[error("malformed transaction in batch: {0}")]
61 #[sdk_error(code = 2)]
62 MalformedTransactionInBatch(#[source] anyhow::Error),
63
64 #[error("query aborted: {0}")]
65 #[sdk_error(code = 3)]
66 QueryAborted(String),
67
68 #[error("key manager failure: {0}")]
69 #[sdk_error(code = 4)]
70 KeyManagerFailure(#[from] KeyManagerError),
71
72 #[error("batch out of gas")]
73 #[sdk_error(code = 5)]
74 BatchOutOfGas,
75}
76
77#[derive(Debug)]
79pub struct DispatchResult {
80 pub result: module::CallResult,
82 pub tags: Tags,
84 pub priority: u64,
86 pub sender_metadata: SenderMeta,
88 pub call_format_metadata: callformat::Metadata,
90}
91
92impl DispatchResult {
93 fn new(
94 result: module::CallResult,
95 tags: Tags,
96 call_format_metadata: callformat::Metadata,
97 ) -> Self {
98 Self {
99 result,
100 tags,
101 priority: 0,
102 sender_metadata: Default::default(),
103 call_format_metadata,
104 }
105 }
106}
107
108impl From<module::CallResult> for DispatchResult {
109 fn from(result: module::CallResult) -> Self {
110 Self::new(result, vec![], callformat::Metadata::Empty)
111 }
112}
113
114#[derive(Default)]
116pub struct DispatchOptions<'a> {
117 pub tx_size: u32,
119 pub tx_index: usize,
121 pub tx_hash: Hash,
123 pub method_authorizer: Option<&'a dyn Fn(&str) -> bool>,
125 pub skip_authentication: bool,
127}
128
129pub struct Dispatcher<R: Runtime> {
131 host_info: HostInfo,
132 host: Arc<Protocol>,
133 key_manager: Option<Arc<KeyManagerClient>>,
134 consensus_verifier: Arc<dyn Verifier>,
135 schedule_control_host: Arc<dyn ScheduleControlHost>,
136 _runtime: PhantomData<R>,
137}
138
139impl<R: Runtime> Dispatcher<R> {
140 pub(super) fn new(
145 host: Arc<Protocol>,
146 key_manager: Option<Arc<KeyManagerClient>>,
147 consensus_verifier: Arc<dyn Verifier>,
148 ) -> Self {
149 Self {
150 host_info: host.get_host_info(),
151 key_manager,
152 consensus_verifier,
153 schedule_control_host: host.clone(),
154 host,
155 _runtime: PhantomData,
156 }
157 }
158
159 pub fn decode_tx<C: Context>(
161 ctx: &C,
162 tx: &[u8],
163 ) -> Result<types::transaction::Transaction, modules::core::Error> {
164 R::Modules::approve_raw_tx(ctx, tx)?;
166
167 let utx: types::transaction::UnverifiedTransaction = cbor::from_slice(tx)
169 .map_err(|e| modules::core::Error::MalformedTransaction(e.into()))?;
170
171 R::Modules::approve_unverified_tx(ctx, &utx)?;
173
174 match utx.1.as_slice() {
175 [AuthProof::Module(scheme)] => {
176 R::Modules::decode_tx(ctx, scheme, &utx.0)?.ok_or_else(|| {
177 modules::core::Error::MalformedTransaction(anyhow!(
178 "module-controlled transaction decoding scheme {scheme} not supported"
179 ))
180 })
181 }
182 _ => utx
183 .verify()
184 .map_err(|e| modules::core::Error::MalformedTransaction(e.into())),
185 }
186 }
187
188 pub fn dispatch_tx_call<C: Context>(
192 ctx: &C,
193 call: types::transaction::Call,
194 opts: &DispatchOptions<'_>,
195 ) -> (module::CallResult, callformat::Metadata) {
196 let read_only = call.read_only;
197
198 let (result, metadata) = Self::_dispatch_tx_call(ctx, call, opts);
200
201 if let Err(e) = R::Modules::after_handle_call(ctx, &result) {
203 return (e.into_call_result(), metadata);
205 };
206
207 if read_only && CurrentState::with(|state| state.has_pending_store_updates()) {
209 return (
210 modules::core::Error::ReadOnlyTransaction.into_call_result(),
211 metadata,
212 );
213 }
214
215 (result, metadata)
216 }
217
218 fn _dispatch_tx_call<C: Context>(
219 ctx: &C,
220 call: types::transaction::Call,
221 opts: &DispatchOptions<'_>,
222 ) -> (module::CallResult, callformat::Metadata) {
223 if let Err(e) = R::Modules::before_handle_call(ctx, &call) {
224 return (e.into_call_result(), callformat::Metadata::Empty);
225 }
226
227 let (call, call_format_metadata) = match callformat::decode_call(ctx, call, opts.tx_index) {
229 Ok(Some(result)) => result,
230 Ok(None) => {
231 return (
232 module::CallResult::Ok(cbor::Value::Simple(cbor::SimpleValue::NullValue)),
233 callformat::Metadata::Empty,
234 )
235 }
236 Err(err) => return (err.into_call_result(), callformat::Metadata::Empty),
237 };
238
239 if let Some(method_authorizer) = opts.method_authorizer {
241 if !method_authorizer(&call.method) {
242 return (
243 modules::core::Error::Forbidden.into_call_result(),
244 call_format_metadata,
245 );
246 }
247 }
248
249 if let Err(e) = R::Modules::before_authorized_call_dispatch(ctx, &call) {
250 return (e.into_call_result(), call_format_metadata);
251 }
252
253 let result = match R::Modules::dispatch_call(ctx, &call.method, call.body) {
254 module::DispatchResult::Handled(result) => result,
255 module::DispatchResult::Unhandled(_) => {
256 modules::core::Error::InvalidMethod(call.method).into_call_result()
257 }
258 };
259
260 (result, call_format_metadata)
261 }
262
263 pub fn dispatch_tx_opts<C: Context>(
265 ctx: &C,
266 tx: types::transaction::Transaction,
267 opts: &DispatchOptions<'_>,
268 ) -> Result<DispatchResult, Error> {
269 if !opts.skip_authentication {
271 if let Err(err) = R::Modules::authenticate_tx(ctx, &tx) {
272 return Ok(err.into_call_result().into());
273 }
274 }
275 let tx_auth_info = tx.auth_info.clone();
276 let is_read_only = tx.call.read_only;
277 let call = tx.call.clone(); let result = CurrentState::with_transaction_opts(
280 state::Options::new().with_tx(TransactionWithMeta {
281 data: tx,
282 size: opts.tx_size,
283 index: opts.tx_index,
284 hash: opts.tx_hash,
285 }),
286 || {
287 let (result, call_format_metadata) = Self::dispatch_tx_call(ctx, call, opts);
288 if !result.is_success() || is_read_only {
289 let events = CurrentState::with(|state| state.take_unconditional_events());
291
292 return TransactionResult::Rollback(DispatchResult::new(
293 result,
294 events.into_tags(),
295 call_format_metadata,
296 ));
297 }
298
299 let priority = R::Core::take_priority();
301 let sender_metadata = R::Core::take_sender_meta();
303
304 if CurrentState::with_env(|env| env.is_check_only()) {
305 TransactionResult::Rollback(DispatchResult {
306 result,
307 tags: Vec::new(),
308 priority,
309 sender_metadata,
310 call_format_metadata,
311 })
312 } else {
313 let tags = CurrentState::with(|state| state.take_all_events().into_tags());
315
316 TransactionResult::Commit(DispatchResult {
317 result,
318 tags,
319 priority,
320 sender_metadata,
321 call_format_metadata,
322 })
323 }
324 },
325 );
326
327 R::Modules::after_dispatch_tx(ctx, &tx_auth_info, &result.result);
329
330 if let module::CallResult::Aborted(err) = result.result {
332 return Err(err);
333 }
334
335 Ok(result)
336 }
337
338 pub fn dispatch_tx<C: Context>(
340 ctx: &C,
341 tx_size: u32,
342 tx: types::transaction::Transaction,
343 tx_index: usize,
344 ) -> Result<DispatchResult, Error> {
345 Self::dispatch_tx_opts(
346 ctx,
347 tx,
348 &DispatchOptions {
349 tx_size,
350 tx_index,
351 ..Default::default()
352 },
353 )
354 }
355
356 pub fn check_tx<C: Context>(
358 ctx: &C,
359 tx_size: u32,
360 tx: Transaction,
361 ) -> Result<CheckTxResult, Error> {
362 let catch_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
364 Self::dispatch_tx(ctx, tx_size, tx, usize::MAX)
365 }));
366 let dispatch = match catch_result {
367 Ok(dispatch) => dispatch?,
368 Err(panic_err) => {
369 return Ok(CheckTxResult {
372 error: RuntimeError {
373 module: MODULE_NAME.to_string(),
374 code: 1,
375 message: format!("transaction check aborted: {panic_err:?}"),
376 },
377 meta: None,
378 });
379 }
380 };
381
382 match dispatch.result {
383 module::CallResult::Ok(_) => Ok(CheckTxResult {
384 error: Default::default(),
385 meta: Some(CheckTxMetadata {
386 priority: dispatch.priority,
387 sender: dispatch.sender_metadata.id(),
388 sender_seq: dispatch.sender_metadata.tx_nonce,
389 sender_state_seq: dispatch.sender_metadata.state_nonce,
390 }),
391 }),
392
393 module::CallResult::Failed {
394 module,
395 code,
396 message,
397 } => Ok(CheckTxResult {
398 error: RuntimeError {
399 module,
400 code,
401 message,
402 },
403 meta: None,
404 }),
405
406 module::CallResult::Aborted(err) => Err(err),
407 }
408 }
409
410 pub fn execute_tx_opts<C: Context>(
412 ctx: &C,
413 tx: Transaction,
414 opts: &DispatchOptions<'_>,
415 ) -> Result<(types::transaction::CallResult, Tags), Error> {
416 let dispatch_result = Self::dispatch_tx_opts(ctx, tx, opts)?;
417 let output: types::transaction::CallResult = callformat::encode_result(
418 ctx,
419 dispatch_result.result,
420 dispatch_result.call_format_metadata,
421 );
422
423 Ok((output, dispatch_result.tags))
424 }
425
426 pub fn execute_tx<C: Context>(
428 ctx: &C,
429 tx_size: u32,
430 tx_hash: Hash,
431 tx: Transaction,
432 tx_index: usize,
433 ) -> Result<ExecuteTxResult, Error> {
434 let (output, tags) = Self::execute_tx_opts(
435 ctx,
436 tx,
437 &DispatchOptions {
438 tx_size,
439 tx_index,
440 tx_hash,
441 ..Default::default()
442 },
443 )?;
444
445 Ok(ExecuteTxResult {
446 output: cbor::to_vec(output),
447 tags,
448 })
449 }
450
451 pub fn prefetch_tx(
453 prefixes: &mut BTreeSet<Prefix>,
454 tx: types::transaction::Transaction,
455 ) -> Result<(), RuntimeError> {
456 match R::Modules::prefetch(prefixes, &tx.call.method, tx.call.body, &tx.auth_info) {
457 module::DispatchResult::Handled(r) => r,
458 module::DispatchResult::Unhandled(_) => Ok(()), }
460 }
461
462 fn handle_last_round_messages<C: Context>(ctx: &C) -> Result<(), modules::core::Error> {
463 let message_events = ctx.runtime_round_results().messages.clone();
464
465 let mut handlers = CurrentState::with_store(|store| {
466 let store = storage::TypedStore::new(storage::PrefixStore::new(
467 store,
468 &modules::core::MODULE_NAME,
469 ));
470 let handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = store
471 .get(modules::core::state::MESSAGE_HANDLERS)
472 .unwrap_or_default();
473
474 handlers
475 });
476
477 for event in message_events {
478 let handler = handlers
479 .remove(&event.index)
480 .ok_or(modules::core::Error::MessageHandlerMissing(event.index))?;
481 let hook_name = handler.hook_name.clone();
482
483 R::Modules::dispatch_message_result(
484 ctx,
485 &hook_name,
486 types::message::MessageResult {
487 event,
488 context: handler.payload,
489 },
490 )
491 .ok_or(modules::core::Error::InvalidMethod(hook_name))?;
492 }
493
494 if !handlers.is_empty() {
495 error!(ctx.get_logger("dispatcher"), "message handler not invoked"; "unhandled" => ?handlers);
496 return Err(modules::core::Error::MessageHandlerNotInvoked);
497 }
498
499 Ok(())
500 }
501
502 fn save_emitted_message_handlers(handlers: Vec<types::message::MessageEventHookInvocation>) {
503 let message_handlers: BTreeMap<u32, types::message::MessageEventHookInvocation> = handlers
504 .into_iter()
505 .enumerate()
506 .map(|(idx, h)| (idx as u32, h))
507 .collect();
508
509 CurrentState::with_store(|store| {
510 let mut store = storage::TypedStore::new(storage::PrefixStore::new(
511 store,
512 &modules::core::MODULE_NAME,
513 ));
514 store.insert(modules::core::state::MESSAGE_HANDLERS, message_handlers);
515 });
516 }
517
518 pub fn dispatch_query<C: Context>(
520 ctx: &C,
521 method: &str,
522 args: Vec<u8>,
523 ) -> Result<Vec<u8>, RuntimeError> {
524 let args = cbor::from_slice(&args)
525 .map_err(|err| modules::core::Error::InvalidArgument(err.into()))?;
526
527 CurrentState::with_transaction(|| {
528 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
530 R::migrate(ctx);
532
533 if !R::is_allowed_query(method) || !ctx.is_allowed_query::<R>(method) {
534 return Err(modules::core::Error::Forbidden.into());
535 }
536
537 R::Modules::dispatch_query(ctx, method, args)
538 .ok_or_else(|| modules::core::Error::InvalidMethod(method.into()))?
539 }));
540
541 TransactionResult::Rollback(result)
545 })
546 .map_err(|err| -> RuntimeError { Error::QueryAborted(format!("{err:?}")).into() })?
547 .map(cbor::to_vec)
548 }
549
550 fn execute_batch_common<F>(
551 &self,
552 mut rt_ctx: transaction::Context<'_>,
553 f: F,
554 ) -> Result<ExecuteBatchResult, RuntimeError>
555 where
556 F: FnOnce(&RuntimeBatchContext<'_, R>) -> Result<Vec<ExecuteTxResult>, RuntimeError>,
557 {
558 let key_manager = self
560 .key_manager
561 .as_ref()
562 .map(|mgr| mgr.with_private_context());
564 let history = self.consensus_verifier.clone();
565
566 let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
567 let ctx = RuntimeBatchContext::<'_, R>::new(
568 &self.host_info,
569 key_manager,
570 rt_ctx.header,
571 rt_ctx.round_results,
572 &rt_ctx.consensus_state,
573 &history,
574 rt_ctx.epoch,
575 rt_ctx.max_messages,
576 );
577
578 CurrentState::enter_opts(state::Options::new().with_mode(Mode::Execute), root, || {
579 R::migrate(&ctx);
581
582 Self::handle_last_round_messages(&ctx)?;
584
585 R::Modules::begin_block(&ctx);
587
588 let results = f(&ctx)?;
589
590 R::Modules::end_block(&ctx);
592
593 let (messages, handlers, block_tags) = CurrentState::with(|state| {
595 let (messages, handlers) = state.take_messages().into_iter().unzip();
596 let block_tags = state.take_all_events().into_tags();
597
598 (messages, handlers, block_tags)
599 });
600 Self::save_emitted_message_handlers(handlers);
601
602 Ok(ExecuteBatchResult {
603 results,
604 messages,
605 block_tags,
606 tx_reject_hashes: vec![],
607 in_msgs_count: 0, })
609 })
610 }
611
612 pub fn register_enclaverpc(&self, rpc: &mut RpcDispatcher)
614 where
615 R: Runtime + Send + Sync + 'static,
616 {
617 enclave_rpc::Wrapper::<R>::wrap(
618 rpc,
619 self.host.clone(),
620 self.host_info.clone(),
621 self.key_manager.clone(),
622 self.consensus_verifier.clone(),
623 );
624 }
625}
626
627impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatcher<R> {
628 fn execute_batch(
629 &self,
630 rt_ctx: transaction::Context<'_>,
631 batch: &TxnBatch,
632 _in_msgs: &[roothash::IncomingMessage],
633 ) -> Result<ExecuteBatchResult, RuntimeError> {
634 self.execute_batch_common(
635 rt_ctx,
636 |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
637 let prefetch_enabled = R::PREFETCH_LIMIT > 0;
639
640 let mut txs = Vec::with_capacity(batch.len());
641 let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
642 for tx in batch.iter() {
643 let tx_size = tx.len().try_into().map_err(|_| {
644 Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
645 })?;
646 let tx_hash = Hash::digest_bytes(tx);
647 let tx = Self::decode_tx(ctx, tx)
653 .map_err(|err| Error::MalformedTransactionInBatch(err.into()))?;
654 txs.push((tx_size, tx_hash, tx.clone()));
655
656 if prefetch_enabled {
657 Self::prefetch_tx(&mut prefixes, tx)?;
658 }
659 }
660 if prefetch_enabled {
661 CurrentState::with_store(|store| {
662 store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
663 })
664 }
665
666 let mut results = Vec::with_capacity(batch.len());
668 for (index, (tx_size, tx_hash, tx)) in txs.into_iter().enumerate() {
669 results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, index)?);
670 }
671
672 Ok(results)
673 },
674 )
675 }
676
677 fn schedule_and_execute_batch(
678 &self,
679 rt_ctx: transaction::Context<'_>,
680 batch: &mut TxnBatch,
681 _in_msgs: &[roothash::IncomingMessage],
682 ) -> Result<ExecuteBatchResult, RuntimeError> {
683 let cfg = R::SCHEDULE_CONTROL;
684 let mut tx_reject_hashes = Vec::new();
685
686 let mut result = self.execute_batch_common(
687 rt_ctx,
688 |ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
689 let mut new_batch = Vec::new();
694 let mut results = Vec::with_capacity(batch.len());
695 let mut requested_batch_len = cfg.initial_batch_size;
696 'batch: loop {
697 let last_batch_len = batch.len();
699 let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));
700
701 for raw_tx in batch.drain(..) {
702 let remaining_gas = R::Core::remaining_batch_gas();
705 if remaining_gas < cfg.min_remaining_gas
706 || new_batch.len() >= cfg.max_tx_count
707 {
708 break 'batch;
709 }
710
711 let tx_hash = Hash::digest_bytes(&raw_tx);
713 let tx = match Self::decode_tx(ctx, &raw_tx) {
714 Ok(tx) => tx,
715 Err(_) => {
716 tx_reject_hashes.push(tx_hash);
719 continue;
720 }
721 };
722 let tx_size = raw_tx.len().try_into().unwrap();
723
724 if tx.auth_info.fee.gas > remaining_gas {
727 continue;
728 }
729 let remaining_messages = CurrentState::with(|state| {
731 ctx.max_messages()
732 .saturating_sub(state.emitted_messages_count() as u32)
733 });
734 if tx.auth_info.fee.consensus_messages > remaining_messages {
735 continue;
736 }
737
738 let tx_index = new_batch.len();
740
741 let skip = CurrentState::with_transaction_opts(
744 state::Options::new().with_mode(Mode::PreSchedule),
745 || -> Result<_, Error> {
746 match R::Modules::authenticate_tx(ctx, &tx) {
748 Err(modules::core::Error::FutureNonce) => {
749 return Ok(true);
751 }
752 Err(_) => {
753 tx_reject_hashes.push(tx_hash);
755 return Ok(true);
756 }
757 Ok(_) => {}
758 }
759
760 let check_result = Self::dispatch_tx_opts(
762 ctx,
763 tx.clone(),
764 &DispatchOptions {
765 tx_size,
766 tx_index,
767 tx_hash,
768 skip_authentication: true, ..Default::default()
770 },
771 )?;
772
773 if !check_result.result.is_success() {
774 tx_reject_hashes.push(tx_hash);
776 return Ok(true);
777 }
778
779 Ok(false)
781 },
782 )?;
783 if skip {
784 continue;
785 }
786
787 new_batch.push(raw_tx);
788 results.push(Self::execute_tx(ctx, tx_size, tx_hash, tx, tx_index)?);
789 }
790
791 if last_batch_tx_hash.is_some()
794 && last_batch_len >= requested_batch_len as usize
795 {
796 if let Some(fetched_batch) = self
797 .schedule_control_host
798 .fetch_tx_batch(last_batch_tx_hash, cfg.batch_size)?
799 {
800 *batch = fetched_batch;
801 requested_batch_len = cfg.batch_size;
802 continue;
803 }
804 }
806 break;
807 }
808
809 *batch = new_batch.into();
811
812 Ok(results)
813 },
814 )?;
815
816 result.tx_reject_hashes = tx_reject_hashes;
818
819 Ok(result)
820 }
821
822 fn check_batch(
823 &self,
824 mut rt_ctx: transaction::Context<'_>,
825 batch: &TxnBatch,
826 ) -> Result<Vec<CheckTxResult>, RuntimeError> {
827 let prefetch_enabled = R::PREFETCH_LIMIT > 0;
829
830 let key_manager = self.key_manager.as_ref().map(|mgr| mgr.with_context());
832 let history = self.consensus_verifier.clone();
833
834 let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
835 let ctx = RuntimeBatchContext::<'_, R>::new(
836 &self.host_info,
837 key_manager,
838 rt_ctx.header,
839 rt_ctx.round_results,
840 &rt_ctx.consensus_state,
841 &history,
842 rt_ctx.epoch,
843 rt_ctx.max_messages,
844 );
845
846 CurrentState::enter_opts(
847 state::Options::new().with_mode(state::Mode::Check),
848 root,
849 || {
850 R::migrate(&ctx);
852
853 let mut txs: Vec<Result<_, RuntimeError>> = Vec::with_capacity(batch.len());
855 let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
856 for tx in batch.iter() {
857 let tx_size = tx.len().try_into().map_err(|_| {
858 Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
859 })?;
860 let res = match Self::decode_tx(&ctx, tx) {
861 Ok(tx) => {
862 if prefetch_enabled {
863 Self::prefetch_tx(&mut prefixes, tx.clone()).map(|_| (tx_size, tx))
864 } else {
865 Ok((tx_size, tx))
866 }
867 }
868 Err(err) => Err(err.into()),
869 };
870 txs.push(res);
871 }
872 if prefetch_enabled {
873 CurrentState::with_store(|store| {
874 store.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
875 });
876 }
877
878 let mut results = Vec::with_capacity(batch.len());
880 for tx in txs.into_iter() {
881 match tx {
882 Ok((tx_size, tx)) => results.push(Self::check_tx(&ctx, tx_size, tx)?),
883 Err(err) => results.push(CheckTxResult {
884 error: err,
885 meta: None,
886 }),
887 }
888 }
889
890 Ok(results)
891 },
892 )
893 }
894
895 fn set_abort_batch_flag(&mut self, _abort_batch: Arc<AtomicBool>) {
896 }
898
899 fn query(
900 &self,
901 mut rt_ctx: transaction::Context<'_>,
902 method: &str,
903 args: Vec<u8>,
904 ) -> Result<Vec<u8>, RuntimeError> {
905 let is_confidential_allowed = R::Modules::is_allowed_private_km_query(method)
908 && R::is_allowed_private_km_query(method);
909 if is_confidential_allowed {
910 block_on(self.consensus_verifier.verify_for_query(
913 rt_ctx.consensus_block.clone(),
914 rt_ctx.header.clone(),
915 rt_ctx.epoch,
916 ))?;
917 rt_ctx.protocol.ensure_initialized()?;
919 }
920 let key_manager = self.key_manager.as_ref().map(|mgr| {
921 if is_confidential_allowed {
922 mgr.with_private_context()
923 } else {
924 mgr.with_context()
925 }
926 });
927
928 let history = self.consensus_verifier.clone();
930
931 let root = storage::MKVSStore::new(&mut rt_ctx.runtime_state);
932 let ctx = RuntimeBatchContext::<'_, R>::new(
933 &self.host_info,
934 key_manager,
935 rt_ctx.header,
936 rt_ctx.round_results,
937 &rt_ctx.consensus_state,
938 &history,
939 rt_ctx.epoch,
940 rt_ctx.max_messages,
941 );
942
943 CurrentState::enter_opts(
944 state::Options::new()
945 .with_mode(state::Mode::Check)
946 .with_rng_local_entropy(), root,
948 || Self::dispatch_query(&ctx, method, args),
949 )
950 }
951}
952
953#[cfg(test)]
954mod test {
955 use super::*;
956 use crate::{
957 handler,
958 module::Module,
959 modules::{accounts, core},
960 sdk_derive,
961 state::{CurrentState, Options},
962 storage::Store,
963 testing::{configmap, keys, mock::Mock},
964 types::{token, transaction},
965 Version,
966 };
967 use cbor::Encode as _;
968
969 struct CoreConfig;
970 impl core::Config for CoreConfig {}
971 type Core = core::Module<CoreConfig>;
972 type Accounts = accounts::Module;
973
974 #[derive(Error, Debug, oasis_runtime_sdk_macros::Error)]
975 enum AlphabetError {
976 #[error("{0}")]
977 #[sdk_error(transparent, abort)]
978 Core(#[source] core::Error),
979 }
980
981 struct AlphabetModule;
983
984 #[sdk_derive(Module)]
985 impl AlphabetModule {
986 const NAME: &'static str = "alphabet";
987 const VERSION: u32 = 42;
988 type Error = AlphabetError;
989 type Event = ();
990 type Parameters = ();
991 type Genesis = ();
992
993 #[handler(call = "alphabet.ReadOnly")]
994 fn read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
995 CurrentState::with_store(|store| {
996 let _ = store.get(b"key"); });
998 Ok(42)
999 }
1000
1001 #[handler(call = "alphabet.NotReadOnly")]
1002 fn not_read_only<C: Context>(_ctx: &C, _args: ()) -> Result<u64, AlphabetError> {
1003 CurrentState::with_store(|store| {
1004 store.insert(b"key", b"value");
1005 });
1006 Ok(10)
1007 }
1008
1009 #[handler(call = "alphabet.Aborting")]
1010 fn aborting<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1011 Err(AlphabetError::Core(core::Error::Abort(Error::Aborted)))
1013 }
1014
1015 #[handler(query = "alphabet.Alpha")]
1016 fn alpha<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1017 Ok(())
1018 }
1019
1020 #[handler(query = "alphabet.Omega", expensive)]
1021 fn expensive<C: Context>(_ctx: &C, _args: ()) -> Result<(), AlphabetError> {
1022 Ok(())
1024 }
1025 }
1026
1027 impl module::BlockHandler for AlphabetModule {}
1028 impl module::TransactionHandler for AlphabetModule {}
1029 impl module::InvariantHandler for AlphabetModule {}
1030
1031 struct AlphabetRuntime;
1032
1033 impl Runtime for AlphabetRuntime {
1034 const VERSION: Version = Version::new(0, 0, 0);
1035 type Core = Core;
1036 type Accounts = Accounts;
1037 type Modules = (Core, AlphabetModule);
1038
1039 fn genesis_state() -> <Self::Modules as module::MigrationHandler>::Genesis {
1040 (
1041 core::Genesis {
1042 parameters: core::Parameters {
1043 max_batch_gas: u64::MAX,
1044 max_tx_size: 32 * 1024,
1045 max_tx_signers: 1,
1046 max_multisig_signers: 8,
1047 gas_costs: Default::default(),
1048 min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
1049 dynamic_min_gas_price: Default::default(),
1050 },
1051 },
1052 (),
1053 )
1054 }
1055 }
1056
/// With an empty local config, regular queries succeed and expensive queries are refused.
#[test]
fn test_allowed_queries_defaults() {
    let mut mock = Mock::with_local_config(BTreeMap::new());
    let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);

    let alpha_result = Dispatcher::<AlphabetRuntime>::dispatch_query(
        &mut ctx,
        "alphabet.Alpha",
        cbor::to_vec(().into_cbor_value()),
    );
    alpha_result.expect("alphabet.Alpha is an inexpensive query, allowed by default");

    let omega_result = Dispatcher::<AlphabetRuntime>::dispatch_query(
        &mut ctx,
        "alphabet.Omega",
        cbor::to_vec(().into_cbor_value()),
    );
    omega_result.expect_err("alphabet.Omega is an expensive query, disallowed by default");
}
1076
/// A custom `allowed_queries` local config can refuse a regular query and allow expensive
/// ones.
#[test]
fn test_allowed_queries_custom() {
    let local_config = configmap! {
        "estimate_gas_by_simulating_contracts" => true,
        "allowed_queries" => vec![
            configmap! {"alphabet.Alpha" => false},
            configmap! {"all_expensive" => true},
            configmap! {"all" => true}
        ],
    };
    let mut mock = Mock::with_local_config(local_config);
    let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);

    // Run both queries in check mode and discard any state changes afterwards.
    let check_opts = Options::new().with_mode(state::Mode::Check);
    CurrentState::with_transaction_opts(check_opts, || {
        let alpha_result = Dispatcher::<AlphabetRuntime>::dispatch_query(
            &mut ctx,
            "alphabet.Alpha",
            cbor::to_vec(().into_cbor_value()),
        );
        alpha_result.expect_err("alphabet.Alpha is a disallowed query");

        let omega_result = Dispatcher::<AlphabetRuntime>::dispatch_query(
            &mut ctx,
            "alphabet.Omega",
            cbor::to_vec(().into_cbor_value()),
        );
        omega_result
            .expect("alphabet.Omega is an expensive query and expensive queries are allowed");

        TransactionResult::Rollback(())
    });
}
1109
/// A read-only transaction succeeds for a method that only reads, and fails with the core
/// module's read-only error for a method that writes to storage.
#[test]
fn test_dispatch_read_only_call() {
    let mut mock = Mock::default();
    let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);

    AlphabetRuntime::migrate(&mut ctx);

    let fee = transaction::Fee {
        amount: token::BaseUnits::default(),
        gas: 1000,
        ..Default::default()
    };
    let signer = transaction::SignerInfo::new_sigspec(keys::alice::sigspec(), 0);
    let mut tx = transaction::Transaction {
        version: 1,
        call: transaction::Call {
            format: transaction::CallFormat::Plain,
            method: "alphabet.ReadOnly".to_owned(),
            read_only: true,
            ..Default::default()
        },
        auth_info: transaction::AuthInfo {
            signer_info: vec![signer],
            fee,
            ..Default::default()
        },
    };

    // The method that only reads must succeed and return its value.
    let read_dispatch =
        Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx.clone(), 0)
            .expect("read only method dispatch should work");
    let returned: u64 = cbor::from_value(read_dispatch.result.unwrap()).unwrap();
    assert_eq!(returned, 42);

    // Same read-only transaction, but targeting the method that modifies storage.
    tx.call.method = "alphabet.NotReadOnly".to_owned();

    let write_dispatch = Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx, 0)
        .expect("read only method dispatch should work");
    match write_dispatch.result {
        module::CallResult::Failed {
            module,
            code,
            message,
        } => {
            assert_eq!(&module, "core");
            assert_eq!(code, 25);
            assert_eq!(&message, "read-only transaction attempted modifications")
        }
        _ => panic!("not read only method execution did not fail"),
    }
}
1165
/// An abort raised inside a call handler surfaces as a dispatcher error from `dispatch_tx`.
#[test]
fn test_dispatch_abort_forwarding() {
    let mut mock = Mock::default();
    let mut ctx = mock.create_ctx_for_runtime::<AlphabetRuntime>(false);

    AlphabetRuntime::migrate(&mut ctx);

    let fee = transaction::Fee {
        amount: token::BaseUnits::default(),
        gas: 1000,
        ..Default::default()
    };
    let signer = transaction::SignerInfo::new_sigspec(keys::alice::sigspec(), 0);
    let tx = transaction::Transaction {
        version: 1,
        call: transaction::Call {
            format: transaction::CallFormat::Plain,
            method: "alphabet.Aborting".to_owned(),
            ..Default::default()
        },
        auth_info: transaction::AuthInfo {
            signer_info: vec![signer],
            fee,
            ..Default::default()
        },
    };

    // The transaction is not needed afterwards, so it is handed over directly.
    let outcome = Dispatcher::<AlphabetRuntime>::dispatch_tx(&mut ctx, 1024, tx, 0);
    assert!(matches!(outcome, Err(Error::Aborted)));
}
1199}