diff --git a/c_glib/arrow-glib/field.cpp b/c_glib/arrow-glib/field.cpp index 1bb2dd181288d..526f9a6773b3c 100644 --- a/c_glib/arrow-glib/field.cpp +++ b/c_glib/arrow-glib/field.cpp @@ -160,7 +160,7 @@ garrow_field_import(gpointer c_abi_schema, GError **error) /** * garrow_field_new: * @name: The name of the field. - * @data_type: The data type of the field. + * @data_type: (transfer full): The data type of the field. * * Returns: A newly created #GArrowField. */ diff --git a/c_glib/example/meson.build b/c_glib/example/meson.build index e43f1553dd8cf..e2d55d4788ab5 100644 --- a/c_glib/example/meson.build +++ b/c_glib/example/meson.build @@ -29,12 +29,20 @@ executable('read-file', 'read-file.c', executable('read-stream', 'read-stream.c', dependencies: [arrow_glib], link_language: 'c') +executable('receive-network', 'receive-network.c', + dependencies: [arrow_glib], + link_language: 'c') +executable('send-network', 'send-network.c', + dependencies: [arrow_glib], + link_language: 'c') install_data('README.md', 'build.c', 'extension-type.c', 'read-file.c', 'read-stream.c', + 'receive-network.c', + 'send-network.c', install_dir: join_paths(data_dir, meson.project_name(), 'example')) subdir('lua') diff --git a/c_glib/example/receive-network.c b/c_glib/example/receive-network.c new file mode 100644 index 0000000000000..aa7aaa0140375 --- /dev/null +++ b/c_glib/example/receive-network.c @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include + +#ifdef G_OS_UNIX +# include +# include +#endif + +static void +service_event(GSocketListener *listener, + GSocketListenerEvent event, + GSocket *socket, + gpointer user_data) +{ + if (event != G_SOCKET_LISTENER_BOUND) { + return; + } + + GError *error = NULL; + GSocketAddress* local_address = g_socket_get_local_address(socket, &error); + if (!local_address) { + g_print("failed to get local address: %s\n", error->message); + g_error_free(error); + g_object_unref(socket); + return; + } + gchar *local_address_string = + g_socket_connectable_to_string(G_SOCKET_CONNECTABLE(local_address)); + g_print("address: %s\n", local_address_string); + g_free(local_address_string); + g_object_unref(local_address); +} + +static void +print_array(GArrowArray *array) +{ + GArrowType value_type; + gint64 i, n; + + value_type = garrow_array_get_value_type(array); + + g_print("["); + n = garrow_array_get_length(array); + +#define ARRAY_CASE(type, Type, TYPE, format) \ + case GARROW_TYPE_ ## TYPE: \ + { \ + GArrow ## Type ## Array *real_array; \ + real_array = GARROW_ ## TYPE ## _ARRAY(array); \ + for (i = 0; i < n; i++) { \ + if (i > 0) { \ + g_print(", "); \ + } \ + g_print(format, \ + garrow_ ## type ## _array_get_value(real_array, i)); \ + } \ + } \ + break + + switch (value_type) { + ARRAY_CASE(uint8, UInt8, UINT8, "%hhu"); + ARRAY_CASE(uint16, UInt16, UINT16, "%" G_GUINT16_FORMAT); + ARRAY_CASE(uint32, UInt32, UINT32, "%" G_GUINT32_FORMAT); + ARRAY_CASE(uint64, UInt64, UINT64, "%" G_GUINT64_FORMAT); + ARRAY_CASE( int8, Int8, INT8, "%hhd"); + ARRAY_CASE( int16, Int16, INT16, "%" G_GINT16_FORMAT); + ARRAY_CASE( int32, Int32, INT32, "%" G_GINT32_FORMAT); + ARRAY_CASE( int64, Int64, INT64, "%" G_GINT64_FORMAT); + ARRAY_CASE( float, Float, FLOAT, "%g"); + ARRAY_CASE(double, Double, DOUBLE, "%g"); + default: + break; + } +#undef ARRAY_CASE + + g_print("]\n"); +} + +static void +print_record_batch(GArrowRecordBatch *record_batch) +{ + guint nth_column, n_columns; + + n_columns = garrow_record_batch_get_n_columns(record_batch); + for (nth_column = 0; nth_column < n_columns; nth_column++) { + GArrowArray *array; + + g_print("columns[%u](%s): ", + nth_column, + garrow_record_batch_get_column_name(record_batch, nth_column)); + array = garrow_record_batch_get_column_data(record_batch, nth_column); + print_array(array); + g_object_unref(array); + } +} + +static gboolean +service_incoming(GSocketService *service, + GSocketConnection *connection, + GObject *source_object, + gpointer user_data) +{ + GArrowGIOInputStream *input = + garrow_gio_input_stream_new( + g_io_stream_get_input_stream(G_IO_STREAM(connection))); + GError *error = NULL; + GArrowRecordBatchStreamReader *reader = + garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(input), &error); + if (!reader) { + g_print("failed to create reader: %s\n", error->message); + g_error_free(error); + g_object_unref(input); + return FALSE; + } + + while (TRUE) { + GArrowRecordBatch *record_batch = + garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(reader), + &error); + if (error) { + g_print("failed to read the next record batch: %s\n", error->message); + g_error_free(error); + g_object_unref(reader); + g_object_unref(input); + return EXIT_FAILURE; + } + + if (!record_batch) { + break; + } + + print_record_batch(record_batch); + g_object_unref(record_batch); + } + + g_object_unref(reader); + g_object_unref(input); + + return FALSE; +} + +#ifdef G_OS_UNIX +typedef struct { + GSocketService *service; + GMainLoop *loop; +} StopData; + +static gboolean +stop(gpointer user_data) +{ + StopData* data = user_data; + g_object_unref(data->service); + g_main_loop_quit(data->loop); + return G_SOURCE_REMOVE; +} +#endif + +int +main(int argc, char **argv) +{ + GSocketService *service = g_threaded_socket_service_new(-1); + g_signal_connect(service, "event", G_CALLBACK(service_event), NULL); + g_signal_connect(service, "incoming", G_CALLBACK(service_incoming), NULL); + + GError *error = NULL; + gboolean success = + g_socket_listener_add_any_inet_port(G_SOCKET_LISTENER(service), + NULL, + &error); + if (!success) { + g_print("failed to add a listen IP address: %s\n", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + + g_socket_service_start(service); + + GMainLoop *loop = g_main_loop_new(NULL, FALSE); +#ifdef G_OS_UNIX + StopData data; + data.service = service; + data.loop = loop; + g_unix_signal_add(SIGINT, stop, &data); + g_unix_signal_add(SIGTERM, stop, &data); +#else + /* TODO: Implement graceful stop. */ +#endif + g_main_loop_run(loop); + g_main_loop_unref(loop); + + return EXIT_SUCCESS; +} diff --git a/c_glib/example/send-network.c b/c_glib/example/send-network.c new file mode 100644 index 0000000000000..d298c5a173d21 --- /dev/null +++ b/c_glib/example/send-network.c @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include + +static GArrowSchema * +build_schema(void) +{ + GList *fields = NULL; + GArrowBooleanDataType *boolean_data_type = garrow_boolean_data_type_new(); + fields = g_list_append(fields, + garrow_field_new("boolean", + GARROW_DATA_TYPE(boolean_data_type))); + GArrowInt32DataType *int32_data_type = garrow_int32_data_type_new(); + fields = g_list_append(fields, + garrow_field_new("int32", + GARROW_DATA_TYPE(int32_data_type))); + GArrowSchema *schema = garrow_schema_new(fields); + g_list_free_full(fields, g_object_unref); + + return schema; +} + +static GArrowRecordBatch * +build_record_batch(void) +{ + GArrowSchema *schema = build_schema(); + if (!schema) { + return NULL; + } + GError *error = NULL; + GArrowRecordBatchBuilder *builder = + garrow_record_batch_builder_new(schema, &error); + g_object_unref(schema); + if (!builder) { + g_print("failed to build record batch builder: %s\n", error->message); + g_error_free(error); + return NULL; + } + + const gint64 n_records = 3; + GArrowBooleanArrayBuilder *boolean_builder = + GARROW_BOOLEAN_ARRAY_BUILDER( + garrow_record_batch_builder_get_column_builder(builder, 0)); + gboolean boolean_values[] = {TRUE, TRUE, FALSE}; + gboolean boolean_is_valids[] = {TRUE, FALSE, TRUE}; + if (!garrow_boolean_array_builder_append_values(boolean_builder, + boolean_values, + n_records, + boolean_is_valids, + n_records, + &error)) { + g_print("failed to append boolean values: %s\n", error->message); + g_error_free(error); + g_object_unref(boolean_builder); + g_object_unref(builder); + return NULL; + } + + GArrowInt32ArrayBuilder *int32_builder = + GARROW_INT32_ARRAY_BUILDER( + garrow_record_batch_builder_get_column_builder(builder, 1)); + gint32 int32_values[] = {1, 11, 111}; + gint32 int32_is_valids[] = {FALSE, TRUE, TRUE}; + if (!garrow_int32_array_builder_append_values(int32_builder, + int32_values, + n_records, + int32_is_valids, + n_records, + &error)) { + g_print("failed to append int32 values: %s\n", error->message); + g_error_free(error); + g_object_unref(int32_builder); + g_object_unref(builder); + return NULL; + } + + GArrowRecordBatch *record_batch = + garrow_record_batch_builder_flush(builder, &error); + if (!record_batch) { + g_print("failed to build record batch: %s\n", error->message); + g_error_free(error); + g_object_unref(builder); + return NULL; + } + + g_object_unref(builder); + + return record_batch; +} + +int +main(int argc, char **argv) +{ + if (argc != 2) { + g_print("Usage: %s PORT\n", argv[0]); + g_print(" e.g.: %s 2929\n", argv[0]); + return EXIT_FAILURE; + } + + guint port = atoi(argv[1]); + + GSocketClient *client = g_socket_client_new(); + GSocketAddress *address = g_inet_socket_address_new_from_string("127.0.0.1", + port); + GError *error = NULL; + GSocketConnection *connection = + g_socket_client_connect(client, G_SOCKET_CONNECTABLE(address), NULL, &error); + if (!connection) { + g_print("failed to connect: %s\n", error->message); + g_error_free(error); + return EXIT_FAILURE; + } + + GArrowSchema *schema = build_schema(); + if (!schema) { + return EXIT_FAILURE; + } + GArrowGIOOutputStream *output = + garrow_gio_output_stream_new( + g_io_stream_get_output_stream(G_IO_STREAM(connection))); + GArrowRecordBatchStreamWriter *writer = + garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(output), + schema, + &error); + g_object_unref(schema); + if (!writer) { + g_print("failed to create writer: %s\n", error->message); + g_error_free(error); + g_object_unref(output); + g_object_unref(connection); + g_object_unref(client); + return EXIT_FAILURE; + } + + gsize n_record_batches = 5; + gsize i; + for (i = 0; i < n_record_batches; i++) { + GArrowRecordBatch *record_batch = build_record_batch(); + if (!record_batch) { + g_object_unref(writer); + g_object_unref(output); + g_object_unref(connection); + g_object_unref(client); + return EXIT_FAILURE; + } + gboolean success = + garrow_record_batch_writer_write_record_batch( + GARROW_RECORD_BATCH_WRITER(writer), + record_batch, + &error); + g_object_unref(record_batch); + if (!success) { + g_print("failed to write record batch: %s\n", error->message); + g_error_free(error); + g_object_unref(output); + g_object_unref(connection); + g_object_unref(client); + return EXIT_FAILURE; + } + } + + g_object_unref(writer); + g_object_unref(output); + g_object_unref(connection); + g_object_unref(client); + + return EXIT_SUCCESS; +}