Skip to content

Commit 122a759

Browse files
committed
Refactoring sweep and cleanup of the public IPC API. Move non-public APIs from metadata.h to metadata-internal.h, and create message.h and dictionary.h.
Change-Id: I3fbacc6a0f5d02b734a04b9717d426cb1b89ace2
1 parent b646f96 commit 122a759

25 files changed

+947
-622
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,12 @@ include(BuildUtils)
217217
# Compiler flags
218218
############################################################
219219

220-
include(SetupCxxFlags)
221-
222220
if (ARROW_NO_DEPRECATED_API)
223221
add_definitions(-DARROW_NO_DEPRECATED_API)
224222
endif()
225223

224+
include(SetupCxxFlags)
225+
226226
############################################################
227227
# Dependencies
228228
############################################################
@@ -780,10 +780,12 @@ endif()
780780

781781
if (ARROW_IPC)
782782
set(ARROW_SRCS ${ARROW_SRCS}
783+
src/arrow/ipc/dictionary.cc
783784
src/arrow/ipc/feather.cc
784785
src/arrow/ipc/json.cc
785786
src/arrow/ipc/json-internal.cc
786-
src/arrow/ipc/metadata.cc
787+
src/arrow/ipc/message.cc
788+
src/arrow/ipc/metadata-internal.cc
787789
src/arrow/ipc/reader.cc
788790
src/arrow/ipc/writer.cc
789791
)

cpp/src/arrow/ipc/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES})
8383
# Headers: top level
8484
install(FILES
8585
api.h
86+
dictionary.h
8687
feather.h
8788
json.h
88-
metadata.h
89+
message.h
8990
reader.h
9091
writer.h
9192
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/ipc")

cpp/src/arrow/ipc/api.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
#ifndef ARROW_IPC_API_H
1919
#define ARROW_IPC_API_H
2020

21+
#include "arrow/ipc/dictionary.h"
2122
#include "arrow/ipc/feather.h"
2223
#include "arrow/ipc/json.h"
23-
#include "arrow/ipc/metadata.h"
24+
#include "arrow/ipc/message.h"
2425
#include "arrow/ipc/reader.h"
2526
#include "arrow/ipc/writer.h"
2627

cpp/src/arrow/ipc/dictionary.cc

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/ipc/dictionary.h"
19+
20+
#include <cstdint>
21+
#include <memory>
22+
#include <sstream>
23+
24+
#include "arrow/array.h"
25+
#include "arrow/status.h"
26+
#include "arrow/type.h"
27+
28+
namespace arrow {
29+
namespace ipc {
30+
31+
// Construct an empty memo: no dictionaries and no ids are tracked yet.
DictionaryMemo::DictionaryMemo() = default;
32+
33+
// Returns KeyError if dictionary not found
34+
Status DictionaryMemo::GetDictionary(int64_t id,
35+
std::shared_ptr<Array>* dictionary) const {
36+
auto it = id_to_dictionary_.find(id);
37+
if (it == id_to_dictionary_.end()) {
38+
std::stringstream ss;
39+
ss << "Dictionary with id " << id << " not found";
40+
return Status::KeyError(ss.str());
41+
}
42+
*dictionary = it->second;
43+
return Status::OK();
44+
}
45+
46+
int64_t DictionaryMemo::GetId(const std::shared_ptr<Array>& dictionary) {
47+
intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
48+
auto it = dictionary_to_id_.find(address);
49+
if (it != dictionary_to_id_.end()) {
50+
// Dictionary already observed, return the id
51+
return it->second;
52+
} else {
53+
int64_t new_id = static_cast<int64_t>(dictionary_to_id_.size());
54+
dictionary_to_id_[address] = new_id;
55+
id_to_dictionary_[new_id] = dictionary;
56+
return new_id;
57+
}
58+
}
59+
60+
bool DictionaryMemo::HasDictionary(const std::shared_ptr<Array>& dictionary) const {
61+
intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
62+
auto it = dictionary_to_id_.find(address);
63+
return it != dictionary_to_id_.end();
64+
}
65+
66+
bool DictionaryMemo::HasDictionaryId(int64_t id) const {
67+
auto it = id_to_dictionary_.find(id);
68+
return it != id_to_dictionary_.end();
69+
}
70+
71+
// Register `dictionary` under the caller-supplied `id`.
// Returns Status::KeyError, leaving the memo unchanged, when `id` is already
// in use. Otherwise records both the id -> dictionary mapping and the
// address -> id mapping and returns OK.
Status DictionaryMemo::AddDictionary(int64_t id,
                                     const std::shared_ptr<Array>& dictionary) {
  if (!HasDictionaryId(id)) {
    id_to_dictionary_[id] = dictionary;
    dictionary_to_id_[reinterpret_cast<intptr_t>(dictionary.get())] = id;
    return Status::OK();
  }

  std::stringstream message;
  message << "Dictionary with id " << id << " already exists";
  return Status::KeyError(message.str());
}
83+
84+
} // namespace ipc
85+
} // namespace arrow

cpp/src/arrow/ipc/dictionary.h

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Tools for dictionaries in IPC context
19+
20+
#ifndef ARROW_IPC_DICTIONARY_H
21+
#define ARROW_IPC_DICTIONARY_H
22+
23+
#include <cstdint>
24+
#include <memory>
25+
#include <string>
26+
#include <unordered_map>
27+
#include <vector>
28+
29+
#include "arrow/status.h"
30+
#include "arrow/util/macros.h"
31+
#include "arrow/util/visibility.h"
32+
33+
namespace arrow {
34+
35+
class Array;
36+
class Buffer;
37+
class Field;
38+
39+
namespace io {
40+
41+
class InputStream;
42+
class OutputStream;
43+
class RandomAccessFile;
44+
45+
} // namespace io
46+
47+
namespace ipc {
48+
49+
using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;
50+
using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>;
51+
52+
// Memoization data structure for handling shared dictionaries
53+
class ARROW_EXPORT DictionaryMemo {
54+
public:
55+
DictionaryMemo();
56+
57+
// Returns KeyError if dictionary not found
58+
Status GetDictionary(int64_t id, std::shared_ptr<Array>* dictionary) const;
59+
60+
/// Return id for dictionary, computing new id if necessary
61+
int64_t GetId(const std::shared_ptr<Array>& dictionary);
62+
63+
bool HasDictionary(const std::shared_ptr<Array>& dictionary) const;
64+
bool HasDictionaryId(int64_t id) const;
65+
66+
// Add a dictionary to the memo with a particular id. Returns KeyError if
67+
// that dictionary already exists
68+
Status AddDictionary(int64_t id, const std::shared_ptr<Array>& dictionary);
69+
70+
const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; }
71+
72+
int size() const { return static_cast<int>(id_to_dictionary_.size()); }
73+
74+
private:
75+
// Dictionary memory addresses, to track whether a dictionary has been seen
76+
// before
77+
std::unordered_map<intptr_t, int64_t> dictionary_to_id_;
78+
79+
// Map of dictionary id to dictionary array
80+
DictionaryMap id_to_dictionary_;
81+
82+
DISALLOW_COPY_AND_ASSIGN(DictionaryMemo);
83+
};
84+
85+
} // namespace ipc
86+
} // namespace arrow
87+
88+
#endif // ARROW_IPC_DICTIONARY_H

