forked from microsoft/FASTER
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsingle_threaded_recovery_test.h
148 lines (117 loc) · 4.11 KB
/
single_threaded_recovery_test.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma once
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "core/auto_ptr.h"
#include "core/faster.h"
#include "sum_store.h"
using namespace FASTER;
namespace sum_store {
class SingleThreadedRecoveryTest {
public:
static constexpr uint64_t kNumUniqueKeys = (1L << 23);
static constexpr uint64_t kNumOps = (1L << 25);
static constexpr uint64_t kRefreshInterval = (1L << 8);
static constexpr uint64_t kCompletePendingInterval = (1L << 12);
static constexpr uint64_t kCheckpointInterval = (1L << 20);
SingleThreadedRecoveryTest(store_t& store_)
: store{ store_ } {
}
private:
public:
void Populate() {
auto callback = [](IAsyncContext* ctxt, Status result) {
CallbackContext<RmwContext> context{ ctxt };
assert(result == Status::Ok);
};
auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
if(result != Status::Ok) {
printf("Thread %" PRIu32 " reports checkpoint failed.\n",
Thread::id());
} else {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
}
};
// Register thread with FASTER
store.StartSession();
// Process the batch of input data
for(uint64_t idx = 0; idx < kNumOps; ++idx) {
RmwContext context{ AdId{ idx % kNumUniqueKeys}, 1 };
store.Rmw(context, callback, idx);
if(idx % kCheckpointInterval == 0) {
Guid token;
store.Checkpoint(nullptr, hybrid_log_persistence_callback, token);
printf("Calling Checkpoint(), token = %s\n", token.ToString().c_str());
}
if(idx % kCompletePendingInterval == 0) {
store.CompletePending(false);
} else if(idx % kRefreshInterval == 0) {
store.Refresh();
}
}
// Make sure operations are completed
store.CompletePending(true);
// Deregister thread from FASTER
store.StopSession();
printf("Populate successful\n");
std::string discard;
std::getline(std::cin, discard);
}
void RecoverAndTest(const Guid& index_token, const Guid& hybrid_log_token) {
auto callback = [](IAsyncContext* ctxt, Status result) {
CallbackContext<ReadContext> context{ ctxt };
assert(result == Status::Ok);
};
// Recover
uint32_t version;
std::vector<Guid> session_ids;
store.Recover(index_token, hybrid_log_token, version, session_ids);
// Create array for reading
auto read_results = alloc_aligned<uint64_t>(64, sizeof(uint64_t) * kNumUniqueKeys);
std::memset(read_results.get(), 0, sizeof(uint64_t) * kNumUniqueKeys);
Guid session_id = session_ids[0];
// Register with thread
uint64_t sno = store.ContinueSession(session_id);
// Issue read requests
for(uint64_t idx = 0; idx < kNumUniqueKeys; ++idx) {
ReadContext context{ AdId{ idx}, read_results.get() + idx };
store.Read(context, callback, idx);
}
// Complete all pending requests
store.CompletePending(true);
// Release
store.StopSession();
// Test outputs
// Compute expected array
auto expected_results = alloc_aligned<uint64_t>(64,
sizeof(uint64_t) * kNumUniqueKeys);
std::memset(expected_results.get(), 0, sizeof(uint64_t) * kNumUniqueKeys);
for(uint64_t idx = 0; idx <= sno; ++idx) {
++expected_results.get()[idx % kNumUniqueKeys];
}
// Assert if expected is same as found
for(uint64_t idx = 0; idx < kNumUniqueKeys; ++idx) {
if(expected_results.get()[idx] != read_results.get()[idx]) {
printf("Debug error for AdId %" PRIu64 ": Expected (%" PRIu64 "), Found(%" PRIu64 ")\n",
idx,
expected_results.get()[idx],
read_results.get()[idx]);
}
}
printf("Test successful\n");
std::string discard;
std::getline(std::cin, discard);
}
void Continue() {
// Not implemented.
assert(false);
}
store_t& store;
};
} // namespace sum_store