Skip to content

Commit

Permalink
update momento client
Browse files Browse the repository at this point in the history
Update the momento sdk to the latest version
  • Loading branch information
brayniac committed Jun 10, 2024
1 parent 50f95f5 commit 6f20475
Show file tree
Hide file tree
Showing 31 changed files with 343 additions and 245 deletions.
28 changes: 24 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ hyper = { version = "1.0.0-rc.4", features = ["http1", "http2", "client"]}
metriken = "0.6.0"
metriken-exposition = { version = "0.7.0", features = ["json", "parquet-conversion"] }
mio = "0.8.8"
momento = "0.32.1"
momento = "0.39.7"
pelikan-net = { version = "0.2.0", default-features = false }
once_cell = "1.18.0"
openssl = { version = "0.10.64", optional = true }
Expand Down
14 changes: 8 additions & 6 deletions src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::workload::ClientRequest;
use crate::workload::ClientWorkItem as WorkItem;
use crate::*;
use ::momento::MomentoError;

use ::momento::{MomentoError, MomentoErrorCode};
use async_channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use std::time::Instant;
use tokio::io::*;
use tokio::runtime::Runtime;
use tokio::time::{timeout, Duration};

use std::io::{Error, ErrorKind, Result};
use std::time::Instant;

mod http1;
mod http2;
mod memcache;
Expand Down Expand Up @@ -73,9 +75,9 @@ pub enum ResponseError {

impl From<MomentoError> for ResponseError {
fn from(other: MomentoError) -> Self {
match other {
MomentoError::LimitExceeded { .. } => ResponseError::Ratelimited,
MomentoError::Timeout { .. } => ResponseError::BackendTimeout,
match other.error_code {
MomentoErrorCode::LimitExceededError { .. } => ResponseError::Ratelimited,
MomentoErrorCode::TimeoutError { .. } => ResponseError::BackendTimeout,
_ => ResponseError::Exception,
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/clients/momento/commands/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use super::*;

/// Remove a key from the cache.
pub async fn delete(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::Delete,
) -> std::result::Result<(), ResponseError> {
DELETE.increment();

let result = timeout(
config.client().unwrap().request_timeout(),
client.delete(cache_name, (*request.key).to_owned()),
)
.await;

record_result!(result, DELETE)
}
9 changes: 6 additions & 3 deletions src/clients/momento/commands/get.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
use super::*;

use ::momento::cache::GetResponse;

/// Retrieve a key-value pair from the cache.
pub async fn get(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::Get,
) -> std::result::Result<(), ResponseError> {
GET.increment();

match timeout(
config.client().unwrap().request_timeout(),
client.get(cache_name, &*request.key),
)
.await
{
Ok(Ok(r)) => match r {
::momento::response::Get::Hit { .. } => {
GetResponse::Hit { .. } => {
GET_OK.increment();
RESPONSE_HIT.increment();
GET_KEY_HIT.increment();
Ok(())
}
::momento::response::Get::Miss => {
GetResponse::Miss => {
GET_OK.increment();
RESPONSE_MISS.increment();
GET_KEY_MISS.increment();
Expand Down
8 changes: 5 additions & 3 deletions src/clients/momento/commands/hash_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ use super::*;

/// Remove one or more fields from a hash (dictionary).
pub async fn hash_delete(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::HashDelete,
) -> std::result::Result<(), ResponseError> {
HASH_DELETE.increment();

let result = timeout(
config.client().unwrap().request_timeout(),
client.dictionary_delete(
client.dictionary_remove_fields(
cache_name,
&*request.key,
Fields::Some(request.fields.iter().map(|f| &**f).collect()),
request.fields.iter().map(|f| &**f),
),
)
.await;

record_result!(result, HASH_DELETE)
}
21 changes: 15 additions & 6 deletions src/clients/momento/commands/hash_get.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
use super::*;

use ::momento::cache::DictionaryGetFieldsResponse;

/// Retrieve the value for one or more fields in a hash (dictionary).
pub async fn hash_get(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::HashGet,
) -> std::result::Result<(), ResponseError> {
HASH_GET.increment();

match timeout(
config.client().unwrap().request_timeout(),
client.dictionary_get(
client.dictionary_get_fields(
cache_name,
&*request.key,
request.fields.iter().map(|f| &**f).collect(),
request.fields.iter().map(|f| &**f),
),
)
.await
{
Ok(Ok(r)) => match r {
DictionaryGet::Hit { value } => {
DictionaryGetFieldsResponse::Hit { .. } => {
let mut hit = 0;
let mut miss = 0;
let dict: HashMap<Vec<u8>, Vec<u8>> = value.collect_into();
let dict: HashMap<Vec<u8>, Vec<u8>> = match r.try_into() {
Ok(d) => d,
Err(e) => {
HASH_GET_EX.increment();
return Err(e.into());
}
};
for field in request.fields {
if dict.contains_key(&*field) {
hit += 1;
Expand All @@ -36,7 +45,7 @@ pub async fn hash_get(
HASH_GET_FIELD_MISS.add(miss);
Ok(())
}
DictionaryGet::Miss => {
DictionaryGetFieldsResponse::Miss => {
RESPONSE_MISS.add(request.fields.len() as _);
HASH_GET_FIELD_MISS.add(request.fields.len() as _);
Ok(())
Expand Down
9 changes: 6 additions & 3 deletions src/clients/momento/commands/hash_get_all.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use super::*;

use ::momento::cache::DictionaryFetchResponse;

/// Retrieve all fields for a hash (dictionary).
pub async fn hash_get_all(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::HashGetAll,
) -> std::result::Result<(), ResponseError> {
HASH_GET_ALL.increment();

match timeout(
config.client().unwrap().request_timeout(),
client.dictionary_fetch(cache_name, &*request.key),
)
.await
{
Ok(Ok(r)) => match r {
DictionaryFetch::Hit { .. } => {
DictionaryFetchResponse::Hit { .. } => {
RESPONSE_HIT.increment();
HASH_GET_ALL_HIT.increment();
Ok(())
}
DictionaryFetch::Miss => {
DictionaryFetchResponse::Miss => {
RESPONSE_MISS.increment();
HASH_GET_ALL_MISS.increment();
Ok(())
Expand Down
17 changes: 9 additions & 8 deletions src/clients/momento/commands/hash_increment.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use super::*;

use ::momento::cache::{CollectionTtl, DictionaryIncrementRequest};

/// Increment the value for a field in a dictionary.
///
/// NOTE: if a TTL is specified, this command will not refresh the TTL for the
/// collection.
pub async fn hash_increment(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::HashIncrement,
) -> std::result::Result<(), ResponseError> {
HASH_INCR.increment();

let r =
DictionaryIncrementRequest::new(cache_name, &*request.key, &*request.field, request.amount)
.ttl(CollectionTtl::new(request.ttl, false));

match timeout(
config.client().unwrap().request_timeout(),
client.dictionary_increment(
cache_name,
&*request.key,
&*request.field,
request.amount,
CollectionTtl::new(request.ttl, false),
),
client.send_request(r),
)
.await
{
Expand Down
57 changes: 42 additions & 15 deletions src/clients/momento/commands/hash_set.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
use super::*;

use ::momento::cache::{CollectionTtl, DictionarySetFieldRequest, DictionarySetFieldsRequest};

/// Set the value for a field in a hash (dictionary).
///
/// NOTE: if a TTL is specified, this command will not refresh the TTL for the
/// collection.
pub async fn hash_set(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::HashSet,
) -> std::result::Result<(), ResponseError> {
if request.data.is_empty() {
panic!("empty data for hash set");
}

HASH_SET.increment();
let data: HashMap<Vec<u8>, Vec<u8>> = request
.data
.iter()
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect();
let result = timeout(
config.client().unwrap().request_timeout(),
client.dictionary_set(

if request.data.len() == 1 {
let (field, value) = request.data.into_iter().next().unwrap();

let r = DictionarySetFieldRequest::new(
cache_name,
&*request.key,
data,
CollectionTtl::new(request.ttl, false),
),
)
.await;
record_result!(result, HASH_SET)
&*field,
value,
)
.ttl(CollectionTtl::new(request.ttl, false));

let result = timeout(
config.client().unwrap().request_timeout(),
client.send_request(r),
)
.await;

record_result!(result, HASH_SET)
} else {
let d: Vec<(Vec<u8>, Vec<u8>)> = request
.data
.into_iter()
.map(|(k, v)| (k.to_vec(), v))
.collect();

let r = DictionarySetFieldsRequest::new(cache_name, &*request.key, d)
.ttl(CollectionTtl::new(request.ttl, false));

let result = timeout(
config.client().unwrap().request_timeout(),
client.send_request(r),
)
.await;

record_result!(result, HASH_SET)
}
}
4 changes: 3 additions & 1 deletion src/clients/momento/commands/list_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use super::*;

/// Retrieve all the members of a list in the cache.
pub async fn list_fetch(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::ListFetch,
) -> std::result::Result<(), ResponseError> {
LIST_FETCH.increment();

let result = timeout(
config.client().unwrap().request_timeout(),
client.list_fetch(cache_name, &*request.key),
)
.await;

record_result!(result, LIST_FETCH)
}
Loading

0 comments on commit 6f20475

Please sign in to comment.