Skip to content

Commit

Permalink
APP-2996: Datafeed ID repository for Datafeed Service V1 (#192)
Browse files Browse the repository at this point in the history
* Simplified configuration when the configuration of api client is the same

4 properties added to BdkConfig to specify the global configurations for host,
port, scheme and context. These properties can be used if they are not configured
in the api client configuration.

Version of configuration file is also removed because it seems useless.

* Using enhanced configuration in bdk example

* Add more test cases and update documentation

* Update documentation

* Datafeed Repository implementation

Create an interface DatafeedRepository for handling storing
a datafeed id which is created by using DatafeedServiceV1.

Create OnDiskDatafeedRepository implementing the DatafeedRepository
for storing datafeed id on disk.

In DatafeedServiceV1, users can use the OnDiskDatafeedRepository
implementation by default or we can inject their own repository, for
example: MongoDB, SQL storage...

Update unittest

* Update Thibault's comment

Rename DatafeedRepository to DatafeedIdRepository

Make DatafeedRepository#read return Optional<String>

* Update unittest
  • Loading branch information
symphony-hong authored Aug 28, 2020
1 parent 9623d47 commit 5b30c83
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.symphony.bdk.core.service.datafeed;

import java.util.Optional;

/**
* A repository interface for storing a datafeed id.
* By using {@link com.symphony.bdk.core.service.datafeed.impl.DatafeedServiceV1}, the created datafeed id should be persisted manually on the BDK side.
*/
public interface DatafeedIdRepository {

/**
* Persists the created datafeed id into the storage.
*
* @param datafeedId the datafeed id to be persisted.
*/
void write(String datafeedId);

/**
* Read the persisted datafeed id from the storage.
*
* @return The persisted datafeed id.
*/
Optional<String> read();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@
import com.symphony.bdk.core.auth.AuthSession;
import com.symphony.bdk.core.auth.exception.AuthUnauthorizedException;
import com.symphony.bdk.core.config.model.BdkConfig;
import com.symphony.bdk.core.service.datafeed.DatafeedIdRepository;
import com.symphony.bdk.gen.api.DatafeedApi;
import com.symphony.bdk.gen.api.model.V4Event;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -44,15 +39,19 @@
@Slf4j
public class DatafeedServiceV1 extends AbstractDatafeedService {

private static final String DATAFEED_ID_FILE = "datafeed.id";

private final AtomicBoolean started = new AtomicBoolean();
private final DatafeedIdRepository datafeedRepository;
private String datafeedId;

public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) {
super(datafeedApi, authSession, config);
this.started.set(false);
this.datafeedId = null;
this(datafeedApi, authSession, config, new OnDiskDatafeedIdRepository(config));
}

public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config, DatafeedIdRepository repository) {
super(datafeedApi, authSession, config);
this.started.set(false);
this.datafeedId = null;
this.datafeedRepository = repository;
}

/**
Expand All @@ -63,13 +62,10 @@ public void start() throws AuthUnauthorizedException, ApiException {
if (this.started.get()) {
throw new IllegalStateException("The datafeed service is already started");
}
this.datafeedId = this.retrieveDatafeedIdFromDisk();
Optional<String> persistedDatafeedId = this.retrieveDatafeed();

try {
if (this.datafeedId == null) {
this.datafeedId = this.createDatafeedAndSaveToDisk();
}

this.datafeedId = persistedDatafeedId.orElse(this.createDatafeed());
log.debug("Start reading events from datafeed {}", datafeedId);
this.started.set(true);
do {
Expand Down Expand Up @@ -115,7 +111,7 @@ private void readDatafeed() throws Throwable {
if (e.isClientError()) {
log.info("Recreate a new datafeed and try again");
try {
datafeedId = this.createDatafeedAndSaveToDisk();
datafeedId = this.createDatafeed();
} catch (Throwable throwable) {
e.addSuppressed(throwable);
}
Expand All @@ -127,14 +123,14 @@ private void readDatafeed() throws Throwable {
});
}

protected String createDatafeedAndSaveToDisk() throws Throwable {
log.debug("Start creating a new datafeed and save to disk");
protected String createDatafeed() throws Throwable {
log.debug("Start creating a new datafeed and persisting it");
Retry retry = this.getRetryInstance("Create Datafeed");
return retry.executeCheckedSupplier(() -> {
try {
String id = this.datafeedApi.v4DatafeedCreatePost(authSession.getSessionToken(), authSession.getKeyManagerToken()).getId();
this.writeDatafeedIdToDisk(id);
log.debug("Datafeed: {} was created and saved to disk", id);
this.datafeedRepository.write(id);
log.debug("Datafeed: {} was created and persisted", id);
return id;
} catch (ApiException e) {
if (e.isUnauthorized()) {
Expand All @@ -148,42 +144,9 @@ protected String createDatafeedAndSaveToDisk() throws Throwable {
});
}

protected String retrieveDatafeedIdFromDisk() {
log.debug("Start retrieving datafeed id from disk");
String datafeedId = null;
try {
File file = this.getDatafeedIdFile();
Path datafeedIdPath = Paths.get(file.getPath());
List<String> lines = Files.readAllLines(datafeedIdPath);
if (lines.isEmpty()) {
return null;
}
String[] persistedDatafeed = lines.get(0).split("@");
datafeedId = persistedDatafeed[0];
log.info("Retrieve datafeed id from persisted file: {}", datafeedId);
} catch (IOException e) {
log.debug("No persisted datafeed id could be retrieved from the filesystem");
}
return datafeedId;
}

protected void writeDatafeedIdToDisk(String datafeedId) {
String agentUrl = bdkConfig.getAgent().getHost() + ":" + bdkConfig.getAgent().getPort();
try {
FileUtils.writeStringToFile(this.getDatafeedIdFile(), datafeedId + "@" + agentUrl, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error(e.getMessage());
}
}

private File getDatafeedIdFile() {
String pathToDatafeedIdFile = bdkConfig.getDatafeed().getIdFilePath();

File file = new File(pathToDatafeedIdFile);
if (file.isDirectory()) {
file = new File(pathToDatafeedIdFile + File.separator + DATAFEED_ID_FILE);
}
return file;
protected Optional<String> retrieveDatafeed() {
log.debug("Start retrieving datafeed id");
return this.datafeedRepository.read();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.symphony.bdk.core.service.datafeed.impl;

import com.symphony.bdk.core.config.model.BdkConfig;
import com.symphony.bdk.core.service.datafeed.DatafeedIdRepository;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

/**
* The implementation of {@link DatafeedIdRepository} interface for persisting a datafeed id on disk.
*/
@Slf4j
public class OnDiskDatafeedIdRepository implements DatafeedIdRepository {

private static final String DATAFEED_ID_FILE = "datafeed.id";

private final BdkConfig config;

public OnDiskDatafeedIdRepository(BdkConfig config) {
this.config = config;
}

/**
* {@inheritDoc}
*/
@Override
public void write(String datafeedId) {
log.debug("Writing datafeed id {} to file: {}", datafeedId, this.getDatafeedIdFile().toString());
String agentUrl = this.config.getAgent().getHost() + ":" + this.config.getAgent().getPort();
try {
FileUtils.writeStringToFile(this.getDatafeedIdFile(), datafeedId + "@" + agentUrl, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error(e.getMessage());
}
}

/**
* {@inheritDoc}
*/
@Override
public Optional<String> read() {
log.debug("Reading datafeed id from file: {}", this.getDatafeedIdFile().toString());
String datafeedId;
try {
File file = this.getDatafeedIdFile();
Path datafeedIdPath = Paths.get(file.getPath());
List<String> lines = Files.readAllLines(datafeedIdPath);
if (lines.isEmpty() || !lines.get(0).contains("@")) {
return Optional.empty();
}
String[] persistedDatafeed = lines.get(0).split("@");
datafeedId = persistedDatafeed[0];
log.info("Retrieved datafeed id from datafeed repository: {}", datafeedId);
return Optional.of(datafeedId);
} catch (IOException e) {
log.debug("No persisted datafeed id could be retrieved from disk");
return Optional.empty();
}
}

private File getDatafeedIdFile() {
File file = new File(this.config.getDatafeed().getIdFilePath());
if (file.isDirectory()) {
file = new File(this.config.getDatafeed().getIdFilePath() + File.separator + DATAFEED_ID_FILE);
}
return file;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.symphony.bdk.core.service.datafeed.impl;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -46,17 +43,21 @@
import com.symphony.bdk.gen.api.model.V4UserRequestedToJoinRoom;

import io.github.resilience4j.retry.Retry;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class DatafeedServiceV1Test {
Expand Down Expand Up @@ -204,8 +205,9 @@ void retrieveDatafeedIdFromDatafeedDir(@TempDir Path tempDir) throws IOException
datafeedConfig.setIdFilePath(tempDir.toString());
bdkConfig.setDatafeed(datafeedConfig);

String datafeedId = this.datafeedService.retrieveDatafeedIdFromDisk();
assertEquals(datafeedId, "8e7c8672-220");
Optional<String> datafeedId = this.datafeedService.retrieveDatafeed();
assertTrue(datafeedId.isPresent());
assertEquals(datafeedId.get(), "8e7c8672-220");
}

@Test
Expand All @@ -217,8 +219,21 @@ void retrieveDatafeedIdFromDatafeedFile(@TempDir Path tempDir) throws IOExceptio
datafeedConfig.setIdFilePath(datafeedFile.toString());
bdkConfig.setDatafeed(datafeedConfig);

String datafeedId = this.datafeedService.retrieveDatafeedIdFromDisk();
assertEquals(datafeedId, "8e7c8672-220");
Optional<String> datafeedId = this.datafeedService.retrieveDatafeed();
assertTrue(datafeedId.isPresent());
assertEquals(datafeedId.get(), "8e7c8672-220");
}

@Test
void retrieveDatafeedIdFromInvalidDatafeedFile(@TempDir Path tempDir) throws IOException {
Path datafeedFile = tempDir.resolve("datafeed.id");
FileUtils.writeStringToFile(new File(String.valueOf(datafeedFile)), "8e7c8672-220", StandardCharsets.UTF_8 );
BdkDatafeedConfig datafeedConfig = bdkConfig.getDatafeed();
datafeedConfig.setIdFilePath(datafeedFile.toString());
bdkConfig.setDatafeed(datafeedConfig);

Optional<String> datafeedId = this.datafeedService.retrieveDatafeed();
assertFalse(datafeedId.isPresent());
}

@Test
Expand All @@ -227,8 +242,8 @@ void retrieveDatafeedIdFromUnknownPath() {
datafeedConfig.setIdFilePath("unknown_path");
bdkConfig.setDatafeed(datafeedConfig);

String datafeedId = this.datafeedService.retrieveDatafeedIdFromDisk();
assertNull(datafeedId);
Optional<String> datafeedId = this.datafeedService.retrieveDatafeed();
assertFalse(datafeedId.isPresent());
}

@Test
Expand All @@ -237,8 +252,8 @@ void retrieveDatafeedIdFromEmptyFile(@TempDir Path tempDir) {
BdkDatafeedConfig datafeedConfig = bdkConfig.getDatafeed();
datafeedConfig.setIdFilePath(datafeedFile.toString());

String datafeedId = this.datafeedService.retrieveDatafeedIdFromDisk();
assertNull(datafeedId);
Optional<String> datafeedId = this.datafeedService.retrieveDatafeed();
assertFalse(datafeedId.isPresent());
}

@Test
Expand Down
Loading

0 comments on commit 5b30c83

Please sign in to comment.