Skip to content

Commit

Permalink
fix(e2e): Add e2e test cases (#1067)
Browse files Browse the repository at this point in the history
* fix(e2e): Add e2e test cases

Signed-off-by: tianpingan <tianpingan2000@gmail.com>

* fix(e2e): Add time limit

Signed-off-by: tianpingan <tianpingan2000@gmail.com>

* Fix comments

Signed-off-by: tianpingan <tianpingan2000@gmail.com>

* Fix comments

Signed-off-by: tianpingan <tianpingan2000@gmail.com>

* Add comments

Signed-off-by: tianpingan <tianpingan2000@gmail.com>

---------

Signed-off-by: tianpingan <tianpingan2000@gmail.com>
  • Loading branch information
Tianpingan authored Sep 1, 2023
1 parent 5fc1af3 commit fcfe8aa
Show file tree
Hide file tree
Showing 26 changed files with 1,477 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.automq.elasticstream.client.tools.e2e;

import static org.junit.Assert.assertTrue;

import java.util.concurrent.ExecutionException;

import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.OpenStreamOptions;
import com.automq.elasticstream.client.api.Stream;

public class AppendTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
E2EOption option = new E2EOption();
Client client = Client.builder().endpoint(option.getEndPoint()).kvEndpoint(option.getKvEndPoint()).build();
// 1. Create an new stream, append records to it and fetch records from it
Stream stream0 = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0)
.replicaCount(option.getReplica()).build())
.get();
long streamId = stream0.streamId();
assertTrue(Utils.appendRecords(stream0, 0, option.getCount(), option.getBatchSize()));
assertTrue(Utils.fetchRecords(stream0, 0, option.getCount(), option.getBatchSize()));
stream0.close().get();
// 2 . Open a stream with new epoch, append records to it and fetch records
// from it
Stream stream1 = Utils.openStream(client, streamId, OpenStreamOptions.newBuilder().epoch(1).build());
assertTrue(stream1 != null);
assertTrue(Utils.appendRecords(stream1, option.getCount(), option.getCount(), option.getBatchSize()));
assertTrue(Utils.fetchRecords(stream1, option.getCount(), option.getCount(), option.getBatchSize()));
stream1.close().get();
// 3 . Open a stream with old epoch
Stream stream2 = Utils.openStream(client, streamId, OpenStreamOptions.newBuilder().epoch(0).build());
assertTrue(stream2 == null);
System.out.println("PASS");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.automq.elasticstream.client.tools.e2e;

