Skip to content

Commit

Permalink
fixing schema copy (#204)
Browse files Browse the repository at this point in the history
* fixing schema copy
  • Loading branch information
Patrick Duin authored Oct 26, 2020
1 parent 6dfe1bc commit ecfb74f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## [16.3.2] - 2020-10-27
### Fixed
* Issue where external AVRO schemas generated lots of copy jobs. See [#203](https://github.com/HotelsDotCom/circus-train/issues/203).

## [16.3.1] - 2020-09-15
### Changed
* Added fields `sourceTable` and `sourcePartitions` to the `CopierContext` class.
* Added fields `sourceTable` and `sourcePartitions` to the `CopierContext` class.

## [16.3.0] - 2020-09-01
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import com.hotels.bdp.circustrain.api.CircusTrainException;
import com.hotels.bdp.circustrain.api.Modules;
import com.hotels.bdp.circustrain.api.conf.TableReplication;
import com.hotels.bdp.circustrain.api.copier.Copier;
Expand All @@ -45,15 +48,18 @@ public class SchemaCopier {
private static final Logger LOG = LoggerFactory.getLogger(SchemaCopier.class);

private final Configuration sourceHiveConf;
private final Configuration replicaHiveConf;
private final CopierFactoryManager copierFactoryManager;
private final CopierOptions globalCopierOptions;

@Autowired
public SchemaCopier(
Configuration sourceHiveConf,
Configuration replicaHiveConf,
CopierFactoryManager copierFactoryManager,
CopierOptions globalCopierOptions) {
this.sourceHiveConf = sourceHiveConf;
this.replicaHiveConf = replicaHiveConf;
this.copierFactoryManager = copierFactoryManager;
this.globalCopierOptions = globalCopierOptions;
}
Expand All @@ -68,20 +74,32 @@ public Path copy(String source, String destination, EventTableReplication eventT
sourceLocation = sourceFileSystemPathResolver.resolveNameServices(sourceLocation);

Path destinationSchemaFile = new Path(destination, sourceLocation.getName());
try {
FileSystem targetFileSystem = destinationSchemaFile.getFileSystem(replicaHiveConf);
if (targetFileSystem.exists(destinationSchemaFile)) {
LOG
.info("Avro schema has already been copied from '{}' to '{}', skipping schema copy step.", sourceLocation,
destinationSchemaFile);
} else {
Map<String, Object> mergedCopierOptions = new HashMap<>(TableReplication
.getMergedCopierOptions(globalCopierOptions.getCopierOptions(), eventTableReplication.getCopierOptions()));

Map<String, Object> mergedCopierOptions = new HashMap<>(TableReplication
.getMergedCopierOptions(globalCopierOptions.getCopierOptions(), eventTableReplication.getCopierOptions()));
mergedCopierOptions.put(CopierOptions.COPY_DESTINATION_IS_FILE, "true");
CopierFactory copierFactory = copierFactoryManager
.getCopierFactory(sourceLocation, destinationSchemaFile, mergedCopierOptions);
LOG.info("Replicating Avro schema from '{}' to '{}'", sourceLocation, destinationSchemaFile);
CopierContext copierContext = new CopierContext(eventId, sourceLocation, destinationSchemaFile, mergedCopierOptions);
Copier copier = copierFactory.newInstance(copierContext);
Metrics metrics = copier.copy();
mergedCopierOptions.put(CopierOptions.COPY_DESTINATION_IS_FILE, "true");
CopierFactory copierFactory = copierFactoryManager
.getCopierFactory(sourceLocation, destinationSchemaFile, mergedCopierOptions);
LOG.info("Replicating Avro schema from '{}' to '{}'", sourceLocation, destinationSchemaFile);
CopierContext copierContext = new CopierContext(eventId, sourceLocation, destinationSchemaFile,
mergedCopierOptions);
Copier copier = copierFactory.newInstance(copierContext);
Metrics metrics = copier.copy();

LOG
.info("Avro schema '{} bytes' has been copied from '{}' to '{}'", metrics.getBytesReplicated(), sourceLocation,
destinationSchemaFile);
LOG
.info("Avro schema '{} bytes' has been copied from '{}' to '{}'", metrics.getBytesReplicated(),
sourceLocation, destinationSchemaFile);
}
} catch (IOException e) {
throw new CircusTrainException("Could not check target FileSystem:", e);
}
return destinationSchemaFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.io.File;
Expand All @@ -36,6 +37,8 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import com.google.common.io.Files;

import com.hotels.bdp.circustrain.api.copier.Copier;
import com.hotels.bdp.circustrain.api.copier.CopierContext;
import com.hotels.bdp.circustrain.api.copier.CopierFactory;
Expand All @@ -60,7 +63,7 @@ public class SchemaCopierTest {

@Before
public void setUp() {
schemaCopier = new SchemaCopier(new HiveConf(), copierFactoryManager, copierOptions);
schemaCopier = new SchemaCopier(new HiveConf(), new HiveConf(), copierFactoryManager, copierOptions);
}

@Test
Expand All @@ -79,6 +82,20 @@ public void copiedToCorrectDestination() throws IOException {
assertThat(result, is(targetFile));
}

@Test
public void skipCopyIfDestinationExists() throws IOException {
Path source = new Path(temporaryFolder.newFile("test.txt").toURI());
File destination = temporaryFolder.newFolder();
Path targetFile = new Path(destination.toString(), "test.txt");
Files.touch(new File(targetFile.toUri().toString()));
Map<String, Object> copierOptionsMap = new HashMap<>();
copierOptionsMap.put(CopierOptions.COPY_DESTINATION_IS_FILE, "true");

Path result = schemaCopier.copy(source.toString(), destination.toString(), eventTableReplication, eventId);
assertThat(result, is(targetFile));
verifyZeroInteractions(copierFactoryManager, copierFactory, copier, metrics);
}

@Test(expected = NullPointerException.class)
public void copyWithNullSourceParamThrowsException() throws IOException {
File destination = temporaryFolder.newFolder();
Expand Down

0 comments on commit ecfb74f

Please sign in to comment.