oasis_core_runtime/storage/mkvs/tree/
iterator.rs

1//! Tree iterator.
2use std::{collections::VecDeque, fmt};
3
4use anyhow::{Error, Result};
5
6use crate::storage::mkvs::{
7    self,
8    cache::{Cache, ReadSyncFetcher},
9    sync::{IterateRequest, Proof, ReadSync, TreeID},
10    tree::{Depth, Key, KeyTrait, NodeBox, NodeKind, NodePtrRef, Root, Tree},
11};
12
13pub(super) struct FetcherSyncIterate<'a> {
14    key: &'a Key,
15    prefetch: usize,
16}
17
18impl<'a> FetcherSyncIterate<'a> {
19    pub(super) fn new(key: &'a Key, prefetch: usize) -> Self {
20        Self { key, prefetch }
21    }
22}
23
24impl<'a> ReadSyncFetcher for FetcherSyncIterate<'a> {
25    fn fetch(&self, root: Root, ptr: NodePtrRef, rs: &mut Box<dyn ReadSync>) -> Result<Proof> {
26        let rsp = rs.sync_iterate(IterateRequest {
27            tree: TreeID {
28                root,
29                position: ptr.borrow().hash,
30            },
31            key: self.key.clone(),
32            prefetch: self.prefetch as u16,
33        })?;
34        Ok(rsp.proof)
35    }
36}
37
/// Visit state of a node.
///
/// Records how much of an internal node has already been handled, so that
/// iteration can resume from a stored path atom.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VisitState {
    /// The node has not been visited yet; used for every fresh descent.
    Before,
    /// The leaf attached to the internal node has been handled; resuming
    /// continues with the left child.
    At,
    /// The left child has been handled; resuming continues with the right
    /// child.
    AtLeft,
    /// The right child has been handled; resuming finds nothing more under
    /// this node.
    After,
}
46
47/// Atom in the current iterator path. Can be used to resume iteration
48/// from a given position.
49struct PathAtom {
50    ptr: NodePtrRef,
51    bit_depth: Depth,
52    path: Key,
53    state: VisitState,
54}
55
56impl fmt::Debug for PathAtom {
57    fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
58        f.debug_struct("PathAtom")
59            .field("bit_depth", &self.bit_depth)
60            .field("path", &self.path)
61            .field("state", &self.state)
62            .finish()
63    }
64}
65
66/// Tree iterator.
67pub struct TreeIterator<'tree> {
68    tree: &'tree Tree,
69    prefetch: usize,
70    pos: VecDeque<PathAtom>,
71    key: Option<Key>,
72    value: Option<Vec<u8>>,
73    error: Option<Error>,
74}
75
76impl<'tree> TreeIterator<'tree> {
77    /// Create a new tree iterator.
78    fn new(tree: &'tree Tree) -> Self {
79        Self {
80            tree,
81            prefetch: 0,
82            pos: VecDeque::new(),
83            key: None,
84            value: None,
85            error: None,
86        }
87    }
88
89    fn reset(&mut self) {
90        self.pos.clear();
91        self.key = None;
92        self.value = None;
93    }
94
95    fn next(&mut self) {
96        if self.error.is_some() {
97            return;
98        }
99
100        while !self.pos.is_empty() {
101            // Start where we left off.
102            let atom = self.pos.pop_front().expect("not empty");
103            let mut remainder = std::mem::take(&mut self.pos);
104
105            // Remember where the path from root to target node ends (will end).
106            let mut cache = self.tree.cache.borrow_mut();
107            cache.mark_position();
108            for atom in &remainder {
109                cache.use_node(atom.ptr.clone());
110            }
111            drop(cache);
112
113            // Try to proceed with the current node. If we don't succeed, proceed to the
114            // next node.
115            let key = self.key.take().expect("iterator is valid");
116            self.reset();
117            if let Err(error) =
118                self._next(atom.ptr, atom.bit_depth, atom.path, key.clone(), atom.state)
119            {
120                self.error = Some(error);
121                self.reset();
122                return;
123            }
124            if self.key.is_some() {
125                // Key has been found.
126                self.pos.append(&mut remainder);
127                return;
128            }
129
130            self.key = Some(key);
131            self.pos = remainder;
132        }
133
134        // We have reached the end of the tree, make sure everything is reset.
135        self.key = None;
136        self.value = None;
137    }
138
139    fn _next(
140        &mut self,
141        ptr: NodePtrRef,
142        bit_depth: Depth,
143        path: Key,
144        mut key: Key,
145        mut state: VisitState,
146    ) -> Result<()> {
147        let node_ref = self.tree.cache.borrow_mut().deref_node_ptr(
148            ptr.clone(),
149            Some(FetcherSyncIterate::new(&key, self.prefetch)),
150        )?;
151
152        match classify_noderef!(?node_ref) {
153            NodeKind::None => {
154                // Reached a nil node, there is nothing here.
155                Ok(())
156            }
157            NodeKind::Internal => {
158                let node_ref = node_ref.unwrap();
159                if let NodeBox::Internal(ref n) = *node_ref.borrow() {
160                    // Internal node.
161                    let bit_length = bit_depth + n.label_bit_length;
162                    let new_path = path.merge(bit_depth, &n.label, n.label_bit_length);
163
164                    // Check if the key is longer than the current path but lexicographically smaller. In this
165                    // case everything in this subtree will be larger so we need to take the first value.
166                    let take_first =
167                        bit_length > 0 && key.bit_length() >= bit_length && key < new_path;
168
169                    // Does lookup key end here? Look into LeafNode.
170                    if (state == VisitState::Before
171                        && (key.bit_length() <= bit_length || take_first))
172                        || state == VisitState::At
173                    {
174                        if state == VisitState::Before {
175                            self._next(
176                                n.leaf_node.clone(),
177                                bit_length,
178                                path.clone(),
179                                key.clone(),
180                                VisitState::Before,
181                            )?;
182                            if self.key.is_some() {
183                                // Key has been found.
184                                self.pos.push_back(PathAtom {
185                                    ptr,
186                                    bit_depth,
187                                    path,
188                                    state: VisitState::At,
189                                });
190                                return Ok(());
191                            }
192                        }
193                        // Key has not been found, continue with search for next key.
194                        if key.bit_length() <= bit_length {
195                            key = key.append_bit(bit_length, false);
196                        }
197                    }
198
199                    if state == VisitState::Before {
200                        state = VisitState::At;
201                    }
202
203                    // Continue recursively based on a bit value.
204                    if (state == VisitState::At && (!key.get_bit(bit_length) || take_first))
205                        || state == VisitState::AtLeft
206                    {
207                        if state == VisitState::At {
208                            self._next(
209                                n.left.clone(),
210                                bit_length,
211                                new_path.append_bit(bit_length, false),
212                                key.clone(),
213                                VisitState::Before,
214                            )?;
215                            if self.key.is_some() {
216                                // Key has been found.
217                                self.pos.push_back(PathAtom {
218                                    ptr,
219                                    bit_depth,
220                                    path,
221                                    state: VisitState::AtLeft,
222                                });
223                                return Ok(());
224                            }
225                        }
226                        // Key has not been found, continue with search for next key.
227                        key = key.split(bit_length, key.bit_length()).0;
228                        key = key.append_bit(bit_length, true);
229                    }
230
231                    if state == VisitState::At || state == VisitState::AtLeft {
232                        self._next(
233                            n.right.clone(),
234                            bit_length,
235                            new_path.append_bit(bit_length, true),
236                            key,
237                            VisitState::Before,
238                        )?;
239                        if self.key.is_some() {
240                            // Key has been found.
241                            self.pos.push_back(PathAtom {
242                                ptr,
243                                bit_depth,
244                                path,
245                                state: VisitState::After,
246                            });
247                            return Ok(());
248                        }
249                    }
250
251                    return Ok(());
252                }
253
254                unreachable!("node kind is internal node");
255            }
256            NodeKind::Leaf => {
257                // Reached a leaf node.
258                let node_ref = node_ref.unwrap();
259                if let NodeBox::Leaf(ref n) = *node_ref.borrow() {
260                    if n.key >= key {
261                        self.key = Some(n.key.clone());
262                        self.value = Some(n.value.clone());
263                    }
264                } else {
265                    unreachable!("node kind is leaf node");
266                }
267
268                Ok(())
269            }
270        }
271    }
272}
273
274impl<'tree> Iterator for TreeIterator<'tree> {
275    type Item = (Vec<u8>, Vec<u8>);
276
277    fn next(&mut self) -> Option<Self::Item> {
278        use mkvs::Iterator;
279
280        if !self.is_valid() {
281            return None;
282        }
283
284        let key = self.key.as_ref().expect("iterator is valid").clone();
285        let value = self.value.as_ref().expect("iterator is valid").clone();
286        TreeIterator::next(self);
287
288        Some((key, value))
289    }
290}
291
292impl<'tree> mkvs::Iterator for TreeIterator<'tree> {
293    fn set_prefetch(&mut self, prefetch: usize) {
294        self.prefetch = prefetch;
295    }
296
297    fn is_valid(&self) -> bool {
298        self.key.is_some()
299    }
300
301    fn error(&self) -> &Option<Error> {
302        &self.error
303    }
304
305    fn rewind(&mut self) {
306        self.seek(&[])
307    }
308
309    fn seek(&mut self, key: &[u8]) {
310        if self.error.is_some() {
311            return;
312        }
313
314        self.reset();
315        let pending_root = self.tree.cache.borrow().get_pending_root();
316        if let Err(error) = self._next(
317            pending_root,
318            0,
319            Key::new(),
320            key.to_vec(),
321            VisitState::Before,
322        ) {
323            self.error = Some(error);
324            self.reset();
325        }
326    }
327
328    fn get_key(&self) -> &Option<Key> {
329        &self.key
330    }
331
332    fn get_value(&self) -> &Option<Vec<u8>> {
333        &self.value
334    }
335
336    fn next(&mut self) {
337        TreeIterator::next(self)
338    }
339}
340
341impl Tree {
342    /// Return an iterator over the tree.
343    pub fn iter(&self) -> TreeIterator {
344        TreeIterator::new(self)
345    }
346}
347
348#[cfg(test)]
349pub(super) mod test {
350    use std::iter;
351
352    use rustc_hex::FromHex;
353
354    use super::{super::tree_test::generate_key_value_pairs_ex, *};
355    use crate::storage::mkvs::{
356        interop::{Driver, ProtocolServer},
357        sync::{NoopReadSyncer, StatsCollector},
358        Iterator, OverlayTree, RootType,
359    };
360
361    #[test]
362    fn test_iterator() {
363        let server = ProtocolServer::new(None);
364
365        let mut tree = Tree::builder()
366            .with_root_type(RootType::State)
367            .build(Box::new(NoopReadSyncer));
368
369        // Test with an empty tree.
370        let mut it = tree.iter();
371        it.rewind();
372        assert!(
373            !it.is_valid(),
374            "iterator should be invalid on an empty tree"
375        );
376
377        // Test with one item.
378        tree.insert(b"key", b"first").unwrap();
379        let mut it = tree.iter();
380        it.rewind();
381        assert!(
382            it.is_valid(),
383            "iterator should be valid on a non-empty tree"
384        );
385
386        // Insert some items.
387        let items = vec![
388            (b"key".to_vec(), b"first".to_vec()),
389            (b"key 1".to_vec(), b"one".to_vec()),
390            (b"key 2".to_vec(), b"two".to_vec()),
391            (b"key 5".to_vec(), b"five".to_vec()),
392            (b"key 8".to_vec(), b"eight".to_vec()),
393            (b"key 9".to_vec(), b"nine".to_vec()),
394        ];
395        for (key, value) in items.iter() {
396            tree.insert(key, value).unwrap();
397        }
398
399        let tests = vec![
400            (b"k".to_vec(), 0),
401            (b"key 1".to_vec(), 1),
402            (b"key 3".to_vec(), 3),
403            (b"key 4".to_vec(), 3),
404            (b"key 5".to_vec(), 3),
405            (b"key 6".to_vec(), 4),
406            (b"key 7".to_vec(), 4),
407            (b"key 8".to_vec(), 4),
408            (b"key 9".to_vec(), 5),
409            (b"key A".to_vec(), -1),
410        ];
411
412        // Direct.
413        let it = tree.iter();
414        test_iterator_with(&items, it, &tests);
415
416        // Remote.
417        let hash = tree.commit(Default::default(), 0).expect("commit");
418        let write_log = items
419            .iter()
420            .cloned()
421            .map(|(key, value)| mkvs::LogEntry {
422                key,
423                value: Some(value),
424            })
425            .collect();
426        server.apply(&write_log, hash, Default::default(), 0);
427
428        let remote_tree = Tree::builder()
429            .with_capacity(0, 0)
430            .with_root(Root {
431                root_type: RootType::State,
432                hash,
433                ..Default::default()
434            })
435            .build(server.read_sync());
436
437        let it = remote_tree.iter();
438        test_iterator_with(&items, it, &tests);
439
440        // Remote with prefetch (10).
441        let stats = StatsCollector::new(server.read_sync());
442        let remote_tree = Tree::builder()
443            .with_capacity(0, 0)
444            .with_root(Root {
445                root_type: RootType::State,
446                hash,
447                ..Default::default()
448            })
449            .build(Box::new(stats));
450
451        let mut it = remote_tree.iter();
452        it.set_prefetch(10);
453        test_iterator_with(&items, it, &tests);
454
455        let cache = remote_tree.cache.borrow();
456        let stats = cache
457            .get_read_syncer()
458            .as_any()
459            .downcast_ref::<StatsCollector>()
460            .expect("stats");
461        assert_eq!(0, stats.sync_get_count, "sync_get_count");
462        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
463        assert_eq!(1, stats.sync_iterate_count, "sync_iterate_count");
464
465        // Remote with prefetch (3).
466        let stats = StatsCollector::new(server.read_sync());
467        let remote_tree = Tree::builder()
468            .with_capacity(0, 0)
469            .with_root(Root {
470                root_type: RootType::State,
471                hash,
472                ..Default::default()
473            })
474            .build(Box::new(stats));
475
476        let mut it = remote_tree.iter();
477        it.set_prefetch(3);
478        test_iterator_with(&items, it, &tests);
479
480        let cache = remote_tree.cache.borrow();
481        let stats = cache
482            .get_read_syncer()
483            .as_any()
484            .downcast_ref::<StatsCollector>()
485            .expect("stats");
486        assert_eq!(0, stats.sync_get_count, "sync_get_count");
487        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
488        assert_eq!(2, stats.sync_iterate_count, "sync_iterate_count");
489    }
490
491    #[test]
492    fn test_iterator_case1() {
493        let mut tree = Tree::builder()
494            .with_root_type(RootType::State)
495            .build(Box::new(NoopReadSyncer));
496
497        let items = vec![
498            (b"key 5".to_vec(), b"fivey".to_vec()),
499            (b"key 7".to_vec(), b"seven".to_vec()),
500        ];
501        for (key, value) in items.iter() {
502            tree.insert(key, value).unwrap();
503        }
504
505        let tests = vec![(b"key 3".to_vec(), 0)];
506
507        let it = tree.iter();
508        test_iterator_with(&items, it, &tests);
509    }
510
511    #[test]
512    fn test_iterator_case2() {
513        let mut tree = Tree::builder()
514            .with_root_type(RootType::State)
515            .build(Box::new(NoopReadSyncer));
516
517        let items: Vec<(Vec<u8>, Vec<u8>)> = vec![
518            (
519                "54dcb497eb46bc7cb1a1a29d143d5d41f1a684c97e12f2ae536eceb828c15fc34c02"
520                    .from_hex()
521                    .unwrap(),
522                b"value".to_vec(),
523            ),
524            (
525                "54dcb497eb46bc7cb1a1a29d143d5d41f1a684c97e12f2ae536eceb828c15fc34c02"
526                    .from_hex()
527                    .unwrap(),
528                b"value".to_vec(),
529            ),
530        ];
531        for (key, value) in items.iter() {
532            tree.insert(key, value).unwrap();
533        }
534
535        let mut it = tree.iter();
536        let missing_key: Vec<u8> =
537            "54da85be3251772db943cba67341d402117c87ada2a9e8aad7171d40b6b4dc9fbc"
538                .from_hex()
539                .unwrap();
540        it.seek(&missing_key);
541        assert!(it.is_valid(), "iterator should be valid");
542        let item = iter::Iterator::next(&mut it);
543        assert_eq!(
544            Some((items[0].0.clone(), b"value".to_vec())),
545            item,
546            "value should be correct"
547        );
548    }
549
550    #[test]
551    fn test_iterator_eviction() {
552        let server = ProtocolServer::new(None);
553
554        let mut tree = OverlayTree::new(
555            Tree::builder()
556                .with_capacity(0, 0)
557                .with_root_type(RootType::State)
558                .build(Box::new(NoopReadSyncer)),
559        );
560
561        let (keys, values) = generate_key_value_pairs_ex("T".to_owned(), 100);
562        let items: Vec<(Vec<u8>, Vec<u8>)> = keys.into_iter().zip(values.into_iter()).collect();
563        for (key, value) in &items {
564            tree.insert(&key, &value).unwrap();
565        }
566
567        let (write_log, hash) = tree.commit_both(Default::default(), 0).expect("commit");
568        server.apply(&write_log, hash, Default::default(), 0);
569
570        // Create a remote tree with limited cache capacity so that nodes will
571        // be evicted while iterating.
572        let stats = StatsCollector::new(server.read_sync());
573        let remote_tree = Tree::builder()
574            .with_capacity(50, 16 * 1024 * 1024)
575            .with_root(Root {
576                root_type: RootType::State,
577                hash,
578                ..Default::default()
579            })
580            .build(Box::new(stats));
581
582        let mut it = remote_tree.iter();
583        it.set_prefetch(1000);
584        test_iterator_with(&items, it, &vec![]);
585
586        let cache = remote_tree.cache.borrow();
587        let stats = cache
588            .get_read_syncer()
589            .as_any()
590            .downcast_ref::<StatsCollector>()
591            .expect("stats");
592        assert_eq!(0, stats.sync_get_count, "sync_get_count");
593        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
594        // We require multiple fetches as we can only store a limited amount of
595        // results per fetch due to the cache being too small.
596        assert_eq!(2, stats.sync_iterate_count, "sync_iterate_count");
597    }
598
599    pub(in super::super) fn test_iterator_with<I: mkvs::Iterator>(
600        items: &[(Vec<u8>, Vec<u8>)],
601        mut it: I,
602        tests: &[(Vec<u8>, isize)],
603    ) {
604        // Iterate through the whole tree.
605        let mut iterations = 0;
606        it.rewind();
607        for (idx, (key, value)) in it.by_ref().enumerate() {
608            if !tests.is_empty() {
609                assert_eq!(items[idx].0, key, "iterator should have the correct key");
610                assert_eq!(
611                    items[idx].1, value,
612                    "iterator should have the correct value"
613                );
614            }
615            iterations += 1;
616        }
617        assert!(it.error().is_none(), "iterator should not error");
618        assert_eq!(iterations, items.len(), "iterator should go over all items");
619
620        for (seek, pos) in tests {
621            it.seek(&seek);
622            if *pos == -1 {
623                assert!(!it.is_valid(), "iterator should not be valid after seek");
624                continue;
625            }
626
627            for expected in &items[*pos as usize..] {
628                let item = iter::Iterator::next(&mut it);
629                assert_eq!(
630                    Some(expected.clone()),
631                    item,
632                    "iterator should have the correct item"
633                );
634            }
635        }
636    }
637}