Skip to content

Commit

Permalink
[CPP] Implements GetPrimaryKeys on flight sql server (apache#162)
Browse files Browse the repository at this point in the history
* Add Schema template for the primary keys

* Implement GetPrimaryKeys on server

* Add an integrated test for GetPrimaryKeys

* Fix checkstyle

* Add a comment to the query on primary keys query

* Use GetFlightInfoForCommand helper method on GetFlightInfoPrimaryKeys

Co-authored-by: Rafael Telles <rafael@telles.dev>
  • Loading branch information
jcralmeida and rafael-telles committed Dec 3, 2021
1 parent e8efe63 commit 171d540
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 1 deletion.
45 changes: 45 additions & 0 deletions cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,51 @@ Status SQLiteFlightSqlServer::DoGetTableTypes(const ServerCallContext& context,
return Status::OK();
}

Status SQLiteFlightSqlServer::GetFlightInfoPrimaryKeys(
const pb::sql::CommandGetPrimaryKeys& command, const ServerCallContext& context,
const FlightDescriptor& descriptor, std::unique_ptr<FlightInfo>* info) {
return GetFlightInfoForCommand(descriptor, info, command,
SqlSchema::GetPrimaryKeysSchema());
}

Status
SQLiteFlightSqlServer::DoGetPrimaryKeys(const pb::sql::CommandGetPrimaryKeys &command,
const ServerCallContext &context,
std::unique_ptr<FlightDataStream> *result) {
std::stringstream table_query;

// The field key_name can not be recovered by the sqlite, so it is being set
// to null following the same pattern for catalog_name and schema_name.
table_query << "SELECT null as catalog_name, null as schema_name, table_name, "
"name as column_name, pk as key_sequence, null as key_name\n"
"FROM pragma_table_info(table_name)\n"
" JOIN (SELECT null as catalog_name, null as schema_name, name as "
"table_name, type as table_type\n"
"FROM sqlite_master) where 1=1 and pk != 0";

if (command.has_catalog()) {
table_query << " and catalog_name LIKE '" << command.catalog() << "'";
}

if (command.has_schema()) {
table_query << " and schema_name LIKE '" << command.schema() << "'";
}

table_query << " and table_name LIKE '" << command.table() << "'";

std::shared_ptr<SqliteStatement> statement;
ARROW_RETURN_NOT_OK(SqliteStatement::Create(db_, table_query.str(), &statement));

std::shared_ptr<SqliteStatementBatchReader> reader;
ARROW_RETURN_NOT_OK(SqliteStatementBatchReader::Create(
statement, SqlSchema::GetPrimaryKeysSchema(), &reader));

*result = std::unique_ptr<FlightDataStream>(
new RecordBatchStream(reader));

return Status::OK();
}

} // namespace example
} // namespace sql
} // namespace flight
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/arrow/flight/flight-sql/example/sqlite_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,16 @@ class SQLiteFlightSqlServer : public FlightSqlServerBase {
Status DoGetTableTypes(const ServerCallContext &context,
std::unique_ptr<FlightDataStream> *result) override;

private:
Status GetFlightInfoPrimaryKeys(const pb::sql::CommandGetPrimaryKeys &command,
const ServerCallContext &context,
const FlightDescriptor &descriptor,
std::unique_ptr<FlightInfo> *info) override;

Status DoGetPrimaryKeys(const pb::sql::CommandGetPrimaryKeys &command,
const ServerCallContext &context,
std::unique_ptr<FlightDataStream> *result) override;

private:
sqlite3* db_;
};

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ std::shared_ptr<Schema> SqlSchema::GetTableTypesSchema() {
return arrow::schema({field("table_type", utf8())});
}

std::shared_ptr<Schema> SqlSchema::GetPrimaryKeysSchema() {
return arrow::schema({field("catalog_name", utf8()), field("schema_name", utf8()),
field("table_name", utf8()), field("column_name", utf8()),
field("key_sequence", int64()), field("key_name", utf8())});
}

} // namespace sql
} // namespace flight
} // namespace arrow
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ class SqlSchema {
/// \brief Gets the Schema used on CommandGetTableTypes response.
/// \return The default schema template.
static std::shared_ptr<Schema> GetTableTypesSchema();

/// \brief Gets the Schema used on CommandGetPrimaryKeys response when included schema
/// flags is set to true.
/// \return The default schema template.
static std::shared_ptr<Schema> GetPrimaryKeysSchema();
};
} // namespace sql
} // namespace flight
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,32 @@ TEST(TestFlightSqlServer, TestCommandStatementUpdate) {
ASSERT_EQ(3, result);
}

TEST(TestFlightSqlServer, TestCommandGetPrimaryKeys) {
std::unique_ptr<FlightInfo> flight_info;
std::vector<std::string> table_types;
ASSERT_OK(sql_client->GetPrimaryKeys({}, nullptr, nullptr, "int%",
&flight_info));

std::unique_ptr<FlightStreamReader> stream;
ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, &stream));

std::shared_ptr<Table> table;
ASSERT_OK(stream->ReadAll(&table));

DECLARE_NULL_ARRAY(catalog_name, String, 1);
DECLARE_NULL_ARRAY(schema_name, String, 1);
DECLARE_ARRAY(table_name, String, ({"intTable"}));
DECLARE_ARRAY(column_name, String, ({"id"}));
DECLARE_ARRAY(key_sequence, Int64, ({1}));
DECLARE_NULL_ARRAY(key_name, String, 1);

const std::shared_ptr<Table>& expected_table = Table::Make(
SqlSchema::GetPrimaryKeysSchema(),
{catalog_name, schema_name, table_name, column_name, key_sequence, key_name});

ASSERT_TRUE(expected_table->Equals(*table));
}

auto env =
::testing::AddGlobalTestEnvironment(new TestFlightSqlServer);

Expand Down

0 comments on commit 171d540

Please sign in to comment.