Skip to content

Commit

Permalink
fix: transfer type resolution on dp self registration
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelmag110 committed Jan 9, 2025
1 parent 64a6d8e commit 2b7df74
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TransferTypeParserImpl implements TransferTypeParser {

/**
* Parses a compose transfer type string into a {@link TransferType}:
* {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL/Websocket}
* {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL-Websocket}
*
* @param transferType the transfer type string representation.
* @return a {@link TransferType}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void shutdown() {

private @NotNull Stream<String> toTransferTypes(FlowType pull, Set<String> types, Set<String> responseTypes) {
Stream<String> transferTypes = types.stream().map(it -> "%s-%s".formatted(it, pull));
return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s/%s".formatted(it, pull, responseType))));
return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s-%s".formatted(it, pull, responseType))));
}

private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio
when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType"));
when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType"));
when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType"));
when(publicEndpointGeneratorService.supportedResponseTypes()).thenReturn(Set.of("responseType", "anotherResponseType"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success());

extension.initialize(context);
Expand All @@ -88,7 +89,18 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio
assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType");
assertThat(dataPlaneInstance.getAllowedDestTypes()).containsExactlyInAnyOrder("sinkType", "anotherSinkType");
assertThat(dataPlaneInstance.getAllowedTransferTypes())
.containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH");
.containsExactlyInAnyOrder("anotherPullDestType-PULL-anotherResponseType",
"anotherSinkType-PUSH-anotherResponseType",
"anotherPullDestType-PULL",
"anotherSinkType-PUSH-responseType",
"anotherSinkType-PUSH",
"pullDestType-PULL",
"anotherPullDestType-PULL-responseType",
"pullDestType-PULL-anotherResponseType",
"sinkType-PUSH-anotherResponseType",
"pullDestType-PULL-responseType",
"sinkType-PUSH-responseType",
"sinkType-PUSH");

verify(healthCheckService).addStartupStatusProvider(any());
verify(healthCheckService).addLivenessProvider(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String stateAsString() {
public boolean canHandle(DataAddress sourceAddress, @Nullable String transferType) {
Objects.requireNonNull(sourceAddress, "source cannot be null!");
Objects.requireNonNull(transferType, "transferType cannot be null!");
// startsWith: the allowed transferType could be HttpData-PULL/someResponseChannel, and we only need to match the HttpData-PULL
// startsWith: the allowed transferType could be HttpData-PULL-someResponseChannel, and we only need to match the HttpData-PULL
return allowedSourceTypes.contains(sourceAddress.getType()) && allowedTransferTypes.contains(transferType);
}

Expand Down

0 comments on commit 2b7df74

Please sign in to comment.