1use std::{
3 convert::TryInto,
4 sync::{Arc, Condvar, Mutex},
5 thread,
6};
7
8use anyhow::Result as AnyResult;
9use rustc_hex::ToHex;
10use slog::{debug, error, info, warn, Logger};
11use tokio::sync::mpsc;
12
13use crate::{
14 app::{self, METHOD_ROFL_GET_CONFIG},
15 attestation, cache,
16 common::{
17 crypto::{hash::Hash, signature::Signer},
18 logger::get_logger,
19 panic::AbortOnPanic,
20 sgx::QuotePolicy,
21 },
22 consensus::{
23 beacon::EpochTime,
24 roothash::{self, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT},
25 state::keymanager::Status as KeyManagerStatus,
26 verifier::Verifier,
27 LightBlock,
28 },
29 enclave_rpc::{
30 demux::Demux as RpcDemux,
31 dispatcher::Dispatcher as RpcDispatcher,
32 session::{self, SessionInfo},
33 types::{
34 Kind as RpcKind, Message as RpcMessage, Request as RpcRequest, Response as RpcResponse,
35 },
36 Context as RpcContext,
37 },
38 future::block_on,
39 identity::Identity,
40 policy::PolicyVerifier,
41 protocol::Protocol,
42 storage::mkvs::{sync::NoopReadSyncer, OverlayTree, Root, RootType},
43 transaction::{
44 dispatcher::{Dispatcher as TxnDispatcher, NoopDispatcher as TxnNoopDispatcher},
45 tree::Tree as TxnTree,
46 types::TxnBatch,
47 Context as TxnContext,
48 },
49 types::{Body, ComputedBatch, Error, ExecutionMode},
50};
51
/// Capacity of the bounded command queue created in `Dispatcher::new`.
const BACKLOG_SIZE: usize = 1000;

