Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to Source Job Connector and Master Monitor Logging #703

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public MantisClient(MasterClientWrapper clientWrapper, boolean disablePingFilter
this.clientWrapper = clientWrapper;
}

public MantisClient(HighAvailabilityServices haServices) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest adding a javadoc here to let user know this will start running the HAService. Also in this case who should be closing the haService if it creates other threads?

haServices.awaitRunning();
clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi());
this.disablePingFiltering = false;
}

public MantisClient(MasterClientWrapper clientWrapper) {
this(clientWrapper, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.NoSuchJobException;
import io.reactivx.mantis.operators.DropOperator;
import java.io.Closeable;
Expand Down Expand Up @@ -179,10 +180,6 @@ public static class Builder {
private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null;
private long dataRecvTimeoutSecs = 5;

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder() {
Copy link
Collaborator Author

@kmg-stripe kmg-stripe Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were seeing log spam, where a JobSource triggers this code path. It doesn't appear to break anything, but seems a bit weird and leads to log spam. HighAvailabilityServicesUtil logs a few warnings (i.e. "HA service running in local mode. This is only valid in local test."), because this path attempts to "create" the HA services with no properties. I think it is a little weird because I wouldn't expect a job writer to pass Mantis platform configuration for discovering the master, so it seems best to use HighAvailabilityServicesUtil. That is, unless I am missing something...

My understanding is that the HA services should generally be configured if we are here, so should just try to get the reference to HA services.

Properties properties = new Properties();
properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000");
Expand All @@ -191,10 +188,18 @@ public Builder() {
properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString"));
properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root"));
properties.setProperty("mantis.zookeeper.leader.announcement.path",
System.getenv("mantis.zookeeper.leader.announcement.path"));
System.getenv("mantis.zookeeper.leader.announcement.path"));
mantisClient = new MantisClient(properties);
}

public Builder(HighAvailabilityServices haServices) {
this(new MantisClient(haServices));
}

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder(MantisClient mantisClient) {
this.mantisClient = mantisClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation project(":mantis-runtime")
implementation project(":mantis-client")
implementation project(":mantis-control-plane:mantis-control-plane-core")
implementation project(":mantis-control-plane:mantis-control-plane-client")
implementation project(":mantis-publish:mantis-publish-core")

implementation "com.google.code.gson:gson:$gsonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observer;
Expand Down Expand Up @@ -59,12 +61,17 @@ public class MantisSourceJobConnector {
private static final String ZK_ROOT = "mantis.zookeeper.root";
private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path";

public MantisSourceJobConnector(Properties props) {
this.props = props;
public MantisSourceJobConnector(boolean configureDefaults) {
if (configureDefaults) {
props = defaultProperties();
} else {
props = null;
}
}

public MantisSourceJobConnector() {
props = new Properties();
// todo(kmg-stripe): Can we remove this? It seems it is only used by main in this class for testing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel free to remove this (or move it to test).

private static Properties defaultProperties() {
Properties props = new Properties();

final String defaultZkConnect = "127.0.0.1:2181";
final String defaultZkRoot = "/mantis/master";
Expand Down Expand Up @@ -99,6 +106,7 @@ public MantisSourceJobConnector() {
}

LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", connectString, zookeeperRoot, zookeeperLeaderAnnouncementPath);
return props;
}

@Deprecated
Expand Down Expand Up @@ -130,7 +138,8 @@ public MantisSSEJob connectToJob(
String jobName,
SinkParameters params,
Observer<SinkConnectionsStatus> sinkObserver) {
return new MantisSSEJob.Builder(props)
MantisSSEJob.Builder builder = props != null ? new MantisSSEJob.Builder(props) : new MantisSSEJob.Builder(HighAvailabilityServicesUtil.get());
return builder
.name(jobName)
.sinkConnectionsStatusObserver(sinkObserver)
.onConnectionReset(throwable -> LOGGER.error("Reconnecting due to error: " + throwable.getMessage()))
Expand Down Expand Up @@ -163,7 +172,7 @@ public static void main(String[] args) {
Args.parse(MantisSourceJobConnector.class, args);

final CountDownLatch latch = new CountDownLatch(20);
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector();
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(true);
MantisSSEJob job = sourceJobConnector.connectToJob("TestSourceJob", params);
Subscription subscription = job.connectAndGetObservable()
.doOnNext(o -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
public class MantisSourceJobConnectorFactory {

public static MantisSourceJobConnector getConnector() {
return new MantisSourceJobConnector();
return new MantisSourceJobConnector(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.mantisrx.server.master.client;

import com.mantisrx.common.utils.Services;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
Expand Down Expand Up @@ -82,6 +83,17 @@ public static HighAvailabilityServices createHAServices(CoreConfiguration config
return HAServiceInstanceRef.get();
}

// This getter is used in situations where the context does not know the core configuration. For example, this
// is used to create a MantisClient when configuring a JobSource, where a job instance does not know how Mantis
// is configured.
// Note that in this context, the agent should have configured HighAvailabilityServices.
public static HighAvailabilityServices get() {
if (HAServiceInstanceRef.get() == null) {
throw new RuntimeException("HighAvailabilityServices have not been initialized");
}
return HAServiceInstanceRef.get();
}

private static class LocalHighAvailabilityServices extends AbstractIdleService implements HighAvailabilityServices {
private final MasterMonitor masterMonitor;
private final CoreConfiguration configuration;
Expand Down Expand Up @@ -131,6 +143,7 @@ private static class HighAvailabilityServicesImpl extends AbstractIdleService im
private final MasterMonitor masterMonitor;
private final Counter resourceLeaderChangeCounter;
private final Counter resourceLeaderAlreadyRegisteredCounter;
private final Counter resourceLeaderIsEmptyCounter;
private final AtomicInteger rmConnections = new AtomicInteger(0);
private final CoreConfiguration configuration;

Expand All @@ -152,9 +165,11 @@ public HighAvailabilityServicesImpl(CoreConfiguration configuration) {
.name(metricsGroup)
.addCounter("resourceLeaderChangeCounter")
.addCounter("resourceLeaderAlreadyRegisteredCounter")
.addCounter("resourceLeaderIsEmptyCounter")
.build());
resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter");
resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter");
resourceLeaderIsEmptyCounter = metrics.getCounter("resourceLeaderIsEmptyCounter");

}

Expand Down Expand Up @@ -209,7 +224,12 @@ public void register(ResourceLeaderChangeListener<ResourceClusterGateway> change
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);

if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
// We do not want to update if the master is set to null. This is usually due to a newly
// initialized master monitor.
if (nextDescription.equals(MasterDescription.MASTER_NULL)) {
resourceLeaderIsEmptyCounter.increment();
return;
} else if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
resourceLeaderAlreadyRegisteredCounter.increment();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class MasterDescription {
private final long createTime;
private final int consolePort;

public static final MasterDescription MASTER_NULL =
new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L);

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
public MasterDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
import rx.Observable;
import rx.subjects.BehaviorSubject;


@Slf4j
public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor {

private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);

public static final MasterDescription MASTER_NULL =
new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L);

private final ThreadFactory monitorThreadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
Expand Down Expand Up @@ -92,7 +92,7 @@ public DynamoDBMasterMonitor(
String partitionKey,
Duration pollInterval,
Duration gracefulShutdown) {
masterSubject = BehaviorSubject.create(MASTER_NULL);
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
this.lockClient = lockClient;
this.partitionKey = partitionKey;
this.pollInterval = pollInterval;
Expand Down Expand Up @@ -147,8 +147,6 @@ private void getCurrentLeader() {
if (optionalLock.isPresent()) {
final LockItem lock = optionalLock.get();
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
logger.warn("failed to decode leader bytes");
this.lockDecodeFailedCounter.increment();
} else {
nextDescription = null;
logger.warn("no leader found");
Expand All @@ -163,8 +161,8 @@ private void getCurrentLeader() {
}

private void updateLeader(@Nullable MasterDescription nextDescription) {
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MASTER_NULL : nextDescription;
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MasterDescription.MASTER_NULL : nextDescription;
if (!prev.equals(next)) {
logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname());
masterSubject.onNext(next);
Expand All @@ -183,8 +181,9 @@ private MasterDescription bytesToMaster(ByteBuffer data) {
return jsonMapper.readValue(bytes, MasterDescription.class);
} catch (IOException e) {
logger.error("unable to parse master description bytes: {}", data, e);
this.lockDecodeFailedCounter.increment();
}
return MASTER_NULL;
return MasterDescription.MASTER_NULL;
}

@Override
Expand All @@ -201,6 +200,6 @@ public Observable<MasterDescription> getMasterObservable() {
@Override
@Nullable
public MasterDescription getLatestMaster() {
return Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void highAvailabilityServices() throws InterruptedException, IOException

// We can, depending on timing, sometimes get a MASTER_NULL value which is safe to ignore.
MasterDescription[] actualLeaders = testSubscriber.getOnNextEvents().stream()
.filter(md -> md != DynamoDBMasterMonitor.MASTER_NULL)
.filter(md -> md != MasterDescription.MASTER_NULL)
.collect(Collectors.toList())
.toArray(new MasterDescription[]{});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.mantisrx.extensions.dynamodb;

import static io.mantisrx.extensions.dynamodb.DynamoDBMasterMonitor.MASTER_NULL;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.*;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
TestSubscriber<MasterDescription> testSubscriber = new TestSubscriber<>();
m.getMasterObservable().subscribe(testSubscriber);
m.start();
assertEquals(MASTER_NULL, m.getLatestMaster());
assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster());
lockSupport.takeLock(lockKey, otherMaster);
await()
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
Expand All @@ -93,7 +92,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster));
testSubscriber.assertValues(MASTER_NULL, otherMaster, thatMaster);
testSubscriber.assertValues(MasterDescription.MASTER_NULL, otherMaster, thatMaster);
m.shutdown();
}

Expand Down Expand Up @@ -134,7 +133,7 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(MASTER_NULL, m.getLatestMaster()));
.untilAsserted(() -> assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster()));
lockSupport.releaseLock(lockKey);

m.shutdown();
Expand Down
Loading