Skip to content

Commit

Permalink
RefreshSchema method on RefreshSchema activity (airbytehq#18460)
Browse files Browse the repository at this point in the history
* Add refreshSchema method to RefreshSchema activity
  • Loading branch information
alovew authored and jhammarstedt committed Oct 31, 2022
1 parent 9ef2233 commit 601d148
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
Expand All @@ -15,4 +18,6 @@ public interface RefreshSchemaActivity {
@ActivityMethod
boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException;

public void refreshSchema(UUID sourceCatalogId) throws JsonValidationException, ConfigNotFoundException, IOException, ApiException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
Expand All @@ -18,8 +21,11 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository) {
private final SourceApi sourceApi;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
}

@Override
Expand All @@ -33,6 +39,13 @@ public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException {
return !schemaRefreshRanRecently(sourceCatalogId);
}

@Override
public void refreshSchema(UUID sourceCatalogId) throws ApiException {
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true);
sourceApi.discoverSchemaForSource(requestBody);
}

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
Expand All @@ -24,14 +29,18 @@
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;

static private SourceApi mSourceApi;

static private RefreshSchemaActivityImpl refreshSchemaActivity;

static private final UUID SOURCE_ID = UUID.randomUUID();

@BeforeEach
void setUp() {
mConfigRepository = mock(ConfigRepository.class);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository));
mSourceApi = mock(SourceApi.class);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi);
}

@Test
Expand All @@ -56,4 +65,13 @@ void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testRefreshSchema() throws ApiException {
UUID sourceId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true);
verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody);
}

}

0 comments on commit 601d148

Please sign in to comment.