Skip to content

Commit

Permalink
[ISSUE apache#8058]Support for upgrading metadata in json to rocksdb (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed Aug 22, 2024
1 parent 8859093 commit 0e29ea3
Show file tree
Hide file tree
Showing 18 changed files with 1,069 additions and 182 deletions.
27 changes: 18 additions & 9 deletions .github/workflows/maven.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,28 @@ jobs:
cache: "maven"
- name: Build with Maven
run: mvn -B package --file pom.xml
- name: Upload JVM crash logs

- name: Run tests with increased memory and debug info
run: mvn test -X -Dparallel=none -DargLine="-Xmx1024m -XX:MaxPermSize=256m"

- name: Upload Auth JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/auth/hs_err_pid*.log
retention-days: 1
- name: Retry if failed
# if it failed , retry 2 times at most
if: failure() && fromJSON(github.run_attempt) < 3
env:
GH_REPO: ${{ github.repository }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Check for broker JVM crash logs
if: failure()
run: |
echo "Attempting to retry workflow..."
gh workflow run rerun-workflow.yml -F run_id=${{ github.run_id }}
echo "Checking for JVM crash logs..."
ls -al /Users/runner/work/rocketmq/rocketmq/broker/
- name: Upload broker JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/broker/hs_err_pid*.log
retention-days: 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.config;
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected volatile boolean isStop = false;
protected ConfigRocksDBStorage configRocksDBStorage = null;
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
}

public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) {
public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
if (!this.configRocksDBStorage.start()) {
return false;
}
RocksIterator iterator = this.configRocksDBStorage.iterator();
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
return this.configRocksDBStorage.start();
}
public boolean loadDataVersion() {
String currDataVersionString = null;
try {
byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
if (dataVersion != null && dataVersion.length > 0) {
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
}
kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
} finally {
iterator.close();
}

this.flushOptions = new FlushOptions();
Expand Down Expand Up @@ -103,6 +123,20 @@ public void delete(final byte[] keyBytes) throws Exception {
this.configRocksDBStorage.delete(keyBytes);
}

public void updateKvDataVersion() throws Exception {
kvDataVersion.nextVersion();
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public DataVersion getKvDataVersion() {
return kvDataVersion;
}

public void updateForbidden(String key, String value) throws Exception {
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
}


public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

Expand All @@ -31,14 +31,19 @@

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
}

@Override
Expand All @@ -56,8 +61,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
protected void decodeOffset(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,116 @@
*/
package org.apache.rocketmq.broker.subscription;

import java.io.File;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.RocksIterator;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
if (!rocksDBConfigManager.init()) {
return false;
}
if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
return false;
}
this.init();
return true;
}

public boolean loadDataVersion() {
return this.rocksDBConfigManager.loadDataVersion();
}

public boolean loadSubscriptionGroupAndForbidden() {
return this.rocksDBConfigManager.loadData(this::decodeSubscriptionGroup)
&& this.loadForbidden(this::decodeForbidden)
&& merge();
}

public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
}
return true;
}


private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("The switch is off, no merge operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("json file and json back file not exist, so skip merge");
return true;
}

if (!super.load()) {
log.error("load group and forbidden info from json file error, startup will exit");
return false;
}

final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = this.getSubscriptionGroupTable();
final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
for (Map.Entry<String, SubscriptionGroupConfig> entry : groupTable.entrySet()) {
putSubscriptionGroupConfig(entry.getValue());
log.info("import subscription config to rocksdb, group={}", entry.getValue());
}
for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : forbiddenTable.entrySet()) {
try {
this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue()));
log.info("import forbidden config to rocksdb, group={}", entry.getValue());
} catch (Exception e) {
log.error("import forbidden config to rocksdb failed, group={}", entry.getValue());
return false;
}
}
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
updateDataVersion();
}
log.info("finish marge subscription config from json file and merge to rocksdb");
this.persist();

return true;
}

@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}

@Override
protected SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
String groupName = subscriptionGroupConfig.getGroupName();
SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);

Expand Down Expand Up @@ -89,8 +166,8 @@ protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName
return subscriptionGroupConfig;
}

@Override
protected void decode0(byte[] key, byte[] body) {

protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
String groupName = new String(key, DataConverter.CHARSET_UTF8);
SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class);

Expand All @@ -105,8 +182,63 @@ public synchronized void persist() {
}
}

@Override
public String configFilePath() {
public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
}

@Override
public DataVersion getDataVersion() {
return rocksDBConfigManager.getKvDataVersion();
}

@Override
public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected void decodeForbidden(byte[] key, byte[] body) {
String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8);
JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8));
Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
ConcurrentMap<String, Integer> forbiddenGroup = new ConcurrentHashMap<>(entries.size());
for (Map.Entry<String, Object> entry : entries) {
forbiddenGroup.put(entry.getKey(), (Integer) entry.getValue());
}
this.getForbiddenTable().put(forbiddenGroupName, forbiddenGroup);
log.info("load forbidden,{} value {}", forbiddenGroupName, forbiddenGroup.toString());
}

@Override
public void updateForbidden(String group, String topic, int forbiddenIndex, boolean setOrClear) {
try {
super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void setForbidden(String group, String topic, int forbiddenIndex) {
try {
super.setForbidden(group, topic, forbiddenIndex);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void clearForbidden(String group, String topic, int forbiddenIndex) {
try {
super.clearForbidden(group, topic, forbiddenIndex);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 0e29ea3

Please sign in to comment.