public class E2EOption {
private String endpoint = "127.0.0.1:12378";
private String kvEndpoint = "127.0.0.1:12379";
private long count = 100;
private long streamId = -1;
private long startSeq = 0;
private int replica = 1;
private int batchSize = 10;

public E2EOption() {
String endpoint = System.getenv("E2E_END_POINT");
if (endpoint != null) {
this.endpoint = endpoint;
}
String kvEndpoint = System.getenv("E2E_KV_END_POINT");
if (kvEndpoint != null) {
this.kvEndpoint = kvEndpoint;
}
String count = System.getenv("E2E_COUNT");
if (count != null) {
this.count = Long.parseLong(count);
}
String streamId = System.getenv("E2E_STREAM_ID");
if (streamId != null) {
this.streamId = Long.parseLong(streamId);
}
String startSeq = System.getenv("E2E_START_SEQ");
if (startSeq != null) {
this.startSeq = Long.parseLong(startSeq);
}
String replica = System.getenv("E2E_REPLICA");
if (replica != null) {
this.replica = Integer.parseInt(replica);
}
String batchSize = System.getenv("E2E_BATCH_SIZE");
if (batchSize != null) {
this.batchSize = Integer.parseInt(batchSize);
}
}

public String getEndPoint() {
return this.endpoint;
}

public String getKvEndPoint() {
return this.kvEndpoint;
}

public long getCount() {
return this.count;
}

public long getStreamId() {
return this.streamId;
}

public long getStartSeq() {
return this.startSeq;
}

public int getReplica() {
return this.replica;
}

public int getBatchSize() {
return this.batchSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.automq.elasticstream.client.tools.e2e;

import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import com.automq.elasticstream.client.DefaultRecordBatch;
import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.OpenStreamOptions;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;

public class ExampleTest {

public static void main(String[] args) throws Exception {
E2EOption option = new E2EOption();
Client client = Client.builder().endpoint(option.getEndPoint()).kvEndpoint(option.getKvEndPoint()).build();
Stream stream = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(option.getReplica()).build()).get();
long streamId = stream.streamId();

System.out.println("Step1: append 10 records to stream:" + streamId);
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
int index = i;
byte[] payload = String.format("hello world %03d", i).getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.wrap(payload);
long startNanos = System.nanoTime();
CompletableFuture<AppendResult> cf = stream
.append(new DefaultRecordBatch(10, 0, Collections.emptyMap(), buffer));
System.out.println("append " + index + " async cost:" + (System.nanoTime() - startNanos) / 1000 + "us");
cf.whenComplete((rst, ex) -> {
if (ex == null) {
long offset = rst.baseOffset();
assertEquals(index * 10, offset);
System.out.println(
"append " + index + " callback cost:" + (System.nanoTime() - startNanos) / 1000 + "us");
}
latch.countDown();
});
}
latch.await();

System.out.println("Step2: read 10 record batch one by one");
for (int i = 0; i < 10; i++) {
FetchResult fetchResult = stream.fetch(i * 10, i * 10 + 10, Integer.MAX_VALUE).get();
assertEquals(1, fetchResult.recordBatchList().size());
RecordBatchWithContext recordBatch = fetchResult.recordBatchList().get(0);
assertEquals(i * 10, recordBatch.baseOffset());
assertEquals(i * 10 + 10, recordBatch.lastOffset());

byte[] rawPayload = new byte[recordBatch.rawPayload().remaining()];
recordBatch.rawPayload().get(rawPayload);
String payloadStr = new String(rawPayload, StandardCharsets.UTF_8);
System.out.println("fetch record result offset[" + recordBatch.baseOffset() + ","
+ recordBatch.lastOffset() + "]" + " payload:" + payloadStr + ".");
assertEquals(String.format("hello world %03d", i), payloadStr);
fetchResult.free();
}

System.out.println("Step3: cross read 10 record batch");
for (int i = 0; i < 9; i++) {
FetchResult fetchResult = stream.fetch(i * 10, i * 10 + 11, Integer.MAX_VALUE).get();
assertEquals(2, fetchResult.recordBatchList().size());
for (int j = 0; j < 2; j++) {
int index = i + j;
RecordBatchWithContext recordBatch = fetchResult.recordBatchList().get(j);
assertEquals(index * 10, recordBatch.baseOffset());
assertEquals(index * 10 + 10, recordBatch.lastOffset());
byte[] rawPayload = new byte[recordBatch.rawPayload().remaining()];
recordBatch.rawPayload().get(rawPayload);
String payloadStr = new String(rawPayload, StandardCharsets.UTF_8);
assertEquals(String.format("hello world %03d", index), payloadStr);
System.out.println("fetch record result offset[" + recordBatch.baseOffset() + ","
+ recordBatch.lastOffset() + "]" + " payload:" + payloadStr + ".");
}
fetchResult.free();
}

System.out.println("Step4: reopen stream");
stream.close().get();
stream = client.streamClient().openStream(streamId, OpenStreamOptions.newBuilder().build()).get();

System.out.println("Step5: append more 10 record batches");
for (int i = 0; i < 10; i++) {
int index = i + 10;
byte[] payload = String.format("hello world %03d", index).getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.wrap(payload);
AppendResult appendResult = stream
.append(new DefaultRecordBatch(10, 0, Collections.emptyMap(), buffer)).get();
long offset = appendResult.baseOffset();
System.out.println("append record result offset:" + offset);
assertEquals(index * 10, offset);
}

System.out.println("Step6: read 20 record batch one by one");
for (int i = 0; i < 20; i++) {
FetchResult fetchResult = stream.fetch(i * 10, i * 10 + 10, Integer.MAX_VALUE).get();
assertEquals(1, fetchResult.recordBatchList().size());
RecordBatchWithContext recordBatch = fetchResult.recordBatchList().get(0);
assertEquals(i * 10, recordBatch.baseOffset());
assertEquals(i * 10 + 10, recordBatch.lastOffset());

byte[] rawPayload = new byte[recordBatch.rawPayload().remaining()];
recordBatch.rawPayload().get(rawPayload);
String payloadStr = new String(rawPayload, StandardCharsets.UTF_8);
System.out.println("fetch record result offset[" + recordBatch.baseOffset() + ","
+ recordBatch.lastOffset() + "]" + " payload:" + payloadStr + ".");
assertEquals(String.format("hello world %03d", i), payloadStr);
fetchResult.free();
}
System.out.println("PASS");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.automq.elasticstream.client.tools.e2e;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;

public class FetchTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
E2EOption option = new E2EOption();
Client client = Client.builder().endpoint(option.getEndPoint()).kvEndpoint(option.getKvEndPoint())
.build();
// 1. Create an new stream and append records to it
Stream stream0 = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0)
.replicaCount(option.getReplica()).build())
.get();
assertTrue(Utils.appendRecords(stream0, 0, option.getCount(), option.getBatchSize()));
// 2. Fetch records one by one
assertTrue(Utils.fetchRecords(stream0, 0, option.getCount(), option.getBatchSize()));
// 3. Fetch all records in one fetch result
FetchResult fetchResult = stream0.fetch(0, option.getCount() * option.getBatchSize(), Integer.MAX_VALUE)
.get();
int len = fetchResult.recordBatchList().size();
assertEquals(len, option.getCount());
for (int i = 0; i < len; i++) {
RecordBatchWithContext recordBatch = fetchResult.recordBatchList().get(i);
assertEquals(i * option.getBatchSize(), recordBatch.baseOffset());
assertEquals(i * option.getBatchSize() + option.getBatchSize(), recordBatch.lastOffset());
byte[] rawPayload = new byte[recordBatch.rawPayload().remaining()];
recordBatch.rawPayload().get(rawPayload);
String payloadStr = new String(rawPayload, StandardCharsets.UTF_8);
assertTrue(String.format("hello world %03d", i).equals(payloadStr));
}
stream0.close().get();
// 4. Create an new stream and append a batch of records
Stream stream1 = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0)
.replicaCount(1).build())
.get();
// 5. Fetch records from any part of batch
assertTrue(Utils.appendRecords(stream1, 0, 2, 1024));
assertTrue(Utils.checkFetchResult(stream1, 0, 512, 0, 1));
assertTrue(Utils.checkFetchResult(stream1, 512, 1024, 0, 1));
assertTrue(Utils.checkFetchResult(stream1, 512, 1024 + 1, 0, 2));
assertTrue(Utils.checkFetchResult(stream1, 1024 - 1, 1024 + 1, 0, 2));
assertTrue(Utils.checkFetchResult(stream1, 1024, 1024 + 1, 1, 2));
System.out.println("PASS");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.automq.elasticstream.client.tools.e2e;

