Skip to content

Commit

Permalink
Implement post record web api
Browse files Browse the repository at this point in the history
  • Loading branch information
brandboat committed Jun 23, 2022
1 parent e666e4d commit e31250e
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 0 deletions.
8 changes: 8 additions & 0 deletions app/src/main/java/org/astraea/app/web/PostRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,12 @@ default short shortValue(String key, short defaultValue) {
default short shortValue(String key) {
return Short.parseShort(value(key));
}

default boolean booleanValue(String key, boolean defaultValue) {
return get(key).map(Boolean::parseBoolean).orElse(defaultValue);
}

default boolean booleanValue(String key) {
return Boolean.parseBoolean(value(key));
}
}
145 changes: 145 additions & 0 deletions app/src/main/java/org/astraea/app/web/RecordHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.web;

import static java.util.Objects.requireNonNull;

import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import org.astraea.app.producer.Producer;
import org.astraea.app.producer.Serializer;

public class RecordHandler implements Handler {
static final String KEY_SERIALIZER = "keySerializer";
static final String VALUE_SERIALIZER = "valueSerializer";
static final String TOPIC = "topic";
static final String KEY = "key";
static final String VALUE = "value";
static final String TIMESTAMP = "timestamp";
static final String PARTITION = "partition";
static final String ASYNC = "async";

// visible for testing
final Producer<byte[], byte[]> producer;

RecordHandler(String bootstrapServers) {
this.producer = Producer.builder().bootstrapServers(requireNonNull(bootstrapServers)).build();
}

@Override
public JsonObject get(Optional<String> target, Map<String, String> queries) {
// TODO: Implement topic read web apis (https://github.com/skiptests/astraea/pull/405)
return ErrorObject.for404("GET is not supported yet");
}

@Override
public JsonObject post(PostRequest request) {
var topic =
request.get(TOPIC).orElseThrow(() -> new IllegalArgumentException("topic must be set"));
var keySerDe =
SerDe.valueOf(
request.get(KEY_SERIALIZER).map(String::toUpperCase).orElse(SerDe.STRING.name()));
var valueSerDe =
SerDe.valueOf(
request.get(VALUE_SERIALIZER).map(String::toUpperCase).orElse(SerDe.STRING.name()));
var async = request.booleanValue(ASYNC, false);
var sender = producer.sender().topic(topic);

request.get(KEY).ifPresent(k -> sender.key(keySerDe.serializer.apply(topic, k)));
request.get(VALUE).ifPresent(v -> sender.value(valueSerDe.serializer.apply(topic, v)));
request.get(TIMESTAMP).ifPresent(t -> sender.timestamp(Long.parseLong(t)));
request.get(PARTITION).ifPresent(p -> sender.partition(Integer.parseInt(p)));

CompletableFuture<org.astraea.app.producer.Metadata> senderFuture =
sender.run().toCompletableFuture();

if (async) {
// TODO: Return HTTP status code 202 instead of 200
// (https://github.com/skiptests/astraea/issues/420)
return new JsonObject() {};
}
try {
return new Metadata(senderFuture.get());
} catch (InterruptedException | ExecutionException e) {
return ErrorObject.for404(e.getMessage());
}
}

enum SerDe {
BYTEARRAY(
(topic, value) ->
Optional.ofNullable(value).map(v -> Base64.getDecoder().decode(v)).orElse(null)),
STRING(
(topic, value) ->
Optional.ofNullable(value)
.map(v -> Serializer.STRING.serialize(topic, List.of(), value))
.orElse(null)),
LONG(
(topic, value) ->
Optional.ofNullable(value)
.map(Long::parseLong)
.map(longVal -> Serializer.LONG.serialize(topic, List.of(), longVal))
.orElse(null)),
INTEGER(
(topic, value) ->
Optional.ofNullable(value)
.map(Integer::parseInt)
.map(intVal -> Serializer.INTEGER.serialize(topic, List.of(), intVal))
.orElse(null)),
FLOAT(
(topic, value) ->
Optional.ofNullable(value)
.map(Float::parseFloat)
.map(floatVal -> Serializer.FLOAT.serialize(topic, List.of(), floatVal))
.orElse(null)),
DOUBLE(
(topic, value) ->
Optional.ofNullable(value)
.map(Double::parseDouble)
.map(doubleVal -> Serializer.DOUBLE.serialize(topic, List.of(), doubleVal))
.orElse(null));

final BiFunction<String, String, byte[]> serializer;

SerDe(BiFunction<String, String, byte[]> serializer) {
this.serializer = requireNonNull(serializer);
}
}

static class Metadata implements JsonObject {
final String topic;
final int partition;
final long offset;
final long timestamp;
final int serializedKeySize;
final int serializedValueSize;

Metadata(org.astraea.app.producer.Metadata metadata) {
topic = metadata.topic();
partition = metadata.partition();
offset = metadata.offset();
timestamp = metadata.timestamp();
serializedKeySize = metadata.serializedKeySize();
serializedValueSize = metadata.serializedValueSize();
}
}
}
1 change: 1 addition & 0 deletions app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private static void execute(Argument arg) throws IOException {
server.createContext("/transactions", new TransactionHandler(Admin.of(arg.configs())));
if (arg.needJmx())
server.createContext("/beans", new BeanHandler(Admin.of(arg.configs()), arg.jmxPorts()));
server.createContext("/records", new RecordHandler(arg.bootstrapServers()));
server.start();
}

