Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 44 additions & 62 deletions be/src/vec/functions/function_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>
#include <rapidjson/allocators.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
Expand Down Expand Up @@ -144,45 +145,7 @@ rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths, rapidjs
const std::string& col = parsed_paths[i].key;
int index = parsed_paths[i].idx;
if (LIKELY(!col.empty())) {
if (root->IsArray()) {
array_obj = static_cast<rapidjson::Value*>(
mem_allocator.Malloc(sizeof(rapidjson::Value)));
array_obj->SetArray();
bool is_null = true;

// If root is an array, iterate over it, find all object elements, then collect the results from those objects.
for (int j = 0; j < root->Size(); j++) {
rapidjson::Value* json_elem = &((*root)[j]);

if (json_elem->IsArray() || json_elem->IsNull()) {
continue;
} else {
if (!json_elem->IsObject()) {
continue;
}
if (!json_elem->HasMember(col.c_str())) {
if (is_insert_null) { // not found item, then insert a null object.
is_null = false;
rapidjson::Value nullObject(rapidjson::kNullType);
array_obj->PushBack(nullObject, mem_allocator);
}
continue;
}
rapidjson::Value* obj = &((*json_elem)[col.c_str()]);
if (obj->IsArray()) {
is_null = false;
for (int k = 0; k < obj->Size(); k++) {
array_obj->PushBack((*obj)[k], mem_allocator);
}
} else if (!obj->IsNull()) {
is_null = false;
array_obj->PushBack(*obj, mem_allocator);
}
}
}

root = is_null ? &(array_obj->SetNull()) : array_obj;
} else if (root->IsObject()) {
if (root->IsObject()) {
if (!root->HasMember(col.c_str())) {
return nullptr;
} else {
Expand Down Expand Up @@ -233,8 +196,17 @@ rapidjson::Value* get_json_object(std::string_view json_string, std::string_view

// The path must not end with '\'; return NULL in that case.
if (path_string.back() == '\\') {
document->SetNull();
return document;
return nullptr;
}

std::string fixed_string;
if (path_string.size() >= 2 && path_string[0] == '$' && path_string[1] != '.') {
// The Boost tokenizer requires an explicit "." after "$" to extract JSON path tokens correctly.
// Without it, expressions like "$[0].key" cannot be split properly,
// so a "." is inserted after "$" to keep token parsing consistent.
fixed_string = "$.";
fixed_string += path_string.substr(1);
path_string = fixed_string;
}

try {
Expand All @@ -251,13 +223,13 @@ rapidjson::Value* get_json_object(std::string_view json_string, std::string_view
}
} catch (boost::escaped_list_error&) {
// Encountered an unknown escape sequence, e.g. '$.name\k'.
return document;
return nullptr;
}

parsed_paths = &tmp_parsed_paths;

if (!(*parsed_paths)[0].is_valid) {
return document;
return nullptr;
}

if (UNLIKELY((*parsed_paths).size() == 1)) {
Expand All @@ -272,8 +244,7 @@ rapidjson::Value* get_json_object(std::string_view json_string, std::string_view
if (UNLIKELY(document->HasParseError())) {
// VLOG_CRITICAL << "Error at offset " << document->GetErrorOffset() << ": "
// << GetParseError_En(document->GetParseError());
document->SetNull();
return document;
return nullptr;
}

return match_value(*parsed_paths, document, document->GetAllocator());
Expand Down Expand Up @@ -858,9 +829,10 @@ template <typename Name, bool remove_quotes>
struct FunctionJsonExtractImpl {
static constexpr auto name = Name::name;

static rapidjson::Value parse_json(const ColumnString* json_col, const ColumnString* path_col,
rapidjson::Document::AllocatorType& allocator, const int row,
const int col, std::vector<bool>& column_is_consts) {
static std::pair<bool, rapidjson::Value> parse_json(
const ColumnString* json_col, const ColumnString* path_col,
rapidjson::Document::AllocatorType& allocator, const int row, const int col,
std::vector<bool>& column_is_consts) {
rapidjson::Value value;
rapidjson::Document document;

Expand All @@ -869,10 +841,13 @@ struct FunctionJsonExtractImpl {
const auto path = path_col->get_data_at(index_check_const(row, column_is_consts[col]));
std::string_view path_string(path.data, path.size);
auto* root = get_json_object<JSON_FUN_STRING>(json_string, path_string, &document);
bool found = false;
if (root != nullptr) {
found = true;
value.CopyFrom(*root, allocator);
}
return value;

return {found, std::move(value)};
}

static rapidjson::Value* get_document(const ColumnString* path_col,
Expand Down Expand Up @@ -913,8 +888,9 @@ struct FunctionJsonExtractImpl {
rapidjson::StringBuffer buf;
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
const auto* json_col = data_columns[0];
auto insert_result_lambda = [&](rapidjson::Value& value, int row) {
if (value.IsNull()) {

auto insert_result_lambda = [&](rapidjson::Value& value, bool is_null, int row) {
if (is_null) {
null_map[row] = 1;
result_column.insert_default();
} else {
Expand All @@ -935,12 +911,13 @@ struct FunctionJsonExtractImpl {
}
};
if (data_columns.size() == 2) {
rapidjson::Value value;
if (column_is_consts[1]) {
std::vector<JsonPath> parsed_paths;
auto* root = get_document(data_columns[1], &document, parsed_paths, 0,
column_is_consts[1]);
for (size_t row = 0; row < input_rows_count; row++) {
bool is_null = false;
rapidjson::Value value;
if (root != nullptr) {
const auto& obj = json_col->get_data_at(row);
std::string_view json_string(obj.data, obj.size);
Expand All @@ -957,17 +934,18 @@ struct FunctionJsonExtractImpl {
if (root_val != nullptr) {
value.CopyFrom(*root_val, allocator);
} else {
rapidjson::Value tmp;
value.Swap(tmp);
is_null = true;
}
} else {
is_null = true;
}
insert_result_lambda(value, row);
insert_result_lambda(value, is_null, row);
}
} else {
for (size_t row = 0; row < input_rows_count; row++) {
value = parse_json(json_col, data_columns[1], allocator, row, 1,
column_is_consts);
insert_result_lambda(value, row);
auto result = parse_json(json_col, data_columns[1], allocator, row, 1,
column_is_consts);
insert_result_lambda(result.second, !result.first, row);
}
}

Expand All @@ -977,12 +955,16 @@ struct FunctionJsonExtractImpl {
value.Reserve(data_columns.size() - 1, allocator);
for (size_t row = 0; row < input_rows_count; row++) {
value.Clear();
bool found_any = false;
for (size_t col = 1; col < data_columns.size(); ++col) {
value.PushBack(parse_json(json_col, data_columns[col], allocator, row, col,
column_is_consts),
allocator);
auto result = parse_json(json_col, data_columns[col], allocator, row, col,
column_is_consts);
if (result.first) {
found_any = true;
value.PushBack(std::move(result.second), allocator);
}
}
insert_result_lambda(value, row);
insert_result_lambda(value, !found_any, row);
}
}
}
Expand Down
16 changes: 7 additions & 9 deletions be/test/vec/function/function_json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,13 @@ TEST(FunctionJsonTEST, GetJsonStringTest) {
std::string func_name = "get_json_string";
InputTypeSet input_types = {TypeIndex::String, TypeIndex::String};
DataSet data_set = {
{{VARCHAR("{\"k1\":\"v1\", \"k2\":\"v2\"}"), VARCHAR("$.k1")}, VARCHAR("v1")},
{{VARCHAR("{\"k1\":\"v1\", \"my.key\":[\"e1\", \"e2\", \"e3\"]}"),
VARCHAR("$.\"my.key\"[1]")},
{{VARCHAR(R"({"k1":"v1", "k2":"v2"})"), VARCHAR("$.k1")}, VARCHAR("v1")},
{{VARCHAR(R"({"k1":"v1", "my.key":["e1", "e2", "e3"]})"), VARCHAR("$.\"my.key\"[1]")},
VARCHAR("e2")},
{{VARCHAR("{\"k1.key\":{\"k2\":[\"v1\", \"v2\"]}}"), VARCHAR("$.\"k1.key\".k2[0]")},
{{VARCHAR(R"({"k1.key":{"k2":["v1", "v2"]}})"), VARCHAR("$.\"k1.key\".k2[0]")},
VARCHAR("v1")},
{{VARCHAR("[{\"k1\":\"v1\"}, {\"k2\":\"v2\"}, {\"k1\":\"v3\"}, {\"k1\":\"v4\"}]"),
VARCHAR("$.k1")},
VARCHAR("[\"v1\",\"v3\",\"v4\"]")}};
{{VARCHAR(R"([{"k1":"v1"}, {"k2":"v2"}, {"k1":"v3"}, {"k1":"v4"}])"), VARCHAR("$.k1")},
Null()}};

static_cast<void>(check_function<DataTypeString, true>(func_name, input_types, data_set));
}
Expand All @@ -93,7 +91,7 @@ TEST(FunctionJsonTEST, JsonExtractTest) {
// json_extract root
DataSet data_set = {
{{Null(), STRING("$")}, Null()},
{{STRING("null"), STRING("$")}, Null()},
{{STRING("null"), STRING("$")}, STRING("null")},
{{STRING("true"), STRING("$")}, STRING("true")},
{{STRING("false"), STRING("$")}, STRING("false")},
{{STRING("100"), STRING("$")}, STRING("100")}, //int8
Expand Down Expand Up @@ -127,7 +125,7 @@ TEST(FunctionJsonTEST, JsonExtractTest) {

data_set = {
{{Null(), STRING("$")}, Null()},
{{STRING("null"), STRING("$")}, Null()},
{{STRING("null"), STRING("$")}, STRING("null")},
{{STRING("true"), STRING("$")}, STRING("true")},
{{STRING("false"), STRING("$")}, STRING("false")},
{{STRING("100"), STRING("$")}, STRING("100")}, //int8
Expand Down
Loading
Loading