Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL-12911 | Transform to Process Migration (2nd PR) #10628

Merged
merged 47 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
87aa49e
Use assertEquals instead
VedarthConfluent Dec 6, 2024
b4ec319
Fix assert statement
VedarthConfluent Dec 6, 2024
c10bc81
Fix asser equals
VedarthConfluent Dec 6, 2024
dbeaef1
Fix tests
VedarthConfluent Dec 9, 2024
ecd9e6a
Fixes a test
VedarthConfluent Dec 9, 2024
600b73d
Revert test updates
VedarthConfluent Dec 11, 2024
dd790e6
Encode the test url
VedarthConfluent Dec 13, 2024
700a0d6
Revert url encoding
VedarthConfluent Dec 13, 2024
0d4da08
fix: replaced transformer with processor for sinkbuilder
hrishabhg Dec 19, 2024
788a093
refactor: renamed vars
hrishabhg Dec 19, 2024
31e0b56
refactor: reverted changes
hrishabhg Dec 19, 2024
2ed9f5c
refactor: renamed parameter with better name
hrishabhg Dec 20, 2024
71dba6d
fix: replaced ksqltransformer with ksqlprocessor
hrishabhg Dec 20, 2024
fe41cff
fix: replaced ksqltransformer with ksqlprocessor
hrishabhg Dec 20, 2024
525f2f3
fix: kstream transformer replaced with processor
hrishabhg Dec 22, 2024
adca69f
fix: fixed tests
hrishabhg Dec 23, 2024
15687d3
fix: fixed tests for streambuilder
hrishabhg Dec 23, 2024
fdec55e
fix: fixed tests for streambuilder
hrishabhg Dec 23, 2024
e634d5f
fix: fixed test
hrishabhg Dec 23, 2024
4c17c6f
refactor: simplified ksprocessor
hrishabhg Dec 24, 2024
f16b882
refactor: interface implementation simplified
hrishabhg Dec 24, 2024
04a5e31
refactor: renamed transform with process
hrishabhg Dec 26, 2024
afdf679
refactor: renamed with kstream convention
hrishabhg Dec 26, 2024
fd05218
fix: added test for ksprocessor and ksfixedkeyprocessor
hrishabhg Dec 26, 2024
08fb0d2
fix: updated the fakestream impl
hrishabhg Dec 26, 2024
8515e5c
fix: updated stream topology collection in test
hrishabhg Dec 26, 2024
72f9419
fix: fixed tests
hrishabhg Dec 26, 2024
9f21f2a
fix: ignore postAggregationMapper Tes
hrishabhg Dec 27, 2024
b35ee9c
fix: fixed pre-agg test
hrishabhg Dec 27, 2024
d225a84
fix: fixed pre-agg test
hrishabhg Dec 27, 2024
fa8d6df
fix: checkstyle for unused import
hrishabhg Dec 27, 2024
6c4c4ff
attempt to fix backward incompatibility
mjsax Dec 28, 2024
4ca4f40
fix: burn index by peek
hrishabhg Dec 30, 2024
8ed2f1a
fix: checkstyle-removed unused imports
hrishabhg Dec 30, 2024
88f5c83
fix: test
hrishabhg Dec 30, 2024
9ef8c4a
fix: test
hrishabhg Dec 30, 2024
4cbeb83
fix: checkstyle fix
hrishabhg Jan 2, 2025
d086480
fix: fixed historical plan tests
hrishabhg Jan 2, 2025
fff5571
fix: reverted semaphore
hrishabhg Jan 2, 2025
06523fe
fix: fixed the error message check
hrishabhg Jan 2, 2025
1757a55
fix: updated historical plans version
hrishabhg Jan 3, 2025
7f7c9bf
Merge branch 'master' into master-build-failure-18122024
hrishabhg Jan 3, 2025
a3033d6
fix: removed ksqlprocessingctx usage
hrishabhg Jan 3, 2025
43344d5
fix: removed rowtime checks
hrishabhg Jan 3, 2025
24655f6
refactor: refactored for consistency
hrishabhg Jan 7, 2025
ecbbdaa
Merge branch 'master' into transformer-migration-2
hrishabhg Jan 7, 2025
6810fef
refactor: imported namespace FixedKeyProcessorContext
hrishabhg Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.confluent.ksql.execution.common.QueryRow;
import io.confluent.ksql.execution.common.QueryRowImpl;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.streams.materialization.PullProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.select.SelectValueMapper;
import io.confluent.ksql.execution.transform.select.SelectValueMapperFactory;
Expand Down Expand Up @@ -107,9 +106,7 @@ public Object next() {

final GenericRow mapped = transformer.transform(
row.key(),
intermediate,
new PullProcessingContext(row.rowTime())
);
intermediate);
validateProjection(mapped, logicalNode.getSchema());

