1use std::{collections::VecDeque, fmt};
3
4use anyhow::{Error, Result};
5
6use crate::storage::mkvs::{
7 self,
8 cache::{Cache, ReadSyncFetcher},
9 sync::{IterateRequest, Proof, ReadSync, TreeID},
10 tree::{Depth, Key, KeyTrait, NodeBox, NodeKind, NodePtrRef, Root, Tree},
11};
12
/// Fetcher that obtains proofs by issuing a `sync_iterate` request to the
/// read syncer for a given key.
pub(super) struct FetcherSyncIterate<'a> {
    /// Key sent along with the iterate request.
    key: &'a Key,
    /// Prefetch amount forwarded in the iterate request.
    prefetch: usize,
}
17
18impl<'a> FetcherSyncIterate<'a> {
19 pub(super) fn new(key: &'a Key, prefetch: usize) -> Self {
20 Self { key, prefetch }
21 }
22}
23
24impl<'a> ReadSyncFetcher for FetcherSyncIterate<'a> {
25 fn fetch(&self, root: Root, ptr: NodePtrRef, rs: &mut Box<dyn ReadSync>) -> Result<Proof> {
26 let rsp = rs.sync_iterate(IterateRequest {
27 tree: TreeID {
28 root,
29 position: ptr.borrow().hash,
30 },
31 key: self.key.clone(),
32 prefetch: self.prefetch as u16,
33 })?;
34 Ok(rsp.proof)
35 }
36}
37
/// Records how far the traversal has progressed through a node, so that a
/// later call can resume from the same place.
#[derive(Debug, PartialEq)]
enum VisitState {
    /// Initial state used when a node is visited for the first time.
    Before,
    /// Recorded when a key was found at the node's own leaf.
    At,
    /// Recorded when a key was found in the node's left subtree.
    AtLeft,
    /// Recorded when a key was found in the node's right subtree.
    After,
}
46
/// One saved step of the iterator's position: a node together with the
/// arguments and visit state needed to resume the traversal from it.
struct PathAtom {
    /// Pointer to the node.
    ptr: NodePtrRef,
    /// Bit depth at which the node was visited.
    bit_depth: Depth,
    /// Path that was passed in when the node was visited.
    path: Key,
    /// How far the traversal had progressed through the node.
    state: VisitState,
}
55
56impl fmt::Debug for PathAtom {
57 fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
58 f.debug_struct("PathAtom")
59 .field("bit_depth", &self.bit_depth)
60 .field("path", &self.path)
61 .field("state", &self.state)
62 .finish()
63 }
64}
65
/// Iterator over the key/value pairs of a [`Tree`].
pub struct TreeIterator<'tree> {
    /// Tree being iterated.
    tree: &'tree Tree,
    /// Prefetch amount handed to the fetcher when nodes are dereferenced.
    prefetch: usize,
    /// Saved nodes and visit states from which `next` resumes.
    pos: VecDeque<PathAtom>,
    /// Key at the current position; `None` when the iterator is not valid.
    key: Option<Key>,
    /// Value at the current position.
    value: Option<Vec<u8>>,
    /// Error encountered during traversal; once set, `seek` and `next` do nothing.
    error: Option<Error>,
}
75
76impl<'tree> TreeIterator<'tree> {
77 fn new(tree: &'tree Tree) -> Self {
79 Self {
80 tree,
81 prefetch: 0,
82 pos: VecDeque::new(),
83 key: None,
84 value: None,
85 error: None,
86 }
87 }
88
89 fn reset(&mut self) {
90 self.pos.clear();
91 self.key = None;
92 self.value = None;
93 }
94
95 fn next(&mut self) {
96 if self.error.is_some() {
97 return;
98 }
99
100 while !self.pos.is_empty() {
101 let atom = self.pos.pop_front().expect("not empty");
103 let mut remainder = std::mem::take(&mut self.pos);
104
105 let mut cache = self.tree.cache.borrow_mut();
107 cache.mark_position();
108 for atom in &remainder {
109 cache.use_node(atom.ptr.clone());
110 }
111 drop(cache);
112
113 let key = self.key.take().expect("iterator is valid");
116 self.reset();
117 if let Err(error) =
118 self._next(atom.ptr, atom.bit_depth, atom.path, key.clone(), atom.state)
119 {
120 self.error = Some(error);
121 self.reset();
122 return;
123 }
124 if self.key.is_some() {
125 self.pos.append(&mut remainder);
127 return;
128 }
129
130 self.key = Some(key);
131 self.pos = remainder;
132 }
133
134 self.key = None;
136 self.value = None;
137 }
138
139 fn _next(
140 &mut self,
141 ptr: NodePtrRef,
142 bit_depth: Depth,
143 path: Key,
144 mut key: Key,
145 mut state: VisitState,
146 ) -> Result<()> {
147 let node_ref = self.tree.cache.borrow_mut().deref_node_ptr(
148 ptr.clone(),
149 Some(FetcherSyncIterate::new(&key, self.prefetch)),
150 )?;
151
152 match classify_noderef!(?node_ref) {
153 NodeKind::None => {
154 Ok(())
156 }
157 NodeKind::Internal => {
158 let node_ref = node_ref.unwrap();
159 if let NodeBox::Internal(ref n) = *node_ref.borrow() {
160 let bit_length = bit_depth + n.label_bit_length;
162 let new_path = path.merge(bit_depth, &n.label, n.label_bit_length);
163
164 let take_first =
167 bit_length > 0 && key.bit_length() >= bit_length && key < new_path;
168
169 if (state == VisitState::Before
171 && (key.bit_length() <= bit_length || take_first))
172 || state == VisitState::At
173 {
174 if state == VisitState::Before {
175 self._next(
176 n.leaf_node.clone(),
177 bit_length,
178 path.clone(),
179 key.clone(),
180 VisitState::Before,
181 )?;
182 if self.key.is_some() {
183 self.pos.push_back(PathAtom {
185 ptr,
186 bit_depth,
187 path,
188 state: VisitState::At,
189 });
190 return Ok(());
191 }
192 }
193 if key.bit_length() <= bit_length {
195 key = key.append_bit(bit_length, false);
196 }
197 }
198
199 if state == VisitState::Before {
200 state = VisitState::At;
201 }
202
203 if (state == VisitState::At && (!key.get_bit(bit_length) || take_first))
205 || state == VisitState::AtLeft
206 {
207 if state == VisitState::At {
208 self._next(
209 n.left.clone(),
210 bit_length,
211 new_path.append_bit(bit_length, false),
212 key.clone(),
213 VisitState::Before,
214 )?;
215 if self.key.is_some() {
216 self.pos.push_back(PathAtom {
218 ptr,
219 bit_depth,
220 path,
221 state: VisitState::AtLeft,
222 });
223 return Ok(());
224 }
225 }
226 key = key.split(bit_length, key.bit_length()).0;
228 key = key.append_bit(bit_length, true);
229 }
230
231 if state == VisitState::At || state == VisitState::AtLeft {
232 self._next(
233 n.right.clone(),
234 bit_length,
235 new_path.append_bit(bit_length, true),
236 key,
237 VisitState::Before,
238 )?;
239 if self.key.is_some() {
240 self.pos.push_back(PathAtom {
242 ptr,
243 bit_depth,
244 path,
245 state: VisitState::After,
246 });
247 return Ok(());
248 }
249 }
250
251 return Ok(());
252 }
253
254 unreachable!("node kind is internal node");
255 }
256 NodeKind::Leaf => {
257 let node_ref = node_ref.unwrap();
259 if let NodeBox::Leaf(ref n) = *node_ref.borrow() {
260 if n.key >= key {
261 self.key = Some(n.key.clone());
262 self.value = Some(n.value.clone());
263 }
264 } else {
265 unreachable!("node kind is leaf node");
266 }
267
268 Ok(())
269 }
270 }
271 }
272}
273
274impl<'tree> Iterator for TreeIterator<'tree> {
275 type Item = (Vec<u8>, Vec<u8>);
276
277 fn next(&mut self) -> Option<Self::Item> {
278 use mkvs::Iterator;
279
280 if !self.is_valid() {
281 return None;
282 }
283
284 let key = self.key.as_ref().expect("iterator is valid").clone();
285 let value = self.value.as_ref().expect("iterator is valid").clone();
286 TreeIterator::next(self);
287
288 Some((key, value))
289 }
290}
291
292impl<'tree> mkvs::Iterator for TreeIterator<'tree> {
293 fn set_prefetch(&mut self, prefetch: usize) {
294 self.prefetch = prefetch;
295 }
296
297 fn is_valid(&self) -> bool {
298 self.key.is_some()
299 }
300
301 fn error(&self) -> &Option<Error> {
302 &self.error
303 }
304
305 fn rewind(&mut self) {
306 self.seek(&[])
307 }
308
309 fn seek(&mut self, key: &[u8]) {
310 if self.error.is_some() {
311 return;
312 }
313
314 self.reset();
315 let pending_root = self.tree.cache.borrow().get_pending_root();
316 if let Err(error) = self._next(
317 pending_root,
318 0,
319 Key::new(),
320 key.to_vec(),
321 VisitState::Before,
322 ) {
323 self.error = Some(error);
324 self.reset();
325 }
326 }
327
328 fn get_key(&self) -> &Option<Key> {
329 &self.key
330 }
331
332 fn get_value(&self) -> &Option<Vec<u8>> {
333 &self.value
334 }
335
336 fn next(&mut self) {
337 TreeIterator::next(self)
338 }
339}
340
341impl Tree {
342 pub fn iter(&self) -> TreeIterator {
344 TreeIterator::new(self)
345 }
346}
347
#[cfg(test)]
pub(super) mod test {
    use std::iter;

