Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): introduce StoreException #54

Merged
merged 4 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.exception;

public enum StoreErrorCode {
FILE_SYSTEM_PERMISSION,
KV_SERVICE_IS_NOT_RUNNING,
ILLEGAL_ARGUMENT,
KV_ENGINE_ERROR,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.exception;

public class StoreException extends Exception {
private final StoreErrorCode code;

public StoreException(StoreErrorCode code, String message) {
super(message);
this.code = code;
}

public StoreException(StoreErrorCode code, String message, Throwable source) {
super(message, source);
this.code = code;
}

public StoreErrorCode code() {
return code;

Check warning on line 34 in store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java#L34

Added line #L34 was not covered by tests
}

public String message() {
return getMessage();

Check warning on line 38 in store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java#L38

Added line #L38 was not covered by tests
}

public Throwable source() {
return getCause();

Check warning on line 42 in store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/exception/StoreException.java#L42

Added line #L42 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.automq.rocketmq.metadata.StoreMetadataService;
import com.automq.rocketmq.store.MessageStore;
import com.automq.rocketmq.store.StreamStore;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.generated.ReceiptHandle;
import com.automq.rocketmq.store.model.kv.BatchDeleteRequest;
Expand All @@ -46,7 +47,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDBException;

import static com.automq.rocketmq.store.util.SerializeUtil.buildCheckPointKey;
import static com.automq.rocketmq.store.util.SerializeUtil.buildCheckPointValue;
Expand Down Expand Up @@ -87,7 +87,7 @@

public void writeCheckPoint(long topicId, int queueId, long streamId, long offset, long consumerGroupId,
long operationId,
boolean fifo, boolean retry, long operationTimestamp, long nextVisibleTimestamp) throws RocksDBException {
boolean fifo, boolean retry, long operationTimestamp, long nextVisibleTimestamp) throws StoreException {
// If this message is not orderly or has not been consumed, write check point and timer tag to KV service atomically.
List<BatchRequest> requestList = new ArrayList<>();
BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_NAMESPACE_CHECK_POINT,
Expand All @@ -111,7 +111,7 @@
}

public Optional<CheckPoint> retrieveFifoCheckPoint(long consumerGroupId, long topicId, int queueId,
long offset) throws RocksDBException {
long offset) throws StoreException {
// TODO: Undefined behavior if last operation is not orderly.
byte[] orderIndexKey = buildOrderIndexKey(consumerGroupId, topicId, queueId, offset);
byte[] bytes = kvService.get(KV_NAMESPACE_FIFO_INDEX, orderIndexKey);
Expand All @@ -132,7 +132,7 @@

public void renewFifoCheckPoint(CheckPoint lastCheckPoint, long topicId, int queueId, long streamId, long offset,
long consumerGroupId, long operationId, long operationTimestamp,
long nextVisibleTimestamp) throws RocksDBException {
long nextVisibleTimestamp) throws StoreException {
// Delete last check point and timer tag.
BatchDeleteRequest deleteLastCheckPointRequest = new BatchDeleteRequest(KV_NAMESPACE_CHECK_POINT,
buildCheckPointKey(topicId, queueId, offset, lastCheckPoint.operationId()));
Expand Down Expand Up @@ -295,10 +295,15 @@
}
}

return new PopResult(0, operationId, operationTimestamp, messageList);
} catch (RocksDBException e) {
// TODO: handle exception
throw new RuntimeException(e);
PopResult.Status status;
if (!messageList.isEmpty()) {
status = PopResult.Status.FOUND;
} else {
status = PopResult.Status.NOT_FOUND;

Check warning on line 302 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L302

Added line #L302 was not covered by tests
}
return new PopResult(status, operationId, operationTimestamp, messageList);
} catch (StoreException e) {
return new PopResult(PopResult.Status.ERROR, operationId, operationTimestamp, messageList);

Check warning on line 306 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L305-L306

Added lines #L305 - L306 were not covered by tests
}
});
}
Expand All @@ -324,7 +329,7 @@
byte[] buffer = kvService.get(KV_NAMESPACE_CHECK_POINT, checkPointKey);
if (buffer == null) {
// TODO: Check point not found
return new AckResult();
return new AckResult(AckResult.Status.ERROR);

Check warning on line 332 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L332

Added line #L332 was not covered by tests
}

