1use std::collections::{HashMap, HashSet};
2
3use anyhow::Result;
4
5use crate::{
6 common::crypto::{hash::Hash, signature::PublicKey},
7 consensus::{
8 registry::{Node, Runtime, TEEHardware},
9 roothash::{Block, Error, Message, OpenCommitment},
10 scheduler::{Committee, CommitteeKind, Role},
11 },
12};
13
14use super::ExecutorCommitment;
15
/// Source of node descriptors, looked up by the node's public key.
pub trait NodeLookup {
    /// Returns the [`Node`] descriptor for the public key `id`, or an [`Error`] when the lookup
    /// fails.
    fn node(&self, id: PublicKey) -> Result<Node, Error>;
}
20
/// Validation hook applied to the messages carried by an executor commitment.
pub trait MessageValidator {
    /// Validates `msgs`; returning an error rejects them and the error is propagated to the
    /// caller that tried to add the commitment.
    fn validate(&self, msgs: &[Message]) -> Result<()>;
}
25
26impl<F> MessageValidator for F
27where
28 F: Fn(&[Message]) -> Result<()>,
29{
30 fn validate(&self, msgs: &[Message]) -> Result<()> {
31 (*self)(msgs)
32 }
33}
34
/// Collects executor commitments for a single runtime round and decides their outcome.
pub struct Pool {
    /// Descriptor of the runtime the commitments belong to.
    runtime: Runtime,
    /// Committee whose members are allowed to submit commitments.
    committee: Committee,
    /// Round the pool collects commitments for; it is compared against the block header round.
    round: u64,
    /// Accepted commitments, keyed by the public key of the committing node.
    execute_commitments: HashMap<PublicKey, ExecutorCommitment>,
    /// Set once commitment processing detects a discrepancy; from then on only backup workers
    /// are tallied.
    discrepancy: bool,
    /// NOTE(review): only ever initialised to zero in the visible code and never read.
    _next_timeout: i64,

