
New State DB wrapper #3925

Closed · damip opened this issue May 9, 2023 · 3 comments · Fixed by #3783

@damip (Member) commented May 9, 2023

Have the following structure wrapping RocksDB:

use std::collections::{BTreeMap, VecDeque};

type Key = Vec<u8>;
type Value = Vec<u8>;

pub struct StreamBatch<ChangeID: Ord> {
    change_id: ChangeID,
    updates_on_previous_elements: BTreeMap<Key, Option<Value>>, // None means that the entry was deleted
    new_elements: BTreeMap<Key, Value>,
}

struct ChangeHistoryElement<ChangeID: Ord> {
    change_id: ChangeID,
    changes: BTreeMap<Key, Option<Value>>, // None means that the entry was deleted
}

struct StateDB<ChangeID: Ord> {
    db: rocksdb::DB, // the underlying RocksDB instance
    change_history: VecDeque<ChangeHistoryElement<ChangeID>>,
    max_history_len: usize,
    hash_tracker: HashTracker, // can be a XOR or a sparse Merkle tree etc.
    cur_change_id: ChangeID, // current ChangeID (typically a Slot) to which the state is attached
}

impl<ChangeID: Ord + Clone> StateDB<ChangeID> {
    // load from file
    pub fn from_file(file_path: &str, max_history_len: usize) -> Result<Self> {
        let db = load_rocksdb_from_file(file_path)?;
        // TODO read a special entry of the DB storing the ChangeID at which
        // it is attached (can be another column family)
        let cur_change_id = todo!();
        let mut hash_tracker = HashTracker::new();
        // TODO iterate over the DB to update the hash_tracker
        Ok(StateDB {
            db,
            change_history: VecDeque::with_capacity(max_history_len),
            max_history_len,
            hash_tracker,
            cur_change_id,
        })
    }

    // TODO functions to atomically read the db

    // atomically write
    pub fn write_batch(&mut self, changes: BTreeMap<Key, Option<Value>>, change_id: ChangeID) -> Result<()> {
        if change_id <= self.cur_change_id {
            return Err("change_id should monotonically increase after every write".into());
        }
        self.db.write_batch(&changes)?; // note that None values have to be deleted
        self.hash_tracker.write_batch(&changes);
        self.cur_change_id = change_id.clone();
        // TODO also write the change_id to the db
        self.change_history.push_back(ChangeHistoryElement { change_id, changes });
        while self.change_history.len() > self.max_history_len {
            self.change_history.pop_front();
        }
        Ok(())
    }

    /// a server sends us a bootstrap batch
    pub fn stream_input(&mut self, changes: BTreeMap<Key, Option<Value>>) -> Result<()> {
        self.db.write_batch(&changes)?; // note that None values have to be deleted
        self.hash_tracker.write_batch(&changes);
        self.change_history.clear(); // we are initializing the system
        Ok(())
    }

    /// we are bootstrapping someone: get a stream batch to send them
    /// `last_obtained` should contain (max_key_obtained_in_the_last_fully_obtained_batch,
    /// change_id_of_the_last_fully_obtained_batch), or None if it's the first batch
    /// being asked for by the client
    pub fn get_stream_output(&self, last_obtained: Option<(Key, ChangeID)>, max_new_elements: usize) -> Result<StreamBatch<ChangeID>> {
        let (updates_on_previous_elements, max_key) = if let Some((max_key, last_change_id)) = last_obtained {
            if last_change_id > self.cur_change_id {
                return Err("we don't have this change yet on this node (it's in the future for us)".into());
            } else if last_change_id == self.cur_change_id {
                (BTreeMap::new(), Some(max_key)) // no updates
            } else {
                // TODO binary search last_change_id in self.change_history to find the first
                // element that has a STRICTLY HIGHER change_id than last_change_id
                // => let's call the index of that element start_index
                // TODO if the history is empty, or if all its changes are strictly AFTER
                // last_change_id => return an error
                let mut updates = BTreeMap::new();
                for idx in start_index..self.change_history.len() {
                    updates.extend(
                        self.change_history[idx]
                            .changes
                            .iter()
                            .filter(|(k, _v)| **k <= max_key)
                            .map(|(k, v)| (k.clone(), v.clone())),
                    );
                }
                (updates, Some(max_key))
            }
        } else {
            (BTreeMap::new(), None)
        };

        // TODO read a batch of up to max_new_elements elements from the db,
        // starting from the key strictly after max_key (or from the beginning
        // if max_key is None)
        let new_elements = todo!();

        Ok(StreamBatch {
            change_id: self.cur_change_id.clone(),
            updates_on_previous_elements,
            new_elements,
        })
    }
}
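
The binary-search TODO in get_stream_output could be filled with std's VecDeque::partition_point; a minimal sketch, valid under the assumption that change_history is sorted by change_id (which the monotonicity check in write_batch guarantees):

// Index of the first history element whose change_id is STRICTLY HIGHER
// than last_change_id. partition_point returns 0 both when the history
// is empty and when all recorded changes are strictly after
// last_change_id; in either case the peer's missing updates cannot be
// reconstructed, matching the error case of the TODO above.
let start_index = self
    .change_history
    .partition_point(|elem| elem.change_id <= last_change_id);
if start_index == 0 {
    return Err("change history does not cover last_change_id".into());
}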


@Eitu33 (Contributor) commented May 9, 2023

Since we stream changes and new entries at the same time, it becomes unclear when to begin streaming the consensus blocks, as we no longer have a simple way of knowing when the majority of the final state has been streamed. Also, write_batch & stream_input are pretty similar; would it not be best to just have a flag in write_batch that clears the history when bootstrapping? I believe we would also want to keep track of the ChangeID in the bootstrapped state if we ever want to use the downloaded information after a mid-way failure. Besides that, it looks good to me.
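
A minimal sketch of that merge, under the assumption that the StateDB fields from the issue description are kept; the reset_history parameter name is invented here:

// Hypothetical merged variant of write_batch/stream_input: when
// reset_history is true (bootstrap case), the change history is cleared
// instead of extended, and the ChangeID is still tracked as suggested.
pub fn write_batch(
    &mut self,
    changes: BTreeMap<Key, Option<Value>>,
    change_id: ChangeID,
    reset_history: bool,
) -> Result<()> {
    // monotonicity only applies to normal writes, since a bootstrap
    // batch may carry an arbitrary ChangeID
    if !reset_history && change_id <= self.cur_change_id {
        return Err("change_id should monotonically increase after every write".into());
    }
    self.db.write_batch(&changes)?; // None values are deletions
    self.hash_tracker.write_batch(&changes);
    self.cur_change_id = change_id.clone();
    if reset_history {
        self.change_history.clear(); // we are (re)initializing the system
    } else {
        self.change_history.push_back(ChangeHistoryElement { change_id, changes });
        while self.change_history.len() > self.max_history_len {
            self.change_history.pop_front();
        }
    }
    Ok(())
}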

@damip (Member, Author) commented May 9, 2023

> Since we stream changes and new entries at the same time, it becomes unclear when to begin streaming the consensus blocks, as we no longer have a simple way of knowing when the majority of the final state has been streamed. Also, write_batch & stream_input are pretty similar; would it not be best to just have a flag in write_batch that clears the history when bootstrapping? I believe we would also want to keep track of the ChangeID in the bootstrapped state if we ever want to use the downloaded information after a mid-way failure. Besides that, it looks good to me.

Feel free to arrange the methods as you see fit.
For streaming consensus, we can add a StreamBatch::is_empty() method that returns true if both updates_on_previous_elements and new_elements are empty. When the server reaches an empty batch for the first time, it can start streaming the consensus, while continuing to stream non-empty StreamBatch-es as they appear. We consider the bootstrap done when we have sent a final block at the same slot as the last StreamBatch.
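
A minimal sketch of that method, matching the StreamBatch fields from the issue description:

impl<ChangeID: Ord> StreamBatch<ChangeID> {
    /// True when the batch carries neither updates to previously streamed
    /// entries nor new entries, i.e. the client has caught up with the
    /// server's current final state.
    pub fn is_empty(&self) -> bool {
        self.updates_on_previous_elements.is_empty() && self.new_elements.is_empty()
    }
}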

@damip (Member, Author) commented May 10, 2023

Note:

self.change_history[idx].changes.iter().filter(|(k, _v)| **k <= max_key)

can be made more efficient by using https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.range :

self.change_history[idx].changes.range(..=&max_key)
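
For illustration, a self-contained example of the bounded lookup (toy keys, not taken from the actual state):

use std::collections::BTreeMap;

fn main() {
    let mut changes: BTreeMap<Vec<u8>, Option<Vec<u8>>> = BTreeMap::new();
    changes.insert(vec![1], Some(vec![10]));
    changes.insert(vec![2], None); // a deleted entry
    changes.insert(vec![3], Some(vec![30]));

    let max_key = vec![2];
    // range(..=&max_key) walks only the keys <= max_key instead of
    // iterating over the whole map and filtering.
    for (k, v) in changes.range(..=&max_key) {
        println!("{:?} => {:?}", k, v);
    }
}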