Expand Down
170 changes: 170 additions & 0 deletions app/src/test/java/org/astraea/app/web/RecordHandlerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.web;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.astraea.app.web.RecordHandler.ASYNC;
import static org.astraea.app.web.RecordHandler.KEY;
import static org.astraea.app.web.RecordHandler.KEY_SERIALIZER;
import static org.astraea.app.web.RecordHandler.PARTITION;
import static org.astraea.app.web.RecordHandler.TIMESTAMP;
import static org.astraea.app.web.RecordHandler.TOPIC;
import static org.astraea.app.web.RecordHandler.VALUE;
import static org.astraea.app.web.RecordHandler.VALUE_SERIALIZER;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Base64;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.astraea.app.common.Utils;
import org.astraea.app.consumer.Consumer;
import org.astraea.app.consumer.Record;
import org.astraea.app.service.RequireBrokerCluster;
import org.astraea.app.web.RecordHandler.Metadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class RecordHandlerTest extends RequireBrokerCluster {

@Test
void testInvalidPost() {
var handler = new RecordHandler(bootstrapServers());
Assertions.assertThrows(
IllegalArgumentException.class,
() -> handler.post(PostRequest.of(Map.of())),
"topic must be set");
}

@Test
void testPost() {
var topic = Utils.randomString(10);
var handler = new RecordHandler(bootstrapServers());
var currentTimestamp = System.currentTimeMillis();
var result =
Assertions.assertInstanceOf(
Metadata.class,
handler.post(
PostRequest.of(
Map.of(
KEY_SERIALIZER, "string",
VALUE_SERIALIZER, "integer",
TOPIC, topic,
KEY, "foo",
VALUE, "100",
TIMESTAMP, "" + currentTimestamp,
PARTITION, "0"))));

Assertions.assertEquals(0, result.offset);
Assertions.assertEquals(0, result.partition);
Assertions.assertEquals(topic, result.topic);
Assertions.assertEquals("foo".getBytes(UTF_8).length, result.serializedKeySize);
Assertions.assertEquals(4, result.serializedValueSize);

try (Consumer<byte[], byte[]> consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.fromBeginning()
.build()) {
Record<byte[], byte[]> record = consumer.poll(1, Duration.ofSeconds(10)).iterator().next();
Assertions.assertEquals(topic, record.topic());
Assertions.assertEquals(currentTimestamp, record.timestamp());
Assertions.assertEquals(0, record.partition());
Assertions.assertEquals("foo".getBytes(UTF_8).length, record.serializedKeySize());
Assertions.assertEquals(4, record.serializedValueSize());
Assertions.assertArrayEquals("foo".getBytes(UTF_8), record.key());
Assertions.assertArrayEquals(ByteBuffer.allocate(4).putInt(100).array(), record.value());
}
}

@Test
void testPostWithAsync() {
var topic = Utils.randomString(10);
var handler = new RecordHandler(bootstrapServers());
var currentTimestamp = System.currentTimeMillis();
var result =
Assertions.assertInstanceOf(
JsonObject.class,
handler.post(
PostRequest.of(
Map.of(
KEY_SERIALIZER, "string",
VALUE_SERIALIZER, "integer",
TOPIC, topic,
KEY, "foo",
VALUE, "100",
TIMESTAMP, "" + currentTimestamp,
ASYNC, "true",
PARTITION, "0"))));
Assertions.assertEquals(new JsonObject() {}.json(), result.json());

handler.producer.flush();

try (Consumer<byte[], byte[]> consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.fromBeginning()
.build()) {
Record<byte[], byte[]> record = consumer.poll(1, Duration.ofSeconds(10)).iterator().next();
Assertions.assertEquals(topic, record.topic());
Assertions.assertEquals(currentTimestamp, record.timestamp());
Assertions.assertEquals(0, record.partition());
Assertions.assertEquals("foo".getBytes(UTF_8).length, record.serializedKeySize());
Assertions.assertEquals(4, record.serializedValueSize());
Assertions.assertArrayEquals("foo".getBytes(UTF_8), record.key());
Assertions.assertArrayEquals(ByteBuffer.allocate(4).putInt(100).array(), record.value());
}
}

@ParameterizedTest
@MethodSource("forTestSerializer")
void testSerializer(String serializer, String actual, byte[] expected) {
var topic = Utils.randomString(10);
var handler = new RecordHandler(bootstrapServers());
Assertions.assertInstanceOf(
Metadata.class,
handler.post(
PostRequest.of(Map.of(KEY_SERIALIZER, serializer, TOPIC, topic, KEY, actual))));

try (Consumer<byte[], byte[]> consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.fromBeginning()
.build()) {
Record<byte[], byte[]> record = consumer.poll(1, Duration.ofSeconds(10)).iterator().next();
Assertions.assertArrayEquals(expected, record.key());
}
}

private static Stream<Arguments> forTestSerializer() {
return Stream.of(
arguments("integer", "10", ByteBuffer.allocate(Integer.BYTES).putInt(10).array()),
arguments("long", "11", ByteBuffer.allocate(Long.BYTES).putLong(11).array()),
arguments("float", "0.1", ByteBuffer.allocate(Float.BYTES).putFloat(0.1f).array()),
arguments("double", "0.1", ByteBuffer.allocate(Double.BYTES).putDouble(0.1).array()),
arguments("string", "astraea", "astraea".getBytes(UTF_8)),
arguments(
"bytearray",
Base64.getEncoder().encodeToString("astraea".getBytes(UTF_8)),
"astraea".getBytes(UTF_8)));
}
}

0 comments on commit e31250e

Please sign in to comment.