    /// Public keys of all committee members.
    member_set: HashSet<PublicKey>,
    /// Public keys of the committee members whose role is `Role::Worker`.
    _worker_set: HashSet<PublicKey>,
}
63
64impl Pool {
65 pub fn new(runtime: Runtime, committee: Committee, round: u64) -> Self {
67 let mut member_set = HashSet::new();
68 let mut _worker_set = HashSet::new();
69
70 for m in &committee.members {
71 member_set.insert(m.public_key);
72 if m.role == Role::Worker {
73 _worker_set.insert(m.public_key);
74 }
75 }
76
77 Pool {
78 runtime,
79 committee,
80 round,
81 execute_commitments: HashMap::new(),
82 discrepancy: false,
83 _next_timeout: 0,
84 member_set,
85 _worker_set,
86 }
87 }
88
89 fn is_member(&self, id: &PublicKey) -> bool {
90 self.member_set.contains(id)
91 }
92
93 fn _is_worker(&self, id: &PublicKey) -> bool {
94 self._worker_set.contains(id)
95 }
96
97 fn is_scheduler(&self, id: &PublicKey) -> bool {
98 if let Ok(scheduler) = self.committee.transaction_scheduler(self.round) {
99 return &scheduler.public_key == id;
100 }
101 false
102 }
103
104 fn add_verified_executor_commitment(
106 &mut self,
107 blk: &Block,
108 nl: &impl NodeLookup,
109 msg_validator: &impl MessageValidator,
110 commit: ExecutorCommitment,
111 ) -> Result<()> {
112 if self.committee.kind != CommitteeKind::ComputeExecutor {
113 return Err(Error::InvalidCommitteeKind.into());
114 }
115
116 if !self.is_member(&commit.node_id) {
121 return Err(Error::NotInCommittee.into());
122 }
123
124 if self.execute_commitments.contains_key(&commit.node_id) {
126 return Err(Error::AlreadyCommitted.into());
127 }
128
129 if self.round != blk.header.round {
130 return Err(Error::InvalidRound.into());
131 }
132
133 if !commit.header.header.is_parent_of(&blk.header) {
135 return Err(Error::NotBasedOnCorrectBlock.into());
136 }
137
138 if commit.validate_basic().is_err() {
139 return Err(Error::BadExecutorCommitment.into());
140 }
141
142 if !commit.is_indicating_failure() {
145 if self.runtime.tee_hardware != TEEHardware::TEEHardwareInvalid {
147 let n = nl.node(commit.node_id).map_err(|_|
148 Error::NotInCommittee)?;
150
151 let ad = self
152 .runtime
153 .active_deployment(self.committee.valid_for)
154 .ok_or(
155 Error::NoRuntime,
157 )?;
158
159 let rt = n.get_runtime(&self.runtime.id, &ad.version).ok_or(
160 Error::NotInCommittee,
163 )?;
164
165 let tee = rt.capabilities.tee.ok_or(
166 Error::RakSigInvalid,
168 )?;
169
170 commit
171 .header
172 .verify_rak(tee.rak)
173 .map_err(|_| Error::RakSigInvalid)?;
174 }
175
176 match self.is_scheduler(&commit.node_id) {
178 true => {
179 if commit.messages.len() as u32 > self.runtime.executor.max_messages {
181 return Err(Error::InvalidMessages.into());
182 }
183
184 let messages_hash = commit
185 .header
186 .header
187 .messages_hash
188 .ok_or(Error::InvalidMessages)?;
189 let h = Message::messages_hash(&commit.messages);
190 if h != messages_hash {
191 return Err(Error::InvalidMessages.into());
192 }
193
194 if !commit.messages.is_empty() {
196 msg_validator.validate(&commit.messages)?;
197 }
198 }
199 false => {
200 if !commit.messages.is_empty() {
202 return Err(Error::InvalidMessages.into());
203 }
204 }
205 }
206 }
207
208 self.execute_commitments.insert(commit.node_id, commit);
209
210 Ok(())
211 }
212
213 pub fn add_executor_commitment(
215 &mut self,
216 blk: &Block,
217 nl: &impl NodeLookup,
218 commit: ExecutorCommitment,
219 msg_validator: &impl MessageValidator,
220 chain_context: &String,
221 ) -> Result<()> {
222 commit.verify(&self.runtime.id, chain_context)?;
224
225 self.add_verified_executor_commitment(blk, nl, msg_validator, commit)
226 }
227
228 pub fn process_commitments(&mut self, did_timeout: bool) -> Result<&dyn OpenCommitment> {
231 if self.committee.kind != CommitteeKind::ComputeExecutor {
232 panic!(
233 "roothash/commitment: unknown committee kind: {:?}",
234 self.committee.kind
235 );
236 }
237
238 #[derive(Default)]
239 struct Vote<'a> {
240 commit: Option<&'a ExecutorCommitment>,
241 tally: u16,
242 }
243
244 let mut total = 0;
245 let mut commits = 0;
246 let mut failures = 0;
247
248 let mut votes: HashMap<Hash, Vote> = HashMap::new();
250 for n in &self.committee.members {
251 if !self.discrepancy && n.role != Role::Worker {
252 continue;
253 }
254 if self.discrepancy && n.role != Role::BackupWorker {
255 continue;
256 }
257
258 total += 1;
259 let commit = match self.execute_commitments.get(&n.public_key) {
260 Some(commit) => commit,
261 None => continue,
262 };
263 commits += 1;
264
265 if commit.is_indicating_failure() {
266 failures += 1;
267 continue;
268 }
269
270 let k = commit.to_vote();
271 match votes.get_mut(&k) {
272 Some(v) => v.tally += 1,
273 None => {
274 votes.insert(
275 k,
276 Vote {
277 tally: 1,
278 commit: Some(commit),
279 },
280 );
281 }
282 }
283
284 if !self.discrepancy && votes.len() > 1 {
285 self.discrepancy = true;
286 return Err(Error::DiscrepancyDetected.into());
287 }
288 }
289
290 let proposer = self
292 .committee
293 .transaction_scheduler(self.round)
294 .map_err(|_| Error::NoCommittee)?;
295 let proposer_commit = self.execute_commitments.get(&proposer.public_key);
296 if proposer_commit.is_none() && did_timeout {
297 return Err(Error::NoProposerCommitment.into());
298 }
299
300 match self.discrepancy {
301 false => {
302 let allowed_stragglers = self.runtime.executor.allowed_stragglers;
304
305 if failures > allowed_stragglers {
309 self.discrepancy = true;
310 return Err(Error::DiscrepancyDetected.into());
311 }
312
313 let mut required = total;
315
316 if did_timeout {
318 required -= allowed_stragglers;
319 commits -= failures }
321
322 if commits < required || proposer_commit.is_none() {
324 return Err(Error::StillWaiting.into());
325 }
326 }
327 true => {
328 let required = total / 2 + 1;
330
331 let mut top_vote = &Vote::default();
333 for v in votes.values() {
334 if v.tally > top_vote.tally {
335 top_vote = v;
336 }
337 }
338
339 let remaining = total - commits;
342 if top_vote.tally + remaining < required {
343 return Err(Error::InsufficientVotes.into());
344 }
345
346 if top_vote.tally < required || proposer_commit.is_none() {
348 if did_timeout {
349 return Err(Error::InsufficientVotes.into());
350 }
351 return Err(Error::StillWaiting.into());
352 }
353
354 let proposer_commit = proposer_commit.expect("proposer commit should be set");
355 let top_vote_commit = top_vote.commit.expect("top vote commit should be set");
356
357 if !proposer_commit.mostly_equal(top_vote_commit) {
359 return Err(Error::BadProposerCommitment.into());
360 }
361 }
362 }
363
364 let proposer_commit = proposer_commit.expect("proposer commit should be set");
366 Ok(proposer_commit)
367 }
368}
369
#[cfg(test)]
mod tests {
    use anyhow::{anyhow, Result};

