oasis_core_runtime/storage/mkvs/tree/
insert.rs

1use std::mem;
2
3use anyhow::{anyhow, Result};
4
5use crate::storage::mkvs::{
6    cache::Cache,
7    tree::{Depth, Key, KeyTrait, NodeBox, NodeKind, NodePointer, NodePtrRef, Tree, Value},
8};
9
10use super::lookup::FetcherSyncGet;
11
12impl Tree {
13    /// Insert a key/value pair into the tree.
14    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
15        let pending_root = self.cache.borrow().get_pending_root();
16        let boxed_key = key.to_vec();
17        let boxed_val = value.to_vec();
18
19        // Remember where the path from root to target node ends (will end).
20        self.cache.borrow_mut().mark_position();
21
22        let (new_root, old_val) = self._insert(pending_root, 0, &boxed_key, boxed_val)?;
23        self.cache.borrow_mut().set_pending_root(new_root);
24
25        Ok(old_val)
26    }
27
28    fn _insert(
29        &mut self,
30        ptr: NodePtrRef,
31        bit_depth: Depth,
32        key: &Key,
33        val: Value,
34    ) -> Result<(NodePtrRef, Option<Value>)> {
35        let node_ref = self
36            .cache
37            .borrow_mut()
38            .deref_node_ptr(ptr.clone(), Some(FetcherSyncGet::new(key, false)))?;
39
40        let (_, key_remainder) = key.split(bit_depth, key.bit_length());
41
42        match classify_noderef!(?node_ref) {
43            NodeKind::None => {
44                return Ok((self.cache.borrow_mut().new_leaf_node(key, val), None));
45            }
46            NodeKind::Internal => {
47                let node_ref = node_ref.unwrap();
48                let (leaf_node, left, right): (NodePtrRef, NodePtrRef, NodePtrRef);
49                let cp_len: Depth;
50                let label_prefix: Key;
51                if let NodeBox::Internal(ref mut n) = *node_ref.borrow_mut() {
52                    cp_len = n.label.common_prefix_len(
53                        n.label_bit_length,
54                        &key_remainder,
55                        key.bit_length() - bit_depth,
56                    );
57
58                    if cp_len == n.label_bit_length {
59                        // The current part of key matched the node's Label. Do recursion.
60                        let r: (NodePtrRef, Option<Value>);
61                        if key.bit_length() == bit_depth + n.label_bit_length {
62                            // Key to insert ends exactly at this node. Add it to the
63                            // existing internal node as LeafNode.
64                            r = self._insert(
65                                n.leaf_node.clone(),
66                                bit_depth + n.label_bit_length,
67                                key,
68                                val,
69                            )?;
70                            n.leaf_node = r.0;
71                        } else if key.get_bit(bit_depth + n.label_bit_length) {
72                            // Insert recursively based on the bit value.
73                            r = self._insert(
74                                n.right.clone(),
75                                bit_depth + n.label_bit_length,
76                                key,
77                                val,
78                            )?;
79                            n.right = r.0;
80                        } else {
81                            r = self._insert(
82                                n.left.clone(),
83                                bit_depth + n.label_bit_length,
84                                key,
85                                val,
86                            )?;
87                            n.left = r.0;
88                        }
89
90                        if !n.leaf_node.borrow().clean
91                            || !n.left.borrow().clean
92                            || !n.right.borrow().clean
93                        {
94                            n.clean = false;
95                            ptr.borrow_mut().clean = false;
96                            // No longer eligible for eviction as it is dirty.
97                            self.cache
98                                .borrow_mut()
99                                .rollback_node(ptr.clone(), NodeKind::Internal);
100                        }
101
102                        return Ok((ptr, r.1));
103                    }
104
105                    // Key mismatches the label at position cp_len. Split the edge and
106                    // insert new leaf.
107                    let label_split = n.label.split(cp_len, n.label_bit_length);
108                    label_prefix = label_split.0;
109                    n.label = label_split.1;
110                    n.label_bit_length -= cp_len;
111                    n.clean = false;
112                    ptr.borrow_mut().clean = false;
113                    // No longer eligible for eviction as it is dirty.
114                    self.cache
115                        .borrow_mut()
116                        .rollback_node(ptr.clone(), NodeKind::Internal);
117
118                    let new_leaf = self.cache.borrow_mut().new_leaf_node(key, val);
119                    if key.bit_length() - bit_depth == cp_len {
120                        // The key is a prefix of existing path.
121                        leaf_node = new_leaf;
122                        if n.label.get_bit(0) {
123                            left = NodePointer::null_ptr();
124                            right = ptr;
125                        } else {
126                            left = ptr;
127                            right = NodePointer::null_ptr();
128                        }
129                    } else {
130                        leaf_node = NodePointer::null_ptr();
131                        if key_remainder.get_bit(cp_len) {
132                            left = ptr;
133                            right = new_leaf;
134                        } else {
135                            left = new_leaf;
136                            right = ptr;
137                        }
138                    }
139                } else {
140                    return Err(anyhow!(
141                        "insert.rs: unknown internal node_ref {:?}",
142                        node_ref
143                    ));
144                }
145
146                return Ok((
147                    self.cache.borrow_mut().new_internal_node(
148                        &label_prefix,
149                        cp_len,
150                        leaf_node,
151                        left,
152                        right,
153                    ),
154                    None,
155                ));
156            }
157            NodeKind::Leaf => {
158                // If the key matches, we can just update the value.
159                let node_ref = node_ref.unwrap();
160                let (leaf_node, left, right): (NodePtrRef, NodePtrRef, NodePtrRef);
161                let cp_len: Depth;
162                let label_prefix: Key;
163                if let NodeBox::Leaf(ref mut n) = *node_ref.borrow_mut() {
164                    // Should always succeed.
165                    if n.key == *key {
166                        // If the key matches, we can just update the value.
167                        if n.value == val {
168                            return Ok((ptr, Some(val)));
169                        }
170                        let old_val = mem::replace(&mut n.value, val);
171                        n.clean = false;
172                        ptr.borrow_mut().clean = false;
173                        // No longer eligible for eviction as it is dirty.
174                        self.cache
175                            .borrow_mut()
176                            .rollback_node(ptr.clone(), NodeKind::Leaf);
177                        return Ok((ptr, Some(old_val)));
178                    }
179
180                    let (_, leaf_key_remainder) = n.key.split(bit_depth, n.key.bit_length());
181                    cp_len = leaf_key_remainder.common_prefix_len(
182                        n.key.bit_length() - bit_depth,
183                        &key_remainder,
184                        key.bit_length() - bit_depth,
185                    );
186
187                    // Key mismatches the label at position cp_len. Split the edge.
188                    label_prefix = leaf_key_remainder
189                        .split(cp_len, leaf_key_remainder.bit_length())
190                        .0;
191                    let new_leaf = self.cache.borrow_mut().new_leaf_node(key, val);
192
193                    if key.bit_length() - bit_depth == cp_len {
194                        // Inserted key is a prefix of the label.
195                        leaf_node = new_leaf;
196                        if leaf_key_remainder.get_bit(cp_len) {
197                            left = NodePointer::null_ptr();
198                            right = ptr;
199                        } else {
200                            left = ptr;
201                            right = NodePointer::null_ptr();
202                        }
203                    } else if n.key.bit_length() - bit_depth == cp_len {
204                        // Label is a prefix of the inserted key.
205                        leaf_node = ptr;
206                        if key_remainder.get_bit(cp_len) {
207                            left = NodePointer::null_ptr();
208                            right = new_leaf;
209                        } else {
210                            left = new_leaf;
211                            right = NodePointer::null_ptr();
212                        }
213                    } else {
214                        leaf_node = NodePointer::null_ptr();
215                        if key_remainder.get_bit(cp_len) {
216                            left = ptr;
217                            right = new_leaf;
218                        } else {
219                            left = new_leaf;
220                            right = ptr;
221                        }
222                    }
223                } else {
224                    return Err(anyhow!("insert.rs: invalid leaf node_ref {:?}", node_ref));
225                }
226
227                let new_internal = self.cache.borrow_mut().new_internal_node(
228                    &label_prefix,
229                    cp_len,
230                    leaf_node,
231                    left,
232                    right,
233                );
234                Ok((new_internal, None))
235            }
236        }
237    }
238}