Skip to content

Commit

Permalink
Merge branch 'master' into fix/issue4669
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits authored Feb 20, 2024
2 parents a185764 + 1675c6b commit 2f47639
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 61 deletions.
4 changes: 2 additions & 2 deletions ext_libs/ext_libs.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ if(RAPIDJSON_SYS_DEP)
# Since EXACT is not specified, any version compatible with 1.1.0 is accepted (>= 1.1.0)
find_package(RapidJSON 1.1.0 CONFIG REQUIRED)
add_library(RapidJSON INTERFACE)
target_include_directories(RapidJSON INTERFACE ${RapidJSON_INCLUDE_DIRS})
target_include_directories(RapidJSON INTERFACE ${RapidJSON_INCLUDE_DIRS} ${RAPIDJSON_INCLUDE_DIRS})
else()
add_library(RapidJSON INTERFACE)
target_include_directories(RapidJSON SYSTEM INTERFACE "${CMAKE_CURRENT_LIST_DIR}/rapidjson/include")
Expand Down Expand Up @@ -127,4 +127,4 @@ if(VW_FEAT_CB_GRAPH_FEEDBACK)
target_include_directories(mlpack_ensmallen SYSTEM INTERFACE ${CMAKE_CURRENT_LIST_DIR}/armadillo-code/include)

target_include_directories(mlpack_ensmallen SYSTEM INTERFACE ${CMAKE_CURRENT_LIST_DIR}/ensmallen/include)
endif()
endif()
3 changes: 3 additions & 0 deletions vowpalwabbit/core/include/vw/core/error_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ERROR_CODE_DEFINITION(
ERROR_CODE_DEFINITION(
13, fb_parser_size_mismatch_ft_names_ft_values, "Size of feature names and feature values do not match. ")
ERROR_CODE_DEFINITION(14, unknown_label_type, "Label type in Flatbuffer not understood. ")
ERROR_CODE_DEFINITION(15, fb_parser_span_misaligned, "Input Flatbuffer span is not aligned to an 8-byte boundary. ")
ERROR_CODE_DEFINITION(
16, fb_parser_span_length_mismatch, "Input Flatbuffer span does not match flatbuffer size prefix. ")

// TODO: This is temporary until we switch to the new error handling mechanism.
ERROR_CODE_DEFINITION(10000, vw_exception, "vw_exception: ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include "vw/core/api_status.h"
#include "vw/core/example.h"
#include "vw/core/multi_ex.h"
#include "vw/core/shared_data.h"
Expand All @@ -14,15 +13,21 @@
namespace VW
{

namespace experimental
{
class api_status;
}

using example_sink_f = std::function<void(VW::multi_ex&& spare_examples)>;

namespace parsers
{
namespace flatbuffer
{
int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& examples);
bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples);

int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink = nullptr, VW::experimental::api_status* status = nullptr);

class parser
{
Expand Down Expand Up @@ -57,6 +62,19 @@ class parser
VW::experimental::api_status* status = nullptr);
int get_namespace_index(const Namespace* ns, namespace_index& ni, VW::experimental::api_status* status = nullptr);

inline void reset_active_multi_ex()
{
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
}

inline void reset_active_collection()
{
_example_index = 0;
_active_collection = false;
}

void parse_simple_label(shared_data* sd, polylabel* l, reduction_features* red_features, const SimpleLabel* label);
void parse_cb_label(polylabel* l, const CBLabel* label);
void parse_ccb_label(polylabel* l, const CCBLabel* label);
Expand Down
130 changes: 80 additions & 50 deletions vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include "vw/fb_parser/parse_example_flatbuffer.h"

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
#include "vw/core/error_constants.h"
#include "vw/core/global_data.h"
#include "vw/core/parser.h"
#include "vw/core/scope_exit.h"
#include "vw/core/vw.h"

#include <cfloat>
Expand Down Expand Up @@ -43,8 +45,8 @@ int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
return static_cast<int>(status.get_error_code() == VW::experimental::error_code::success);
}

bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples)
int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink, VW::experimental::api_status* status)
{
// we expect context to contain a size_prefixed flatbuffer (technically a binary string)
// which means:
Expand All @@ -59,16 +61,15 @@ bool read_span_flatbuffer(
// thus context.size() = sizeof(length) + length
io_buf unused;

// TODO: How do we report errors out of here? (This is a general API problem with the parsers)
size_t address = reinterpret_cast<size_t>(span);
if (address % 8 != 0)
{
std::stringstream sstream;
sstream << "fb_parser error: flatbuffer data not aligned to 8 bytes" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " % " << 8 << " = " << address % 8
<< " (vs desired = " << 0 << ")";
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_misaligned) << sstream.str();
}

flatbuffers::uoffset_t flatbuffer_object_size =
Expand All @@ -79,42 +80,80 @@ bool read_span_flatbuffer(
sstream << "fb_parser error: flatbuffer size prefix does not match actual size" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " size_prefix = " << flatbuffer_object_size
<< " length = " << length;
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_length_mismatch) << sstream.str();
}