    use rustc_hex::FromHex;

    use super::{super::tree_test::generate_key_value_pairs_ex, *};
    use crate::storage::mkvs::{
        interop::{Driver, ProtocolServer},
        sync::{NoopReadSyncer, StatsCollector},
        Iterator, OverlayTree, RootType,
    };

    /// Exercises the iterator on a local tree and on remote trees with
    /// different prefetch settings.
    #[test]
    fn test_iterator() {
        let server = ProtocolServer::new(None);

        let mut tree = Tree::builder()
            .with_root_type(RootType::State)
            .build(Box::new(NoopReadSyncer));

        // An empty tree has nothing to iterate over.
        let mut it = tree.iter();
        it.rewind();
        assert!(
            !it.is_valid(),
            "iterator should be invalid on an empty tree"
        );

        // A single item makes the iterator valid after a rewind.
        tree.insert(b"key", b"first").unwrap();
        let mut it = tree.iter();
        it.rewind();
        assert!(
            it.is_valid(),
            "iterator should be valid on a non-empty tree"
        );

        // Items are listed in the order the iterator is expected to yield them.
        let items = vec![
            (b"key".to_vec(), b"first".to_vec()),
            (b"key 1".to_vec(), b"one".to_vec()),
            (b"key 2".to_vec(), b"two".to_vec()),
            (b"key 5".to_vec(), b"five".to_vec()),
            (b"key 8".to_vec(), b"eight".to_vec()),
            (b"key 9".to_vec(), b"nine".to_vec()),
        ];
        for (key, value) in items.iter() {
            tree.insert(key, value).unwrap();
        }

        // (seek key, index into `items` where iteration should start; -1
        // means the iterator should be invalid after the seek).
        let tests = vec![
            (b"k".to_vec(), 0),
            (b"key 1".to_vec(), 1),
            (b"key 3".to_vec(), 3),
            (b"key 4".to_vec(), 3),
            (b"key 5".to_vec(), 3),
            (b"key 6".to_vec(), 4),
            (b"key 7".to_vec(), 4),
            (b"key 8".to_vec(), 4),
            (b"key 9".to_vec(), 5),
            (b"key A".to_vec(), -1),
        ];

        // Local tree.
        let it = tree.iter();
        test_iterator_with(&items, it, &tests);

        // Commit and replay the same items on the server so that remote
        // trees can be built from the resulting root hash.
        let hash = tree.commit(Default::default(), 0).expect("commit");
        let write_log = items
            .iter()
            .cloned()
            .map(|(key, value)| mkvs::LogEntry {
                key,
                value: Some(value),
            })
            .collect();
        server.apply(&write_log, hash, Default::default(), 0);

        // Remote tree without prefetching.
        let remote_tree = Tree::builder()
            .with_capacity(0, 0)
            .with_root(Root {
                root_type: RootType::State,
                hash,
                ..Default::default()
            })
            .build(server.read_sync());

        let it = remote_tree.iter();
        test_iterator_with(&items, it, &tests);

        // Remote tree with prefetch (10): a single iterate request is expected.
        let stats = StatsCollector::new(server.read_sync());
        let remote_tree = Tree::builder()
            .with_capacity(0, 0)
            .with_root(Root {
                root_type: RootType::State,
                hash,
                ..Default::default()
            })
            .build(Box::new(stats));

        let mut it = remote_tree.iter();
        it.set_prefetch(10);
        test_iterator_with(&items, it, &tests);

        let cache = remote_tree.cache.borrow();
        let stats = cache
            .get_read_syncer()
            .as_any()
            .downcast_ref::<StatsCollector>()
            .expect("stats");
        assert_eq!(0, stats.sync_get_count, "sync_get_count");
        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
        assert_eq!(1, stats.sync_iterate_count, "sync_iterate_count");

        // Remote tree with prefetch (3): two iterate requests are expected.
        let stats = StatsCollector::new(server.read_sync());
        let remote_tree = Tree::builder()
            .with_capacity(0, 0)
            .with_root(Root {
                root_type: RootType::State,
                hash,
                ..Default::default()
            })
            .build(Box::new(stats));

        let mut it = remote_tree.iter();
        it.set_prefetch(3);
        test_iterator_with(&items, it, &tests);

        let cache = remote_tree.cache.borrow();
        let stats = cache
            .get_read_syncer()
            .as_any()
            .downcast_ref::<StatsCollector>()
            .expect("stats");
        assert_eq!(0, stats.sync_get_count, "sync_get_count");
        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
        assert_eq!(2, stats.sync_iterate_count, "sync_iterate_count");
    }

