diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala index b9453f5d0f06c..509fb27983990 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala @@ -86,43 +86,49 @@ object WindowJoinUtil { windowEndEqualityRightKeys) = excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join) - val joinInfo = join.analyzeCondition() + val joinSpec = JoinUtil.createJoinSpec(join) val (remainLeftKeys, remainRightKeys, remainCondition) = if ( windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) { val leftChildFieldsType = join.getLeft.getRowType.getFieldList val rightChildFieldsType = join.getRight.getRowType.getFieldList val leftFieldCnt = join.getLeft.getRowType.getFieldCount val rexBuilder = join.getCluster.getRexBuilder - val remainEquals = mutable.ArrayBuffer[RexNode]() + val remainingConditions = mutable.ArrayBuffer[RexNode]() val remainLeftKeysArray = mutable.ArrayBuffer[Int]() val remainRightKeysArray = mutable.ArrayBuffer[Int]() // convert remain pairs to RexInputRef tuple for building SqlStdOperatorTable.EQUALS calls // or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM - joinInfo.pairs().foreach { p => - if (!windowStartEqualityLeftKeys.contains(p.source) && - !windowEndEqualityLeftKeys.contains(p.source)) { - val leftFieldType = leftChildFieldsType.get(p.source).getType - val leftInputRef = new RexInputRef(p.source, leftFieldType) - val rightFieldType = rightChildFieldsType.get(p.target).getType - val rightIndex = leftFieldCnt + p.target + joinSpec.getLeftKeys.zip(joinSpec.getRightKeys). + zip(joinSpec.getFilterNulls).foreach { case ((source, target), filterNull) => + if (!windowStartEqualityLeftKeys.contains(source) && + !windowEndEqualityLeftKeys.contains(source)) { + val leftFieldType = leftChildFieldsType.get(source).getType + val leftInputRef = new RexInputRef(source, leftFieldType) + val rightFieldType = rightChildFieldsType.get(target).getType + val rightIndex = leftFieldCnt + target val rightInputRef = new RexInputRef(rightIndex, rightFieldType) - val remainEqual = rexBuilder.makeCall( - SqlStdOperatorTable.EQUALS, - leftInputRef, - rightInputRef) - remainEquals.add(remainEqual) - remainLeftKeysArray.add(p.source) - remainRightKeysArray.add(p.target) + val op = if (filterNull) { + SqlStdOperatorTable.EQUALS + } else { + SqlStdOperatorTable.IS_NOT_DISTINCT_FROM + } + val remainEqual = rexBuilder.makeCall(op, leftInputRef, rightInputRef) + remainingConditions += remainEqual + remainLeftKeysArray += source + remainRightKeysArray += target } } - val remainAnds = remainEquals ++ joinInfo.nonEquiConditions + val notEquiCondition = joinSpec.getNonEquiCondition + if (notEquiCondition.isPresent) { + remainingConditions += notEquiCondition.get() + } ( remainLeftKeysArray.toArray, remainRightKeysArray.toArray, // build a new condition - RexUtil.composeConjunction(rexBuilder, remainAnds.toList)) + RexUtil.composeConjunction(rexBuilder, remainingConditions.toList)) } else { - (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, join.getCondition) + (joinSpec.getLeftKeys, joinSpec.getRightKeys, join.getCondition) } ( diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml index 0210a61b80d14..e0839143364e0 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml @@ -537,7 +537,7 @@ LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt