oasis_core_runtime/
dispatcher.rs

//! Runtime call dispatcher.
use std::{
    convert::TryInto,
    sync::{Arc, Condvar, Mutex},
    thread,
};

use anyhow::Result as AnyResult;
use rustc_hex::ToHex;
use slog::{debug, error, info, warn, Logger};
use tokio::sync::mpsc;

use crate::{
    app::{self, METHOD_ROFL_GET_CONFIG},
    attestation, cache,
    common::{
        crypto::{hash::Hash, signature::Signer},
        logger::get_logger,
        panic::AbortOnPanic,
        sgx::QuotePolicy,
    },
    consensus::{
        beacon::EpochTime,
        roothash::{self, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT},
        state::keymanager::Status as KeyManagerStatus,
        verifier::Verifier,
        LightBlock,
    },
    enclave_rpc::{
        demux::Demux as RpcDemux,
        dispatcher::Dispatcher as RpcDispatcher,
        session::{self, SessionInfo},
        types::{
            Kind as RpcKind, Message as RpcMessage, Request as RpcRequest, Response as RpcResponse,
        },
        Context as RpcContext,
    },
    future::block_on,
    identity::Identity,
    policy::PolicyVerifier,
    protocol::Protocol,
    storage::mkvs::{sync::NoopReadSyncer, OverlayTree, Root, RootType},
    transaction::{
        dispatcher::{Dispatcher as TxnDispatcher, NoopDispatcher as TxnNoopDispatcher},
        tree::Tree as TxnTree,
        types::TxnBatch,
        Context as TxnContext,
    },
    types::{Body, ComputedBatch, Error, ExecutionMode},
};

/// Maximum number of requests that can be in the dispatcher queue.
const BACKLOG_SIZE: usize = 1000;

/// Maximum total number of EnclaveRPC sessions.
const RPC_MAX_SESSIONS: usize = 1024;
/// Maximum number of concurrent EnclaveRPC sessions per peer. If more sessions are opened, old
/// sessions are closed to make room for new ones.
const RPC_MAX_SESSIONS_PER_PEER: usize = 8;
/// EnclaveRPC sessions that have not processed a frame for more than
/// RPC_STALE_SESSION_TIMEOUT_SECS seconds can be closed to make room for new sessions.
const RPC_STALE_SESSION_TIMEOUT_SECS: i64 = 10;

/// Interface for dispatcher initializers.
pub trait Initializer: Send + Sync {
    /// Initializes the dispatcher(s).
    fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState;
}

impl<F> Initializer for F
where
    F: FnOnce(PreInitState<'_>) -> PostInitState + Send + Sync,
{
    fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState {
        self(state)
    }
}
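
// Note: thanks to the blanket impl above, a plain closure can serve as an initializer. An
// illustrative sketch only (not part of this module's API surface):
//
//     let init: Box<dyn Initializer> = Box::new(|_state: PreInitState<'_>| {
//         // Typically EnclaveRPC methods would be registered on `_state.rpc_dispatcher` here.
//         PostInitState::default()
//     });
//
// Returning `PostInitState::default()` keeps the no-op transaction dispatcher and ROFL app.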

/// State available before initialization.
pub struct PreInitState<'a> {
    /// Protocol instance.
    pub protocol: &'a Arc<Protocol>,
    /// Runtime Attestation Key instance.
    pub identity: &'a Arc<Identity>,
    /// RPC demultiplexer instance.
    pub rpc_demux: &'a mut RpcDemux,
    /// RPC dispatcher instance.
    pub rpc_dispatcher: &'a mut RpcDispatcher,
    /// Consensus verifier instance.
    pub consensus_verifier: &'a Arc<dyn Verifier>,
}

/// State returned by the initializer.
#[derive(Default)]
pub struct PostInitState {
    /// Optional transaction dispatcher that should be used.
    pub txn_dispatcher: Option<Box<dyn TxnDispatcher>>,
    /// Optional ROFL application.
    pub app: Option<Box<dyn app::App>>,
}

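// Converting Tokio join errors into dispatcher errors lets the request handlers below use `?`
// directly on `tokio::task::spawn_blocking(..).await` and surface task-join failures as errors.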
impl From<tokio::task::JoinError> for Error {
    fn from(e: tokio::task::JoinError) -> Self {
        Error::new(
            "dispatcher",
            1,
            &format!("error while processing request: {e}"),
        )
    }
}

/// State related to dispatching a runtime transaction.
struct TxDispatchState {
    mode: ExecutionMode,
    consensus_block: LightBlock,
    consensus_verifier: Arc<dyn Verifier>,
    header: Header,
    epoch: EpochTime,
    round_results: roothash::RoundResults,
    max_messages: u32,
    check_only: bool,
}

/// State provided by the protocol upon successful initialization.
struct ProtocolState {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
}

/// State held by the dispatcher, shared between all async tasks.
#[derive(Clone)]
struct State {
    protocol: Arc<Protocol>,
    consensus_verifier: Arc<dyn Verifier>,
    dispatcher: Arc<Dispatcher>,
    app: Arc<dyn app::App>,
    rpc_demux: Arc<RpcDemux>,
    rpc_dispatcher: Arc<RpcDispatcher>,
    txn_dispatcher: Arc<dyn TxnDispatcher>,
    attestation_handler: attestation::Handler,
    policy_verifier: Arc<PolicyVerifier>,
    cache_set: cache::CacheSet,
}

#[derive(Debug)]
enum Command {
    Request(u64, Body),
}

/// Runtime call dispatcher.
pub struct Dispatcher {
    logger: Logger,
    queue_tx: mpsc::Sender<Command>,
    identity: Arc<Identity>,

    state: Mutex<Option<ProtocolState>>,
    state_cond: Condvar,

    tokio_runtime: tokio::runtime::Handle,
}

impl Dispatcher {
    /// Create a new runtime call dispatcher.
    pub fn new(
        tokio_runtime: tokio::runtime::Handle,
        initializer: Box<dyn Initializer>,
        identity: Arc<Identity>,
    ) -> Arc<Self> {
        let (tx, rx) = mpsc::channel(BACKLOG_SIZE);

        let dispatcher = Arc::new(Dispatcher {
            logger: get_logger("runtime/dispatcher"),
            queue_tx: tx,
            identity,
            state: Mutex::new(None),
            state_cond: Condvar::new(),
            tokio_runtime,
        });

        // Spawn the dispatcher processing thread.
        let d = dispatcher.clone();
        thread::spawn(move || {
            let _guard = AbortOnPanic;
            d.run(initializer, rx);
        });

        dispatcher
    }

    /// Start the dispatcher.
    pub fn start(&self, protocol: Arc<Protocol>, consensus_verifier: Box<dyn Verifier>) {
        let consensus_verifier = Arc::from(consensus_verifier);
        let mut s = self.state.lock().unwrap();
        *s = Some(ProtocolState {
            protocol,
            consensus_verifier,
        });
        self.state_cond.notify_one();
    }

    /// Queue a new request to be dispatched.
    pub fn queue_request(&self, id: u64, body: Body) -> AnyResult<()> {
        self.queue_tx.blocking_send(Command::Request(id, body))?;
        Ok(())
    }
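
    // Typical lifecycle, as an illustrative sketch only: the `tokio_handle`, `initializer`,
    // `identity`, `protocol` and `verifier` values are assumed to come from the runtime's
    // startup code. `queue_request` uses `blocking_send`, so it is intended to be called from a
    // non-async thread (such as the host protocol I/O thread):
    //
    //     let dispatcher = Dispatcher::new(tokio_handle, initializer, identity);
    //     dispatcher.start(protocol, verifier);
    //     dispatcher.queue_request(request_id, body)?;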

    fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
        // Wait for the state to be available.
        let ProtocolState {
            protocol,
            consensus_verifier,
        } = {
            let mut guard = self.state.lock().unwrap();
            while guard.is_none() {
                guard = self.state_cond.wait(guard).unwrap();
            }

            guard.take().unwrap()
        };

        // Ensure Tokio runtime is available during dispatcher initialization.
        let _guard = self.tokio_runtime.enter();

        // Create actual dispatchers for RPCs and transactions.
        info!(self.logger, "Starting the runtime dispatcher");
        let mut rpc_demux = RpcDemux::new(
            session::Builder::default().local_identity(self.identity.clone()),
            RPC_MAX_SESSIONS,
            RPC_MAX_SESSIONS_PER_PEER,
            RPC_STALE_SESSION_TIMEOUT_SECS,
        );
        let mut rpc_dispatcher = RpcDispatcher::default();
        let pre_init_state = PreInitState {
            protocol: &protocol,
            identity: &self.identity,
            rpc_demux: &mut rpc_demux,
            rpc_dispatcher: &mut rpc_dispatcher,
            consensus_verifier: &consensus_verifier,
        };
        let post_init_state = initializer.init(pre_init_state);
        let txn_dispatcher = post_init_state
            .txn_dispatcher
            .unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());
        let mut app = post_init_state
            .app
            .unwrap_or_else(|| Box::new(app::NoopApp));

        // Initialize the application.
        if let Err(err) = app.on_init(protocol.clone()) {
            error!(self.logger, "ROFL application initialization failed"; "err" => ?err);
        }

        let app: Arc<dyn app::App> = Arc::from(app);
        let state = State {
            protocol: protocol.clone(),
            consensus_verifier: consensus_verifier.clone(),
            dispatcher: self.clone(),
            app: app.clone(),
            rpc_demux: Arc::new(rpc_demux),
            rpc_dispatcher: Arc::new(rpc_dispatcher),
            txn_dispatcher: Arc::from(txn_dispatcher),
            attestation_handler: attestation::Handler::new(
                self.identity.clone(),
                protocol.clone(),
                consensus_verifier.clone(),
                protocol.get_runtime_id(),
                protocol.get_config().version,
                app,
            ),
            policy_verifier: Arc::new(PolicyVerifier::new(consensus_verifier)),
            cache_set: cache::CacheSet::new(protocol.clone()),
        };

        // Start the async message processing task.
        self.tokio_runtime.block_on(async move {
            while let Some(cmd) = rx.recv().await {
                // Process received command.
                match cmd {
                    Command::Request(id, request) => {
                        // Process request in its own task.
                        let state = state.clone();

                        tokio::spawn(async move {
                            let protocol = state.protocol.clone();
                            let dispatcher = state.dispatcher.clone();
                            let result = dispatcher.handle_request(state, request).await;

                            // Send response.
                            let response = match result {
                                Ok(body) => body,
                                Err(error) => Body::Error(error),
                            };
                            protocol.send_response(id, response).unwrap();
                        });
                    }
                }
            }
        });

        info!(self.logger, "Runtime call dispatcher is terminating");
    }

    async fn handle_request(self: &Arc<Self>, state: State, request: Body) -> Result<Body, Error> {
        match request {
            // Attestation-related requests.
            Body::RuntimeCapabilityTEERakInitRequest { .. }
            | Body::RuntimeCapabilityTEERakReportRequest {}
            | Body::RuntimeCapabilityTEERakAvrRequest { .. }
            | Body::RuntimeCapabilityTEERakQuoteRequest { .. }
            | Body::RuntimeCapabilityTEEUpdateEndorsementRequest { .. } => {
                Ok(state.attestation_handler.handle(request).await?)
            }

            // RPC requests.
            Body::RuntimeRPCCallRequest {
                request,
                kind,
                peer_id,
            } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?kind,
                    "peer_id" => peer_id.to_hex::<String>(),
                );

                match kind {
                    RpcKind::NoiseSession => {
                        self.dispatch_secure_rpc(state, request, peer_id).await
                    }
                    RpcKind::InsecureQuery => self.dispatch_insecure_rpc(state, request).await,
                    RpcKind::LocalQuery => self.dispatch_local_rpc(state, request).await,
                }
            }
            Body::RuntimeLocalRPCCallRequest { request } => {
                debug!(self.logger, "Received RPC call request";
                    "kind" => ?RpcKind::LocalQuery,
                );

                self.dispatch_local_rpc(state, request).await
            }

            // RONL.
            // TODO: Refactor this so it can be part of an "app".
            Body::RuntimeExecuteTxBatchRequest {
                mode,
                consensus_block,
                round_results,
                io_root,
                inputs,
                in_msgs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction execution.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    io_root,
                    inputs.unwrap_or_default(),
                    in_msgs,
                    TxDispatchState {
                        mode,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results,
                        max_messages,
                        check_only: false,
                    },
                )
                .await
            }
            Body::RuntimeCheckTxBatchRequest {
                consensus_block,
                inputs,
                block,
                epoch,
                max_messages,
            } => {
                // Transaction check.
                self.dispatch_txn(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    Hash::default(),
                    inputs,
                    vec![],
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header: block.header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }
            Body::RuntimeQueryRequest {
                consensus_block,
                header,
                epoch,
                max_messages,
                method,
                args,
            } if state.txn_dispatcher.is_supported() => {
                // Query.
                self.dispatch_query(
                    state.cache_set,
                    &state.txn_dispatcher,
                    &state.protocol,
                    method,
                    args,
                    TxDispatchState {
                        mode: ExecutionMode::Execute,
                        consensus_block,
                        consensus_verifier: state.consensus_verifier,
                        header,
                        epoch,
                        round_results: Default::default(),
                        max_messages,
                        check_only: true,
                    },
                )
                .await
            }

            // ROFL.
            Body::RuntimeQueryRequest { method, args, .. } if state.app.is_rofl() => {
                match method.as_str() {
                    METHOD_ROFL_GET_CONFIG => Ok(cbor::to_vec(state.app.get_config())),
                    _ => state.app.query(&method, args).await,
                }
                .map(|data| Body::RuntimeQueryResponse { data })
                .map_err(Into::into)
            }
            Body::RuntimeNotifyRequest {
                runtime_block,
                runtime_event,
            } => {
                if let Some(runtime_block) = runtime_block {
                    if let Err(err) = state.app.on_runtime_block(&runtime_block).await {
                        error!(self.logger, "Application block notification failed"; "err" => ?err);
                    }
                }
                if let Some(runtime_event) = runtime_event {
                    if let Err(err) = state
                        .app
                        .on_runtime_event(&runtime_event.block, &runtime_event.tags)
                        .await
                    {
                        error!(self.logger, "Application event notification failed"; "err" => ?err);
                    }
                }

                Ok(Body::Empty {})
            }

            // Other requests.
            Body::RuntimeKeyManagerStatusUpdateRequest { status } => {
                // Key manager status update local RPC call.
                self.handle_km_status_update(state, status).await
            }
            Body::RuntimeKeyManagerQuotePolicyUpdateRequest {
                policy: quote_policy,
            } => {
                // Key manager quote policy update local RPC call.
                self.handle_km_quote_policy_update(state, quote_policy)
                    .await
            }
            Body::RuntimeConsensusSyncRequest { height } => state
                .consensus_verifier
                .sync(height)
                .await
                .map_err(Into::into)
                .map(|_| Body::RuntimeConsensusSyncResponse {}),

            _ => {
                error!(self.logger, "Unsupported request type");
                Err(Error::new("dispatcher", 1, "Unsupported request type"))
            }
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_query(
        &self,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        method: String,
        args: Vec<u8>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        debug!(self.logger, "Received query request";
            "method" => &method,
            "state_root" => ?state.header.state_root,
            "round" => ?state.header.round,
        );

        // Verify that the runtime ID matches the block's namespace. This is a protocol violation
        // as the compute node should never change the runtime ID.
        if state.header.namespace != protocol.get_runtime_id() {
            return Err(Error::new(
                "dispatcher",
                1,
                &format!(
                    "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
                    state.header.namespace,
                    protocol.get_runtime_id(),
                ),
            ));
        }

        let protocol = protocol.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        // For queries we don't do any consensus layer integrity verification by default and it
        // is up to the runtime to decide whether this is critical on a query-by-query basis.
        let consensus_state = state
            .consensus_verifier
            .unverified_state(state.consensus_block.clone())
            .await?;

        tokio::task::spawn_blocking(move || {
            let cache = cache_set.query(Root {
                namespace: state.header.namespace,
                version: state.header.round,
                root_type: RootType::State,
                hash: state.header.state_root,
            });
            let mut cache = cache.borrow_mut();
            let mut overlay = OverlayTree::new(cache.tree_mut());

            let txn_ctx = TxnContext::new(
                protocol,
                &state.consensus_block,
                consensus_state,
                &mut overlay,
                &state.header,
                state.epoch,
                &state.round_results,
                state.max_messages,
                state.check_only,
            );

            txn_dispatcher
                .query(txn_ctx, &method, args)
                .map(|data| Body::RuntimeQueryResponse { data })
        })
        .await?
    }

    fn txn_check_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        inputs: TxnBatch,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // For check-only we don't do any consensus layer integrity verification.
        // TODO: Make this async.
        let consensus_state = block_on(
            state
                .consensus_verifier
                .unverified_state(state.consensus_block.clone()),
        )?;

        let mut cache = cache_set.check(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol.clone(),
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            &state.header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );
        let results = txn_dispatcher.check_batch(txn_ctx, &inputs);

        if protocol.get_config().persist_check_tx_state {
            // Commit results to in-memory tree so they persist for subsequent batches that are
            // based on the same block.
            let _ = overlay.commit().unwrap();
        }

        debug!(self.logger, "Transaction batch check complete");

        results.map(|results| Body::RuntimeCheckTxBatchResponse { results })
    }

    #[allow(clippy::too_many_arguments)]
    fn txn_execute_batch(
        &self,
        protocol: Arc<Protocol>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &dyn TxnDispatcher,
        mut inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        io_root: Hash,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Verify consensus state and runtime state root integrity before execution.
        // TODO: Make this async.
        let consensus_state = block_on(state.consensus_verifier.verify(
            state.consensus_block.clone(),
            state.header.clone(),
            state.epoch,
        ))?;
        // Ensure the runtime is still ready to process requests.
        protocol.ensure_initialized()?;

        let header = &state.header;

        let mut cache = cache_set.execute(Root {
            namespace: state.header.namespace,
            version: state.header.round,
            root_type: RootType::State,
            hash: state.header.state_root,
        });
        let mut overlay = OverlayTree::new(cache.tree_mut());

        let txn_ctx = TxnContext::new(
            protocol,
            &state.consensus_block,
            consensus_state,
            &mut overlay,
            header,
            state.epoch,
            &state.round_results,
            state.max_messages,
            state.check_only,
        );

        // Perform execution based on the passed mode.
        let mut results = match state.mode {
            ExecutionMode::Execute => {
                // Just execute the batch.
                txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?
            }
            ExecutionMode::Schedule => {
                // Allow the runtime to arbitrarily update the batch.
                txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)?
            }
        };

        // Finalize state.
        let (state_write_log, new_state_root) = overlay
            .commit_both(header.namespace, header.round + 1)
            .expect("state commit must succeed");

        txn_dispatcher.finalize(new_state_root);
        cache.commit(header.round + 1, new_state_root);

        // Generate the I/O root. Since we already have the inputs, we avoid fetching them again
        // by regenerating the previous I/O tree (originally built by the transaction scheduler)
        // from those inputs.
        let mut txn_tree = TxnTree::new(
            Box::new(NoopReadSyncer),
            Root {
                namespace: header.namespace,
                version: header.round + 1,
                root_type: RootType::IO,
                hash: Hash::empty_hash(),
            },
        );
        let mut hashes = Vec::new();
        for (batch_order, input) in inputs.drain(..).enumerate() {
            hashes.push(Hash::digest_bytes(&input));
            txn_tree
                .add_input(input, batch_order.try_into().unwrap())
                .expect("add transaction must succeed");
        }

        let (input_write_log, input_io_root) = txn_tree.commit().expect("io commit must succeed");

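        // Sanity check: in Execute mode the inputs correspond to the scheduler-provided I/O root,
        // so rebuilding the I/O tree from them must reproduce that exact root. In Schedule mode
        // the runtime may have modified the batch, so no such check is performed.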
        assert!(
            state.mode != ExecutionMode::Execute || input_io_root == io_root,
            "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})",
            io_root,
            input_io_root
        );

        for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) {
            txn_tree
                .add_output(*tx_hash, result.output, result.tags)
                .expect("add transaction must succeed");
        }

        txn_tree
            .add_block_tags(results.block_tags)
            .expect("adding block tags must succeed");

        let (io_write_log, io_root) = txn_tree.commit().expect("io commit must succeed");

        let header = ComputeResultsHeader {
            round: header.round + 1,
            previous_hash: header.encoded_hash(),
            io_root: Some(io_root),
            state_root: Some(new_state_root),
            messages_hash: Some(roothash::Message::messages_hash(&results.messages)),
            in_msgs_hash: Some(roothash::IncomingMessage::in_messages_hash(
                &in_msgs[..results.in_msgs_count],
            )),
            in_msgs_count: results.in_msgs_count.try_into().unwrap(),
        };

        debug!(self.logger, "Transaction batch execution complete";
            "previous_hash" => ?header.previous_hash,
            "io_root" => ?header.io_root,
            "state_root" => ?header.state_root,
            "messages_hash" => ?header.messages_hash,
            "in_msgs_hash" => ?header.in_msgs_hash,
        );

        let rak_sig = self
            .identity
            .sign(
                COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT,
                &cbor::to_vec(header.clone()),
            )
            .unwrap();

        Ok(Body::RuntimeExecuteTxBatchResponse {
            batch: ComputedBatch {
                header,
                io_write_log,
                state_write_log,
                rak_sig,
                messages: results.messages,
            },
            tx_hashes: hashes,
            tx_reject_hashes: results.tx_reject_hashes,
            tx_input_root: input_io_root,
            tx_input_write_log: input_write_log,
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn dispatch_txn(
        self: &Arc<Self>,
        cache_set: cache::CacheSet,
        txn_dispatcher: &Arc<dyn TxnDispatcher>,
        protocol: &Arc<Protocol>,
        io_root: Hash,
        inputs: TxnBatch,
        in_msgs: Vec<roothash::IncomingMessage>,
        state: TxDispatchState,
    ) -> Result<Body, Error> {
        // Abort the process on panic during transaction processing, as a panic there indicates a
        // serious problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received transaction batch request";
            "state_root" => ?state.header.state_root,
            "round" => state.header.round + 1,
            "round_results" => ?state.round_results,
            "tx_count" => inputs.len(),
            "in_msg_count" => in_msgs.len(),
            "check_only" => state.check_only,
        );

        // Verify that the runtime ID matches the block's namespace. This is a protocol violation
        // as the compute node should never change the runtime ID.
        assert!(
            state.header.namespace == protocol.get_runtime_id(),
            "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
            state.header.namespace,
            protocol.get_runtime_id(),
        );

        let protocol = protocol.clone();
        let dispatcher = self.clone();
        let txn_dispatcher = txn_dispatcher.clone();

        tokio::task::spawn_blocking(move || {
            if state.check_only {
                dispatcher.txn_check_batch(protocol, cache_set, &txn_dispatcher, inputs, state)
            } else {
                dispatcher.txn_execute_batch(
                    protocol,
                    cache_set,
                    &txn_dispatcher,
                    inputs,
                    in_msgs,
                    io_root,
                    state,
                )
            }
        })
        .await
        .unwrap() // Propagate panics during transaction dispatch.
    }

    async fn dispatch_secure_rpc(
        &self,
        state: State,
        request: Vec<u8>,
        peer_id: Vec<u8>,
    ) -> Result<Body, Error> {
        // Abort the process on panic during RPC processing, as a panic there indicates a serious
        // problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        // Process the frame.
        let mut buffer = vec![];
        let (mut session, message) = state
            .rpc_demux
            .process_frame(peer_id, request, &mut buffer)
            .await?;

        if let Some(message) = message {
            // Dispatch request.
            assert!(
                buffer.is_empty(),
                "must have no handshake data in transport mode"
            );

            match message {
                RpcMessage::Request(req) => {
                    // Request, dispatch.
                    let response = self
                        .dispatch_rpc(req, RpcKind::NoiseSession, session.info(), &state)
                        .await?;
                    let response = RpcMessage::Response(response);

                    // Note: MKVS commit is omitted, this MUST be global side-effect free.

                    debug!(self.logger, "RPC call dispatch complete";
                        "kind" => ?RpcKind::NoiseSession,
                    );

                    let mut buffer = vec![];
                    session
                        .write_message(response, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while writing response"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                RpcMessage::Close => {
                    // Session close.
                    let mut buffer = vec![];
                    state
                        .rpc_demux
                        .close(session, &mut buffer)
                        .map_err(|err| {
                            error!(self.logger, "Error while closing session"; "err" => %err);
                            Error::new("rhp/dispatcher", 1, &format!("{err}"))
                        })
                        .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
                }
                msg => {
                    warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg);
                    Err(Error::new("rhp/dispatcher", 1, "invalid RPC message type"))
                }
            }
        } else {
            // Send back any handshake frames.
            Ok(Body::RuntimeRPCCallResponse { response: buffer })
        }
    }

    async fn dispatch_insecure_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Abort the process on panic during RPC processing, as a panic there indicates a serious
        // problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        let request: RpcRequest = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::InsecureQuery, None, &state)
            .await?;
        let response = cbor::to_vec(response);

        // Note: MKVS commit is omitted, this MUST be global side-effect free.

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::InsecureQuery,
        );

        Ok(Body::RuntimeRPCCallResponse { response })
    }

    async fn dispatch_local_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
        // Abort the process on panic during local RPC processing, as a panic there indicates a
        // serious problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        let request = cbor::from_slice(&request)
            .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;

        // Request, dispatch.
        let response = self
            .dispatch_rpc(request, RpcKind::LocalQuery, None, &state)
            .await?;
        let response = RpcMessage::Response(response);
        let response = cbor::to_vec(response);

        debug!(self.logger, "RPC call dispatch complete";
            "kind" => ?RpcKind::LocalQuery,
        );

        Ok(Body::RuntimeLocalRPCCallResponse { response })
    }

    async fn dispatch_rpc(
        &self,
        request: RpcRequest,
        kind: RpcKind,
        session_info: Option<Arc<SessionInfo>>,
        state: &State,
    ) -> Result<RpcResponse, Error> {
        let rpc_dispatcher = state.rpc_dispatcher.clone();

        let response = tokio::task::spawn_blocking(move || {
            let rpc_ctx = RpcContext::new(session_info);
            rpc_dispatcher.dispatch(rpc_ctx, request, kind)
        })
        .await?;

        Ok(response)
    }

    async fn handle_km_status_update(
        &self,
        state: State,
        status: KeyManagerStatus,
    ) -> Result<Body, Error> {
        // Abort the process on panic during status processing, as a panic there indicates a
        // serious problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km status update request");

        // Verify and decode the status.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let published_status = state
                .policy_verifier
                .verify_key_manager_status(status, key_manager)?;

            // Dispatch the local RPC call.
            state
                .rpc_dispatcher
                .handle_km_status_update(published_status);

            Ok(())
        })
        .await
        .unwrap()?; // Propagate panics during key manager status update.

        debug!(self.logger, "KM status update request complete");

        Ok(Body::RuntimeKeyManagerStatusUpdateResponse {})
    }

    async fn handle_km_quote_policy_update(
        &self,
        state: State,
        quote_policy: QuotePolicy,
    ) -> Result<Body, Error> {
        // Abort the process on panic during quote policy processing, as a panic there indicates
        // a serious problem and aborting ensures the process is cleaned up.
        let _guard = AbortOnPanic;

        debug!(self.logger, "Received km quote policy update request");

        // Verify and decode the policy.
        let runtime_id = state.protocol.get_host_info().runtime_id;

        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
            let policy =
                state
                    .policy_verifier
                    .verify_quote_policy(quote_policy, &key_manager, None)?;

            // Dispatch the local RPC call.
            state.rpc_dispatcher.handle_km_quote_policy_update(policy);

            Ok(())
        })
        .await
        .unwrap()?; // Propagate panics during key manager quote policy update.

        debug!(self.logger, "KM quote policy update request complete");

        Ok(Body::RuntimeKeyManagerQuotePolicyUpdateResponse {})
    }
}