// TODO: Data race between ack and revive.
Expand All @@ -345,12 +350,11 @@
}

kvService.batch(requestList.toArray(new BatchRequest[0]));
} catch (RocksDBException e) {
// TODO: handle exception
throw new RuntimeException(e);
} catch (StoreException e) {
return new AckResult(AckResult.Status.ERROR);

Check warning on line 354 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L353-L354

Added lines #L353 - L354 were not covered by tests
}

return new AckResult();
return new AckResult(AckResult.Status.SUCCESS);
});
}

Expand All @@ -372,7 +376,7 @@
byte[] buffer = kvService.get(KV_NAMESPACE_CHECK_POINT, checkPointKey);
if (buffer == null) {
// TODO: Check point not found
return new ChangeInvisibleDurationResult();
return new ChangeInvisibleDurationResult(ChangeInvisibleDurationResult.Status.ERROR);

Check warning on line 379 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L379

Added line #L379 was not covered by tests
}

// Delete last timer tag.
Expand Down Expand Up @@ -400,12 +404,11 @@
streamId, checkPoint.messageOffset(), checkPoint.operationId()));

kvService.batch(deleteLastTimerTagRequest, writeCheckPointRequest, writeTimerTagRequest);
} catch (RocksDBException e) {
// TODO: handle exception
throw new RuntimeException(e);
} catch (StoreException e) {
return new ChangeInvisibleDurationResult(ChangeInvisibleDurationResult.Status.ERROR);

Check warning on line 408 in store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java#L407-L408

Added lines #L407 - L408 were not covered by tests
}

return new ChangeInvisibleDurationResult();
return new ChangeInvisibleDurationResult(ChangeInvisibleDurationResult.Status.SUCCESS);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@

package com.automq.rocketmq.store.model.message;

