-
Notifications
You must be signed in to change notification settings - Fork 9
/
kvstore.rs
108 lines (97 loc) · 3.12 KB
/
kvstore.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use bincode;
use bytes::Bytes;
use paxos::{ReplicatedState, Slot};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
convert::TryFrom,
sync::{Arc, Mutex},
};
use tokio::sync::oneshot::{channel, Receiver, Sender};
/// A command that is serialized, replicated, and later applied to the
/// key-value store.
///
/// Each variant carries a `request_id`, which the store uses to look up the
/// listener registered for that request when the command is executed.
///
/// NOTE(review): values of this type are encoded with bincode elsewhere in
/// this file, so the order of variants and fields presumably affects the
/// encoded form; confirm before reordering.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum KvCommand {
    /// Read the value stored under `key`.
    Get {
        /// Identifier of the request that issued this read.
        request_id: u64,
        /// Key to look up.
        key: Bytes,
    },
    /// Store `value` under `key`.
    Set {
        /// Identifier of the request that issued this write.
        request_id: u64,
        /// Key to write.
        key: Bytes,
        /// Value to associate with `key`.
        value: Bytes,
    },
}
/// Serializes a [`KvCommand`] into its bincode byte representation.
///
/// This is written as `From<KvCommand> for Bytes` rather than a hand-rolled
/// `Into<Bytes> for KvCommand`: the standard library's blanket impl still
/// provides `KvCommand: Into<Bytes>`, so existing `cmd.into()` call sites keep
/// working, and `Bytes::from(cmd)` becomes available as well.
///
/// # Panics
///
/// Panics if bincode reports a serialization error.
impl From<KvCommand> for Bytes {
    fn from(cmd: KvCommand) -> Bytes {
        let encoded = bincode::serialize(&cmd)
            .expect("bincode serialization of a KvCommand into memory failed");
        // `Vec<u8>` converts into `Bytes`.
        encoded.into()
    }
}
impl TryFrom<Bytes> for KvCommand {
// TODO: better error handling for the parser
type Error = ();
fn try_from(mut value: Bytes) -> Result<Self, Self::Error> {
bincode::deserialize(&mut value).map_err(|e| {
error!("Error deserializing key value command: {:?}", e);
})
}
}
/// Mutable state shared by every clone of [`KeyValueStore`].
struct Inner {
    /// The key-value data itself.
    values: HashMap<Bytes, Bytes>,
    /// Listeners waiting for a `Set` command, keyed by request id. Each one
    /// is sent the slot of the executed command.
    pending_set: HashMap<u64, Sender<Slot>>,
    /// Listeners waiting for a `Get` command, keyed by request id. Each one
    /// is sent `Some((slot, value))` when the key exists and `None` otherwise.
    pending_get: HashMap<u64, Sender<Option<(Slot, Bytes)>>>,
}
/// Handle to a key-value store whose state lives behind an `Arc<Mutex<_>>`.
///
/// Cloning the handle clones the `Arc`, so all clones observe and mutate the
/// same underlying state.
#[derive(Clone)]
pub struct KeyValueStore {
    /// Shared, mutex-protected store contents and pending listeners.
    inner: Arc<Mutex<Inner>>,
}
impl Default for KeyValueStore {
    /// Creates a store with no values and no registered listeners.
    fn default() -> Self {
        let empty = Inner {
            values: HashMap::new(),
            pending_set: HashMap::new(),
            pending_get: HashMap::new(),
        };
        Self {
            inner: Arc::new(Mutex::new(empty)),
        }
    }
}
impl KeyValueStore {
    /// Registers a listener for the `Get` command with request id `id`.
    ///
    /// Returns the receiving half of a oneshot channel. When a `Get` command
    /// carrying this id is executed, the receiver is sent `Some((slot, value))`
    /// if the key is present and `None` if it is not. Registering the same id
    /// again replaces the previously stored sender.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn register_get(&self, id: u64) -> Receiver<Option<(Slot, Bytes)>> {
        let (snd, recv) = channel();
        {
            // Scope the guard so the lock is released before returning.
            let mut inner = self.inner.lock().unwrap();
            inner.pending_get.insert(id, snd);
        }
        recv
    }

    /// Registers a listener for the `Set` command with request id `id`.
    ///
    /// Returns the receiving half of a oneshot channel. When a `Set` command
    /// carrying this id is executed, the receiver is sent the slot of that
    /// command. Registering the same id again replaces the previously stored
    /// sender.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn register_set(&self, id: u64) -> Receiver<Slot> {
        let (snd, recv) = channel();
        {
            // Scope the guard so the lock is released before returning.
            let mut inner = self.inner.lock().unwrap();
            inner.pending_set.insert(id, snd);
        }
        recv
    }

    /// Removes every registered listener whose sender reports that its
    /// channel is closed, so abandoned requests do not accumulate.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn prune_listeners(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.pending_get.retain(|_, val| !val.is_closed());
        inner.pending_set.retain(|_, val| !val.is_closed());
    }
}
impl ReplicatedState for KeyValueStore {
    /// Applies the encoded command `cmd`, associated with `slot`, to the store.
    ///
    /// * `Get`: if a listener is registered for the request id, it is removed
    ///   and sent `Some((slot, value))` when the key exists, or `None` when it
    ///   does not. Without a listener nothing happens.
    /// * `Set`: the value is always stored; if a listener is registered for
    ///   the request id, it is removed and sent `slot`.
    ///
    /// Commands that fail to decode are ignored. A failed send (the receiver
    /// is gone) is ignored as well.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn execute(&mut self, slot: Slot, cmd: Bytes) {
        let command = match KvCommand::try_from(cmd) {
            Ok(command) => command,
            // Nothing to apply for a command that could not be decoded.
            Err(()) => return,
        };

        let mut state = self.inner.lock().unwrap();
        match command {
            KvCommand::Get { request_id, key } => {
                if let Some(waiter) = state.pending_get.remove(&request_id) {
                    let reply = state.values.get(&key).cloned().map(|found| (slot, found));
                    // The requester may have stopped listening; that is fine.
                    let _ = waiter.send(reply);
                }
            }
            KvCommand::Set { request_id, key, value } => {
                state.values.insert(key, value);
                if let Some(waiter) = state.pending_set.remove(&request_id) {
                    // The requester may have stopped listening; that is fine.
                    let _ = waiter.send(slot);
                }
            }
        }
    }
}