Skip to content
This repository has been archived by the owner on Jul 28, 2024. It is now read-only.

Commit

Permalink
support memcached
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhong-zhong committed Sep 25, 2021
1 parent a193e21 commit 3b9eb85
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 2 deletions.
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ add_executable(init_redis ${CoreSource} ${RedisClientSource} redis/init_redis.cp
add_executable(run_redis ${CoreSource} ${RedisClientSource} redis/run_redis.cpp)
target_link_libraries(init_redis pthread hiredis ${YAML_CPP_LIBRARIES})
target_link_libraries(run_redis pthread hiredis ${YAML_CPP_LIBRARIES})

set(MemcachedClientSource memcached/memcached_client.cpp)
add_executable(init_memcached ${CoreSource} ${MemcachedClientSource} memcached/init_memcached.cpp)
add_executable(run_memcached ${CoreSource} ${MemcachedClientSource} memcached/run_memcached.cpp)
target_link_libraries(init_memcached pthread memcached ${YAML_CPP_LIBRARIES})
target_link_libraries(run_memcached pthread memcached ${YAML_CPP_LIBRARIES})

29 changes: 29 additions & 0 deletions memcached/config/uniform.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Database Parameters
database:
key_size: 16
value_size: 4096
nr_entry: 262144

# Workload Parameters
workload:
nr_warmup_op: 0
nr_op: 5000000
nr_init_thread: 1
nr_thread: 1
next_op_interval_ns: 0
operation_proportion:
read: 1
update: 0
insert: 0
scan: 0
read_modify_write: 0
request_distribution: "uniform"
# for zipfian distribution
zipfian_constant: 0.99
# for scan operation
scan_length: 100

# Memcached Parameters
memcached:
addr: "127.0.0.1"
port: 11211
26 changes: 26 additions & 0 deletions memcached/init_memcached.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <iostream>
#include "worker.h"
#include "memcached_client.h"
#include "memcached_config.h"
#include "yaml-cpp/yaml.h"

int main(int argc, char *argv[]) {
if (argc != 2 && argc != 3) {
printf("Usage: %s <config file> <redis port (optional)>\n", argv[0]);
return -EINVAL;
}
YAML::Node file = YAML::LoadFile(argv[1]);
MemcachedConfig config = MemcachedConfig::parse_yaml(file);
int port = config.memcached.port;
if (argc == 3)
port = atoi(argv[2]);

MemcachedFactory factory(config.memcached.addr.c_str(), port);

run_init_workload_with_op_measurement("Initialization",
&factory,
config.database.nr_entry,
config.database.key_size,
config.database.value_size,
config.workload.nr_init_thread);
}
104 changes: 104 additions & 0 deletions memcached/memcached_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#include "memcached_client.h"

MemcachedClient::MemcachedClient(MemcachedFactory *factory, int id)
: Client(id, factory), memcached_context(nullptr), last_reply(nullptr) {
char config[MEMCACHED_MAX_CONFIG_LEN];
int ret = sprintf(config, "--SERVER=%s:%d", factory->memcached_addr, factory->memcached_port);
if (ret < 0) {
fprintf(stderr, "MemcachedClient: failed to generate config string with addr %s and port %d\n",
factory->memcached_addr, factory->memcached_port);
throw std::invalid_argument("failed to generate config string");
}
this->memcached_context = memcached(config, strlen(config));
if (this->memcached_context == nullptr) {
fprintf(stderr, "MemcachedClient: failed to create memcached context with addr %s and port %d\n",
factory->memcached_addr, factory->memcached_port);
throw std::invalid_argument("failed to create memcached context");
}
}

MemcachedClient::~MemcachedClient() {
if (this->last_reply != nullptr)
free(this->last_reply);
if (this->memcached_context != nullptr)
memcached_free(this->memcached_context);
}

int MemcachedClient::do_operation(Operation *op) {
switch (op->type) {
case READ:
return this->do_read(op);
case INSERT:
case UPDATE:
return this->do_update(op);
default:
throw std::invalid_argument("invalid op type");
}
}

int MemcachedClient::do_read(Operation *op) {
size_t value_length;
uint32_t flags;
memcached_return_t rc;
char *value = memcached_get(this->memcached_context,
op->key_buffer,
strlen(op->key_buffer),
&value_length,
&flags,
&rc);
if (value == nullptr) {
fprintf(stderr, "MemcachedClient: READ error: %s\n",
memcached_strerror(this->memcached_context, rc));
throw std::invalid_argument("failed to READ");
}
op->reply_value_buffer = value;
this->set_last_reply(value);
return 0;
}

int MemcachedClient::do_update(Operation *op) {
memcached_return_t rc;
rc = memcached_set(this->memcached_context, op->key_buffer, strlen(op->key_buffer),
op->value_buffer, strlen(op->value_buffer), 0, 0);
if (rc != MEMCACHED_SUCCESS) {
fprintf(stderr, "MemcachedClient: SET error: %s\n",
memcached_strerror(this->memcached_context, rc));
throw std::invalid_argument("failed to SET");
}
return 0;
}

int MemcachedClient::reset() {
return 0;
}

void MemcachedClient::close() {
if (this->last_reply != nullptr)
free(this->last_reply);
this->last_reply = nullptr;
if (this->memcached_context != nullptr)
memcached_free(this->memcached_context);
this->memcached_context = nullptr;
}

void MemcachedClient::set_last_reply(char *reply) {
if (this->last_reply)
free(this->last_reply);
this->last_reply = reply;
}

MemcachedFactory::MemcachedFactory(const char *memcached_addr, int memcached_port)
: memcached_addr(memcached_addr), memcached_port(memcached_port), client_id(0) {
;
}

MemcachedClient *MemcachedFactory::create_client() {
MemcachedClient *client = new MemcachedClient(this, this->client_id++);
return client;
}

void MemcachedFactory::destroy_client(Client *client) {
MemcachedClient *memcached_client = (MemcachedClient *) client;
memcached_client->close();
delete memcached_client;
}
38 changes: 38 additions & 0 deletions memcached/memcached_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#ifndef YCSB_MEMCACHED_CLIENT_H
#define YCSB_MEMCACHED_CLIENT_H

#include <cstring>
#include <libmemcached/memcached.h>
#include "client.h"

#define MEMCACHED_MAX_CONFIG_LEN 256

struct MemcachedFactory;

struct MemcachedClient : public Client {
memcached_st *memcached_context;
char *last_reply;

MemcachedClient(MemcachedFactory *factory, int id);
~MemcachedClient();
int do_operation(Operation *op) override;
int reset() override;
void close() override;

private:
int do_update(Operation *op);
int do_read(Operation *op);
void set_last_reply(char *reply);
};

struct MemcachedFactory : public ClientFactory {
const char *memcached_addr;
const int memcached_port;
std::atomic<int> client_id;

MemcachedFactory(const char *memcached_addr, int memcached_port);
MemcachedClient *create_client() override;
void destroy_client(Client *client) override;
};

#endif //YCSB_MEMCACHED_CLIENT_H
71 changes: 71 additions & 0 deletions memcached/memcached_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#ifndef YCSB_MEMCACHED_CONFIG_H
#define YCSB_MEMCACHED_CONFIG_H

#include <string>
#include "yaml-cpp/yaml.h"

using std::string;

struct MemcachedConfig {
struct {
long key_size;
long value_size;
long nr_entry;
} database;
struct {
long nr_warmup_op;
long nr_op;
int nr_init_thread;
int nr_thread;
long next_op_interval_ns;
struct {
float read;
float update;
float insert;
float scan;
float read_modify_write;
} operation_proportion;
string request_distribution;
double zipfian_constant;
long scan_length;
} workload;
struct {
string addr;
int port;
} memcached;

static MemcachedConfig parse_yaml(YAML::Node &root);
};


MemcachedConfig MemcachedConfig::parse_yaml(YAML::Node &root) {
MemcachedConfig config;
YAML::Node database = root["database"];
config.database.key_size = database["key_size"].as<long>();
config.database.value_size = database["value_size"].as<long>();
config.database.nr_entry = database["nr_entry"].as<long>();

YAML::Node workload = root["workload"];
config.workload.nr_warmup_op = workload["nr_warmup_op"].as<long>();
config.workload.nr_op = workload["nr_op"].as<long>();
config.workload.nr_init_thread = workload["nr_init_thread"].as<int>();
config.workload.nr_thread = workload["nr_thread"].as<int>();
config.workload.next_op_interval_ns = workload["next_op_interval_ns"].as<long>();
YAML::Node operation_proportion = workload["operation_proportion"];
config.workload.operation_proportion.read = operation_proportion["read"].as<float>();
config.workload.operation_proportion.update = operation_proportion["update"].as<float>();
config.workload.operation_proportion.insert = operation_proportion["insert"].as<float>();
config.workload.operation_proportion.scan = operation_proportion["scan"].as<float>();
config.workload.operation_proportion.read_modify_write = operation_proportion["read_modify_write"].as<float>();
config.workload.request_distribution = workload["request_distribution"].as<string>();
config.workload.zipfian_constant = workload["zipfian_constant"].as<double>();
config.workload.scan_length = workload["scan_length"].as<long>();

YAML::Node memcached = root["memcached"];
config.memcached.addr = memcached["addr"].as<string>();
config.memcached.port = memcached["port"].as<int>();

return config;
}

#endif //YCSB_MEMCACHED_CONFIG_H
72 changes: 72 additions & 0 deletions memcached/run_memcached.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <iostream>
#include "worker.h"
#include "memcached_client.h"
#include "memcached_config.h"
#include "yaml-cpp/yaml.h"

int main(int argc, char *argv[]) {
if (argc != 2 && argc != 3) {
printf("Usage: %s <config file> <redis port (optional)>\n", argv[0]);
return -EINVAL;
}
YAML::Node file = YAML::LoadFile(argv[1]);
MemcachedConfig config = MemcachedConfig::parse_yaml(file);
int port = config.memcached.port;
if (argc == 3)
port = atoi(argv[2]);

MemcachedFactory factory(config.memcached.addr.c_str(), port);

OpProportion op_prop;
op_prop.op[READ] = config.workload.operation_proportion.read;
op_prop.op[UPDATE] = config.workload.operation_proportion.update;
op_prop.op[INSERT] = config.workload.operation_proportion.insert;
op_prop.op[SCAN] = config.workload.operation_proportion.scan;
op_prop.op[READ_MODIFY_WRITE] = config.workload.operation_proportion.read_modify_write;
for (int i = 0; i < 2; ++i) {
long nr_op;
if (i == 0) {
if (config.workload.nr_warmup_op == 0)
continue;
nr_op = config.workload.nr_warmup_op;
} else {
nr_op = config.workload.nr_op;
}
if (config.workload.request_distribution == "uniform") {
run_uniform_workload_with_op_measurement(i == 0 ? "Uniform (Warm-Up)" : "Uniform",
&factory,
config.database.nr_entry,
config.database.key_size,
config.database.value_size,
config.workload.scan_length,
config.workload.nr_thread,
op_prop,
nr_op,
config.workload.next_op_interval_ns);
} else if (config.workload.request_distribution == "zipfian") {
run_zipfian_workload_with_op_measurement(i == 0 ? "Zipfian (Warm-Up)" : "Zipfian",
&factory,
config.database.nr_entry,
config.database.key_size,
config.database.value_size,
config.workload.scan_length,
config.workload.nr_thread,
op_prop,
config.workload.zipfian_constant,
nr_op,
config.workload.next_op_interval_ns);
} else if (config.workload.request_distribution == "latest") {
run_latest_workload_with_op_measurement(i == 0 ? "Latest (Warm-Up)" : "Latest",
&factory,
config.database.nr_entry,
config.database.key_size,
config.database.value_size,
config.workload.nr_thread,
op_prop.op[READ],
config.workload.zipfian_constant,
nr_op,
config.workload.next_op_interval_ns);
}
}
}

2 changes: 1 addition & 1 deletion redis/config/uniform.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ workload:
# for scan operation
scan_length: 100

# WiredTiger Parameters
# Redis Parameters
redis:
addr: "127.0.0.1"
port: 6379
Expand Down
2 changes: 1 addition & 1 deletion redis/config/zipfian.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ workload:
# for scan operation
scan_length: 100

# WiredTiger Parameters
# Redis Parameters
redis:
addr: "127.0.0.1"
port: 6379
Expand Down

0 comments on commit 3b9eb85

Please sign in to comment.