use std::{
collections::{BTreeMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use anyhow::{anyhow, Result};
use tokio::sync::{mpsc, oneshot};
use crate::{
core::{
consensus::{
registry::{SGXConstraints, TEEHardware},
state::{
beacon::ImmutableState as BeaconState, registry::ImmutableState as RegistryState,
},
},
enclave_rpc::{client::RpcClient, session},
host::{self, Host as _},
},
crypto::signature::{PublicKey, Signer},
enclave_rpc::{QueryRequest, METHOD_QUERY},
modules::{accounts::types::NonceQuery, core::types::EstimateGasQuery},
state::CurrentState,
storage::HostStore,
types::{
address::{Address, SignatureAddressSpec},
token,
transaction::{self, CallerAddress},
},
};
use super::{processor, App};
/// Capacity of the bounded channels used by the submission manager (both its command queue and
/// its completion-notification channel).
const CMDQ_BACKLOG: usize = 16;
/// Endpoint name handed to the runtime RPC client that is used for queries.
const ENCLAVE_RPC_ENDPOINT_RONL: &str = "ronl";
/// Options controlling how a transaction is submitted.
#[derive(Clone, Debug)]
pub struct SubmitTxOpts {
    /// Upper bound on how long to wait for the submission to complete. With `None` the
    /// submission is awaited without any timeout.
    pub timeout: Option<Duration>,
}

impl Default for SubmitTxOpts {
    /// Default options: a submission timeout of 15 seconds.
    fn default() -> Self {
        let timeout = Some(Duration::from_secs(15));
        Self { timeout }
    }
}
/// Client handle exposing runtime queries and transaction submission.
///
/// Queries, round lookups and store access are forwarded to the inner `ClientImpl`, while
/// transaction submissions go through the `SubmissionManager`.
pub struct Client<A: App> {
/// Performs round lookups, queries and store access.
imp: ClientImpl<A>,
/// Submission queue front end; the same instance is shared by all clones of this client.
submission_mgr: Arc<SubmissionManager<A>>,
}
impl<A: App> Client<A> {
    /// Create a new client on top of the given processor state and command queue.
    ///
    /// This also starts the background task of the transaction submission manager.
    pub(super) fn new(
        state: Arc<processor::State<A>>,
        cmdq: mpsc::WeakSender<processor::Command>,
    ) -> Self {
        let imp = ClientImpl::new(state, cmdq);

        // Starting the manager needs mutable access, so do it before wrapping it in an `Arc`.
        let mut manager = SubmissionManager::new(imp.clone());
        manager.start();
        let submission_mgr = Arc::new(manager);

        Self {
            imp,
            submission_mgr,
        }
    }

    /// Retrieve the latest known round.
    pub async fn latest_round(&self) -> Result<u64> {
        self.imp.latest_round().await
    }

    /// Retrieve the nonce of the account with the given address at the given round.
    pub async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
        self.imp.account_nonce(round, address).await
    }

    /// Retrieve the minimum gas price for the given denomination at the given round.
    ///
    /// Fails when the denomination is not among the reported minimum gas prices.
    pub async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
        self.imp.gas_price(round, denom).await
    }

    /// Invoke the query `method` with arguments `args` at the given round and decode the
    /// response as `Rs`.
    pub async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
    where
        Rq: cbor::Encode,
        Rs: cbor::Decode + Send + 'static,
    {
        self.imp.query(round, method, args).await
    }

    /// Estimate the amount of gas needed by the transaction described in `req`.
    pub async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
        self.imp.estimate_gas(req).await
    }

    /// Sign the transaction with all given signers and submit it using the default
    /// [`SubmitTxOpts`].
    pub async fn multi_sign_and_submit_tx(
        &self,
        signers: &[Arc<dyn Signer>],
        tx: transaction::Transaction,
    ) -> Result<transaction::CallResult> {
        let opts = SubmitTxOpts::default();
        self.multi_sign_and_submit_tx_opts(signers, tx, opts).await
    }

    /// Sign the transaction with all given signers and submit it using the given options.
    ///
    /// The request is handed to the submission manager, which performs the actual signing
    /// and submission.
    pub async fn multi_sign_and_submit_tx_opts(
        &self,
        signers: &[Arc<dyn Signer>],
        tx: transaction::Transaction,
        opts: SubmitTxOpts,
    ) -> Result<transaction::CallResult> {
        let manager = &self.submission_mgr;
        manager.multi_sign_and_submit_tx(signers, tx, opts).await
    }

    /// Sign the transaction with a single signer and submit it using the default options.
    pub async fn sign_and_submit_tx(
        &self,
        signer: Arc<dyn Signer>,
        tx: transaction::Transaction,
    ) -> Result<transaction::CallResult> {
        let signers = [signer];
        self.multi_sign_and_submit_tx(&signers, tx).await
    }

    /// Run the closure `f` with a store for the given round and return its result.
    pub async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
    where
        F: FnOnce() -> Result<R> + Send + 'static,
        R: Send + 'static,
    {
        self.imp.with_store_for_round(round, f).await
    }

    /// Create a store for the given round.
    pub async fn store_for_round(&self, round: u64) -> Result<HostStore> {
        self.imp.store_for_round(round).await
    }
}
impl<A> Clone for Client<A>
where
A: App,
{
fn clone(&self) -> Self {
Self {
imp: self.imp.clone(),
submission_mgr: self.submission_mgr.clone(),
}
}
}
/// Cloneable implementation behind `Client`; a clone is also handed to the submission manager.
struct ClientImpl<A: App> {
/// Shared processor state; provides the host, the consensus verifier and the local identity.
state: Arc<processor::State<A>>,
/// Weak handle to the processor command queue; it must be upgraded before every use.
cmdq: mpsc::WeakSender<processor::Command>,
/// Highest round observed so far; only ever raised via `fetch_max`.
latest_round: Arc<AtomicU64>,
/// RPC client used to issue queries.
rpc: Arc<RpcClient>,
}
impl<A> ClientImpl<A>
where
A: App,
{
fn new(state: Arc<processor::State<A>>, cmdq: mpsc::WeakSender<processor::Command>) -> Self {
Self {
cmdq,
latest_round: Arc::new(AtomicU64::new(0)),
rpc: Arc::new(RpcClient::new_runtime(
state.host.clone(),
ENCLAVE_RPC_ENDPOINT_RONL,
session::Builder::default()
.use_endorsement(true)
.quote_policy(None) .local_identity(state.identity.clone())
.remote_enclaves(Some(HashSet::new())), 2, 1, 1, )),
state,
}
}
async fn latest_round(&self) -> Result<u64> {
let cmdq = self
.cmdq
.upgrade()
.ok_or(anyhow!("processor has shut down"))?;
let (tx, rx) = oneshot::channel();
cmdq.send(processor::Command::GetLatestRound(tx)).await?;
let round = rx.await?;
Ok(self
.latest_round
.fetch_max(round, Ordering::SeqCst)
.max(round))
}
async fn account_nonce(&self, round: u64, address: Address) -> Result<u64> {
self.query(round, "accounts.Nonce", NonceQuery { address })
.await
}
async fn gas_price(&self, round: u64, denom: &token::Denomination) -> Result<u128> {
let mgp: BTreeMap<token::Denomination, u128> =
self.query(round, "core.MinGasPrice", ()).await?;
mgp.get(denom)
.ok_or(anyhow!("denomination not supported"))
.copied()
}
async fn query<Rq, Rs>(&self, round: u64, method: &str, args: Rq) -> Result<Rs>
where
Rq: cbor::Encode,
Rs: cbor::Decode + Send + 'static,
{
let state = self.state.consensus_verifier.latest_state().await?;
let runtime_id = self.state.host.get_runtime_id();
let tee = tokio::task::spawn_blocking(move || -> Result<_> {
let beacon = BeaconState::new(&state);
let epoch = beacon.epoch()?;
let registry = RegistryState::new(&state);
let runtime = registry
.runtime(&runtime_id)?
.ok_or(anyhow!("runtime not available"))?;
let ad = runtime
.active_deployment(epoch)
.ok_or(anyhow!("active runtime deployment not available"))?;
match runtime.tee_hardware {
TEEHardware::TEEHardwareIntelSGX => Ok(ad.try_decode_tee::<SGXConstraints>()?),
_ => Err(anyhow!("unsupported TEE platform")),
}
})
.await??;
let enclaves = HashSet::from_iter(tee.enclaves().clone());
let quote_policy = tee.policy();
self.rpc.update_enclaves(Some(enclaves)).await;
self.rpc.update_quote_policy(quote_policy).await;
let response: Vec<u8> = self
.rpc
.secure_call(
METHOD_QUERY,
QueryRequest {
round,
method: method.to_string(),
args: cbor::to_vec(args),
},
vec![],
)
.await
.into_result()?;
Ok(cbor::from_slice(&response)?)
}
async fn estimate_gas(&self, req: EstimateGasQuery) -> Result<u64> {
let round = self.latest_round().await?;
self.query(round, "core.EstimateGas", req).await
}
async fn with_store_for_round<F, R>(&self, round: u64, f: F) -> Result<R>
where
F: FnOnce() -> Result<R> + Send + 'static,
R: Send + 'static,
{
let store = self.store_for_round(round).await?;
tokio::task::spawn_blocking(move || CurrentState::enter(store, f)).await?
}
async fn store_for_round(&self, round: u64) -> Result<HostStore> {
HostStore::new_for_round(
self.state.host.clone(),
&self.state.consensus_verifier,
self.state.host.get_runtime_id(),
round,
)
.await
}
}
impl<A> Clone for ClientImpl<A>
where
A: App,
{
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
cmdq: self.cmdq.clone(),
latest_round: self.latest_round.clone(),
rpc: self.rpc.clone(),
}
}
}
/// Commands accepted by the background task of the submission manager.
enum Cmd {
/// Sign the transaction with the given signers and submit it using the given options. The
/// outcome is delivered over the one-shot channel.
SubmitTx(
Vec<Arc<dyn Signer>>,
transaction::Transaction,
SubmitTxOpts,
oneshot::Sender<Result<transaction::CallResult>>,
),
}
/// Front end of the transaction submission queue.
struct SubmissionManager<A: App> {
/// Background worker state; `Some` until `start` moves it into the spawned task.
imp: Option<SubmissionManagerImpl<A>>,
/// Sending half of the command queue consumed by the background worker.
cmdq_tx: mpsc::Sender<Cmd>,
}
impl<A: App> SubmissionManager<A> {
    /// Create a submission manager that uses `client` for signing and submission.
    ///
    /// Nothing is processed until [`Self::start`] has been called.
    fn new(client: ClientImpl<A>) -> Self {
        let (cmdq_tx, cmdq_rx) = mpsc::channel(CMDQ_BACKLOG);
        let worker = SubmissionManagerImpl { client, cmdq_rx };

        Self {
            imp: Some(worker),
            cmdq_tx,
        }
    }

