diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 3c58ba649c4dd..9dcb426f079fe 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -4613,8 +4613,11 @@ macro(build_opentelemetry) set(_OPENTELEMETRY_LIBS common http_client_curl + logs + ostream_log_record_exporter ostream_span_exporter otlp_http_client + otlp_http_log_record_exporter otlp_http_exporter otlp_recordable proto @@ -4647,6 +4650,14 @@ macro(build_opentelemetry) set(_OPENTELEMETRY_STATIC_LIBRARY "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_otlp_http${CMAKE_STATIC_LIBRARY_SUFFIX}" ) + elseif(_OPENTELEMETRY_LIB STREQUAL "otlp_http_log_record_exporter") + set(_OPENTELEMETRY_STATIC_LIBRARY + "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_otlp_http_log${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + elseif(_OPENTELEMETRY_LIB STREQUAL "ostream_log_record_exporter") + set(_OPENTELEMETRY_STATIC_LIBRARY + "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_ostream_logs${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) else() set(_OPENTELEMETRY_STATIC_LIBRARY "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}" @@ -4681,9 +4692,16 @@ macro(build_opentelemetry) IMPORTED_LOCATION) list(APPEND OPENTELEMETRY_CMAKE_ARGS - -DWITH_OTLP=ON -DWITH_OTLP_HTTP=ON -DWITH_OTLP_GRPC=OFF + # Disabled because it seemed to cause linking errors. May be worth a closer look. + -DWITH_FUNC_TESTS=OFF + # These options are slated for removal in v1.14 and their features are deemed stable + # as of v1.13. However, setting their corresponding ENABLE_* macros in headers seems + # finicky - resulting in build failures or ABI-related runtime errors during HTTP + # client initialization. There may still be a solution, but we disable them for now. + -DWITH_OTLP_HTTP_SSL_PREVIEW=OFF + -DWITH_OTLP_HTTP_SSL_TLS_PREVIEW=OFF "-DProtobuf_INCLUDE_DIR=${OPENTELEMETRY_PROTOBUF_INCLUDE_DIR}" "-DProtobuf_LIBRARY=${OPENTELEMETRY_PROTOBUF_INCLUDE_DIR}" "-DProtobuf_PROTOC_EXECUTABLE=${OPENTELEMETRY_PROTOC_EXECUTABLE}") @@ -4757,19 +4775,25 @@ macro(build_opentelemetry) target_link_libraries(opentelemetry-cpp::resources INTERFACE opentelemetry-cpp::common) target_link_libraries(opentelemetry-cpp::trace INTERFACE opentelemetry-cpp::common opentelemetry-cpp::resources) + target_link_libraries(opentelemetry-cpp::logs INTERFACE opentelemetry-cpp::common + opentelemetry-cpp::resources) target_link_libraries(opentelemetry-cpp::http_client_curl - INTERFACE opentelemetry-cpp::ext CURL::libcurl) + INTERFACE opentelemetry-cpp::common opentelemetry-cpp::ext + CURL::libcurl) target_link_libraries(opentelemetry-cpp::proto INTERFACE ${ARROW_PROTOBUF_LIBPROTOBUF}) target_link_libraries(opentelemetry-cpp::otlp_recordable - INTERFACE opentelemetry-cpp::trace opentelemetry-cpp::resources - opentelemetry-cpp::proto) + INTERFACE opentelemetry-cpp::logs opentelemetry-cpp::trace + opentelemetry-cpp::resources opentelemetry-cpp::proto) target_link_libraries(opentelemetry-cpp::otlp_http_client - INTERFACE opentelemetry-cpp::sdk opentelemetry-cpp::proto + INTERFACE opentelemetry-cpp::common opentelemetry-cpp::proto opentelemetry-cpp::http_client_curl nlohmann_json::nlohmann_json) target_link_libraries(opentelemetry-cpp::otlp_http_exporter INTERFACE opentelemetry-cpp::otlp_recordable opentelemetry-cpp::otlp_http_client) + target_link_libraries(opentelemetry-cpp::otlp_http_log_record_exporter + INTERFACE opentelemetry-cpp::otlp_recordable + opentelemetry-cpp::otlp_http_client) foreach(_OPENTELEMETRY_LIB ${_OPENTELEMETRY_LIBS}) add_dependencies(opentelemetry-cpp::${_OPENTELEMETRY_LIB} opentelemetry_ep) @@ -4791,7 +4815,11 @@ if(ARROW_WITH_OPENTELEMETRY) set(opentelemetry-cpp_SOURCE "AUTO") resolve_dependency(opentelemetry-cpp) set(ARROW_OPENTELEMETRY_LIBS - opentelemetry-cpp::trace opentelemetry-cpp::ostream_span_exporter + opentelemetry-cpp::trace + opentelemetry-cpp::logs + opentelemetry-cpp::otlp_http_log_record_exporter + opentelemetry-cpp::ostream_log_record_exporter + opentelemetry-cpp::ostream_span_exporter opentelemetry-cpp::otlp_http_exporter) get_target_property(OPENTELEMETRY_INCLUDE_DIR opentelemetry-cpp::api INTERFACE_INCLUDE_DIRECTORIES) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 5bcd4625b3b67..6dc8358f502f5 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -522,6 +522,7 @@ set(ARROW_UTIL_SRCS util/int_util.cc util/io_util.cc util/list_util.cc + util/logger.cc util/logging.cc util/key_value_metadata.cc util/memory.cc @@ -627,6 +628,17 @@ if(ARROW_WITH_ZSTD) endforeach() endif() +if(ARROW_WITH_OPENTELEMETRY) + arrow_add_object_library(ARROW_TELEMETRY telemetry/logging.cc) + + foreach(ARROW_TELEMETRY_TARGET ${ARROW_TELEMETRY_TARGETS}) + target_link_libraries(${ARROW_TELEMETRY_TARGET} PRIVATE ${ARROW_OPENTELEMETRY_LIBS}) + endforeach() +else() + set(ARROW_TELEMETRY_TARGET_SHARED) + set(ARROW_TELEMETRY_TARGET_STATIC) +endif() + set(ARROW_TESTING_SHARED_LINK_LIBS arrow_shared ${ARROW_GTEST_GTEST}) set(ARROW_TESTING_SHARED_PRIVATE_LINK_LIBS arrow::flatbuffers RapidJSON) set(ARROW_TESTING_STATIC_LINK_LIBS arrow::flatbuffers RapidJSON arrow_static @@ -1016,6 +1028,7 @@ add_arrow_lib(arrow ${ARROW_JSON_TARGET_SHARED} ${ARROW_MEMORY_POOL_TARGET_SHARED} ${ARROW_ORC_TARGET_SHARED} + ${ARROW_TELEMETRY_TARGET_SHARED} ${ARROW_UTIL_TARGET_SHARED} ${ARROW_VENDORED_TARGET_SHARED} ${ARROW_SHARED_PRIVATE_LINK_LIBS} @@ -1031,6 +1044,7 @@ add_arrow_lib(arrow ${ARROW_JSON_TARGET_STATIC} ${ARROW_MEMORY_POOL_TARGET_STATIC} ${ARROW_ORC_TARGET_STATIC} + ${ARROW_TELEMETRY_TARGET_STATIC} ${ARROW_UTIL_TARGET_STATIC} ${ARROW_VENDORED_TARGET_STATIC} ${ARROW_SYSTEM_LINK_LIBS} @@ -1260,6 +1274,10 @@ if(ARROW_SUBSTRAIT) add_subdirectory(engine) endif() +if(ARROW_WITH_OPENTELEMETRY) + add_subdirectory(telemetry) +endif() + if(ARROW_TENSORFLOW) add_subdirectory(adapters/tensorflow) endif() diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt index 8eba89b8e78a6..43ac48b87678e 100644 --- a/cpp/src/arrow/flight/CMakeLists.txt +++ b/cpp/src/arrow/flight/CMakeLists.txt @@ -157,6 +157,10 @@ set(ARROW_FLIGHT_SRCS transport/grpc/util_internal.cc types.cc) +if(ARROW_WITH_OPENTELEMETRY) + list(APPEND ARROW_FLIGHT_SRCS otel_logging.cc) +endif() + if(MSVC) # Protobuf generated files trigger spurious warnings on MSVC. foreach(GENERATED_SOURCE "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.cc" diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc index 55cc938870f85..e179f3406d65e 100644 --- a/cpp/src/arrow/flight/flight_test.cc +++ b/cpp/src/arrow/flight/flight_test.cc @@ -71,6 +71,7 @@ #ifdef ARROW_WITH_OPENTELEMETRY #include #include +#include #include #include #endif diff --git a/cpp/src/arrow/flight/otel_logging.cc b/cpp/src/arrow/flight/otel_logging.cc new file mode 100644 index 0000000000000..62c18238af430 --- /dev/null +++ b/cpp/src/arrow/flight/otel_logging.cc @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/flight/otel_logging.h" +#include "arrow/flight/otel_logging_internal.h" +#include "arrow/result.h" +#include "arrow/util/logger.h" +#include "arrow/util/logging.h" + +namespace arrow::flight { + +namespace { +constexpr std::string_view kGrpcClientName = "arrow-flight-grpc-client-otel"; +constexpr std::string_view kGrpcServerName = "arrow-flight-grpc-server-otel"; +constexpr std::string_view kSqlClientName = "arrow-flight-sql-client-otel"; +constexpr std::string_view kSqlServerName = "arrow-flight-sql-server-otel"; +} // namespace + +Status RegisterFlightOtelLoggers(const telemetry::OtelLoggingOptions& options) { + for (auto name : {kGrpcClientName, kGrpcServerName, kSqlClientName, kSqlServerName}) { + ARROW_ASSIGN_OR_RAISE(auto logger, + telemetry::OtelLoggerProvider::MakeLogger(name, options)); + DCHECK_NE(logger, nullptr); + ARROW_RETURN_NOT_OK(util::LoggerRegistry::RegisterLogger(name, std::move(logger))); + } + return Status::OK(); +} + +namespace internal { + +std::shared_ptr GetOtelGrpcClientLogger() { + return util::LoggerRegistry::GetLogger(kGrpcClientName); +} +std::shared_ptr GetOtelGrpcServerLogger() { + return util::LoggerRegistry::GetLogger(kGrpcServerName); +} +std::shared_ptr GetOtelSqlClientLogger() { + return util::LoggerRegistry::GetLogger(kSqlClientName); +} +std::shared_ptr GetOtelSqlServerLogger() { + return util::LoggerRegistry::GetLogger(kSqlServerName); +} + +} // namespace internal + +} // namespace arrow::flight diff --git a/cpp/src/arrow/flight/otel_logging.h b/cpp/src/arrow/flight/otel_logging.h new file mode 100644 index 0000000000000..9a91e5d99ce7d --- /dev/null +++ b/cpp/src/arrow/flight/otel_logging.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/config.h" + +#ifdef ARROW_WITH_OPENTELEMETRY +#include "arrow/status.h" +#include "arrow/telemetry/logging.h" +#include "arrow/util/macros.h" + +namespace arrow::flight { + +ARROW_EXPORT Status +RegisterFlightOtelLoggers(const telemetry::OtelLoggingOptions& options); + +} // namespace arrow::flight +#endif diff --git a/cpp/src/arrow/flight/otel_logging_internal.h b/cpp/src/arrow/flight/otel_logging_internal.h new file mode 100644 index 0000000000000..52602f0fe8aa5 --- /dev/null +++ b/cpp/src/arrow/flight/otel_logging_internal.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/config.h" + +#include "arrow/util/macros.h" +#ifdef ARROW_WITH_OPENTELEMETRY +#include "arrow/flight/otel_logging.h" +#include "arrow/util/logger.h" + +namespace arrow::flight::internal { + +ARROW_EXPORT std::shared_ptr GetOtelGrpcClientLogger(); +ARROW_EXPORT std::shared_ptr GetOtelGrpcServerLogger(); +ARROW_EXPORT std::shared_ptr GetOtelSqlClientLogger(); +ARROW_EXPORT std::shared_ptr GetOtelSqlServerLogger(); + +} // namespace arrow::flight::internal + +#define ARROW_FLIGHT_OTELLOG_CLIENT(LEVEL, ...) \ + ARROW_LOGGER_CALL(::arrow::flight::internal::GetOtelGrpcClientLogger(), LEVEL, \ + __VA_ARGS__) +#define ARROW_FLIGHT_OTELLOG_SERVER(LEVEL, ...) \ + ARROW_LOGGER_CALL(::arrow::flight::internal::GetOtelGrpcServerLogger(), LEVEL, \ + __VA_ARGS__) +#define ARROW_FLIGHT_OTELLOG_SQL_CLIENT(LEVEL, ...) \ + ARROW_LOGGER_CALL(::arrow::flight::internal::GetOtelSqlClientLogger(), LEVEL, \ + __VA_ARGS__) +#define ARROW_FLIGHT_OTELLOG_SQL_SERVER(LEVEL, ...) \ + ARROW_LOGGER_CALL(::arrow::flight::internal::GetOtelSqlServerLogger(), LEVEL, \ + __VA_ARGS__) + +#else + +#define ARROW_FLIGHT_OTELLOG_CLIENT(LEVEL, ...) ARROW_UNUSED(0) +#define ARROW_FLIGHT_OTELLOG_SERVER(LEVEL, ...) ARROW_UNUSED(0) +#define ARROW_FLIGHT_OTELLOG_SQL_CLIENT(LEVEL, ...) ARROW_UNUSED(0) +#define ARROW_FLIGHT_OTELLOG_SQL_SERVER(LEVEL, ...) ARROW_UNUSED(0) + +#endif diff --git a/cpp/src/arrow/flight/server_tracing_middleware.cc b/cpp/src/arrow/flight/server_tracing_middleware.cc index b5326d88a43eb..02520cb66fd0e 100644 --- a/cpp/src/arrow/flight/server_tracing_middleware.cc +++ b/cpp/src/arrow/flight/server_tracing_middleware.cc @@ -137,7 +137,7 @@ class TracingServerMiddlewareFactory : public ServerMiddlewareFactory { options.kind = otel::trace::SpanKind::kServer; options.parent = otel::trace::GetSpan(new_otel_context)->GetContext(); - auto* tracer = arrow::internal::tracing::GetTracer(); + auto tracer = otel::trace::Provider::GetTracerProvider()->GetTracer("arrow"); auto method_name = ToString(info.method); auto span = tracer->StartSpan( method_name, diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc index 86fd4868bad2d..c64b72c47067a 100644 --- a/cpp/src/arrow/flight/sql/client.cc +++ b/cpp/src/arrow/flight/sql/client.cc @@ -24,6 +24,7 @@ #include #include "arrow/buffer.h" +#include "arrow/flight/otel_logging_internal.h" #include "arrow/flight/sql/protocol_internal.h" #include "arrow/flight/types.h" #include "arrow/io/memory.h" @@ -63,6 +64,7 @@ arrow::Result GetFlightDescriptorForCommand( arrow::Result> GetFlightInfoForCommand( FlightSqlClient* client, const FlightCallOptions& options, const google::protobuf::Message& command) { + ARROW_FLIGHT_OTELLOG_SQL_CLIENT(INFO, "[Example message] func=", __func__); ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor, GetFlightDescriptorForCommand(command)); return client->GetFlightInfo(options, descriptor); diff --git a/cpp/src/arrow/flight/sql/server.cc b/cpp/src/arrow/flight/sql/server.cc index 63d1f5c5225fa..41cfbd33c5554 100644 --- a/cpp/src/arrow/flight/sql/server.cc +++ b/cpp/src/arrow/flight/sql/server.cc @@ -26,11 +26,13 @@ #include "arrow/buffer.h" #include "arrow/builder.h" +#include "arrow/flight/otel_logging_internal.h" #include "arrow/flight/serialization_internal.h" #include "arrow/flight/sql/protocol_internal.h" #include "arrow/flight/sql/sql_info_internal.h" #include "arrow/type.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/logger.h" #define PROPERTY_TO_OPTIONAL(COMMAND, PROPERTY) \ COMMAND.has_##PROPERTY() ? std::make_optional(COMMAND.PROPERTY()) : std::nullopt @@ -576,6 +578,7 @@ arrow::Result CreateStatementQueryTicket( Status FlightSqlServerBase::GetFlightInfo(const ServerCallContext& context, const FlightDescriptor& request, std::unique_ptr* info) { + ARROW_FLIGHT_OTELLOG_SQL_SERVER(INFO, "[Example message] func=", __func__); google::protobuf::Any any; if (!any.ParseFromArray(request.cmd.data(), static_cast(request.cmd.size()))) { return Status::Invalid("Unable to parse command"); diff --git a/cpp/src/arrow/flight/sql/test_app_cli.cc b/cpp/src/arrow/flight/sql/test_app_cli.cc index 66a6b6f5aa4e5..194ecf5e57808 100644 --- a/cpp/src/arrow/flight/sql/test_app_cli.cc +++ b/cpp/src/arrow/flight/sql/test_app_cli.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/util/config.h" + #include #define BOOST_NO_CXX98_FUNCTION_BASE // ARROW-17805 #include @@ -25,18 +27,33 @@ #include "arrow/array/builder_binary.h" #include "arrow/array/builder_primitive.h" #include "arrow/flight/api.h" +#include "arrow/flight/client_tracing_middleware.h" #include "arrow/flight/sql/api.h" #include "arrow/io/memory.h" #include "arrow/pretty_print.h" #include "arrow/status.h" #include "arrow/table.h" +#ifdef ARROW_WITH_OPENTELEMETRY +#include "arrow/flight/otel_logging.h" +#include "arrow/util/tracing_internal.h" + +#include +#include +#include +#include +#include +#include +#include +#endif + using arrow::Result; using arrow::Schema; using arrow::Status; using arrow::flight::ClientAuthHandler; using arrow::flight::FlightCallOptions; using arrow::flight::FlightClient; +using arrow::flight::FlightClientOptions; using arrow::flight::FlightDescriptor; using arrow::flight::FlightEndpoint; using arrow::flight::FlightInfo; @@ -58,6 +75,41 @@ DEFINE_string(catalog, "", "Catalog"); DEFINE_string(schema, "", "Schema"); DEFINE_string(table, "", "Table"); +#ifdef ARROW_WITH_OPENTELEMETRY +class OtelScope { + public: + explicit OtelScope(opentelemetry::trace::Scope scope) : scope_(std::move(scope)) {} + + static Result> Make() { + // Implicitly sets up TracerProvider + auto tracer = arrow::internal::tracing::GetTracer(); + + opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator( + opentelemetry::nostd::shared_ptr< + opentelemetry::context::propagation::TextMapPropagator>( + new opentelemetry::trace::propagation::HttpTraceContext())); + + ARROW_RETURN_NOT_OK(arrow::telemetry::internal::InitializeOtelLoggerProvider()); + + auto logging_options = arrow::telemetry::OtelLoggingOptions::Defaults(); + logging_options.severity_threshold = arrow::telemetry::LogLevel::ARROW_TRACE; + // Flush after every log message + logging_options.flush_severity = arrow::telemetry::LogLevel::ARROW_TRACE; + ARROW_RETURN_NOT_OK(arrow::flight::RegisterFlightOtelLoggers(logging_options)); + + opentelemetry::trace::StartSpanOptions span_options; + span_options.kind = opentelemetry::trace::SpanKind::kClient; + auto span = tracer->StartSpan("flight-sql-test-app", span_options); + auto scope = tracer->WithActiveSpan(span); + + return std::make_unique(std::move(scope)); + } + + private: + opentelemetry::trace::Scope scope_; +}; +#endif + Status PrintResultsForEndpoint(FlightSqlClient& client, const FlightCallOptions& call_options, const FlightEndpoint& endpoint) { @@ -101,8 +153,15 @@ Status PrintResults(FlightSqlClient& client, const FlightCallOptions& call_optio } Status RunMain() { +#ifdef ARROW_WITH_OPENTELEMETRY + ARROW_ASSIGN_OR_RAISE(auto otel_scope, OtelScope::Make()); +#endif + ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp(FLAGS_host, FLAGS_port)); - ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location)); + auto client_options = FlightClientOptions::Defaults(); + client_options.middleware.push_back( + arrow::flight::MakeTracingClientMiddlewareFactory()); + ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location, client_options)); FlightCallOptions call_options; diff --git a/cpp/src/arrow/flight/sql/test_server_cli.cc b/cpp/src/arrow/flight/sql/test_server_cli.cc index 8f5a78ce268f5..a8124140497c6 100644 --- a/cpp/src/arrow/flight/sql/test_server_cli.cc +++ b/cpp/src/arrow/flight/sql/test_server_cli.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/util/config.h" + #include #include @@ -23,16 +25,54 @@ #include #include "arrow/flight/server.h" +#include "arrow/flight/server_tracing_middleware.h" #include "arrow/flight/sql/example/sqlite_server.h" #include "arrow/io/test_common.h" #include "arrow/util/logging.h" +#ifdef ARROW_WITH_OPENTELEMETRY +#include "arrow/flight/otel_logging.h" +#include "arrow/util/tracing_internal.h" + +#include +#include +#include +#endif + DEFINE_int32(port, 31337, "Server port to listen on"); +#ifdef ARROW_WITH_OPENTELEMETRY +arrow::Status SetupOTel() { + auto tracer = arrow::internal::tracing::GetTracer(); + ARROW_UNUSED(tracer); + + opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator( + opentelemetry::nostd::shared_ptr< + opentelemetry::context::propagation::TextMapPropagator>( + new opentelemetry::trace::propagation::HttpTraceContext())); + + ARROW_RETURN_NOT_OK(arrow::telemetry::internal::InitializeOtelLoggerProvider()); + + auto logging_options = arrow::telemetry::OtelLoggingOptions::Defaults(); + logging_options.severity_threshold = arrow::telemetry::LogLevel::ARROW_TRACE; + // Flush after every log message + logging_options.flush_severity = arrow::telemetry::LogLevel::ARROW_TRACE; + ARROW_RETURN_NOT_OK(arrow::flight::RegisterFlightOtelLoggers(logging_options)); + + return arrow::Status::OK(); +} +#endif + arrow::Status RunMain() { +#ifdef ARROW_WITH_OPENTELEMETRY + ARROW_RETURN_NOT_OK(SetupOTel()); +#endif + ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port)); arrow::flight::FlightServerOptions options(location); + options.middleware.emplace_back("tracing", + arrow::flight::MakeTracingServerMiddlewareFactory()); std::shared_ptr server; ARROW_ASSIGN_OR_RAISE(server, diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index f799ba761c40d..0bfb291d400a3 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -50,6 +50,7 @@ #include "arrow/flight/client_middleware.h" #include "arrow/flight/cookie_internal.h" #include "arrow/flight/middleware.h" +#include "arrow/flight/otel_logging_internal.h" #include "arrow/flight/serialization_internal.h" #include "arrow/flight/transport.h" #include "arrow/flight/transport/grpc/serialization_internal.h" @@ -928,6 +929,7 @@ class GrpcClientImpl : public internal::ClientTransport { Status GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor& descriptor, std::unique_ptr* info) override { + ARROW_FLIGHT_OTELLOG_CLIENT(INFO, "[Example message] func=", __func__); pb::FlightDescriptor pb_descriptor; pb::FlightInfo pb_response; diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc index 28fc736aa0088..88ffa89f72858 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc @@ -28,6 +28,7 @@ #include #include "arrow/buffer.h" +#include "arrow/flight/otel_logging_internal.h" #include "arrow/flight/serialization_internal.h" #include "arrow/flight/server.h" #include "arrow/flight/server_middleware.h" @@ -36,6 +37,7 @@ #include "arrow/flight/transport/grpc/util_internal.h" #include "arrow/flight/transport_server.h" #include "arrow/flight/types.h" +#include "arrow/util/logger.h" #include "arrow/util/logging.h" #include "arrow/util/uri.h" @@ -402,6 +404,7 @@ class GrpcServiceHandler final : public FlightService::Service { ::grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request, pb::FlightInfo* response) { + ARROW_FLIGHT_OTELLOG_SERVER(INFO, "[Example message] func=", __func__); GrpcServerCallContext flight_context(context); GRPC_RETURN_NOT_GRPC_OK( CheckAuth(FlightMethod::GetFlightInfo, context, flight_context)); diff --git a/cpp/src/arrow/telemetry/CMakeLists.txt b/cpp/src/arrow/telemetry/CMakeLists.txt new file mode 100644 index 0000000000000..2b9720d6d899a --- /dev/null +++ b/cpp/src/arrow/telemetry/CMakeLists.txt @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_telemetry +# + +# Headers: top level +arrow_install_all_headers("arrow/telemetry") + +set(ARROW_TELEMETRY_TEST_LINK_LIBS ${ARROW_OPENTELEMETRY_LIBS}) + +add_arrow_test(telemetry_test + SOURCES + telemetry_test.cc + EXTRA_LINK_LIBS + ${ARROW_TELEMETRY_TEST_LINK_LIBS}) diff --git a/cpp/src/arrow/telemetry/logging.cc b/cpp/src/arrow/telemetry/logging.cc new file mode 100644 index 0000000000000..7e9a69afedbb5 --- /dev/null +++ b/cpp/src/arrow/telemetry/logging.cc @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/telemetry/logging.h" +#include "arrow/telemetry/util_internal.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4522) +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +namespace arrow { +namespace telemetry { + +namespace { + +using internal::OtelLogExporterOptions; + +constexpr const char kLoggingBackendEnvVar[] = "ARROW_LOGGING_BACKEND"; + +class OtlpOStreamLogRecordExporter final : public otel::sdk::logs::LogRecordExporter { + public: + explicit OtlpOStreamLogRecordExporter(std::ostream* sink) : sink_(sink) { + pb_json_options_.add_whitespace = false; + } + + otel::sdk::common::ExportResult Export( + const otel_span>& records) noexcept + override { + otel::proto::collector::logs::v1::ExportLogsServiceRequest request; + otel::exporter::otlp::OtlpRecordableUtils::PopulateRequest(records, &request); + + for (const auto& logs : request.resource_logs()) { + std::string out; + auto status = + google::protobuf::util::MessageToJsonString(logs, &out, pb_json_options_); + if (ARROW_PREDICT_FALSE(!status.ok())) { + return otel::sdk::common::ExportResult::kFailure; + } + (*sink_) << out << std::endl; + } + + return otel::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds timeout) noexcept override { + return exporter_.ForceFlush(timeout); + } + + bool Shutdown(std::chrono::microseconds timeout) noexcept override { + return exporter_.Shutdown(timeout); + } + + std::unique_ptr MakeRecordable() noexcept override { + return exporter_.MakeRecordable(); + } + + private: + std::ostream* sink_; + otel::exporter::otlp::OtlpHttpLogRecordExporter exporter_; + google::protobuf::util::JsonPrintOptions pb_json_options_; +}; + +otel::logs::Severity ToOtelSeverity(LogLevel level) { + switch (level) { + case LogLevel::ARROW_TRACE: + return otel::logs::Severity::kTrace; + case LogLevel::ARROW_DEBUG: + return otel::logs::Severity::kDebug; + case LogLevel::ARROW_INFO: + return otel::logs::Severity::kInfo; + case LogLevel::ARROW_WARNING: + return otel::logs::Severity::kWarn; + case LogLevel::ARROW_ERROR: + return otel::logs::Severity::kError; + case LogLevel::ARROW_FATAL: + return otel::logs::Severity::kFatal; + } + return otel::logs::Severity::kInvalid; +} + +enum class ExporterKind { + OSTREAM, + OTLP_HTTP, + OTLP_OSTREAM, +}; + +std::unique_ptr MakeExporter( + ExporterKind exporter_kind, std::ostream* ostream = nullptr) { + switch (exporter_kind) { + case ExporterKind::OSTREAM: { + return std::make_unique(*ostream); + } break; + case ExporterKind::OTLP_HTTP: { + namespace otlp = otel::exporter::otlp; + otlp::OtlpHttpLogRecordExporterOptions options{}; + return std::make_unique(options); + } break; + case ExporterKind::OTLP_OSTREAM: { + return std::make_unique(ostream); + } break; + default: + break; + } + return nullptr; +} + +std::unique_ptr MakeExporterFromEnv( + const OtelLogExporterOptions& exporter_options) { + auto maybe_env_var = arrow::internal::GetEnvVar(kLoggingBackendEnvVar); + if (maybe_env_var.ok()) { + auto env_var = maybe_env_var.ValueOrDie(); + auto* default_ostream = + exporter_options.default_stream ? exporter_options.default_stream : &std::cerr; + if (env_var == "ostream") { + // TODO: Currently disabled as the log records returned by otel's ostream exporter + // don't maintain copies of their attributes, leading to lifetime issues. If/when + // this is addressed, we can enable it. See: + // https://github.com/open-telemetry/opentelemetry-cpp/issues/2402 +#if 0 + return MakeExporter(ExporterKind::OSTREAM, default_ostream); +#else + ARROW_LOG(WARNING) << "Requested unimplemented backend " << kLoggingBackendEnvVar + << "=" << env_var << ". Falling back to arrow_otlp_ostream"; + return MakeExporter(ExporterKind::OTLP_OSTREAM, default_ostream); +#endif + } else if (env_var == "otlp_http") { + return MakeExporter(ExporterKind::OTLP_HTTP); + } else if (env_var == "arrow_otlp_stdout") { + return MakeExporter(ExporterKind::OTLP_OSTREAM, &std::cout); + } else if (env_var == "arrow_otlp_stderr") { + return MakeExporter(ExporterKind::OTLP_OSTREAM, &std::cerr); + } else if (env_var == "arrow_otlp_ostream") { + return MakeExporter(ExporterKind::OTLP_OSTREAM, default_ostream); + } else if (!env_var.empty()) { + ARROW_LOG(WARNING) << "Requested unknown backend " << kLoggingBackendEnvVar << "=" + << env_var; + } + } + return nullptr; +} + +std::unique_ptr MakeLogRecordProcessor( + std::unique_ptr exporter) { + otel::sdk::logs::BatchLogRecordProcessorOptions options{}; + return std::make_unique(std::move(exporter), + options); +} + +otel_shared_ptr MakeLoggerProvider( + const OtelLogExporterOptions& exporter_options) { + auto exporter = MakeExporterFromEnv(exporter_options); + if (exporter) { + auto processor = MakeLogRecordProcessor(std::move(exporter)); + return otel_shared_ptr( + new otel::sdk::logs::LoggerProvider(std::move(processor))); + } + return otel_shared_ptr( + new otel::logs::NoopLoggerProvider{}); +} + +class OtelLoggerImpl : public OtelLogger { + public: + OtelLoggerImpl(OtelLoggingOptions options, + otel_shared_ptr ot_logger) + : logger_(ot_logger), options_(std::move(options)) {} + + void Log(const util::LogDetails& details) override { + if (details.severity < options_.severity_threshold) { + return; + } + + auto log = logger_->CreateLogRecord(); + if (log == nullptr) { + return; + } + + // We set the remaining attributes AFTER the custom attributes in AttributeHolder + // because, in the event of key collisions, these should take precedence. + log->SetTimestamp(otel::common::SystemTimestamp(details.timestamp)); + log->SetSeverity(ToOtelSeverity(details.severity)); + + auto span_ctx = otel::trace::Tracer::GetCurrentSpan()->GetContext(); + log->SetSpanId(span_ctx.span_id()); + log->SetTraceId(span_ctx.trace_id()); + log->SetTraceFlags(span_ctx.trace_flags()); + + log->SetBody(ToOtel(details.message)); + + logger_->EmitLogRecord(std::move(log)); + + if (details.severity >= options_.flush_severity) { + util::Logger::Flush(); + } + } + + bool Flush(std::chrono::microseconds timeout) override { + return OtelLoggerProvider::Flush(timeout); + } + + bool is_enabled() const override { return true; } + + util::ArrowLogLevel severity_threshold() const override { + return options_.severity_threshold; + } + + std::string_view name() const override { + otel_string_view s = logger_->GetName(); + return std::string_view(s.data(), s.length()); + } + + private: + otel_shared_ptr logger_; + OtelLoggingOptions options_; +}; + +} // namespace + +bool OtelLoggerProvider::Flush(std::chrono::microseconds timeout) { + auto provider = otel::logs::Provider::GetLoggerProvider(); + if (auto sdk_provider = + dynamic_cast(provider.get())) { + return sdk_provider->ForceFlush(timeout); + } + return false; +} + +Result> OtelLoggerProvider::MakeLogger( + std::string_view name, const OtelLoggingOptions& options) { + auto ot_logger = otel::logs::Provider::GetLoggerProvider()->GetLogger(ToOtel(name)); + return std::make_shared(options, std::move(ot_logger)); +} + +namespace internal { + +Status InitializeOtelLoggerProvider(const OtelLogExporterOptions& exporter_options) { + otel::logs::Provider::SetLoggerProvider(MakeLoggerProvider(exporter_options)); + return Status::OK(); +} + +bool ShutdownOtelLoggerProvider() { + auto provider = otel::logs::Provider::GetLoggerProvider(); + if (auto sdk_provider = + dynamic_cast(provider.get())) { + return sdk_provider->Shutdown(); + } + return false; +} + +} // namespace internal + +} // namespace telemetry +} // namespace arrow diff --git a/cpp/src/arrow/telemetry/logging.h b/cpp/src/arrow/telemetry/logging.h new file mode 100644 index 0000000000000..04b39e6c1985e --- /dev/null +++ b/cpp/src/arrow/telemetry/logging.h @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/logger.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace telemetry { + +using LogLevel = util::ArrowLogLevel; + +struct OtelLoggingOptions { + /// \brief Minimum severity required to emit an OpenTelemetry log record + LogLevel severity_threshold = LogLevel::ARROW_INFO; + + /// \brief Minimum severity required to immediately attempt to flush pending log records + LogLevel flush_severity = LogLevel::ARROW_ERROR; + + static OtelLoggingOptions Defaults() { return OtelLoggingOptions{}; } +}; + +class ARROW_EXPORT OtelLogger : public util::Logger { + public: + virtual ~OtelLogger() = default; + + virtual std::string_view name() const = 0; +}; + +/// \brief A wrapper interface for `opentelemetry::logs::Provider` +/// \details Application authors will typically want to set the global OpenTelemetry +/// logger provider themselves after configuring an exporter, processor, resource etc. +/// This API will then defer to the provider returned by +/// `opentelemetry::logs::Provider::GetLoggerProvider` +class ARROW_EXPORT OtelLoggerProvider { + public: + /// \brief Attempt to flush the log record processor associated with the provider + /// \return `true` if the flush occured + static bool Flush(std::chrono::microseconds timeout = std::chrono::microseconds::max()); + + static Result> MakeLogger( + std::string_view name, + const OtelLoggingOptions& options = OtelLoggingOptions::Defaults()); +}; + +namespace internal { + +// These utilities are primarily intended for Arrow developers + +struct OtelLogExporterOptions { + /// \brief Default stream to use for the ostream/arrow_otlp_ostream log record exporters + /// \details If null, stderr will be used + std::ostream* default_stream = NULLPTR; + + static OtelLogExporterOptions Defaults() { return OtelLogExporterOptions{}; } +}; + +/// \brief Initialize the global OpenTelemetry logger provider with a default exporter +/// (based on the ARROW_LOGGING_BACKEND envvar) and batch processor +ARROW_EXPORT Status InitializeOtelLoggerProvider( + const OtelLogExporterOptions& exporter_options = OtelLogExporterOptions::Defaults()); + +/// \brief Attempt to shut down the global OpenTelemetry logger provider +/// \return `true` if shutdown was successful +ARROW_EXPORT bool ShutdownOtelLoggerProvider(); + +} // namespace internal + +} // namespace telemetry +} // namespace arrow diff --git a/cpp/src/arrow/telemetry/telemetry_test.cc b/cpp/src/arrow/telemetry/telemetry_test.cc new file mode 100644 index 0000000000000..d0bdd36755e57 --- /dev/null +++ b/cpp/src/arrow/telemetry/telemetry_test.cc @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "arrow/telemetry/logging.h" +#include "arrow/telemetry/util_internal.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" + +#include +#include +#include +#include +#include +#include + +namespace arrow { +namespace telemetry { + +class OtelEnvironment : public ::testing::Environment { + public: + static constexpr std::string_view kLoggerName = "arrow-telemetry-test"; + + void SetUp() override { + // Implicitly sets up span processors + tracer provider + auto tracer = arrow::internal::tracing::GetTracer(); + ARROW_UNUSED(tracer); + + otel::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator( + otel::nostd::shared_ptr( + new otel::trace::propagation::HttpTraceContext())); + + ASSERT_OK(internal::InitializeOtelLoggerProvider()); + + auto logging_options = OtelLoggingOptions::Defaults(); + logging_options.severity_threshold = LogLevel::ARROW_TRACE; + logging_options.flush_severity = LogLevel::ARROW_TRACE; + ASSERT_OK_AND_ASSIGN(auto logger, + OtelLoggerProvider::MakeLogger(kLoggerName, logging_options)); + ASSERT_NE(logger, nullptr); + ASSERT_OK(util::LoggerRegistry::RegisterLogger(logger->name(), logger)); + } + + void TearDown() override { EXPECT_TRUE(internal::ShutdownOtelLoggerProvider()); } +}; + +static ::testing::Environment* kOtelEnvironment = + ::testing::AddGlobalTestEnvironment(new OtelEnvironment); + +void Log(LogLevel severity, std::string_view message) { + auto logger = std::dynamic_pointer_cast( + util::LoggerRegistry::GetLogger(OtelEnvironment::kLoggerName)); + ASSERT_NE(logger, nullptr); + util::LogDetails details; + details.severity = severity; + details.message = message; + logger->Log(details); +} + +class TestLogging : public ::testing::Test { + public: + void SetUp() override { + tracer_ = arrow::internal::tracing::GetTracer(); + span_ = tracer_->StartSpan("test-logging"); + } + + otel::trace::Scope MakeScope() { return tracer_->WithActiveSpan(span_); } + + protected: + otel::trace::Tracer* tracer_; + otel::nostd::shared_ptr span_; +}; + +TEST_F(TestLogging, Basics) { + auto scope = MakeScope(); + Log(LogLevel::ARROW_ERROR, "foo bar"); + Log(LogLevel::ARROW_WARNING, "baz bal"); +} + +} // namespace telemetry +} // namespace arrow diff --git a/cpp/src/arrow/telemetry/util_internal.h b/cpp/src/arrow/telemetry/util_internal.h new file mode 100644 index 0000000000000..6dfe4dec62058 --- /dev/null +++ b/cpp/src/arrow/telemetry/util_internal.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace arrow { +namespace telemetry { + +namespace otel = ::opentelemetry; + +template +using otel_shared_ptr = otel::nostd::shared_ptr; +template +using otel_span = otel::nostd::span; +using otel_string_view = otel::nostd::string_view; + +inline otel_string_view ToOtel(std::string_view in) { + return otel_string_view(in.data(), in.length()); +} + +} // namespace telemetry +} // namespace arrow diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 087e4e3879e56..24a1c1177240d 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -64,6 +64,7 @@ add_arrow_test(utility-test ${IO_UTIL_TEST_SOURCES} iterator_test.cc list_util_test.cc + logger_test.cc logging_test.cc queue_test.cc range_test.cc diff --git a/cpp/src/arrow/util/logger.cc b/cpp/src/arrow/util/logger.cc new file mode 100644 index 0000000000000..35269710f899b --- /dev/null +++ b/cpp/src/arrow/util/logger.cc @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "arrow/util/logger.h" + +namespace arrow { +namespace util { + +namespace { + +class NoopLogger : public Logger { + public: + void Log(const LogDetails&) override {} + bool is_enabled() const override { return false; } +}; + +class SimpleLogger : public Logger { + public: + SimpleLogger(ArrowLogLevel severity_threshold, std::ostream* sink) + : severity_threshold_(severity_threshold), sink_(sink) {} + + void Log(const LogDetails& details) override { + *sink_ << details.source_location.file << ":" << details.source_location.line << ": " + << std::string(details.message) << std::endl; + } + + bool Flush(std::chrono::microseconds) override { + sink_->flush(); + return true; + } + + bool is_enabled() const override { return true; } + + ArrowLogLevel severity_threshold() const override { return severity_threshold_; } + + private: + ArrowLogLevel severity_threshold_; + std::ostream* sink_; +}; + +using LoggerMap = std::unordered_map>; + +struct RegistryImpl { + LoggerMap loggers; + std::shared_ptr default_logger = MakeOStreamLogger(ArrowLogLevel::ARROW_INFO); + std::mutex mtx; +}; + +RegistryImpl* GetRegistry() { + static auto instance = std::make_unique(); + return instance.get(); +} + +} // namespace + +std::shared_ptr MakeOStreamLogger(ArrowLogLevel severity_threshold, + std::ostream& sink) { + return std::make_shared(severity_threshold, &sink); +} +std::shared_ptr MakeOStreamLogger(ArrowLogLevel severity_threshold) { + return MakeOStreamLogger(severity_threshold, std::cerr); +} + +Status LoggerRegistry::RegisterLogger(std::string_view name, + std::shared_ptr logger) { + DCHECK_NE(logger, nullptr); + auto registry = GetRegistry(); + + std::lock_guard lk(registry->mtx); + auto ret = registry->loggers.emplace(name, std::move(logger)); + ARROW_RETURN_IF(!ret.second, Status::Invalid("Logger with name \"", std::string(name), + "\" is already registered")); + return Status::OK(); +} + +void LoggerRegistry::UnregisterLogger(std::string_view name) { + auto registry = GetRegistry(); + std::lock_guard lk(registry->mtx); + registry->loggers.erase(std::string(name)); +} + +std::shared_ptr LoggerRegistry::GetLogger(std::string_view name) { + if (name.empty()) { + return GetDefaultLogger(); + } + + auto registry = GetRegistry(); + { + std::lock_guard lk(registry->mtx); + const auto& loggers = registry->loggers; + if (auto it = loggers.find(std::string(name)); it != loggers.end()) { + return it->second; + } + } + return std::make_shared(); +} + +std::shared_ptr LoggerRegistry::GetDefaultLogger() { + auto registry = GetRegistry(); + std::lock_guard lk(registry->mtx); + return registry->default_logger; +} + +void LoggerRegistry::SetDefaultLogger(std::shared_ptr logger) { + DCHECK_NE(logger, nullptr); + auto registry = GetRegistry(); + std::lock_guard lk(registry->mtx); + registry->default_logger = std::move(logger); +} + +class LogMessage::Impl { + public: + Impl(ArrowLogLevel severity, SourceLocation source_location) { + details.severity = severity; + details.source_location = source_location; + } + Impl(ArrowLogLevel severity, std::shared_ptr logger, + SourceLocation source_location) + : Impl(severity, source_location) { + logger_ = std::move(logger); + } + Impl(ArrowLogLevel severity, std::string logger_name, SourceLocation source_location) + : Impl(severity, source_location) { + this->logger_name = std::move(logger_name); + } + + ~Impl() { + auto* logger = GetResolvedLogger(); + + if (logger && logger->is_enabled() && + details.severity >= logger->severity_threshold()) { + auto str = stream.str(); + details.message = str; + logger->Log(details); + } + + // It's debatable whether this should be the underlying logger's responsibility + if (details.severity == ArrowLogLevel::ARROW_FATAL) { + if (logger) { + logger->Flush(); + } + std::abort(); + } + } + + Logger* GetResolvedLogger() { + if (!logger_) { + logger_ = LoggerRegistry::GetLogger(logger_name); + } + return logger_.get(); + } + + LogDetails details{}; + std::string logger_name; + std::stringstream stream; + + private: + std::shared_ptr logger_; +}; + +LogMessage::LogMessage(ArrowLogLevel severity, std::shared_ptr logger, + SourceLocation source_location) + : impl_(std::make_shared(severity, std::move(logger), source_location)) {} + +LogMessage::LogMessage(ArrowLogLevel severity, std::string_view logger_name, + SourceLocation source_location) + : impl_(std::make_shared(severity, std::string(logger_name), source_location)) { +} + +std::ostream& LogMessage::Stream() { return impl_->stream; } + +bool LogMessage::CheckIsEnabled() { + auto* logger = impl_->GetResolvedLogger(); + return (logger && logger->is_enabled() && + impl_->details.severity >= logger->severity_threshold()); +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/logger.h b/cpp/src/arrow/util/logger.h new file mode 100644 index 0000000000000..5200503bb4fdb --- /dev/null +++ b/cpp/src/arrow/util/logger.h @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace util { + +struct SourceLocation { + const char* file = ""; + int line = 0; +}; + +struct LogDetails { + ArrowLogLevel severity = ArrowLogLevel::ARROW_INFO; + std::chrono::system_clock::time_point timestamp = std::chrono::system_clock::now(); + SourceLocation source_location{}; + std::string_view message = ""; +}; + +/// \brief A base interface for custom loggers. +/// +/// Loggers can be added to the LoggerRegistry for global access or directly provided to +/// certain logging utilities. +class Logger { + public: + virtual ~Logger() = default; + + virtual void Log(const LogDetails& details) = 0; + + virtual bool Flush(std::chrono::microseconds timeout) { return true; } + bool Flush() { return this->Flush(std::chrono::microseconds::max()); } + + virtual bool is_enabled() const { return true; } + + virtual ArrowLogLevel severity_threshold() const { return ArrowLogLevel::ARROW_TRACE; } +}; + +/// \brief Creates a simple logger that redirects output to std::cerr +ARROW_EXPORT std::shared_ptr MakeOStreamLogger(ArrowLogLevel severity_threshold); +/// \brief Creates a simple logger that redirects output to the provided ostream +ARROW_EXPORT std::shared_ptr MakeOStreamLogger(ArrowLogLevel severity_threshold, + std::ostream& sink); + +class ARROW_EXPORT LoggerRegistry { + public: + /// \brief Add a logger to the registry with the associated name + /// + /// Returns Invalid if a logger with the provided name already exists. Users should call + /// `UnregisterLogger` first if they wish to overwrite it. + static Status RegisterLogger(std::string_view name, std::shared_ptr logger); + + /// \brief Remove a logger from the registry + static void UnregisterLogger(std::string_view name); + + /// \brief Return the logger associated with the provided name + /// + /// If `name` is empty, the default logger is returned. If `name` doesn't match any of + /// the registered loggers then a non-null noop logger is returned + static std::shared_ptr GetLogger(std::string_view name = ""); + + /// \brief Return the default logger + static std::shared_ptr GetDefaultLogger(); + /// \brief Set the default logger + static void SetDefaultLogger(std::shared_ptr logger); +}; + +/// \brief Represents a single log record to be emitted by an underlying logger +class ARROW_EXPORT LogMessage { + public: + /// \brief Construct a LogMessage with the provided underlying logger + LogMessage(ArrowLogLevel severity, std::shared_ptr logger, + SourceLocation source_location = {}); + /// \brief Construct a LogMessage with the provided logger name, which will be used to + /// find an underlying logger in the registry + LogMessage(ArrowLogLevel severity, std::string_view logger_name, + SourceLocation source_location = {}); + + std::ostream& Stream(); + + // Convenience method - mainly for use in ARROW_LOG_* macros. This prevents unnecessary + // argument evaluation when log statements are stripped in certain builds + template + LogMessage& Append(Args&&... args) { + if constexpr (sizeof...(Args) > 0) { + if (CheckIsEnabled()) { + (Stream() << ... << args); + } + } + return *this; + } + + private: + bool CheckIsEnabled(); + + class Impl; + std::shared_ptr impl_; +}; + +} // namespace util +} // namespace arrow + +// For the following macros, log statements with a lower severity than +// `ARROW_MINIMUM_LOG_LEVEL` will be stripped from the build +#ifndef ARROW_MINIMUM_LOG_LEVEL +#define ARROW_MINIMUM_LOG_LEVEL -1000 +#endif + +#define ARROW_LOGGER_INTERNAL(LOGGER, LEVEL) \ + (::arrow::util::LogMessage(::arrow::util::ArrowLogLevel::ARROW_##LEVEL, LOGGER, \ + ::arrow::util::SourceLocation{__FILE__, __LINE__})) + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_TRACE) == -2); +#if ARROW_MINIMUM_LOG_LEVEL <= -2 +#define ARROW_LOGGER_TRACE(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, TRACE).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_TRACE(...) ARROW_UNUSED(0) +#endif + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_DEBUG) == -1); +#if ARROW_MINIMUM_LOG_LEVEL <= -1 +#define ARROW_LOGGER_DEBUG(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, DEBUG).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_DEBUG(...) ARROW_UNUSED(0) +#endif + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_INFO) == 0); +#if ARROW_MINIMUM_LOG_LEVEL <= 0 +#define ARROW_LOGGER_INFO(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, INFO).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_INFO(...) ARROW_UNUSED(0) +#endif + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_WARNING) == 1); +#if ARROW_MINIMUM_LOG_LEVEL <= 1 +#define ARROW_LOGGER_WARNING(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, WARNING).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_WARNING(...) ARROW_UNUSED(0) +#endif + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_ERROR) == 2); +#if ARROW_MINIMUM_LOG_LEVEL <= 2 +#define ARROW_LOGGER_ERROR(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, ERROR).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_ERROR(...) ARROW_UNUSED(0) +#endif + +static_assert(static_cast(::arrow::util::ArrowLogLevel::ARROW_FATAL) == 3); +#if ARROW_MINIMUM_LOG_LEVEL <= 3 +#define ARROW_LOGGER_FATAL(LOGGER, ...) \ + (ARROW_LOGGER_INTERNAL(LOGGER, FATAL).Append(__VA_ARGS__)) +#else +#define ARROW_LOGGER_FATAL(...) ARROW_UNUSED(0) +#endif + +#define ARROW_LOGGER_CALL(LOGGER, LEVEL, ...) ARROW_LOGGER_##LEVEL(LOGGER, __VA_ARGS__) diff --git a/cpp/src/arrow/util/logger_test.cc b/cpp/src/arrow/util/logger_test.cc new file mode 100644 index 0000000000000..0faea81a598fd --- /dev/null +++ b/cpp/src/arrow/util/logger_test.cc @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/logger.h" + +// Emit log via the default logger +#define DO_LOG(LEVEL, ...) ARROW_LOGGER_CALL("", LEVEL, __VA_ARGS__) + +namespace arrow { +namespace util { + +namespace { + +class MockLogger : public Logger { + public: + explicit MockLogger(ArrowLogLevel threshold = ArrowLogLevel::ARROW_TRACE) + : threshold(threshold) {} + void Log(const LogDetails& details) override { + if (callback) { + callback(details); + } + } + ArrowLogLevel severity_threshold() const override { return threshold; } + bool is_enabled() const override { return enabled; } + + ArrowLogLevel threshold; + bool enabled = true; + std::function callback; +}; + +struct OstreamableTracer { + mutable bool was_evaluated = false; + friend std::ostream& operator<<(std::ostream& os, const OstreamableTracer& tracer) { + tracer.was_evaluated = true; + return os; + } +}; + +} // namespace + +TEST(LoggerTest, Basics) { + // Basic tests using the default logger + DO_LOG(ERROR); + DO_LOG(ERROR, "foo"); + auto to_int = [](ArrowLogLevel lvl) { return static_cast(lvl); }; + DO_LOG(TRACE, "sev: ", to_int(ArrowLogLevel::ARROW_TRACE), ":TRACE"); + DO_LOG(DEBUG, "sev: ", to_int(ArrowLogLevel::ARROW_DEBUG), ":DEBUG"); + DO_LOG(INFO, "sev: ", to_int(ArrowLogLevel::ARROW_INFO), ":INFO"); + DO_LOG(WARNING, "sev: ", to_int(ArrowLogLevel::ARROW_WARNING), ":WARNING"); + DO_LOG(ERROR, "sev: ", to_int(ArrowLogLevel::ARROW_ERROR), ":ERROR"); + + { + auto logger = std::make_shared(ArrowLogLevel::ARROW_WARNING); + OstreamableTracer tracers[5]{}; + ARROW_LOGGER_CALL(logger, TRACE, "foo", tracers[0], "bar"); + ARROW_LOGGER_CALL(logger, DEBUG, "foo", tracers[1], "bar"); + ARROW_LOGGER_CALL(logger, INFO, "foo", tracers[2], "bar"); + ARROW_LOGGER_CALL(logger, WARNING, "foo", tracers[3], "bar"); + ARROW_LOGGER_CALL(logger, ERROR, "foo", tracers[4], "bar"); + + // If the message severity doesn't meet the logger's minimum severity, the LogMessage + // stream shouldn't be appended to + ASSERT_FALSE(tracers[0].was_evaluated); + ASSERT_FALSE(tracers[1].was_evaluated); + ASSERT_FALSE(tracers[2].was_evaluated); + ASSERT_TRUE(tracers[3].was_evaluated); + ASSERT_TRUE(tracers[4].was_evaluated); + } + + { + auto logger = std::make_shared(ArrowLogLevel::ARROW_WARNING); + logger->enabled = false; + OstreamableTracer tracer; + // If the underlying logger is disabled, the LogMessage stream shouldn't be appended + // to (regardless of severity) + ARROW_LOGGER_CALL(logger, WARNING, tracer); + ARROW_LOGGER_CALL(logger, ERROR, tracer); + ASSERT_FALSE(tracer.was_evaluated); + } +} + +TEST(LoggerTest, EmittedMessages) { + std::vector messages; + auto callback = [&messages](const LogDetails& details) { + messages.push_back(std::string(details.message)); + }; + + auto logger = std::make_shared(ArrowLogLevel::ARROW_TRACE); + logger->callback = callback; + for (int i = 0; i < 3; ++i) { + ARROW_LOGGER_CALL(logger, TRACE, "i=", i); + } + + ASSERT_EQ(messages.size(), 3); + EXPECT_EQ(messages[0], "i=0"); + EXPECT_EQ(messages[1], "i=1"); + EXPECT_EQ(messages[2], "i=2"); +} + +TEST(LoggerTest, Registry) { + std::string name = "test-logger"; + std::shared_ptr logger = std::make_shared(); + + ASSERT_OK(LoggerRegistry::RegisterLogger(name, logger)); + ASSERT_EQ(logger, LoggerRegistry::GetLogger(name)); + + // Error if the logger is already registered + ASSERT_RAISES(Invalid, LoggerRegistry::RegisterLogger(name, logger)); + + LoggerRegistry::UnregisterLogger(name); + ASSERT_OK(LoggerRegistry::RegisterLogger(name, logger)); + ASSERT_EQ(logger, LoggerRegistry::GetLogger(name)); + + LoggerRegistry::UnregisterLogger(name); +} + +#undef DO_LOG + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/logging.cc b/cpp/src/arrow/util/logging.cc index 25c336a6d2111..ca4edcc5a5deb 100644 --- a/cpp/src/arrow/util/logging.cc +++ b/cpp/src/arrow/util/logging.cc @@ -118,6 +118,8 @@ static std::unique_ptr log_dir_; // Glog's severity map. static google::LogSeverity GetMappedSeverity(ArrowLogLevel severity) { switch (severity) { + case ArrowLogLevel::ARROW_TRACE: + return google::GLOG_INFO; case ArrowLogLevel::ARROW_DEBUG: return google::GLOG_INFO; case ArrowLogLevel::ARROW_INFO: diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 2baa560563bb4..2a2175ec0fc72 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -46,6 +46,7 @@ namespace arrow { namespace util { enum class ArrowLogLevel : int { + ARROW_TRACE = -2, ARROW_DEBUG = -1, ARROW_INFO = 0, ARROW_WARNING = 1, diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 58668cab18bc6..f4f65ad1e6132 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 06506d32bef7c..67275ffea5cf6 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -86,8 +86,8 @@ ARROW_MIMALLOC_BUILD_VERSION=v2.0.6 ARROW_MIMALLOC_BUILD_SHA256_CHECKSUM=9f05c94cc2b017ed13698834ac2a3567b6339a8bde27640df5a1581d49d05ce5 ARROW_NLOHMANN_JSON_BUILD_VERSION=v3.10.5 ARROW_NLOHMANN_JSON_BUILD_SHA256_CHECKSUM=5daca6ca216495edf89d167f808d1d03c4a4d929cef7da5e10f135ae1540c7e4 -ARROW_OPENTELEMETRY_BUILD_VERSION=v1.8.1 -ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=3d640201594b07f08dade9cd1017bd0b59674daca26223b560b9bb6bf56264c2 +ARROW_OPENTELEMETRY_BUILD_VERSION=v1.13.0 +ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=7735cc56507149686e6019e06f588317099d4522480be5f38a2a09ec69af1706 ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.17.0 ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=f269fbcb30e17b03caa1decd231ce826e59d7651c0f71c3b28eb5140b4bb5412 ARROW_ORC_BUILD_VERSION=2.0.0