oasis_core_runtime/consensus/roothash/commitment/pool.rs

use std::collections::{HashMap, HashSet};

use anyhow::Result;

use crate::{
    common::crypto::{hash::Hash, signature::PublicKey},
    consensus::{
        registry::{Node, Runtime, TEEHardware},
        roothash::{Block, Error, Message, OpenCommitment},
        scheduler::{Committee, CommitteeKind, Role},
    },
};

use super::ExecutorCommitment;

/// A trait for looking up registry node descriptors.
pub trait NodeLookup {
    fn node(&self, id: PublicKey) -> Result<Node, Error>;
}
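
// A minimal implementation sketch (illustrative only; `StaticLookup` is a hypothetical
// name, and the test module below uses a very similar `StaticNodeLookup` that returns
// a fixed descriptor for any identity):
//
//     struct StaticLookup {
//         runtime: NodeRuntime,
//     }
//
//     impl NodeLookup for StaticLookup {
//         fn node(&self, id: PublicKey) -> Result<Node, Error> {
//             Ok(Node {
//                 id,
//                 runtimes: Some(vec![self.runtime.clone()]),
//                 ..Default::default()
//             })
//         }
//     }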

/// A trait for validating runtime messages. It can be used for gas accounting.
pub trait MessageValidator {
    fn validate(&self, msgs: &[Message]) -> Result<()>;
}

impl<F> MessageValidator for F
where
    F: Fn(&[Message]) -> Result<()>,
{
    fn validate(&self, msgs: &[Message]) -> Result<()> {
        (*self)(msgs)
    }
}
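
// Note: thanks to the blanket impl above, any closure with a matching signature can be
// used as a `MessageValidator`. A minimal sketch (the test module below uses the same
// pattern; `always_ok` and `reject_all` are illustrative names):
//
//     let always_ok = |_: &[Message]| Ok(());
//     let reject_all = |_: &[Message]| -> Result<()> { Err(Error::InvalidMessages.into()) };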

/// A pool of commitments that can be used to perform
/// discrepancy detection.
///
/// The pool is not safe for concurrent use.
pub struct Pool {
    /// The runtime descriptor this pool is collecting
    /// the commitments for.
    runtime: Runtime,
    /// The committee this pool is collecting the commitments for.
    committee: Committee,
    /// The current protocol round.
    round: u64,
    // The commitments in the pool iff the committee kind
    // is CommitteeKind::ComputeExecutor.
    execute_commitments: HashMap<PublicKey, ExecutorCommitment>,
    // A flag signalling that a discrepancy has been detected.
    discrepancy: bool,
    // The time when the next call to process_commitments(true) should
    // be scheduled to be executed. Zero means that no timeout is to be scheduled.
    _next_timeout: i64,

    // A cached committee member set. It will be automatically
    // constructed based on the passed committee.
    member_set: HashSet<PublicKey>,
    // A cached committee worker set. It will be automatically
    // constructed based on the passed committee.
    _worker_set: HashSet<PublicKey>,
}
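
// Typical usage, as a sketch (based on the methods below; the variable names are
// illustrative and error handling is elided):
//
//     let mut pool = Pool::new(runtime, committee, round);
//     pool.add_executor_commitment(&blk, &node_lookup, commit, &msg_validator, &chain_context)?;
//     match pool.process_commitments(did_timeout) {
//         Ok(proposer_commit) => { /* enough commitments; finalize using the proposer's commitment */ }
//         Err(err) => { /* e.g. Error::StillWaiting or Error::DiscrepancyDetected */ }
//     }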

impl Pool {
    /// Creates a new pool.
    pub fn new(runtime: Runtime, committee: Committee, round: u64) -> Self {
        let mut member_set = HashSet::new();
        let mut _worker_set = HashSet::new();

        for m in &committee.members {
            member_set.insert(m.public_key);
            if m.role == Role::Worker {
                _worker_set.insert(m.public_key);
            }
        }

        Pool {
            runtime,
            committee,
            round,
            execute_commitments: HashMap::new(),
            discrepancy: false,
            _next_timeout: 0,
            member_set,
            _worker_set,
        }
    }

    fn is_member(&self, id: &PublicKey) -> bool {
        self.member_set.contains(id)
    }

    fn _is_worker(&self, id: &PublicKey) -> bool {
        self._worker_set.contains(id)
    }

    fn is_scheduler(&self, id: &PublicKey) -> bool {
        if let Ok(scheduler) = self.committee.transaction_scheduler(self.round) {
            return &scheduler.public_key == id;
        }
        false
    }

    /// Verifies and adds a new executor commitment to the pool.
    fn add_verified_executor_commitment(
        &mut self,
        blk: &Block,
        nl: &impl NodeLookup,
        msg_validator: &impl MessageValidator,
        commit: ExecutorCommitment,
    ) -> Result<()> {
        if self.committee.kind != CommitteeKind::ComputeExecutor {
            return Err(Error::InvalidCommitteeKind.into());
        }

        // Ensure that the node is actually a committee member. We do not enforce specific
        // roles based on current discrepancy state to allow commitments arriving in any
        // order (e.g., a backup worker can submit a commitment even before there is a
        // discrepancy).
        if !self.is_member(&commit.node_id) {
            return Err(Error::NotInCommittee.into());
        }

        // Ensure the node did not already submit a commitment.
        if self.execute_commitments.contains_key(&commit.node_id) {
            return Err(Error::AlreadyCommitted.into());
        }

        if self.round != blk.header.round {
            return Err(Error::InvalidRound.into());
        }

        // Check if the block is based on the previous block.
        if !commit.header.header.is_parent_of(&blk.header) {
            return Err(Error::NotBasedOnCorrectBlock.into());
        }

        if commit.validate_basic().is_err() {
            return Err(Error::BadExecutorCommitment.into());
        }

        // TODO: Check for evidence of equivocation (oasis-core#3685).

        if !commit.is_indicating_failure() {
            // Verify RAK-attestation.
            if self.runtime.tee_hardware != TEEHardware::TEEHardwareInvalid {
                let n = nl.node(commit.node_id).map_err(|_|
                    // This should never happen as nodes cannot disappear mid-epoch.
                    Error::NotInCommittee)?;

                let ad = self
                    .runtime
                    .active_deployment(self.committee.valid_for)
                    .ok_or(
                        // This should never happen as we prevent this elsewhere.
                        Error::NoRuntime,
                    )?;

                let rt = n.get_runtime(&self.runtime.id, &ad.version).ok_or(
                    // We currently prevent this case throughout the rest of the system.
                    // Still, it's prudent to check.
                    Error::NotInCommittee,
                )?;

                let tee = rt.capabilities.tee.ok_or(
                    // This should never happen as we prevent this elsewhere.
                    Error::RakSigInvalid,
                )?;

                commit
                    .header
                    .verify_rak(tee.rak)
                    .map_err(|_| Error::RakSigInvalid)?;
            }

            // Check emitted runtime messages.
            match self.is_scheduler(&commit.node_id) {
                true => {
                    // The transaction scheduler can include messages.
                    if commit.messages.len() as u32 > self.runtime.executor.max_messages {
                        return Err(Error::InvalidMessages.into());
                    }

                    let messages_hash = commit
                        .header
                        .header
                        .messages_hash
                        .ok_or(Error::InvalidMessages)?;
                    let h = Message::messages_hash(&commit.messages);
                    if h != messages_hash {
                        return Err(Error::InvalidMessages.into());
                    }

                    // Perform custom message validation and propagate the error unchanged.
                    if !commit.messages.is_empty() {
                        msg_validator.validate(&commit.messages)?;
                    }
                }
                false => {
                    // Other workers cannot include any messages.
                    if !commit.messages.is_empty() {
                        return Err(Error::InvalidMessages.into());
                    }
                }
            }
        }

        self.execute_commitments.insert(commit.node_id, commit);

        Ok(())
    }

    /// Verifies and adds a new executor commitment to the pool.
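    ///
    /// The commitment signature is checked against the runtime identifier and the
    /// consensus chain context before any pool-level checks are applied. A usage
    /// sketch (mirroring the test below; the closure validator accepts all messages):
    ///
    /// ```ignore
    /// let msg_validator = |_: &_| Ok(());
    /// pool.add_executor_commitment(&blk, &nl, commit, &msg_validator, &chain_context)?;
    /// ```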
    pub fn add_executor_commitment(
        &mut self,
        blk: &Block,
        nl: &impl NodeLookup,
        commit: ExecutorCommitment,
        msg_validator: &impl MessageValidator,
        chain_context: &String,
    ) -> Result<()> {
        // Check executor commitment signature.
        commit.verify(&self.runtime.id, chain_context)?;

        self.add_verified_executor_commitment(blk, nl, msg_validator, commit)
    }

    /// Performs a single round of commitment checks. If there are enough commitments
    /// in the pool, it performs discrepancy detection or resolution.
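    ///
    /// A worked example with illustrative numbers: in detection mode with 5 primary
    /// workers and `allowed_stragglers = 1`, all 5 commitments are required until the
    /// timeout fires; afterwards only 4 are required, with failure-indicating
    /// commitments counted as stragglers. In resolution mode with 5 backup workers,
    /// the leading vote needs at least `total / 2 + 1 = 3` matching commitments.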
    pub fn process_commitments(&mut self, did_timeout: bool) -> Result<&dyn OpenCommitment> {
        if self.committee.kind != CommitteeKind::ComputeExecutor {
            panic!(
                "roothash/commitment: unknown committee kind: {:?}",
                self.committee.kind
            );
        }

        #[derive(Default)]
        struct Vote<'a> {
            commit: Option<&'a ExecutorCommitment>,
            tally: u16,
        }

        let mut total = 0;
        let mut commits = 0;
        let mut failures = 0;

        // Gather votes.
        let mut votes: HashMap<Hash, Vote> = HashMap::new();
        for n in &self.committee.members {
            if !self.discrepancy && n.role != Role::Worker {
                continue;
            }
            if self.discrepancy && n.role != Role::BackupWorker {
                continue;
            }

            total += 1;
            let commit = match self.execute_commitments.get(&n.public_key) {
                Some(commit) => commit,
                None => continue,
            };
            commits += 1;

            if commit.is_indicating_failure() {
                failures += 1;
                continue;
            }

            let k = commit.to_vote();
            match votes.get_mut(&k) {
                Some(v) => v.tally += 1,
                None => {
                    votes.insert(
                        k,
                        Vote {
                            tally: 1,
                            commit: Some(commit),
                        },
                    );
                }
            }

            if !self.discrepancy && votes.len() > 1 {
                self.discrepancy = true;
                return Err(Error::DiscrepancyDetected.into());
            }
        }

        // Determine whether the proposer has submitted a commitment.
        let proposer = self
            .committee
            .transaction_scheduler(self.round)
            .map_err(|_| Error::NoCommittee)?;
        let proposer_commit = self.execute_commitments.get(&proposer.public_key);
        if proposer_commit.is_none() && did_timeout {
            return Err(Error::NoProposerCommitment.into());
        }

        match self.discrepancy {
            false => {
                // Discrepancy detection.
                let allowed_stragglers = self.runtime.executor.allowed_stragglers;

                // If it is already known that the number of valid commitments will not reach the required
                // threshold, there is no need to wait for the timer to expire. Instead, proceed directly to
                // the discrepancy resolution mode, regardless of any additional commits.
                if failures > allowed_stragglers {
                    self.discrepancy = true;
                    return Err(Error::DiscrepancyDetected.into());
                }

                // While a timer is running, all nodes are required to answer.
                let mut required = total;

                // After the timeout has elapsed, a limited number of stragglers are allowed.
                if did_timeout {
                    required -= allowed_stragglers;
                    commits -= failures; // Since failures count as stragglers.
                }

                // Check if the majority has been reached.
                if commits < required || proposer_commit.is_none() {
                    return Err(Error::StillWaiting.into());
                }
            }
            true => {
                // Discrepancy resolution.
                let required = total / 2 + 1;

                // Find the commit with the highest number of votes.
                let mut top_vote = &Vote::default();
                for v in votes.values() {
                    if v.tally > top_vote.tally {
                        top_vote = v;
                    }
                }

                // Fail the round if the majority cannot be reached due to insufficient votes remaining
                // (e.g. too many nodes have failed).
                let remaining = total - commits;
                if top_vote.tally + remaining < required {
                    return Err(Error::InsufficientVotes.into());
                }

                // Check if the majority has been reached.
                if top_vote.tally < required || proposer_commit.is_none() {
                    if did_timeout {
                        return Err(Error::InsufficientVotes.into());
                    }
                    return Err(Error::StillWaiting.into());
                }

                let proposer_commit = proposer_commit.expect("proposer commit should be set");
                let top_vote_commit = top_vote.commit.expect("top vote commit should be set");

                // Make sure that the majority commitment is the same as the proposer commitment.
                if !proposer_commit.mostly_equal(top_vote_commit) {
                    return Err(Error::BadProposerCommitment.into());
                }
            }
        }

        // We must return the proposer commitment as that one contains additional data.
        let proposer_commit = proposer_commit.expect("proposer commit should be set");
        Ok(proposer_commit)
    }
}

#[cfg(test)]
mod tests {
    use anyhow::{anyhow, Result};

    use crate::{
        common::{
            crypto::{
                hash::Hash,
                signature::{self, PublicKey, Signature},
            },
            namespace::Namespace,
            versioned::Versioned,
        },
        consensus::{
            registry::{
                ExecutorParameters, Node, NodeRuntime, Runtime, RuntimeGovernanceModel,
                RuntimeKind, TEEHardware,
            },
            roothash::{
                Block, ComputeResultsHeader, Error, ExecutorCommitment, ExecutorCommitmentFailure,
                ExecutorCommitmentHeader, HeaderType, Message, Pool, RegistryMessage,
                StakingMessage,
            },
            scheduler::{Committee, CommitteeKind, CommitteeNode, Role},
            staking::Transfer,
        },
    };

    use super::NodeLookup;

    struct StaticNodeLookup {
        runtime: NodeRuntime,
    }

    impl NodeLookup for StaticNodeLookup {
        fn node(&self, id: PublicKey) -> Result<Node, Error> {
            Ok(Node {
                id,
                runtimes: Some(vec![self.runtime.clone()]),
                ..Default::default()
            })
        }
    }

    #[test]
    fn test_pool_single_commitment() {
        let chain_context = "test: oasis-core tests".to_owned();

        // Generate a non-TEE runtime.
        let id =
            Namespace::from("0000000000000000000000000000000000000000000000000000000000000000");

        let rt = Runtime {
            id,
            kind: RuntimeKind::KindCompute,
            tee_hardware: TEEHardware::TEEHardwareInvalid,
            executor: ExecutorParameters {
                max_messages: 32,
                ..Default::default()
            },
            governance_model: RuntimeGovernanceModel::GovernanceEntity,
            ..Default::default()
        };

        // Generate a commitment signing key.
        let sk = signature::PrivateKey::generate();

        // Generate a committee.
        let committee = Committee {
            kind: CommitteeKind::ComputeExecutor,
            members: vec![CommitteeNode {
                role: Role::Worker,
                public_key: sk.public_key(),
            }],
            runtime_id: id,
            valid_for: 0,
        };

        // Create a pool.
        let mut pool = Pool::new(rt, committee, 0);

        // Generate a commitment.
        let (child_blk, _, mut ec) = generate_executor_commitment(id, pool.round);

        let nl = StaticNodeLookup {
            runtime: NodeRuntime {
                id,
                ..Default::default()
            },
        };

        // Test invalid commitments.
        let tcs: Vec<(&str, fn(&mut ExecutorCommitment), Error)> = vec![
            (
                "BlockBadRound",
                |ec: &mut ExecutorCommitment| ec.header.header.round -= 1,
                Error::NotBasedOnCorrectBlock,
            ),
            (
                "BlockBadPreviousHash",
                |ec: &mut ExecutorCommitment| {
                    ec.header.header.previous_hash = Hash::digest_bytes(b"invalid")
                },
                Error::NotBasedOnCorrectBlock,
            ),
            (
                "MissingIORootHash",
                |ec: &mut ExecutorCommitment| ec.header.header.io_root = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingStateRootHash",
                |ec: &mut ExecutorCommitment| ec.header.header.state_root = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingMessagesHash",
                |ec: &mut ExecutorCommitment| ec.header.header.messages_hash = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingInMessagesHash",
                |ec: &mut ExecutorCommitment| ec.header.header.in_msgs_hash = None,
                Error::BadExecutorCommitment,
            ),
            (
                "BadFailureIndicating",
                |ec: &mut ExecutorCommitment| {
                    ec.header.failure = ExecutorCommitmentFailure::FailureUnknown
                },
                Error::BadExecutorCommitment,
            ),
        ];

        let msg_validator = |_: &_| Ok(());
        for (name, f, expected_err) in tcs {
            let (_, _, mut invalid_ec) = generate_executor_commitment(id, pool.round);
            f(&mut invalid_ec);

            invalid_ec.node_id = sk.public_key();
            let res = invalid_ec.sign(&sk, &id, &chain_context);
            assert!(res.is_ok(), "invalid_ec.sign({})", name);

            let res = pool.add_executor_commitment(
                &child_blk,
                &nl,
                invalid_ec,
                &msg_validator,
                &chain_context,
            );
            assert!(res.is_err(), "add_executor_commitment({})", name);
            assert_eq!(
                res.err().unwrap().to_string(),
                expected_err.to_string(),
                "add_executor_commitment({})",
                name
            );
        }

        // Generate a valid commitment.
        ec.node_id = sk.public_key();
        let res = ec.sign(&sk, &id, &chain_context);
        assert!(res.is_ok(), "ec.sign");

        // There should not be enough executor commitments.
        let res = pool.process_commitments(false);
        assert_eq!(
            res.err().unwrap().to_string(),
            Error::StillWaiting.to_string(),
            "process_commitments",
        );

        let res = pool.process_commitments(true);
        assert_eq!(
            res.err().unwrap().to_string(),
            Error::NoProposerCommitment.to_string(),
            "process_commitments",
        );

        // Test message validator function.
        let mut ec_with_msgs = ec.clone();
        ec_with_msgs.messages = vec![
            Message::Staking(Versioned {
                version: 0,
                inner: StakingMessage::Transfer(Transfer::default()),
            }),
            Message::Registry(Versioned {
                version: 0,
                inner: RegistryMessage::UpdateRuntime(Runtime::default()),
            }),
        ];
        let msg_hash = Message::messages_hash(&ec_with_msgs.messages);
        ec_with_msgs.header.header.messages_hash = Some(msg_hash);

        let res = ec_with_msgs.sign(&sk, &id, &chain_context);
        assert!(res.is_ok(), "ec_with_msgs.sign");

        let error_msg = "message validation error";
        let always_fail_msg_validator = |_: &_| -> Result<()> { Err(anyhow!(error_msg)) };
        let res = pool.add_executor_commitment(
            &child_blk,
            &nl,
            ec_with_msgs,
            &always_fail_msg_validator,
            &chain_context,
        );
        assert!(res.is_err(), "add_executor_commitment");
        assert_eq!(
            res.err().unwrap().to_string(),
            error_msg,
            "add_executor_commitment",
        );

        // Adding a commitment should succeed.
        let res = pool.add_executor_commitment(
            &child_blk,
            &nl,
            ec.clone(),
            &msg_validator,
            &chain_context,
        );
        assert!(res.is_ok(), "add_executor_commitment");

        // Adding a commitment twice for the same node should fail.
        let res = pool.add_executor_commitment(
            &child_blk,
            &nl,
            ec.clone(),
            &msg_validator,
            &chain_context,
        );
        assert!(res.is_err(), "add_executor_commitment, duplicate");

        // There should be enough executor commitments and no discrepancy.
        let res = pool.process_commitments(false);
        assert!(res.is_ok(), "process_commitments");
        let dd_ec = res
            .unwrap()
            .to_dd_result()
            .downcast_ref::<ExecutorCommitment>();
        assert_eq!(dd_ec, Some(&ec), "DD should return the correct commitment");
        assert_eq!(false, pool.discrepancy);
    }

    fn generate_executor_commitment(
        id: Namespace,
        round: u64,
    ) -> (Block, Block, ExecutorCommitment) {
        let child_blk = Block::new_genesis_block(id, round);
        let parent_blk = Block::new_empty_block(&child_blk, 1, HeaderType::Normal);

        // TODO: Add tests with some emitted messages.
        let msgs_hash = Message::messages_hash(&vec![]);
        // TODO: Add tests with some incoming messages.
        let in_msgs_hash = Message::in_messages_hash(&vec![]);

        let ec = ExecutorCommitment {
            header: ExecutorCommitmentHeader {
                header: ComputeResultsHeader {
                    round: parent_blk.header.round,
                    previous_hash: parent_blk.header.previous_hash,
                    io_root: Some(parent_blk.header.io_root),
                    state_root: Some(parent_blk.header.state_root),
                    messages_hash: Some(msgs_hash),
                    in_msgs_hash: Some(in_msgs_hash),
                    in_msgs_count: 0,
                },
                failure: ExecutorCommitmentFailure::FailureNone,
                rak_signature: None,
            },
            node_id: PublicKey::default(),
            signature: Signature::default(),
            messages: vec![],
        };

        (child_blk, parent_blk, ec)
    }
}