oasis_runtime_sdk/modules/rofl/app/
client.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    sync::{
4        atomic::{AtomicU64, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9
10use anyhow::{anyhow, Context as _, Result};
11use rand::{rngs::OsRng, Rng};
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{
15    core::{
16        common::crypto::{hash::Hash, mrae::deoxysii},
17        consensus::{
18            registry::{SGXConstraints, TEEHardware},
19            state::{
20                beacon::ImmutableState as BeaconState, registry::ImmutableState as RegistryState,
21            },
22        },
23        enclave_rpc::{client::RpcClient, session},
24        host::{self, Host as _},
25        storage::mkvs,
26    },
27    crypto::signature::{PublicKey, Signer},
28    enclave_rpc::{QueryRequest, METHOD_QUERY},
29    modules::{
30        self,
31        accounts::types::NonceQuery,
32        core::types::{CallDataPublicKeyQueryResponse, EstimateGasQuery},
33    },
34    state::CurrentState,
35    storage::{host::new_mkvs_tree_for_round, HostStore},
36    types::{
37        address::{Address, SignatureAddressSpec},
38        callformat, token,
39        transaction::{self, CallerAddress},
40    },
41};
42
43use super::{processor, App};
44
/// Capacity (in queued items) used for the bounded command channels in this module.
const CMDQ_BACKLOG: usize = 16;

/// Name of the EnclaveRPC endpoint through which the RONL component is reached.
const ENCLAVE_RPC_ENDPOINT_RONL: &str = "ronl";
50
/// Options controlling how a transaction is submitted.
#[derive(Clone, Debug)]
pub struct SubmitTxOpts {
    /// Upper bound on how long to wait for the submission to complete. When this is `None` no
    /// local timeout is applied and the host node timeout is used instead.
    pub timeout: Option<Duration>,
    /// Whether the call data should be encrypted before submission (`true` by default).
    pub encrypt: bool,
    /// Whether the transaction result should be verified after submission (`true` by default).
    pub verify: bool,
}
62
63impl Default for SubmitTxOpts {
64    fn default() -> Self {
65        Self {
66            timeout: Some(Duration::from_millis(15_000)), // 15 seconds.
67            encrypt: true,
68            verify: true,
69        }
70    }
71}
72
73/// App-specific key derivation request.
74#[derive(Clone, Debug, Default)]
75pub struct DeriveKeyRequest {
76    /// Key kind.
77    pub kind: modules::rofl::types::KeyKind,
78    /// Key scope.
79    pub scope: modules::rofl::types::KeyScope,
80    /// Key generation.
81    pub generation: u64,
82    /// Key identifier.
83    pub key_id: Vec<u8>,
84}
85
86/// A runtime client meant for use within runtimes.
87pub struct Client<A: App> {
88    state: Arc<processor::State<A>>,
89    imp: ClientImpl<A>,
90    submission_mgr: Arc<SubmissionManager<A>>,
91}
92
93impl<A> Client<A>
94where
95    A: App,
96{
97    /// Create a new runtime client.
98    pub(super) fn new(
99        state: Arc<processor::State<A>>,
100        cmdq: mpsc::WeakSender<processor::Command>,
101    ) -> Self {
102        let imp = ClientImpl::new(state.clone(), cmdq);
103        let mut submission_mgr = SubmissionManager::new(imp.clone());
104        submission_mgr.start();
105
106        Self {
107            state,
108            imp,
109            submission_mgr: Arc::new(submission_mgr),
110        }
111    }
112
113    /// Retrieve the latest known runtime round.
114    pub async fn latest_round(&self) -> Result<u64> {
115        self.imp.latest_round().await
116    }
117
118    /// Retrieve the nonce for the given account.
119    pub async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
120        self.imp.account_nonce(round, address).await
121    }
122
123    /// Retrieve the gas price in the given denomination.
124    pub async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
125        self.imp.gas_price(round, denom).await
126    }
127
128    /// Securely query the on-chain runtime component.
129    pub async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
130    where
131        Rq: cbor::Encode,
132        Rs: cbor::Decode + Send + 'static,
133    {
134        self.imp.query(round, method, args).await
135    }
136
137    /// Securely perform gas estimation.
138    pub async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
139        self.imp.estimate_gas(req).await
140    }
141
142    /// Retrieves application configuration.
143    pub async fn app_cfg(&self) -> Result<modules::rofl::types::AppConfig> {
144        self.imp.app_cfg().await
145    }
146
147    /// Sign a given transaction, submit it and wait for block inclusion.
148    ///
149    /// This method supports multiple transaction signers.
150    pub async fn multi_sign_and_submit_tx(
151        &self,
152        signers: &[Arc<dyn Signer>],
153        tx: transaction::Transaction,
154    ) -> Result<transaction::CallResult> {
155        self.multi_sign_and_submit_tx_opts(signers, tx, SubmitTxOpts::default())
156            .await
157    }
158
159    /// Sign a given transaction, submit it and wait for block inclusion.
160    ///
161    /// This method supports multiple transaction signers.
162    pub async fn multi_sign_and_submit_tx_opts(
163        &self,
164        signers: &[Arc<dyn Signer>],
165        tx: transaction::Transaction,
166        opts: SubmitTxOpts,
167    ) -> Result<transaction::CallResult> {
168        self.submission_mgr
169            .multi_sign_and_submit_tx(signers, tx, opts)
170            .await
171    }
172
173    /// Sign a given transaction, submit it and wait for block inclusion.
174    pub async fn sign_and_submit_tx(
175        &self,
176        signer: Arc<dyn Signer>,
177        tx: transaction::Transaction,
178    ) -> Result<transaction::CallResult> {
179        self.multi_sign_and_submit_tx(&[signer], tx).await
180    }
181
182    /// Run a closure inside a `CurrentState` context with store for the given round.
183    pub async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
184    where
185        F: FnOnce() -> Result<R> + Send + 'static,
186        R: Send + 'static,
187    {
188        self.imp.with_store_for_round(round, f).await
189    }
190
191    /// Return a store corresponding to the given round.
192    pub async fn store_for_round(&self, round: u64) -> Result<HostStore> {
193        self.imp.store_for_round(round).await
194    }
195
196    /// Derive an application-specific key.
197    pub async fn derive_key(
198        &self,
199        signer: Arc<dyn Signer>,
200        request: DeriveKeyRequest,
201    ) -> Result<modules::rofl::types::DeriveKeyResponse> {
202        let tx = self.state.app.new_transaction(
203            "rofl.DeriveKey",
204            modules::rofl::types::DeriveKey {
205                app: A::id(),
206                kind: request.kind,
207                scope: request.scope,
208                generation: request.generation,
209                key_id: request.key_id,
210            },
211        );
212        let response = self.sign_and_submit_tx(signer, tx).await?;
213        Ok(cbor::from_value(response.ok()?)?)
214    }
215}
216
217impl<A> Clone for Client<A>
218where
219    A: App,
220{
221    fn clone(&self) -> Self {
222        Self {
223            state: self.state.clone(),
224            imp: self.imp.clone(),
225            submission_mgr: self.submission_mgr.clone(),
226        }
227    }
228}
229
230struct ClientImpl<A: App> {
231    state: Arc<processor::State<A>>,
232    cmdq: mpsc::WeakSender<processor::Command>,
233    latest_round: Arc<AtomicU64>,
234    rpc: Arc<RpcClient>,
235}
236
237impl<A> ClientImpl<A>
238where
239    A: App,
240{
241    fn new(state: Arc<processor::State<A>>, cmdq: mpsc::WeakSender<processor::Command>) -> Self {
242        Self {
243            cmdq,
244            latest_round: Arc::new(AtomicU64::new(0)),
245            rpc: Arc::new(RpcClient::new_runtime(
246                state.host.clone(),
247                ENCLAVE_RPC_ENDPOINT_RONL,
248                session::Builder::default()
249                    .use_endorsement(true)
250                    .quote_policy(None) // Forbid all until configured.
251                    .local_identity(state.identity.clone())
252                    .remote_enclaves(Some(HashSet::new())), // Forbid all until configured.
253                2, // Maximum number of sessions (one extra for reserve).
254                1, // Maximum number of sessions per peer (we only communicate with RONL).
255                1, // Stale session timeout.
256            )),
257            state,
258        }
259    }
260
261    /// Retrieve the latest known runtime round.
262    async fn latest_round(&self) -> Result<u64> {
263        let cmdq = self
264            .cmdq
265            .upgrade()
266            .ok_or(anyhow!("processor has shut down"))?;
267        let (tx, rx) = oneshot::channel();
268        cmdq.send(processor::Command::GetLatestRound(tx)).await?;
269        let round = rx.await?;
270        Ok(self
271            .latest_round
272            .fetch_max(round, Ordering::SeqCst)
273            .max(round))
274    }
275
276    /// Retrieve the nonce for the given account.
277    async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
278        self.query(round, "accounts.Nonce", NonceQuery { address })
279            .await
280    }
281
282    /// Retrieve the gas price in the given denomination.
283    async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
284        let mgp: BTreeMap<token::Denomination, u128> =
285            self.query(round, "core.MinGasPrice", ()).await?;
286        mgp.get(denom)
287            .ok_or(anyhow!("denomination not supported"))
288            .copied()
289    }
290
291    /// Retrieve the calldata encryption public key.
292    async fn call_data_public_key(&self) -> Result<CallDataPublicKeyQueryResponse> {
293        let round = self.latest_round().await?;
294        self.query(round, "core.CallDataPublicKey", ()).await
295    }
296
297    /// Securely query the on-chain runtime component.
298    async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
299    where
300        Rq: cbor::Encode,
301        Rs: cbor::Decode + Send + 'static,
302    {
303        // TODO: Consider using PolicyVerifier when it has the needed methods (and is async).
304        let state = self.state.consensus_verifier.latest_state().await?;
305        let runtime_id = self.state.host.get_runtime_id();
306        let tee = tokio::task::spawn_blocking(move || -> Result<_> {
307            let beacon = BeaconState::new(&state);
308            let epoch = beacon.epoch()?;
309            let registry = RegistryState::new(&state);
310            let runtime = registry
311                .runtime(&runtime_id)?
312                .ok_or(anyhow!("runtime not available"))?;
313            let ad = runtime
314                .active_deployment(epoch)
315                .ok_or(anyhow!("active runtime deployment not available"))?;
316
317            match runtime.tee_hardware {
318                TEEHardware::TEEHardwareIntelSGX => Ok(ad.try_decode_tee::<SGXConstraints>()?),
319                _ => Err(anyhow!("unsupported TEE platform")),
320            }
321        })
322        .await??;
323
324        let enclaves = HashSet::from_iter(tee.enclaves().clone());
325        let quote_policy = tee.policy();
326        self.rpc.update_enclaves(Some(enclaves)).await;
327        self.rpc.update_quote_policy(quote_policy).await;
328
329        let response: Vec<u8> = self
330            .rpc
331            .secure_call(
332                METHOD_QUERY,
333                QueryRequest {
334                    round,
335                    method: method.to_string(),
336                    args: cbor::to_vec(args),
337                },
338                vec![],
339            )
340            .await
341            .into_result()?;
342
343        Ok(cbor::from_slice(&response)?)
344    }
345
346    /// Securely perform gas estimation.
347    async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
348        let round = self.latest_round().await?;
349        self.query(round, "core.EstimateGas", req).await
350    }
351
352    /// Retrieves application configuration.
353    async fn app_cfg(&self) -> Result<modules::rofl::types::AppConfig> {
354        let round = self.latest_round().await?;
355        self.query(
356            round,
357            "rofl.App",
358            modules::rofl::types::AppQuery { id: A::id() },
359        )
360        .await
361    }
362
363    /// Run a closure inside a `CurrentState` context with store for the given round.
364    async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
365    where
366        F: FnOnce() -> Result<R> + Send + 'static,
367        R: Send + 'static,
368    {
369        let store = self.store_for_round(round).await?;
370
371        tokio::task::spawn_blocking(move || CurrentState::enter(store, f)).await?
372    }
373
374    /// Return a store corresponding to the given round.
375    async fn store_for_round(&self, round: u64) -> Result<HostStore> {
376        HostStore::new_for_round(
377            self.state.host.clone(),
378            &self.state.consensus_verifier,
379            self.state.host.get_runtime_id(),
380            round,
381        )
382        .await
383    }
384}
385
386impl<A> Clone for ClientImpl<A>
387where
388    A: App,
389{
390    fn clone(&self) -> Self {
391        Self {
392            state: self.state.clone(),
393            cmdq: self.cmdq.clone(),
394            latest_round: self.latest_round.clone(),
395            rpc: self.rpc.clone(),
396        }
397    }
398}
399
400enum Cmd {
401    SubmitTx(
402        Vec<Arc<dyn Signer>>,
403        transaction::Transaction,
404        SubmitTxOpts,
405        oneshot::Sender<Result<transaction::CallResult>>,
406    ),
407}
408
409/// Transaction submission manager for avoiding nonce conflicts.
410struct SubmissionManager<A: App> {
411    imp: Option<SubmissionManagerImpl<A>>,
412    cmdq_tx: mpsc::Sender<Cmd>,
413}
414
415impl<A> SubmissionManager<A>
416where
417    A: App,
418{
419    /// Create a new submission manager.
420    fn new(client: ClientImpl<A>) -> Self {
421        let (tx, rx) = mpsc::channel(CMDQ_BACKLOG);
422
423        Self {
424            imp: Some(SubmissionManagerImpl {
425                client,
426                cmdq_rx: rx,
427            }),
428            cmdq_tx: tx,
429        }
430    }
431
432    /// Start the submission manager task.
433    fn start(&mut self) {
434        if let Some(imp) = self.imp.take() {
435            imp.start();
436        }
437    }
438
439    /// Sign a given transaction, submit it and wait for block inclusion.
440    async fn multi_sign_and_submit_tx(
441        &self,
442        signers: &[Arc<dyn Signer>],
443        tx: transaction::Transaction,
444        opts: SubmitTxOpts,
445    ) -> Result<transaction::CallResult> {
446        let (ch, rx) = oneshot::channel();
447        self.cmdq_tx
448            .send(Cmd::SubmitTx(signers.to_vec(), tx, opts, ch))
449            .await?;
450        rx.await?
451    }
452}
453
454struct SubmissionManagerImpl<A: App> {
455    client: ClientImpl<A>,
456    cmdq_rx: mpsc::Receiver<Cmd>,
457}
458
459impl<A> SubmissionManagerImpl<A>
460where
461    A: App,
462{
463    /// Start the submission manager task.
464    fn start(self) {
465        tokio::task::spawn(self.run());
466    }
467
468    /// Run the submission manager task.
469    async fn run(mut self) {
470        let (notify_tx, mut notify_rx) = mpsc::channel::<HashSet<PublicKey>>(CMDQ_BACKLOG);
471        let mut queue: Vec<Cmd> = Vec::new();
472        let mut pending: HashSet<PublicKey> = HashSet::new();
473
474        loop {
475            tokio::select! {
476                // Process incoming commands.
477                Some(cmd) = self.cmdq_rx.recv() => queue.push(cmd),
478
479                // Process incoming completion notifications.
480                Some(signers) = notify_rx.recv() => {
481                    for pk in signers {
482                        pending.remove(&pk);
483                    }
484                },
485
486                else => break,
487            }
488
489            // Check if there is anything in the queue that can be executed without conflicts.
490            let mut new_queue = Vec::with_capacity(queue.len());
491            for cmd in queue {
492                match cmd {
493                    Cmd::SubmitTx(signers, tx, opts, ch) => {
494                        // Check if transaction can be executed (no conflicts with in-flight txs).
495                        let signer_set =
496                            HashSet::from_iter(signers.iter().map(|signer| signer.public_key()));
497                        if !signer_set.is_disjoint(&pending) {
498                            // Defer any non-executable commands.
499                            new_queue.push(Cmd::SubmitTx(signers, tx, opts, ch));
500                            continue;
501                        }
502                        // Include all signers in the pending set.
503                        pending.extend(signer_set.iter().cloned());
504
505                        // Execute in a separate task.
506                        let client = self.client.clone();
507                        let notify_tx = notify_tx.clone();
508
509                        tokio::spawn(async move {
510                            let result =
511                                Self::multi_sign_and_submit_tx(client, &signers, tx, opts).await;
512                            let _ = ch.send(result);
513
514                            // Notify the submission manager task that submission is done.
515                            let _ = notify_tx.send(signer_set).await;
516                        });
517                    }
518                }
519            }
520            queue = new_queue;
521        }
522    }
523
524    /// Sign a given transaction, submit it and wait for block inclusion.
525    async fn multi_sign_and_submit_tx(
526        client: ClientImpl<A>,
527        signers: &[Arc<dyn Signer>],
528        mut tx: transaction::Transaction,
529        opts: SubmitTxOpts,
530    ) -> Result<transaction::CallResult> {
531        if signers.is_empty() {
532            return Err(anyhow!("no signers specified"));
533        }
534
535        // Resolve signer addresses.
536        let addresses = signers
537            .iter()
538            .map(|signer| -> Result<_> {
539                let sigspec = SignatureAddressSpec::try_from_pk(&signer.public_key())
540                    .ok_or(anyhow!("signature scheme not supported"))?;
541                Ok((Address::from_sigspec(&sigspec), sigspec))
542            })
543            .collect::<Result<Vec<_>>>()?;
544
545        let round = client.latest_round().await?;
546
547        // Resolve account nonces.
548        for (address, sigspec) in &addresses {
549            let nonce = client.account_nonce(round, *address).await?;
550
551            tx.append_auth_signature(sigspec.clone(), nonce);
552        }
553
554        // Perform gas estimation after all signer infos have been added as otherwise we may
555        // underestimate the amount of gas needed.
556        if tx.fee_gas() == 0 {
557            let signer = &signers[0]; // Checked to have at least one signer above.
558            let gas = client
559                .estimate_gas(EstimateGasQuery {
560                    caller: if let PublicKey::Secp256k1(pk) = signer.public_key() {
561                        Some(CallerAddress::EthAddress(
562                            pk.to_eth_address().try_into().unwrap(),
563                        ))
564                    } else {
565                        Some(CallerAddress::Address(addresses[0].0)) // Checked above.
566                    },
567                    tx: tx.clone(),
568                    propagate_failures: false,
569                })
570                .await?;
571
572            // The estimate may be off due to current limitations in confidential gas estimation.
573            // Inflate the estimated gas by 20%.
574            let mut gas = gas.saturating_add(gas.saturating_mul(20).saturating_div(100));
575
576            // When encrypting transactions, also add the cost of calldata encryption.
577            if opts.encrypt {
578                let envelope_size_estimate = cbor::to_vec(callformat::CallEnvelopeX25519DeoxysII {
579                    epoch: u64::MAX,
580                    ..Default::default()
581                })
582                .len()
583                .try_into()
584                .unwrap();
585
586                let params: modules::core::Parameters =
587                    client.query(round, "core.Parameters", ()).await?;
588                gas = gas.saturating_add(params.gas_costs.callformat_x25519_deoxysii);
589                gas = gas.saturating_add(
590                    params
591                        .gas_costs
592                        .tx_byte
593                        .saturating_mul(envelope_size_estimate),
594                );
595            }
596
597            tx.set_fee_gas(gas);
598        }
599
600        // Optionally perform calldata encryption.
601        let meta = if opts.encrypt {
602            // Obtain runtime's current ephemeral public key.
603            let runtime_pk = client.call_data_public_key().await?;
604            // Generate local key pair and nonce.
605            let client_kp = deoxysii::generate_key_pair();
606            let mut nonce = [0u8; deoxysii::NONCE_SIZE];
607            OsRng.fill(&mut nonce);
608            // Encrypt and encode call.
609            let call = transaction::Call {
610                format: transaction::CallFormat::EncryptedX25519DeoxysII,
611                method: "".to_string(),
612                body: cbor::to_value(callformat::CallEnvelopeX25519DeoxysII {
613                    pk: client_kp.0.into(),
614                    nonce,
615                    epoch: runtime_pk.epoch,
616                    data: deoxysii::box_seal(
617                        &nonce,
618                        cbor::to_vec(std::mem::take(&mut tx.call)),
619                        vec![],
620                        &runtime_pk.public_key.key.0,
621                        &client_kp.1,
622                    )?,
623                }),
624                ..Default::default()
625            };
626            tx.call = call;
627
628            Some((runtime_pk, client_kp))
629        } else {
630            None
631        };
632
633        // Determine gas price. Currently we always use the native denomination.
634        if tx.fee_amount().amount() == 0 {
635            let mgp = client
636                .gas_price(round, &token::Denomination::NATIVE)
637                .await?;
638            let fee = mgp.saturating_mul(tx.fee_gas().into());
639            tx.set_fee_amount(token::BaseUnits::new(fee, token::Denomination::NATIVE));
640        }
641
642        // Sign the transaction.
643        let mut tx = tx.prepare_for_signing();
644        for signer in signers {
645            tx.append_sign(signer)?;
646        }
647        let tx = tx.finalize();
648        let raw_tx = cbor::to_vec(tx);
649        let tx_hash = Hash::digest_bytes(&raw_tx);
650
651        // Submit the transaction.
652        let submit_tx_task = client.state.host.submit_tx(
653            raw_tx,
654            host::SubmitTxOpts {
655                wait: true,
656                ..Default::default()
657            },
658        );
659        let result = if let Some(timeout) = opts.timeout {
660            tokio::time::timeout(timeout, submit_tx_task).await?
661        } else {
662            submit_tx_task.await
663        };
664        let result = result?.ok_or(anyhow!("missing result"))?;
665
666        if opts.verify {
667            // TODO: Ensure consensus verifier is up to date.
668
669            // Verify transaction inclusion and result.
670            let io_tree = new_mkvs_tree_for_round(
671                client.state.host.clone(),
672                &client.state.consensus_verifier,
673                client.state.host.get_runtime_id(),
674                result.round,
675                mkvs::RootType::IO,
676            )
677            .await?;
678            // TODO: Add transaction accessors in transaction:Tree in Oasis Core.
679            // TODO: Remove spawn once we have async MKVS.
680            let verified_result = tokio::task::spawn_blocking(move || -> Result<_> {
681                let key = [b"T", tx_hash.as_ref(), &[0x02]].concat();
682                let output_artifacts = io_tree
683                    .get(&key)
684                    .context("failed to verify transaction result")?
685                    .ok_or(anyhow!("failed to verify transaction result"))?;
686
687                let output_artifacts: (Vec<u8>,) = cbor::from_slice(&output_artifacts)
688                    .context("malformed output transaction artifacts")?;
689                Ok(output_artifacts.0)
690            })
691            .await??;
692            if result.output != verified_result {
693                return Err(anyhow!("failed to verify transaction result"));
694            }
695        }
696
697        // Update latest known round.
698        client
699            .latest_round
700            .fetch_max(result.round, Ordering::SeqCst);
701
702        // Decrypt result if it is encrypted.
703        let result: transaction::CallResult =
704            cbor::from_slice(&result.output).map_err(|_| anyhow!("malformed result"))?;
705        match result {
706            transaction::CallResult::Unknown(raw) => {
707                let meta = meta.ok_or(anyhow!("unknown result but calldata was not encrypted"))?;
708                let envelope: callformat::ResultEnvelopeX25519DeoxysII =
709                    cbor::from_value(raw).map_err(|_| anyhow!("malformed encrypted result"))?;
710                let data = deoxysii::box_open(
711                    &envelope.nonce,
712                    envelope.data,
713                    vec![],
714                    &meta.0.public_key.key.0,
715                    &meta.1 .1,
716                )
717                .map_err(|_| anyhow!("malformed encrypted result"))?;
718
719                cbor::from_slice(&data).map_err(|_| anyhow!("malformed encrypted result"))
720            }
721            _ => Ok(result),
722        }
723    }
724}