Skip to content

Commit

Permalink
Merge pull request #11 from synadia-io/sd-fork-integration
Browse files Browse the repository at this point in the history
JetStream Support
  • Loading branch information
scottf authored Nov 15, 2024
2 parents 9ec25bb + 8c47d9c commit 2caff5c
Show file tree
Hide file tree
Showing 47 changed files with 1,625 additions and 349 deletions.
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "1.0.1"
def jarVersion = "2.0.0-beta1"
group = 'io.synadia'

def isMerge = System.getenv("BUILD_EVENT") == "push"
Expand All @@ -29,13 +29,14 @@ java {

repositories {
mavenCentral()

maven {
url "https://oss.sonatype.org/content/repositories/releases/"
}
}

dependencies {
implementation 'io.nats:jnats:2.18.1'
implementation 'io.nats:jnats:2.20.2'

ext.flinkVersion='1.17.1'
implementation "org.apache.flink:flink-core:${flinkVersion}"
Expand All @@ -47,7 +48,7 @@ dependencies {
implementation 'org.slf4j:slf4j-api:2.0.9'

testImplementation 'org.slf4j:slf4j-simple:2.0.9'
testImplementation 'io.nats:jnats-server-runner:1.2.5'
testImplementation 'io.nats:jnats-server-runner:1.2.8'
testImplementation 'org.junit.jupiter:junit-jupiter:5.7.0'
testImplementation 'com.github.stefanbirkner:system-lambda:1.2.1'
testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.12.3'
Expand Down
2 changes: 2 additions & 0 deletions env.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
set JAVA_HOME=C:\Program Files\Java\jdk-11.0.16.1
set PATH=C:\Program Files\Java\jdk-11.0.16.1\bin;%PATH%
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.synadia.flink.Constants;
import io.synadia.flink.Utils;
import io.synadia.flink.examples.support.ExampleConnectionListener;
import io.synadia.flink.examples.support.ExampleErrorListener;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.sink.NatsSink;
import io.synadia.flink.sink.NatsSinkBuilder;
import io.synadia.flink.source.NatsSource;
import io.synadia.flink.source.NatsSourceBuilder;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.sink.NatsSink;
import io.synadia.flink.v0.sink.NatsSinkBuilder;
import io.synadia.flink.v0.source.NatsSource;
import io.synadia.flink.v0.source.NatsSourceBuilder;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand All @@ -26,7 +25,7 @@
public class SourceToSinkExample {
public static void main(String[] args) throws Exception {
// load properties from a file for example application.properties
Properties props = Utils.loadPropertiesFromFile("src/examples/resources/application.properties");
Properties props = PropertiesUtils.loadPropertiesFromFile("src/examples/resources/application.properties");

// make a connection to publish and listen with
// props has io.nats.client.url in it
Expand All @@ -36,15 +35,15 @@ public static void main(String[] args) throws Exception {
// the source will have missed some messages by the time it gets running
// but that's typical for a non-stream subject and something for
// the developer to plan for
List<String> sourceSubjects = Utils.getPropertyAsList(props, Constants.SOURCE_SUBJECTS);
List<String> sourceSubjects = PropertiesUtils.getPropertyAsList(props, "source.subjects");
Publisher publisher = new Publisher(nc, sourceSubjects);
new Thread(publisher).start();

// listen for messages that the sink publishes
Dispatcher dispatcher = nc.createDispatcher(m -> {
System.out.printf("Listening. Subject: %s Payload: %s\n", m.getSubject(), new String(m.getData()));
});
List<String> sinkSubjects = Utils.getPropertyAsList(props, Constants.SINK_SUBJECTS);
List<String> sinkSubjects = PropertiesUtils.getPropertyAsList(props, "sink.subjects");
for (String subject : sinkSubjects) {
dispatcher.subscribe(subject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,77 @@
import io.nats.client.*;
import io.nats.client.support.Status;

/**
* This error listener is very similar to io.nats.client.impl.ErrorListenerConsoleImpl
*/
public class ExampleErrorListener implements ErrorListener {

/**
* {@inheritDoc}
*/
@Override
public void errorOccurred(final Connection conn, final String error) {
System.err.println(supplyMessage("errorOccurred", conn, null, null, "Error: ", error));
}

/**
* {@inheritDoc}
*/
@Override
public void exceptionOccurred(final Connection conn, final Exception exp) {
System.err.println(supplyMessage("exceptionOccurred", conn, null, null, "Exception: ", exp));
}

/**
* {@inheritDoc}
*/
@Override
public void slowConsumerDetected(final Connection conn, final Consumer consumer) {
System.err.println(supplyMessage("slowConsumerDetected", conn, consumer, null));
}

/**
* {@inheritDoc}
*/
@Override
public void messageDiscarded(final Connection conn, final Message msg) {
System.out.println(supplyMessage("messageDiscarded", conn, null, null, "Message: ", msg));
}

/**
* {@inheritDoc}
*/
@Override
public void heartbeatAlarm(final Connection conn, final JetStreamSubscription sub,
final long lastStreamSequence, final long lastConsumerSequence) {
System.err.println(supplyMessage("heartbeatAlarm", conn, null, sub, "lastStreamSequence: ", lastStreamSequence, "lastConsumerSequence: ", lastConsumerSequence));
}

/**
* {@inheritDoc}
*/
@Override
public void unhandledStatus(final Connection conn, final JetStreamSubscription sub, final Status status) {
System.err.println(supplyMessage("unhandledStatus", conn, null, sub, "Status:", status));
}

/**
* {@inheritDoc}
*/
@Override
public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) {
// this warning is usually ignored
// System.out.println(supplyMessage("pullStatusWarning", conn, null, sub, "Status:", status));
}

/**
* {@inheritDoc}
*/
@Override
public void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) {
System.err.println(supplyMessage("pullStatusError", conn, null, sub, "Status:", status));
}

/**
* {@inheritDoc}
*/
@Override
public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String id, FlowControlSource source) {
System.out.println(supplyMessage("flowControlProcessed", conn, null, sub, "FlowControlSource:", source));
Expand Down
4 changes: 2 additions & 2 deletions src/examples/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
io.nats.client.url=nats://localhost:4222

source.subjects=source1,source2
source.payload.deserializer=io.synadia.flink.payload.StringPayloadDeserializer
source.payload.deserializer=io.synadia.flink.v0.payload.StringPayloadDeserializer

sink.subjects=sink1,sink2
sink.payload.serializer=io.synadia.flink.payload.StringPayloadSerializer
sink.payload.serializer=io.synadia.flink.v0.payload.StringPayloadSerializer
sink.startup.jitter.min=1
sink.startup.jitter.max=1000
16 changes: 0 additions & 16 deletions src/main/java/io/synadia/flink/Constants.java

This file was deleted.

144 changes: 0 additions & 144 deletions src/main/java/io/synadia/flink/Utils.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.synadia.flink.common;
package io.synadia.flink.utils;

import io.nats.client.Connection;
import io.nats.client.Nats;
Expand All @@ -8,8 +8,8 @@
import java.io.Serializable;
import java.util.Properties;

import static io.synadia.flink.Utils.jitter;
import static io.synadia.flink.Utils.loadPropertiesFromFile;
import static io.synadia.flink.utils.PropertiesUtils.jitter;
import static io.synadia.flink.utils.PropertiesUtils.loadPropertiesFromFile;

public class ConnectionFactory implements Serializable {
private final Properties connectionProperties;
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/synadia/flink/utils/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.utils;

import org.apache.flink.connector.base.source.reader.SourceReaderOptions;

public interface Constants {
String NATS_PREFIX = "nats.";
String SOURCE_PREFIX = "source.";
String SINK_PREFIX = "sink.";

String SUBJECTS = "subjects";
String STARTUP_JITTER_MIN = "startup.jitter.min";
String STARTUP_JITTER_MAX = "startup.jitter.max";
String PAYLOAD_DESERIALIZER = "payload.deserializer";
String PAYLOAD_SERIALIZER = "payload.serializer";

String READER_ELEMENT_QUEUE_CAPACITY = "reader.element.queue.capacity";
int DEFAULT_ELEMENT_QUEUE_CAPACITY = SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue();

String FETCH_ONE_MESSAGE_TIMEOUT = "fetch.one.timeout";
long DEFAULT_FETCH_ONE_MESSAGE_TIMEOUT_MS = 1000;

String MAX_FETCH_RECORDS = "max.fetch.records";
int DEFAULT_MAX_FETCH_RECORDS = 100;

String FETCH_TIMEOUT = "fetch.timeout";
long DEFAULT_FETCH_TIMEOUT_MS = 1000;

String AUTO_ACK_INTERVAL = "auto.ack.interval";
long DEFAULT_AUTO_ACK_INTERVAL_MS = 5000;

String ENABLE_AUTO_ACK = "enable.auto.ack";
boolean DEFAULT_ENABLE_AUTO_ACK = false;
}
Loading

0 comments on commit 2caff5c

Please sign in to comment.