    /// Seeking to a key smaller than every stored key must start at the
    /// first item.
    #[test]
    fn test_iterator_case1() {
        let mut tree = Tree::builder()
            .with_root_type(RootType::State)
            .build(Box::new(NoopReadSyncer));

        let items = vec![
            (b"key 5".to_vec(), b"fivey".to_vec()),
            (b"key 7".to_vec(), b"seven".to_vec()),
        ];
        for (key, value) in items.iter() {
            tree.insert(key, value).unwrap();
        }

        let tests = vec![(b"key 3".to_vec(), 0)];

        let it = tree.iter();
        test_iterator_with(&items, it, &tests);
    }

    /// Seeking to a missing key that sorts before the stored key must yield
    /// the stored key.
    #[test]
    fn test_iterator_case2() {
        let mut tree = Tree::builder()
            .with_root_type(RootType::State)
            .build(Box::new(NoopReadSyncer));

        // Both entries carry the same key and value.
        let items: Vec<(Vec<u8>, Vec<u8>)> = vec![
            (
                "54dcb497eb46bc7cb1a1a29d143d5d41f1a684c97e12f2ae536eceb828c15fc34c02"
                    .from_hex()
                    .unwrap(),
                b"value".to_vec(),
            ),
            (
                "54dcb497eb46bc7cb1a1a29d143d5d41f1a684c97e12f2ae536eceb828c15fc34c02"
                    .from_hex()
                    .unwrap(),
                b"value".to_vec(),
            ),
        ];
        for (key, value) in items.iter() {
            tree.insert(key, value).unwrap();
        }

        let mut it = tree.iter();
        let missing_key: Vec<u8> =
            "54da85be3251772db943cba67341d402117c87ada2a9e8aad7171d40b6b4dc9fbc"
                .from_hex()
                .unwrap();
        it.seek(&missing_key);
        assert!(it.is_valid(), "iterator should be valid");
        let item = iter::Iterator::next(&mut it);
        assert_eq!(
            Some((items[0].0.clone(), b"value".to_vec())),
            item,
            "value should be correct"
        );
    }

