diff --git a/assemblies/plugins/dist/pom.xml b/assemblies/plugins/dist/pom.xml
index 3e036c6e08d..7b58c5cc65f 100644
--- a/assemblies/plugins/dist/pom.xml
+++ b/assemblies/plugins/dist/pom.xml
@@ -999,6 +999,19 @@
+
+ org.apache.hop
+ hop-assemblies-plugins-transforms-cratedbbulkloader
+ ${project.version}
+ zip
+
+
+ *
+ *
+
+
+
+
org.apache.hophop-assemblies-plugins-transforms-creditcardvalidator
diff --git a/assemblies/plugins/transforms/cratedbbulkloader/pom.xml b/assemblies/plugins/transforms/cratedbbulkloader/pom.xml
new file mode 100644
index 00000000000..836482ff4fa
--- /dev/null
+++ b/assemblies/plugins/transforms/cratedbbulkloader/pom.xml
@@ -0,0 +1,44 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.hop
+ hop-assemblies-plugins-transforms
+ 2.9.0-SNAPSHOT
+
+
+ hop-assemblies-plugins-transforms-cratedbbulkloader
+ ${parent.version}
+ pom
+
+ Hop Assemblies Plugins Transforms CrateDB bulk loader
+
+
+
+
+ org.apache.hop
+ hop-transform-cratedbbulkloader
+ ${project.version}
+
+
+
+
diff --git a/assemblies/plugins/transforms/cratedbbulkloader/src/assembly/assembly.xml b/assemblies/plugins/transforms/cratedbbulkloader/src/assembly/assembly.xml
new file mode 100644
index 00000000000..e0ed410d341
--- /dev/null
+++ b/assemblies/plugins/transforms/cratedbbulkloader/src/assembly/assembly.xml
@@ -0,0 +1,49 @@
+
+
+
+ hop-assemblies-plugins-transforms-cratedbbulkloader
+
+ zip
+
+ transforms/cratedbbulkloader
+
+
+
+ .
+ true
+
+
+
+
+ lib
+
+ **/*
+
+
+
+
+
+ false
+
+ org.apache.hop:hop-transform-cratedbbulkloader:jar
+
+
+
+
diff --git a/assemblies/plugins/transforms/cratedbbulkloader/src/main/resources/version.xml b/assemblies/plugins/transforms/cratedbbulkloader/src/main/resources/version.xml
new file mode 100644
index 00000000000..36ab20e22eb
--- /dev/null
+++ b/assemblies/plugins/transforms/cratedbbulkloader/src/main/resources/version.xml
@@ -0,0 +1,20 @@
+
+
+
+${project.version}
diff --git a/assemblies/plugins/transforms/pom.xml b/assemblies/plugins/transforms/pom.xml
index 891eb41f78e..8fa187c1c8c 100644
--- a/assemblies/plugins/transforms/pom.xml
+++ b/assemblies/plugins/transforms/pom.xml
@@ -47,6 +47,7 @@
combinationlookupconcatfieldsconstant
+ cratedbbulkloadercreditcardvalidatorcubeinputcubeoutput
@@ -177,4 +178,4 @@
zipfile
-
\ No newline at end of file
+
diff --git a/docker/integration-tests/integration-tests-cratedb.yaml b/docker/integration-tests/integration-tests-cratedb.yaml
new file mode 100644
index 00000000000..d6044acd7c5
--- /dev/null
+++ b/docker/integration-tests/integration-tests-cratedb.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+ integration_test_database:
+ extends:
+ file: integration-tests-base.yaml
+ service: integration_test
+ depends_on:
+ cratedb:
+ condition: service_healthy
+ links:
+ - cratedb
+
+ cratedb:
+ image: crate:latest
+ ports:
+ - "4200:4200"
+ - "5432:5432"
+ healthcheck:
+ test: [ "CMD", "curl", "-f", "http://localhost:4200" ]
+ interval: 20s
+ timeout: 10s
+ retries: 6
+ start_period: 120s
+ volumes:
+ - ./resource/cratedb/crate.yml:/crate/config/crate.yml
diff --git a/docker/integration-tests/resource/cratedb/crate.yml b/docker/integration-tests/resource/cratedb/crate.yml
new file mode 100644
index 00000000000..ceef64391e8
--- /dev/null
+++ b/docker/integration-tests/resource/cratedb/crate.yml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+auth:
+ host_based:
+ enabled: true
+ config:
+ 0:
+ user: crate
+ # address: _local_
+ method: trust
+ 99:
+ method: password
+
+network.host: _local_,_site_
+
+# Paths
+path:
+ logs: /data/log
+ data: /data/data
+blobs:
+ path: /data/blobs
diff --git a/integration-tests/cratedb/0001-copy-from-existing-file.hpl b/integration-tests/cratedb/0001-copy-from-existing-file.hpl
new file mode 100644
index 00000000000..a6fc0566621
--- /dev/null
+++ b/integration-tests/cratedb/0001-copy-from-existing-file.hpl
@@ -0,0 +1,282 @@
+
+
+
+
+ 0001-copy-from-existing-file
+ Y
+
+
+
+ Normal
+
+
+ N
+ 1000
+ 100
+ -
+ 2024/04/26 15:51:58.413
+ -
+ 2024/04/26 15:51:58.413
+
+
+
+
+
+ Read 100 rows dataset
+ CrateDB bulk loader
+ Y
+
+
+
+ CrateDB bulk loader
+ CrateDBBulkLoader
+
+ Y
+
+ 1
+
+ none
+
+
+ 10
+ cratedb-test
+ N
+
+
+ firstName
+ firstName
+
+
+ lastName
+ lastName
+
+
+ birthDate
+ birthDate
+
+
+ city
+ city
+
+
+ country
+ country
+
+
+ birthDateAndTime
+ birthDateAndTime
+
+
+ http://cratedb:4200/_sql
+ alice
+ Encrypted 2be98afc86aa7f2e4bb18bd63c99dbdde
+ N
+ doc
+ Y
+ Y
+ N
+
test
+ N
+ N
+ Y
+ N
+
+
+ 608
+ 176
+
+
+
+ Read 100 rows dataset
+ TextFileInput2
+
+ Y
+
+ 1
+
+ none
+
+
+ N
+ N
+
+
+ ,
+ "
+ N
+
+ N
+ 1
+
+ 1
+ N
+ 1
+ N
+ 80
+ 0
+ Y
+ N
+
+ N
+ N
+
+ Unix
+
+ Characters
+ N
+
+ ${PROJECT_HOME}/datasets/dataset-100.csv
+
+
+ N
+ N
+ CSV
+ None
+
+
+
+
+
+ firstName
+ String
+
+ $
+ .
+ ,
+ -
+
+ -1
+ 9
+ -1
+ none
+ N
+
+
+ lastName
+ String
+
+ $
+ .
+ ,
+ -
+
+ -1
+ 11
+ -1
+ none
+ N
+
+
+ birthDateAndTime
+ Date
+ yyyy-MM-dd HH:mm:ss.SSS
+ $
+ .
+ ,
+ -
+
+ -1
+ -1
+ -1
+ none
+ N
+
+
+ city
+ String
+
+ $
+ .
+ ,
+ -
+
+ -1
+ 17
+ -1
+ none
+ N
+
+
+ country
+ String
+
+ $
+ .
+ ,
+ -
+
+ -1
+ 37
+ -1
+ none
+ N
+
+
+ birthDate
+ Date
+ yyyy-MM-dd
+ $
+ .
+ ,
+ -
+
+ -1
+ -1
+ -1
+ none
+ N
+
+
+ 0
+ N
+ N
+
+
+ N
+
+
+
+
+
+ warning
+
+ error
+
+ line
+ N
+ en_US
+
+
+
+
+
+
+
+
+
+
+ 272
+ 176
+
+
+
+
+
+
diff --git a/integration-tests/cratedb/datasets/dataset-100.csv b/integration-tests/cratedb/datasets/dataset-100.csv
new file mode 100644
index 00000000000..8cf2bccbc06
--- /dev/null
+++ b/integration-tests/cratedb/datasets/dataset-100.csv
@@ -0,0 +1,100 @@
+Margie,Koss,1999-11-09 01:54:03.940,Clydeport,Martinique,1978-05-24
+Freddie,Marks,1970-09-29 13:24:26.739,Schaeferfort,Madagascar,1993-06-15
+Paul,McLaughlin,2001-11-10 13:57:54.338,Bayermouth,Liechtenstein,1977-09-16
+Mary,Langosh,1973-09-09 15:04:25.396,Austinstad,Slovakia (Slovak Republic),2004-07-19
+Andrea,Wunsch,2006-01-22 18:48:06.902,Lake Renettaburgh,Democratic People's Republic of Korea,1962-11-21
+Leeanne,Brekke,1974-08-30 19:28:02.225,Williamsonside,Pitcairn Islands,2000-10-09
+Laverne,Schinner,1974-09-30 11:39:23.292,North Shantay,Cape Verde,1983-09-21
+Sabrina,Hane,1998-02-20 17:44:50.627,Carlyhaven,Cote d'Ivoire,1983-10-17
+Lettie,Wintheiser,1962-11-25 06:07:49.147,Hermistonchester,Benin,1974-02-19
+Mica,Jacobs,1996-06-19 05:21:41.683,East Fermin,Turks and Caicos Islands,1978-05-22
+Brynn,Lueilwitz,1996-12-10 09:25:45.482,Feeneybury,Western Sahara,2005-11-20
+Stanton,Kirlin,1991-09-08 03:27:18.919,Port Ahmad,Romania,1999-05-09
+Olimpia,Parker,1965-08-09 09:22:32.117,Corkeryville,Gabon,1980-05-23
+Jude,Steuber,2006-03-09 04:29:42.253,Wendieport,Mongolia,2002-04-12
+Adelle,Willms,1971-11-06 22:32:14.376,Hodkiewiczstad,Pitcairn Islands,1983-12-19
+Reanna,O'Keefe,1983-04-25 03:19:08.235,Nelleshire,Congo,1990-06-12
+Les,Boyer,1986-06-08 22:53:55.766,Wehnerstad,Netherlands Antilles,1977-01-09
+Jonna,Ward,1995-07-06 11:12:24.554,Tommyside,Vanuatu,1970-12-03
+Stefani,Bernier,1971-11-22 07:54:33.695,Treenaview,Gibraltar,1970-03-14
+Maryetta,Cronin,1985-11-23 02:36:27.977,East Melissaburgh,Svalbard & Jan Mayen Islands,1984-03-11
+Sophie,Mosciski,1959-12-27 08:43:15.402,Lake Barrettland,Reunion,1984-10-20
+Oscar,Braun,1998-10-21 12:21:35.519,East Scottieside,Cameroon,1963-11-15
+Margorie,Huel,1981-02-17 16:24:44.769,New Ed,Netherlands Antilles,1963-02-20
+Lenora,Rutherford,1996-09-17 13:11:49.786,Reedbury,Micronesia,2003-05-11
+Tawana,Satterfield,1968-04-07 10:33:12.395,Noelbury,Northern Mariana Islands,1964-05-20
+Roni,Koelpin,1995-09-29 23:37:59.419,South Ericside,Serbia,1965-05-19
+Artie,Thiel,1978-04-08 15:54:37.369,West Demetrice,Grenada,1976-11-07
+Eliseo,Kshlerin,2003-10-29 00:31:20.869,Lurlenefurt,Venezuela,1966-06-11
+Alissa,Konopelski,1961-10-11 17:04:20.527,Brandeeport,Kazakhstan,1991-04-03
+Toi,Balistreri,1973-10-17 03:53:45.908,Greenfelderhaven,Saudi Arabia,1976-08-11
+Katherina,Yost,1979-05-02 09:22:52.437,West Ladonna,Macedonia,1998-10-23
+Harlan,Murazik,1987-03-30 21:49:58.228,New Tamicaburgh,Brunei Darussalam,2004-06-24
+Romaine,Weimann,1978-04-16 10:37:19.724,Port Dorismouth,Mozambique,1993-01-19
+Vanessa,Stanton,1966-08-23 09:19:34.869,Wehnerhaven,Slovenia,2002-04-10
+Clara,Wolff,1965-03-12 23:44:22.234,McDermottview,Spain,1969-08-17
+Carmel,Wisoky,1972-09-04 14:51:16.175,West Carenmouth,French Southern Territories,1999-02-24
+Jimmie,Torp,1985-05-10 06:24:54.840,East Winford,French Southern Territories,1991-06-05
+Livia,Renner,1973-07-22 04:09:16.854,Port Raymonport,Vietnam,1972-07-22
+Raul,Roberts,1987-08-22 07:05:23.376,Fernandehaven,Macedonia,2002-07-07
+Ariane,Cummings,1983-05-24 04:20:53.221,North Kitmouth,French Guiana,1969-09-04
+Janae,Farrell,1996-06-06 08:14:53.323,Ramirotown,Indonesia,2000-08-29
+Tena,Streich,1985-01-28 01:37:26.947,West Stephenview,Bhutan,1991-10-06
+Dalila,Miller,1963-12-21 09:36:45.291,Port Tanner,Andorra,1983-11-02
+Orval,Williamson,1995-10-14 09:40:32.944,Lake Jesusita,Bahamas,1998-02-19
+Loren,Bednar,1994-08-30 09:55:16.326,Leisaburgh,Slovenia,1967-06-13
+Delbert,Sanford,1977-09-28 21:31:15.998,Port Tobyview,Suriname,1997-02-18
+Bernadine,O'Reilly,1989-10-27 22:01:45.709,Dudleyport,Mauritius,1963-01-20
+Jc,Wuckert,2005-02-23 20:59:35.563,Shaneville,Jersey,1989-10-06
+Logan,Wolff,1977-05-01 09:10:12.470,Croninstad,Guinea-Bissau,1972-12-09
+Erinn,Grady,1987-06-14 00:55:33.162,East Olga,Spain,1969-01-29
+Phung,Will,1998-08-27 16:15:55.348,Nicolasville,Micronesia,1975-04-22
+Kareem,Kuvalis,1973-09-27 19:04:24.486,Loviefort,Cambodia,1995-06-27
+Thad,O'Keefe,1995-05-02 12:38:36.284,New Latonia,Palestinian Territory,1972-02-27
+Andy,Rath,1963-11-25 18:36:41.595,Port Christianaside,Cambodia,1968-10-22
+Jinny,Brakus,1963-05-08 05:47:09.106,New Gena,Lesotho,1966-06-06
+Celena,Goyette,1985-08-23 04:14:25.868,Cruickshankmouth,Equatorial Guinea,1977-05-25
+Tashina,Fisher,1975-08-05 19:53:56.902,Lake Vernon,Romania,1995-08-30
+Rhett,Bradtke,1991-03-06 00:27:07.047,East Erasmo,Pakistan,1976-06-29
+Myrtis,Prohaska,1990-01-18 10:06:09.268,Lake Arleenland,Afghanistan,1978-07-07
+Lamont,Nitzsche,1977-06-26 21:26:23.683,Marcoland,Marshall Islands,1967-09-02
+Alla,Roob,2004-09-20 01:38:54.963,New Dion,Bolivia,1996-07-31
+Lashawna,Collier,2000-12-06 09:18:12.864,Schimmelshire,South Africa,1987-08-18
+Art,Herzog,1996-08-12 18:32:24.749,West Tonyfort,Australia,1968-11-27
+Adam,Smitham,1995-07-18 01:57:39.994,Dareview,Bosnia and Herzegovina,1960-07-16
+Beatrice,Mayer,1966-12-02 04:10:46.476,Port Loren,Jordan,1996-06-14
+Gus,Cremin,1964-02-19 19:23:22.827,Alfredside,Niger,1960-08-19
+Micah,Kulas,1982-08-19 19:30:35.348,Janycemouth,Gabon,2004-08-29
+Avery,Walsh,1971-03-18 19:24:43.817,Chassidyton,Saint Martin,1974-08-20
+Rufus,Runte,2004-01-19 08:39:59.530,Randallburgh,Greenland,1986-12-09
+Curtis,Nitzsche,1964-08-08 18:56:20.513,Lake Rebecca,Mali,1988-04-16
+Dawn,Hansen,1968-09-06 20:41:25.891,North Romeo,Tajikistan,1967-11-08
+Chassidy,Lynch,1990-12-03 14:20:17.393,West Hassan,China,1997-01-13
+Laurena,Graham,1978-06-30 17:32:58.420,Gutmannview,French Polynesia,1987-07-16
+Sabine,Rippin,1985-10-09 05:10:56.815,North Chu,Equatorial Guinea,1999-08-24
+Shalonda,Purdy,2005-02-08 17:41:56.759,West Tamar,Equatorial Guinea,2005-03-29
+Trevor,Kling,1985-11-05 14:47:29.804,North Bellafort,Iceland,1974-07-25
+Warner,Hintz,1962-07-13 10:51:14.095,North Amos,Dominica,1985-03-16
+Leigh,Willms,1970-04-21 11:24:28.807,O'Konside,India,1978-01-11
+Charlyn,Schmidt,2004-05-31 06:25:42.785,Leland,Tokelau,1963-12-16
+Dessie,Paucek,1980-02-07 06:20:30.327,Bechtelarton,Malaysia,1984-04-01
+Janette,Jacobson,1992-10-09 11:00:18.605,Kumfurt,French Guiana,2005-07-03
+Young,Langworth,1997-04-14 00:32:38.622,North Geraldstad,Latvia,1980-04-09
+Sparkle,Waters,1964-08-04 12:03:55.542,Rathstad,Gambia,1984-11-27
+Yvonne,Koelpin,1993-01-25 10:09:01.958,South Ivorybury,British Indian Ocean Territory (Chagos Archipelago),1988-09-13
+Burt,Reynolds,1993-07-24 06:09:33.132,North Jonas,Jamaica,1962-04-29
+Chase,Murazik,1987-09-18 16:17:39.061,Wizaborough,Cyprus,1963-11-25
+Oliver,Ruecker,1979-09-03 02:01:21.674,West Nolan,Benin,1971-12-05
+Rolande,Funk,1971-08-04 10:08:40.002,New Wendellland,Venezuela,1988-05-01
+Floyd,Littel,2004-12-12 15:01:04.824,North Leroyton,Niger,1985-09-12
+Sung,Altenwerth,2004-10-20 17:46:24.509,McGlynnberg,British Indian Ocean Territory (Chagos Archipelago),1964-07-29
+Jacque,Torp,1985-06-07 11:19:35.069,Denesikshire,Mozambique,1988-10-05
+Kristyn,Murazik,2001-02-17 06:46:58.560,Brettmouth,Congo,1983-04-08
+Gerardo,Kris,2005-04-26 12:18:36.156,Lefflerside,Zimbabwe,1971-05-16
+Darron,O'Connell,1998-08-31 00:28:08.148,South Cary,Malta,1984-08-22
+Dominga,Dickinson,1993-05-28 17:01:25.087,East Paris,Samoa,1963-10-22
+Wilson,Nolan,1986-06-21 02:32:56.340,West Keenahaven,Norfolk Island,1999-03-16
+Morton,Koelpin,1988-04-14 19:16:46.464,Cummerataberg,Denmark,1965-04-17
+Sharan,Rice,1972-09-10 07:08:35.484,Renafurt,Somalia,1985-11-17
+Hershel,O'Kon,1983-02-27 22:00:08.139,Bethannmouth,Slovakia (Slovak Republic),1980-04-17
+Sophie,Ratke,1981-02-18 15:01:11.883,Leschfurt,Nigeria,1966-04-13
diff --git a/integration-tests/cratedb/dev-env-config.json b/integration-tests/cratedb/dev-env-config.json
new file mode 100644
index 00000000000..6367db79923
--- /dev/null
+++ b/integration-tests/cratedb/dev-env-config.json
@@ -0,0 +1,3 @@
+{
+ "variables" : []
+}
diff --git a/integration-tests/cratedb/hop-config.json b/integration-tests/cratedb/hop-config.json
new file mode 100644
index 00000000000..cc0d7486861
--- /dev/null
+++ b/integration-tests/cratedb/hop-config.json
@@ -0,0 +1,290 @@
+{
+ "variables": [
+ {
+ "name": "HOP_LENIENT_STRING_TO_NUMBER_CONVERSION",
+ "value": "N",
+ "description": "System wide flag to allow lenient string to number conversion for backward compatibility. If this setting is set to \"Y\", an string starting with digits will be converted successfully into a number. (example: 192.168.1.1 will be converted into 192 or 192.168 or 192168 depending on the decimal and grouping symbol). The default (N) will be to throw an error if non-numeric symbols are found in the string."
+ },
+ {
+ "name": "HOP_COMPATIBILITY_DB_IGNORE_TIMEZONE",
+ "value": "N",
+ "description": "System wide flag to ignore timezone while writing date/timestamp value to the database."
+ },
+ {
+ "name": "HOP_LOG_SIZE_LIMIT",
+ "value": "0",
+ "description": "The log size limit for all pipelines and workflows that don't have the \"log size limit\" property set in their respective properties."
+ },
+ {
+ "name": "HOP_EMPTY_STRING_DIFFERS_FROM_NULL",
+ "value": "N",
+ "description": "NULL vs Empty String. If this setting is set to Y, an empty string and null are different. Otherwise they are not."
+ },
+ {
+ "name": "HOP_MAX_LOG_SIZE_IN_LINES",
+ "value": "0",
+ "description": "The maximum number of log lines that are kept internally by Hop. Set to 0 to keep all rows (default)"
+ },
+ {
+ "name": "HOP_MAX_LOG_TIMEOUT_IN_MINUTES",
+ "value": "1440",
+ "description": "The maximum age (in minutes) of a log line while being kept internally by Hop. Set to 0 to keep all rows indefinitely (default)"
+ },
+ {
+ "name": "HOP_MAX_WORKFLOW_TRACKER_SIZE",
+ "value": "5000",
+ "description": "The maximum number of workflow trackers kept in memory"
+ },
+ {
+ "name": "HOP_MAX_ACTIONS_LOGGED",
+ "value": "5000",
+ "description": "The maximum number of action results kept in memory for logging purposes."
+ },
+ {
+ "name": "HOP_MAX_LOGGING_REGISTRY_SIZE",
+ "value": "10000",
+ "description": "The maximum number of logging registry entries kept in memory for logging purposes."
+ },
+ {
+ "name": "HOP_LOG_TAB_REFRESH_DELAY",
+ "value": "1000",
+ "description": "The hop log tab refresh delay."
+ },
+ {
+ "name": "HOP_LOG_TAB_REFRESH_PERIOD",
+ "value": "1000",
+ "description": "The hop log tab refresh period."
+ },
+ {
+ "name": "HOP_PLUGIN_CLASSES",
+ "value": null,
+ "description": "A comma delimited list of classes to scan for plugin annotations"
+ },
+ {
+ "name": "HOP_PLUGIN_PACKAGES",
+ "value": null,
+ "description": "A comma delimited list of packages to scan for plugin annotations (warning: slow!!)"
+ },
+ {
+ "name": "HOP_TRANSFORM_PERFORMANCE_SNAPSHOT_LIMIT",
+ "value": "0",
+ "description": "The maximum number of transform performance snapshots to keep in memory. Set to 0 to keep all snapshots indefinitely (default)"
+ },
+ {
+ "name": "HOP_ROWSET_GET_TIMEOUT",
+ "value": "50",
+ "description": "The name of the variable that optionally contains an alternative rowset get timeout (in ms). This only makes a difference for extremely short lived pipelines."
+ },
+ {
+ "name": "HOP_ROWSET_PUT_TIMEOUT",
+ "value": "50",
+ "description": "The name of the variable that optionally contains an alternative rowset put timeout (in ms). This only makes a difference for extremely short lived pipelines."
+ },
+ {
+ "name": "HOP_CORE_TRANSFORMS_FILE",
+ "value": null,
+ "description": "The name of the project variable that will contain the alternative location of the hop-transforms.xml file. You can use this to customize the list of available internal transforms outside of the codebase."
+ },
+ {
+ "name": "HOP_CORE_WORKFLOW_ACTIONS_FILE",
+ "value": null,
+ "description": "The name of the project variable that will contain the alternative location of the hop-workflow-actions.xml file."
+ },
+ {
+ "name": "HOP_SERVER_OBJECT_TIMEOUT_MINUTES",
+ "value": "1440",
+ "description": "This project variable will set a time-out after which waiting, completed or stopped pipelines and workflows will be automatically cleaned up. The default value is 1440 (one day)."
+ },
+ {
+ "name": "HOP_PIPELINE_PAN_JVM_EXIT_CODE",
+ "value": null,
+ "description": "Set this variable to an integer that will be returned as the Pan JVM exit code."
+ },
+ {
+ "name": "HOP_DISABLE_CONSOLE_LOGGING",
+ "value": "N",
+ "description": "Set this variable to Y to disable standard Hop logging to the console. (stdout)"
+ },
+ {
+ "name": "HOP_REDIRECT_STDERR",
+ "value": "N",
+ "description": "Set this variable to Y to redirect stderr to Hop logging."
+ },
+ {
+ "name": "HOP_REDIRECT_STDOUT",
+ "value": "N",
+ "description": "Set this variable to Y to redirect stdout to Hop logging."
+ },
+ {
+ "name": "HOP_DEFAULT_NUMBER_FORMAT",
+ "value": null,
+ "description": "The name of the variable containing an alternative default number format"
+ },
+ {
+ "name": "HOP_DEFAULT_BIGNUMBER_FORMAT",
+ "value": null,
+ "description": "The name of the variable containing an alternative default bignumber format"
+ },
+ {
+ "name": "HOP_DEFAULT_INTEGER_FORMAT",
+ "value": null,
+ "description": "The name of the variable containing an alternative default integer format"
+ },
+ {
+ "name": "HOP_DEFAULT_DATE_FORMAT",
+ "value": null,
+ "description": "The name of the variable containing an alternative default date format"
+ },
+ {
+ "name": "HOP_DEFAULT_TIMESTAMP_FORMAT",
+ "value": null,
+ "description": "The name of the variable containing an alternative default timestamp format"
+ },
+ {
+ "name": "HOP_DEFAULT_SERVLET_ENCODING",
+ "value": null,
+ "description": "Defines the default encoding for servlets, leave it empty to use Java default encoding"
+ },
+ {
+ "name": "HOP_FAIL_ON_LOGGING_ERROR",
+ "value": "N",
+ "description": "Set this variable to Y when you want the workflow/pipeline fail with an error when the related logging process (e.g. to a database) fails."
+ },
+ {
+ "name": "HOP_AGGREGATION_MIN_NULL_IS_VALUED",
+ "value": "N",
+ "description": "Set this variable to Y to set the minimum to NULL if NULL is within an aggregate. Otherwise by default NULL is ignored by the MIN aggregate and MIN is set to the minimum value that is not NULL. See also the variable HOP_AGGREGATION_ALL_NULLS_ARE_ZERO."
+ },
+ {
+ "name": "HOP_AGGREGATION_ALL_NULLS_ARE_ZERO",
+ "value": "N",
+ "description": "Set this variable to Y to return 0 when all values within an aggregate are NULL. Otherwise by default a NULL is returned when all values are NULL."
+ },
+ {
+ "name": "HOP_COMPATIBILITY_TEXT_FILE_OUTPUT_APPEND_NO_HEADER",
+ "value": "N",
+ "description": "Set this variable to Y for backward compatibility for the Text File Output transform. Setting this to Ywill add no header row at all when the append option is enabled, regardless if the file is existing or not."
+ },
+ {
+ "name": "HOP_PASSWORD_ENCODER_PLUGIN",
+ "value": "Hop",
+ "description": "Specifies the password encoder plugin to use by ID (Hop is the default)."
+ },
+ {
+ "name": "HOP_SYSTEM_HOSTNAME",
+ "value": null,
+ "description": "You can use this variable to speed up hostname lookup. Hostname lookup is performed by Hop so that it is capable of logging the server on which a workflow or pipeline is executed."
+ },
+ {
+ "name": "HOP_SERVER_JETTY_ACCEPTORS",
+ "value": null,
+ "description": "A variable to configure jetty option: acceptors for Carte"
+ },
+ {
+ "name": "HOP_SERVER_JETTY_ACCEPT_QUEUE_SIZE",
+ "value": null,
+ "description": "A variable to configure jetty option: acceptQueueSize for Carte"
+ },
+ {
+ "name": "HOP_SERVER_JETTY_RES_MAX_IDLE_TIME",
+ "value": null,
+ "description": "A variable to configure jetty option: lowResourcesMaxIdleTime for Carte"
+ },
+ {
+ "name": "HOP_COMPATIBILITY_MERGE_ROWS_USE_REFERENCE_STREAM_WHEN_IDENTICAL",
+ "value": "N",
+ "description": "Set this variable to Y for backward compatibility for the Merge Rows (diff) transform. Setting this to Y will use the data from the reference stream (instead of the comparison stream) in case the compared rows are identical."
+ },
+ {
+ "name": "HOP_SPLIT_FIELDS_REMOVE_ENCLOSURE",
+ "value": "false",
+ "description": "Set this variable to false to preserve enclosure symbol after splitting the string in the Split fields transform. Changing it to true will remove first and last enclosure symbol from the resulting string chunks."
+ },
+ {
+ "name": "HOP_ALLOW_EMPTY_FIELD_NAMES_AND_TYPES",
+ "value": "false",
+ "description": "Set this variable to TRUE to allow your pipeline to pass 'null' fields and/or empty types."
+ },
+ {
+ "name": "HOP_GLOBAL_LOG_VARIABLES_CLEAR_ON_EXPORT",
+ "value": "false",
+ "description": "Set this variable to false to preserve global log variables defined in pipeline / workflow Properties -> Log panel. Changing it to true will clear it when export pipeline / workflow."
+ },
+ {
+ "name": "HOP_FILE_OUTPUT_MAX_STREAM_COUNT",
+ "value": "1024",
+ "description": "This project variable is used by the Text File Output transform. It defines the max number of simultaneously open files within the transform. The transform will close/reopen files as necessary to insure the max is not exceeded"
+ },
+ {
+ "name": "HOP_FILE_OUTPUT_MAX_STREAM_LIFE",
+ "value": "0",
+ "description": "This project variable is used by the Text File Output transform. It defines the max number of milliseconds between flushes of files opened by the transform."
+ },
+ {
+ "name": "HOP_USE_NATIVE_FILE_DIALOG",
+ "value": "N",
+ "description": "Set this value to Y if you want to use the system file open/save dialog when browsing files"
+ },
+ {
+ "name": "HOP_AUTO_CREATE_CONFIG",
+ "value": "Y",
+ "description": "Set this value to N if you don't want to automatically create a hop configuration file (hop-config.json) when it's missing"
+ }
+ ],
+ "LocaleDefault": "en_BE",
+ "guiProperties": {
+ "FontFixedSize": "13",
+ "MaxUndo": "100",
+ "DarkMode": "Y",
+ "FontNoteSize": "13",
+ "ShowOSLook": "Y",
+ "FontFixedStyle": "0",
+ "FontNoteName": ".AppleSystemUIFont",
+ "FontFixedName": "Monospaced",
+ "FontGraphStyle": "0",
+ "FontDefaultSize": "13",
+ "GraphColorR": "255",
+ "FontGraphSize": "13",
+ "IconSize": "32",
+ "BackgroundColorB": "255",
+ "FontNoteStyle": "0",
+ "FontGraphName": ".AppleSystemUIFont",
+ "FontDefaultName": ".AppleSystemUIFont",
+ "GraphColorG": "255",
+ "UseGlobalFileBookmarks": "Y",
+ "FontDefaultStyle": "0",
+ "GraphColorB": "255",
+ "BackgroundColorR": "255",
+ "BackgroundColorG": "255",
+ "WorkflowDialogStyle": "RESIZE,MAX,MIN",
+ "LineWidth": "1",
+ "ContextDialogShowCategories": "Y"
+ },
+ "projectsConfig": {
+ "enabled": true,
+ "projectMandatory": true,
+ "environmentMandatory": false,
+ "defaultProject": "default",
+ "defaultEnvironment": null,
+ "standardParentProject": "default",
+ "standardProjectsFolder": null,
+ "projectConfigurations": [
+ {
+ "projectName": "default",
+ "projectHome": "${HOP_CONFIG_FOLDER}",
+ "configFilename": "project-config.json"
+ }
+ ],
+ "lifecycleEnvironments": [
+ {
+ "name": "dev",
+ "purpose": "Testing",
+ "projectName": "default",
+ "configurationFiles": [
+ "${PROJECT_HOME}/dev-env-config.json"
+ ]
+ }
+ ],
+ "projectLifecycles": []
+ }
+}
diff --git a/integration-tests/cratedb/main-0001-copy-to-from-filesystem.hwf b/integration-tests/cratedb/main-0001-copy-to-from-filesystem.hwf
new file mode 100644
index 00000000000..a26183e4971
--- /dev/null
+++ b/integration-tests/cratedb/main-0001-copy-to-from-filesystem.hwf
@@ -0,0 +1,194 @@
+
+
+
+ main-0001-copy-to-from-filesystem
+ Y
+
+
+
+ -
+ 2024/04/26 15:51:06.701
+ -
+ 2024/04/26 15:51:06.701
+
+
+
+
+ Start
+
+ SPECIAL
+
+ 1
+ 12
+ 60
+ 0
+ 0
+ N
+ 0
+ 1
+ N
+ 96
+ 144
+
+
+
+ 0001-copy-from-existing-file
+
+ PIPELINE
+
+ N
+ N
+ N
+ N
+ N
+ N
+ ${PROJECT_HOME}/0001-copy-from-existing-file.hpl
+ Basic
+
+ Y
+
+ N
+ local
+ N
+ N
+ Y
+ N
+ 800
+ 144
+
+
+
+ Create test user and grant privileges
+
+ SQL
+
+ cratedb-test
+ N
+ DROP USER IF EXISTS alice;
+CREATE USER alice WITH (password='password');
+GRANT ALL PRIVILEGES TO alice;
+ N
+ N
+ N
+ 288
+ 144
+
+
+
+ Create test table
+
+ SQL
+
+ cratedb-test
+ N
+ DROP TABLE IF EXISTS doc.test;
+CREATE TABLE doc.test
+(
+ firstName VARCHAR(255)
+, lastName VARCHAR(255)
+, birthDate TIMESTAMP
+, city VARCHAR(255)
+, country VARCHAR(100)
+, birthDateAndTime TIMESTAMP
+);
+ N
+ N
+ N
+ 544
+ 144
+
+
+
+ Check if 100 rows
+
+ EVAL_TABLE_CONTENT
+
+ cratedb-test
+ doc
+ test
+ rows_count_equal
+ 100
+ N
+ N
+
+ N
+ Y
+ N
+ 1152
+ 144
+
+
+
+ Refresh test table
+
+ SQL
+
+ cratedb-test
+ N
+ REFRESH TABLE doc.test;
+ N
+ N
+ N
+ 1008
+ 144
+
+
+
+
+
+ Start
+ Create test user and grant privileges
+ Y
+ Y
+ Y
+
+
+ Create test user and grant privileges
+ Create test table
+ Y
+ Y
+ Y
+
+
+ Create test table
+ 0001-copy-from-existing-file
+ Y
+ N
+ Y
+
+
+ 0001-copy-from-existing-file
+ Refresh test table
+ Y
+ Y
+ N
+
+
+ Refresh test table
+ Check if 100 rows
+ Y
+ Y
+ N
+
+
+
+
+
+
diff --git a/integration-tests/cratedb/metadata/pipeline-run-configuration/local.json b/integration-tests/cratedb/metadata/pipeline-run-configuration/local.json
new file mode 100644
index 00000000000..11a9ee03d27
--- /dev/null
+++ b/integration-tests/cratedb/metadata/pipeline-run-configuration/local.json
@@ -0,0 +1,21 @@
+{
+ "engineRunConfiguration": {
+ "Local": {
+ "feedback_size": "50000",
+ "sample_size": "100",
+ "sample_type_in_gui": "Last",
+ "wait_time": "20",
+ "rowset_size": "10000",
+ "safe_mode": false,
+ "show_feedback": false,
+ "topo_sort": false,
+ "gather_metrics": false,
+ "transactional": false
+ }
+ },
+ "defaultSelection": true,
+ "configurationVariables": [],
+ "name": "local",
+ "description": "",
+ "executionInfoLocationName": "Runs your pipelines locally with the standard local Hop pipeline engine"
+}
\ No newline at end of file
diff --git a/integration-tests/cratedb/metadata/rdbms/cratedb-test.json b/integration-tests/cratedb/metadata/rdbms/cratedb-test.json
new file mode 100644
index 00000000000..400703de389
--- /dev/null
+++ b/integration-tests/cratedb/metadata/rdbms/cratedb-test.json
@@ -0,0 +1,26 @@
+{
+ "rdbms": {
+ "CRATEDB": {
+ "databaseName": "test",
+ "pluginId": "CRATEDB",
+ "accessType": 0,
+ "hostname": "cratedb",
+ "password": "Encrypted 2be98afc86aa7f2e4bb18bd63c99dbdde",
+ "pluginName": "CrateDB",
+ "port": "5432",
+ "attributes": {
+ "SUPPORTS_TIMESTAMP_DATA_TYPE": "Y",
+ "QUOTE_ALL_FIELDS": "N",
+ "SUPPORTS_BOOLEAN_DATA_TYPE": "Y",
+ "FORCE_IDENTIFIERS_TO_LOWERCASE": "N",
+ "PRESERVE_RESERVED_WORD_CASE": "Y",
+ "SQL_CONNECT": "",
+ "FORCE_IDENTIFIERS_TO_UPPERCASE": "N",
+ "PREFERRED_SCHEMA_NAME": ""
+ },
+ "manualUrl": "",
+ "username": "crate"
+ }
+ },
+ "name": "cratedb-test"
+}
\ No newline at end of file
diff --git a/integration-tests/cratedb/metadata/workflow-run-configuration/local.json b/integration-tests/cratedb/metadata/workflow-run-configuration/local.json
new file mode 100644
index 00000000000..1d0cf74baec
--- /dev/null
+++ b/integration-tests/cratedb/metadata/workflow-run-configuration/local.json
@@ -0,0 +1,11 @@
+{
+ "engineRunConfiguration": {
+ "Local": {
+ "safe_mode": false,
+ "transactional": false
+ }
+ },
+ "defaultSelection": true,
+ "name": "local",
+ "description": "Runs your workflows locally with the standard local Hop workflow engine"
+}
\ No newline at end of file
diff --git a/integration-tests/cratedb/project-config.json b/integration-tests/cratedb/project-config.json
new file mode 100644
index 00000000000..6a91171e1c8
--- /dev/null
+++ b/integration-tests/cratedb/project-config.json
@@ -0,0 +1,13 @@
+{
+ "metadataBaseFolder" : "${PROJECT_HOME}/metadata",
+ "unitTestsBasePath" : "${PROJECT_HOME}",
+ "dataSetsCsvFolder" : "${PROJECT_HOME}/datasets",
+ "enforcingExecutionInHome" : true,
+ "config" : {
+ "variables" : [ {
+ "name" : "HOP_LICENSE_HEADER_FILE",
+ "value" : "${PROJECT_HOME}/../asf-header.txt",
+ "description" : "This will automatically serialize the ASF license header into pipelines and workflows in the integration test projects"
+ } ]
+ }
+}
\ No newline at end of file
diff --git a/plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDbDatabaseMeta.java b/plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMeta.java
similarity index 97%
rename from plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDbDatabaseMeta.java
rename to plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMeta.java
index f3edf652084..86196c6c6fb 100644
--- a/plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDbDatabaseMeta.java
+++ b/plugins/databases/cratedb/src/main/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMeta.java
@@ -25,9 +25,10 @@
@DatabaseMetaPlugin(
type = "CRATEDB",
typeDescription = "CrateDB",
- documentationUrl = "/database/databases/cratedb.html")
+ documentationUrl = "/database/databases/cratedb.html",
+ classLoaderGroup = "crate-db")
@GuiPlugin(id = "GUI-CrateDBDatabaseMeta")
-public class CrateDbDatabaseMeta extends PostgreSqlDatabaseMeta {
+public class CrateDBDatabaseMeta extends PostgreSqlDatabaseMeta {
private static final String SEQUENCES_NOT_SUPPORTED = "CrateDB does not support sequences";
diff --git a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaIT.java b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaIT.java
index 6360d0cdbe0..92fc989a9b2 100644
--- a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaIT.java
+++ b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaIT.java
@@ -41,7 +41,7 @@ public class CrateDBDatabaseMetaIT {
private static Connection connection;
- private CrateDbDatabaseMeta nativeMeta = new CrateDbDatabaseMeta();
+ private CrateDBDatabaseMeta nativeMeta = new CrateDBDatabaseMeta();
@BeforeClass
public static void setup() throws Exception {
@@ -83,14 +83,13 @@ public void doNotSupportSequences() {
public void sqlStatements() throws Exception {
executeUpdate(
"INSERT INTO foo (id, name, description) VALUES (1, 'Alice', 'test_description');");
- Thread.sleep(1500); // need a break to make sure the data is there: unfortunately,
- // CrateDB does not support transactions, rather it promote eventual consistency.
- // Using an async lib for assertions like awaitility would be a better approach
+ executeUpdate("REFRESH TABLE foo;");
+
int counter = 0;
ResultSet rs = executeQuery(nativeMeta.getSqlQueryFields("foo"));
while (rs.next()) {
counter++;
- assertTrue("Alice".equals(rs.getString("name")));
+ assertEquals("Alice", rs.getString("name"));
}
assertTrue(counter > 0);
@@ -98,7 +97,7 @@ public void sqlStatements() throws Exception {
rs = executeQuery(nativeMeta.getSqlTableExists("foo"));
while (rs.next()) {
counter++;
- assertTrue("Alice".equals(rs.getString("name")));
+ assertEquals("Alice", rs.getString("name"));
}
assertTrue(counter > 0);
@@ -106,7 +105,7 @@ public void sqlStatements() throws Exception {
rs = executeQuery(nativeMeta.getSqlQueryColumnFields("name", "foo"));
while (rs.next()) {
counter++;
- assertTrue("Alice".equals(rs.getString("name")));
+ assertEquals("Alice", rs.getString("name"));
}
assertTrue(counter > 0);
@@ -114,7 +113,7 @@ public void sqlStatements() throws Exception {
rs = executeQuery(nativeMeta.getSqlColumnExists("name", "foo"));
while (rs.next()) {
counter++;
- assertTrue("Alice".equals(rs.getString("name")));
+ assertEquals("Alice", rs.getString("name"));
}
assertTrue(counter > 0);
}
diff --git a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaTest.java b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaTest.java
index b43971d2675..ef46b6aed91 100644
--- a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaTest.java
+++ b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBDatabaseMetaTest.java
@@ -35,13 +35,13 @@
import org.junit.Test;
public class CrateDBDatabaseMetaTest {
- CrateDbDatabaseMeta nativeMeta;
+ CrateDBDatabaseMeta nativeMeta;
private static final String SEQUENCES_NOT_SUPPORTED = "CrateDB doesn't support sequences";
@Before
public void setupBefore() {
- nativeMeta = new CrateDbDatabaseMeta();
+ nativeMeta = new CrateDBDatabaseMeta();
nativeMeta.setAccessType(DatabaseMeta.TYPE_ACCESS_NATIVE);
}
diff --git a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBValueMetaBaseTest.java b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBValueMetaBaseTest.java
index d7d8689a941..28fabd390dc 100644
--- a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBValueMetaBaseTest.java
+++ b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/CrateDBValueMetaBaseTest.java
@@ -125,7 +125,7 @@ public List getEvents() {
@Test
public void test_PDI_17126_Postgres() throws Exception {
String data = StringUtils.repeat("*", 10);
- initValueMeta(new CrateDbDatabaseMeta(), 20, data);
+ initValueMeta(new CrateDBDatabaseMeta(), 20, data);
verify(preparedStatementMock, times(1)).setString(0, data);
}
@@ -137,7 +137,7 @@ public void test_PDI_17126_Postgres() throws Exception {
@Test
public void test_Pdi_17126_postgres_DataLongerThanMetaLength() throws Exception {
String data = StringUtils.repeat("*", 20);
- initValueMeta(new CrateDbDatabaseMeta(), 10, data);
+ initValueMeta(new CrateDBDatabaseMeta(), 10, data);
verify(preparedStatementMock, times(1)).setString(0, data);
}
@@ -151,7 +151,7 @@ public void test_Pdi_17126_postgres_truncate() throws Exception {
List events = listener.getEvents();
assertEquals(0, events.size());
- databaseMetaSpy.setIDatabase(new CrateDbDatabaseMeta());
+ databaseMetaSpy.setIDatabase(new CrateDBDatabaseMeta());
doReturn(1024).when(databaseMetaSpy).getMaxTextFieldLength();
doReturn(false).when(databaseMetaSpy).supportsSetCharacterStream();
@@ -184,7 +184,7 @@ public void testMetdataPreviewSqlNumericWithUndefinedSizeUsingPostgesSql()
doReturn(0).when(resultSet).getInt("COLUMN_SIZE");
doReturn(mock(Object.class)).when(resultSet).getObject("DECIMAL_DIGITS");
doReturn(0).when(resultSet).getInt("DECIMAL_DIGITS");
- doReturn(mock(CrateDbDatabaseMeta.class)).when(dbMeta).getIDatabase();
+ doReturn(mock(CrateDBDatabaseMeta.class)).when(dbMeta).getIDatabase();
IValueMeta valueMeta = valueMetaBase.getMetadataPreview(variables, dbMeta, resultSet);
assertFalse(valueMeta.isBigNumber()); // TODO: VALIDATE!
assertEquals(0, valueMeta.getPrecision()); // TODO: VALIDATE!
@@ -194,7 +194,7 @@ public void testMetdataPreviewSqlNumericWithUndefinedSizeUsingPostgesSql()
@Test
public void testMetdataPreviewSqlBinaryToHopBinary() throws SQLException, HopDatabaseException {
doReturn(Types.BINARY).when(resultSet).getInt("DATA_TYPE");
- doReturn(mock(CrateDbDatabaseMeta.class)).when(dbMeta).getIDatabase();
+ doReturn(mock(CrateDBDatabaseMeta.class)).when(dbMeta).getIDatabase();
IValueMeta valueMeta = valueMetaBase.getMetadataPreview(variables, dbMeta, resultSet);
assertTrue(valueMeta.isBinary());
}
@@ -202,7 +202,7 @@ public void testMetdataPreviewSqlBinaryToHopBinary() throws SQLException, HopDat
@Test
public void testMetdataPreviewSqlBlobToHopBinary() throws SQLException, HopDatabaseException {
doReturn(Types.BLOB).when(resultSet).getInt("DATA_TYPE");
- doReturn(mock(CrateDbDatabaseMeta.class)).when(dbMeta).getIDatabase();
+ doReturn(mock(CrateDBDatabaseMeta.class)).when(dbMeta).getIDatabase();
IValueMeta valueMeta = valueMetaBase.getMetadataPreview(variables, dbMeta, resultSet);
assertTrue(valueMeta.isBinary());
assertTrue(valueMeta.isBinary());
@@ -212,7 +212,7 @@ public void testMetdataPreviewSqlBlobToHopBinary() throws SQLException, HopDatab
public void testMetdataPreviewSqlVarBinaryToHopBinary()
throws SQLException, HopDatabaseException {
doReturn(Types.VARBINARY).when(resultSet).getInt("DATA_TYPE");
- doReturn(mock(CrateDbDatabaseMeta.class)).when(dbMeta).getIDatabase();
+ doReturn(mock(CrateDBDatabaseMeta.class)).when(dbMeta).getIDatabase();
IValueMeta valueMeta = valueMetaBase.getMetadataPreview(variables, dbMeta, resultSet);
assertTrue(valueMeta.isBinary());
}
@@ -221,7 +221,7 @@ public void testMetdataPreviewSqlVarBinaryToHopBinary()
public void testMetdataPreviewSqlLongVarBinaryToHopBinary()
throws SQLException, HopDatabaseException {
doReturn(Types.LONGVARBINARY).when(resultSet).getInt("DATA_TYPE");
- doReturn(mock(CrateDbDatabaseMeta.class)).when(dbMeta).getIDatabase();
+ doReturn(mock(CrateDBDatabaseMeta.class)).when(dbMeta).getIDatabase();
IValueMeta valueMeta = valueMetaBase.getMetadataPreview(variables, dbMeta, resultSet);
assertTrue(valueMeta.isBinary());
}
diff --git a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/ReleaseSavePointTest.java b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/ReleaseSavePointTest.java
index bc6d0edd76e..b56310235b8 100644
--- a/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/ReleaseSavePointTest.java
+++ b/plugins/databases/cratedb/src/test/java/org/apache/hop/databases/cratedb/ReleaseSavePointTest.java
@@ -26,7 +26,7 @@ public class ReleaseSavePointTest {
IDatabase[] support =
new IDatabase[] {
- new CrateDbDatabaseMeta(),
+ new CrateDBDatabaseMeta(),
};
@Test
diff --git a/plugins/transforms/cratedbbulkloader/pom.xml b/plugins/transforms/cratedbbulkloader/pom.xml
new file mode 100644
index 00000000000..9d23df22d15
--- /dev/null
+++ b/plugins/transforms/cratedbbulkloader/pom.xml
@@ -0,0 +1,58 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.hop
+ hop-plugins-transforms
+ 2.9.0-SNAPSHOT
+
+
+ hop-transform-cratedbbulkloader
+ jar
+
+ Hop Plugins Transforms CrateDB bulk loader
+
+
+ 2.7.0
+
+
+
+
+ io.crate
+ crate-jdbc
+ ${cratedb-jdbc.version}
+ runtime
+
+
+ org.apache.hop
+ hop-databases-cratedb
+ ${project.version}
+
+
+ org.testcontainers
+ cratedb
+ 1.19.7
+ test
+
+
+
+
diff --git a/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoader.java b/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoader.java
new file mode 100644
index 00000000000..06bf7c411f1
--- /dev/null
+++ b/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoader.java
@@ -0,0 +1,916 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.cratedbbulkloader;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopDatabaseException;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.exception.HopValueException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaString;
+import org.apache.hop.core.util.Utils;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.pipeline.Pipeline;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransform;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.hop.pipeline.transforms.cratedbbulkloader.http.BulkImportClient;
+import org.apache.hop.pipeline.transforms.cratedbbulkloader.http.HttpBulkImportResponse;
+import org.apache.hop.pipeline.transforms.cratedbbulkloader.http.exceptions.CrateDBHopException;
+
+public class CrateDBBulkLoader extends BaseTransform {
+ private static final Class> PKG =
+ CrateDBBulkLoader.class; // for i18n purposes, needed by Translator2!!
+ public static final String TIMESTAMP_CONVERSION_MASK = "yyyy-MM-dd HH:mm:ss.SSS";
+ public static final String DATE_CONVERSION_MASK = "yyyy-MM-dd";
+
+ private final BulkImportClient bulkImportClient =
+ new BulkImportClient(meta.getHttpEndpoint(), meta.getHttpLogin(), meta.getHttpPassword());
+
+ public CrateDBBulkLoader(
+ TransformMeta transformMeta,
+ CrateDBBulkLoaderMeta meta,
+ CrateDBBulkLoaderData data,
+ int copyNr,
+ PipelineMeta pipelineMeta,
+ Pipeline pipeline) {
+ super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
+ }
+
+ @Override
+ public boolean init() {
+
+ if (super.init()) {
+ try {
+ // Validating that the connection has been defined.
+ verifyDatabaseConnection();
+ data.databaseMeta = this.getPipelineMeta().findDatabase(meta.getConnection(), variables);
+
+ if (meta.isStreamToS3Csv()) {
+ if (!meta.isUseHttpEndpoint()) {
+ // get the file output stream to write to S3
+ data.writer = HopVfs.getOutputStream(resolve(meta.getReadFromFilename()), false);
+ }
+ }
+
+ data.db = new Database(this, this, data.databaseMeta);
+ data.db.connect();
+ getDbFields();
+
+ if (log.isBasic()) {
+ logBasic(
+ BaseMessages.getString(
+ PKG, "CrateDBBulkLoader.Connection.Connected", data.db.getDatabaseMeta()));
+ }
+ initBinaryDataFields();
+
+ data.db.setAutoCommit(false);
+
+ return true;
+ } catch (HopException e) {
+ logError("An error occurred initializing this transform: " + e.getMessage());
+ stopAll();
+ setErrors(1);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean processRow() throws HopException {
+
+ Object[] r = getRow(); // this also waits for a previous transform to be finished.
+
+ if (r == null) { // no more input to be expected...
+ if (first && meta.isTruncateTable() && !meta.isOnlyWhenHaveRows()) {
+ truncateTable();
+ }
+
+ if (!first) {
+ try {
+ data.close();
+ closeFile();
+ if (meta.isUseHttpEndpoint()) {
+ String[] columns =
+ meta.getFields().stream()
+ .map(CrateDBBulkLoaderField::getDatabaseField)
+ .toArray(String[]::new);
+ data.outputRowMeta.getValueMetaList().forEach(v -> logBasic(v.toString()));
+ String schema = meta.getSchemaName();
+ String table = meta.getTableName();
+ writeBatchToCrateDB(schema, table, columns);
+ } else {
+ String copyStmt = buildCopyStatementSqlString();
+ Connection conn = data.db.getConnection();
+ Statement stmt = conn.createStatement();
+ final ResultSet resultSet = stmt.executeQuery(copyStmt);
+ int errorCount = 0;
+ while (resultSet.next()) {
+ String node = resultSet.getString("node");
+ String uri = resultSet.getString("uri");
+ int successCount = resultSet.getInt("success_count");
+ errorCount = resultSet.getInt("error_count");
+ String errors = resultSet.getString("errors");
+ logError(
+ "Node: "
+ + node
+ + " URI: "
+ + uri
+ + " Success Count: "
+ + successCount
+ + " Error Count: "
+ + errorCount
+ + " Errors: "
+ + errors);
+ incrementLinesOutput(successCount);
+ incrementLinesRejected(errorCount);
+ }
+ conn.commit();
+ stmt.close();
+ conn.close();
+ if (errorCount > 0) {
+ throw new HopException(
+ "Failed to COPY FROM CSV file to CrateDB: " + errorCount + " rows failed");
+ }
+ }
+
+ } catch (SQLException sqle) {
+ setErrors(1);
+ stopAll();
+ setOutputDone(); // signal end to receiver(s)
+ throw new HopDatabaseException("Error executing COPY statements", sqle);
+ } catch (IOException ioe) {
+ setErrors(1);
+ stopAll();
+ setOutputDone(); // signal end to receiver(s)
+ throw new HopTransformException("Error releasing resources", ioe);
+ } catch (CrateDBHopException e) {
+ throw new HopException(e);
+ }
+ }
+
+ return false;
+ }
+
+ if (first) {
+ first = false;
+ if (meta.isStreamToS3Csv()) {
+
+ data.fieldnrs = new HashMap<>();
+
+ // if (meta.isTruncateTable()) {
+ // // truncateTable();
+ // }
+
+ meta.getFields(data.insertRowMeta, getTransformName(), null, null, this, metadataProvider);
+
+ if (!meta.specifyFields()) {
+ // write all fields in the stream to CrateDB
+ // Just take the whole input row
+ data.insertRowMeta = getInputRowMeta().clone();
+ data.selectedRowFieldIndices = new int[data.insertRowMeta.size()];
+ // TODO Serasoft
+ // Is the statement below really needed??
+ try {
+ getDbFields();
+ } catch (HopException e) {
+ logError("Error getting database fields", e);
+ setErrors(1);
+ stopAll();
+ setOutputDone(); // signal end to receiver(s)
+ return false;
+ }
+
+ defineAllFieldsMetadataList();
+ } else {
+ defineSelectedFieldsMetadataList();
+ }
+ }
+ }
+
+ data.outputRowMeta = getInputRowMeta().clone();
+
+ if (!meta.isUseHttpEndpoint()) {
+ if (meta.isStreamToS3Csv()) {
+ writeRowToFile(data.outputRowMeta, r);
+ }
+ } else {
+ appendRowAsJsonLine(data.outputRowMeta, r);
+ try {
+ writeIfBatchSizeRecordsAreReached();
+ } catch (CrateDBHopException e) {
+ throw new HopException(e);
+ }
+ }
+ putRow(getInputRowMeta().clone(), r);
+
+ return true;
+ }
+
+ private void incrementLinesRejected(int count) {
+ for (int i = 0; i < count; i++) {
+ incrementLinesRejected();
+ }
+ }
+
+ private void incrementLinesOutput(int count) {
+ for (int i = 0; i < count; i++) {
+ incrementLinesOutput();
+ }
+ }
+
+ private void defineAllFieldsMetadataList() throws HopException {
+ for (int i = 0; i < meta.getFields().size(); i++) {
+ int streamFieldLocation =
+ data.insertRowMeta.indexOfValue(meta.getFields().get(i).getStreamField());
+ if (streamFieldLocation < 0) {
+ throw new HopTransformException(
+ "Field ["
+ + meta.getFields().get(i).getStreamField()
+ + "] couldn't be found in the input stream!");
+ }
+
+ data.selectedRowFieldIndices[i] = streamFieldLocation;
+
+ int dbFieldLocation = -1;
+ for (int e = 0; e < data.dbFields.size(); e++) {
+ String[] field = data.dbFields.get(e);
+ if (field[0].equalsIgnoreCase(meta.getFields().get(i).getDatabaseField())) {
+ dbFieldLocation = e;
+ break;
+ }
+ }
+ if (dbFieldLocation < 0) {
+ throw new HopException(
+ "Field ["
+ + meta.getFields().get(i).getDatabaseField()
+ + "] couldn't be found in the table!");
+ }
+ IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(streamFieldLocation);
+
+ IValueMeta insertValueMeta = inputValueMeta.clone();
+ insertValueMeta.setName(data.dbFields.get(dbFieldLocation)[0]);
+
+ data.insertRowMeta.addValueMeta(insertValueMeta);
+
+ data.fieldnrs.put(
+ meta.getFields().get(i).getDatabaseField().toUpperCase(), streamFieldLocation);
+ }
+ }
+
+ private void defineSelectedFieldsMetadataList() throws HopTransformException {
+ // use the columns/fields mapping.
+ int numberOfInsertFields = meta.getFields().size();
+ data.insertRowMeta = new RowMeta();
+
+ // Cache the position of the selected fields in the row array
+ data.selectedRowFieldIndices = new int[numberOfInsertFields];
+ for (int i = 0; i < meta.getFields().size(); i++) {
+ CrateDBBulkLoaderField vbf = meta.getFields().get(i);
+ String inputFieldName = vbf.getStreamField();
+ int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
+ if (inputFieldIdx < 0) {
+ throw new HopTransformException(
+ BaseMessages.getString(
+ PKG, "CrateDBBulkLoader.Exception.FieldRequired", inputFieldName)); // $NON-NLS-1$
+ }
+ data.selectedRowFieldIndices[i] = inputFieldIdx;
+ String insertFieldName = vbf.getDatabaseField();
+ IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(inputFieldIdx);
+ if (inputValueMeta == null) {
+ throw new HopTransformException(
+ BaseMessages.getString(
+ PKG,
+ "CrateDBBulkLoader.Exception.FailedToFindField",
+ vbf.getStreamField())); // $NON-NLS-1$
+ }
+ IValueMeta insertValueMeta = inputValueMeta.clone();
+ insertValueMeta.setName(insertFieldName);
+ data.insertRowMeta.addValueMeta(insertValueMeta);
+ data.fieldnrs.put(meta.getFields().get(i).getDatabaseField().toUpperCase(), inputFieldIdx);
+ }
+ }
+
+ private void writeIfBatchSizeRecordsAreReached() throws HopException, CrateDBHopException {
+ int maxBatchSize = Integer.parseInt(meta.getBatchSize());
+ if (data.httpBulkArgs.size() >= maxBatchSize) {
+ String[] columns =
+ meta.getFields().stream()
+ .map(CrateDBBulkLoaderField::getDatabaseField)
+ .toArray(String[]::new);
+ String schema = meta.getSchemaName();
+ String table = meta.getTableName();
+ writeBatchToCrateDB(schema, table, columns);
+ }
+ }
+
+ private void writeBatchToCrateDB(String schema, String table, String[] columns)
+ throws HopException, CrateDBHopException {
+ try {
+ final HttpBulkImportResponse httpResponse =
+ bulkImportClient.batchInsert(schema, table, columns, data.httpBulkArgs);
+ // TODO Serasoft
+ // Review this way to calculate lines output
+ for (int i = 0; i < httpResponse.outputRows(); i++) {
+ incrementLinesOutput();
+ }
+ for (int i = 0; i < httpResponse.rejectedRows(); i++) {
+ incrementLinesRejected();
+ }
+ switch (httpResponse.statusCode()) {
+ case 200:
+ data.httpBulkArgs.clear();
+ break;
+ case 401:
+ throw new HopException("Unauthorized access to CrateDB");
+ default:
+ throw new HopException("Error sending bulk import request");
+ }
+ if (200 == httpResponse.statusCode()) {
+ data.httpBulkArgs.clear();
+ } else {
+ throw new HopException("Error sending bulk import request");
+ }
+ } catch (JsonProcessingException e) {
+ throw new HopException("Error sending bulk import request ", e);
+ }
+ }
+
+ private void appendRowAsJsonLine(IRowMeta rowMeta, Object[] row) throws HopTransformException {
+ Object[] args = new Object[rowMeta.size()];
+ try {
+ for (int i = 0; i < data.insertRowMeta.size(); i++) {
+ IValueMeta v = data.insertRowMeta.getValueMeta(i);
+ args[i] = convertDatatypeIfNeeded(v, row[data.selectedRowFieldIndices[i]], i);
+ }
+
+ data.convertedRowMetaReady = true;
+
+ data.httpBulkArgs.add(args);
+ } catch (Exception e) {
+ throw new HopTransformException("Error writing JSON line to file", e);
+ }
+ }
+
+ private String convertDatatypeIfNeeded(IValueMeta v, Object rowItem, int pos)
+ throws HopException {
+ IValueMeta vc = null;
+ String convertedValue = null;
+
+ if (!data.convertedRowMetaReady && data.convertedRowMeta == null)
+ data.convertedRowMeta = data.insertRowMeta.clone();
+
+ switch (v.getType()) {
+ case IValueMeta.TYPE_STRING:
+ convertedValue = (String) rowItem;
+ break;
+ case IValueMeta.TYPE_INTEGER:
+ convertedValue = String.valueOf(rowItem);
+ break;
+ case IValueMeta.TYPE_TIMESTAMP:
+ vc = new ValueMetaString();
+ vc.setName(v.getName());
+ v.setConversionMask(TIMESTAMP_CONVERSION_MASK);
+ vc.setConversionMask(TIMESTAMP_CONVERSION_MASK);
+ convertedValue = (String) vc.convertData(v, rowItem);
+ break;
+ case IValueMeta.TYPE_DATE:
+ vc = new ValueMetaString();
+ vc.setName(v.getName());
+ v.setConversionMask(DATE_CONVERSION_MASK);
+ vc.setConversionMask(DATE_CONVERSION_MASK);
+ convertedValue = (String) vc.convertData(v, rowItem);
+ break;
+ default:
+ convertedValue = (String) rowItem;
+ break;
+ }
+
+ logDetailed("Field: " + v.getName() + " - Converted Value: " + convertedValue);
+
+ if (vc != null && !data.convertedRowMetaReady) data.convertedRowMeta.setValueMeta(pos, vc);
+
+ return convertedValue;
+ }
+
+ /**
+ * Closes a file so that its file handle is no longer open
+ *
+ * @return true if we successfully closed the file
+ */
+ private boolean closeFile() {
+ boolean returnValue = false;
+
+ try {
+ if (data.writer != null) {
+ data.writer.flush();
+ data.writer.close();
+ }
+ data.writer = null;
+ if (log.isDebug()) {
+ logDebug("Closing normal file ...");
+ }
+
+ returnValue = true;
+ } catch (Exception e) {
+ logError("Exception trying to close file: " + e.toString());
+ setErrors(1);
+ returnValue = false;
+ }
+ return returnValue;
+ }
+
+ private String buildCopyStatementSqlString() {
+ final DatabaseMeta databaseMeta = data.db.getDatabaseMeta();
+
+ StringBuilder sb = new StringBuilder(150);
+ sb.append("COPY ");
+
+ sb.append(
+ databaseMeta.getQuotedSchemaTableCombination(
+ variables,
+ data.db.resolve(meta.getSchemaName()),
+ data.db.resolve(meta.getTableName())));
+
+ sb.append(" (");
+ List fieldList = meta.getFields();
+ for (int i = 0; i < fieldList.size(); i++) {
+ CrateDBBulkLoaderField field = fieldList.get(i);
+ if (i > 0) {
+ sb.append(", " + field.getDatabaseField());
+ } else {
+ sb.append(field.getDatabaseField());
+ }
+ }
+ sb.append(")");
+
+ String awsAccessKeyId = "";
+ String awsSecretAccessKey = "";
+ if (meta.isUseSystemEnvVars()) {
+ awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
+ awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");
+ } else {
+ awsAccessKeyId = resolve(meta.getAwsAccessKeyId());
+ awsSecretAccessKey = resolve(meta.getAwsSecretAccessKey());
+ }
+
+ String awsSec = awsAccessKeyId + ":" + awsSecretAccessKey;
+ String filename = resolve(meta.getReadFromFilename());
+
+ String uriLeft = filename.substring(0, 5);
+ String uriRight = filename.substring(5, filename.length());
+
+ filename = uriLeft + awsSec + "@" + uriRight;
+
+ sb.append(" FROM '" + filename + "'");
+ sb.append(" WITH (format='csv', wait_for_completion=true");
+ sb.append(", header=false");
+ sb.append(", delimiter='" + CrateDBBulkLoaderMeta.DEFAULT_CSV_DELIMITER + "'");
+ sb.append(")");
+ sb.append(" RETURN SUMMARY");
+
+ logDetailed("Copy stmt: " + sb.toString());
+
+ return sb.toString();
+ }
+
+ /**
+ * Runs a desc table to get the fields, and field types from the database. Uses a desc table as
+ * opposed to the select * from table limit 0 that Hop normally uses to get the fields and types,
+ * due to the need to handle the Time type. The select * method through Hop does not give us the
+ * ability to differentiate time from timestamp.
+ *
+ * @throws HopException
+ */
+ private void getDbFields() throws HopException {
+ data.dbFields = new ArrayList<>();
+
+ IRowMeta rowMeta = null;
+
+ if (!StringUtils.isEmpty(resolve(meta.getSchemaName()))) {
+ rowMeta = data.db.getTableFields(meta.getSchemaName() + "." + meta.getTableName());
+ } else {
+ rowMeta = data.db.getTableFields(meta.getTableName());
+ }
+ try {
+ if (rowMeta.size() == 0) {
+ throw new HopException("No fields found in table");
+ }
+
+ for (int i = 0; i < rowMeta.size(); i++) {
+ String field[] = new String[2];
+ field[0] = rowMeta.getValueMeta(i).getName().toUpperCase();
+ field[1] = rowMeta.getValueMeta(i).getTypeDesc().toUpperCase();
+ data.dbFields.add(field);
+ }
+ } catch (Exception ex) {
+ throw new HopException("Error getting database fields", ex);
+ }
+ }
+
+ protected void verifyDatabaseConnection() throws HopException {
+ // Confirming Database Connection is defined.
+ if (meta.getConnection() == null) {
+ throw new HopException(
+ BaseMessages.getString(PKG, "CrateDBBulkLoaderMeta.Error.NoConnection"));
+ }
+ }
+
+ /**
+ * Initialize the binary values of delimiters, enclosures, and escape characters
+ *
+ * @throws HopException
+ */
+ private void initBinaryDataFields() throws HopException {
+ try {
+ data.binarySeparator = new byte[] {};
+ data.binaryEnclosure = new byte[] {};
+ data.binaryNewline = new byte[] {};
+ data.escapeCharacters = new byte[] {};
+
+ data.binarySeparator =
+ resolve(CrateDBBulkLoaderMeta.DEFAULT_CSV_DELIMITER).getBytes(StandardCharsets.UTF_8);
+ data.binaryEnclosure =
+ resolve(CrateDBBulkLoaderMeta.ENCLOSURE).getBytes(StandardCharsets.UTF_8);
+ data.binaryNewline =
+ CrateDBBulkLoaderMeta.CSV_RECORD_DELIMITER.getBytes(StandardCharsets.UTF_8);
+ data.escapeCharacters =
+ CrateDBBulkLoaderMeta.CSV_ESCAPE_CHAR.getBytes(StandardCharsets.UTF_8);
+
+ data.binaryNullValue = "".getBytes(StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new HopException("Unexpected error while encoding binary fields", e);
+ }
+ }
+
+ /**
+ * Writes an individual row of data to a temp file
+ *
+ * @param rowMeta The metadata about the row
+ * @param row The input row
+ * @throws HopTransformException
+ */
+ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformException {
+
+ try {
+ for (int i = 0; i < data.insertRowMeta.size(); i++) {
+ if (i > 0 && data.binarySeparator.length > 0) {
+ data.writer.write(data.binarySeparator);
+ }
+
+ Object convertedValue = null;
+ IValueMeta v = data.insertRowMeta.getValueMeta(i);
+ convertedValue = convertDatatypeIfNeeded(v, row[data.selectedRowFieldIndices[i]], i);
+ writeField(
+ data.convertedRowMeta.getValueMeta(i),
+ convertedValue,
+ (!meta.isSpecifyFields() ? null : data.binaryNullValue));
+ }
+ data.convertedRowMetaReady = true;
+ data.writer.write(data.binaryNewline);
+ } catch (Exception e) {
+ throw new HopTransformException("Error writing line", e);
+ }
+ }
+
+ /**
+ * Writes an individual field to the temp file.
+ *
+ * @param v The metadata about the column
+ * @param valueData The data for the column
+ * @param nullString The bytes to put in the temp file if the value is null
+ * @throws HopTransformException
+ */
+ private void writeField(IValueMeta v, Object valueData, byte[] nullString)
+ throws HopTransformException {
+ try {
+ byte[] str;
+
+ // First check whether or not we have a null string set
+ // These values should be set when a null value passes
+ //
+ if (nullString != null && v.isNull(valueData)) {
+ str = nullString;
+ } else {
+ str = formatField(v, valueData);
+ }
+
+ if (str != null && str.length > 0) {
+ List enclosures = null;
+ boolean writeEnclosures = false;
+
+ if (v.isString()) {
+ writeEnclosures = true;
+
+ if (containsSeparatorOrEnclosure(
+ str, data.binarySeparator, data.binaryEnclosure, data.escapeCharacters)) {
+ writeEnclosures = true;
+ }
+ }
+
+ if (writeEnclosures) {
+ data.writer.write(data.binaryEnclosure);
+ enclosures = getEnclosurePositions(str);
+ }
+
+ if (enclosures == null) {
+ data.writer.write(str);
+ } else {
+ // Skip the enclosures, escape them instead...
+ int from = 0;
+ for (Integer enclosure : enclosures) {
+ // Minus one to write the escape before the enclosure
+ int position = enclosure;
+ data.writer.write(str, from, position - from);
+ data.writer.write(data.escapeCharacters); // write enclosure a second time
+ from = position;
+ }
+ if (from < str.length) {
+ data.writer.write(str, from, str.length - from);
+ }
+ }
+
+ if (writeEnclosures) {
+ data.writer.write(data.binaryEnclosure);
+ }
+ }
+ } catch (Exception e) {
+ throw new HopTransformException("Error writing field content to file", e);
+ }
+ }
+
+ /**
+ * Takes an input field and converts it to bytes to be stored in the temp file.
+ *
+ * @param v The metadata about the column
+ * @param valueData The column data
+ * @return The bytes for the value
+ * @throws HopValueException
+ */
+ private byte[] formatField(IValueMeta v, Object valueData) throws HopValueException {
+ if (v.isString()) {
+ if (v.isStorageBinaryString()
+ && v.getTrimType() == IValueMeta.TRIM_TYPE_NONE
+ && v.getLength() < 0
+ && StringUtils.isEmpty(v.getStringEncoding())) {
+ return (byte[]) valueData;
+ } else {
+ String svalue = (valueData instanceof String) ? (String) valueData : v.getString(valueData);
+
+ // trim or cut to size if needed.
+ //
+ return convertStringToBinaryString(v, Const.trimToType(svalue, v.getTrimType()));
+ }
+ } else {
+ return v.getBinaryString(valueData);
+ }
+ }
+
+ /**
+ * Converts an input string to the bytes for the string
+ *
+ * @param v The metadata about the column
+ * @param string The column data
+ * @return The bytes for the value
+ * @throws HopValueException
+ */
+ private byte[] convertStringToBinaryString(IValueMeta v, String string) {
+ int length = v.getLength();
+
+ if (string == null) {
+ return new byte[] {};
+ }
+
+ if (length > -1 && length < string.length()) {
+ // we need to truncate
+ String tmp = string.substring(0, length);
+ return tmp.getBytes(StandardCharsets.UTF_8);
+
+ } else {
+ byte[] text;
+ text = string.getBytes(StandardCharsets.UTF_8);
+
+ if (length > string.length()) {
+ // we need to pad this
+
+ int size = 0;
+ byte[] filler;
+ filler = " ".getBytes(StandardCharsets.UTF_8);
+ size = text.length + filler.length * (length - string.length());
+
+ byte[] bytes = new byte[size];
+ System.arraycopy(text, 0, bytes, 0, text.length);
+ if (filler.length == 1) {
+ java.util.Arrays.fill(bytes, text.length, size, filler[0]);
+ } else {
+ int currIndex = text.length;
+ for (int i = 0; i < (length - string.length()); i++) {
+ for (byte aFiller : filler) {
+ bytes[currIndex++] = aFiller;
+ }
+ }
+ }
+ return bytes;
+ } else {
+ // do not need to pad or truncate
+ return text;
+ }
+ }
+ }
+
+ /**
+ * Check if a string contains separators or enclosures. Can be used to determine if the string
+ * needs enclosures around it or not.
+ *
+ * @param source The string to check
+ * @param separator The separator character(s)
+ * @param enclosure The enclosure character(s)
+ * @param escape The escape character(s)
+ * @return True if the string contains separators or enclosures
+ */
+ @SuppressWarnings("Duplicates")
+ private boolean containsSeparatorOrEnclosure(
+ byte[] source, byte[] separator, byte[] enclosure, byte[] escape) {
+ boolean result = false;
+
+ boolean enclosureExists = enclosure != null && enclosure.length > 0;
+ boolean separatorExists = separator != null && separator.length > 0;
+ boolean escapeExists = escape != null && escape.length > 0;
+
+ // Skip entire test if neither separator nor enclosure exist
+ if (separatorExists || enclosureExists || escapeExists) {
+
+ // Search for the first occurrence of the separator or enclosure
+ for (int index = 0; !result && index < source.length; index++) {
+ if (enclosureExists && source[index] == enclosure[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + enclosure.length <= source.length) {
+ // First byte of enclosure found
+ result = true; // Assume match
+ for (int i = 1; i < enclosure.length; i++) {
+ if (source[index + i] != enclosure[i]) {
+ // Enclosure match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+
+ } else if (separatorExists && source[index] == separator[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + separator.length <= source.length) {
+ // First byte of separator found
+ result = true; // Assume match
+ for (int i = 1; i < separator.length; i++) {
+ if (source[index + i] != separator[i]) {
+ // Separator match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+
+ } else if (escapeExists && source[index] == escape[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + escape.length <= source.length) {
+ // First byte of separator found
+ result = true; // Assume match
+ for (int i = 1; i < escape.length; i++) {
+ if (source[index + i] != escape[i]) {
+ // Separator match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Gets the positions of any double quotes or backslashes in the string
+ *
+ * @param str The string to check
+ * @return The positions within the string of double quotes and backslashes.
+ */
+ private List getEnclosurePositions(byte[] str) {
+ List positions = null;
+ // +1 because otherwise we will not find it at the end
+ for (int i = 0, len = str.length; i < len; i++) {
+ // verify if on position i there is an enclosure
+ //
+ boolean found = true;
+ for (int x = 0; found && x < data.binaryEnclosure.length; x++) {
+ if (str[i + x] != data.binaryEnclosure[x]) {
+ found = false;
+ }
+ }
+
+ if (!found) {
+ found = true;
+ for (int x = 0; found && x < data.escapeCharacters.length; x++) {
+ if (str[i + x] != data.escapeCharacters[x]) {
+ found = false;
+ }
+ }
+ }
+
+ if (found) {
+ if (positions == null) {
+ positions = new ArrayList<>();
+ }
+ positions.add(i);
+ }
+ }
+ return positions;
+ }
+
+ @Override
+ public void stopRunning() throws HopException {
+ setStopped(true);
+ if (data.workerThread != null) {
+ synchronized (data.workerThread) {
+ if (data.workerThread.isAlive() && !data.workerThread.isInterrupted()) {
+ try {
+ data.workerThread.interrupt();
+ data.workerThread.join();
+ } catch (InterruptedException e) { // Checkstyle:OFF:
+ }
+ // Checkstyle:ONN:
+ }
+ }
+ }
+
+ super.stopRunning();
+ }
+
+ void truncateTable() throws HopDatabaseException {
+ if (meta.isTruncateTable() && ((getCopy() == 0) || !Utils.isEmpty(getPartitionId()))) {
+ data.db.truncateTable(resolve(meta.getSchemaName()), resolve(meta.getTableName()));
+ }
+ }
+
+ @Override
+ public void dispose() {
+
+ setOutputDone();
+
+ try {
+ if (getErrors() > 0) {
+ data.db.rollback();
+ }
+ } catch (HopDatabaseException e) {
+ logError("Unexpected error rolling back the database connection.", e);
+ }
+
+ if (data.workerThread != null) {
+ try {
+ data.workerThread.join();
+ } catch (InterruptedException e) { // Checkstyle:OFF:
+ }
+ // Checkstyle:ONN:
+ }
+
+ if (data.db != null) {
+ data.db.disconnect();
+ }
+ super.dispose();
+ }
+}
diff --git a/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoaderData.java b/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoaderData.java
new file mode 100644
index 00000000000..7328188eca1
--- /dev/null
+++ b/plugins/transforms/cratedbbulkloader/src/main/java/org/apache/hop/pipeline/transforms/cratedbbulkloader/CrateDBBulkLoaderData.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.cratedbbulkloader;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.pipeline.transform.BaseTransformData;
+import org.apache.hop.pipeline.transform.ITransformData;
+
+public class CrateDBBulkLoaderData extends BaseTransformData implements ITransformData {
+ protected Database db;
+ protected DatabaseMeta databaseMeta;
+
+ protected int[] selectedRowFieldIndices;
+
+ protected IRowMeta outputRowMeta;
+ protected IRowMeta insertRowMeta;
+ protected IRowMeta convertedRowMeta;
+
+ protected boolean convertedRowMetaReady = false;
+
+ // A list of table fields mapped to their data type. String[0] is the field name, String[1] is
+ // the CrateDB
+ // data type
+ public ArrayList dbFields;
+
+ // Maps table fields to the location of the corresponding field on the input stream.
+ public Map fieldnrs;
+
+ protected OutputStream writer;
+
+ protected List