Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion accel-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ do
done

sudo accel-config config-wq ${SWQ} --group-id=0
sudo accel-config config-wq ${SWQ} --priority=5
sudo accel-config config-wq ${SWQ} --priority=1
sudo accel-config config-wq ${SWQ} --wq-size=128
sudo accel-config config-wq ${SWQ} --max-batch-size=1024
sudo accel-config config-wq ${SWQ} --max-transfer-size=4194304
sudo accel-config config-wq ${SWQ} --block-on-fault=0
sudo accel-config config-wq ${SWQ} --type=user
sudo accel-config config-wq ${SWQ} --name="dsa-test"
sudo accel-config config-wq ${SWQ} --mode=shared
Expand Down
146 changes: 65 additions & 81 deletions cachelib/allocator/CacheAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,7 @@ auto& mmContainer = getMMContainer(tid, pid, cid);
return evictions;
}

static std::string dsa_error_string(dml::mem_copy_result& result) {
static std::string dsa_error_string(dml::batch_result& result) {
std::string error;
switch (result.status) {
case dml::status_code::false_predicate: /**< Operation completed successfully: but result is unexpected */
Expand Down Expand Up @@ -2139,8 +2139,12 @@ auto& mmContainer = getMMContainer(tid, pid, cid);
}
return error;
}

// exposed for the background evictor to iterate through the memory and evict
// in batch. This should improve insertion path for tiered memory config
using allocator_t = std::allocator<dml::byte_t>;
using batch_handler_t = dml::handler<dml::batch_operation, allocator_t>;

size_t traverseAndEvictItemsUsingDsa(unsigned int tid,
unsigned int pid,
unsigned int cid,
Expand Down Expand Up @@ -2217,68 +2221,52 @@ auto& mmContainer = getMMContainer(tid, pid, cid);
}
XDCHECK_EQ(newItemHandles.size(),validHandleCnt);

/* for DML async op */
auto dmlHandles = std::vector<dml::handler<dml::mem_copy_operation,
std::allocator<dml::byte_t>>>();
//dmlHandles.reserve(validHandleCnt);

//std::default_random_engine generator;
//std::bernoulli_distribution distribution(1.0);

auto sequence = dml::sequence<allocator_t>(validHandleCnt);
for (auto index = 0U; index < candidates.size(); index++) {
XDCHECK_EQ(newItemHandles[index].get(),newItemPtr[index]);
if (newItemHandles[index].get() != newItemPtr[index]) {
throw std::runtime_error(folly::sformat("dml error - newItemHdls {:}, newItemPtr {:}",newItemHandles[index].get(),newItemPtr[index]));
throw std::runtime_error(folly::sformat(
"dml error - newItemHdls {:}, newItemPtr {:}",
newItemHandles[index].get(), newItemPtr[index]));
}
XDCHECK_EQ(newItemHandles[index]->getKey(), candidates[index]->getKey());
if (newItemHandles[index]->getKey() != candidates[index]->getKey()) {
throw std::runtime_error(folly::sformat("dml error - newItemHdl key {}, ptr {:}, old key {}, ptr {:}",newItemHandles[index]->getKey(), newItemHandles[index].get(),
candidates[index]->getKey(),candidates[index]));
throw std::runtime_error(folly::sformat(
"dml error - newItemHdl key {}, ptr {:}, old key {}, ptr {:}",
newItemHandles[index]->getKey(), newItemHandles[index].get(),
candidates[index]->getKey(),candidates[index]));
}
XDCHECK_EQ(newItemHandles[index]->getSize(), candidates[index]->getSize());
XDCHECK_EQ(newItemHandles[index]->getRefCount(), 1);
XDCHECK_EQ(candidates[index]->getRefCount(), 0);

//if (distribution(generator)) {
dml::const_data_view srcView = dml::make_view(
dml::const_data_view srcView = dml::make_view(
reinterpret_cast<uint8_t*>(candidates[index]->getMemory()),
candidates[index]->getSize());
dml::data_view dstView = dml::make_view(
dml::data_view dstView = dml::make_view(
reinterpret_cast<uint8_t*>(newItemHandles[index]->getMemory()),
newItemHandles[index]->getSize());
(*stats_.dsaEvictionSubmits)[tid][pid][cid].inc();
if (config_.dsaAsync) {
if (cid < 11) {
dmlHandles.emplace_back(
dml::submit<dml::software>(dml::mem_copy, srcView, dstView));
} else {
dmlHandles.emplace_back(
dml::submit<dml::hardware>(dml::mem_copy, srcView, dstView));
}
} else {
auto dmlHandle = dml::submit<dml::hardware>(dml::mem_copy, srcView, dstView);
auto result = dmlHandle.get();
if (result.status != dml::status_code::ok) {
std::string error = dsa_error_string(result);
throw std::runtime_error(folly::sformat("dml error: {} for item: {} count {}",
error, candidates[index]->toString(), (*stats_.dsaEvictionSubmits)[tid][pid][cid].get()));
}
XDCHECK_EQ(result.status,dml::status_code::ok);
}
//} else {
// std::memcpy(newItemHandles[index]->getMemory(),
// candidates[index]->getMemory(),
// candidates[index]->getSize());
// }
if (sequence.add(dml::mem_copy, srcView, dstView) != dml::status_code::ok) {
throw std::runtime_error("failed to add dml::mem_copy operation to the sequence.");
}
}
if (config_.dsaAsync) {
if (newItemHandles.size() != dmlHandles.size()) {
throw std::runtime_error(folly::sformat("dml error - newItemHdls {:}, dmlHandles {:}",newItemHandles.size(),dmlHandles.size()));
}
XDCHECK_EQ(newItemHandles.size(),dmlHandles.size());
XDCHECK_EQ(newItemHandles.size(),candidates.size());
} else {
XDCHECK_EQ(0,dmlHandles.size());

batch_handler_t handler{};
/* Use software path (memcpy) if class size is small (identified by class id < 11) */
if (cid < 11) {
handler = dml::submit<dml::software>(dml::batch, sequence);
XDCHECK(handler.valid());
if (!handler.valid()) {
throw std::runtime_error("Failed dml::software sequence submission");
}
} else { // larger items
handler = dml::submit<dml::hardware>(dml::batch, sequence,
dml::default_execution_interface<dml::hardware>{}, 0);
XDCHECK(handler.valid());
if (!handler.valid()) {
throw std::runtime_error("Failed dml::hardware sequence submission");
}
(*stats_.dsaEvictionSubmits)[tid][pid][cid].inc();
}

// Adding the item to mmContainer has to succeed since no one
Expand All @@ -2289,8 +2277,6 @@ auto& mmContainer = getMMContainer(tid, pid, cid);

for (auto index = 0U; index < candidates.size(); index++) {
XDCHECK(newItemHandles[index]);


// no one can add or remove chained items at this point
if (candidates[index]->hasChainedItem()) {
// safe to acquire handle for a moving Item
Expand All @@ -2307,32 +2293,31 @@ auto& mmContainer = getMMContainer(tid, pid, cid);
XLOGF(DFATAL, "{}", e.what());
throw;
}

XDCHECK(!candidates[index]->hasChainedItem());
XDCHECK(newItemHandles[index]->hasChainedItem());
}
auto predicate = [&](const Item& item) {
// we rely on moving flag being set (it should block all readers)
XDCHECK(item.getRefCount() == 0);
return true;
};

if (config_.dsaAsync) {
auto result = dmlHandles[index].get();
if (result.status != dml::status_code::ok) {
uint64_t dsaCallsPost = 0;
for (ClassId cid : classIds) {
dsaCallsPost += (*stats_.dsaEvictionSubmits)[tid][pid][cid].get();
}
uint64_t dsaCalls = dsaCallsPost - dsaCallsPre;
std::string error = dsa_error_string(result);
auto errorStr = folly::sformat("dml error: {} for item: {}, submitsi {:}", error, candidates[index]->toString(),dsaCalls);
XDCHECK_EQ(result.status,dml::status_code::ok) << errorStr;
throw std::runtime_error(errorStr);
}
XDCHECK_EQ(result.status,dml::status_code::ok);
//verify that queue has decreased in size
}

auto predicate = [&](const Item& item) {
// we rely on moving flag being set (it should block all readers)
XDCHECK(item.getRefCount() == 0);
return true;
};

auto result = handler.get();
if (result.status != dml::status_code::ok) {
uint64_t dsaCallsPost = 0;
for (ClassId cid : classIds) {
dsaCallsPost += (*stats_.dsaEvictionSubmits)[tid][pid][cid].get();
}
uint64_t dsaCalls = dsaCallsPost - dsaCallsPre;
std::string error = dsa_error_string(result);
auto errorStr = folly::sformat("dml error: submits {:}", error, dsaCalls);
throw std::runtime_error(errorStr);
}
XDCHECK_EQ(result.status, dml::status_code::ok);

for (auto index = 0U; index < candidates.size(); index++) {
// another thread may have called insertOrReplace() which
// could have marked this item as unaccessible causing the
// replaceIf() in the access container to fail - in this
Expand All @@ -2352,22 +2337,21 @@ auto& mmContainer = getMMContainer(tid, pid, cid);
(*stats_.numWritebacks)[tid][pid][cid].inc();
wakeUpWaiters(*candidates[index], std::move(newItemHandles[index]));
} else {
auto token = createPutToken(*candidates[index]);
auto ret = candidates[index]->markForEvictionWhenMoving();
XDCHECK(ret);
auto token = createPutToken(*candidates[index]);
auto ret = candidates[index]->markForEvictionWhenMoving();
XDCHECK(ret);

unlinkItemForEviction(*candidates[index]);
// wake up any readers that wait for the move to complete
// it's safe to do now, as we have the item marked exclusive and
// no other reader can be added to the waiters list
wakeUpWaiters(*candidates[index], WriteHandle{});
// wake up any readers that wait for the move to complete
// it's safe to do now, as we have the item marked exclusive and
// no other reader can be added to the waiters list
wakeUpWaiters(*candidates[index], WriteHandle{});

if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidates[index])) {
nvmCache_->put(*candidates[index], std::move(token));
if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidates[index])) {
nvmCache_->put(*candidates[index], std::move(token));
}
}


evictions++;
if (candidates[index]->hasChainedItem()) {
(*stats_.chainedItemEvictions)[tid][pid][cid].inc();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"cache_config": {
"cacheSizeMB": 32768,
"poolRebalanceIntervalSec": 0,
"htBucketPower": 27,
"htBucketLock": 27,
"backgroundEvictorIntervalMilSec": 1,
Expand All @@ -15,7 +16,7 @@
},
{
"ratio": 1,
"memBindNodes": 0
"memBindNodes": 1
}
]
},
Expand Down