Skip to content

Commit

Permalink
Take the node id into account when creating geoip tmp dir.
Browse files Browse the repository at this point in the history
This change adjust where the geoip tmp directory is created
to avoid issues when running multiple nodes on the same machine.

In the java tmp dir, a 'geoip-databases' directory is created and
directly under this directory a directory with the node id as name is created.
This allows safely running multiple nodes on the same machine (this
happens mainly during tests).

Closes elastic#69972
Relates to elastic#68920
  • Loading branch information
martijnvg committed Mar 16, 2021
1 parent 65191d0 commit ce9d7bf
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -148,7 +149,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/69972")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
// setup:
Expand Down Expand Up @@ -233,12 +233,22 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getDataNodeInstances(Environment.class).spliterator(), false)
.map(env -> {
Path geoipTmpDir = env.tmpFile().resolve("geoip-databases");
assertThat(Files.exists(geoipTmpDir), is(true));
return geoipTmpDir;
}).flatMap(path -> {
try {
return Files.list(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).filter(path -> {
return StreamSupport.stream(clusterService().state().nodes().getDataNodes().values().spliterator(), false)
.anyMatch(cursor -> cursor.value.getId().equals(path.getFileName().toString()));
}).collect(Collectors.toList());
assertThat(geoipTmpDirs.size(), equalTo(internalCluster().numDataNodes()));
assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpC
LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache);
DatabaseRegistry databaseRegistry =
new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseRegistry.initialize(mock(ResourceWatcherService.class), mock(IngestService.class));
databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ final class DatabaseRegistry implements Closeable {

private final Client client;
private final GeoIpCache cache;
private final Path geoipTmpDirectory;
private final Path geoipTmpBaseDirectory;
private Path geoipTmpDirectory;
private final LocalDatabases localDatabases;
private final Consumer<Runnable> genericExecutor;

Expand All @@ -100,13 +101,14 @@ final class DatabaseRegistry implements Closeable {
Consumer<Runnable> genericExecutor) {
this.client = client;
this.cache = cache;
this.geoipTmpDirectory = tmpDir.resolve("geoip-databases");
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.localDatabases = localDatabases;
this.genericExecutor = genericExecutor;
}

public void initialize(ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
localDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand Down Expand Up @@ -138,7 +140,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
}
});
if (Files.exists(geoipTmpDirectory) == false) {
Files.createDirectory(geoipTmpDirectory);
Files.createDirectories(geoipTmpDirectory);
}
LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory);
ingestService.addIngestClusterStateListener(this::checkDatabases);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public Collection<Object> createComponents(Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
try {
databaseRegistry.get().initialize(resourceWatcherService, ingestService.get());
String nodeId = nodeEnvironment.nodeId();
databaseRegistry.get().initialize(nodeId, resourceWatcherService, ingestService.get());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() throws IOException {
LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
geoIpTmpDir = createTempDir();
databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
}

@After
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testCheckDatabases() throws Exception {
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue());
verify(client, times(10)).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), hasSize(1));
}
}
Expand All @@ -166,7 +166,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -188,7 +188,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -209,7 +209,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testLoadingCustomDatabase() throws IOException {
Client client = mock(Client.class);
GeoIpCache cache = new GeoIpCache(1000);
DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
assertNull(lazyLoader.databaseReader.get());
Expand Down

0 comments on commit ce9d7bf

Please sign in to comment.