diff --git a/release-note.md b/release-note.md index 5c6d5738a86..8e29b08365e 100644 --- a/release-note.md +++ b/release-note.md @@ -125,6 +125,7 @@ - [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835) - [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion bug of transform (#4490) - [Connector-V2] [Common] Remove assert key word (#5915) +- [Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5978) ### CI diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index a62247f3d92..064b5370879 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -120,14 +120,14 @@ public static SeaTunnelRuntimeException getCatalogTablesWithUnsupportedType( GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params); } - public static SeaTunnelRuntimeException jsonOperationError(String format, String payload) { - return jsonOperationError(format, payload, null); + public static SeaTunnelRuntimeException jsonOperationError(String identifier, String payload) { + return jsonOperationError(identifier, payload, null); } public static SeaTunnelRuntimeException jsonOperationError( - String format, String payload, Throwable cause) { + String identifier, String payload, Throwable cause) { Map params = new HashMap<>(); - params.put("format", format); + params.put("identifier", identifier); params.put("payload", payload); SeaTunnelErrorCode code = JSON_OPERATION_FAILED; diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 1fe001c07b2..bbfafa57b24 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -19,7 +19,8 @@ /** SeaTunnel connector error code interface */ public enum CommonErrorCode implements SeaTunnelErrorCode { - JSON_OPERATION_FAILED("COMMON-02", " JSON convert/parse '' operation failed."), + JSON_OPERATION_FAILED( + "COMMON-02", " JSON convert/parse '' operation failed."), UNSUPPORTED_DATA_TYPE( "COMMON-07", "'' unsupported data type '' of ''"), diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCodeDeprecated.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCodeDeprecated.java index 9d091db89fd..d7f5507aed5 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCodeDeprecated.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCodeDeprecated.java @@ -26,7 +26,6 @@ public enum CommonErrorCodeDeprecated implements SeaTunnelErrorCode { FILE_OPERATION_FAILED( "COMMON-01", "File operation failed, such as (read,list,write,move,copy,sync) etc..."), - JSON_OPERATION_FAILED("COMMON-02", "Json covert/parse operation failed"), REFLECT_CLASS_OPERATION_FAILED("COMMON-03", "Reflect class operation failed"), SERIALIZE_OPERATION_FAILED("COMMON-04", "Serialize class operation failed"), UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"), diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java index f103057816e..5e7c24bac8f 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java @@ -20,8 +20,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; +import org.apache.seatunnel.common.exception.CommonError; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -51,8 +50,7 @@ public void injectFields(PreparedStatement statement, int index, Object value) statement.setString(index, value.toString()); } } catch (JsonProcessingException e) { - throw new ClickhouseConnectorException( - CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e.getMessage()); + throw CommonError.jsonOperationError("Clickhouse", value.toString(), e); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index f7f9302fc84..584c373ae74 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo; @@ -82,65 +83,82 @@ public String serializeRow(SeaTunnelRow row) { private String serializeUpsert(SeaTunnelRow row) { String key = keyExtractor.apply(row); Map document = toDocumentMap(row); + String documentStr; try { - if (key != null) { - Map upsertMetadata = createMetadata(row, key); - /** - * format example: { "update" : {"_index" : "${your_index}", "_id" : - * "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" : - * true } - */ - return new StringBuilder() - .append("{ \"update\" :") - .append(objectMapper.writeValueAsString(upsertMetadata)) - .append("}") - .append("\n") - .append("{ \"doc\" :") - .append(objectMapper.writeValueAsString(document)) - .append(", \"doc_as_upsert\" : true }") - .toString(); - } else { - Map indexMetadata = createMetadata(row); - /** - * format example: { "index" : {"_index" : "${your_index}", "_id" : - * "${your_document_id}"} }\n ${your_document_json} - */ - return new StringBuilder() - .append("{ \"index\" :") - .append(objectMapper.writeValueAsString(indexMetadata)) - .append("}") - .append("\n") - .append(objectMapper.writeValueAsString(document)) - .toString(); + documentStr = objectMapper.writeValueAsString(document); + } catch (JsonProcessingException e) { + throw CommonError.jsonOperationError( + "Elasticsearch", "document:" + document.toString(), e); + } + + if (key != null) { + Map upsertMetadata = createMetadata(row, key); + String upsertMetadataStr; + try { + upsertMetadataStr = objectMapper.writeValueAsString(upsertMetadata); + } catch (JsonProcessingException e) { + throw CommonError.jsonOperationError( + "Elasticsearch", "upsertMetadata:" + upsertMetadata.toString(), e); } + + /** + * format example: { "update" : {"_index" : "${your_index}", "_id" : + * "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" : true } + */ + return new StringBuilder() + .append("{ \"update\" :") + .append(upsertMetadataStr) + .append(" }") + .append("\n") + .append("{ \"doc\" :") + .append(documentStr) + .append(", \"doc_as_upsert\" : true }") + .toString(); + } + + Map indexMetadata = createMetadata(row); + String indexMetadataStr; + try { + indexMetadataStr = objectMapper.writeValueAsString(indexMetadata); } catch (JsonProcessingException e) { - throw new ElasticsearchConnectorException( - CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, - "Object json deserialization exception.", - e); + throw CommonError.jsonOperationError( + "Elasticsearch", "indexMetadata:" + indexMetadata.toString(), e); } + + /** + * format example: { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} + * }\n ${your_document_json} + */ + return new StringBuilder() + .append("{ \"index\" :") + .append(indexMetadataStr) + .append(" }") + .append("\n") + .append(documentStr) + .toString(); } private String serializeDelete(SeaTunnelRow row) { String key = keyExtractor.apply(row); Map deleteMetadata = createMetadata(row, key); + String deleteMetadataStr; try { - /** - * format example: { "delete" : {"_index" : "${your_index}", "_id" : - * "${your_document_id}"} } - */ - return new StringBuilder() - .append("{ \"delete\" :") - .append(objectMapper.writeValueAsString(deleteMetadata)) - .append("}") - .toString(); + deleteMetadataStr = objectMapper.writeValueAsString(deleteMetadata); } catch (JsonProcessingException e) { - throw new ElasticsearchConnectorException( - CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, - "Object json deserialization exception.", - e); + throw CommonError.jsonOperationError( + "Elasticsearch", "deleteMetadata:" + deleteMetadata.toString(), e); } + + /** + * format example: { "delete" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} + * } + */ + return new StringBuilder() + .append("{ \"delete\" :") + .append(deleteMetadataStr) + .append(" }") + .toString(); } private Map toDocumentMap(SeaTunnelRow row) { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java new file mode 100644 index 00000000000..f888a8575f5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; + +public class ElasticsearchRowSerializerTest { + @Test + public void testSerializeUpsert() { + String index = "st_index"; + String primaryKey = "id"; + Map confMap = new HashMap<>(); + confMap.put(SinkConfig.INDEX.key(), index); + confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey)); + + Config pluginConf = ConfigFactory.parseMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {primaryKey, "name"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + String name = "jack"; + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name}); + row.setRowKind(RowKind.UPDATE_AFTER); + + String expected = + "{ \"update\" :{\"_index\":\"" + + index + + "\",\"_id\":\"" + + id + + "\"} }\n" + + "{ \"doc\" :{\"name\":\"" + + name + + "\",\"id\":\"" + + id + + "\"}, \"doc_as_upsert\" : true }"; + + String upsertStr = serializer.serializeRow(row); + Assertions.assertEquals(expected, upsertStr); + } + + @Test + public void testSerializeUpsertWithoutKey() { + String index = "st_index"; + Map confMap = new HashMap<>(); + confMap.put(SinkConfig.INDEX.key(), index); + + Config pluginConf = ConfigFactory.parseMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"id", "name"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + String name = "jack"; + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name}); + row.setRowKind(RowKind.UPDATE_AFTER); + + String expected = + "{ \"index\" :{\"_index\":\"" + + index + + "\"} }\n" + + "{\"name\":\"" + + name + + "\",\"id\":\"" + + id + + "\"}"; + + String upsertStr = serializer.serializeRow(row); + Assertions.assertEquals(expected, upsertStr); + } + + @Test + public void testSerializeUpsertDocumentError() { + String index = "st_index"; + String primaryKey = "id"; + Map confMap = new HashMap<>(); + confMap.put(SinkConfig.INDEX.key(), index); + confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey)); + + Config pluginConf = ConfigFactory.parseMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {primaryKey, "name"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + Object mockObj = new Object(); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, mockObj}); + row.setRowKind(RowKind.UPDATE_AFTER); + + Map expectedMap = new HashMap<>(); + expectedMap.put(primaryKey, id); + expectedMap.put("name", mockObj); + + SeaTunnelRuntimeException expected = + CommonError.jsonOperationError( + "Elasticsearch", "document:" + expectedMap.toString()); + SeaTunnelRuntimeException actual = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, () -> serializer.serializeRow(row)); + Assertions.assertEquals(expected.getMessage(), actual.getMessage()); + } + + @Test + public void testSerializeDelete() { + String index = "st_index"; + String primaryKey = "id"; + Map confMap = new HashMap<>(); + confMap.put(SinkConfig.INDEX.key(), index); + confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey)); + + Config pluginConf = ConfigFactory.parseMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {primaryKey, "name"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + String name = "jack"; + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name}); + row.setRowKind(RowKind.DELETE); + + String expected = "{ \"delete\" :{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"} }"; + + String upsertStr = serializer.serializeRow(row); + Assertions.assertEquals(expected, upsertStr); + } +} diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java index 2c9559078fd..8bacb1a94d5 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; @@ -67,7 +68,7 @@ private SeaTunnelRow convertRow(FakeConfig.RowData rowData) { seaTunnelRow.setTableId(tableId); return seaTunnelRow; } catch (IOException e) { - throw new FakeConnectorException(CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e); + throw CommonError.jsonOperationError("Fake", rowData.getFieldsJson(), e); } } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java index 3dc933d2f9c..76cce870c72 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java @@ -17,12 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorException; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsError; import java.io.IOException; import java.util.HashMap; @@ -43,20 +44,20 @@ public GoogleSheetsDeserializer( @Override public SeaTunnelRow deserializeRow(List row) { - try { - Map map = new HashMap<>(); - for (int i = 0; i < row.size(); i++) { - if (i < fields.length) { - map.put(fields[i], row.get(i)); - } + Map map = new HashMap<>(); + for (int i = 0; i < row.size(); i++) { + if (i < fields.length) { + map.put(fields[i], row.get(i)); } + } + + try { String rowStr = objectMapper.writeValueAsString(map); return deserializationSchema.deserialize(rowStr.getBytes()); + } catch (JsonProcessingException e) { + throw CommonError.jsonOperationError("GoogleSheets", map.toString(), e); } catch (IOException e) { - throw new GoogleSheetsConnectorException( - CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, - "Object json deserialization failed.", - e); + throw GoogleSheetsError.deserializeError(map.toString(), e); } } } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsError.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsError.java new file mode 100644 index 00000000000..9decd7de0ee --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsError.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import java.util.HashMap; +import java.util.Map; + +public class GoogleSheetsError { + public static SeaTunnelRuntimeException deserializeError(String payload) { + return deserializeError(payload, null); + } + + public static SeaTunnelRuntimeException deserializeError(String payload, Throwable cause) { + Map params = new HashMap<>(); + params.put("payload", payload); + GoogleSheetsErrorCode code = GoogleSheetsErrorCode.DESERIALIZE_FAILED; + + if (cause != null) { + return new SeaTunnelRuntimeException(code, params, cause); + } else { + return new SeaTunnelRuntimeException(code, params); + } + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorCode.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorCode.java new file mode 100644 index 00000000000..e32d2e57250 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorCode.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum GoogleSheetsErrorCode implements SeaTunnelErrorCode { + DESERIALIZE_FAILED("GOOGLE-SHEETS-01", "Fail to deserialize Google Sheets ''"); + + private final String code; + private final String description; + + GoogleSheetsErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java new file mode 100644 index 00000000000..8c81a51c581 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializerTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GoogleSheetsDeserializerTest { + @Test + public void testJsonParseError() { + SeaTunnelRowType schema = + new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE}); + final DeserializationSchema deser = + new JsonDeserializationSchema(false, false, schema); + final GoogleSheetsDeserializer googleSheetsDeser = + new GoogleSheetsDeserializer(schema.getFieldNames(), deser); + List row = new ArrayList<>(); + Object mockObj = new Object(); + row.add(mockObj); + + String expectedPayload = String.format("{name=%s}", mockObj.toString()); + SeaTunnelRuntimeException expected = + CommonError.jsonOperationError("GoogleSheets", expectedPayload); + + SeaTunnelRuntimeException actual = + assertThrows( + SeaTunnelRuntimeException.class, + () -> { + googleSheetsDeser.deserializeRow(row); + }, + "expecting exception message: " + expected.getMessage()); + + assertEquals(expected.getMessage(), actual.getMessage()); + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorTest.java b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorTest.java new file mode 100644 index 00000000000..06c05a773f8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/test/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsErrorTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class GoogleSheetsErrorTest { + @Test + public void testError() { + SeaTunnelRuntimeException error = GoogleSheetsError.deserializeError("{}"); + Assertions.assertEquals( + GoogleSheetsErrorCode.DESERIALIZE_FAILED.getCode(), + error.getSeaTunnelErrorCode().getCode()); + String expectedMsg = + "ErrorCode:[GOOGLE-SHEETS-01], ErrorDescription:[Fail to deserialize Google Sheets '{}']"; + Assertions.assertEquals(expectedMsg, error.getMessage()); + } +}