oasis_core_runtime/dispatcher.rs

//! Runtime call dispatcher.
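//!
//! The host creates a [`Dispatcher`] with an [`Initializer`], starts it once the protocol and
//! consensus verifier are available, and then queues requests into it. A minimal sketch of that
//! flow, assuming `tokio_handle`, `initializer`, `identity`, `protocol` and `verifier` have
//! already been constructed elsewhere:
//!
//! ```ignore
//! let dispatcher = Dispatcher::new(tokio_handle, initializer, identity);
//! dispatcher.start(protocol, verifier);
//! dispatcher.queue_request(1, Body::Empty {})?;
//! ```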
use std::{
    convert::TryInto,
    sync::{Arc, Condvar, Mutex},
    thread,
};

use anyhow::Result as AnyResult;
use rustc_hex::ToHex;
use slog::{debug, error, info, warn, Logger};
use tokio::sync::mpsc;

use crate::{
    app, attestation, cache,
    common::{
        crypto::{hash::Hash, signature::Signer},
        logger::get_logger,
        panic::AbortOnPanic,
        sgx::QuotePolicy,
    },
    consensus::{
        beacon::EpochTime,
        roothash::{self, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT},
        state::keymanager::Status as KeyManagerStatus,
        verifier::Verifier,
        LightBlock,
    },
    enclave_rpc::{
        demux::Demux as RpcDemux,
        dispatcher::Dispatcher as RpcDispatcher,
        session::{self, SessionInfo},
        types::{
            Kind as RpcKind, Message as RpcMessage, Request as RpcRequest, Response as RpcResponse,
        },
        Context as RpcContext,
    },
    future::block_on,
    identity::Identity,
    policy::PolicyVerifier,
    protocol::Protocol,
    storage::mkvs::{sync::NoopReadSyncer, OverlayTree, Root, RootType},
    transaction::{
        dispatcher::{Dispatcher as TxnDispatcher, NoopDispatcher as TxnNoopDispatcher},
        tree::Tree as TxnTree,
        types::TxnBatch,
        Context as TxnContext,
    },
    types::{Body, ComputedBatch, Error, ExecutionMode},
};

/// Maximum number of requests that can be in the dispatcher queue.
const BACKLOG_SIZE: usize = 1000;

/// Maximum total number of EnclaveRPC sessions.
const RPC_MAX_SESSIONS: usize = 1024;
/// Maximum number of concurrent EnclaveRPC sessions per peer. If more sessions are opened, the
/// oldest sessions are closed to make room for new ones.
const RPC_MAX_SESSIONS_PER_PEER: usize = 8;
/// EnclaveRPC sessions without any processed frame for more than RPC_STALE_SESSION_TIMEOUT_SECS
/// seconds can be closed to make room for new sessions.
const RPC_STALE_SESSION_TIMEOUT_SECS: i64 = 10;

/// Interface for dispatcher initializers.
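///
/// # Example
///
/// Any `FnOnce` closure with the right signature can serve as an initializer via the blanket
/// implementation below. A minimal sketch (the defaults used here are illustrative only):
///
/// ```ignore
/// let initializer: Box<dyn Initializer> = Box::new(|_state: PreInitState<'_>| {
///     // Use the default transaction dispatcher and no ROFL application.
///     PostInitState::default()
/// });
/// ```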
pub trait Initializer: Send + Sync {
    /// Initializes the dispatcher(s).
    fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState;
}

impl<F> Initializer for F
where
    F: FnOnce(PreInitState<'_>) -> PostInitState + Send + Sync,
{
    fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState {
        self(state)
    }
}

/// State available before initialization.
pub struct PreInitState<'a> {
    /// Protocol instance.
    pub protocol: &'a Arc<Protocol>,
    /// Runtime Attestation Key instance.
    pub identity: &'a Arc<Identity>,
    /// RPC demultiplexer instance.
    pub rpc_demux: &'a mut RpcDemux,
    /// RPC dispatcher instance.
    pub rpc_dispatcher: &'a mut RpcDispatcher,
    /// Consensus verifier instance.
    pub consensus_verifier: &'a Arc<dyn Verifier>,
}

/// State returned by the initializer.
#[derive(Default)]
pub struct PostInitState {
    /// Optional transaction dispatcher that should be used.
    pub txn_dispatcher: Option<Box<dyn TxnDispatcher>>,
    /// Optional ROFL application.
    pub app: Option<Box<dyn app::App>>,
}

impl From<tokio::task::JoinError> for Error {
    fn from(e: tokio::task::JoinError) -> Self {
        Error::new(
            "dispatcher",
            1,
            &format!("error while processing request: {e}"),
        )
    }
}

/// State related to dispatching a runtime transaction.
struct TxDispatchState {
    mode: ExecutionMode,
    consensus_block: LightBlock,
    consensus_verifier: Arc<dyn Verifier>,
    header: Header,
    epoch: EpochTime,
    round_results: roothash::RoundResults,
    max_messages: u32,
    check_only: bool,
}

/// State provided by the protocol upon successful initialization.
struct ProtocolState {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
}

/// State held by the dispatcher, shared between all async tasks.
#[derive(Clone)]
struct State {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
    dispatcher: Arc<Dispatcher>,
    app: Arc<dyn app::App>,
    rpc_demux: Arc<RpcDemux>,
    rpc_dispatcher: Arc<RpcDispatcher>,
    txn_dispatcher: Arc<dyn TxnDispatcher>,
    attestation_handler: attestation::Handler,
    policy_verifier: Arc<PolicyVerifier>,
    cache_set: cache::CacheSet,
}

#[derive(Debug)]
enum Command {
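    /// A request from the host, carrying the call identifier used to route the response back and
    /// the request body.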
    Request(u64, Body),
}

/// Runtime call dispatcher.
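///
/// Incoming requests are queued onto a dedicated processing thread, which spawns a Tokio task per
/// request and sends the response back through the protocol instance.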
pub struct Dispatcher {
    logger: Logger,
    queue_tx: mpsc::Sender<Command>,
    identity: Arc<Identity>,

    state: Mutex<Option<ProtocolState>>,
    state_cond: Condvar,

    tokio_runtime: tokio::runtime::Handle,
}

impl Dispatcher {
    /// Create a new runtime call dispatcher.
    pub fn new(
        tokio_runtime: tokio::runtime::Handle,
        initializer: Box<dyn Initializer>,
        identity: Arc<Identity>,
    ) -> Arc<Self> {
        let (tx, rx) = mpsc::channel(BACKLOG_SIZE);

        let dispatcher = Arc::new(Dispatcher {
            logger: get_logger("runtime/dispatcher"),
            queue_tx: tx,
            identity,
            state: Mutex::new(None),
            state_cond: Condvar::new(),
            tokio_runtime,
        });

        // Spawn the dispatcher processing thread.
        let d = dispatcher.clone();
        thread::spawn(move || {
            let _guard = AbortOnPanic;
            d.run(initializer, rx);
        });

        dispatcher
    }

    /// Start the dispatcher.
    pub fn start(&self, protocol: Arc<Protocol>, consensus_verifier: Box<dyn Verifier>) {
        let consensus_verifier = Arc::from(consensus_verifier);
        let mut s = self.state.lock().unwrap();
        *s = Some(ProtocolState {
            protocol,
            consensus_verifier,
        });
        self.state_cond.notify_one();
    }

    /// Queue a new request to be dispatched.
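    ///
    /// This performs a blocking send into the dispatch queue: it waits while the backlog of
    /// `BACKLOG_SIZE` requests is full and returns an error if the dispatcher has shut down.
    /// A minimal sketch (the call ID and request body are illustrative):
    ///
    /// ```ignore
    /// dispatcher.queue_request(1, Body::Empty {})?;
    /// ```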
    pub fn queue_request(&self, id: u64, body: Body) -> AnyResult<()> {
        self.queue_tx.blocking_send(Command::Request(id, body))?;
        Ok(())
    }

    fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
        // Wait for the state to be available.
        let ProtocolState {
            protocol,
            consensus_verifier,
        } = {
            let mut guard = self.state.lock().unwrap();
            while guard.is_none() {
                guard = self.state_cond.wait(guard).unwrap();
            }

            guard.take().unwrap()
        };

        // Ensure Tokio runtime is available during dispatcher initialization.
        let _guard = self.tokio_runtime.enter();

        // Create actual dispatchers for RPCs and transactions.
        info!(self.logger, "Starting the runtime dispatcher");
        let mut rpc_demux = RpcDemux::new(
            session::Builder::default().local_identity(self.identity.clone()),
            RPC_MAX_SESSIONS,
            RPC_MAX_SESSIONS_PER_PEER,
            RPC_STALE_SESSION_TIMEOUT_SECS,
        );
        let mut rpc_dispatcher = RpcDispatcher::default();
        let pre_init_state = PreInitState {
            protocol: &protocol,
            identity: &self.identity,
            rpc_demux: &mut rpc_demux,
            rpc_dispatcher: &mut rpc_dispatcher,
            consensus_verifier: &consensus_verifier,
        };
        let post_init_state = initializer.init(pre_init_state);
        let txn_dispatcher = post_init_state
            .txn_dispatcher
            .unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());
        let mut app = post_init_state
            .app
            .unwrap_or_else(|| Box::new(app::NoopApp));

        // Initialize the application.
        if let Err(err) = app.on_init(protocol.clone()) {
            error!(self.logger, "ROFL application initialization failed"; "err" => ?err);
        }

        let app: Arc<dyn app::App> = Arc::from(app);
        let state = State {
            protocol: protocol.clone(),
            consensus_verifier: consensus_verifier.clone(),
            dispatcher: self.clone(),
            app: app.clone(),
            rpc_demux: Arc::new(rpc_demux),
            rpc_dispatcher: Arc::new(rpc_dispatcher),
            txn_dispatcher: Arc::from(txn_dispatcher),
            attestation_handler: attestation::Handler::new(
                self.identity.clone(),
                protocol.clone(),
                consensus_verifier.clone(),
                protocol.get_runtime_id(),
                protocol.get_config().version,
                app,
            ),
            policy_verifier: Arc::new(PolicyVerifier::new(consensus_verifier)),
            cache_set: cache::CacheSet::new(protocol.clone()),
        };

        // Start the async message processing task.
        self.tokio_runtime.block_on(async move {
            while let Some(cmd) = rx.recv().await {
                // Process received command.
                match cmd {
                    Command::Request(id, request) => {
                        // Process request in its own task.
                        let state = state.clone();

                        tokio::spawn(async move {
                            let protocol = state.protocol.clone();
                            let dispatcher = state.dispatcher.clone();
                            let result = dispatcher.handle_request(state, request).await;

                            // Send response.
                            let response = match result {
                                Ok(body) => body,
                                Err(error) => Body::Error(error),
                            };
                            protocol.send_response(id, response).unwrap();
                        });
                    }
                }
            }
        });

        info!(self.logger, "Runtime call dispatcher is terminating");
    }

    async fn handle_request(self: &Arc<Self>, state: State, request: Body) -> Result<Body, Error> {
        match request {
            // Attestation-related requests.
            Body::RuntimeCapabilityTEERakInitRequest { .. }
            | Body::RuntimeCapabilityTEERakReportRequest {}
            | Body::RuntimeCapabilityTEERakAvrRequest { .. }
            | Body::RuntimeCapabilityTEERakQuoteRequest { .. }
            | Body::RuntimeCapabilityTEEUpdateEndorsementRequest { .. } => {
                Ok(state.attestation_handler.handle(request).await?)
            }

            // RPC requests.
            Body::RuntimeRPCCallRequest {
                request,
                kind,
                peer_id,
            } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?kind,
                    "peer_id" => peer_id.to_hex::<String>(),
                );

                match kind {
                    RpcKind::NoiseSession => {
                        self.dispatch_secure_rpc(state, request, peer_id).await
                    }
                    RpcKind::InsecureQuery => self.dispatch_insecure_rpc(state, request).await,
                    RpcKind::LocalQuery => self.dispatch_local_rpc(state, request).await,
                }
            }
            Body::RuntimeLocalRPCCallRequest { request } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?RpcKind::LocalQuery,
                );

                self.dispatch_local_rpc(state, request).await
            }

            // RONL.
            // TODO: Refactor this so it can be part of an "app".
            Body::RuntimeExecuteTxBatchRequest {
                mode,
                consensus_block,
                round_results,
                io_root,
                inputs,
                in_msgs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction execution.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    io_root,
                    inputs.unwrap_or_default(),
                    in_msgs,
                    TxDispatchState {
                        mode,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results,
                        max_messages,
                        check_only: false,
                    },
                )
                .await
            }
            Body::RuntimeCheckTxBatchRequest {
                consensus_block,
                inputs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction check.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    Hash::default(),
                    inputs,
                    vec![],
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }
            Body::RuntimeQueryRequest {
                consensus_block,
                header,
                epoch,
                max_messages,
                method,
                args,
            } if state.txn_dispatcher.is_supported() => {
                // Query.
                self.dispatch_query(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    method,
                    args,
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }

            // ROFL.
            Body::RuntimeQueryRequest { method, args, .. } if state.app.is_rofl() => state
                .app
                .query(&method, args)
                .await
                .map(|data| Body::RuntimeQueryResponse { data })
                .map_err(Into::into),
            Body::RuntimeNotifyRequest {
                runtime_block,
                runtime_event,
            } => {
                if let Some(runtime_block) = runtime_block {
                    if let Err(err) = state.app.on_runtime_block(&runtime_block).await {
                        error!(self.logger, "Application block notification failed"; "err" => ?err);
                    }
                }
                if let Some(runtime_event) = runtime_event {
                    if let Err(err) = state
                        .app
                        .on_runtime_event(&runtime_event.block, &runtime_event.tags)
                        .await
                    {
                        error!(self.logger, "Application event notification failed"; "err" => ?err);
                    }
                }

                Ok(Body::Empty {})
            }

            // Other requests.
            Body::RuntimeKeyManagerStatusUpdateRequest { status } => {
                // Key manager status update local RPC call.
                self.handle_km_status_update(state, status).await
            }
            Body::RuntimeKeyManagerQuotePolicyUpdateRequest {
                policy: quote_policy,
            } => {
                // Key manager quote policy update local RPC call.
                self.handle_km_quote_policy_update(state, quote_policy)
                    .await
            }
            Body::RuntimeConsensusSyncRequest { height } => state
                .consensus_verifier
                .sync(height)
                .await
                .map_err(Into::into)
                .map(|_| Body::RuntimeConsensusSyncResponse {}),

            _ => {
                error!(self.logger, "Unsupported request type");
                Err(Error::new("dispatcher", 1, "Unsupported request type"))
            }
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_query(
        &self,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        method: String,
        args: Vec<u8>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        debug!(self.logger, "Received query request";
            "method" => &method,
            "state_root" => ?state.header.state_root,
            "round" => ?state.header.round,
        );

        // Verify that the block's namespace matches the runtime ID. A mismatch is a protocol
        // violation, as the compute node should never change the runtime ID.
        if state.header.namespace != protocol.get_runtime_id() {
            return Err(Error::new(
                "dispatcher",
                1,
                &format!(
                    "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
                    state.header.namespace,
                    protocol.get_runtime_id(),
                ),
            ));
        }

        let protocol = protocol.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        // For queries we don't do any consensus layer integrity verification by default and it
        // is up to the runtime to decide whether this is critical on a query-by-query basis.
        let consensus_state = state
            .consensus_verifier
            .unverified_state(state.consensus_block.clone())
            .await?;

        tokio::task::spawn_blocking(move || {
            let cache = cache_set.query(Root {
                namespace: state.header.namespace,
                version: state.header.round,
                root_type: RootType::State,
                hash: state.header.state_root,
            });
            let mut cache = cache.borrow_mut();
            let mut overlay = OverlayTree::new(cache.tree_mut());

            let txn_ctx = TxnContext::new(
                protocol,
                &state.consensus_block,
                consensus_state,
                &mut overlay,
                &state.header,
                state.epoch,
                &state.round_results,
                state.max_messages,
                state.check_only,
            );

            txn_dispatcher
                .query(txn_ctx, &method, args)
                .map(|data| Body::RuntimeQueryResponse { data })
        })
        .await?
    }

    fn txn_check_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        inputs: TxnBatch,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // For check-only we don't do any consensus layer integrity verification.
        // TODO: Make this async.
        let consensus_state = block_on(
            state
                .consensus_verifier
                .unverified_state(state.consensus_block.clone()),
        )?;

        let mut cache = cache_set.check(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol.clone(),
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            &state.header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );
        let results = txn_dispatcher.check_batch(txn_ctx, &inputs);

        if protocol.get_config().persist_check_tx_state {
            // Commit results to in-memory tree so they persist for subsequent batches that are
            // based on the same block.
            let _ = overlay.commit().unwrap();
        }

        debug!(self.logger, "Transaction batch check complete");

        results.map(|results| Body::RuntimeCheckTxBatchResponse { results })
    }

    #[allow(clippy::too_many_arguments)]
    fn txn_execute_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        mut inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        io_root: Hash,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Verify consensus state and runtime state root integrity before execution.
        // TODO: Make this async.
        let consensus_state = block_on(state.consensus_verifier.verify(
            state.consensus_block.clone(),
            state.header.clone(),
            state.epoch,
        ))?;
        // Ensure the runtime is still ready to process requests.
        protocol.ensure_initialized()?;

        let header = &state.header;

        let mut cache = cache_set.execute(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol,
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );

        // Perform execution based on the passed mode.
        let mut results = match state.mode {
            ExecutionMode::Execute => {
                // Just execute the batch.
                txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?
            }
            ExecutionMode::Schedule => {
                // Allow the runtime to arbitrarily update the batch.
                txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)?
            }
        };

        // Finalize state.
        let (state_write_log, new_state_root) = overlay
            .commit_both(header.namespace, header.round + 1)
            .expect("state commit must succeed");

        txn_dispatcher.finalize(new_state_root);
        cache.commit(header.round + 1, new_state_root);

        // Generate the I/O root. Since we already have the inputs, we avoid fetching them again
        // by regenerating the previous I/O tree (the one produced by the transaction scheduler)
        // from those inputs.
        let mut txn_tree = TxnTree::new(
            Box::new(NoopReadSyncer),
            Root {
                namespace: header.namespace,
                version: header.round + 1,
                root_type: RootType::IO,
                hash: Hash::empty_hash(),
            },
        );
        let mut hashes = Vec::new();
        for (batch_order, input) in inputs.drain(..).enumerate() {
            hashes.push(Hash::digest_bytes(&input));
            txn_tree
                .add_input(input, batch_order.try_into().unwrap())
                .expect("add transaction must succeed");
        }

        let (input_write_log, input_io_root) = txn_tree.commit().expect("io commit must succeed");

        assert!(
            state.mode != ExecutionMode::Execute || input_io_root == io_root,
            "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})",
            io_root,
            input_io_root
        );

        for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) {
            txn_tree
                .add_output(*tx_hash, result.output, result.tags)
                .expect("add transaction must succeed");
        }

        txn_tree
            .add_block_tags(results.block_tags)
            .expect("adding block tags must succeed");

        let (io_write_log, io_root) = txn_tree.commit().expect("io commit must succeed");

        let header = ComputeResultsHeader {
            round: header.round + 1,
            previous_hash: header.encoded_hash(),
            io_root: Some(io_root),
            state_root: Some(new_state_root),
            messages_hash: Some(roothash::Message::messages_hash(&results.messages)),
            in_msgs_hash: Some(roothash::IncomingMessage::in_messages_hash(
                &in_msgs[..results.in_msgs_count],
            )),
            in_msgs_count: results.in_msgs_count.try_into().unwrap(),
        };

        debug!(self.logger, "Transaction batch execution complete";
            "previous_hash" => ?header.previous_hash,
            "io_root" => ?header.io_root,
            "state_root" => ?header.state_root,
            "messages_hash" => ?header.messages_hash,
            "in_msgs_hash" => ?header.in_msgs_hash,
        );

        let rak_sig = self
            .identity
            .sign(
                COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT,
                &cbor::to_vec(header.clone()),
            )
            .unwrap();

        Ok(Body::RuntimeExecuteTxBatchResponse {
            batch: ComputedBatch {
                header,
                io_write_log,
                state_write_log,
                rak_sig,
                messages: results.messages,
            },
            tx_hashes: hashes,
            tx_reject_hashes: results.tx_reject_hashes,
            tx_input_root: input_io_root,
            tx_input_write_log: input_write_log,
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_txn(
        self: &Arc<Self>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        io_root: Hash,
        inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Abort the process on panic during transaction processing, as a panic there indicates a
        // serious problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received transaction batch request";
            "state_root" => ?state.header.state_root,
            "round" => state.header.round + 1,
            "round_results" => ?state.round_results,
            "tx_count" => inputs.len(),
            "in_msg_count" => in_msgs.len(),
            "check_only" => state.check_only,
        );

        // Verify that the block's namespace matches the runtime ID. A mismatch is a protocol
        // violation, as the compute node should never change the runtime ID.
        assert!(
            state.header.namespace == protocol.get_runtime_id(),
            "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
            state.header.namespace,
            protocol.get_runtime_id(),
        );

        let protocol = protocol.clone();
        let dispatcher = self.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        tokio::task::spawn_blocking(move || {
            if state.check_only {
                dispatcher.txn_check_batch(protocol, cache_set, &txn_dispatcher, inputs, state)
            } else {
                dispatcher.txn_execute_batch(
                    protocol,
                    cache_set,
                    &txn_dispatcher,
                    inputs,
                    in_msgs,
                    io_root,
                    state,
                )
            }
        })
        .await
        .unwrap() // Propagate panics during transaction dispatch.
    }

    async fn dispatch_secure_rpc(
        &self,
        state: State,
        request: Vec<u8>,
        peer_id: Vec<u8>,
    ) -> Result<Body, Error> {
        // Abort the process on panic during RPC processing, as a panic there indicates a serious
        // problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        // Process frame.
        let mut buffer = vec![];
        let (mut session, message) = state
            .rpc_demux
            .process_frame(peer_id, request, &mut buffer)
            .await?;

        if let Some(message) = message {
            // Dispatch request.
            assert!(
                buffer.is_empty(),
                "must have no handshake data in transport mode"
            );

            match message {
                RpcMessage::Request(req) => {
                    // Request, dispatch.
                    let response = self
                        .dispatch_rpc(req, RpcKind::NoiseSession, session.info(), &state)
                        .await?;
                    let response = RpcMessage::Response(response);

                    // Note: MKVS commit is omitted, this MUST be global side-effect free.

                    debug!(self.logger, "RPC call dispatch complete";
                        "kind" => ?RpcKind::NoiseSession,
                    );

                    let mut buffer = vec![];
                    session
                        .write_message(response, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while writing response"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                RpcMessage::Close => {
                    // Session close.
                    let mut buffer = vec![];
                    state
                        .rpc_demux
                        .close(session, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while closing session"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                msg => {
                    warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg);
                    Err(Error::new("rhp/dispatcher", 1, "invalid RPC message type"))
                }
            }
        } else {
            // Send back any handshake frames.
            Ok(Body::RuntimeRPCCallResponse { response: buffer })
        }
    }

    async fn dispatch_insecure_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Abort the process on panic during RPC processing, as a panic there indicates a serious
        // problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        let request: RpcRequest = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::InsecureQuery, None, &state)
            .await?;
        let response = cbor::to_vec(response);

        // Note: MKVS commit is omitted, this MUST be global side-effect free.

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::InsecureQuery,
        );

        Ok(Body::RuntimeRPCCallResponse { response })
    }

    async fn dispatch_local_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Abort the process on panic during local RPC processing, as a panic there indicates a
        // serious problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        let request = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::LocalQuery, None, &state)
            .await?;
        let response = RpcMessage::Response(response);
        let response = cbor::to_vec(response);

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::LocalQuery,
        );

        Ok(Body::RuntimeLocalRPCCallResponse { response })
    }

    async fn dispatch_rpc(
        &self,
        request: RpcRequest,
        kind: RpcKind,
        session_info: Option<Arc<SessionInfo>>,
        state: &State,
    ) -> Result<RpcResponse, Error> {
        let rpc_dispatcher = state.rpc_dispatcher.clone();

        let response = tokio::task::spawn_blocking(move || {
            let rpc_ctx = RpcContext::new(session_info);
            rpc_dispatcher.dispatch(rpc_ctx, request, kind)
        })
        .await?;

        Ok(response)
    }

    async fn handle_km_status_update(
        &self,
        state: State,
        status: KeyManagerStatus,
    ) -> Result<Body, Error> {
        // Abort the process on panic during policy processing, as a panic there indicates a
        // serious problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km status update request");

        // Verify and decode the status.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let published_status = state
                .policy_verifier
                .verify_key_manager_status(status, key_manager)?;

            // Dispatch the local RPC call.
            state
                .rpc_dispatcher
                .handle_km_status_update(published_status);

            Ok(())
        })
        .await
        .unwrap()?; // Propagate panics during key manager status update.

        debug!(self.logger, "KM status update request complete");

        Ok(Body::RuntimeKeyManagerStatusUpdateResponse {})
    }

    async fn handle_km_quote_policy_update(
        &self,
        state: State,
        quote_policy: QuotePolicy,
    ) -> Result<Body, Error> {
        // Abort the process on panic during quote policy processing, as a panic there indicates a
        // serious problem and aborting ensures the process is properly cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km quote policy update request");

        // Verify and decode the policy.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let policy =
                state
                    .policy_verifier
                    .verify_quote_policy(quote_policy, &key_manager, None)?;

            // Dispatch the local RPC call.
            state.rpc_dispatcher.handle_km_quote_policy_update(policy);

            Ok(())
        })
        .await
        .unwrap()?; // Propagate panics during key manager quote policy update.

        debug!(self.logger, "KM quote policy update request complete");

        Ok(Body::RuntimeKeyManagerQuotePolicyUpdateResponse {})
    }
1007}