public record AckResult() {
public record AckResult(Status status) {
public enum Status {
SUCCESS,
ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@

package com.automq.rocketmq.store.model.message;

public record ChangeInvisibleDurationResult() {
public record ChangeInvisibleDurationResult(Status status) {
public enum Status {
SUCCESS,
ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,11 @@
import com.automq.rocketmq.common.model.MessageExt;
import java.util.List;

public record PopResult(int status, long operationId, long deliveryTimestamp, List<MessageExt> messageList) {
public record PopResult(Status status, long operationId, long deliveryTimestamp, List<MessageExt> messageList) {
public enum Status {
FOUND,
NOT_FOUND,
END_OF_QUEUE,
ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@

package com.automq.rocketmq.store.service;

import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.kv.BatchRequest;
import com.automq.rocketmq.store.model.kv.IteratorCallback;
import org.rocksdb.RocksDBException;

public interface KVService {
// TODO: Map RocksDBException into StoreException

/**
* Get value with specified key from backend kv engine.
*
* @param namespace the namespace storing required the kv pair
* @param key the key for querying
* @return the value of the specified key
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
byte[] get(final String namespace, final byte[] key) throws RocksDBException;
byte[] get(final String namespace, final byte[] key) throws StoreException;

/**
* Iterate all the k-v pairs.
*
* @param namespace the namespace storing required the k-v pair
* @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void iterate(final String namespace, IteratorCallback callback) throws RocksDBException;
void iterate(final String namespace, IteratorCallback callback) throws StoreException;

/**
* Iterate the k-v pair with the given prefix, start and end.
Expand All @@ -55,55 +53,57 @@ public interface KVService {
* @param start the lower bound to start iterate
* @param end the upper bound to end iterate
* @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void iterate(final String namespace, final byte[] prefix, final byte[] start,
final byte[] end, IteratorCallback callback) throws RocksDBException;
final byte[] end, IteratorCallback callback) throws StoreException;

/**
* Put the kv pair into the backend engine.
*
* @param namespace the namespace storing required the k-v pair
* @param key the key for inserting
* @param value the value for inserting
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void put(final String namespace, byte[] key, byte[] value) throws RocksDBException;
void put(final String namespace, byte[] key, byte[] value) throws StoreException;

/**
* Put or delete the kv pair in batch.
*
* @param requests the mutation requests
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void batch(BatchRequest... requests) throws RocksDBException;
void batch(BatchRequest... requests) throws StoreException;

/**
* Delete value with specified key from backend kv engine.
*
* @param namespace the namespace storing required the k-v pair
* @param key the key for deleting
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void delete(final String namespace, byte[] key) throws RocksDBException;
void delete(final String namespace, byte[] key) throws StoreException;

/**
* Forced dirty pages to the hard disk.
*
* @param sync synchronous or not
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void flush(boolean sync) throws RocksDBException;
void flush(boolean sync) throws StoreException;

/**
* Flush all dirty pages and shutdown the backend engine.
*
* @throws StoreException if backend engine fails
*/
void close() throws RocksDBException;
void close() throws StoreException;

/**
* Delete all data in the backend engine.
*
* @throws RocksDBException if backend engine fails
* @throws StoreException if backend engine fails
*/
void destroy() throws RocksDBException;
void destroy() throws StoreException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.rocketmq.common.model.MessageExt;
import com.automq.rocketmq.metadata.StoreMetadataService;
import com.automq.rocketmq.store.StreamStore;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.generated.TimerTag;
import com.automq.rocketmq.store.model.kv.BatchDeleteRequest;
import com.automq.rocketmq.store.model.stream.SingleRecord;
Expand All @@ -29,7 +30,6 @@
import com.automq.rocketmq.stream.api.FetchResult;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.rocksdb.RocksDBException;

public class ReviveService implements Runnable {
private Thread thread;
Expand Down Expand Up @@ -79,15 +79,15 @@
try {
tryRevive();
Thread.sleep(1000);
} catch (RocksDBException e) {
// TODO: handle exception
} catch (StoreException e) {

Check warning on line 82 in store/src/main/java/com/automq/rocketmq/store/service/impl/ReviveService.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/service/impl/ReviveService.java#L82

Added line #L82 was not covered by tests
// TODO: log exception
e.printStackTrace();
} catch (InterruptedException ignored) {
}
}
}

protected void tryRevive() throws RocksDBException {
protected void tryRevive() throws StoreException {
byte[] start = ByteBuffer.allocate(8).putLong(0).array();
byte[] end = ByteBuffer.allocate(8).putLong(System.currentTimeMillis() - 1).array();

Expand All @@ -98,7 +98,7 @@
TimerTag timerTag = TimerTag.getRootAsTimerTag(ByteBuffer.wrap(value));
// The timer tag may belong to origin topic or retry topic.
FetchResult result = streamStore.fetch(timerTag.streamId(), timerTag.offset(), 1).join();
// TODO: handle exception
// TODO: log exception
assert result.recordBatchList().size() <= 1;
// Message has already been deleted.
if (result.recordBatchList().isEmpty()) {
Expand All @@ -125,8 +125,8 @@
BatchDeleteRequest deleteTimerTagRequest = new BatchDeleteRequest(timerTagNamespace,
SerializeUtil.buildTimerTagKey(timerTag.nextVisibleTimestamp(), timerTag.originTopicId(), timerTag.originQueueId(), timerTag.offset(), timerTag.operationId()));
kvService.batch(deleteCheckPointRequest, deleteTimerTagRequest);
} catch (RocksDBException e) {
// TODO: handle exception
} catch (StoreException e) {

Check warning on line 128 in store/src/main/java/com/automq/rocketmq/store/service/impl/ReviveService.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/service/impl/ReviveService.java#L128

Added line #L128 was not covered by tests
// TODO: log exception
}
});
}
Expand Down
Loading