1use std::{
2 collections::{BTreeMap, HashSet},
3 sync::{
4 atomic::{AtomicU64, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9
10use anyhow::{anyhow, Context as _, Result};
11use rand::{rngs::OsRng, Rng};
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{
15 core::{
16 common::crypto::{hash::Hash, mrae::deoxysii},
17 consensus::{
18 registry::{SGXConstraints, TEEHardware},
19 state::{
20 beacon::ImmutableState as BeaconState, registry::ImmutableState as RegistryState,
21 },
22 },
23 enclave_rpc::{client::RpcClient, session},
24 host::{self, Host as _},
25 storage::mkvs,
26 },
27 crypto::signature::{PublicKey, Signer},
28 enclave_rpc::{QueryRequest, METHOD_QUERY},
29 modules::{
30 self,
31 accounts::types::NonceQuery,
32 core::types::{CallDataPublicKeyQueryResponse, EstimateGasQuery},
33 },
34 state::CurrentState,
35 storage::{host::new_mkvs_tree_for_round, HostStore},
36 types::{
37 address::{Address, SignatureAddressSpec},
38 callformat, token,
39 transaction::{self, CallerAddress},
40 },
41};
42
43use super::{processor, App};
44
/// Capacity of the bounded `mpsc` channels created in this module (the submission command queue
/// and the completion notification queue).
const CMDQ_BACKLOG: usize = 16;

/// Endpoint name passed to `RpcClient::new_runtime` when the enclave RPC client is constructed.
const ENCLAVE_RPC_ENDPOINT_RONL: &str = "ronl";
50
/// Options controlling how a transaction is submitted.
#[derive(Clone, Debug)]
pub struct SubmitTxOpts {
    /// Upper bound on how long to wait for the host to process the submission. With `None` the
    /// submission is awaited without any time limit.
    pub timeout: Option<Duration>,
    /// Whether the call is wrapped into an encrypted (X25519-DeoxysII) envelope before signing.
    pub encrypt: bool,
    /// Whether the result returned by the host is checked against the I/O tree of the round the
    /// transaction was included in.
    pub verify: bool,
}

impl Default for SubmitTxOpts {
    /// Returns the default options: a 15 second timeout with both encryption and result
    /// verification enabled.
    fn default() -> Self {
        let timeout = Some(Duration::from_secs(15));

        Self {
            timeout,
            encrypt: true,
            verify: true,
        }
    }
}
72
/// Arguments for `Client::derive_key`; every field is copied verbatim into the body of the
/// `rofl.DeriveKey` call.
#[derive(Clone, Debug, Default)]
pub struct DeriveKeyRequest {
    /// Key kind forwarded to the runtime.
    pub kind: modules::rofl::types::KeyKind,
    /// Key scope forwarded to the runtime.
    pub scope: modules::rofl::types::KeyScope,
    /// Key generation forwarded to the runtime.
    pub generation: u64,
    /// Key identifier bytes forwarded to the runtime.
    pub key_id: Vec<u8>,
}
85
/// Client for querying the runtime and submitting transactions.
pub struct Client<A: App> {
    /// Shared processor state (gives access to the application instance).
    state: Arc<processor::State<A>>,
    /// Implementation used for queries and storage access.
    imp: ClientImpl<A>,
    /// Shared handle to the submission manager that every transaction submission goes through.
    submission_mgr: Arc<SubmissionManager<A>>,
}
92
93impl<A> Client<A>
94where
95 A: App,
96{
97 pub(super) fn new(
99 state: Arc<processor::State<A>>,
100 cmdq: mpsc::WeakSender<processor::Command>,
101 ) -> Self {
102 let imp = ClientImpl::new(state.clone(), cmdq);
103 let mut submission_mgr = SubmissionManager::new(imp.clone());
104 submission_mgr.start();
105
106 Self {
107 state,
108 imp,
109 submission_mgr: Arc::new(submission_mgr),
110 }
111 }
112
113 pub async fn latest_round(&self) -> Result<u64> {
115 self.imp.latest_round().await
116 }
117
118 pub async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
120 self.imp.account_nonce(round, address).await
121 }
122
123 pub async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
125 self.imp.gas_price(round, denom).await
126 }
127
128 pub async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
130 where
131 Rq: cbor::Encode,
132 Rs: cbor::Decode + Send + 'static,
133 {
134 self.imp.query(round, method, args).await
135 }
136
137 pub async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
139 self.imp.estimate_gas(req).await
140 }
141
142 pub async fn app_cfg(&self) -> Result<modules::rofl::types::AppConfig> {
144 self.imp.app_cfg().await
145 }
146
147 pub async fn multi_sign_and_submit_tx(
151 &self,
152 signers: &[Arc<dyn Signer>],
153 tx: transaction::Transaction,
154 ) -> Result<transaction::CallResult> {
155 self.multi_sign_and_submit_tx_opts(signers, tx, SubmitTxOpts::default())
156 .await
157 }
158
159 pub async fn multi_sign_and_submit_tx_opts(
163 &self,
164 signers: &[Arc<dyn Signer>],
165 tx: transaction::Transaction,
166 opts: SubmitTxOpts,
167 ) -> Result<transaction::CallResult> {
168 self.submission_mgr
169 .multi_sign_and_submit_tx(signers, tx, opts)
170 .await
171 }
172
173 pub async fn sign_and_submit_tx(
175 &self,
176 signer: Arc<dyn Signer>,
177 tx: transaction::Transaction,
178 ) -> Result<transaction::CallResult> {
179 self.multi_sign_and_submit_tx(&[signer], tx).await
180 }
181
182 pub async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
184 where
185 F: FnOnce() -> Result<R> + Send + 'static,
186 R: Send + 'static,
187 {
188 self.imp.with_store_for_round(round, f).await
189 }
190
191 pub async fn store_for_round(&self, round: u64) -> Result<HostStore> {
193 self.imp.store_for_round(round).await
194 }
195
196 pub async fn derive_key(
198 &self,
199 signer: Arc<dyn Signer>,
200 request: DeriveKeyRequest,
201 ) -> Result<modules::rofl::types::DeriveKeyResponse> {
202 let tx = self.state.app.new_transaction(
203 "rofl.DeriveKey",
204 modules::rofl::types::DeriveKey {
205 app: A::id(),
206 kind: request.kind,
207 scope: request.scope,
208 generation: request.generation,
209 key_id: request.key_id,
210 },
211 );
212 let response = self.sign_and_submit_tx(signer, tx).await?;
213 Ok(cbor::from_value(response.ok()?)?)
214 }
215}
216
217impl<A> Clone for Client<A>
218where
219 A: App,
220{
221 fn clone(&self) -> Self {
222 Self {
223 state: self.state.clone(),
224 imp: self.imp.clone(),
225 submission_mgr: self.submission_mgr.clone(),
226 }
227 }
228}
229
/// Cloneable implementation behind `Client`; a clone is also handed to the submission manager.
struct ClientImpl<A: App> {
    /// Shared processor state (host, consensus verifier, identity).
    state: Arc<processor::State<A>>,
    /// Weak handle to the processor command queue; upgrading it fails once the queue is gone.
    cmdq: mpsc::WeakSender<processor::Command>,
    /// Highest round observed so far, shared between all clones.
    latest_round: Arc<AtomicU64>,
    /// Enclave RPC client used to issue runtime queries.
    rpc: Arc<RpcClient>,
}
236
237impl<A> ClientImpl<A>
238where
239 A: App,
240{
241 fn new(state: Arc<processor::State<A>>, cmdq: mpsc::WeakSender<processor::Command>) -> Self {
242 Self {
243 cmdq,
244 latest_round: Arc::new(AtomicU64::new(0)),
245 rpc: Arc::new(RpcClient::new_runtime(
246 state.host.clone(),
247 ENCLAVE_RPC_ENDPOINT_RONL,
248 session::Builder::default()
249 .use_endorsement(true)
250 .quote_policy(None) .local_identity(state.identity.clone())
252 .remote_enclaves(Some(HashSet::new())), 2, 1, 1, )),
257 state,
258 }
259 }
260
261 async fn latest_round(&self) -> Result<u64> {
263 let cmdq = self
264 .cmdq
265 .upgrade()
266 .ok_or(anyhow!("processor has shut down"))?;
267 let (tx, rx) = oneshot::channel();
268 cmdq.send(processor::Command::GetLatestRound(tx)).await?;
269 let round = rx.await?;
270 Ok(self
271 .latest_round
272 .fetch_max(round, Ordering::SeqCst)
273 .max(round))
274 }
275
276 async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
278 self.query(round, "accounts.Nonce", NonceQuery { address })
279 .await
280 }
281
282 async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
284 let mgp: BTreeMap<token::Denomination, u128> =
285 self.query(round, "core.MinGasPrice", ()).await?;
286 mgp.get(denom)
287 .ok_or(anyhow!("denomination not supported"))
288 .copied()
289 }
290
291 async fn call_data_public_key(&self) -> Result<CallDataPublicKeyQueryResponse> {
293 let round = self.latest_round().await?;
294 self.query(round, "core.CallDataPublicKey", ()).await
295 }
296
297 async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
299 where
300 Rq: cbor::Encode,
301 Rs: cbor::Decode + Send + 'static,
302 {
303 let state = self.state.consensus_verifier.latest_state().await?;
305 let runtime_id = self.state.host.get_runtime_id();
306 let tee = tokio::task::spawn_blocking(move || -> Result<_> {
307 let beacon = BeaconState::new(&state);
308 let epoch = beacon.epoch()?;
309 let registry = RegistryState::new(&state);
310 let runtime = registry
311 .runtime(&runtime_id)?
312 .ok_or(anyhow!("runtime not available"))?;
313 let ad = runtime
314 .active_deployment(epoch)
315 .ok_or(anyhow!("active runtime deployment not available"))?;
316
317 match runtime.tee_hardware {
318 TEEHardware::TEEHardwareIntelSGX => Ok(ad.try_decode_tee::<SGXConstraints>()?),
319 _ => Err(anyhow!("unsupported TEE platform")),
320 }
321 })
322 .await??;
323
324 let enclaves = HashSet::from_iter(tee.enclaves().clone());
325 let quote_policy = tee.policy();
326 self.rpc.update_enclaves(Some(enclaves)).await;
327 self.rpc.update_quote_policy(quote_policy).await;
328
329 let response: Vec<u8> = self
330 .rpc
331 .secure_call(
332 METHOD_QUERY,
333 QueryRequest {
334 round,
335 method: method.to_string(),
336 args: cbor::to_vec(args),
337 },
338 vec![],
339 )
340 .await
341 .into_result()?;
342
343 Ok(cbor::from_slice(&response)?)
344 }
345
346 async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
348 let round = self.latest_round().await?;
349 self.query(round, "core.EstimateGas", req).await
350 }
351
352 async fn app_cfg(&self) -> Result<modules::rofl::types::AppConfig> {
354 let round = self.latest_round().await?;
355 self.query(
356 round,
357 "rofl.App",
358 modules::rofl::types::AppQuery { id: A::id() },
359 )
360 .await
361 }
362
363 async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
365 where
366 F: FnOnce() -> Result<R> + Send + 'static,
367 R: Send + 'static,
368 {
369 let store = self.store_for_round(round).await?;
370
371 tokio::task::spawn_blocking(move || CurrentState::enter(store, f)).await?
372 }
373
374 async fn store_for_round(&self, round: u64) -> Result<HostStore> {
376 HostStore::new_for_round(
377 self.state.host.clone(),
378 &self.state.consensus_verifier,
379 self.state.host.get_runtime_id(),
380 round,
381 )
382 .await
383 }
384}
385
386impl<A> Clone for ClientImpl<A>
387where
388 A: App,
389{
390 fn clone(&self) -> Self {
391 Self {
392 state: self.state.clone(),
393 cmdq: self.cmdq.clone(),
394 latest_round: self.latest_round.clone(),
395 rpc: self.rpc.clone(),
396 }
397 }
398}
399
/// Commands accepted by the submission manager task.
enum Cmd {
    /// Sign the transaction with the given signers and submit it using the given options. The
    /// outcome is delivered through the oneshot sender.
    SubmitTx(
        Vec<Arc<dyn Signer>>,
        transaction::Transaction,
        SubmitTxOpts,
        oneshot::Sender<Result<transaction::CallResult>>,
    ),
}
408
/// Front end of the background task that performs transaction submissions.
struct SubmissionManager<A: App> {
    /// Task state; `Some` until `start` takes it out and hands it to the spawned task.
    imp: Option<SubmissionManagerImpl<A>>,
    /// Sending side of the command queue consumed by the task.
    cmdq_tx: mpsc::Sender<Cmd>,
}
414
415impl<A> SubmissionManager<A>
416where
417 A: App,
418{
419 fn new(client: ClientImpl<A>) -> Self {
421 let (tx, rx) = mpsc::channel(CMDQ_BACKLOG);
422
423 Self {
424 imp: Some(SubmissionManagerImpl {
425 client,
426 cmdq_rx: rx,
427 }),
428 cmdq_tx: tx,
429 }
430 }
431
432 fn start(&mut self) {
434 if let Some(imp) = self.imp.take() {
435 imp.start();
436 }
437 }
438
439 async fn multi_sign_and_submit_tx(
441 &self,
442 signers: &[Arc<dyn Signer>],
443 tx: transaction::Transaction,
444 opts: SubmitTxOpts,
445 ) -> Result<transaction::CallResult> {
446 let (ch, rx) = oneshot::channel();
447 self.cmdq_tx
448 .send(Cmd::SubmitTx(signers.to_vec(), tx, opts, ch))
449 .await?;
450 rx.await?
451 }
452}
453
/// State owned by the background submission task.
struct SubmissionManagerImpl<A: App> {
    /// Client used to perform queries and submissions.
    client: ClientImpl<A>,
    /// Receiving side of the command queue.
    cmdq_rx: mpsc::Receiver<Cmd>,
}
458
459impl<A> SubmissionManagerImpl<A>
460where
461 A: App,
462{
463 fn start(self) {
465 tokio::task::spawn(self.run());
466 }
467
468 async fn run(mut self) {
470 let (notify_tx, mut notify_rx) = mpsc::channel::<HashSet<PublicKey>>(CMDQ_BACKLOG);
471 let mut queue: Vec<Cmd> = Vec::new();
472 let mut pending: HashSet<PublicKey> = HashSet::new();
473
474 loop {
475 tokio::select! {
476 Some(cmd) = self.cmdq_rx.recv() => queue.push(cmd),
478
479 Some(signers) = notify_rx.recv() => {
481 for pk in signers {
482 pending.remove(&pk);
483 }
484 },
485
486 else => break,
487 }
488
489 let mut new_queue = Vec::with_capacity(queue.len());
491 for cmd in queue {
492 match cmd {
493 Cmd::SubmitTx(signers, tx, opts, ch) => {
494 let signer_set =
496 HashSet::from_iter(signers.iter().map(|signer| signer.public_key()));
497 if !signer_set.is_disjoint(&pending) {
498 new_queue.push(Cmd::SubmitTx(signers, tx, opts, ch));
500 continue;
501 }
502 pending.extend(signer_set.iter().cloned());
504
505 let client = self.client.clone();
507 let notify_tx = notify_tx.clone();
508
509 tokio::spawn(async move {
510 let result =
511 Self::multi_sign_and_submit_tx(client, &signers, tx, opts).await;
512 let _ = ch.send(result);
513
514 let _ = notify_tx.send(signer_set).await;
516 });
517 }
518 }
519 }
520 queue = new_queue;
521 }
522 }
523
524 async fn multi_sign_and_submit_tx(
526 client: ClientImpl<A>,
527 signers: &[Arc<dyn Signer>],
528 mut tx: transaction::Transaction,
529 opts: SubmitTxOpts,
530 ) -> Result<transaction::CallResult> {
531 if signers.is_empty() {
532 return Err(anyhow!("no signers specified"));
533 }
534
535 let addresses = signers
537 .iter()
538 .map(|signer| -> Result<_> {
539 let sigspec = SignatureAddressSpec::try_from_pk(&signer.public_key())
540 .ok_or(anyhow!("signature scheme not supported"))?;
541 Ok((Address::from_sigspec(&sigspec), sigspec))
542 })
543 .collect::<Result<Vec<_>>>()?;
544
545 let round = client.latest_round().await?;
546
547 for (address, sigspec) in &addresses {
549 let nonce = client.account_nonce(round, *address).await?;
550
551 tx.append_auth_signature(sigspec.clone(), nonce);
552 }
553
554 if tx.fee_gas() == 0 {
557 let signer = &signers[0]; let gas = client
559 .estimate_gas(EstimateGasQuery {
560 caller: if let PublicKey::Secp256k1(pk) = signer.public_key() {
561 Some(CallerAddress::EthAddress(
562 pk.to_eth_address().try_into().unwrap(),
563 ))
564 } else {
565 Some(CallerAddress::Address(addresses[0].0)) },
567 tx: tx.clone(),
568 propagate_failures: false,
569 })
570 .await?;
571
572 let mut gas = gas.saturating_add(gas.saturating_mul(20).saturating_div(100));
575
576 if opts.encrypt {
578 let envelope_size_estimate = cbor::to_vec(callformat::CallEnvelopeX25519DeoxysII {
579 epoch: u64::MAX,
580 ..Default::default()
581 })
582 .len()
583 .try_into()
584 .unwrap();
585
586 let params: modules::core::Parameters =
587 client.query(round, "core.Parameters", ()).await?;
588 gas = gas.saturating_add(params.gas_costs.callformat_x25519_deoxysii);
589 gas = gas.saturating_add(
590 params
591 .gas_costs
592 .tx_byte
593 .saturating_mul(envelope_size_estimate),
594 );
595 }
596
597 tx.set_fee_gas(gas);
598 }
599
600 let meta = if opts.encrypt {
602 let runtime_pk = client.call_data_public_key().await?;
604 let client_kp = deoxysii::generate_key_pair();
606 let mut nonce = [0u8; deoxysii::NONCE_SIZE];
607 OsRng.fill(&mut nonce);
608 let call = transaction::Call {
610 format: transaction::CallFormat::EncryptedX25519DeoxysII,
611 method: "".to_string(),
612 body: cbor::to_value(callformat::CallEnvelopeX25519DeoxysII {
613 pk: client_kp.0.into(),
614 nonce,
615 epoch: runtime_pk.epoch,
616 data: deoxysii::box_seal(
617 &nonce,
618 cbor::to_vec(std::mem::take(&mut tx.call)),
619 vec![],
620 &runtime_pk.public_key.key.0,
621 &client_kp.1,
622 )?,
623 }),
624 ..Default::default()
625 };
626 tx.call = call;
627
628 Some((runtime_pk, client_kp))
629 } else {
630 None
631 };
632
633 if tx.fee_amount().amount() == 0 {
635 let mgp = client
636 .gas_price(round, &token::Denomination::NATIVE)
637 .await?;
638 let fee = mgp.saturating_mul(tx.fee_gas().into());
639 tx.set_fee_amount(token::BaseUnits::new(fee, token::Denomination::NATIVE));
640 }
641
642 let mut tx = tx.prepare_for_signing();
644 for signer in signers {
645 tx.append_sign(signer)?;
646 }
647 let tx = tx.finalize();
648 let raw_tx = cbor::to_vec(tx);
649 let tx_hash = Hash::digest_bytes(&raw_tx);
650
651 let submit_tx_task = client.state.host.submit_tx(
653 raw_tx,
654 host::SubmitTxOpts {
655 wait: true,
656 ..Default::default()
657 },
658 );
659 let result = if let Some(timeout) = opts.timeout {
660 tokio::time::timeout(timeout, submit_tx_task).await?
661 } else {
662 submit_tx_task.await
663 };
664 let result = result?.ok_or(anyhow!("missing result"))?;
665
666 if opts.verify {
667 let io_tree = new_mkvs_tree_for_round(
671 client.state.host.clone(),
672 &client.state.consensus_verifier,
673 client.state.host.get_runtime_id(),
674 result.round,
675 mkvs::RootType::IO,
676 )
677 .await?;
678 let verified_result = tokio::task::spawn_blocking(move || -> Result<_> {
681 let key = [b"T", tx_hash.as_ref(), &[0x02]].concat();
682 let output_artifacts = io_tree
683 .get(&key)
684 .context("failed to verify transaction result")?
685 .ok_or(anyhow!("failed to verify transaction result"))?;
686
687 let output_artifacts: (Vec<u8>,) = cbor::from_slice(&output_artifacts)
688 .context("malformed output transaction artifacts")?;
689 Ok(output_artifacts.0)
690 })
691 .await??;
692 if result.output != verified_result {
693 return Err(anyhow!("failed to verify transaction result"));
694 }
695 }
696
697 client
699 .latest_round
700 .fetch_max(result.round, Ordering::SeqCst);
701
702 let result: transaction::CallResult =
704 cbor::from_slice(&result.output).map_err(|_| anyhow!("malformed result"))?;
705 match result {
706 transaction::CallResult::Unknown(raw) => {
707 let meta = meta.ok_or(anyhow!("unknown result but calldata was not encrypted"))?;
708 let envelope: callformat::ResultEnvelopeX25519DeoxysII =
709 cbor::from_value(raw).map_err(|_| anyhow!("malformed encrypted result"))?;
710 let data = deoxysii::box_open(
711 &envelope.nonce,
712 envelope.data,
713 vec![],
714 &meta.0.public_key.key.0,
715 &meta.1 .1,
716 )
717 .map_err(|_| anyhow!("malformed encrypted result"))?;
718
719 cbor::from_slice(&data).map_err(|_| anyhow!("malformed encrypted result"))
720 }
721 _ => Ok(result),
722 }
723 }
724}