Skip to content

Commit

Permalink
Merge pull request #24 from RADAR-base/release-0.2.1
Browse files Browse the repository at this point in the history
Release 0.2.1
  • Loading branch information
blootsvoets authored Jan 9, 2019
2 parents a3bbebf + f2ff022 commit 1938b64
Show file tree
Hide file tree
Showing 27 changed files with 594 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ COPY ./kafka-connect-fitbit-source/src/ /code/kafka-connect-fitbit-source/src

RUN ./gradlew jar

FROM confluentinc/cp-kafka-connect-base:5.0.0
FROM confluentinc/cp-kafka-connect-base:5.1.0

MAINTAINER Joris Borgdorff <joris@thehyve.nl>

Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ your Fitbit App client ID and client secret. The following tables shows the poss
<tr>
<td>rest.source.base.url</td></td><td>Base URL for REST source connector.</td></td><td>string</td></td><td></td></td><td></td></td><td>high</td></td></tr>
<tr>
<td>rest.source.destination.topics</td></td><td>The list of destination topics for the REST source connector.</td></td><td>list</td></td><td>""</td></td><td></td></td><td>high</td></td></tr>
<tr>
<td>rest.source.topic.selector</td></td><td>The topic selector class for REST source connector.</td></td><td>class</td></td><td>org.radarbase.connect.rest.selector.SimpleTopicSelector</td></td><td>Class extending org.radarbase.connect.rest.selector.TopicSelector</td></td><td>high</td></td></tr>
<tr>
<td>rest.source.payload.converter.class</td></td><td>Class to be used to convert messages from REST calls to SourceRecords</td></td><td>class</td></td><td>org.radarbase.connect.rest.converter.StringPayloadConverter</td></td><td>Class extending org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter</td></td><td>low</td></td></tr>
<tr>
<td>rest.source.request.generator.class</td></td><td>Class to be used to generate REST requests</td></td><td>class</td></td><td>org.radarbase.connect.rest.single.SingleRequestGenerator</td></td><td>Class extending org.radarbase.connect.rest.request.RequestGenerator</td></td><td>low</td></td></tr>
<tr>
<td>fitbit.users</td></td><td>The user ID of Fitbit users to include in polling, separated by commas. Non existing user names will be ignored. If empty, all users in the user directory will be used.</td></td><td>list</td></td><td>""</td></td><td></td></td><td>high</td></td></tr>
Expand All @@ -47,10 +53,14 @@ your Fitbit App client ID and client secret. The following tables shows the poss
<tr>
<td>fitbit.api.secret</td></td><td>Secret for the Fitbit API client set in fitbit.api.client.</td></td><td>password</td></td><td></td></td><td></td></td><td>high</td></td></tr>
<tr>
<td>fitbit.user.repository.class</td></td><td>Class for managing users and authentication.</td></td><td>class</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlFitbitUserRepository</td></td><td>Class extending org.radarbase.connect.rest.fitbit.user.FitbitUserRepository</td></td><td>medium</td></td></tr>
<td>fitbit.api.intraday</td></td><td>Set to true if the client has permissions to Fitbit Intraday API, false otherwise.</td></td><td>boolean</td></td><td>true</td></td><td></td></td><td>medium</td></td></tr>
<tr>
<td>fitbit.user.repository.class</td></td><td>Class for managing users and authentication.</td></td><td>class</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Class extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
<tr>
<td>fitbit.user.dir</td></td><td>Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.</td></td><td>string</td></td><td>/var/lib/kafka-connect-fitbit-source/users</td></td><td></td></td><td>low</td></td></tr>
<tr>
<td>fitbit.user.repository.url</td></td><td>URL for webservice containing user credentials. Only used if a webservice-based user repository is configured.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>low</td></td></tr>
<tr>
<td>fitbit.max.users.per.poll</td></td><td>Maximum number of users to query in a single poll operation. Decrease this if memory constrains are pressing.</td></td><td>int</td></td><td>100</td></td><td>[1,...]</td></td><td>low</td></td></tr>
<tr>
<td>fitbit.intraday.steps.topic</td></td><td>Topic for Fitbit intraday steps</td></td><td>string</td></td><td>connect_fitbit_intraday_steps</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
Expand All @@ -61,7 +71,9 @@ your Fitbit App client ID and client secret. The following tables shows the poss
<tr>
<td>fitbit.sleep.classic.topic</td></td><td>Topic for Fitbit sleep classic data</td></td><td>string</td></td><td>connect_fitbit_sleep_classic</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
<tr>
<td>fitbit.time.zone.topic</td></td><td>Topic for Fitbit profile timezone</td></td><td>string</td></td><td>connect_fitbit_time_zone</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
<td>fitbit.time.zone.topic</td></td><td>Topic for Fitbit profile time zone</td></td><td>string</td></td><td>connect_fitbit_time_zone</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
<tr>
<td>fitbit.activity.log.topic</td></td><td>Topic for Fitbit activity log.</td></td><td>string</td></td><td>connect_fitbit_activity_log</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
</tbody></table>

