oasis_runtime_sdk/modules/rofl/app/
registration.rs1use std::{sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Result};
4use tokio::sync::mpsc;
5
6use crate::{
7 core::{
8 common::logger::get_logger,
9 consensus::{
10 beacon::EpochTime, state::beacon::ImmutableState as BeaconState, verifier::Verifier,
11 },
12 },
13 modules::rofl::types::{AppInstanceQuery, Register, Registration},
14};
15
16use super::{client::SubmitTxOpts, processor, App, Environment};
17
18pub(super) struct Task<A: App> {
20 imp: Option<Impl<A>>,
21 tx: mpsc::Sender<()>,
22}
23
24impl<A> Task<A>
25where
26 A: App,
27{
28 pub(super) fn new(state: Arc<processor::State<A>>, env: Environment<A>) -> Self {
30 let (tx, rx) = mpsc::channel(1);
31
32 let imp = Impl {
33 state,
34 env,
35 logger: get_logger("modules/rofl/app/registration"),
36 notify: rx,
37 last_registration_epoch: None,
38 };
39
40 Self { imp: Some(imp), tx }
41 }
42
43 pub(super) fn start(&mut self) {
45 if let Some(imp) = self.imp.take() {
46 imp.start();
47 }
48 }
49
50 pub(super) fn refresh(&self) {
52 let _ = self.tx.try_send(());
53 }
54}
55
56struct Impl<A: App> {
57 state: Arc<processor::State<A>>,
58 env: Environment<A>,
59 logger: slog::Logger,
60
61 notify: mpsc::Receiver<()>,
62 last_registration_epoch: Option<EpochTime>,
63}
64
65impl<A> Impl<A>
66where
67 A: App,
68{
69 pub(super) fn start(self) {
71 tokio::task::spawn(self.run());
72 }
73
74 async fn run(mut self) {
76 slog::info!(self.logger, "starting registration task");
77
78 while self.notify.recv().await.is_some() {
80 if let Err(err) = self.refresh_registration().await {
81 slog::error!(self.logger, "failed to refresh registration";
82 "err" => ?err,
83 );
84 }
85 }
86
87 slog::info!(self.logger, "registration task stopped");
88 }
89
90 async fn refresh_registration(&mut self) -> Result<()> {
92 let state = self.state.consensus_verifier.latest_state().await?;
94 let epoch = tokio::task::spawn_blocking(move || {
95 let beacon = BeaconState::new(&state);
96 beacon.epoch()
97 })
98 .await??;
99
100 if self.last_registration_epoch == Some(epoch) {
102 return Ok(());
103 }
104
105 let round = self.env.client().latest_round().await?;
107 if let Ok(existing) = self
108 .env
109 .client()
110 .query::<_, Registration>(
111 round,
112 "rofl.AppInstance",
113 AppInstanceQuery {
114 app: A::id(),
115 rak: self.state.identity.public_rak().into(),
116 },
117 )
118 .await
119 {
120 if existing.expiration >= epoch + 2 {
122 slog::info!(self.logger, "registration already refreshed"; "epoch" => epoch);
123
124 self.last_registration_epoch = Some(epoch);
125 self.env
126 .send_command(processor::Command::RegistrationRefreshed)
127 .await?;
128 return Ok(());
129 }
130 }
131
132 slog::info!(self.logger, "refreshing registration";
133 "last_registration_epoch" => self.last_registration_epoch,
134 "epoch" => epoch,
135 );
136
137 let metadata = match self.state.app.clone().get_metadata(self.env.clone()).await {
138 Ok(metadata) => metadata,
139 Err(err) => {
140 slog::error!(self.logger, "failed to get instance metadata"; "err" => ?err);
141 Default::default()
143 }
144 };
145
146 let ect = self
148 .state
149 .identity
150 .endorsed_capability_tee()
151 .ok_or(anyhow!("endorsed TEE capability not available"))?;
152 let register = Register {
153 app: A::id(),
154 ect,
155 expiration: epoch + 2,
156 extra_keys: vec![self.env.signer().public_key()],
157 metadata,
158 };
159
160 let tx = self.state.app.new_transaction("rofl.Register", register);
161 let result = self
162 .env
163 .client()
164 .multi_sign_and_submit_tx_opts(
165 &[self.state.identity.clone(), self.env.signer()],
166 tx,
167 SubmitTxOpts {
168 timeout: Some(Duration::from_millis(60_000)),
169 encrypt: false, ..Default::default()
171 },
172 )
173 .await?
174 .ok()?;
175
176 slog::info!(self.logger, "refreshed registration"; "result" => ?result);
177
178 if self.last_registration_epoch.is_none() {
179 self.env
182 .send_command(processor::Command::InitialRegistrationCompleted)
183 .await?;
184 }
185 self.last_registration_epoch = Some(epoch);
186
187 self.env
189 .send_command(processor::Command::RegistrationRefreshed)
190 .await?;
191
192 Ok(())
193 }
194}