Skip to content

Commit

Permalink
ARIES works !
Browse files Browse the repository at this point in the history
  • Loading branch information
jarulraj committed Feb 18, 2014
1 parent 53ea2b9 commit 0936955
Show file tree
Hide file tree
Showing 20 changed files with 151 additions and 150 deletions.
19 changes: 9 additions & 10 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,11 @@
RecoveryProtoMessage.cpp
RecoveryProtoMessageBuilder.cpp
DefaultTupleSerializer.cpp
"""
# StringRef.cpp
# ThreadLocalPool.cpp
# CompactingStringPool.cpp
# CompactingStringStorage.cpp
#"""
StringRef.cpp
ThreadLocalPool.cpp
CompactingStringPool.cpp
CompactingStringStorage.cpp
"""

CTX.INPUT['execution'] = """
JNITopend.cpp
Expand Down Expand Up @@ -310,10 +309,10 @@
Logrecord.cpp
"""

#CTX.INPUT['structures'] = """
# CompactingPool.cpp
# ContiguousAllocator.cpp
#"""
CTX.INPUT['structures'] = """
CompactingPool.cpp
ContiguousAllocator.cpp
"""

# specify the third party input

Expand Down
26 changes: 26 additions & 0 deletions src/ee/common/ThreadLocalPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "common/FatalException.hpp"
#include <iostream>

#include "common/debuglog.h"

