Skip to content

Commit

Permalink
Merge pull request #2 from Andrei-Luksha2/EGD-5267-read-default-proce…
Browse files Browse the repository at this point in the history
…ssingLoopInterval-for-DefaultAsyncMessageProcessor-from-env-var

Egd 5267 read default processing loop interval for default async message processor from env var
  • Loading branch information
markovarghese authored May 26, 2022
2 parents 9c9f2a9 + f26f09f commit f0b3590
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 5 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@ replication.
4. Native cross data center replication.

## Getting Started
Once you have setup a Cassandra cluster then deploy the necessary schema located [here link](https://github.homeawaycorp.com/coe-data-tools/photon/tree/master/src/main/resources/cassandra).
Once you have setup a Cassandra cluster then deploy the necessary schema located in
[homeaway github (old version)](https://github.homeawaycorp.com/coe-data-tools/photon/tree/master/src/main/resources/cassandra)
or in [Expedia Group github (newer version)](https://github.expedia.biz/eg-edv-datasync/datasync-schema).

You need to use Java version compatible with your Cassandra version to execute unit-tests for this project.
E.g. if you use Cassandra 3, you need to use Java 8.
Higher Java versions will cause errors like "NoHostAvailable" failures in unit tests.
You can change JDK by setting JAVA_HOME environment variable in the shell, where you run the tests.

Warning: some unit tests are observed to fail occasionally, then succeed without any code changes.

Make sure you include the Photon dependency below in your code
```xml
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.expediagroup.photon</groupId>
<artifactId>photon</artifactId>
<version>0.1.8</version>
<version>0.1.10</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.homeaway.datatools.photon.utils;

import org.slf4j.Logger;

import java.util.function.Function;

public class EnvironmentVariablesUtils {
public static <T> T getEnvVarOrUseDefault(
final String envVarName,
T defaultValue,
Logger log,
Function<String, T> parsingFunction
) {
T result;
final String envVarValue = System.getenv(envVarName);
if (envVarValue == null) {
log.info(
"Environment variable {} is not set. Will use the default {} value.",
envVarName,
defaultValue
);
result = defaultValue;
} else {
log.info(
"Environment variable {} is set to {}",
envVarName,
envVarValue
);
result = parsingFunction.apply(envVarValue);
}
return result;
}

public static long getLongOrUseDefault(final String envVarName, Long defaultValue, Logger log) {
Function<String, Long> parsingFunction = (String arg) -> parseLong(envVarName, arg);
return getEnvVarOrUseDefault(envVarName, defaultValue, log, parsingFunction);
}

public static long parseLong(final String envVarName, final String envVarValue) {
long result;
try {
result = Long.parseLong(envVarValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
String.format(
"Failed to parse value %s of environment variable %s as a Long value.",
envVarValue,
envVarName
),
e
);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import com.google.common.collect.Maps;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.homeaway.datatools.photon.utils.EnvironmentVariablesUtils;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
Expand Down Expand Up @@ -65,7 +67,7 @@ public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler
final ProcessorManifest<K, V, T> processorManifest,
int maxEvents) {
this(Maps.newConcurrentMap(), Maps.newConcurrentMap(),
eventHandler, processorManifest, Duration.ofMillis(1), maxEvents);
eventHandler, processorManifest, getDefaultProcessingLoopInterval(), maxEvents);
}

public DefaultAsyncMessageProcessor(final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap,
Expand All @@ -81,6 +83,10 @@ public DefaultAsyncMessageProcessor(final ConcurrentMap<String, EventQueueMap<V>
this.processingLoopInterval = processingLoopInterval;
this.maxEvents = maxEvents;
this.count = new LongAdder();
log.info(
"DefaultAsyncMessageProcessor created with processingLoopInterval={} ms",
processingLoopInterval.toMillis()
);
}

@Override
Expand Down Expand Up @@ -230,4 +236,15 @@ private ExecutorService getExecutorService() {
return executorService;
});
}

/**
* Gets value of DEFAULT_MESSAGE_PROCESSOR_LOOP_INTERVAL_MS environment variable if it is set.
* Otherwise, returns a default value.
*/
protected static Duration getDefaultProcessingLoopInterval() {
final String envVarName = "DEFAULT_MESSAGE_PROCESSOR_LOOP_INTERVAL_MS";
final Long defaultValue = 1L;
long defaultProcessingLoopInterval = EnvironmentVariablesUtils.getLongOrUseDefault(envVarName, defaultValue, log);
return Duration.ofMillis(defaultProcessingLoopInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class CassandraBeamDataManifestDaoTest {

Expand Down Expand Up @@ -183,8 +185,21 @@ public void testGetBeamDataManifest() {
now = now.plus(1L, ChronoUnit.MINUTES);
}
try {
Assert.assertEquals(20, Futures.successfulAsList(resultSetFutureList).get()
.stream().mapToInt(PhotonRowSet::getSize).sum());
List<PhotonRowSet> photonRowSetsFromSuccessfulFutures = Futures.successfulAsList(resultSetFutureList).get();
final int actualCount = photonRowSetsFromSuccessfulFutures
.stream().mapToInt(PhotonRowSet::getSize).sum();
Assert.assertEquals(
String.format(
"Expected total of 20 elements in following list, but got %s. The list is: %s",
actualCount,
photonRowSetsFromSuccessfulFutures.stream()
.flatMap(x -> StreamSupport.stream(x.spliterator(), false))
.map(Object::toString)
.collect(Collectors.joining(",\n"))
),
20,
actualCount
);
} catch (ExecutionException | InterruptedException e) {

}
Expand Down

0 comments on commit f0b3590

Please sign in to comment.