forked from facebookarchive/scribe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscribe_server.h
136 lines (114 loc) · 4.74 KB
/
scribe_server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright (c) 2007-2009 Facebook
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Avinash Lakshman
// @author Anthony Giardullo
#ifndef SCRIBE_SERVER_H
#define SCRIBE_SERVER_H
#include "store.h"
#include "store_queue.h"
typedef std::vector<boost::shared_ptr<StoreQueue> > store_list_t;
typedef std::map<std::string, boost::shared_ptr<store_list_t> > category_map_t;
class scribeHandler : virtual public scribe::thrift::scribeIf,
public facebook::fb303::FacebookBase {
public:
scribeHandler(unsigned long int port, const std::string& conf_file);
~scribeHandler();
void shutdown();
void initialize();
void reinitialize();
scribe::thrift::ResultCode Log(const std::vector<scribe::thrift::LogEntry>& messages);
void getVersion(std::string& _return) {_return = scribeversion;}
facebook::fb303::fb_status getStatus();
void getStatusDetails(std::string& _return);
void setStatus(facebook::fb303::fb_status new_status);
void setStatusDetails(const std::string& new_status_details);
unsigned long int port; // it's long because that's all I implemented in the conf class
// number of threads processing new Thrift connections
size_t numThriftServerThreads;
inline unsigned long long getMaxQueueSize() {
return maxQueueSize;
}
inline const StoreConf& getConfig() const {
return config;
}
void incCounter(std::string category, std::string counter);
void incCounter(std::string category, std::string counter, long amount);
void incCounter(std::string counter);
void incCounter(std::string counter, long amount);
inline void setServer(
boost::shared_ptr<apache::thrift::server::TNonblockingServer> & server) {
this->server = server;
}
unsigned long getMaxConn() {
return maxConn;
}
private:
boost::shared_ptr<apache::thrift::server::TNonblockingServer> server;
unsigned long checkPeriod; // periodic check interval for all contained stores
// This map has an entry for each configured category.
// Each of these entries is a map of type->StoreQueue.
// The StoreQueue contains a store, which could contain additional stores.
category_map_t categories;
category_map_t category_prefixes;
// the default stores
store_list_t defaultStores;
std::string configFilename;
facebook::fb303::fb_status status;
std::string statusDetails;
apache::thrift::concurrency::Mutex statusLock;
time_t lastMsgTime;
unsigned long numMsgLastSecond;
unsigned long maxMsgPerSecond;
unsigned long maxConn;
unsigned long long maxQueueSize;
StoreConf config;
bool newThreadPerCategory;
/* mutex to syncronize access to scribeHandler.
* A single mutex is fine since it only needs to be locked in write mode
* during start/stop/reinitialize or when we need to create a new category.
*/
boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex>
scribeHandlerLock;
// disallow empty construction, copy, and assignment
scribeHandler();
scribeHandler(const scribeHandler& rhs);
const scribeHandler& operator=(const scribeHandler& rhs);
protected:
bool throttleDeny(int num_messages); // returns true if overloaded
void deleteCategoryMap(category_map_t& cats);
const char* statusAsString(facebook::fb303::fb_status new_status);
bool createCategoryFromModel(const std::string &category,
const boost::shared_ptr<StoreQueue> &model);
boost::shared_ptr<StoreQueue>
configureStoreCategory(pStoreConf store_conf,
const std::string &category,
const boost::shared_ptr<StoreQueue> &model,
bool category_list=false);
bool configureStore(pStoreConf store_conf, int* num_stores);
void stopStores();
bool throttleRequest(const std::vector<scribe::thrift::LogEntry>& messages);
boost::shared_ptr<store_list_t>
createNewCategory(const std::string& category);
void addMessage(const scribe::thrift::LogEntry& entry,
const boost::shared_ptr<store_list_t>& store_list);
};
extern boost::shared_ptr<scribeHandler> g_Handler;
#endif // SCRIBE_SERVER_H