    use crate::{
        common::{
            crypto::{
                hash::Hash,
                signature::{self, PublicKey, Signature},
            },
            namespace::Namespace,
            versioned::Versioned,
        },
        consensus::{
            registry::{
                ExecutorParameters, Node, NodeRuntime, Runtime, RuntimeGovernanceModel,
                RuntimeKind, TEEHardware,
            },
            roothash::{
                Block, ComputeResultsHeader, Error, ExecutorCommitment, ExecutorCommitmentFailure,
                ExecutorCommitmentHeader, HeaderType, Message, Pool, RegistryMessage,
                StakingMessage,
            },
            scheduler::{Committee, CommitteeKind, CommitteeNode, Role},
            staking::Transfer,
        },
    };

    use super::NodeLookup;

    /// Node lookup stub that answers every query with a node running one fixed runtime.
    struct StaticNodeLookup {
        runtime: NodeRuntime,
    }

    impl NodeLookup for StaticNodeLookup {
        /// Builds a default node descriptor carrying the requested id and the fixed runtime.
        fn node(&self, id: PublicKey) -> Result<Node, Error> {
            let runtimes = Some(vec![self.runtime.clone()]);
            Ok(Node {
                id,
                runtimes,
                ..Default::default()
            })
        }
    }

    /// Renders the error of a failed result as text; panics if the result is `Ok`.
    fn error_text<T>(outcome: Result<T>) -> String {
        outcome.err().unwrap().to_string()
    }