    /// Iterates a remote tree whose cache capacity (50) is smaller than the
    /// number of items (100) and checks how many iterate requests were made.
    #[test]
    fn test_iterator_eviction() {
        let server = ProtocolServer::new(None);

        let mut tree = OverlayTree::new(
            Tree::builder()
                .with_capacity(0, 0)
                .with_root_type(RootType::State)
                .build(Box::new(NoopReadSyncer)),
        );

        let (keys, values) = generate_key_value_pairs_ex("T".to_owned(), 100);
        let items: Vec<(Vec<u8>, Vec<u8>)> = keys.into_iter().zip(values).collect();
        for (key, value) in &items {
            tree.insert(key, value).unwrap();
        }

        let (write_log, hash) = tree.commit_both(Default::default(), 0).expect("commit");
        server.apply(&write_log, hash, Default::default(), 0);

        let stats = StatsCollector::new(server.read_sync());
        let remote_tree = Tree::builder()
            .with_capacity(50, 16 * 1024 * 1024)
            .with_root(Root {
                root_type: RootType::State,
                hash,
                ..Default::default()
            })
            .build(Box::new(stats));

        // No seek tests: with an empty test list the helper only counts the
        // items and does not compare them against `items`.
        let mut it = remote_tree.iter();
        it.set_prefetch(1000);
        test_iterator_with(&items, it, &[]);

        let cache = remote_tree.cache.borrow();
        let stats = cache
            .get_read_syncer()
            .as_any()
            .downcast_ref::<StatsCollector>()
            .expect("stats");
        assert_eq!(0, stats.sync_get_count, "sync_get_count");
        assert_eq!(0, stats.sync_get_prefixes_count, "sync_get_prefixes_count");
        assert_eq!(2, stats.sync_iterate_count, "sync_iterate_count");
    }

