From 60273afc967896c0ad16231d110c953e8e17ade5 Mon Sep 17 00:00:00 2001 From: Rafael Telles Date: Thu, 7 Oct 2021 14:20:27 -0300 Subject: [PATCH 1/2] Implement CommandStatementUpdate on server example --- cpp/src/arrow/flight/flight-sql/client_impl.h | 3 ++- .../flight-sql/example/sqlite_server.cc | 22 +++++++++++++++++ .../flight/flight-sql/example/sqlite_server.h | 4 ++++ .../flight-sql/example/sqlite_statement.cc | 9 +++++++ .../flight-sql/example/sqlite_statement.h | 5 ++++ .../arrow/flight/flight-sql/sql_server.cpp | 24 +++++++++++++++++++ cpp/src/arrow/flight/flight-sql/sql_server.h | 9 +++++++ .../flight/flight-sql/sql_server_test.cc | 21 ++++++++++++++++ 8 files changed, 96 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/flight/flight-sql/client_impl.h b/cpp/src/arrow/flight/flight-sql/client_impl.h index 33f5933b6738a..46e063816b8f6 100644 --- a/cpp/src/arrow/flight/flight-sql/client_impl.h +++ b/cpp/src/arrow/flight/flight-sql/client_impl.h @@ -91,9 +91,10 @@ Status FlightSqlClientT::ExecuteUpdate(const FlightCallOptions& options, const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command); + std::unique_ptr writer; std::unique_ptr reader; - ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, NULL, &reader)); + ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, &writer, &reader)); std::shared_ptr metadata; diff --git a/cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc b/cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc index 38fe21e1b2c1a..5e47f1a002492 100644 --- a/cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc +++ b/cpp/src/arrow/flight/flight-sql/example/sqlite_server.cc @@ -265,6 +265,28 @@ Status SQLiteFlightSqlServer::DoGetTables(const pb::sql::CommandGetTables& comma return Status::OK(); } +Status SQLiteFlightSqlServer::DoPutCommandStatementUpdate( + const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context, + std::unique_ptr& reader, + std::unique_ptr& writer) { + const std::string& sql = command.query(); + + std::shared_ptr statement; + ARROW_RETURN_NOT_OK(SqliteStatement::Create(db_, sql, &statement)); + + pb::sql::DoPutUpdateResult result; + + int64_t record_count; + ARROW_RETURN_NOT_OK(statement->ExecuteUpdate(&record_count)); + + result.set_record_count(record_count); + + const std::shared_ptr& buffer = Buffer::FromString(result.SerializeAsString()); + ARROW_RETURN_NOT_OK(writer->WriteMetadata(*buffer)); + + return Status::OK(); +} + } // namespace example } // namespace sql } // namespace flight diff --git a/cpp/src/arrow/flight/flight-sql/example/sqlite_server.h b/cpp/src/arrow/flight/flight-sql/example/sqlite_server.h index 4464920f17b99..d41c57a6c93d4 100644 --- a/cpp/src/arrow/flight/flight-sql/example/sqlite_server.h +++ b/cpp/src/arrow/flight/flight-sql/example/sqlite_server.h @@ -61,6 +61,10 @@ class SQLiteFlightSqlServer : public FlightSqlServerBase { Status DoGetSchemas(const pb::sql::CommandGetSchemas& command, const ServerCallContext& context, std::unique_ptr* result) override; + Status DoPutCommandStatementUpdate( + const pb::sql::CommandStatementUpdate& update, const ServerCallContext& context, + std::unique_ptr& reader, + std::unique_ptr& writer) override; Status GetFlightInfoTables(const pb::sql::CommandGetTables& command, const ServerCallContext& context, diff --git a/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.cc b/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.cc index 1774cf69f96a3..a024371d624e4 100644 --- a/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.cc +++ b/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.cc @@ -94,6 +94,15 @@ Status SqliteStatement::Reset(int* rc) { sqlite3_stmt* SqliteStatement::GetSqlite3Stmt() { return stmt_; } +Status SqliteStatement::ExecuteUpdate(int64_t* result) { + int rc; + ARROW_RETURN_NOT_OK(Step(&rc)); + + *result = sqlite3_changes(db_); + + return Status::OK(); +} + } // namespace example } // namespace sql } // namespace flight diff --git a/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.h b/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.h index 5c573f2df9d2c..ee9191e42b3c8 100644 --- a/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.h +++ b/cpp/src/arrow/flight/flight-sql/example/sqlite_statement.h @@ -56,6 +56,11 @@ class SqliteStatement { /// \brief Returns the underlying sqlite3_stmt. sqlite3_stmt* GetSqlite3Stmt(); + /// \brief Executes an UPDATE, INSERT or DELETE statement. + /// \param[out] result The number of rows changed by execution. + /// \return Status. + Status ExecuteUpdate(int64_t* result); + private: sqlite3* db_; sqlite3_stmt* stmt_; diff --git a/cpp/src/arrow/flight/flight-sql/sql_server.cpp b/cpp/src/arrow/flight/flight-sql/sql_server.cpp index e9a03649a0535..dfb700df6661f 100644 --- a/cpp/src/arrow/flight/flight-sql/sql_server.cpp +++ b/cpp/src/arrow/flight/flight-sql/sql_server.cpp @@ -126,6 +126,23 @@ Status FlightSqlServerBase::DoGet(const ServerCallContext& context, const Ticket return Status::Invalid("The defined request is invalid."); } +Status FlightSqlServerBase::DoPut(const ServerCallContext& context, + std::unique_ptr reader, + std::unique_ptr writer) { + const FlightDescriptor& request = reader->descriptor(); + + google::protobuf::Any any; + any.ParseFromArray(request.cmd.data(), static_cast(request.cmd.size())); + + if (any.Is()) { + pb::sql::CommandStatementUpdate command; + any.UnpackTo(&command); + return DoPutCommandStatementUpdate(command, context, reader, writer); + } + + return Status::Invalid("The defined request is invalid."); +} + Status FlightSqlServerBase::GetFlightInfoCatalogs(const ServerCallContext& context, const FlightDescriptor& descriptor, std::unique_ptr* info) { @@ -246,6 +263,13 @@ Status FlightSqlServerBase::GetFlightInfoImportedKeys( return Status::NotImplemented("DoGetExportedKeys not implemented"); } +Status FlightSqlServerBase::DoPutCommandStatementUpdate( + const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context, + std::unique_ptr& reader, + std::unique_ptr& writer) { + return Status::NotImplemented("DoPutCommandStatementUpdate not implemented"); +} + std::shared_ptr SqlSchema::GetCatalogsSchema() { return arrow::schema({field("catalog_name", utf8())}); } diff --git a/cpp/src/arrow/flight/flight-sql/sql_server.h b/cpp/src/arrow/flight/flight-sql/sql_server.h index e5b2a52159766..4fa86fe68b4c0 100644 --- a/cpp/src/arrow/flight/flight-sql/sql_server.h +++ b/cpp/src/arrow/flight/flight-sql/sql_server.h @@ -39,6 +39,10 @@ class FlightSqlServerBase : public FlightServerBase { Status DoGet(const ServerCallContext& context, const Ticket& request, std::unique_ptr* stream) override; + Status DoPut(const ServerCallContext& context, + std::unique_ptr reader, + std::unique_ptr writer) override; + /// \brief Gets a FlightInfo for executing a SQL query. /// \param[in] command The CommandStatementQuery object containing the SQL /// statement. @@ -257,6 +261,11 @@ class FlightSqlServerBase : public FlightServerBase { virtual Status DoGetImportedKeys(const pb::sql::CommandGetImportedKeys& command, const ServerCallContext& context, std::unique_ptr* result); + + virtual Status DoPutCommandStatementUpdate( + const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context, + std::unique_ptr& reader, + std::unique_ptr& writer); }; /// \brief Auxiliary class containing all Schemas used on Flight SQL. diff --git a/cpp/src/arrow/flight/flight-sql/sql_server_test.cc b/cpp/src/arrow/flight/flight-sql/sql_server_test.cc index 2bafa101d2406..c8cc94ac7c143 100644 --- a/cpp/src/arrow/flight/flight-sql/sql_server_test.cc +++ b/cpp/src/arrow/flight/flight-sql/sql_server_test.cc @@ -285,6 +285,27 @@ TEST(TestFlightSqlServer, TestCommandGetSchemas) { ASSERT_EQ(0, table->num_rows()); } +TEST(TestFlightSqlServer, TestCommandStatementUpdate) { + int64_t result; + ASSERT_OK(sql_client->ExecuteUpdate( + {}, + "INSERT INTO intTable (keyName, value) VALUES " + "('KEYNAME1', 1001), ('KEYNAME2', 1002), ('KEYNAME3', 1003)", + &result)); + ASSERT_EQ(3, result); + + ASSERT_OK( + sql_client->ExecuteUpdate({}, + "UPDATE intTable SET keyName = 'KEYNAME1' " + "WHERE keyName = 'KEYNAME2' OR keyName = 'KEYNAME3'", + &result)); + ASSERT_EQ(2, result); + + ASSERT_OK(sql_client->ExecuteUpdate( + {}, "DELETE FROM intTable WHERE keyName = 'KEYNAME1'", &result)); + ASSERT_EQ(3, result); +} + auto env = ::testing::AddGlobalTestEnvironment(new TestFlightSqlServer); From fef63b70d5a39aad566abf99ec292884055d3773 Mon Sep 17 00:00:00 2001 From: Rafael Telles Date: Thu, 7 Oct 2021 17:49:05 -0300 Subject: [PATCH 2/2] Add missing docs for DoPutCommandStatementUpdate --- cpp/src/arrow/flight/flight-sql/sql_server.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/arrow/flight/flight-sql/sql_server.h b/cpp/src/arrow/flight/flight-sql/sql_server.h index 4fa86fe68b4c0..430fe7587ecf8 100644 --- a/cpp/src/arrow/flight/flight-sql/sql_server.h +++ b/cpp/src/arrow/flight/flight-sql/sql_server.h @@ -262,6 +262,12 @@ class FlightSqlServerBase : public FlightServerBase { const ServerCallContext& context, std::unique_ptr* result); + /// \brief Executes an update SQL statement. + /// \param[in] command The CommandStatementUpdate object containing the SQL statement. + /// \param[in] context The call context. + /// \param[in] reader a sequence of uploaded record batches. + /// \param[in] writer send metadata back to the client. + /// \return Status. virtual Status DoPutCommandStatementUpdate( const pb::sql::CommandStatementUpdate& command, const ServerCallContext& context, std::unique_ptr& reader,