Skip to content

Commit

Permalink
Refactor code to avoid missing check (#21046)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp authored Jan 5, 2023
1 parent 94513f0 commit f846678
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.features;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FeatureFlagHelper {

public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
final Set<UUID> workspaceIds = new HashSet<>();
if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) {
for (final String id : workspaceIdsString.split(",")) {
try {
workspaceIds.add(UUID.fromString(id));
} catch (final IllegalArgumentException e) {
log.warn("Malformed workspace id for field selection: {}", id);
}
}
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.features;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class FeatureFlagHelperTest {

FeatureFlags featureFlags;

@BeforeEach
void beforeEach() {
featureFlags = mock(FeatureFlags.class);
}

@Test
void isFieldSelectionEnabledForWorkspaceWithEmptyString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSpaceString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" ");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithNullString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null);

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString());

assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId1 = UUID.randomUUID();
final UUID randomId2 = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString());

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.features.FeatureFlagHelper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
Expand Down Expand Up @@ -45,11 +46,8 @@
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -151,7 +149,7 @@ public Optional<String> runJob() throws Exception {
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId()));
metricReporter, FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));

log.info("Running replication worker...");
final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
Expand All @@ -168,20 +166,4 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin
: new DefaultAirbyteStreamFactory(mdcScope);
}

private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
final Set<UUID> workspaceIds = new HashSet<>();
for (final String id : workspaceIdsString.split(",")) {
workspaceIds.add(UUID.fromString(id));
}
for (final UUID workspace : workspaceIds) {
log.info("field selection workspace: {}", workspace);
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.commons.features.FeatureFlagHelper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -72,10 +73,8 @@
import jakarta.inject.Singleton;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -306,7 +305,7 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId()));
metricReporter, FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));
};
}

Expand Down Expand Up @@ -344,24 +343,4 @@ private boolean isResetJob(final String dockerImage) {
return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage);
}

private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
final Set<UUID> workspaceIds = new HashSet<>();
LOGGER.debug("Field selection enabled for {}", workspaceIdsString);
if (!workspaceIdsString.isEmpty()) {
for (final String id : workspaceIdsString.split(",")) {
try {
workspaceIds.add(UUID.fromString(id));
} catch (IllegalArgumentException e) {
LOGGER.warn("Malformed workspace id for field selection: {}", id);
}
}
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
}

}

0 comments on commit f846678

Please sign in to comment.