Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process config control messages during check and discover #20894

Merged
merged 78 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
736ac57
track latest config message
pedroslopez Nov 26, 2022
715c170
pass new config as part of outputs
pedroslopez Nov 26, 2022
2df5039
persist new config
pedroslopez Nov 26, 2022
59122d7
persist config as the messages come through, dont set output
pedroslopez Nov 28, 2022
7f5a4e7
clean up old implementation
pedroslopez Nov 28, 2022
2637e62
accept control messages for destinations
pedroslopez Nov 28, 2022
289672d
get api client from micronaut
pedroslopez Nov 28, 2022
9c2bded
mask instance-wide oauth params when updating configs
pedroslopez Nov 30, 2022
1ebed55
defaultreplicationworker tests
pedroslopez Nov 30, 2022
6ec3db5
formatting
pedroslopez Nov 30, 2022
1bad8e9
tests for source/destination handlers
pedroslopez Nov 30, 2022
753dafe
rm todo
pedroslopez Nov 30, 2022
ab8e2fc
refactor test a bit to fix pmd
pedroslopez Nov 30, 2022
de0d638
fix pmd
pedroslopez Nov 30, 2022
d159366
fix test
pedroslopez Nov 30, 2022
0f1c122
add PersistConfigHelperTest
pedroslopez Nov 30, 2022
519b920
update message tracker comment
pedroslopez Nov 30, 2022
4ef8b91
fix pmd
pedroslopez Dec 1, 2022
24c2968
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 1, 2022
dda5861
format
pedroslopez Dec 1, 2022
c30e8f6
move ApiClientBeanFactory to commons-worker, use in container-orchest…
pedroslopez Dec 1, 2022
51f1976
pull out config updating to separate methods
pedroslopez Dec 6, 2022
8203432
add jitter
pedroslopez Dec 6, 2022
bff1d44
rename PersistConfigHelper -> UpdateConnectorConfigHelper, docs
pedroslopez Dec 6, 2022
9563ddc
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 6, 2022
44be06d
fix exception type
pedroslopez Dec 6, 2022
5227c73
fmt
pedroslopez Dec 6, 2022
f2ec8dd
move message type check into runnable
pedroslopez Dec 6, 2022
329f641
formatting
pedroslopez Dec 6, 2022
b94a8f8
pass api client env vars to container orchestrator
pedroslopez Dec 8, 2022
6fbbfbe
pass micronaut envs to container orchestrator
pedroslopez Dec 8, 2022
b486f6d
print stacktrace for debugging
pedroslopez Dec 8, 2022
2710f27
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 8, 2022
adabd35
different api host for container orchestrator
pedroslopez Dec 15, 2022
27ebbfb
fix default env var
pedroslopez Dec 16, 2022
2fc87c6
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 16, 2022
d88f12f
format
pedroslopez Dec 16, 2022
ef656dc
fix errors after merge
pedroslopez Dec 16, 2022
a8a0496
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 16, 2022
8a67e13
set source and destination actor id as part of the sync input
pedroslopez Dec 19, 2022
8d3deab
fix: get destination definition
pedroslopez Dec 20, 2022
0f81248
fix null ptr
pedroslopez Dec 20, 2022
8b64f4a
remove "actor" from naming
pedroslopez Dec 20, 2022
e4e3584
fix missing change from rename
pedroslopez Dec 20, 2022
875228e
revert ContainerOrchestratorConfigBeanFactory changes
pedroslopez Dec 20, 2022
4e6ff2e
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 21, 2022
01e3aee
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Dec 21, 2022
6ecac44
inject sourceapi/destinationapi directly rather than airbyteapiclient
pedroslopez Dec 21, 2022
c20f61e
UpdateConnectorConfigHelper -> ConnectorConfigUpdater
pedroslopez Dec 21, 2022
b3a443c
rm log
pedroslopez Dec 21, 2022
079c25d
fix test
pedroslopez Dec 21, 2022
e7f4736
dont fail on config update error
pedroslopez Dec 26, 2022
c4ff5dc
process control messages for discover jobs
pedroslopez Dec 27, 2022
a7f0365
process control messages for CHECK
pedroslopez Dec 27, 2022
784414d
persist config updates on check_connection_for_update
pedroslopez Dec 28, 2022
6fd7ad3
get last config message rather than first
pedroslopez Dec 28, 2022
c655be8
fix pmd
pedroslopez Dec 28, 2022
c658271
fix failing tests
pedroslopez Dec 28, 2022
ee133d2
add tests
pedroslopez Dec 28, 2022
90b707b
source id not required for check connection (create case)
pedroslopez Dec 28, 2022
935c3ee
suppress pmd warning for BusyWait literal
pedroslopez Dec 28, 2022
d0f865a
source id not required for checkc onnection (create case) (p2)
pedroslopez Dec 28, 2022
d6eb05f
pass id, not full config to runnables/accept control message
pedroslopez Jan 3, 2023
860a478
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Jan 3, 2023
87c8fe2
add new config required for api client
pedroslopez Jan 3, 2023
beb9f70
add test file
pedroslopez Jan 3, 2023
e1261d8
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Jan 3, 2023
89622f6
remove debugging logs
pedroslopez Jan 4, 2023
d94feb1
rename method (getLast -> getMostRecent)
pedroslopez Jan 4, 2023
ad8d6b0
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Jan 6, 2023
5aa94ca
Merge branch 'pedroslopez/config-control-msg' into pedroslopez/synchr…
pedroslopez Jan 6, 2023
8002968
rm version check (re-added this in by mistake on merge)
pedroslopez Jan 6, 2023
6e42699
fix test compatibility
pedroslopez Jan 6, 2023
92d7c04
Merge branch 'pedroslopez/config-control-msg' into pedroslopez/synchr…
pedroslopez Jan 6, 2023
6554039
Merge branch 'master' into pedroslopez/config-control-msg
pedroslopez Jan 6, 2023
2408c0f
Merge branch 'pedroslopez/config-control-msg' into pedroslopez/synchr…
pedroslopez Jan 6, 2023
60987e2
simplify
pedroslopez Jan 6, 2023
63d596b
Merge branch 'master' into pedroslopez/synchronous-job-control-msg
pedroslopez Jan 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2667,6 +2667,8 @@ components:
- connectionConfiguration
- workspaceId
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
sourceDefinitionId:
$ref: "#/components/schemas/SourceDefinitionId"
connectionConfiguration:
Expand Down Expand Up @@ -2959,6 +2961,8 @@ components:
- destinationDefinitionId
- connectionConfiguration
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
destinationDefinitionId:
$ref: "#/components/schemas/DestinationDefinitionId"
connectionConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput().withConnectionConfiguration(config.getConnectionConfiguration());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withActorType(config.getActorType())
.withActorId(config.getActorId())
Comment on lines +353 to +354
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm amazed we got this far without sending this information!

