oasis_runtime_sdk/modules/rofl/app/
registration.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Result};
4use tokio::sync::mpsc;
5
6use crate::{
7    core::{
8        common::logger::get_logger,
9        consensus::{
10            beacon::EpochTime, state::beacon::ImmutableState as BeaconState, verifier::Verifier,
11        },
12    },
13    modules::rofl::types::{AppInstanceQuery, Register, Registration},
14};
15
16use super::{client::SubmitTxOpts, processor, App, Environment};
17
18/// Registration task.
19pub(super) struct Task<A: App> {
20    imp: Option<Impl<A>>,
21    tx: mpsc::Sender<()>,
22}
23
24impl<A> Task<A>
25where
26    A: App,
27{
28    /// Create a registration task.
29    pub(super) fn new(state: Arc<processor::State<A>>, env: Environment<A>) -> Self {
30        let (tx, rx) = mpsc::channel(1);
31
32        let imp = Impl {
33            state,
34            env,
35            logger: get_logger("modules/rofl/app/registration"),
36            notify: rx,
37            last_registration_epoch: None,
38        };
39
40        Self { imp: Some(imp), tx }
41    }
42
43    /// Start the registration task.
44    pub(super) fn start(&mut self) {
45        if let Some(imp) = self.imp.take() {
46            imp.start();
47        }
48    }
49
50    /// Ask the registration task to refresh the registration.
51    pub(super) fn refresh(&self) {
52        let _ = self.tx.try_send(());
53    }
54}
55
56struct Impl<A: App> {
57    state: Arc<processor::State<A>>,
58    env: Environment<A>,
59    logger: slog::Logger,
60
61    notify: mpsc::Receiver<()>,
62    last_registration_epoch: Option<EpochTime>,
63}
64
65impl<A> Impl<A>
66where
67    A: App,
68{
69    /// Start the registration task.
70    pub(super) fn start(self) {
71        tokio::task::spawn(self.run());
72    }
73
74    /// Run the registration task.
75    async fn run(mut self) {
76        slog::info!(self.logger, "starting registration task");
77
78        // TODO: Handle retries etc.
79        while self.notify.recv().await.is_some() {
80            if let Err(err) = self.refresh_registration().await {
81                slog::error!(self.logger, "failed to refresh registration";
82                    "err" => ?err,
83                );
84            }
85        }
86
87        slog::info!(self.logger, "registration task stopped");
88    }
89
90    /// Perform application registration refresh.
91    async fn refresh_registration(&mut self) -> Result<()> {
92        // Determine current epoch.
93        let state = self.state.consensus_verifier.latest_state().await?;
94        let epoch = tokio::task::spawn_blocking(move || {
95            let beacon = BeaconState::new(&state);
96            beacon.epoch()
97        })
98        .await??;
99
100        // Skip refresh in case epoch has not changed.
101        if self.last_registration_epoch == Some(epoch) {
102            return Ok(());
103        }
104
105        // Query our current registration and see if we need to update it.
106        let round = self.env.client().latest_round().await?;
107        if let Ok(existing) = self
108            .env
109            .client()
110            .query::<_, Registration>(
111                round,
112                "rofl.AppInstance",
113                AppInstanceQuery {
114                    app: A::id(),
115                    rak: self.state.identity.public_rak().into(),
116                },
117            )
118            .await
119        {
120            // Check if we already registered for this epoch by comparing expiration.
121            if existing.expiration >= epoch + 2 {
122                slog::info!(self.logger, "registration already refreshed"; "epoch" => epoch);
123
124                self.last_registration_epoch = Some(epoch);
125                self.env
126                    .send_command(processor::Command::RegistrationRefreshed)
127                    .await?;
128                return Ok(());
129            }
130        }
131
132        slog::info!(self.logger, "refreshing registration";
133            "last_registration_epoch" => self.last_registration_epoch,
134            "epoch" => epoch,
135        );
136
137        let metadata = match self.state.app.clone().get_metadata(self.env.clone()).await {
138            Ok(metadata) => metadata,
139            Err(err) => {
140                slog::error!(self.logger, "failed to get instance metadata"; "err" => ?err);
141                // Do not prevent registration, just clear metadata.
142                Default::default()
143            }
144        };
145
146        // Refresh registration.
147        let ect = self
148            .state
149            .identity
150            .endorsed_capability_tee()
151            .ok_or(anyhow!("endorsed TEE capability not available"))?;
152        let register = Register {
153            app: A::id(),
154            ect,
155            expiration: epoch + 2,
156            extra_keys: vec![self.env.signer().public_key()],
157            metadata,
158        };
159
160        let tx = self.state.app.new_transaction("rofl.Register", register);
161        let result = self
162            .env
163            .client()
164            .multi_sign_and_submit_tx_opts(
165                &[self.state.identity.clone(), self.env.signer()],
166                tx,
167                SubmitTxOpts {
168                    timeout: Some(Duration::from_millis(60_000)),
169                    encrypt: false, // Needed for initial fee payments.
170                    ..Default::default()
171                },
172            )
173            .await?
174            .ok()?;
175
176        slog::info!(self.logger, "refreshed registration"; "result" => ?result);
177
178        if self.last_registration_epoch.is_none() {
179            // If this is the first registration, notify processor that initial registration has
180            // been completed so it can do other stuff.
181            self.env
182                .send_command(processor::Command::InitialRegistrationCompleted)
183                .await?;
184        }
185        self.last_registration_epoch = Some(epoch);
186
187        // Notify about registration refresh.
188        self.env
189            .send_command(processor::Command::RegistrationRefreshed)
190            .await?;
191
192        Ok(())
193    }
194}