Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into aruha-1322
Browse files Browse the repository at this point in the history
  • Loading branch information
adyach authored Nov 16, 2017
2 parents 768fd9c + 29bf399 commit 4c95617
Show file tree
Hide file tree
Showing 24 changed files with 539 additions and 71 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- Change default storage via env var or at runtime

### Changed
- Limited stream_timeout for consumption to 1h ± 10min

## [2.2.9] - 2017-11-14

### Fixed
- Fixed displaying of streamId for /stats endpoint

## [2.2.8] - 2017-11-01

### Fixed
Expand Down
34 changes: 33 additions & 1 deletion docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,34 @@ paths:
schema:
$ref: '#/definitions/Problem'

/storages/default/{id}:
put:
tags:
- timelines-api
description: |
Sets default storage to use in Nakadi.
The oauth resource owner username has to be equal to 'nakadi.oauth2.adminClientId' property
to be able to access this endpoint.
parameters:
- name: id
in: path
description: storage backend ID
type: string
required: true
responses:
'200':
description: OK
schema:
$ref: '#/definitions/Storage'
'404':
description: Storage backend not found
schema:
$ref: '#/definitions/Problem'
'403':
description: Access forbidden
schema:
$ref: '#/definitions/Problem'

/event-types/{name}/timelines:
post:
tags:
Expand Down Expand Up @@ -2640,16 +2668,20 @@ parameters:
in: query
description: |
Maximum time in seconds a stream will live before connection is closed by the server.
If 0 or unspecified will stream indefinitely.
If 0 or unspecified will stream for 1h ±10min.
If this timeout is reached, any pending messages (in the sense of `stream_limit`) will be flushed
to the client.
Stream initialization will fail if `stream_timeout` is lower than `batch_flush_timeout`.
If the `stream_timeout` is greater than max value (4200 seconds) - Nakadi will treat this as not
specifying `stream_timeout` (this is done due to backwards compatibility).
type: number
format: int32
required: false
default: 0
minimum: 0
maximum: 4200

SubscriptionId:
name: subscription_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.zalando.nakadi.config.JsonConfig;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.repository.db.EventTypeDbRepository;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.db.TimelineDbRepository;