cpp/src/arrow/ipc/file-to-stream.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Status ConvertToStream(const char* path) {
3535
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
3636

3737
io::StdoutStream sink;
38-
std::shared_ptr<RecordBatchStreamWriter> writer;
38+
std::shared_ptr<RecordBatchWriter> writer;
3939
RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
4040
for (int i = 0; i < reader->num_record_batches(); ++i) {
4141
std::shared_ptr<RecordBatch> chunk;

cpp/src/arrow/ipc/ipc-read-write-benchmark.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ static void BM_ReadRecordBatch(benchmark::State& state) { // NOLINT non-const r
109109
std::shared_ptr<RecordBatch> result;
110110
io::BufferReader reader(buffer);
111111

112-
if (!ipc::ReadRecordBatch(record_batch->schema(), 0, &reader, &result).ok()) {
112+
if (!ipc::ReadRecordBatch(record_batch->schema(), &reader, &result).ok()) {
113113
state.SkipWithError("Failed to read!");
114114
}
115115
}

cpp/src/arrow/ipc/ipc-read-write-test.cc

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "arrow/io/memory.h"
3030
#include "arrow/io/test-common.h"
3131
#include "arrow/ipc/api.h"
32+
#include "arrow/ipc/metadata-internal.h"
3233
#include "arrow/ipc/test-common.h"
3334
#include "arrow/ipc/util.h"
3435
#include "arrow/memory_pool.h"
@@ -47,21 +48,14 @@ class TestSchemaMetadata : public ::testing::Test {
4748
public:
4849
void SetUp() {}
4950

50-
void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
51+
void CheckRoundtrip(const Schema& schema) {
5152
std::shared_ptr<Buffer> buffer;
52-
ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
53+
ASSERT_OK(SerializeSchema(schema, default_memory_pool(), &buffer));
5354

54-
std::unique_ptr<Message> message;
55-
ASSERT_OK(Message::Open(buffer, nullptr, &message));
56-
57-
ASSERT_EQ(Message::SCHEMA, message->type());
58-
59-
DictionaryMemo empty_memo;
60-
61-
std::shared_ptr<Schema> schema2;
62-
ASSERT_OK(GetSchema(message->header(), empty_memo, &schema2));
63-
64-
AssertSchemaEqual(schema, *schema2);
55+
std::shared_ptr<Schema> result;
56+
io::BufferReader reader(buffer);
57+
ASSERT_OK(ReadSchema(&reader, &result));
58+
AssertSchemaEqual(schema, *result);
6559
}
6660
};
6761

@@ -107,9 +101,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
107101
auto f10 = field("f10", std::make_shared<BooleanType>());
108102

109103
Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
110-
DictionaryMemo memo;
111-
112-
CheckRoundtrip(schema, &memo);
104+
CheckRoundtrip(schema);
113105
}
114106

115107
TEST_F(TestSchemaMetadata, NestedFields) {
@@ -121,9 +113,7 @@ TEST_F(TestSchemaMetadata, NestedFields) {
121113
auto f1 = field("f1", type2);
122114

123115
Schema schema({f0, f1});
124-
DictionaryMemo memo;
125-
126-
CheckRoundtrip(schema, &memo);
116+
CheckRoundtrip(schema);
127117
}
128118

129119
#define BATCH_CASES() \
@@ -137,13 +127,21 @@ static int g_file_number = 0;
137127

138128
class IpcTestFixture : public io::MemoryMapFixture {
139129
public:
130+
// Serialize `schema` into a buffer, then read it back through a BufferReader
// into `*result`. Returns the first non-OK Status encountered, if any.
Status DoSchemaRoundTrip(const Schema& schema, std::shared_ptr<Schema>* result) {
  std::shared_ptr<Buffer> schema_buffer;
  RETURN_NOT_OK(SerializeSchema(schema, pool_, &schema_buffer));

  io::BufferReader schema_reader(schema_buffer);
  Status read_status = ReadSchema(&schema_reader, result);
  return read_status;
}
137+
140138
Status DoStandardRoundTrip(const RecordBatch& batch,
141139
std::shared_ptr<RecordBatch>* batch_result) {
142140
std::shared_ptr<Buffer> serialized_batch;
143141
RETURN_NOT_OK(SerializeRecordBatch(batch, pool_, &serialized_batch));
144142

145143
io::BufferReader buf_reader(serialized_batch);
146-
return ReadRecordBatch(batch.schema(), 0, &buf_reader, batch_result);
144+
return ReadRecordBatch(batch.schema(), &buf_reader, batch_result);
147145
}
148146

149147
Status DoLargeRoundTrip(const RecordBatch& batch, bool zero_data,
@@ -153,7 +151,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
153151
}
154152
RETURN_NOT_OK(mmap_->Seek(0));
155153

156-
std::shared_ptr<RecordBatchFileWriter> file_writer;
154+
std::shared_ptr<RecordBatchWriter> file_writer;
157155
RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
158156
RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
159157
RETURN_NOT_OK(file_writer->Close());
@@ -182,6 +180,10 @@ class IpcTestFixture : public io::MemoryMapFixture {
182180
ss << "test-write-row-batch-" << g_file_number++;
183181
ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, ss.str(), &mmap_));
184182

183+
std::shared_ptr<Schema> schema_result;
184+
ASSERT_OK(DoSchemaRoundTrip(*batch.schema(), &schema_result));
185+
ASSERT_TRUE(batch.schema()->Equals(*schema_result));
186+
185187
std::shared_ptr<RecordBatch> result;
186188
ASSERT_OK(DoStandardRoundTrip(batch, &result));
187189
CheckReadResult(*result, batch);
@@ -498,7 +500,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
498500

499501
Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
500502
// Write the file
501-
std::shared_ptr<RecordBatchFileWriter> writer;
503+
std::shared_ptr<RecordBatchWriter> writer;
502504
RETURN_NOT_OK(
503505
RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
504506

@@ -565,7 +567,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
565567
Status RoundTripHelper(const RecordBatch& batch,
566568
std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
567569
// Write the file
568-
std::shared_ptr<RecordBatchStreamWriter> writer;
570+
std::shared_ptr<RecordBatchWriter> writer;
569571
RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
570572
int num_batches = 5;
571573
for (int i = 0; i < num_batches; ++i) {
@@ -575,10 +577,10 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
575577
RETURN_NOT_OK(sink_->Close());
576578

577579
// Open the file
578-
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
580+
io::BufferReader buf_reader(buffer_);
579581

580-
std::shared_ptr<RecordBatchStreamReader> reader;
581-
RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader));
582+
std::shared_ptr<RecordBatchReader> reader;
583+
RETURN_NOT_OK(RecordBatchStreamReader::Open(&buf_reader, &reader));
582584

583585
std::shared_ptr<RecordBatch> chunk;
584586
while (true) {

cpp/src/arrow/ipc/json-integration-test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ static Status ConvertJsonToArrow(const std::string& json_path,
7777
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
7878
}
7979

80-
std::shared_ptr<ipc::RecordBatchFileWriter> writer;
80+
std::shared_ptr<ipc::RecordBatchWriter> writer;
8181
RETURN_NOT_OK(
8282
ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));
8383

cpp/src/arrow/ipc/json-internal.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
#include "arrow/array.h"
3434
#include "arrow/builder.h"
35-
#include "arrow/ipc/metadata.h"
35+
#include "arrow/ipc/metadata-internal.h"
3636
#include "arrow/memory_pool.h"
3737
#include "arrow/status.h"
3838
#include "arrow/table.h"

0 commit comments

Comments
 (0)