Skip to content

Commit

Permalink
Merge pull request apache#154 from rafael-telles/flight-sql-cpp-serve…
Browse files Browse the repository at this point in the history
…r-update

[C++] Implement CommandStatementUpdate on server example
  • Loading branch information
rafael-telles authored Oct 8, 2021
2 parents 4e66c39 + fef63b7 commit 732d519
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 1 deletion.
3 changes: 2 additions & 1 deletion cpp/src/arrow/flight/flight-sql/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ Status FlightSqlClientT<T>::ExecuteUpdate(const FlightCallOptions& options,

const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);

std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;

ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, NULL, &reader));
ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, &writer, &reader));

std::shared_ptr<Buffer> metadata;

Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,28 @@ Status SQLiteFlightSqlServer::DoGetTables(const pb::sql::CommandGetTables& comma
return Status::OK();
}

Status SQLiteFlightSqlServer::DoPutCommandStatementUpdate(
const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context,
std::unique_ptr<FlightMessageReader>& reader,
std::unique_ptr<FlightMetadataWriter>& writer) {
const std::string& sql = command.query();

std::shared_ptr<SqliteStatement> statement;
ARROW_RETURN_NOT_OK(SqliteStatement::Create(db_, sql, &statement));

pb::sql::DoPutUpdateResult result;

int64_t record_count;
ARROW_RETURN_NOT_OK(statement->ExecuteUpdate(&record_count));

result.set_record_count(record_count);

const std::shared_ptr<Buffer>& buffer = Buffer::FromString(result.SerializeAsString());
ARROW_RETURN_NOT_OK(writer->WriteMetadata(*buffer));

return Status::OK();
}

} // namespace example
} // namespace sql
} // namespace flight
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/flight/flight-sql/example/sqlite_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class SQLiteFlightSqlServer : public FlightSqlServerBase {
Status DoGetSchemas(const pb::sql::CommandGetSchemas& command,
const ServerCallContext& context,
std::unique_ptr<FlightDataStream>* result) override;
Status DoPutCommandStatementUpdate(
const pb::sql::CommandStatementUpdate& update, const ServerCallContext& context,
std::unique_ptr<FlightMessageReader>& reader,
std::unique_ptr<FlightMetadataWriter>& writer) override;

Status GetFlightInfoTables(const pb::sql::CommandGetTables& command,
const ServerCallContext& context,
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/flight/flight-sql/example/sqlite_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ Status SqliteStatement::Reset(int* rc) {

sqlite3_stmt* SqliteStatement::GetSqlite3Stmt() { return stmt_; }

Status SqliteStatement::ExecuteUpdate(int64_t* result) {
int rc;
ARROW_RETURN_NOT_OK(Step(&rc));

*result = sqlite3_changes(db_);

return Status::OK();
}

} // namespace example
} // namespace sql
} // namespace flight
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/flight-sql/example/sqlite_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class SqliteStatement {
/// \brief Returns the underlying sqlite3_stmt.
sqlite3_stmt* GetSqlite3Stmt();

/// \brief Executes an UPDATE, INSERT or DELETE statement.
/// \param[out] result The number of rows changed by execution.
/// \return Status.
Status ExecuteUpdate(int64_t* result);

private:
sqlite3* db_;
sqlite3_stmt* stmt_;
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ Status FlightSqlServerBase::DoGet(const ServerCallContext& context, const Ticket
return Status::Invalid("The defined request is invalid.");
}

Status FlightSqlServerBase::DoPut(const ServerCallContext& context,
std::unique_ptr<FlightMessageReader> reader,
std::unique_ptr<FlightMetadataWriter> writer) {
const FlightDescriptor& request = reader->descriptor();

google::protobuf::Any any;
any.ParseFromArray(request.cmd.data(), static_cast<int>(request.cmd.size()));

if (any.Is<pb::sql::CommandStatementUpdate>()) {
pb::sql::CommandStatementUpdate command;
any.UnpackTo(&command);
return DoPutCommandStatementUpdate(command, context, reader, writer);
}

return Status::Invalid("The defined request is invalid.");
}

Status FlightSqlServerBase::GetFlightInfoCatalogs(const ServerCallContext& context,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* info) {
Expand Down Expand Up @@ -246,6 +263,13 @@ Status FlightSqlServerBase::GetFlightInfoImportedKeys(
return Status::NotImplemented("DoGetExportedKeys not implemented");
}

Status FlightSqlServerBase::DoPutCommandStatementUpdate(
const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context,
std::unique_ptr<FlightMessageReader>& reader,
std::unique_ptr<FlightMetadataWriter>& writer) {
return Status::NotImplemented("DoPutCommandStatementUpdate not implemented");
}

std::shared_ptr<Schema> SqlSchema::GetCatalogsSchema() {
return arrow::schema({field("catalog_name", utf8())});
}
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class FlightSqlServerBase : public FlightServerBase {
Status DoGet(const ServerCallContext& context, const Ticket& request,
std::unique_ptr<FlightDataStream>* stream) override;

Status DoPut(const ServerCallContext& context,
std::unique_ptr<FlightMessageReader> reader,
std::unique_ptr<FlightMetadataWriter> writer) override;

/// \brief Gets a FlightInfo for executing a SQL query.
/// \param[in] command The CommandStatementQuery object containing the SQL
/// statement.
Expand Down Expand Up @@ -257,6 +261,17 @@ class FlightSqlServerBase : public FlightServerBase {
virtual Status DoGetImportedKeys(const pb::sql::CommandGetImportedKeys& command,
const ServerCallContext& context,
std::unique_ptr<FlightDataStream>* result);

/// \brief Executes an update SQL statement.
/// \param[in] command The CommandStatementUpdate object containing the SQL statement.
/// \param[in] context The call context.
/// \param[in] reader a sequence of uploaded record batches.
/// \param[in] writer send metadata back to the client.
/// \return Status.
virtual Status DoPutCommandStatementUpdate(
const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context,
std::unique_ptr<FlightMessageReader>& reader,
std::unique_ptr<FlightMetadataWriter>& writer);
};

/// \brief Auxiliary class containing all Schemas used on Flight SQL.
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/flight/flight-sql/sql_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,27 @@ TEST(TestFlightSqlServer, TestCommandGetSchemas) {
ASSERT_EQ(0, table->num_rows());
}

TEST(TestFlightSqlServer, TestCommandStatementUpdate) {
int64_t result;
ASSERT_OK(sql_client->ExecuteUpdate(
{},
"INSERT INTO intTable (keyName, value) VALUES "
"('KEYNAME1', 1001), ('KEYNAME2', 1002), ('KEYNAME3', 1003)",
&result));
ASSERT_EQ(3, result);

ASSERT_OK(
sql_client->ExecuteUpdate({},
"UPDATE intTable SET keyName = 'KEYNAME1' "
"WHERE keyName = 'KEYNAME2' OR keyName = 'KEYNAME3'",
&result));
ASSERT_EQ(2, result);

ASSERT_OK(sql_client->ExecuteUpdate(
{}, "DELETE FROM intTable WHERE keyName = 'KEYNAME1'", &result));
ASSERT_EQ(3, result);
}

auto env =
::testing::AddGlobalTestEnvironment(new TestFlightSqlServer);

Expand Down

0 comments on commit 732d519

Please sign in to comment.