VW::multi_ex temp_ex;
temp_ex.push_back(&example_factory());

// Use scope_exit because the parser reports errors by throwing exceptions (the code path in the vw driver
// uses the return value to signal completion, not errors).
auto scope_guard = VW::scope_exit(
[&temp_ex, &all, &example_sink]()
{
if (example_sink == nullptr) { VW::finish_example(*all, temp_ex); }
else { example_sink(std::move(temp_ex)); }
});

// There is a bit of unhappiness with the interface of the read_XYZ_<format>() functions, because they often
// expect the input multi_ex to have a single "empty" example there. This contributes, in part, to the large
// proliferation of entry points into the JSON parser(s). We want to avoid exposing that insofar as possible,
// so we will check whether we already received a perfectly good example and use that, or create a new one if
// needed.
if (examples.size() > 0)
{
assert(examples.size() == 1);
temp_ex.push_back(examples[0]);
examples.pop_back();
}
else { temp_ex.push_back(&example_factory()); }

bool has_more = true;
VW::experimental::api_status status;
do {
switch (all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, &status))
switch (int result = all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, status))
{
case VW::experimental::error_code::success:
has_more = true;
break;
// Because nothing_to_parse is not an error we have to filter it out here, otherwise
// we could simply do RETURN_IF_FAIL(result) and let the macro handle it.
case VW::experimental::error_code::nothing_to_parse:
has_more = false;
break;
default:
std::stringstream sstream;
sstream << "Error parsing examples: " << std::endl;
THROW(sstream.str());
return false;
RETURN_IF_FAIL(result);
}

// The underlying parser will emit a newline example when terminating the parsing
// of a multi_ex block. Since we are collecting it into a multi_ex, we want to
// swallow it here, but should the parser not have followed its contract w.r.t.
// the return value, we should use the presence of the newline example to override
// has_more.
has_more &= !temp_ex[0]->is_newline;

// If this is a real example, we need to move it to the output multi_ex; we also
// need to create a new example to replace it for the next run through the parser.
if (!temp_ex[0]->is_newline)
{
examples.push_back(&example_factory());
std::swap(examples[examples.size() - 1], temp_ex[0]);
// We avoid doing moves or copy construction here because multi_ex contains
// example pointers. The compile-time code here is meant to call attention
// to here if the underlying type changes.
using temp_ex_element_t = std::remove_reference<decltype(temp_ex[0])>::type;
using examples_element_t = std::remove_reference<decltype(examples[0])>::type;

static_assert(std::is_same<temp_ex_element_t, examples_element_t>::value &&
std::is_same<temp_ex_element_t, VW::example*>::value,
"temp_ex and example must be vector-like over VW::example*");

examples.push_back(temp_ex[0]);

// Since we are using a vector of pointers, we can simply reassign the slot to
// the pointer of the newly created destination example for the parser.
temp_ex[0] = &example_factory();
}
} while (has_more);

