-
Notifications
You must be signed in to change notification settings - Fork 0
/
collection.h
178 lines (149 loc) · 4.47 KB
/
collection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/* -*- mode: C++; c-file-style: "Google"; c-basic-offset: 4 -*- */
#pragma once
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <boost/program_options.hpp>
#include "mongo/client/dbclient.h"
#include "counter.h"
#include "options.h"
#include "output.h"
#include "thread.h"
namespace cortisol {
using std::cerr;
using std::endl;
using std::unique_ptr;
using std::string;
using std::stringstream;
using std::vector;
using out::ofs;
using out::ors;
using mongo::BSONObj;
using mongo::BSONObjBuilder;
extern thread_interrupter interrupter;
class ConnectionInfo {
string _ns;
protected:
Options _opts;
const string &ns() const {
return _ns;
}
string dbname() const {
stringstream ss;
const char *n = _ns.c_str();
const char *dot = strchr(n, '.');
ss.write(n, dot - n);
return ss.str();
}
string collname() const {
stringstream ss;
const char *n = _ns.c_str();
const char *dot = strchr(n, '.');
ss.write(dot + 1, _ns.size() - ((dot + 1) - n));
return ss.str();
}
public:
ConnectionInfo(const Options &opts, const string &ns) : _ns(ns), _opts(opts) {}
};
class Collection : public ConnectionInfo {
unique_ptr<mongo::ScopedDbConnection> _c;
bool _is_tokumx;
vector<BSONObj> index_specs() const;
void create_options(BSONObjBuilder &b) const;
void ensure_indexes();
mongo::DBClientBase &conn() const {
return _c->conn();
}
bool is_tokumx() const {
return _is_tokumx;
}
public:
Collection(const Options &opts, const string &ns) : ConnectionInfo(opts, ns), _c(mongo::ScopedDbConnection::getScopedDbConnection(opts.host)) {
BSONObj res;
bool ok = conn().simpleCommand("admin", &res, "buildInfo");
assert(ok);
_is_tokumx = res["tokumxVersion"].ok();
}
~Collection() {
if (_c) {
_c->done();
}
}
void drop();
void fill();
// config
static size_t collections;
static size_t indexes;
static bool clustering;
static size_t fields;
static size_t documents;
static size_t padding;
static double compressibility;
static po::options_description options_description();
Collection(Collection &&o) = default;
Collection &operator=(Collection &&o) = default;
};
class CollectionRunner : public ConnectionInfo {
bool _running;
size_t _id;
counter<size_t> _steps;
public:
CollectionRunner(const Options &opts, const string &ns, size_t id, timestamp_t t0) : ConnectionInfo(opts, ns), _running(true), _id(id), _steps(t0) {}
class UnimplementedException : public std::exception {};
virtual void step(mongo::DBClientBase &) {
throw UnimplementedException();
}
virtual const string &name() const {
static const string n = "unknown";
return n;
}
void operator()() {
while (_running) {
unique_ptr<mongo::ScopedDbConnection> c(mongo::ScopedDbConnection::getScopedDbConnection(_opts.host));
try {
interrupter.check_for_interrupt();
step(c->conn());
_steps++;
} catch (interrupt_exception &e) {
stop();
} catch (UnimplementedException) {
cerr << "unimplemented step()" << endl;
stop();
} catch (std::exception &e) {
cerr << "caught exception " << e.what() << endl;
}
c->done();
}
}
template<class ostream_type>
static void header(ostream_type &os) {
os << "# " << out::pad(16) << "ns" << ofs
<< out::pad(10) << "type" << ofs
<< out::pad(4) << "id" << ofs
<< counter<size_t>::header() << ors;
}
template<class ostream_type>
void report(ostream_type &os, timestamp_t ti) {
static std::mutex _m;
std::lock_guard<std::mutex> lk(_m);
os << out::pad(18) << ns() << ofs
<< out::pad(10) << name() << ofs
<< out::pad(4) << _id << ofs
<< _steps.report(ti) << ors;
}
template<class ostream_type>
void total(ostream_type &os, timestamp_t ti) {
os << out::pad(18) << ns() << ofs
<< out::pad(10) << name() << ofs
<< out::pad(4) << _id << ofs
<< _steps.total(ti) << ors;
}
void stop() {
_running = false;
}
};
} // namespace cortisol