import static org.junit.Assert.assertEquals;
import java.util.concurrent.ExecutionException;
import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.OpenStreamOptions;
import com.automq.elasticstream.client.api.Stream;

public class MetadataTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
E2EOption option = new E2EOption();
Client client = Client.builder().endpoint(option.getEndPoint()).kvEndpoint(option.getKvEndPoint()).build();

assertEquals(Utils.openStream(client, 0, OpenStreamOptions.newBuilder().build()), null);
assertEquals(Utils.openStream(client, Long.MAX_VALUE, OpenStreamOptions.newBuilder().build()), null);

// 1. Create stream and check its stream id
for (int i = 0; i < option.getCount(); i++) {
Stream stream = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(option.getReplica()).build())
.get();
assertEquals(stream.streamId(), i);
stream.close();
}
// 2. Reopen stream and check its stream id
for (long i = 0; i < option.getCount(); i++) {
Stream stream = client.streamClient().openStream(i, OpenStreamOptions.newBuilder().build()).get();
assertEquals(stream.streamId(), i);
stream.close();
}
System.out.println("PASS");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.automq.elasticstream.client.tools.e2e;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutionException;
import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.Stream;

public class TrimAndDeleteTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
E2EOption option = new E2EOption();
Client client = Client.builder().endpoint(option.getEndPoint()).kvEndpoint(option.getKvEndPoint())
.build();
// 1. Create an new stream
Stream stream0 = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0)
.replicaCount(option.getReplica()).build())
.get();
assertTrue(Utils.appendRecords(stream0, 0, option.getCount(), 1));
assertTrue(Utils.fetchRecords(stream0, 0, option.getCount() / 2, 1));
// 2. Trim the stream and try to fetch records from it
stream0.trim(option.getCount() / 2).get();
assertFalse(Utils.fetchRecords(stream0, 0, option.getCount() / 2, 1));
stream0.close().get();
// 3. Create an new stream
Stream stream1 = client.streamClient()
.createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0)
.replicaCount(option.getReplica()).build())
.get();
assertTrue(Utils.appendRecords(stream1, 0, option.getCount(), 1));
assertTrue(Utils.fetchRecords(stream1, 0, option.getCount(), 1));
// 4. Destroy the stream and try to fetch records from it
stream1.destroy().get();
assertFalse(Utils.fetchRecords(stream1, 0, option.getCount(), 1));
assertFalse(Utils.appendRecordsWithTimeout(stream1, option.getCount(),
option.getCount(), 1));
assertFalse(Utils.fetchRecords(stream1, option.getCount(), option.getCount(), 1));
System.out.println("PASS");
}
}
Loading

0 comments on commit fcfe8aa

Please sign in to comment.