    /// Start the background worker.
    ///
    /// The worker state is taken out of `self`, so every call after the first is a no-op.
    fn start(&mut self) {
        if let Some(worker) = self.imp.take() {
            worker.start();
        }
    }

    /// Queue a transaction for signing and submission and wait for its result.
    ///
    /// # Errors
    ///
    /// Fails when the command cannot be queued, when the reply channel is dropped without an
    /// answer, or with whatever error the worker reports for the submission.
    async fn multi_sign_and_submit_tx(
        &self,
        signers: &[Arc<dyn Signer>],
        tx: transaction::Transaction,
        opts: SubmitTxOpts,
    ) -> Result<transaction::CallResult> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let cmd = Cmd::SubmitTx(signers.to_vec(), tx, opts, reply_tx);
        self.cmdq_tx.send(cmd).await?;

        // Outer `?` handles a dropped reply channel, the inner result is returned as is.
        reply_rx.await?
    }
}
/// State owned by the background task of the submission manager.
struct SubmissionManagerImpl<A: App> {
/// Client used to look up rounds, nonces and gas prices and to reach the host.
client: ClientImpl<A>,
/// Receiving half of the command queue.
cmdq_rx: mpsc::Receiver<Cmd>,
}
impl<A> SubmissionManagerImpl<A>
where
    A: App,
{
    /// Spawn the worker loop as a detached task.
    fn start(self) {
        tokio::task::spawn(self.run());
    }

    /// Worker loop: accepts submission commands and dispatches each one on its own task.
    ///
    /// A command whose signer set overlaps with the signers of a submission that is still in
    /// flight is kept in the queue and retried after the next event; queue order is
    /// preserved. (Presumably this keeps transactions of the same signer from racing on the
    /// account nonce.)
    ///
    /// The loop terminates once every command sender has been dropped and no command is left
    /// in the queue.
    async fn run(mut self) {
        let (notify_tx, mut notify_rx) = mpsc::channel::<HashSet<PublicKey>>(CMDQ_BACKLOG);
        let mut queue: Vec<Cmd> = Vec::new();
        let mut pending: HashSet<PublicKey> = HashSet::new();
        // Set once the command queue reports that all of its senders are gone.
        let mut closed = false;

        loop {
            tokio::select! {
                // Process incoming commands until the queue is closed.
                cmd = self.cmdq_rx.recv(), if !closed => {
                    match cmd {
                        Some(cmd) => queue.push(cmd),
                        None => closed = true,
                    }
                },

                // Process completion notifications of dispatched submissions.
                Some(signers) = notify_rx.recv() => {
                    for pk in signers {
                        pending.remove(&pk);
                    }
                },

                else => break,
            }

            // Dispatch every queued command that does not conflict with a pending signer.
            let mut new_queue = Vec::with_capacity(queue.len());
            for cmd in queue {
                match cmd {
                    Cmd::SubmitTx(signers, tx, opts, ch) => {
                        let signer_set: HashSet<PublicKey> =
                            signers.iter().map(|signer| signer.public_key()).collect();
                        if !signer_set.is_disjoint(&pending) {
                            // One of the signers is busy, keep the command for later.
                            new_queue.push(Cmd::SubmitTx(signers, tx, opts, ch));
                            continue;
                        }
                        pending.extend(signer_set.iter().cloned());

                        let client = self.client.clone();
                        let notify_tx = notify_tx.clone();

                        tokio::spawn(async move {
                            let result =
                                Self::multi_sign_and_submit_tx(client, &signers, tx, opts).await;
                            // The requester may have gone away, which is fine.
                            let _ = ch.send(result);
                            // The worker loop may have terminated, which is fine as well.
                            let _ = notify_tx.send(signer_set).await;
                        });
                    }
                }
            }
            queue = new_queue;

            // `notify_tx` is kept alive by this very function, so `notify_rx` never reports
            // closure and the `else` branch above cannot be relied upon for termination.
            // Without this check the task (and the client it owns) would live forever after
            // the last command sender is dropped. In-flight submissions are unaffected as
            // they run on their own tasks and ignore a failed notification.
            if closed && queue.is_empty() {
                break;
            }
        }
    }

    /// Sign the transaction with all given signers, submit it and decode its result.
    ///
    /// For every signer an authentication entry with the account's nonce (all read at the
    /// same round) is appended. When the transaction carries no gas limit, one is estimated
    /// with the first signer as the caller and increased by 20%. The fee amount is then set
    /// to the minimum gas price of the native denomination times the gas limit.
    ///
    /// # Errors
    ///
    /// Fails when `signers` is empty, when a signer uses an unsupported signature scheme,
    /// when any of the lookups or the signing fails, when the submission fails or exceeds
    /// `opts.timeout`, or when the result is missing or cannot be decoded.
    async fn multi_sign_and_submit_tx(
        client: ClientImpl<A>,
        signers: &[Arc<dyn Signer>],
        mut tx: transaction::Transaction,
        opts: SubmitTxOpts,
    ) -> Result<transaction::CallResult> {
        if signers.is_empty() {
            return Err(anyhow!("no signers specified"));
        }

        // Resolve the address and signature specification of every signer.
        let addresses = signers
            .iter()
            .map(|signer| -> Result<_> {
                let sigspec = SignatureAddressSpec::try_from_pk(&signer.public_key())
                    .ok_or_else(|| anyhow!("signature scheme not supported"))?;
                Ok((Address::from_sigspec(&sigspec), sigspec))
            })
            .collect::<Result<Vec<_>>>()?;

        // Add authentication information, reading all nonces at the same round.
        let round = client.latest_round().await?;
        for (address, sigspec) in &addresses {
            let nonce = client.account_nonce(round, *address).await?;
            tx.append_auth_signature(sigspec.clone(), nonce);
        }

        // Estimate the gas limit when the transaction does not specify one.
        if tx.fee_gas() == 0 {
            // Indexing is fine, `signers` (and thus `addresses`) was checked to be non-empty.
            let signer = &signers[0];
            let gas = client
                .estimate_gas(EstimateGasQuery {
                    caller: if let PublicKey::Secp256k1(pk) = signer.public_key() {
                        // NOTE(review): assumes the address always has the length expected
                        // by `EthAddress`, otherwise this panics; confirm.
                        Some(CallerAddress::EthAddress(
                            pk.to_eth_address().try_into().unwrap(),
                        ))
                    } else {
                        Some(CallerAddress::Address(addresses[0].0))
                    },
                    tx: tx.clone(),
                    propagate_failures: false,
                })
                .await?;

            // Add a 20% safety margin on top of the estimate.
            let gas = gas.saturating_add(gas.saturating_mul(20).saturating_div(100));
            tx.set_fee_gas(gas);
        }

        // Pay the minimum gas price in the native denomination.
        let mgp = client
            .gas_price(round, &token::Denomination::NATIVE)
            .await?;
        let fee = mgp.saturating_mul(tx.fee_gas().into());
        tx.set_fee_amount(token::BaseUnits::new(fee, token::Denomination::NATIVE));

        // Sign the transaction with every signer.
        let mut tx = tx.prepare_for_signing();
        for signer in signers {
            tx.append_sign(signer)?;
        }
        let tx = tx.finalize();

        // Submit the transaction and wait for its result, honouring the optional timeout.
        let submit_tx_task = client.state.host.submit_tx(
            cbor::to_vec(tx),
            host::SubmitTxOpts {
                wait: true,
                ..Default::default()
            },
        );
        let result = if let Some(timeout) = opts.timeout {
            tokio::time::timeout(timeout, submit_tx_task).await?
        } else {
            submit_tx_task.await
        };
        let result = result?.ok_or_else(|| anyhow!("missing result"))?;

        // Remember the round of the result so that later round lookups are not older than it.
        client
            .latest_round
            .fetch_max(result.round, Ordering::SeqCst);

        cbor::from_slice(&result.output).map_err(|_| anyhow!("malformed result"))
    }
}