    /// Exercises a pool whose committee consists of a single worker: malformed commitments
    /// are rejected, processing waits for the proposer, message validation failures are
    /// propagated, duplicates are refused and a valid commitment resolves the round.
    #[test]
    fn test_pool_single_commitment() {
        let chain_context = "test: oasis-core tests".to_owned();

        let id =
            Namespace::from("0000000000000000000000000000000000000000000000000000000000000000");

        // Runtime without TEE hardware, so no RAK verification takes place.
        let runtime = Runtime {
            id,
            kind: RuntimeKind::KindCompute,
            tee_hardware: TEEHardware::TEEHardwareInvalid,
            executor: ExecutorParameters {
                max_messages: 32,
                ..Default::default()
            },
            governance_model: RuntimeGovernanceModel::GovernanceEntity,
            ..Default::default()
        };

        let signer = signature::PrivateKey::generate();

        // Executor committee with the signer as its only (worker) member.
        let committee = Committee {
            kind: CommitteeKind::ComputeExecutor,
            members: vec![CommitteeNode {
                role: Role::Worker,
                public_key: signer.public_key(),
            }],
            runtime_id: id,
            valid_for: 0,
        };

        let mut pool = Pool::new(runtime, committee, 0);

        let (child_blk, _, mut ec) = generate_executor_commitment(id, pool.round);

        let lookup = StaticNodeLookup {
            runtime: NodeRuntime {
                id,
                ..Default::default()
            },
        };

        // Each case corrupts a fresh commitment and names the error the pool must report.
        let cases: Vec<(&str, fn(&mut ExecutorCommitment), Error)> = vec![
            (
                "BlockBadRound",
                |ec: &mut ExecutorCommitment| ec.header.header.round -= 1,
                Error::NotBasedOnCorrectBlock,
            ),
            (
                "BlockBadPreviousHash",
                |ec: &mut ExecutorCommitment| {
                    ec.header.header.previous_hash = Hash::digest_bytes(b"invalid")
                },
                Error::NotBasedOnCorrectBlock,
            ),
            (
                "MissingIORootHash",
                |ec: &mut ExecutorCommitment| ec.header.header.io_root = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingStateRootHash",
                |ec: &mut ExecutorCommitment| ec.header.header.state_root = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingMessagesHash",
                |ec: &mut ExecutorCommitment| ec.header.header.messages_hash = None,
                Error::BadExecutorCommitment,
            ),
            (
                "MissingInMessagesHash",
                |ec: &mut ExecutorCommitment| ec.header.header.in_msgs_hash = None,
                Error::BadExecutorCommitment,
            ),
            (
                "BadFailureIndicating",
                |ec: &mut ExecutorCommitment| {
                    ec.header.failure = ExecutorCommitmentFailure::FailureUnknown
                },
                Error::BadExecutorCommitment,
            ),
        ];

        let accept_all = |_: &_| Ok(());
        for (name, corrupt, expected_err) in cases {
            let (_, _, mut bad_commit) = generate_executor_commitment(id, pool.round);
            corrupt(&mut bad_commit);

            bad_commit.node_id = signer.public_key();
            let signed = bad_commit.sign(&signer, &id, &chain_context);
            assert!(signed.is_ok(), "invalid_ec.sign({})", name);

            let added = pool.add_executor_commitment(
                &child_blk,
                &lookup,
                bad_commit,
                &accept_all,
                &chain_context,
            );
            assert!(added.is_err(), "add_executor_commitment({})", name);
            assert_eq!(
                error_text(added),
                expected_err.to_string(),
                "add_executor_commitment({})",
                name
            );
        }

        // Prepare the valid commitment; it is not added to the pool yet.
        ec.node_id = signer.public_key();
        let signed = ec.sign(&signer, &id, &chain_context);
        assert!(signed.is_ok(), "ec.sign");

        // With an empty pool the round cannot be resolved before the timeout ...
        assert_eq!(
            error_text(pool.process_commitments(false)),
            Error::StillWaiting.to_string(),
            "process_commitments",
        );

        // ... and after the timeout the missing proposer commitment is reported.
        assert_eq!(
            error_text(pool.process_commitments(true)),
            Error::NoProposerCommitment.to_string(),
            "process_commitments",
        );

        // A commitment carrying messages must go through the message validator.
        let mut ec_with_msgs = ec.clone();
        ec_with_msgs.messages = vec![
            Message::Staking(Versioned {
                version: 0,
                inner: StakingMessage::Transfer(Transfer::default()),
            }),
            Message::Registry(Versioned {
                version: 0,
                inner: RegistryMessage::UpdateRuntime(Runtime::default()),
            }),
        ];
        let msg_hash = Message::messages_hash(&ec_with_msgs.messages);
        ec_with_msgs.header.header.messages_hash = Some(msg_hash);

        let signed = ec_with_msgs.sign(&signer, &id, &chain_context);
        assert!(signed.is_ok(), "ec_with_msgs.sign");

        let error_msg = "message validation error";
        let reject_all = |_: &_| -> Result<()> { Err(anyhow!(error_msg)) };
        let added = pool.add_executor_commitment(
            &child_blk,
            &lookup,
            ec_with_msgs,
            &reject_all,
            &chain_context,
        );
        assert!(added.is_err(), "add_executor_commitment");
        assert_eq!(error_text(added), error_msg, "add_executor_commitment",);

        // The valid commitment is accepted once ...
        let added = pool.add_executor_commitment(
            &child_blk,
            &lookup,
            ec.clone(),
            &accept_all,
            &chain_context,
        );
        assert!(added.is_ok(), "add_executor_commitment");

        // ... and refused when submitted a second time.
        let added_again = pool.add_executor_commitment(
            &child_blk,
            &lookup,
            ec.clone(),
            &accept_all,
            &chain_context,
        );
        assert!(added_again.is_err(), "add_executor_commitment, duplicate");

        // The single worker has committed, so the round resolves without a discrepancy.
        let outcome = pool.process_commitments(false);
        assert!(outcome.is_ok(), "process_commitments");
        let resolved = outcome
            .unwrap()
            .to_dd_result()
            .downcast_ref::<ExecutorCommitment>();
        assert_eq!(
            resolved,
            Some(&ec),
            "DD should return the correct commitment"
        );
        assert_eq!(false, pool.discrepancy);
    }

    /// Builds a genesis block for `round`, an empty block on top of it, and an unsigned
    /// executor commitment whose header mirrors that empty block with empty message sets.
    fn generate_executor_commitment(
        id: Namespace,
        round: u64,
    ) -> (Block, Block, ExecutorCommitment) {
        let child_blk = Block::new_genesis_block(id, round);
        let parent_blk = Block::new_empty_block(&child_blk, 1, HeaderType::Normal);

        let msgs_hash = Message::messages_hash(&vec![]);
        let in_msgs_hash = Message::in_messages_hash(&vec![]);

        let results_header = ComputeResultsHeader {
            round: parent_blk.header.round,
            previous_hash: parent_blk.header.previous_hash,
            io_root: Some(parent_blk.header.io_root),
            state_root: Some(parent_blk.header.state_root),
            messages_hash: Some(msgs_hash),
            in_msgs_hash: Some(in_msgs_hash),
            in_msgs_count: 0,
        };

        // Node id and signature are placeholders; callers fill them in by signing.
        let ec = ExecutorCommitment {
            header: ExecutorCommitmentHeader {
                header: results_header,
                failure: ExecutorCommitmentFailure::FailureNone,
                rak_signature: None,
            },
            node_id: PublicKey::default(),
            signature: Signature::default(),
            messages: vec![],
        };

        (child_blk, parent_blk, ec)
    }
}