return QueryRowImpl.of(logicalNode.getSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.ksql.execution.common.QueryRow;
import io.confluent.ksql.execution.common.QueryRowImpl;
import io.confluent.ksql.execution.streams.SqlPredicateFactory;
import io.confluent.ksql.execution.streams.materialization.PullProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate;
import io.confluent.ksql.logging.processing.ProcessingLogger;
Expand Down Expand Up @@ -84,10 +83,7 @@ public Object next() {
private Optional<QueryRow> transformRow(final QueryRow queryRow) {
final GenericRow intermediate = PhysicalOperatorUtil.getIntermediateRow(
queryRow, logicalNode.getAddAdditionalColumnsToIntermediateSchema());
return transformer.transform(
queryRow.key(),
intermediate,
new PullProcessingContext(queryRow.rowTime()))
return transformer.transform(queryRow.key(), intermediate)
.map(r -> QueryRowImpl.of(
logicalNode.getIntermediateSchema(),
queryRow.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.Window;
import io.confluent.ksql.execution.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.execution.common.operators.ProjectOperator;
import io.confluent.ksql.execution.streams.materialization.PullProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.select.SelectValueMapper;
import io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.SelectValueMapperFactorySupplier;
Expand Down Expand Up @@ -122,7 +119,7 @@ public void shouldProjectAllColumnsWhenSelectStarNonWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, row.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, row.value()))
.thenReturn(GenericRow.genericRow("k", "a", "b"));
projectOperator.open();

Expand Down Expand Up @@ -156,7 +153,7 @@ public void shouldProjectAllColumnsWhenSelectStarWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, windowedRow.value(), new PullProcessingContext(A_ROWTIME)))
when(transformer.transform(A_KEY, windowedRow.value()))
.thenReturn(GenericRow.genericRow("k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli(), "a", "b"));
projectOperator.open();

Expand Down Expand Up @@ -194,15 +191,15 @@ public void shouldCallTransformWithCorrectArguments() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(any(), any(), any())).thenReturn(GenericRow.genericRow("k", "a", "b"));
when(transformer.transform(any(), any())).thenReturn(GenericRow.genericRow("k", "a", "b"));
projectOperator.open();

// When:
projectOperator.next();

// Then:
verify(transformer).transform(
A_KEY, GenericRow.genericRow("a", "b", 12335L, "k", 12335L, "k"), new PullProcessingContext(12335L));
A_KEY, GenericRow.genericRow("a", "b", 12335L, "k", 12335L, "k"));
}

@Test
Expand All @@ -227,7 +224,7 @@ public void shouldCallTransformWithCorrectArgumentsWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(any(), any(), any())).thenReturn(GenericRow.genericRow("k", "a", "b"));
when(transformer.transform(any(), any())).thenReturn(GenericRow.genericRow("k", "a", "b"));
projectOperator.open();

// When:
Expand All @@ -237,8 +234,8 @@ public void shouldCallTransformWithCorrectArgumentsWindowed() {
verify(transformer).transform(
A_KEY,
GenericRow.genericRow("a", "b", 12335L, "k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli(),
12335L, "k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli()),
new PullProcessingContext(12335L));
12335L, "k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli())
);
}