namespace voltdb {
/**
* Thread local key for storing thread specific memory pools
Expand Down Expand Up @@ -59,6 +61,22 @@ ThreadLocalPool::ThreadLocalPool() {
}
}

void ThreadLocalPool::initialize() {
(void) pthread_once(&m_keyOnce, createThreadLocalKey);
if (pthread_getspecific(m_key) == NULL) {
pthread_setspecific(m_keyAllocated,
static_cast<const void *>(new std::size_t(0)));
pthread_setspecific(m_key,
static_cast<const void *>(new PairType(1, new MapType())));
pthread_setspecific(m_stringKey,
static_cast<const void*>(new CompactingStringStorage()));
} else {
PairTypePtr p = static_cast<PairTypePtr>(pthread_getspecific(m_key));
pthread_setspecific(m_key, new PairType(p->first + 1, p->second));
delete p;
}
}

ThreadLocalPool::~ThreadLocalPool() {
PairTypePtr p =
static_cast<PairTypePtr>(pthread_getspecific(m_key));
Expand Down Expand Up @@ -175,8 +193,15 @@ boost::shared_ptr<boost::pool<voltdb_pool_allocator_new_delete> > ThreadLocalPoo
}

boost::shared_ptr<boost::pool<voltdb_pool_allocator_new_delete> > ThreadLocalPool::getExact(std::size_t size) {
//VOLT_WARN("getExact : thread id %lu size %lu m_key %u pthread_getspecific %p", pthread_self(), size, m_key, (pthread_getspecific(m_key)));

if(pthread_getspecific(m_key) == NULL){
initialize();
}

MapTypePtr pools =
static_cast< PairTypePtr >(pthread_getspecific(m_key))->second;

boost::unordered_map< std::size_t, boost::shared_ptr<boost::pool<voltdb_pool_allocator_new_delete> > >::iterator
iter = pools->find(size);
if (iter == pools->end()) {
Expand Down Expand Up @@ -206,6 +231,7 @@ boost::shared_ptr<boost::pool<voltdb_pool_allocator_new_delete> > ThreadLocalPoo
pool->set_next_size(2);
}
}

return iter->second;
}

Expand Down
3 changes: 3 additions & 0 deletions src/ee/common/ThreadLocalPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class ThreadLocalPool {
ThreadLocalPool();
~ThreadLocalPool();

static void initialize();

/**
* Retrieve a pool that allocates approximately sized chunks of memory. Provides pools that
* are powers of two and powers of two + the previous power of two.
Expand All @@ -60,6 +62,7 @@ class ThreadLocalPool {
static std::size_t getPoolAllocationSize();

static CompactingStringStorage* getStringPool();

};
}

Expand Down
26 changes: 13 additions & 13 deletions src/ee/execution/VoltDBEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,6 @@ bool VoltDBEngine::loadTable(PersistentTable *table,
int64_t lastCommittedTxnId, bool isExecutionNormal) {
// Don't do this if we are recovering
if (isExecutionNormal) {

LogRecord *logrecord = new LogRecord(computeTimeStamp(),
LogRecord::T_BULKLOAD, // we are bulk loading bytes directly
LogRecord::T_FORWARD, // the system is running normally
Expand Down Expand Up @@ -761,7 +760,7 @@ bool VoltDBEngine::loadTable(PersistentTable *table,

LogManager* m_logManager = getLogManager();
Logger m_ariesLogger = m_logManager->getAriesLogger();
/*

const Logger *logger = m_logManager->getThreadLogger(LOGGERID_MM_ARIES);

assert(logger != NULL);
Expand Down Expand Up @@ -789,7 +788,6 @@ bool VoltDBEngine::loadTable(PersistentTable *table,
// next log the raw bytes of the bulkload array
// CHANGE :: skip
logger->log(LOGLEVEL_INFO, reinterpret_cast<const char *>(serializeIn.getRawPointer(0)), numBytes);
*/

delete[] logrecordBuffer;
logrecordBuffer = NULL;
Expand Down Expand Up @@ -1590,6 +1588,8 @@ void VoltDBEngine::doAriesRecovery(char *logData, size_t length, int64_t replay_
TableTuple *afterImage = NULL;

if (logrecord.getType() == LogRecord::T_INSERT) {
VOLT_WARN("Log record recovery : INSERT start");

// at this point, don't worry about
// logging during recovery
// XXX: note that duplicate inserts won't happen silently:
Expand All @@ -1607,8 +1607,10 @@ void VoltDBEngine::doAriesRecovery(char *logData, size_t length, int64_t replay_
afterImage = NULL;
}

VOLT_WARN("Log record recovery : INSERT");
VOLT_WARN("Log record recovery : INSERT end");
} else if (logrecord.getType() == LogRecord::T_UPDATE) {
VOLT_WARN("Log record recovery : UPDATE start");

beforeImage = logrecord.getTupleBeforeImage();
afterImage = logrecord.getTupleAfterImage();

Expand All @@ -1630,8 +1632,10 @@ void VoltDBEngine::doAriesRecovery(char *logData, size_t length, int64_t replay_
delete afterImage;
afterImage = NULL;

VOLT_WARN("Log record recovery : UPDATE");
VOLT_WARN("Log record recovery : UPDATE end");
} else if (logrecord.getType() == LogRecord::T_BULKLOAD) {
VOLT_WARN("Log record recovery : BULKLOAD start");

numBulkLoadBytes = input.readLong();

// make sure we create a separate input reader
Expand All @@ -1655,15 +1659,11 @@ void VoltDBEngine::doAriesRecovery(char *logData, size_t length, int64_t replay_
// advance read position to the correct place.
input.getRawPointer(numBulkLoadBytes);

VOLT_WARN("Log record recovery : BULKLOAD");
VOLT_WARN("Log record recovery : BULKLOAD end");
} else if (logrecord.getType() == LogRecord::T_DELETE) {
beforeImage = logrecord.getTupleBeforeImage();
TableTuple* primaryKey = logrecord.getPrimaryKey();
VOLT_WARN("Log record recovery : DELETE start");

if(primaryKey != NULL)
VOLT_WARN("DEBUG PKEY : %s", primaryKey->debugNoHeader().c_str());
if(beforeImage != NULL)
VOLT_WARN("DEBUG BEFORE IMAGE : %s", beforeImage->debugNoHeader().c_str());
beforeImage = logrecord.getTupleBeforeImage();

table->deleteTuple(*beforeImage, true);

Expand All @@ -1672,7 +1672,7 @@ void VoltDBEngine::doAriesRecovery(char *logData, size_t length, int64_t replay_
delete beforeImage;
beforeImage = NULL;

VOLT_WARN("Log record recovery : DELETE");
VOLT_WARN("Log record recovery : DELETE end");
} else if (logrecord.getType() == LogRecord::T_TRUNCATE) {
table->deleteAllTuples(true);

Expand Down
5 changes: 5 additions & 0 deletions src/ee/execution/VoltDBEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "stats/StatsAgent.h"
#include "storage/persistenttable.h"
#include "storage/mmap_persistenttable.h"
#include "common/ThreadLocalPool.h"

#ifdef ANTICACHE
#include "anticache/EvictedTupleAccessException.h"
Expand Down Expand Up @@ -611,6 +612,10 @@ class __attribute__((visibility("default"))) VoltDBEngine {
ExecutorContext *m_executorContext;

DefaultTupleSerializer m_tupleSerializer;

private:
ThreadLocalPool m_tlPool;

};

inline void VoltDBEngine::resetReusedResultOutputBuffer(const size_t headerSize) {
Expand Down
12 changes: 4 additions & 8 deletions src/ee/executors/deleteexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,11 @@ bool DeleteExecutor::p_execute(const NValueArray &params, ReadWriteTracker *trac
TableTuple *keyTuple = NULL;
char *keydata = NULL;

/*
// See if we use an index instead
TableIndex *index = m_targetTable->primaryKeyIndex();
if (index != NULL) {
// First construct tuple for primary key
keydata = new char[index->getKeySchema()->tupleLength()];
Expand All @@ -193,6 +195,7 @@ bool DeleteExecutor::p_execute(const NValueArray &params, ReadWriteTracker *trac
// no before image need be recorded, just the primary key
beforeImage = NULL;
}
*/

LogRecord *logrecord = new LogRecord(computeTimeStamp(),
LogRecord::T_DELETE, // this is a delete record
Expand All @@ -216,23 +219,16 @@ bool DeleteExecutor::p_execute(const NValueArray &params, ReadWriteTracker *trac

logrecord->serializeTo(output);


LogManager* m_logManager = this->m_engine->getLogManager();
Logger m_ariesLogger = m_logManager->getAriesLogger();
//VOLT_WARN("m_logManager : %p AriesLogger : %p",&m_logManager, &m_ariesLogger);

/*
const Logger *logger = m_logManager->getThreadLogger(LOGGERID_MM_ARIES);

// CHANGE ::
logger->log(LOGLEVEL_INFO, output.data(), output.position());

if(beforeImage != NULL){
VOLT_WARN("DEBUG : %s", beforeImage->debugNoHeader().c_str());
}
else if(keyTuple != NULL){
VOLT_WARN("DEBUG : beforeImage null :: keyTuple : %s", keyTuple->debugNoHeader().c_str());
}
*/

delete[] logrecordBuffer;
logrecordBuffer = NULL;
Expand Down
4 changes: 4 additions & 0 deletions src/ee/expressions/expressionutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ constantValueFactory(json_spirit::Object &obj,
case VALUE_TYPE_VARCHAR:
newvalue = ValueFactory::getStringValue(valueValue.get_str());
break;
case VALUE_TYPE_VARBINARY:
// uses hex encoding
newvalue = ValueFactory::getBinaryValue(valueValue.get_str());
break;
case VALUE_TYPE_TIMESTAMP:
newvalue = ValueFactory::getTimestampValue(static_cast<int64_t>(valueValue.get_int64()));
break;
Expand Down
4 changes: 4 additions & 0 deletions src/ee/indexes/tableindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "common/tabletuple.h"
#include "common/TupleSchema.h"
#include "indexes/IndexStats.h"
#include "common/ThreadLocalPool.h"

namespace voltdb {

Expand Down Expand Up @@ -344,6 +345,9 @@ class TableIndex

// stats
IndexStats m_stats;

private:
ThreadLocalPool m_tlPool;
};

}
Expand Down
4 changes: 2 additions & 2 deletions src/ee/logging/AriesLogProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ void AriesLogProxy::log(LoggerId loggerId, LogLevel level, const char *statement

void AriesLogProxy::logBinaryOutput(const char *data, size_t size) {
if (jniLogging) {
VOLT_WARN("AriesLogProxy : logToEngineBuffer : %lu", size);
//VOLT_WARN("AriesLogProxy : logToEngineBuffer : %lu", size);
logToEngineBuffer(data, size);
} else {
VOLT_WARN("AriesLogProxy : logLocally : %lu", size);
//VOLT_WARN("AriesLogProxy : logLocally : %lu", size);
logLocally(data, size);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/ee/logging/LogManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class LogManager {
const Logger* logger = manager->getLogger(id);
assert(logger != NULL);

VOLT_WARN("Thread id : %lu",pthread_self());
VOLT_WARN("LogManager : %p Logger : %p", manager, logger);
//VOLT_WARN("Thread id : %lu",pthread_self());
//VOLT_WARN("LogManager : %p Logger : %p", manager, logger);

return logger;
}
Expand Down
Loading

0 comments on commit 0936955

Please sign in to comment.