    /// Shared iterator check.
    ///
    /// First rewinds `it` and walks it to the end, asserting that it yields
    /// exactly `items.len()` items without error. The yielded pairs are
    /// compared against `items` only when `tests` is non-empty.
    ///
    /// Then, for every `(seek, pos)` pair in `tests`, seeks to `seek` and
    /// asserts that the iterator yields `items[pos..]` in order. A `pos` of
    /// `-1` means the iterator must be invalid after the seek.
    pub(in super::super) fn test_iterator_with<I: mkvs::Iterator>(
        items: &[(Vec<u8>, Vec<u8>)],
        mut it: I,
        tests: &[(Vec<u8>, isize)],
    ) {
        // Iterate through the whole tree.
        let mut iterations = 0;
        it.rewind();
        for (idx, (key, value)) in it.by_ref().enumerate() {
            if !tests.is_empty() {
                assert_eq!(items[idx].0, key, "iterator should have the correct key");
                assert_eq!(
                    items[idx].1, value,
                    "iterator should have the correct value"
                );
            }
            iterations += 1;
        }
        assert!(it.error().is_none(), "iterator should not error");
        assert_eq!(iterations, items.len(), "iterator should go over all items");

        // Seek to each test key and compare the tail of the iteration.
        for (seek, pos) in tests {
            it.seek(seek);
            if *pos == -1 {
                assert!(!it.is_valid(), "iterator should not be valid after seek");
                continue;
            }

            for expected in &items[*pos as usize..] {
                let item = iter::Iterator::next(&mut it);
                assert_eq!(
                    Some(expected.clone()),
                    item,
                    "iterator should have the correct item"
                );
            }
        }
    }
}