From b2bc1f4568e3ce09d2e67e2074b8a15b53d1fcf0 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Sat, 8 May 2021 15:40:30 +0800 Subject: [PATCH] [FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns --- .../connectors/kafka/table/UpsertKafkaTableITCase.java | 2 -- .../table/planner/plan/nodes/exec/common/CommonExecSink.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java index 1a26568b2db5d..4f9d4853fafa5 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.table.utils.LegacyRowResource; import org.apache.flink.types.Row; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -198,7 +197,6 @@ public void testSourceSinkWithKeyAndPartialValue() throws Exception { } @Test - @Ignore // FLINK-22559 public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception { // we always use a different topic name for each parameterized topic, // in order to make sure the topic can be created. diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 90a096048f4d3..0fbc6a719d22d 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -319,6 +319,6 @@ private InternalTypeInfo getInputTypeInfo() { } private RowType getConsumedRowType(ResolvedSchema schema) { - return (RowType) schema.toSinkRowDataType().getLogicalType(); + return (RowType) schema.toPhysicalRowDataType().getLogicalType(); } }