oasis_runtime_sdk/modules/rofl/app/
processor.rs1use std::sync::Arc;
2
3use anyhow::{anyhow, Result};
4use tokio::sync::{mpsc, oneshot};
5
6use crate::{
7 core::{
8 common::logger::get_logger,
9 consensus::{roothash, verifier::Verifier},
10 dispatcher::PreInitState,
11 host::{self, Host as _},
12 identity::Identity,
13 protocol::Protocol,
14 },
15 crypto::signature::{secp256k1, Signer},
16};
17use rand::rngs::OsRng;
18
19use super::{notifier, registration, watchdog, App, Environment};
20
/// Capacity of the bounded command channel that feeds the processor.
///
/// Passed to `mpsc::channel` in `Processor::start`.
const CMDQ_BACKLOG: usize = 32;
23
24#[allow(clippy::large_enum_variant)]
26#[derive(Debug)]
27pub(super) enum Command {
28 ProcessRuntimeBlock(roothash::AnnotatedBlock),
30 GetLatestRound(oneshot::Sender<u64>),
32 InitialRegistrationCompleted,
34 RegistrationRefreshed,
36}
37
38pub(super) struct State<A: App> {
40 pub(super) identity: Arc<Identity>,
41 pub(super) host: Arc<Protocol>,
42 pub(super) consensus_verifier: Arc<dyn Verifier>,
43 pub(super) signer: Arc<dyn Signer>,
44 pub(super) app: Arc<A>,
45}
46
47struct Tasks<A: App> {
48 registration: registration::Task<A>,
49 notifier: notifier::Task<A>,
50 watchdog: watchdog::Task,
51}
52
53pub(super) struct Processor<A: App> {
55 state: Arc<State<A>>,
56 env: Environment<A>,
57 tasks: Tasks<A>,
58 cmdq: mpsc::Receiver<Command>,
59 logger: slog::Logger,
60
61 latest_round: u64,
62}
63
64impl<A> Processor<A>
65where
66 A: App,
67{
68 pub(super) fn start(mut app: A, state: &PreInitState<'_>) -> mpsc::Sender<Command> {
70 let (tx, rx) = mpsc::channel(CMDQ_BACKLOG);
72
73 app.init(state.protocol.clone());
75
76 let signer = secp256k1::MemorySigner::random(&mut OsRng).unwrap();
80
81 let state = Arc::new(State {
83 identity: state.identity.clone(),
84 host: state.protocol.clone(),
85 consensus_verifier: state.consensus_verifier.clone(),
86 signer: Arc::new(signer),
87 app: Arc::new(app),
88 });
89
90 let env = Environment::new(state.clone(), tx.downgrade());
92
93 let processor = Self {
95 tasks: Tasks {
96 registration: registration::Task::new(state.clone(), env.clone()),
97 notifier: notifier::Task::new(state.clone(), env.clone()),
98 watchdog: watchdog::Task::new(),
99 },
100 state,
101 env,
102 cmdq: rx,
103 logger: get_logger("modules/rofl/app"),
104 latest_round: 0,
105 };
106 tokio::spawn(processor.run());
107
108 tx
109 }
110
111 async fn run(mut self) {
113 slog::info!(self.logger, "starting processor";
114 "app_id" => A::id(),
115 );
116
117 if let Err(err) = self
119 .state
120 .host
121 .register_notify(host::RegisterNotifyOpts {
122 runtime_block: true,
123 runtime_event: vec![],
124 })
125 .await
126 {
127 slog::error!(self.logger, "failed to register for notifications";
128 "err" => ?err,
129 );
130 }
131
132 self.tasks.registration.start();
134 self.tasks.notifier.start();
135 self.tasks.watchdog.start();
136
137 slog::info!(self.logger, "entering processor loop");
138 while let Some(cmd) = self.cmdq.recv().await {
139 if let Err(err) = self.process(cmd).await {
140 slog::error!(self.logger, "failed to process command";
141 "err" => ?err,
142 );
143 }
144 }
145
146 slog::info!(self.logger, "processor stopped");
147 }
148
149 async fn process(&mut self, cmd: Command) -> Result<()> {
151 match cmd {
152 Command::ProcessRuntimeBlock(blk) => self.cmd_process_runtime_block(blk).await,
153 Command::GetLatestRound(ch) => self.cmd_get_latest_round(ch).await,
154 Command::InitialRegistrationCompleted => {
155 self.cmd_initial_registration_completed().await
156 }
157 Command::RegistrationRefreshed => self.tasks.watchdog.keep_alive().await,
158 }
159 }
160
161 async fn cmd_process_runtime_block(&mut self, blk: roothash::AnnotatedBlock) -> Result<()> {
162 if blk.block.header.round <= self.latest_round {
164 return Err(anyhow!("round seems to have moved backwards"));
165 }
166 self.latest_round = blk.block.header.round;
167
168 self.tasks.registration.refresh();
170 let _ = self
172 .tasks
173 .notifier
174 .notify(notifier::Notify::RuntimeBlock(self.latest_round))
175 .await;
176
177 Ok(())
178 }
179
180 async fn cmd_get_latest_round(&self, ch: oneshot::Sender<u64>) -> Result<()> {
181 let _ = ch.send(self.latest_round);
182 Ok(())
183 }
184
185 async fn cmd_initial_registration_completed(&self) -> Result<()> {
186 slog::info!(self.logger, "initial registration completed");
187
188 slog::info!(self.logger, "starting application");
190 tokio::spawn(self.state.app.clone().run(self.env.clone()));
191
192 let app = self.state.app.clone();
194 let env = self.env.clone();
195 let logger = self.logger.clone();
196 tokio::spawn(async move {
197 slog::info!(
198 logger,
199 "performing app-specific post-registration initialization"
200 );
201
202 app.post_registration_init(env).await;
203 });
204
205 self.tasks
207 .notifier
208 .notify(notifier::Notify::InitialRegistrationCompleted)
209 .await?;
210
211 Ok(())
212 }
213}