oasis_runtime_sdk/modules/rofl/app/
processor.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Result};
4use tokio::sync::{mpsc, oneshot};
5
6use crate::{
7    core::{
8        common::logger::get_logger,
9        consensus::{roothash, verifier::Verifier},
10        dispatcher::PreInitState,
11        host::{self, Host as _},
12        identity::Identity,
13        protocol::Protocol,
14    },
15    crypto::signature::{secp256k1, Signer},
16};
17use rand::rngs::OsRng;
18
19use super::{notifier, registration, watchdog, App, Environment};
20
21/// Size of the processor command queue.
22const CMDQ_BACKLOG: usize = 32;
23
24/// Command sent to the processor task.
25#[allow(clippy::large_enum_variant)]
26#[derive(Debug)]
27pub(super) enum Command {
28    /// Process a notification of a new runtime block.
29    ProcessRuntimeBlock(roothash::AnnotatedBlock),
30    /// Retrieve the latest known round.
31    GetLatestRound(oneshot::Sender<u64>),
32    /// Notification that initial registration has been completed.
33    InitialRegistrationCompleted,
34    /// Registration refreshed.
35    RegistrationRefreshed,
36}
37
38/// Processor state.
39pub(super) struct State<A: App> {
40    pub(super) identity: Arc<Identity>,
41    pub(super) host: Arc<Protocol>,
42    pub(super) consensus_verifier: Arc<dyn Verifier>,
43    pub(super) signer: Arc<dyn Signer>,
44    pub(super) app: Arc<A>,
45}
46
47struct Tasks<A: App> {
48    registration: registration::Task<A>,
49    notifier: notifier::Task<A>,
50    watchdog: watchdog::Task,
51}
52
53/// Processor.
54pub(super) struct Processor<A: App> {
55    state: Arc<State<A>>,
56    env: Environment<A>,
57    tasks: Tasks<A>,
58    cmdq: mpsc::Receiver<Command>,
59    logger: slog::Logger,
60
61    latest_round: u64,
62}
63
64impl<A> Processor<A>
65where
66    A: App,
67{
68    /// Create and start a new processor.
69    pub(super) fn start(mut app: A, state: &PreInitState<'_>) -> mpsc::Sender<Command> {
70        // Create the command channel.
71        let (tx, rx) = mpsc::channel(CMDQ_BACKLOG);
72
73        // Perform early application initialization.
74        app.init(state.protocol.clone());
75
76        // Provision keys. Currently we provision a random key for signing transactions to avoid
77        // using the RAK directly as the RAK is an Ed25519 key which cannot easily be used for EVM
78        // calls due to the limitations of the current implementation.
79        let signer = secp256k1::MemorySigner::random(&mut OsRng).unwrap();
80
81        // Prepare state.
82        let state = Arc::new(State {
83            identity: state.identity.clone(),
84            host: state.protocol.clone(),
85            consensus_verifier: state.consensus_verifier.clone(),
86            signer: Arc::new(signer),
87            app: Arc::new(app),
88        });
89
90        // Prepare application environment.
91        let env = Environment::new(state.clone(), tx.downgrade());
92
93        // Create the processor and start it.
94        let processor = Self {
95            tasks: Tasks {
96                registration: registration::Task::new(state.clone(), env.clone()),
97                notifier: notifier::Task::new(state.clone(), env.clone()),
98                watchdog: watchdog::Task::new(),
99            },
100            state,
101            env,
102            cmdq: rx,
103            logger: get_logger("modules/rofl/app"),
104            latest_round: 0,
105        };
106        tokio::spawn(processor.run());
107
108        tx
109    }
110
111    /// Run the processor.
112    async fn run(mut self) {
113        slog::info!(self.logger, "starting processor";
114            "app_id" => A::id(),
115        );
116
117        // Register for notifications.
118        if let Err(err) = self
119            .state
120            .host
121            .register_notify(host::RegisterNotifyOpts {
122                runtime_block: true,
123                runtime_event: vec![],
124            })
125            .await
126        {
127            slog::error!(self.logger, "failed to register for notifications";
128                "err" => ?err,
129            );
130        }
131
132        // Start the tasks.
133        self.tasks.registration.start();
134        self.tasks.notifier.start();
135        self.tasks.watchdog.start();
136
137        slog::info!(self.logger, "entering processor loop");
138        while let Some(cmd) = self.cmdq.recv().await {
139            if let Err(err) = self.process(cmd).await {
140                slog::error!(self.logger, "failed to process command";
141                    "err" => ?err,
142                );
143            }
144        }
145
146        slog::info!(self.logger, "processor stopped");
147    }
148
149    /// Process a command.
150    async fn process(&mut self, cmd: Command) -> Result<()> {
151        match cmd {
152            Command::ProcessRuntimeBlock(blk) => self.cmd_process_runtime_block(blk).await,
153            Command::GetLatestRound(ch) => self.cmd_get_latest_round(ch).await,
154            Command::InitialRegistrationCompleted => {
155                self.cmd_initial_registration_completed().await
156            }
157            Command::RegistrationRefreshed => self.tasks.watchdog.keep_alive().await,
158        }
159    }
160
161    async fn cmd_process_runtime_block(&mut self, blk: roothash::AnnotatedBlock) -> Result<()> {
162        // Update latest known round.
163        if blk.block.header.round <= self.latest_round {
164            return Err(anyhow!("round seems to have moved backwards"));
165        }
166        self.latest_round = blk.block.header.round;
167
168        // Notify registration task.
169        self.tasks.registration.refresh();
170        // Notify notifier task.
171        let _ = self
172            .tasks
173            .notifier
174            .notify(notifier::Notify::RuntimeBlock(self.latest_round))
175            .await;
176
177        Ok(())
178    }
179
180    async fn cmd_get_latest_round(&self, ch: oneshot::Sender<u64>) -> Result<()> {
181        let _ = ch.send(self.latest_round);
182        Ok(())
183    }
184
185    async fn cmd_initial_registration_completed(&self) -> Result<()> {
186        slog::info!(self.logger, "initial registration completed");
187
188        // Start application after first registration.
189        slog::info!(self.logger, "starting application");
190        tokio::spawn(self.state.app.clone().run(self.env.clone()));
191
192        // Perform post-registration initialization.
193        let app = self.state.app.clone();
194        let env = self.env.clone();
195        let logger = self.logger.clone();
196        tokio::spawn(async move {
197            slog::info!(
198                logger,
199                "performing app-specific post-registration initialization"
200            );
201
202            app.post_registration_init(env).await;
203        });
204
205        // Notify notifier task.
206        self.tasks
207            .notifier
208            .notify(notifier::Notify::InitialRegistrationCompleted)
209            .await?;
210
211        Ok(())
212    }
213}