Now you can run a full Kafka stack using
Expand Down
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ description = 'kafka-connect-rest-source'

subprojects {
ext {
kafkaVersion = '2.0.0'
confluentVersion = '5.0.0'
jacksonVersion = '2.8.9'
kafkaVersion = '2.1.0-cp1'
confluentVersion = '5.1.0'
jacksonVersion = '2.9.8'
}

apply plugin: 'java'
apply plugin: 'java-library'

group = 'org.radarcns'
version = '0.2.0'
version = '0.2.1'

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down
16 changes: 8 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
# Zookeeper Cluster #
#---------------------------------------------------------------------------#
zookeeper-1:
image: confluentinc/cp-zookeeper:5.0.0
image: confluentinc/cp-zookeeper:5.1.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
Expand All @@ -19,7 +19,7 @@ services:
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-2:
image: confluentinc/cp-zookeeper:5.0.0
image: confluentinc/cp-zookeeper:5.1.0
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
Expand All @@ -29,7 +29,7 @@ services:
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-3:
image: confluentinc/cp-zookeeper:5.0.0
image: confluentinc/cp-zookeeper:5.1.0
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
Expand All @@ -42,7 +42,7 @@ services:
# Kafka Cluster #
#---------------------------------------------------------------------------#
kafka-1:
image: confluentinc/cp-kafka:5.0.0
image: confluentinc/cp-kafka:5.1.0
depends_on:
- zookeeper-1
- zookeeper-2
Expand All @@ -61,7 +61,7 @@ services:
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"

kafka-2:
image: confluentinc/cp-kafka:5.0.0
image: confluentinc/cp-kafka:5.1.0
depends_on:
- zookeeper-1
- zookeeper-2
Expand All @@ -80,7 +80,7 @@ services:
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"

kafka-3:
image: confluentinc/cp-kafka:5.0.0
image: confluentinc/cp-kafka:5.1.0
depends_on:
- zookeeper-1
- zookeeper-2
Expand All @@ -102,7 +102,7 @@ services:
# Schema Registry #
#---------------------------------------------------------------------------#
schema-registry-1:
image: confluentinc/cp-schema-registry:5.0.0
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper-1
- zookeeper-2
Expand All @@ -124,7 +124,7 @@ services:
# REST proxy #
#---------------------------------------------------------------------------#
rest-proxy-1:
image: confluentinc/cp-kafka-rest:5.0.0
image: confluentinc/cp-kafka-rest:5.1.0
depends_on:
- zookeeper-1
- zookeeper-2
Expand Down
10 changes: 7 additions & 3 deletions kafka-connect-fitbit-source/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
repositories {
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' }
}

dependencies {
api project(':kafka-connect-rest-source')
api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.3.7-SNAPSHOT'
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.4.4-SNAPSHOT'

implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion
Expand All @@ -10,8 +14,8 @@ dependencies {
compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jacksonVersion

testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.2.0'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.2.0'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.3.2'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.3.2'
testRuntimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
testImplementation group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -64,6 +65,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
private static final String FITBIT_USER_REPOSITORY_DOC = "Class for managing users and authentication.";
private static final String FITBIT_USER_REPOSITORY_DISPLAY = "User repository class";

public static final String FITBIT_API_INTRADAY_ACCESS_CONFIG = "fitbit.api.intraday";
private static final String FITBIT_API_INTRADAY_ACCESS_DOC = "Set to true if the client has permissions to Fitbit Intraday API, false otherwise.";
private static final boolean FITBIT_API_INTRADAY_ACCESS_DEFAULT = false;
private static final String FITBIT_API_INTRADAY_ACCESS_DISPLAY = "Is Fitbit Intraday API available?";

public static final String FITBIT_USER_CREDENTIALS_DIR_CONFIG = "fitbit.user.dir";
private static final String FITBIT_USER_CREDENTIALS_DIR_DOC = "Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.";
private static final String FITBIT_USER_CREDENTIALS_DIR_DISPLAY = "User directory";
Expand Down Expand Up @@ -95,21 +101,26 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
private static final String FITBIT_SLEEP_CLASSIC_TOPIC_DISPLAY = "Classic sleep topic";

private static final String FITBIT_TIME_ZONE_TOPIC_CONFIG = "fitbit.time.zone.topic";
private static final String FITBIT_TIME_ZONE_TOPIC_DOC = "Topic for Fitbit profile timezone";
private static final String FITBIT_TIME_ZONE_TOPIC_DOC = "Topic for Fitbit profile time zone";
private static final String FITBIT_TIME_ZONE_TOPIC_DEFAULT = "connect_fitbit_time_zone";
private static final String FITBIT_TIME_ZONE_TOPIC_DISPLAY = "Timezone topic";
private static final String FITBIT_TIME_ZONE_TOPIC_DISPLAY = "Time zone topic";

private static final String FITBIT_MAX_USERS_PER_POLL_CONFIG = "fitbit.max.users.per.poll";
private static final String FITBIT_MAX_USERS_PER_POLL_DOC = "Maximum number of users to query in a single poll operation. Decrease this if memory constrains are pressing.";
private static final int FITBIT_MAX_USERS_PER_POLL_DEFAULT = 100;
private static final String FITBIT_MAX_USERS_PER_POLL_DISPLAY = "Maximum users per poll";

private static final String FITBIT_ACTIVITY_LOG_TOPIC_CONFIG = "fitbit.activity.log.topic";
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DOC = "Topic for Fitbit activity log.";
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT = "connect_fitbit_activity_log";
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY = "Activity log topic";

private final UserRepository userRepository;
private final Headers clientCredentials;

@SuppressWarnings("unchecked")
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
super(config, parsedConfig, doLog);

try {
userRepository = ((Class<? extends UserRepository>)
Expand All @@ -125,8 +136,8 @@ public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> par
this.clientCredentials = Headers.of("Authorization", "Basic " + credentialsBase64);
}

public FitbitRestSourceConnectorConfig(Map<String, String> parsedConfig) {
this(FitbitRestSourceConnectorConfig.conf(), parsedConfig);
public FitbitRestSourceConnectorConfig(Map<String, String> parsedConfig, boolean doLog) {
this(FitbitRestSourceConnectorConfig.conf(), parsedConfig, doLog);
}

public static ConfigDef conf() {
Expand Down Expand Up @@ -173,6 +184,16 @@ public String toString() {
Width.SHORT,
FITBIT_API_SECRET_DISPLAY)

.define(FITBIT_API_INTRADAY_ACCESS_CONFIG,
Type.BOOLEAN,
true,
Importance.MEDIUM,
FITBIT_API_INTRADAY_ACCESS_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_API_INTRADAY_ACCESS_DISPLAY)

.define(FITBIT_USER_REPOSITORY_CONFIG,
Type.CLASS,
YamlUserRepository.class,
Expand Down Expand Up @@ -269,6 +290,17 @@ public String toString() {
++orderInGroup,
Width.SHORT,
FITBIT_TIME_ZONE_TOPIC_DISPLAY)

.define(FITBIT_ACTIVITY_LOG_TOPIC_CONFIG,
Type.STRING,
FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT,
nonControlChar,
Importance.LOW,
FITBIT_ACTIVITY_LOG_TOPIC_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY)
;
}

Expand Down Expand Up @@ -333,4 +365,12 @@ public Headers getClientCredentials() {
public long getMaxUsersPerPoll() {
return getInt(FITBIT_MAX_USERS_PER_POLL_CONFIG);
}

public String getActivityLogTopic() {
return getString(FITBIT_ACTIVITY_LOG_TOPIC_CONFIG);
}

public boolean hasIntradayAccess() {
return getBoolean(FITBIT_API_INTRADAY_ACCESS_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,15 @@ public void start(Map<String, String> props) {
executor.scheduleAtFixedRate(() -> {
try {
logger.info("Requesting latest user details...");
Set<? extends User> newUsers = getConfig(props).getUserRepository().stream()
Set<? extends User> newUsers = getConfig(props, false).getUserRepository().stream()
.collect(Collectors.toSet());
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
logger.info("User info mismatch found. Requesting reconfiguration...");
reconfigure();
}
} catch (IOException e) {
e.printStackTrace();
logger.warn("Failed to refresh users: {}", e.toString());
}

},0, 5, TimeUnit.MINUTES);
}

Expand All @@ -73,9 +72,13 @@ public void stop() {
configuredUsers = null;
}

private FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf, boolean doLog) {
return new FitbitRestSourceConnectorConfig(conf, doLog);
}

@Override
public FitbitRestSourceConnectorConfig getConfig(Map<String, String> conf) {
return new FitbitRestSourceConnectorConfig(conf);
return getConfig(conf, true);
}

@Override
Expand Down
Loading

0 comments on commit 1938b64

Please sign in to comment.