VW::finish_example(*all, temp_ex);
return true;
return VW::experimental::error_code::success;
}

const VW::parsers::flatbuffer::ExampleRoot* parser::data() { return _data; }
Expand Down Expand Up @@ -198,16 +237,17 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
{
_active_multi_ex = true;
_multi_example_object = _data->example_obj_as_ExampleCollection()->multi_examples()->Get(_example_index);

// read from active multi_ex
RETURN_IF_FAIL(parse_multi_example(all, examples[0], _multi_example_object, status));
// read from active collection

// if we are done with the multi example, move to the next one, or finish the collection
if (!_active_multi_ex)
{
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->multi_examples()->size())
{
_example_index = 0;
_active_collection = false;
reset_active_collection();
}
}
}
Expand All @@ -216,11 +256,7 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
const auto ex = _data->example_obj_as_ExampleCollection()->examples()->Get(_example_index);
RETURN_IF_FAIL(parse_example(all, examples[0], ex, status));
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size())
{
_example_index = 0;
_active_collection = false;
}
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size()) { reset_active_collection(); }
}
return VW::experimental::error_code::success;
}
Expand All @@ -231,6 +267,20 @@ int parser::parse_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
#define RETURN_SUCCESS_FINISHED() \
return buffer_pointer ? VW::experimental::error_code::nothing_to_parse : VW::experimental::error_code::success;

// If we are re-using a single parser instance across multiple invocations, we need to reset
// the state when we get a new buffer_pointer. Otherwise we may be in the middle of a multi_ex
// or example_collection, and the following parse will attempt to reuse the object references
// from the previous buffer, which may have been deallocated.
// TODO: Rewrite the parser to avoid this convoluted, re-entrant logic.
if (buffer_pointer && _flatbuffer_pointer != buffer_pointer)
{
reset_active_multi_ex();
reset_active_collection();
}

// The ExampleCollection processing code owns dispatching to parse_multi_example to handle
// iteration through the outer collection correctly, thus it must have the first chance to
// incoming parse request.
if (_active_collection)
{
RETURN_IF_FAIL(process_collection_item(all, examples, status));
Expand Down Expand Up @@ -307,9 +357,7 @@ int parser::parse_multi_example(
{
// done with multi example, send a newline example and reset
ae->is_newline = true;
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
reset_active_multi_ex();
return VW::experimental::error_code::success;
}

Expand All @@ -325,30 +373,11 @@ int parser::get_namespace_index(const Namespace* ns, namespace_index& ni, VW::ex
ni = static_cast<uint8_t>(ns->name()->c_str()[0]);
return VW::experimental::error_code::success;
}
else if (flatbuffers::IsFieldPresent(ns, Namespace::VT_HASH))
else
{
ni = ns->hash();
return VW::experimental::error_code::success;
}

if (_active_collection && _active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in collection item with example "
"index "
<< _example_index << "and multi example index " << _multi_ex_index;
}
else if (_active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in multi example index "
<< _multi_ex_index;
}
else
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index";
}
}

bool get_namespace_hash(VW::workspace* all, const Namespace* ns, uint64_t& hash)
Expand Down Expand Up @@ -462,7 +491,7 @@ int parser::parse_namespaces(VW::workspace* all, example* ae, const Namespace* n
}
else
{
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_name_hash_missing) }
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_feature_hashes_names_missing) }

if (ns->feature_hashes()->size() != ns->feature_values()->size())
{
Expand Down Expand Up @@ -541,6 +570,7 @@ int parser::parse_flat_label(
break;
}
case Label_NONE:
case Label_no_label:
break;
default:
if (_active_collection && _active_multi_ex)
Expand Down
1 change: 1 addition & 0 deletions vowpalwabbit/fb_parser/src/parse_label.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// license as described in the file LICENSE.

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
Expand Down
Loading

0 comments on commit 2f47639

Please sign in to comment.