/// Session limit handed to `RpcDemux::new`; presumably the maximum total number of RPC
/// sessions -- TODO confirm against `RpcDemux`.
const RPC_MAX_SESSIONS: usize = 1024;
/// Per-peer session limit handed to `RpcDemux::new`; presumably the maximum number of
/// concurrent sessions for a single peer -- TODO confirm against `RpcDemux`.
const RPC_MAX_SESSIONS_PER_PEER: usize = 8;
/// Stale session timeout handed to `RpcDemux::new`; the name indicates the unit is seconds.
const RPC_STALE_SESSION_TIMEOUT_SECS: i64 = 10;
63
/// Interface for dispatcher initializers.
///
/// The dispatcher worker invokes the initializer once, after the protocol state has been
/// provided, to obtain the optional transaction dispatcher and application.
pub trait Initializer: Send + Sync {
    /// Consumes the initializer, giving it access to the pre-initialization `state`, and
    /// returns the components the dispatcher should use afterwards.
    fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState;
}
69
70impl<F> Initializer for F
71where
72 F: FnOnce(PreInitState<'_>) -> PostInitState + Send + Sync,
73{
74 fn init(self: Box<Self>, state: PreInitState<'_>) -> PostInitState {
75 self(state)
76 }
77}
78
/// State made available to the [`Initializer`] before the dispatcher starts serving requests.
pub struct PreInitState<'a> {
    /// Protocol instance the dispatcher was started with.
    pub protocol: &'a Arc<Protocol>,
    /// Identity the dispatcher was constructed with.
    pub identity: &'a Arc<Identity>,
    /// RPC demultiplexer, mutable so that the initializer can configure it.
    pub rpc_demux: &'a mut RpcDemux,
    /// RPC dispatcher, mutable so that the initializer can configure it.
    pub rpc_dispatcher: &'a mut RpcDispatcher,
    /// Consensus verifier the dispatcher was started with.
    pub consensus_verifier: &'a Arc<dyn Verifier>,
}
92
/// Components returned by the [`Initializer`].
///
/// Both fields are optional; the dispatcher substitutes no-op implementations for any field
/// left as `None`.
#[derive(Default)]
pub struct PostInitState {
    /// Optional transaction dispatcher to use for batch and query requests.
    pub txn_dispatcher: Option<Box<dyn TxnDispatcher>>,
    /// Optional application to notify and to route application queries to.
    pub app: Option<Box<dyn app::App>>,
}
101
102impl From<tokio::task::JoinError> for Error {
103 fn from(e: tokio::task::JoinError) -> Self {
104 Error::new(
105 "dispatcher",
106 1,
107 &format!("error while processing request: {e}"),
108 )
109 }
110}
111
/// Per-request state used when dispatching transaction batches and queries.
struct TxDispatchState {
    /// Selects between `execute_batch` and `schedule_and_execute_batch` when executing.
    mode: ExecutionMode,
    /// Consensus light block supplied with the request.
    consensus_block: LightBlock,
    /// Verifier used to derive the consensus state for `consensus_block`.
    consensus_verifier: Arc<dyn Verifier>,
    /// Runtime block header the request is based on.
    header: Header,
    /// Epoch supplied with the request.
    epoch: EpochTime,
    /// Round results supplied with the request (defaulted for check and query requests).
    round_results: roothash::RoundResults,
    /// Message limit supplied with the request; forwarded to the transaction context.
    max_messages: u32,
    /// When `true`, `dispatch_txn` checks the batch instead of executing it.
    check_only: bool,
}
123
/// State handed over by `Dispatcher::start` and consumed by the worker thread in
/// `Dispatcher::run`.
struct ProtocolState {
    /// Protocol instance used to send responses.
    protocol: Arc<Protocol>,
    /// Consensus verifier passed to `start`.
    consensus_verifier: Arc<dyn Verifier>,
}
129
/// Shared dispatcher state assembled once in `Dispatcher::run`.
///
/// It is cloned for every incoming request, so each request handler owns its own copy.
#[derive(Clone)]
struct State {
    /// Protocol instance used to send responses.
    protocol: Arc<Protocol>,
    /// Consensus verifier.
    consensus_verifier: Arc<dyn Verifier>,
    /// Back-reference to the dispatcher that handles the request.
    dispatcher: Arc<Dispatcher>,
    /// Application (a no-op application when the initializer returned none).
    app: Arc<dyn app::App>,
    /// RPC demultiplexer used for secure (Noise session) RPC frames.
    rpc_demux: Arc<RpcDemux>,
    /// RPC dispatcher that executes RPC requests.
    rpc_dispatcher: Arc<RpcDispatcher>,
    /// Transaction dispatcher (a no-op dispatcher when the initializer returned none).
    txn_dispatcher: Arc<dyn TxnDispatcher>,
    /// Handler for attestation-related requests.
    attestation_handler: attestation::Handler,
    /// Verifier for key manager status and quote policy updates.
    policy_verifier: Arc<PolicyVerifier>,
    /// Caches used for check, execute and query requests.
    cache_set: cache::CacheSet,
}
144
/// Command sent over the dispatcher queue to the worker thread.
#[derive(Debug)]
enum Command {
    /// A request body together with the identifier used when sending its response.
    Request(u64, Body),
}
149
/// Runtime call dispatcher.
///
/// Requests are queued with `queue_request` and processed by a dedicated worker thread
/// spawned in `new`.
pub struct Dispatcher {
    /// Logger for dispatcher messages.
    logger: Logger,
    /// Sending half of the bounded command queue consumed by the worker thread.
    queue_tx: mpsc::Sender<Command>,
    /// Identity used for RPC sessions, attestation and signing compute results.
    identity: Arc<Identity>,

    /// Protocol state set by `start` and taken by the worker thread.
    state: Mutex<Option<ProtocolState>>,
    /// Condition variable signalled when `state` has been set.
    state_cond: Condvar,