@Test
Expand Down Expand Up @@ -266,7 +263,7 @@ public void shouldProjectOnlyKeyNonWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, row.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, row.value()))
.thenReturn(GenericRow.genericRow("k"));
projectOperator.open();

Expand Down Expand Up @@ -302,7 +299,7 @@ public void shouldProjectOnlyValueNonWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, row.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, row.value()))
.thenReturn(GenericRow.genericRow("b"));
projectOperator.open();

Expand Down Expand Up @@ -338,7 +335,7 @@ public void shouldProjectOnlyWindowStartWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, windowedRow.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, windowedRow.value()))
.thenReturn(GenericRow.genericRow(A_WINDOW.start().toEpochMilli()));
projectOperator.open();

Expand Down Expand Up @@ -376,7 +373,7 @@ public void shouldProjectKeyAndValueNonWindowed() {
when(selectValueMapperFactorySupplier.create(any(), any()))
.thenReturn(selectValueMapper);
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(A_KEY, row.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, row.value()))
.thenReturn(GenericRow.genericRow("k","b"));
projectOperator.open();

Expand Down Expand Up @@ -417,8 +414,8 @@ public void shouldProjectKeyAndValueWindowed() {
when(selectValueMapper.getTransformer(logger)).thenReturn(transformer);
when(transformer.transform(
A_KEY,
windowedRow.value(),
new PullProcessingContext(12335L)))
windowedRow.value()
))
.thenReturn(GenericRow.genericRow("k", "b"));
projectOperator.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.execution.common.operators.SelectOperator;
import io.confluent.ksql.execution.streams.SqlPredicateFactory;
import io.confluent.ksql.execution.streams.materialization.PullProcessingContext;
import io.confluent.ksql.Window;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate;
Expand Down Expand Up @@ -120,7 +117,7 @@ public void shouldSelectKeyNonWindowed() {
GenericRow.genericRow("a", "b", A_ROWTIME, "k"),
A_ROWTIME
);
when(transformer.transform(A_KEY, intermediateRow.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, intermediateRow.value()))
.thenReturn(Optional.of(GenericRow.genericRow("a", "b", A_ROWTIME, "k")));
selectOperator.open();

Expand Down Expand Up @@ -158,7 +155,7 @@ public void shouldSelectKeyWindowed() {
GenericRow.genericRow("a", "b", A_ROWTIME, "k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli()),
A_ROWTIME
);
when(transformer.transform(A_KEY, intermediateWindowedRow.value(), new PullProcessingContext(12335L)))
when(transformer.transform(A_KEY, intermediateWindowedRow.value()))
.thenReturn(Optional.of(GenericRow.genericRow("a", "b", A_ROWTIME, "k", A_WINDOW.start().toEpochMilli(), A_WINDOW.end().toEpochMilli())));
selectOperator.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public void shouldSelectChosenColumns() {
// When:
final GenericRow transformed = selectTransformer.transform(
NON_WINDOWED_KEY,
genericRow("hi", "bye", 2.0D, "blah", "dar", ImmutableList.of(), 1521834663L, 0, 0L, 1L),
ctx
genericRow("hi", "bye", 2.0D, "blah", "dar", ImmutableList.of(), 1521834663L, 0, 0L, 1L)
);

// Then:
Expand All @@ -90,8 +89,7 @@ public void shouldApplyUdfsToColumns() {
// When:
final GenericRow row = selectTransformer.transform(
NON_WINDOWED_KEY,
genericRow("foo", "whatever", 6.9D, "boo", "hoo", 0, 0L, ImmutableList.of(), 1521834663L, 2L),
ctx
genericRow("foo", "whatever", 6.9D, "boo", "hoo", 0, 0L, ImmutableList.of(), 1521834663L, 2L)
);

// Then:
Expand Down
Loading