1use std::{
3 convert::TryInto,
4 sync::{Arc, Condvar, Mutex},
5 thread,
6};
7
8use anyhow::Result as AnyResult;
9use rustc_hex::ToHex;
10use slog::{debug, error, info, warn, Logger};
11use tokio::sync::mpsc;
12
13use crate::{
14 app, attestation, cache,
15 common::{
16 crypto::{hash::Hash, signature::Signer},
17 logger::get_logger,
18 panic::AbortOnPanic,
19 sgx::QuotePolicy,
20 },
21 consensus::{
22 beacon::EpochTime,
23 roothash::{self, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT},
24 state::keymanager::Status as KeyManagerStatus,
25 verifier::Verifier,
26 LightBlock,
27 },
28 enclave_rpc::{
29 demux::Demux as RpcDemux,
30 dispatcher::Dispatcher as RpcDispatcher,
31 session::{self, SessionInfo},
32 types::{
33 Kind as RpcKind, Message as RpcMessage, Request as RpcRequest, Response as RpcResponse,
34 },
35 Context as RpcContext,
36 },
37 future::block_on,
38 identity::Identity,
39 policy::PolicyVerifier,
40 protocol::Protocol,
41 storage::mkvs::{sync::NoopReadSyncer, OverlayTree, Root, RootType},
42 transaction::{
43 dispatcher::{Dispatcher as TxnDispatcher, NoopDispatcher as TxnNoopDispatcher},
44 tree::Tree as TxnTree,
45 types::TxnBatch,
46 Context as TxnContext,
47 },
48 types::{Body, ComputedBatch, Error, ExecutionMode},
49};
50
51const BACKLOG_SIZE: usize = 1000;
53
54const RPC_MAX_SESSIONS: usize = 1024;
56const RPC_MAX_SESSIONS_PER_PEER: usize = 8;
59const RPC_STALE_SESSION_TIMEOUT_SECS: i64 = 10;
62
63pub trait Initializer: Send + Sync {
65 fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState;
67}
68
69impl<F> Initializer for F
70where
71 F: FnOnce(PreInitState<'_>) -> PostInitState + Send + Sync,
72{
73 fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState {
74 self(state)
75 }
76}
77
78pub struct PreInitState<'a> {
80 pub protocol: &'a Arc<Protocol>,
82 pub identity: &'a Arc<Identity>,
84 pub rpc_demux: &'a mut RpcDemux,
86 pub rpc_dispatcher: &'a mut RpcDispatcher,
88 pub consensus_verifier: &'a Arc<dyn Verifier>,
90}
91
92#[derive(Default)]
94pub struct PostInitState {
95 pub txn_dispatcher: Option<Box<dyn TxnDispatcher>>,
97 pub app: Option<Box<dyn app::App>>,
99}
100
101impl From<tokio::task::JoinError> for Error {
102 fn from(e: tokio::task::JoinError) -> Self {
103 Error::new(
104 "dispatcher",
105 1,
106 &format!("error while processing request: {e}"),
107 )
108 }
109}
110
111struct TxDispatchState {
113 mode: ExecutionMode,
114 consensus_block: LightBlock,
115 consensus_verifier: Arc<dyn Verifier>,
116 header: Header,
117 epoch: EpochTime,
118 round_results: roothash::RoundResults,
119 max_messages: u32,
120 check_only: bool,
121}
122
123struct ProtocolState {
125 protocol: Arc<Protocol>,
126 consensus_verifier: Arc<dyn Verifier>,
127}
128
129#[derive(Clone)]
131struct State {
132 protocol: Arc<Protocol>,
133 consensus_verifier: Arc<dyn Verifier>,
134 dispatcher: Arc<Dispatcher>,
135 app: Arc<dyn app::App>,
136 rpc_demux: Arc<RpcDemux>,
137 rpc_dispatcher: Arc<RpcDispatcher>,
138 txn_dispatcher: Arc<dyn TxnDispatcher>,
139 attestation_handler: attestation::Handler,
140 policy_verifier: Arc<PolicyVerifier>,
141 cache_set: cache::CacheSet,
142}
143
144#[derive(Debug)]
145enum Command {
146 Request(u64, Body),
147}
148
149pub struct Dispatcher {
151 logger: Logger,
152 queue_tx: mpsc::Sender<Command>,
153 identity: Arc<Identity>,
154
155 state: Mutex<Option<ProtocolState>>,
156 state_cond: Condvar,
157
158 tokio_runtime: tokio::runtime::Handle,
159}
160
161impl Dispatcher {
162 pub fn new(
164 tokio_runtime: tokio::runtime::Handle,
165 initializer: Box<dyn Initializer>,
166 identity: Arc<Identity>,
167 ) -> Arc<Self> {
168 let (tx, rx) = mpsc::channel(BACKLOG_SIZE);
169
170 let dispatcher = Arc::new(Dispatcher {
171 logger: get_logger("runtime/dispatcher"),
172 queue_tx: tx,
173 identity,
174 state: Mutex::new(None),
175 state_cond: Condvar::new(),
176 tokio_runtime,
177 });
178
179 let d = dispatcher.clone();
181 thread::spawn(move || {
182 let _guard = AbortOnPanic;
183 d.run(initializer, rx);
184 });
185
186 dispatcher
187 }
188
189 pub fn start(&self, protocol: Arc<Protocol>, consensus_verifier: Box<dyn Verifier>) {
191 let consensus_verifier = Arc::from(consensus_verifier);
192 let mut s = self.state.lock().unwrap();
193 *s = Some(ProtocolState {
194 protocol,
195 consensus_verifier,
196 });
197 self.state_cond.notify_one();
198 }
199
200 pub fn queue_request(&self, id: u64, body: Body) -> AnyResult<()> {
202 self.queue_tx.blocking_send(Command::Request(id, body))?;
203 Ok(())
204 }
205
206 fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
207 let ProtocolState {
209 protocol,
210 consensus_verifier,
211 } = {
212 let mut guard = self.state.lock().unwrap();
213 while guard.is_none() {
214 guard = self.state_cond.wait(guard).unwrap();
215 }
216
217 guard.take().unwrap()
218 };
219
220 let _guard = self.tokio_runtime.enter();
222
223 info!(self.logger, "Starting the runtime dispatcher");
225 let mut rpc_demux = RpcDemux::new(
226 session::Builder::default().local_identity(self.identity.clone()),
227 RPC_MAX_SESSIONS,
228 RPC_MAX_SESSIONS_PER_PEER,
229 RPC_STALE_SESSION_TIMEOUT_SECS,
230 );
231 let mut rpc_dispatcher = RpcDispatcher::default();
232 let pre_init_state = PreInitState {
233 protocol: &protocol,
234 identity: &self.identity,
235 rpc_demux: &mut rpc_demux,
236 rpc_dispatcher: &mut rpc_dispatcher,
237 consensus_verifier: &consensus_verifier,
238 };
239 let post_init_state = initializer.init(pre_init_state);
240 let txn_dispatcher = post_init_state
241 .txn_dispatcher
242 .unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());
243 let mut app = post_init_state
244 .app
245 .unwrap_or_else(|| Box::new(app::NoopApp));
246
247 if let Err(err) = app.on_init(protocol.clone()) {
249 error!(self.logger, "ROFL application initialization failed"; "err" => ?err);
250 }
251
252 let app: Arc<dyn app::App> = Arc::from(app);
253 let state = State {
254 protocol: protocol.clone(),
255 consensus_verifier: consensus_verifier.clone(),
256 dispatcher: self.clone(),
257 app: app.clone(),
258 rpc_demux: Arc::new(rpc_demux),
259 rpc_dispatcher: Arc::new(rpc_dispatcher),
260 txn_dispatcher: Arc::from(txn_dispatcher),
261 attestation_handler: attestation::Handler::new(
262 self.identity.clone(),
263 protocol.clone(),
264 consensus_verifier.clone(),
265 protocol.get_runtime_id(),
266 protocol.get_config().version,
267 app,
268 ),
269 policy_verifier: Arc::new(PolicyVerifier::new(consensus_verifier)),
270 cache_set: cache::CacheSet::new(protocol.clone()),
271 };
272
273 self.tokio_runtime.block_on(async move {
275 while let Some(cmd) = rx.recv().await {
276 match cmd {
278 Command::Request(id, request) => {
279 let state = state.clone();
281
282 tokio::spawn(async move {
283 let protocol = state.protocol.clone();
284 let dispatcher = state.dispatcher.clone();
285 let result = dispatcher.handle_request(state, request).await;
286
287 let response = match result {
289 Ok(body) => body,
290 Err(error) => Body::Error(error),
291 };
292 protocol.send_response(id, response).unwrap();
293 });
294 }
295 }
296 }
297 });
298
299 info!(self.logger, "Runtime call dispatcher is terminating");
300 }
301
302 async fn handle_request(self: &Arc<Self>, state: State, request: Body) -> Result<Body, Error> {
303 match request {
304 Body::RuntimeCapabilityTEERakInitRequest { .. }
306 | Body::RuntimeCapabilityTEERakReportRequest {}
307 | Body::RuntimeCapabilityTEERakAvrRequest { .. }
308 | Body::RuntimeCapabilityTEERakQuoteRequest { .. }
309 | Body::RuntimeCapabilityTEEUpdateEndorsementRequest { .. } => {
310 Ok(state.attestation_handler.handle(request).await?)
311 }
312
313 Body::RuntimeRPCCallRequest {
315 request,
316 kind,
317 peer_id,
318 } => {
319 debug!(self.logger, "Received RPC call request";
320 "kind" => ?kind,
321 "peer_id" => peer_id.to_hex::<String>(),
322 );
323
324 match kind {
325 RpcKind::NoiseSession => {
326 self.dispatch_secure_rpc(state, request, peer_id).await
327 }
328 RpcKind::InsecureQuery => self.dispatch_insecure_rpc(state, request).await,
329 RpcKind::LocalQuery => self.dispatch_local_rpc(state, request).await,
330 }
331 }
332 Body::RuntimeLocalRPCCallRequest { request } => {
333 debug!(self.logger, "Received RPC call request";
334 "kind" => ?RpcKind::LocalQuery,
335 );
336
337 self.dispatch_local_rpc(state, request).await
338 }
339
340 Body::RuntimeExecuteTxBatchRequest {
343 mode,
344 consensus_block,
345 round_results,
346 io_root,
347 inputs,
348 in_msgs,
349 block,
350 epoch,
351 max_messages,
352 } => {
353 self.dispatch_txn(
355 state.cache_set,
356 &state.txn_dispatcher,
357 &state.protocol,
358 io_root,
359 inputs.unwrap_or_default(),
360 in_msgs,
361 TxDispatchState {
362 mode,
363 consensus_block,
364 consensus_verifier: state.consensus_verifier,
365 header: block.header,
366 epoch,
367 round_results,
368 max_messages,
369 check_only: false,
370 },
371 )
372 .await
373 }
374 Body::RuntimeCheckTxBatchRequest {
375 consensus_block,
376 inputs,
377 block,
378 epoch,
379 max_messages,
380 } => {
381 self.dispatch_txn(
383 state.cache_set,
384 &state.txn_dispatcher,
385 &state.protocol,
386 Hash::default(),
387 inputs,
388 vec![],
389 TxDispatchState {
390 mode: ExecutionMode::Execute,
391 consensus_block,
392 consensus_verifier: state.consensus_verifier,
393 header: block.header,
394 epoch,
395 round_results: Default::default(),
396 max_messages,
397 check_only: true,
398 },
399 )
400 .await
401 }
402 Body::RuntimeQueryRequest {
403 consensus_block,
404 header,
405 epoch,
406 max_messages,
407 method,
408 args,
409 } if state.txn_dispatcher.is_supported() => {
410 self.dispatch_query(
412 state.cache_set,
413 &state.txn_dispatcher,
414 &state.protocol,
415 method,
416 args,
417 TxDispatchState {
418 mode: ExecutionMode::Execute,
419 consensus_block,
420 consensus_verifier: state.consensus_verifier,
421 header,
422 epoch,
423 round_results: Default::default(),
424 max_messages,
425 check_only: true,
426 },
427 )
428 .await
429 }
430
431 Body::RuntimeQueryRequest { method, args, .. } if state.app.is_rofl() => state
433 .app
434 .query(&method, args)
435 .await
436 .map(|data| Body::RuntimeQueryResponse { data })
437 .map_err(Into::into),
438 Body::RuntimeNotifyRequest {
439 runtime_block,
440 runtime_event,
441 } => {
442 if let Some(runtime_block) = runtime_block {
443 if let Err(err) = state.app.on_runtime_block(&runtime_block).await {
444 error!(self.logger, "Application block notification failed"; "err" => ?err);
445 }
446 }
447 if let Some(runtime_event) = runtime_event {
448 if let Err(err) = state
449 .app
450 .on_runtime_event(&runtime_event.block, &runtime_event.tags)
451 .await
452 {
453 error!(self.logger, "Application event notification failed"; "err" => ?err);
454 }
455 }
456
457 Ok(Body::Empty {})
458 }
459
460 Body::RuntimeKeyManagerStatusUpdateRequest { status } => {
462 self.handle_km_status_update(state, status).await
464 }
465 Body::RuntimeKeyManagerQuotePolicyUpdateRequest {
466 policy: quote_policy,
467 } => {
468 self.handle_km_quote_policy_update(state, quote_policy)
470 .await
471 }
472 Body::RuntimeConsensusSyncRequest { height } => state
473 .consensus_verifier
474 .sync(height)
475 .await
476 .map_err(Into::into)
477 .map(|_| Body::RuntimeConsensusSyncResponse {}),
478
479 _ => {
480 error!(self.logger, "Unsupported request type");
481 Err(Error::new("dispatcher", 1, "Unsupported request type"))
482 }
483 }
484 }
485
486 #[allow(clippy::too_many_arguments)]
487 async fn dispatch_query(
488 &self,
489 cache_set: cache::CacheSet,
490 txn_dispatcher: &Arc<dyn TxnDispatcher>,
491 protocol: &Arc<Protocol>,
492 method: String,
493 args: Vec<u8>,
494 state: TxDispatchState,
495 ) -> Result<Body, Error> {
496 debug!(self.logger, "Received query request";
497 "method" => &method,
498 "state_root" => ?state.header.state_root,
499 "round" => ?state.header.round,
500 );
501
502 if state.header.namespace != protocol.get_runtime_id() {
505 return Err(Error::new(
506 "dispatcher",
507 1,
508 &format!(
509 "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
510 state.header.namespace,
511 protocol.get_runtime_id(),
512 ),
513 ));
514 }
515
516 let protocol = protocol.clone();
517 let txn_dispatcher = txn_dispatcher.clone();
518
519 let consensus_state = state
522 .consensus_verifier
523 .unverified_state(state.consensus_block.clone())
524 .await?;
525
526 tokio::task::spawn_blocking(move || {
527 let cache = cache_set.query(Root {
528 namespace: state.header.namespace,
529 version: state.header.round,
530 root_type: RootType::State,
531 hash: state.header.state_root,
532 });
533 let mut cache = cache.borrow_mut();
534 let mut overlay = OverlayTree::new(cache.tree_mut());
535
536 let txn_ctx = TxnContext::new(
537 protocol,
538 &state.consensus_block,
539 consensus_state,
540 &mut overlay,
541 &state.header,
542 state.epoch,
543 &state.round_results,
544 state.max_messages,
545 state.check_only,
546 );
547
548 txn_dispatcher
549 .query(txn_ctx, &method, args)
550 .map(|data| Body::RuntimeQueryResponse { data })
551 })
552 .await?
553 }
554
555 fn txn_check_batch(
556 &self,
557 protocol: Arc<Protocol>,
558 cache_set: cache::CacheSet,
559 txn_dispatcher: &dyn TxnDispatcher,
560 inputs: TxnBatch,
561 state: TxDispatchState,
562 ) -> Result<Body, Error> {
563 let consensus_state = block_on(
566 state
567 .consensus_verifier
568 .unverified_state(state.consensus_block.clone()),
569 )?;
570
571 let mut cache = cache_set.check(Root {
572 namespace: state.header.namespace,
573 version: state.header.round,
574 root_type: RootType::State,
575 hash: state.header.state_root,
576 });
577 let mut overlay = OverlayTree::new(cache.tree_mut());
578
579 let txn_ctx = TxnContext::new(
580 protocol.clone(),
581 &state.consensus_block,
582 consensus_state,
583 &mut overlay,
584 &state.header,
585 state.epoch,
586 &state.round_results,
587 state.max_messages,
588 state.check_only,
589 );
590 let results = txn_dispatcher.check_batch(txn_ctx, &inputs);
591
592 if protocol.get_config().persist_check_tx_state {
593 let _ = overlay.commit().unwrap();
596 }
597
598 debug!(self.logger, "Transaction batch check complete");
599
600 results.map(|results| Body::RuntimeCheckTxBatchResponse { results })
601 }
602
603 #[allow(clippy::too_many_arguments)]
604 fn txn_execute_batch(
605 &self,
606 protocol: Arc<Protocol>,
607 cache_set: cache::CacheSet,
608 txn_dispatcher: &dyn TxnDispatcher,
609 mut inputs: TxnBatch,
610 in_msgs: Vec<roothash::IncomingMessage>,
611 io_root: Hash,
612 state: TxDispatchState,
613 ) -> Result<Body, Error> {
614 let consensus_state = block_on(state.consensus_verifier.verify(
617 state.consensus_block.clone(),
618 state.header.clone(),
619 state.epoch,
620 ))?;
621 protocol.ensure_initialized()?;
623
624 let header = &state.header;
625
626 let mut cache = cache_set.execute(Root {
627 namespace: state.header.namespace,
628 version: state.header.round,
629 root_type: RootType::State,
630 hash: state.header.state_root,
631 });
632 let mut overlay = OverlayTree::new(cache.tree_mut());
633
634 let txn_ctx = TxnContext::new(
635 protocol,
636 &state.consensus_block,
637 consensus_state,
638 &mut overlay,
639 header,
640 state.epoch,
641 &state.round_results,
642 state.max_messages,
643 state.check_only,
644 );
645
646 let mut results = match state.mode {
648 ExecutionMode::Execute => {
649 txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?
651 }
652 ExecutionMode::Schedule => {
653 txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)?
655 }
656 };
657
658 let (state_write_log, new_state_root) = overlay
660 .commit_both(header.namespace, header.round + 1)
661 .expect("state commit must succeed");
662
663 txn_dispatcher.finalize(new_state_root);
664 cache.commit(header.round + 1, new_state_root);
665
666 let mut txn_tree = TxnTree::new(
670 Box::new(NoopReadSyncer),
671 Root {
672 namespace: header.namespace,
673 version: header.round + 1,
674 root_type: RootType::IO,
675 hash: Hash::empty_hash(),
676 },
677 );
678 let mut hashes = Vec::new();
679 for (batch_order, input) in inputs.drain(..).enumerate() {
680 hashes.push(Hash::digest_bytes(&input));
681 txn_tree
682 .add_input(input, batch_order.try_into().unwrap())
683 .expect("add transaction must succeed");
684 }
685
686 let (input_write_log, input_io_root) = txn_tree.commit().expect("io commit must succeed");
687
688 assert!(
689 state.mode != ExecutionMode::Execute || input_io_root == io_root,
690 "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})",
691 io_root,
692 input_io_root
693 );
694
695 for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) {
696 txn_tree
697 .add_output(*tx_hash, result.output, result.tags)
698 .expect("add transaction must succeed");
699 }
700
701 txn_tree
702 .add_block_tags(results.block_tags)
703 .expect("adding block tags must succeed");
704
705 let (io_write_log, io_root) = txn_tree.commit().expect("io commit must succeed");
706
707 let header = ComputeResultsHeader {
708 round: header.round + 1,
709 previous_hash: header.encoded_hash(),
710 io_root: Some(io_root),
711 state_root: Some(new_state_root),
712 messages_hash: Some(roothash::Message::messages_hash(&results.messages)),
713 in_msgs_hash: Some(roothash::IncomingMessage::in_messages_hash(
714 &in_msgs[..results.in_msgs_count],
715 )),
716 in_msgs_count: results.in_msgs_count.try_into().unwrap(),
717 };
718
719 debug!(self.logger, "Transaction batch execution complete";
720 "previous_hash" => ?header.previous_hash,
721 "io_root" => ?header.io_root,
722 "state_root" => ?header.state_root,
723 "messages_hash" => ?header.messages_hash,
724 "in_msgs_hash" => ?header.in_msgs_hash,
725 );
726
727 let rak_sig = self
728 .identity
729 .sign(
730 COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT,
731 &cbor::to_vec(header.clone()),
732 )
733 .unwrap();
734
735 Ok(Body::RuntimeExecuteTxBatchResponse {
736 batch: ComputedBatch {
737 header,
738 io_write_log,
739 state_write_log,
740 rak_sig,
741 messages: results.messages,
742 },
743 tx_hashes: hashes,
744 tx_reject_hashes: results.tx_reject_hashes,
745 tx_input_root: input_io_root,
746 tx_input_write_log: input_write_log,
747 })
748 }
749
750 #[allow(clippy::too_many_arguments)]
751 async fn dispatch_txn(
752 self: &Arc<Self>,
753 cache_set: cache::CacheSet,
754 txn_dispatcher: &Arc<dyn TxnDispatcher>,
755 protocol: &Arc<Protocol>,
756 io_root: Hash,
757 inputs: TxnBatch,
758 in_msgs: Vec<roothash::IncomingMessage>,
759 state: TxDispatchState,
760 ) -> Result<Body, Error> {
761 let _guard = AbortOnPanic;
764
765 debug!(self.logger, "Received transaction batch request";
766 "state_root" => ?state.header.state_root,
767 "round" => state.header.round + 1,
768 "round_results" => ?state.round_results,
769 "tx_count" => inputs.len(),
770 "in_msg_count" => in_msgs.len(),
771 "check_only" => state.check_only,
772 );
773
774 assert!(
777 state.header.namespace == protocol.get_runtime_id(),
778 "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
779 state.header.namespace,
780 protocol.get_runtime_id(),
781 );
782
783 let protocol = protocol.clone();
784 let dispatcher = self.clone();
785 let txn_dispatcher = txn_dispatcher.clone();
786
787 tokio::task::spawn_blocking(move || {
788 if state.check_only {
789 dispatcher.txn_check_batch(protocol, cache_set, &txn_dispatcher, inputs, state)
790 } else {
791 dispatcher.txn_execute_batch(
792 protocol,
793 cache_set,
794 &txn_dispatcher,
795 inputs,
796 in_msgs,
797 io_root,
798 state,
799 )
800 }
801 })
802 .await
803 .unwrap() }
805
806 async fn dispatch_secure_rpc(
807 &self,
808 state: State,
809 request: Vec<u8>,
810 peer_id: Vec<u8>,
811 ) -> Result<Body, Error> {
812 let _guard = AbortOnPanic;
815
816 let mut buffer = vec![];
818 let (mut session, message) = state
819 .rpc_demux
820 .process_frame(peer_id, request, &mut buffer)
821 .await?;
822
823 if let Some(message) = message {
824 assert!(
826 buffer.is_empty(),
827 "must have no handshake data in transport mode"
828 );
829
830 match message {
831 RpcMessage::Request(req) => {
832 let response = self
834 .dispatch_rpc(req, RpcKind::NoiseSession, session.info(), &state)
835 .await?;
836 let response = RpcMessage::Response(response);
837
838 debug!(self.logger, "RPC call dispatch complete";
841 "kind" => ?RpcKind::NoiseSession,
842 );
843
844 let mut buffer = vec![];
845 session
846 .write_message(response, &mut buffer)
847 .map_err(|err| {
848 error!(self.logger, "Error while writing response"; "err" => %err);
849 Error::new("rhp/dispatcher", 1, &format!("{err}"))
850 })
851 .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
852 }
853 RpcMessage::Close => {
854 let mut buffer = vec![];
856 state
857 .rpc_demux
858 .close(session, &mut buffer)
859 .map_err(|err| {
860 error!(self.logger, "Error while closing session"; "err" => %err);
861 Error::new("rhp/dispatcher", 1, &format!("{err}"))
862 })
863 .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
864 }
865 msg => {
866 warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg);
867 Err(Error::new("rhp/dispatcher", 1, "invalid RPC message type"))
868 }
869 }
870 } else {
871 Ok(Body::RuntimeRPCCallResponse { response: buffer })
873 }
874 }
875
876 async fn dispatch_insecure_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
877 let _guard = AbortOnPanic;
880
881 let request: RpcRequest = cbor::from_slice(&request)
882 .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;
883
884 let response = self
886 .dispatch_rpc(request, RpcKind::InsecureQuery, None, &state)
887 .await?;
888 let response = cbor::to_vec(response);
889
890 debug!(self.logger, "RPC call dispatch complete";
893 "kind" => ?RpcKind::InsecureQuery,
894 );
895
896 Ok(Body::RuntimeRPCCallResponse { response })
897 }
898
899 async fn dispatch_local_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
900 let _guard = AbortOnPanic;
903
904 let request = cbor::from_slice(&request)
905 .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;
906
907 let response = self
909 .dispatch_rpc(request, RpcKind::LocalQuery, None, &state)
910 .await?;
911 let response = RpcMessage::Response(response);
912 let response = cbor::to_vec(response);
913
914 debug!(self.logger, "RPC call dispatch complete";
915 "kind" => ?RpcKind::LocalQuery,
916 );
917
918 Ok(Body::RuntimeLocalRPCCallResponse { response })
919 }
920
921 async fn dispatch_rpc(
922 &self,
923 request: RpcRequest,
924 kind: RpcKind,
925 session_info: Option<Arc<SessionInfo>>,
926 state: &State,
927 ) -> Result<RpcResponse, Error> {
928 let rpc_dispatcher = state.rpc_dispatcher.clone();
929
930 let response = tokio::task::spawn_blocking(move || {
931 let rpc_ctx = RpcContext::new(session_info);
932 rpc_dispatcher.dispatch(rpc_ctx, request, kind)
933 })
934 .await?;
935
936 Ok(response)
937 }
938
939 async fn handle_km_status_update(
940 &self,
941 state: State,
942 status: KeyManagerStatus,
943 ) -> Result<Body, Error> {
944 let _guard = AbortOnPanic;
947
948 debug!(self.logger, "Received km status update request");
949
950 let runtime_id = state.protocol.get_host_info().runtime_id;
952
953 tokio::task::spawn_blocking(move || -> Result<(), Error> {
954 let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
955 let published_status = state
956 .policy_verifier
957 .verify_key_manager_status(status, key_manager)?;
958
959 state
961 .rpc_dispatcher
962 .handle_km_status_update(published_status);
963
964 Ok(())
965 })
966 .await
967 .unwrap()?; debug!(self.logger, "KM status update request complete");
970
971 Ok(Body::RuntimeKeyManagerStatusUpdateResponse {})
972 }
973
974 async fn handle_km_quote_policy_update(
975 &self,
976 state: State,
977 quote_policy: QuotePolicy,
978 ) -> Result<Body, Error> {
979 let _guard = AbortOnPanic;
982
983 debug!(self.logger, "Received km quote policy update request");
984
985 let runtime_id = state.protocol.get_host_info().runtime_id;
987
988 tokio::task::spawn_blocking(move || -> Result<(), Error> {
989 let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
990 let policy =
991 state
992 .policy_verifier
993 .verify_quote_policy(quote_policy, &key_manager, None)?;
994
995 state.rpc_dispatcher.handle_km_quote_policy_update(policy);
997
998 Ok(())
999 })
1000 .await
1001 .unwrap()?; debug!(self.logger, "KM quote policy update request complete");
1004
1005 Ok(Body::RuntimeKeyManagerQuotePolicyUpdateResponse {})
1006 }
1007}