-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes to Source Job Connector and Master Monitor Logging #703
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import io.mantisrx.runtime.parameter.Parameter; | ||
import io.mantisrx.runtime.parameter.SinkParameters; | ||
import io.mantisrx.server.master.client.ConditionalRetry; | ||
import io.mantisrx.server.master.client.HighAvailabilityServices; | ||
import io.mantisrx.server.master.client.NoSuchJobException; | ||
import io.reactivx.mantis.operators.DropOperator; | ||
import java.io.Closeable; | ||
|
@@ -179,10 +180,6 @@ public static class Builder { | |
private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null; | ||
private long dataRecvTimeoutSecs = 5; | ||
|
||
public Builder(Properties properties) { | ||
this(new MantisClient(properties)); | ||
} | ||
|
||
public Builder() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We were seeing log spam, where a JobSource triggers this code path. It doesn't appear to break anything, but seems a bit weird and leads to log spam. My understanding is that the HA services should generally be configured if we are here, so should just try to get the reference to HA services. |
||
Properties properties = new Properties(); | ||
properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000"); | ||
|
@@ -191,10 +188,18 @@ public Builder() { | |
properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString")); | ||
properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root")); | ||
properties.setProperty("mantis.zookeeper.leader.announcement.path", | ||
System.getenv("mantis.zookeeper.leader.announcement.path")); | ||
System.getenv("mantis.zookeeper.leader.announcement.path")); | ||
mantisClient = new MantisClient(properties); | ||
} | ||
|
||
public Builder(HighAvailabilityServices haServices) { | ||
this(new MantisClient(haServices)); | ||
} | ||
|
||
public Builder(Properties properties) { | ||
this(new MantisClient(properties)); | ||
} | ||
|
||
public Builder(MantisClient mantisClient) { | ||
this.mantisClient = mantisClient; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import java.util.Properties; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import rx.Observer; | ||
|
@@ -59,12 +61,17 @@ public class MantisSourceJobConnector { | |
private static final String ZK_ROOT = "mantis.zookeeper.root"; | ||
private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path"; | ||
|
||
public MantisSourceJobConnector(Properties props) { | ||
this.props = props; | ||
public MantisSourceJobConnector(boolean configureDefaults) { | ||
if (configureDefaults) { | ||
props = defaultProperties(); | ||
} else { | ||
props = null; | ||
} | ||
} | ||
|
||
public MantisSourceJobConnector() { | ||
props = new Properties(); | ||
// todo(kmg-stripe): Can we remove this? It seems it is only used by main in this class for testing. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. feel free to remove this (or move it to test). |
||
private static Properties defaultProperties() { | ||
Properties props = new Properties(); | ||
|
||
final String defaultZkConnect = "127.0.0.1:2181"; | ||
final String defaultZkRoot = "/mantis/master"; | ||
|
@@ -99,6 +106,7 @@ public MantisSourceJobConnector() { | |
} | ||
|
||
LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", connectString, zookeeperRoot, zookeeperLeaderAnnouncementPath); | ||
return props; | ||
} | ||
|
||
@Deprecated | ||
|
@@ -130,7 +138,8 @@ public MantisSSEJob connectToJob( | |
String jobName, | ||
SinkParameters params, | ||
Observer<SinkConnectionsStatus> sinkObserver) { | ||
return new MantisSSEJob.Builder(props) | ||
MantisSSEJob.Builder builder = props != null ? new MantisSSEJob.Builder(props) : new MantisSSEJob.Builder(HighAvailabilityServicesUtil.get()); | ||
return builder | ||
.name(jobName) | ||
.sinkConnectionsStatusObserver(sinkObserver) | ||
.onConnectionReset(throwable -> LOGGER.error("Reconnecting due to error: " + throwable.getMessage())) | ||
|
@@ -163,7 +172,7 @@ public static void main(String[] args) { | |
Args.parse(MantisSourceJobConnector.class, args); | ||
|
||
final CountDownLatch latch = new CountDownLatch(20); | ||
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(); | ||
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(true); | ||
MantisSSEJob job = sourceJobConnector.connectToJob("TestSourceJob", params); | ||
Subscription subscription = job.connectAndGetObservable() | ||
.doOnNext(o -> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest adding a javadoc here to let user know this will start running the HAService. Also in this case who should be closing the haService if it creates other threads?