.withConnectionConfiguration(config.getConnectionConfiguration());

return execute(jobRunConfig,
() -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input));
Expand Down
1 change: 1 addition & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {
implementation libs.bundles.micronaut

implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'com.auth0:java-jwt:3.19.2'
implementation libs.guava
implementation (libs.temporal.sdk) {
exclude module: 'guava'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -91,6 +93,7 @@ public static void cancelProcess(final Process process) {
*/
public static WorkerSourceConfig syncToWorkerSourceConfig(final StandardSyncInput sync) {
return new WorkerSourceConfig()
.withSourceId(sync.getSourceId())
.withSourceConnectionConfiguration(sync.getSourceConfiguration())
.withCatalog(sync.getCatalog())
.withState(sync.getState());
Expand All @@ -102,11 +105,22 @@ public static WorkerSourceConfig syncToWorkerSourceConfig(final StandardSyncInpu
*/
public static WorkerDestinationConfig syncToWorkerDestinationConfig(final StandardSyncInput sync) {
return new WorkerDestinationConfig()
.withDestinationId(sync.getDestinationId())
.withDestinationConnectionConfiguration(sync.getDestinationConfiguration())
.withCatalog(sync.getCatalog())
.withState(sync.getState());
}

public static Optional<AirbyteControlConnectorConfigMessage> getMostRecentConfigControlMessage(final Map<Type, List<AirbyteMessage>> messagesByType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it always safe to discard older control messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! control messages contain the full config object, so the final message should have the latest version we need to update

final List<AirbyteControlConnectorConfigMessage> configMessages = messagesByType.getOrDefault(Type.CONTROL, new ArrayList<>()).stream()
.map(AirbyteMessage::getControl)
.filter(control -> control.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG)
.map(AirbyteControlMessage::getConnectorConfig)
.toList();

return configMessages.isEmpty() ? Optional.empty() : Optional.of(configMessages.get(configMessages.size() - 1));
}

public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType,
final String defaultErrorMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.DestinationApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
Expand Down Expand Up @@ -58,7 +59,7 @@ public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") fi
}

@Singleton
public AirbyteApiClient airbyteApiClient(ApiClient apiClient) {
public AirbyteApiClient airbyteApiClient(final ApiClient apiClient) {
return new AirbyteApiClient(apiClient);
}

Expand All @@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public DestinationApi destinationApi(final ApiClient apiClient) {
return new DestinationApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -43,18 +45,21 @@ public class DefaultCheckConnectionWorker implements CheckConnectionWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCheckConnectionWorker.class);

private final IntegrationLauncher integrationLauncher;
private final ConnectorConfigUpdater connectorConfigUpdater;
private final AirbyteStreamFactory streamFactory;

private Process process;

public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
this.integrationLauncher = integrationLauncher;
this.connectorConfigUpdater = connectorConfigUpdater;
this.streamFactory = streamFactory;
}

public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher, final ConnectorConfigUpdater connectorConfigUpdater) {
this(integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -84,6 +89,21 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
.map(AirbyteMessage::getConnectionStatus)
.findFirst();

if (input.getActorId() != null && input.getActorType() != null) {
final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
configMessage.getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
configMessage.getConfig());
}
});
}

if (status.isPresent() && exitCode == 0) {
final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput()
.withStatus(Enums.convertTo(status.get().getStatus(), Status.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -48,20 +50,24 @@ public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final ConnectorConfigUpdater connectorConfigUpdater;

private volatile Process process;

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
this.configRepository = configRepository;
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.connectorConfigUpdater = connectorConfigUpdater;
}

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
final IntegrationLauncher integrationLauncher) {
this(configRepository, integrationLauncher, new DefaultAirbyteStreamFactory());
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater) {
this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -90,6 +96,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
.map(AirbyteMessage::getCatalog)
.findFirst();

final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (catalog.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand All @@ -36,6 +37,7 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.ThreadedTimeTracker;
import io.airbyte.workers.internal.AirbyteDestination;
Expand Down Expand Up @@ -98,6 +100,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AtomicBoolean hasFailed;
private final RecordSchemaValidator recordSchemaValidator;
private final WorkerMetricReporter metricReporter;
private final ConnectorConfigUpdater connectorConfigUpdater;
private final boolean fieldSelectionEnabled;

public DefaultReplicationWorker(final String jobId,
Expand All @@ -108,6 +111,7 @@ public DefaultReplicationWorker(final String jobId,
final MessageTracker messageTracker,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter,
final ConnectorConfigUpdater connectorConfigUpdater,
final boolean fieldSelectionEnabled) {
this.jobId = jobId;
this.attempt = attempt;
Expand All @@ -118,6 +122,7 @@ public DefaultReplicationWorker(final String jobId,
this.executors = Executors.newFixedThreadPool(2);
this.recordSchemaValidator = recordSchemaValidator;
this.metricReporter = metricReporter;
this.connectorConfigUpdater = connectorConfigUpdater;
this.fieldSelectionEnabled = fieldSelectionEnabled;

this.cancelled = new AtomicBoolean(false);
Expand Down Expand Up @@ -191,7 +196,7 @@ private void replicate(final Path jobRoot,
// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> readFromDstThread = CompletableFuture.runAsync(
readFromDstRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
readFromDstRunnable(destination, cancelled, messageTracker, connectorConfigUpdater, mdc, timeTracker, destinationConfig),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
Expand All @@ -212,10 +217,12 @@ private void replicate(final Path jobRoot,
cancelled,
mapper,
messageTracker,
connectorConfigUpdater,
mdc,
recordSchemaValidator,
metricReporter,
timeTracker,
sourceConfig,
fieldSelectionEnabled),
executors)
.whenComplete((msg, ex) -> {
Expand Down Expand Up @@ -253,8 +260,10 @@ private void replicate(final Path jobRoot,
private static Runnable readFromDstRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final ConnectorConfigUpdater connectorConfigUpdater,
final Map<String, String> mdc,
final ThreadedTimeTracker timeHolder) {
final ThreadedTimeTracker timeHolder,
final WorkerDestinationConfig destinationConfig) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
Expand All @@ -267,8 +276,18 @@ private static Runnable readFromDstRunnable(final AirbyteDestination destination
throw new DestinationException("Destination process read attempt failed", e);
}
if (messageOptional.isPresent()) {
LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get());
messageTracker.acceptFromDestination(messageOptional.get());
final AirbyteMessage message = messageOptional.get();
LOGGER.info("State in DefaultReplicationWorker from destination: {}", message);

messageTracker.acceptFromDestination(message);

try {
if (message.getType() == Type.CONTROL) {
acceptDstControlMessage(destinationConfig, message.getControl(), connectorConfigUpdater);
}
} catch (final Exception e) {
LOGGER.error("Error updating destination configuration", e);
}
}
}
timeHolder.trackDestinationWriteEndTime();
Expand Down Expand Up @@ -300,10 +319,12 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final ConnectorConfigUpdater connectorConfigUpdater,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter,
final ThreadedTimeTracker timeHolder,
final WorkerSourceConfig sourceConfig,
final boolean fieldSelectionEnabled) {
return () -> {
MDC.setContextMap(mdc);
Expand Down Expand Up @@ -333,6 +354,14 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou

messageTracker.acceptFromSource(message);

try {
if (message.getType() == Type.CONTROL) {
acceptSrcControlMessage(sourceConfig, message.getControl(), connectorConfigUpdater);
}
} catch (final Exception e) {
LOGGER.error("Error updating source configuration", e);
}

try {
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) {
destination.accept(message);
Expand Down Expand Up @@ -391,6 +420,22 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
};
}

private static void acceptSrcControlMessage(final WorkerSourceConfig sourceConfig,
final AirbyteControlMessage controlMessage,
final ConnectorConfigUpdater connectorConfigUpdater) {
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
connectorConfigUpdater.updateSource(sourceConfig.getSourceId(), controlMessage.getConnectorConfig().getConfig());
}
}

private static void acceptDstControlMessage(final WorkerDestinationConfig destinationConfig,
final AirbyteControlMessage controlMessage,
final ConnectorConfigUpdater connectorConfigUpdater) {
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
connectorConfigUpdater.updateDestination(destinationConfig.getDestinationId(), controlMessage.getConnectorConfig().getConfig());
}
}

private ReplicationOutput getReplicationOutput(final StandardSyncInput syncInput,
final WorkerDestinationConfig destinationConfig,
final AtomicReference<FailureReason> replicationRunnableFailureRef,
Expand Down
Loading