Skip to content

Commit

Permalink
Add parse-by-parts example
Browse files Browse the repository at this point in the history
Fix #527
  • Loading branch information
miloyip committed Feb 21, 2016
1 parent 10ccdf8 commit 70cad19
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 0 deletions.
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(EXAMPLES
condense
jsonx
messagereader
parsebyparts
pretty
prettyauto
schemavalidator
Expand Down
164 changes: 164 additions & 0 deletions example/parsebyparts/parsebyparts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Example of parsing JSON to document by parts.

// Using C++11 threads
#if __cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1600)

#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/writer.h"
#include "rapidjson/ostreamwrapper.h"
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

using namespace rapidjson;

template<unsigned parseFlags = kParseDefaultFlags>
class AsyncDocumentParser {
public:
AsyncDocumentParser(Document& d) : ass_(*this), d_(d), parseThread_(&AsyncDocumentParser::Parse, this), completed_() {}

~AsyncDocumentParser() {
if (!parseThread_.joinable())
return;

{
std::unique_lock<std::mutex> lock(mutex_);

// Wait until the buffer is read up (or parsing is completed)
while (!ass_.Empty() && !completed_)
finish_.wait(lock);

// Automatically append '\0' as the terminator in the stream.
static const char terminator[] = "";
ass_.src_ = terminator;
ass_.end_ = terminator + 1;
notEmpty_.notify_one(); // unblock the AsyncStringStream
}

parseThread_.join();
}

void ParsePart(const char* buffer, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);

// Wait until the buffer is read up (or parsing is completed)
while (!ass_.Empty() && !completed_)
finish_.wait(lock);

// Stop further parsing if the parsing process is completed.
if (completed_)
return;

// Set the buffer to stream and unblock the AsyncStringStream
ass_.src_ = buffer;
ass_.end_ = buffer + length;
notEmpty_.notify_one();
}

private:
void Parse() {
d_.ParseStream<parseFlags>(ass_);

// The stream may not be fully read, notify finish anyway to unblock ParsePart()
std::unique_lock<std::mutex> lock(mutex_);
completed_ = true; // Parsing process is completed
finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
}

struct AsyncStringStream {
typedef char Ch;

AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}

char Peek() const {
std::unique_lock<std::mutex> lock(parser_.mutex_);

// If nothing in stream, block to wait.
while (Empty())
parser_.notEmpty_.wait(lock);

return *src_;
}

char Take() {
std::unique_lock<std::mutex> lock(parser_.mutex_);

// If nothing in stream, block to wait.
while (Empty())
parser_.notEmpty_.wait(lock);

count_++;
char c = *src_++;

// If all stream is read up, notify that the stream is finish.
if (Empty())
parser_.finish_.notify_one();

return c;
}

size_t Tell() const { return count_; }

// Not implemented
char* PutBegin() { return 0; }
void Put(char) {}
void Flush() {}
size_t PutEnd(char*) { return 0; }

bool Empty() const { return src_ == end_; }

AsyncDocumentParser& parser_;
const char* src_; //!< Current read position.
const char* end_; //!< End of buffer
size_t count_; //!< Number of characters taken so far.
};

AsyncStringStream ass_;
Document& d_;
std::thread parseThread_;
std::mutex mutex_;
std::condition_variable notEmpty_;
std::condition_variable finish_;
bool completed_;
};

int main() {
Document d;

{
AsyncDocumentParser<> parser(d);

const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
//const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";

parser.ParsePart(json1, sizeof(json1) - 1);
parser.ParsePart(json2, sizeof(json2) - 1);
parser.ParsePart(json3, sizeof(json3) - 1);
}

if (d.HasParseError()) {
std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
return EXIT_FAILURE;
}

// Stringify the JSON to cout
OStreamWrapper os(std::cout);
Writer<OStreamWrapper> writer(os);
d.Accept(writer);
std::cout << std::endl;

return EXIT_SUCCESS;
}

#else // Not supporting C++11

#include <iostream>
int main() {
std::cout << "This example requires C++11 compiler" << std::endl;
}

#endif

0 comments on commit 70cad19

Please sign in to comment.