    /// Handle of the tokio runtime on which requests are processed.
    tokio_runtime: tokio::runtime::Handle,
}
161
162impl Dispatcher {
163 pub fn new(
165 tokio_runtime: tokio::runtime::Handle,
166 initializer: Box<dyn Initializer>,
167 identity: Arc<Identity>,
168 ) -> Arc<Self> {
169 let (tx, rx) = mpsc::channel(BACKLOG_SIZE);
170
171 let dispatcher = Arc::new(Dispatcher {
172 logger: get_logger("runtime/dispatcher"),
173 queue_tx: tx,
174 identity,
175 state: Mutex::new(None),
176 state_cond: Condvar::new(),
177 tokio_runtime,
178 });
179
180 let d = dispatcher.clone();
182 thread::spawn(move || {
183 let _guard = AbortOnPanic;
184 d.run(initializer, rx);
185 });
186
187 dispatcher
188 }
189
190 pub fn start(&self, protocol: Arc<Protocol>, consensus_verifier: Box<dyn Verifier>) {
192 let consensus_verifier = Arc::from(consensus_verifier);
193 let mut s = self.state.lock().unwrap();
194 *s = Some(ProtocolState {
195 protocol,
196 consensus_verifier,
197 });
198 self.state_cond.notify_one();
199 }
200
201 pub fn queue_request(&self, id: u64, body: Body) -> AnyResult<()> {
203 self.queue_tx.blocking_send(Command::Request(id, body))?;
204 Ok(())
205 }
206
207 fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
208 let ProtocolState {
210 protocol,
211 consensus_verifier,
212 } = {
213 let mut guard = self.state.lock().unwrap();
214 while guard.is_none() {
215 guard = self.state_cond.wait(guard).unwrap();
216 }
217
218 guard.take().unwrap()
219 };
220
221 let _guard = self.tokio_runtime.enter();
223
224 info!(self.logger, "Starting the runtime dispatcher");
226 let mut rpc_demux = RpcDemux::new(
227 session::Builder::default().local_identity(self.identity.clone()),
228 RPC_MAX_SESSIONS,
229 RPC_MAX_SESSIONS_PER_PEER,
230 RPC_STALE_SESSION_TIMEOUT_SECS,
231 );
232 let mut rpc_dispatcher = RpcDispatcher::default();
233 let pre_init_state = PreInitState {
234 protocol: &protocol,
235 identity: &self.identity,
236 rpc_demux: &mut rpc_demux,
237 rpc_dispatcher: &mut rpc_dispatcher,
238 consensus_verifier: &consensus_verifier,
239 };
240 let post_init_state = initializer.init(pre_init_state);
241 let txn_dispatcher = post_init_state
242 .txn_dispatcher
243 .unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());
244 let mut app = post_init_state
245 .app
246 .unwrap_or_else(|| Box::new(app::NoopApp));
247
248 if let Err(err) = app.on_init(protocol.clone()) {
250 error!(self.logger, "ROFL application initialization failed"; "err" => ?err);
251 }
252
253 let app: Arc<dyn app::App> = Arc::from(app);
254 let state = State {
255 protocol: protocol.clone(),
256 consensus_verifier: consensus_verifier.clone(),
257 dispatcher: self.clone(),
258 app: app.clone(),
259 rpc_demux: Arc::new(rpc_demux),
260 rpc_dispatcher: Arc::new(rpc_dispatcher),
261 txn_dispatcher: Arc::from(txn_dispatcher),
262 attestation_handler: attestation::Handler::new(
263 self.identity.clone(),
264 protocol.clone(),
265 consensus_verifier.clone(),
266 protocol.get_runtime_id(),
267 protocol.get_config().version,
268 app,
269 ),
270 policy_verifier: Arc::new(PolicyVerifier::new(consensus_verifier)),
271 cache_set: cache::CacheSet::new(protocol.clone()),
272 };
273
274 self.tokio_runtime.block_on(async move {
276 while let Some(cmd) = rx.recv().await {
277 match cmd {
279 Command::Request(id, request) => {
280 let state = state.clone();
282
283 tokio::spawn(async move {
284 let protocol = state.protocol.clone();
285 let dispatcher = state.dispatcher.clone();
286 let result = dispatcher.handle_request(state, request).await;
287
288 let response = match result {
290 Ok(body) => body,
291 Err(error) => Body::Error(error),
292 };
293 protocol.send_response(id, response).unwrap();
294 });
295 }
296 }
297 }
298 });
299
300 info!(self.logger, "Runtime call dispatcher is terminating");
301 }
302
303 async fn handle_request(self: &Arc<Self>, state: State, request: Body) -> Result<Body, Error> {
304 match request {
305 Body::RuntimeCapabilityTEERakInitRequest { .. }
307 | Body::RuntimeCapabilityTEERakReportRequest {}
308 | Body::RuntimeCapabilityTEERakAvrRequest { .. }
309 | Body::RuntimeCapabilityTEERakQuoteRequest { .. }
310 | Body::RuntimeCapabilityTEEUpdateEndorsementRequest { .. } => {
311 Ok(state.attestation_handler.handle(request).await?)
312 }
313
314 Body::RuntimeRPCCallRequest {
316 request,
317 kind,
318 peer_id,
319 } => {
320 debug!(self.logger, "Received RPC call request";
321 "kind" => ?kind,
322 "peer_id" => peer_id.to_hex::<String>(),
323 );
324
325 match kind {
326 RpcKind::NoiseSession => {
327 self.dispatch_secure_rpc(state, request, peer_id).await
328 }
329 RpcKind::InsecureQuery => self.dispatch_insecure_rpc(state, request).await,
330 RpcKind::LocalQuery => self.dispatch_local_rpc(state, request).await,
331 }
332 }
333 Body::RuntimeLocalRPCCallRequest { request } => {
334 debug!(self.logger, "Received RPC call request";
335 "kind" => ?RpcKind::LocalQuery,
336 );
337
338 self.dispatch_local_rpc(state, request).await
339 }
340
341 Body::RuntimeExecuteTxBatchRequest {
344 mode,
345 consensus_block,
346 round_results,
347 io_root,
348 inputs,
349 in_msgs,
350 block,
351 epoch,
352 max_messages,
353 } => {
354 self.dispatch_txn(
356 state.cache_set,
357 &state.txn_dispatcher,
358 &state.protocol,
359 io_root,
360 inputs.unwrap_or_default(),
361 in_msgs,
362 TxDispatchState {
363 mode,
364 consensus_block,
365 consensus_verifier: state.consensus_verifier,
366 header: block.header,
367 epoch,
368 round_results,
369 max_messages,
370 check_only: false,
371 },
372 )
373 .await
374 }
375 Body::RuntimeCheckTxBatchRequest {
376 consensus_block,
377 inputs,
378 block,
379 epoch,
380 max_messages,
381 } => {
382 self.dispatch_txn(
384 state.cache_set,
385 &state.txn_dispatcher,
386 &state.protocol,
387 Hash::default(),
388 inputs,
389 vec![],
390 TxDispatchState {
391 mode: ExecutionMode::Execute,
392 consensus_block,
393 consensus_verifier: state.consensus_verifier,
394 header: block.header,
395 epoch,
396 round_results: Default::default(),
397 max_messages,
398 check_only: true,
399 },
400 )
401 .await
402 }
403 Body::RuntimeQueryRequest {
404 consensus_block,
405 header,
406 epoch,
407 max_messages,
408 method,
409 args,
410 } if state.txn_dispatcher.is_supported() => {
411 self.dispatch_query(
413 state.cache_set,
414 &state.txn_dispatcher,
415 &state.protocol,
416 method,
417 args,
418 TxDispatchState {
419 mode: ExecutionMode::Execute,
420 consensus_block,
421 consensus_verifier: state.consensus_verifier,
422 header,
423 epoch,
424 round_results: Default::default(),
425 max_messages,
426 check_only: true,
427 },
428 )
429 .await
430 }
431
432 Body::RuntimeQueryRequest { method, args, .. } if state.app.is_rofl() => {
434 match method.as_str() {
435 METHOD_ROFL_GET_CONFIG => Ok(cbor::to_vec(state.app.get_config())),
436 _ => state.app.query(&method, args).await,
437 }
438 .map(|data| Body::RuntimeQueryResponse { data })
439 .map_err(Into::into)
440 }
441 Body::RuntimeNotifyRequest {
442 runtime_block,
443 runtime_event,
444 } => {
445 if let Some(runtime_block) = runtime_block {
446 if let Err(err) = state.app.on_runtime_block(&runtime_block).await {
447 error!(self.logger, "Application block notification failed"; "err" => ?err);
448 }
449 }
450 if let Some(runtime_event) = runtime_event {
451 if let Err(err) = state
452 .app
453 .on_runtime_event(&runtime_event.block, &runtime_event.tags)
454 .await
455 {
456 error!(self.logger, "Application event notification failed"; "err" => ?err);
457 }
458 }
459
460 Ok(Body::Empty {})
461 }
462
463 Body::RuntimeKeyManagerStatusUpdateRequest { status } => {
465 self.handle_km_status_update(state, status).await
467 }
468 Body::RuntimeKeyManagerQuotePolicyUpdateRequest {
469 policy: quote_policy,
470 } => {
471 self.handle_km_quote_policy_update(state, quote_policy)
473 .await
474 }
475 Body::RuntimeConsensusSyncRequest { height } => state
476 .consensus_verifier
477 .sync(height)
478 .await
479 .map_err(Into::into)
480 .map(|_| Body::RuntimeConsensusSyncResponse {}),
481
482 _ => {
483 error!(self.logger, "Unsupported request type");
484 Err(Error::new("dispatcher", 1, "Unsupported request type"))
485 }
486 }
487 }
488
489 #[allow(clippy::too_many_arguments)]
490 async fn dispatch_query(
491 &self,
492 cache_set: cache::CacheSet,
493 txn_dispatcher: &Arc<dyn TxnDispatcher>,
494 protocol: &Arc<Protocol>,
495 method: String,
496 args: Vec<u8>,
497 state: TxDispatchState,
498 ) -> Result<Body, Error> {
499 debug!(self.logger, "Received query request";
500 "method" => &method,
501 "state_root" => ?state.header.state_root,
502 "round" => ?state.header.round,
503 );
504
505 if state.header.namespace != protocol.get_runtime_id() {
508 return Err(Error::new(
509 "dispatcher",
510 1,
511 &format!(
512 "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
513 state.header.namespace,
514 protocol.get_runtime_id(),
515 ),
516 ));
517 }
518
519 let protocol = protocol.clone();
520 let txn_dispatcher = txn_dispatcher.clone();
521
522 let consensus_state = state
525 .consensus_verifier
526 .unverified_state(state.consensus_block.clone())
527 .await?;
528
529 tokio::task::spawn_blocking(move || {
530 let cache = cache_set.query(Root {
531 namespace: state.header.namespace,
532 version: state.header.round,
533 root_type: RootType::State,
534 hash: state.header.state_root,
535 });
536 let mut cache = cache.borrow_mut();
537 let mut overlay = OverlayTree::new(cache.tree_mut());
538
539 let txn_ctx = TxnContext::new(
540 protocol,
541 &state.consensus_block,
542 consensus_state,
543 &mut overlay,
544 &state.header,
545 state.epoch,
546 &state.round_results,
547 state.max_messages,
548 state.check_only,
549 );
550
551 txn_dispatcher
552 .query(txn_ctx, &method, args)
553 .map(|data| Body::RuntimeQueryResponse { data })
554 })
555 .await?
556 }
557
558 fn txn_check_batch(
559 &self,
560 protocol: Arc<Protocol>,
561 cache_set: cache::CacheSet,
562 txn_dispatcher: &dyn TxnDispatcher,
563 inputs: TxnBatch,
564 state: TxDispatchState,
565 ) -> Result<Body, Error> {
566 let consensus_state = block_on(
569 state
570 .consensus_verifier
571 .unverified_state(state.consensus_block.clone()),
572 )?;
573
574 let mut cache = cache_set.check(Root {
575 namespace: state.header.namespace,
576 version: state.header.round,
577 root_type: RootType::State,
578 hash: state.header.state_root,
579 });
580 let mut overlay = OverlayTree::new(cache.tree_mut());
581
582 let txn_ctx = TxnContext::new(
583 protocol.clone(),
584 &state.consensus_block,
585 consensus_state,
586 &mut overlay,
587 &state.header,
588 state.epoch,
589 &state.round_results,
590 state.max_messages,
591 state.check_only,
592 );
593 let results = txn_dispatcher.check_batch(txn_ctx, &inputs);
594
595 if protocol.get_config().persist_check_tx_state {
596 let _ = overlay.commit().unwrap();
599 }
600
601 debug!(self.logger, "Transaction batch check complete");
602
603 results.map(|results| Body::RuntimeCheckTxBatchResponse { results })
604 }
605
606 #[allow(clippy::too_many_arguments)]
607 fn txn_execute_batch(
608 &self,
609 protocol: Arc<Protocol>,
610 cache_set: cache::CacheSet,
611 txn_dispatcher: &dyn TxnDispatcher,
612 mut inputs: TxnBatch,
613 in_msgs: Vec<roothash::IncomingMessage>,
614 io_root: Hash,
615 state: TxDispatchState,
616 ) -> Result<Body, Error> {
617 let consensus_state = block_on(state.consensus_verifier.verify(
620 state.consensus_block.clone(),
621 state.header.clone(),
622 state.epoch,
623 ))?;
624 protocol.ensure_initialized()?;
626
627 let header = &state.header;
628
629 let mut cache = cache_set.execute(Root {
630 namespace: state.header.namespace,
631 version: state.header.round,
632 root_type: RootType::State,
633 hash: state.header.state_root,
634 });
635 let mut overlay = OverlayTree::new(cache.tree_mut());
636
637 let txn_ctx = TxnContext::new(
638 protocol,
639 &state.consensus_block,
640 consensus_state,
641 &mut overlay,
642 header,
643 state.epoch,
644 &state.round_results,
645 state.max_messages,
646 state.check_only,
647 );
648
649 let mut results = match state.mode {
651 ExecutionMode::Execute => {
652 txn_dispatcher.execute_batch(txn_ctx, &inputs, &in_msgs)?
654 }
655 ExecutionMode::Schedule => {
656 txn_dispatcher.schedule_and_execute_batch(txn_ctx, &mut inputs, &in_msgs)?
658 }
659 };
660
661 let (state_write_log, new_state_root) = overlay
663 .commit_both(header.namespace, header.round + 1)
664 .expect("state commit must succeed");
665
666 txn_dispatcher.finalize(new_state_root);
667 cache.commit(header.round + 1, new_state_root);
668
669 let mut txn_tree = TxnTree::new(
673 Box::new(NoopReadSyncer),
674 Root {
675 namespace: header.namespace,
676 version: header.round + 1,
677 root_type: RootType::IO,
678 hash: Hash::empty_hash(),
679 },
680 );
681 let mut hashes = Vec::new();
682 for (batch_order, input) in inputs.drain(..).enumerate() {
683 hashes.push(Hash::digest_bytes(&input));
684 txn_tree
685 .add_input(input, batch_order.try_into().unwrap())
686 .expect("add transaction must succeed");
687 }
688
689 let (input_write_log, input_io_root) = txn_tree.commit().expect("io commit must succeed");
690
691 assert!(
692 state.mode != ExecutionMode::Execute || input_io_root == io_root,
693 "dispatcher: I/O root inconsistent with inputs (expected: {:?} got: {:?})",
694 io_root,
695 input_io_root
696 );
697
698 for (tx_hash, result) in hashes.iter().zip(results.results.drain(..)) {
699 txn_tree
700 .add_output(*tx_hash, result.output, result.tags)
701 .expect("add transaction must succeed");
702 }
703
704 txn_tree
705 .add_block_tags(results.block_tags)
706 .expect("adding block tags must succeed");
707
708 let (io_write_log, io_root) = txn_tree.commit().expect("io commit must succeed");
709
710 let header = ComputeResultsHeader {
711 round: header.round + 1,
712 previous_hash: header.encoded_hash(),
713 io_root: Some(io_root),
714 state_root: Some(new_state_root),
715 messages_hash: Some(roothash::Message::messages_hash(&results.messages)),
716 in_msgs_hash: Some(roothash::IncomingMessage::in_messages_hash(
717 &in_msgs[..results.in_msgs_count],
718 )),
719 in_msgs_count: results.in_msgs_count.try_into().unwrap(),
720 };
721
722 debug!(self.logger, "Transaction batch execution complete";
723 "previous_hash" => ?header.previous_hash,
724 "io_root" => ?header.io_root,
725 "state_root" => ?header.state_root,
726 "messages_hash" => ?header.messages_hash,
727 "in_msgs_hash" => ?header.in_msgs_hash,
728 );
729
730 let rak_sig = self
731 .identity
732 .sign(
733 COMPUTE_RESULTS_HEADER_SIGNATURE_CONTEXT,
734 &cbor::to_vec(header.clone()),
735 )
736 .unwrap();
737
738 Ok(Body::RuntimeExecuteTxBatchResponse {
739 batch: ComputedBatch {
740 header,
741 io_write_log,
742 state_write_log,
743 rak_sig,
744 messages: results.messages,
745 },
746 tx_hashes: hashes,
747 tx_reject_hashes: results.tx_reject_hashes,
748 tx_input_root: input_io_root,
749 tx_input_write_log: input_write_log,
750 })
751 }
752
753 #[allow(clippy::too_many_arguments)]
754 async fn dispatch_txn(
755 self: &Arc<Self>,
756 cache_set: cache::CacheSet,
757 txn_dispatcher: &Arc<dyn TxnDispatcher>,
758 protocol: &Arc<Protocol>,
759 io_root: Hash,
760 inputs: TxnBatch,
761 in_msgs: Vec<roothash::IncomingMessage>,
762 state: TxDispatchState,
763 ) -> Result<Body, Error> {
764 let _guard = AbortOnPanic;
767
768 debug!(self.logger, "Received transaction batch request";
769 "state_root" => ?state.header.state_root,
770 "round" => state.header.round + 1,
771 "round_results" => ?state.round_results,
772 "tx_count" => inputs.len(),
773 "in_msg_count" => in_msgs.len(),
774 "check_only" => state.check_only,
775 );
776
777 assert!(
780 state.header.namespace == protocol.get_runtime_id(),
781 "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})",
782 state.header.namespace,
783 protocol.get_runtime_id(),
784 );
785
786 let protocol = protocol.clone();
787 let dispatcher = self.clone();
788 let txn_dispatcher = txn_dispatcher.clone();
789
790 tokio::task::spawn_blocking(move || {
791 if state.check_only {
792 dispatcher.txn_check_batch(protocol, cache_set, &txn_dispatcher, inputs, state)
793 } else {
794 dispatcher.txn_execute_batch(
795 protocol,
796 cache_set,
797 &txn_dispatcher,
798 inputs,
799 in_msgs,
800 io_root,
801 state,
802 )
803 }
804 })
805 .await
806 .unwrap() }
808
809 async fn dispatch_secure_rpc(
810 &self,
811 state: State,
812 request: Vec<u8>,
813 peer_id: Vec<u8>,
814 ) -> Result<Body, Error> {
815 let _guard = AbortOnPanic;
818
819 let mut buffer = vec![];
821 let (mut session, message) = state
822 .rpc_demux
823 .process_frame(peer_id, request, &mut buffer)
824 .await?;
825
826 if let Some(message) = message {
827 assert!(
829 buffer.is_empty(),
830 "must have no handshake data in transport mode"
831 );
832
833 match message {
834 RpcMessage::Request(req) => {
835 let response = self
837 .dispatch_rpc(req, RpcKind::NoiseSession, session.info(), &state)
838 .await?;
839 let response = RpcMessage::Response(response);
840
841 debug!(self.logger, "RPC call dispatch complete";
844 "kind" => ?RpcKind::NoiseSession,
845 );
846
847 let mut buffer = vec![];
848 session
849 .write_message(response, &mut buffer)
850 .map_err(|err| {
851 error!(self.logger, "Error while writing response"; "err" => %err);
852 Error::new("rhp/dispatcher", 1, &format!("{err}"))
853 })
854 .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
855 }
856 RpcMessage::Close => {
857 let mut buffer = vec![];
859 state
860 .rpc_demux
861 .close(session, &mut buffer)
862 .map_err(|err| {
863 error!(self.logger, "Error while closing session"; "err" => %err);
864 Error::new("rhp/dispatcher", 1, &format!("{err}"))
865 })
866 .map(|_| Body::RuntimeRPCCallResponse { response: buffer })
867 }
868 msg => {
869 warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg);
870 Err(Error::new("rhp/dispatcher", 1, "invalid RPC message type"))
871 }
872 }
873 } else {
874 Ok(Body::RuntimeRPCCallResponse { response: buffer })
876 }
877 }
878
879 async fn dispatch_insecure_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
880 let _guard = AbortOnPanic;
883
884 let request: RpcRequest = cbor::from_slice(&request)
885 .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;
886
887 let response = self
889 .dispatch_rpc(request, RpcKind::InsecureQuery, None, &state)
890 .await?;
891 let response = cbor::to_vec(response);
892
893 debug!(self.logger, "RPC call dispatch complete";
896 "kind" => ?RpcKind::InsecureQuery,
897 );
898
899 Ok(Body::RuntimeRPCCallResponse { response })
900 }
901
902 async fn dispatch_local_rpc(&self, state: State, request: Vec<u8>) -> Result<Body, Error> {
903 let _guard = AbortOnPanic;
906
907 let request = cbor::from_slice(&request)
908 .map_err(|_| Error::new("rhp/dispatcher", 1, "malformed request"))?;
909
910 let response = self
912 .dispatch_rpc(request, RpcKind::LocalQuery, None, &state)
913 .await?;
914 let response = RpcMessage::Response(response);
915 let response = cbor::to_vec(response);
916
917 debug!(self.logger, "RPC call dispatch complete";
918 "kind" => ?RpcKind::LocalQuery,
919 );
920
921 Ok(Body::RuntimeLocalRPCCallResponse { response })
922 }
923
924 async fn dispatch_rpc(
925 &self,
926 request: RpcRequest,
927 kind: RpcKind,
928 session_info: Option<Arc<SessionInfo>>,
929 state: &State,
930 ) -> Result<RpcResponse, Error> {
931 let rpc_dispatcher = state.rpc_dispatcher.clone();
932
933 let response = tokio::task::spawn_blocking(move || {
934 let rpc_ctx = RpcContext::new(session_info);
935 rpc_dispatcher.dispatch(rpc_ctx, request, kind)
936 })
937 .await?;
938
939 Ok(response)
940 }
941
942 async fn handle_km_status_update(
943 &self,
944 state: State,
945 status: KeyManagerStatus,
946 ) -> Result<Body, Error> {
947 let _guard = AbortOnPanic;
950
951 debug!(self.logger, "Received km status update request");
952
953 let runtime_id = state.protocol.get_host_info().runtime_id;
955
956 tokio::task::spawn_blocking(move || -> Result<(), Error> {
957 let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
958 let published_status = state
959 .policy_verifier
960 .verify_key_manager_status(status, key_manager)?;
961
962 state
964 .rpc_dispatcher
965 .handle_km_status_update(published_status);
966
967 Ok(())
968 })
969 .await
970 .unwrap()?; debug!(self.logger, "KM status update request complete");
973
974 Ok(Body::RuntimeKeyManagerStatusUpdateResponse {})
975 }
976
977 async fn handle_km_quote_policy_update(
978 &self,
979 state: State,
980 quote_policy: QuotePolicy,
981 ) -> Result<Body, Error> {
982 let _guard = AbortOnPanic;
985
986 debug!(self.logger, "Received km quote policy update request");
987
988 let runtime_id = state.protocol.get_host_info().runtime_id;
990
991 tokio::task::spawn_blocking(move || -> Result<(), Error> {
992 let key_manager = state.policy_verifier.key_manager(&runtime_id)?;
993 let policy =
994 state
995 .policy_verifier
996 .verify_quote_policy(quote_policy, &key_manager, None)?;
997
998 state.rpc_dispatcher.handle_km_quote_policy_update(policy);
1000
1001 Ok(())
1002 })
1003 .await
1004 .unwrap()?; debug!(self.logger, "KM quote policy update request complete");
1007
1008 Ok(Body::RuntimeKeyManagerQuotePolicyUpdateResponse {})
1009 }
1010}