Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pravega version and enable connection pooling #419

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion driver-pravega/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ repositories {

dependencies {
api project(":sbk-api")
api "io.pravega:pravega-client:0.10.0"
api "io.pravega:pravega-client:0.12.0"
}
5 changes: 4 additions & 1 deletion driver-pravega/src/main/java/io/sbk/Pravega/Pravega.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void addArgs(final InputOptions params) throws IllegalArgumentException {
params.addOption("segments", true, "Number of segments, default :" + config.segmentCount);
params.addOption("recreate", true,
"If the stream is already existing, delete and recreate the same, default :" + config.recreate);
params.addOption("connpool", true, "Enable Connection pooling, default :" + config.connPooling);
}

@Override
Expand All @@ -74,6 +75,8 @@ public void parseArgs(final ParameterOptions params) throws IllegalArgumentExcep
} else {
config.recreate = params.getWritersCount() > 0 && params.getReadersCount() > 0;
}
config.connPooling = Boolean.parseBoolean(params.getOptionValue("connpool",
Boolean.toString(config.connPooling)));

if (config.recreate) {
rdGrpName = config.streamName + System.currentTimeMillis();
Expand Down Expand Up @@ -127,7 +130,7 @@ public void closeStorage(final ParameterOptions params) throws IOException {
@Override
public DataWriter<byte[]> createWriter(final int id, final ParameterOptions params) {
try {
return new PravegaWriter(id, params, config.streamName, factory);
return new PravegaWriter(id, params, config.streamName, factory, config.connPooling);
} catch (IOException ex) {
ex.printStackTrace();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public class PravegaConfig {
public String streamName;
public int segmentCount;
public boolean recreate;
public boolean connPooling;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
public class PravegaWriter implements Writer<byte[]> {
final EventStreamWriter<byte[]> producer;

public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory) throws IOException {
public PravegaWriter(int id, ParameterOptions params, String streamName, EventStreamClientFactory factory,
boolean connectionPooling) throws IOException {
this.producer = factory.createEventWriter(streamName,
new ByteArraySerializer(),
EventWriterConfig.builder().build());
EventWriterConfig.builder()
.enableConnectionPooling(connectionPooling)
.build());
}

/**
Expand Down
5 changes: 4 additions & 1 deletion driver-pravega/src/main/resources/pravega.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ segmentCount=1

# Create the topic
# if the writers and readers are supplied then it is set to true
recreate=false
recreate=false

#Disable Connection Pooling by default
connPooling=false