Expand All @@ -31,7 +30,6 @@ public abstract class BaseAT {
private static final JdbcTemplate JDBC_TEMPLATE = new JdbcTemplate(
new DriverManagerDataSource(POSTGRES_URL, POSTGRES_USER, POSTGRES_PWD));
protected static final ObjectMapper MAPPER = (new JsonConfig()).jacksonObjectMapper();
protected static final EventTypeDbRepository EVENT_TYPE_REPO = new EventTypeDbRepository(JDBC_TEMPLATE, MAPPER);
protected static final StorageDbRepository STORAGE_DB_REPOSITORY = new StorageDbRepository(JDBC_TEMPLATE, MAPPER);
protected static final TimelineDbRepository TIMELINE_REPOSITORY = new TimelineDbRepository(JDBC_TEMPLATE, MAPPER);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.zalando.nakadi.webservice;

import org.junit.Assert;
import org.junit.Test;
import org.zalando.nakadi.utils.EventTypeTestBuilder;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;

import static com.jayway.restassured.RestAssured.given;
import static com.jayway.restassured.http.ContentType.JSON;

public class StorageControllerAT extends BaseAT {

@Test
public void shouldChangeDefaultStorageWhenRequested() throws Exception {
given()
.body("{\"id\": \"default-test\",\"kafka_configuration\": {\"exhibitor_address\": null," +
"\"exhibitor_port\": 0,\"zk_address\": \"127.0.0.1:2181\",\"zk_path\": \"\"}," +
"\"storage_type\": \"kafka\"}")
.contentType(JSON).post("/storages");

NakadiTestUtils.createEventTypeInNakadi(EventTypeTestBuilder.builder().name("event_a").build());
String storageId = (String) NakadiTestUtils.listTimelines("event_a").get(0).get("storage_id");
Assert.assertEquals("default", storageId);

given().contentType(JSON).put("/storages/default/default-test");
NakadiTestUtils.createEventTypeInNakadi(EventTypeTestBuilder.builder().name("event_b").build());
storageId = (String) NakadiTestUtils.listTimelines("event_b").get(0).get("storage_id");
Assert.assertEquals("default-test", storageId);

}
}
69 changes: 55 additions & 14 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.zalando.nakadi.config;

import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -9,12 +12,16 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.zalando.nakadi.domain.DefaultStorage;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory;
import org.zalando.nakadi.service.StorageService;

import java.util.Optional;

@Configuration
@EnableScheduling
Expand All @@ -34,22 +41,56 @@ public ZooKeeperLockFactory zooKeeperLockFactory(final ZooKeeperHolder zooKeeper

@Bean
@Qualifier("default_storage")
public Storage defaultStorage(final StorageDbRepository storageDbRepository,
final Environment environment) throws InternalNakadiException {
final Storage storage = new Storage();
storage.setId("default");
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(
environment.getProperty("nakadi.zookeeper.exhibitor.brokers"),
Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")),
environment.getProperty("nakadi.zookeeper.brokers"),
environment.getProperty("nakadi.zookeeper.kafkaNamespace", "")));
public DefaultStorage defaultStorage(final StorageDbRepository storageDbRepository,
final Environment environment,
final ZooKeeperHolder zooKeeperHolder)
throws InternalNakadiException {
final String storageId = getStorageId(zooKeeperHolder, environment);
final Optional<Storage> storageOpt = storageDbRepository.getStorage(storageId);
if (!storageOpt.isPresent()) {
LOGGER.info("Creating timelines storage `{}` from defaults", storageId);
final Storage storage = new Storage();
storage.setId(storageId);
storage.setType(Storage.Type.KAFKA);
storage.setConfiguration(new Storage.KafkaConfiguration(
environment.getProperty("nakadi.zookeeper.exhibitor.brokers"),
Integer.valueOf(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")),
environment.getProperty("nakadi.zookeeper.brokers"),
environment.getProperty("nakadi.zookeeper.kafkaNamespace", "")));
try {
storageDbRepository.createStorage(storage);
} catch (final DuplicatedStorageException e) {
LOGGER.info("Creation of default storage failed: {}", e.getMessage());
}
return new DefaultStorage(storage);
} else {
return new DefaultStorage(storageOpt.get());
}
}

private String getStorageId(final ZooKeeperHolder zooKeeperHolder,
final Environment environment) {
final CuratorFramework curator = zooKeeperHolder.get();
try {
storageDbRepository.createStorage(storage);
} catch (final DuplicatedStorageException e) {
LOGGER.info("Creation of default storage failed: {}", e.getMessage());
curator.create().creatingParentsIfNeeded()
.forPath(StorageService.ZK_TIMELINES_DEFAULT_STORAGE, null);
} catch (final KeeperException.NodeExistsException e) {
LOGGER.trace("Node {} already is there", StorageService.ZK_TIMELINES_DEFAULT_STORAGE);
} catch (final Exception e) {
LOGGER.error("Zookeeper access error {}", e.getMessage(), e);
}
return storage;

try {
final byte[] storageIdBytes = curator.getData()
.forPath(StorageService.ZK_TIMELINES_DEFAULT_STORAGE);
if (storageIdBytes != null) {
return new String(storageIdBytes, Charsets.UTF_8);
}
} catch (final Exception e) {
LOGGER.warn("Init of default storage from zk failed, will use default from env", e);
}

return environment.getProperty("nakadi.timelines.storage.default");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,16 @@ public ResponseEntity<?> deleteStorage(@PathVariable("id") final String id, fina
}
return Responses.create(result.getProblem(), request);
}

@RequestMapping(value = "/storages/default/{id}", method = RequestMethod.PUT)
public ResponseEntity<?> setDefaultStorage(@PathVariable("id") final String id, final NativeWebRequest request) {
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
return status(FORBIDDEN).build();
}
final Result<Storage> result = storageService.setDefaultStorage(id);
if (result.isSuccessful()) {
return status(OK).body(result.getValue());
}
return Responses.create(result.getProblem(), request);
}
}
18 changes: 18 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/DefaultStorage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.zalando.nakadi.domain;

public class DefaultStorage {

private volatile Storage storage;

public DefaultStorage(final Storage storage) {
this.storage = storage;
}

public void setStorage(final Storage storage) {
this.storage = storage;
}

public Storage getStorage() {
return storage;
}
}
13 changes: 10 additions & 3 deletions src/main/java/org/zalando/nakadi/service/EventStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import java.util.List;
import java.util.Random;

@Immutable
public class EventStreamConfig {

public static final int MAX_STREAM_TIMEOUT = 3600 + 600; // 1h 10m

private static final int BATCH_LIMIT_DEFAULT = 1;
private static final int STREAM_LIMIT_DEFAULT = 0;
private static final int BATCH_FLUSH_TIMEOUT_DEFAULT = 30;
private static final int STREAM_TIMEOUT_DEFAULT = 0;
private static final int STREAM_KEEP_ALIVE_LIMIT_DEFAULT = 0;
private static final Random RANDOM = new Random();

private final List<NakadiCursor> cursors;
private final int batchLimit;
Expand Down Expand Up @@ -108,13 +111,17 @@ public static Builder builder() {
return new Builder();
}

public static int generateDefaultStreamTimeout() {
return 3600 + RANDOM.nextInt(1200) - 600; // 1h ± 10min
}

public static class Builder {

private List<NakadiCursor> cursors = null;
private int batchLimit = BATCH_LIMIT_DEFAULT;
private int streamLimit = STREAM_LIMIT_DEFAULT;
private int batchTimeout = BATCH_FLUSH_TIMEOUT_DEFAULT;
private int streamTimeout = STREAM_TIMEOUT_DEFAULT;
private int streamTimeout = generateDefaultStreamTimeout();
private int streamKeepAliveLimit = STREAM_KEEP_ALIVE_LIMIT_DEFAULT;
private String etName;
private String consumingAppId;
Expand Down Expand Up @@ -150,7 +157,7 @@ public Builder withBatchTimeout(@Nullable final Integer batchTimeout) {
}

public Builder withStreamTimeout(@Nullable final Integer streamTimeout) {
if (streamTimeout != null) {
if (streamTimeout != null && streamTimeout <= MAX_STREAM_TIMEOUT && streamTimeout > 0) {
this.streamTimeout = streamTimeout;
}
return this;
Expand Down
Loading

0 comments on commit 4c95617

Please sign in to comment.