fe/fe-core/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -21,7 +21,10 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

@@ -39,10 +42,28 @@ public static byte[] compress(byte[] data) throws IOException {
return bytesStream.toByteArray();
}

public static byte[] compress(File file) throws IOException {
ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
try (FileInputStream fileInputStream = new FileInputStream(file);
GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {

byte[] buffer = new byte[8192]; // 8KB buffer
int bytesRead;
while ((bytesRead = fileInputStream.read(buffer)) != -1) {
gzipStream.write(buffer, 0, bytesRead);
}
}
return bytesStream.toByteArray();
}

public static byte[] decompress(byte[] data) throws IOException {
ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
return IOUtils.toByteArray(gzipStream);
}
}

public static InputStream lazyDecompress(byte[] data) throws IOException {
return new GZIPInputStream(new ByteArrayInputStream(data));
}
}
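Editor's note: the new compress(File) / lazyDecompress pair streams data through gzip instead of materializing intermediate byte arrays. Below is a minimal, self-contained sketch of the same pattern using only JDK classes; the demo class and file names are illustrative, not part of this patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripDemo {
    // Mirrors GZIPUtils.compress(File): stream the file through gzip into memory.
    static byte[] compress(File file) throws IOException {
        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
        try (FileInputStream in = new FileInputStream(file);
                GZIPOutputStream gzip = new GZIPOutputStream(bytesStream)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                gzip.write(buffer, 0, n);
            }
        }
        return bytesStream.toByteArray();
    }

    // Mirrors GZIPUtils.lazyDecompress: hand back a stream, inflate only on read.
    static InputStream lazyDecompress(byte[] data) throws IOException {
        return new GZIPInputStream(new ByteArrayInputStream(data));
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("gzip-demo", ".txt");
        tmp.deleteOnExit();
        Files.write(tmp.toPath(), "hello snapshot".getBytes(StandardCharsets.UTF_8));

        byte[] compressed = compress(tmp);
        try (InputStream in = lazyDecompress(compressed)) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

Note that closing the GZIPOutputStream before calling toByteArray() matters: the gzip trailer is only written on close/finish, so reading the buffer earlier would yield a truncated stream.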
fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,8 @@

package org.apache.doris.analysis;

import org.apache.doris.backup.BackupJobInfo;
import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
@@ -66,8 +68,8 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInParser {
private boolean isCleanPartitions = false;
private boolean isAtomicRestore = false;
private boolean isForceReplace = false;
private byte[] meta = null;
private byte[] jobInfo = null;
private BackupMeta meta = null;
private BackupJobInfo jobInfo = null;
private String storageVaultName = null;

public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
@@ -76,7 +78,7 @@ public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
}

public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
Map<String, String> properties, byte[] meta, byte[] jobInfo) {
Map<String, String> properties, BackupMeta meta, BackupJobInfo jobInfo) {
super(labelName, repoName, restoreTableRefClause, properties);
this.meta = meta;
this.jobInfo = jobInfo;
@@ -110,11 +112,11 @@ public boolean isLocal() {
return isLocal;
}

public byte[] getMeta() {
public BackupMeta getMeta() {
return meta;
}

public byte[] getJobInfo() {
public BackupJobInfo getJobInfo() {
return jobInfo;
}

fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -584,8 +584,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not supported now.");
}
if (stmt.isLocal()) {
String jobInfoString = new String(stmt.getJobInfo());
jobInfo = BackupJobInfo.genFromJson(jobInfoString);
jobInfo = stmt.getJobInfo();

if (jobInfo.extraInfo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
@@ -622,13 +621,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
metaVersion = jobInfo.metaVersion;
}

BackupMeta backupMeta;
try {
backupMeta = BackupMeta.fromBytes(stmt.getMeta(), metaVersion);
} catch (IOException e) {
LOG.warn("read backup meta failed, current meta version {}", Env.getCurrentEnvJournalVersion(), e);
throw new DdlException("read backup meta failed", e);
}
BackupMeta backupMeta = stmt.getMeta();
String backupTimestamp = TimeUtils.longToTimeString(
jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
16 changes: 4 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -1175,20 +1175,12 @@ public synchronized Snapshot getSnapshot() {
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq);
return new Snapshot(label, null, null, expiredAt, commitSeq);
}

try {
File metaInfoFile = new File(localMetaInfoFilePath);
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
return null;
}
File metaInfoFile = new File(localMetaInfoFilePath);
File jobInfoFile = new File(localJobInfoFilePath);
return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, commitSeq);
}

public synchronized List<String> getInfo() {
fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -57,6 +57,8 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -763,6 +765,12 @@ public static BackupJobInfo genFromJson(String json) {
return jobInfo;
}

public static BackupJobInfo fromInputStream(InputStream inputStream) throws IOException {
try (InputStreamReader reader = new InputStreamReader(inputStream)) {
return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class);
}
}

public void writeToFile(File jobInfoFile) throws FileNotFoundException {
PrintWriter printWriter = new PrintWriter(jobInfoFile);
try {
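Editor's note: the value of the new fromInputStream overload is that Gson accepts a Reader, so a payload coming off GZIPUtils.lazyDecompress can be parsed while it inflates, without first building the whole JSON document as a String. A sketch of that idea, assuming Gson on the classpath and using a stub class in place of the real BackupJobInfo fields:

```java
import com.google.gson.Gson;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class StreamingJsonDemo {
    // Stand-in for the fields fromInputStream() needs; the real class has many more.
    static class JobInfoStub {
        int metaVersion;
    }

    // Mirrors BackupJobInfo.fromInputStream: parse JSON straight off the stream,
    // never holding the full document as a String or byte[].
    static JobInfoStub fromInputStream(InputStream in) throws IOException {
        // Explicit charset here; the patch itself uses the platform default.
        try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
            return new Gson().fromJson(reader, JobInfoStub.class);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] json = "{\"metaVersion\": 142}".getBytes(StandardCharsets.UTF_8);
        JobInfoStub info = fromInputStream(new ByteArrayInputStream(json));
        System.out.println(info.metaVersion); // 142
    }
}
```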
fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -103,7 +103,7 @@ public static BackupMeta fromBytes(byte[] bytes, int metaVersion) throws IOException {
return fromInputStream(new ByteArrayInputStream(bytes), metaVersion);
}

protected static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException {
public static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(metaVersion);
metaContext.setThreadLocalInfo();
62 changes: 46 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -17,19 +17,22 @@

package org.apache.doris.backup;

import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.Pair;

import com.google.gson.annotations.SerializedName;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class Snapshot {
@SerializedName(value = "label")
private String label = null;

@SerializedName(value = "meta")
private byte[] meta = null;
private File meta = null;

@SerializedName(value = "jobInfo")
private byte[] jobInfo = null;
private File jobInfo = null;

@SerializedName(value = "expired_at")
private long expiredAt = 0;
@@ -40,20 +43,53 @@ public class Snapshot {
public Snapshot() {
}

public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) {
public Snapshot(String label, File meta, File jobInfo, long expiredAt, long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
this.expiredAt = expiredAt;
this.commitSeq = commitSeq;
}

public byte[] getMeta() {
return meta;
public static Pair<BackupMeta, BackupJobInfo> readFromBytes(byte[] meta, byte[] jobInfo) throws IOException {
BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new String(jobInfo));
BackupMeta backupMeta = BackupMeta.fromBytes(meta, backupJobInfo.metaVersion);
return Pair.of(backupMeta, backupJobInfo);
}

public static Pair<BackupMeta, BackupJobInfo> readFromCompressedBytes(byte[] meta, byte[] jobInfo)
throws IOException {
BackupJobInfo backupJobInfo = BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo));
BackupMeta backupMeta = BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), backupJobInfo.metaVersion);
return Pair.of(backupMeta, backupJobInfo);
}

public static boolean isCompressed(byte[] meta, byte[] jobInfo) {
return GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta);
}

public long getMetaSize() {
return meta != null ? meta.length() : 0;
}

public long getJobInfoSize() {
return jobInfo != null ? jobInfo.length() : 0;
}

public byte[] getJobInfo() {
return jobInfo;
public byte[] getCompressedMeta() throws IOException {
return GZIPUtils.compress(meta);
}

public byte[] getCompressedJobInfo() throws IOException {
return GZIPUtils.compress(jobInfo);
}

public byte[] getMeta() throws IOException {
return Files.readAllBytes(meta.toPath());
}

public byte[] getJobInfo() throws IOException {
return Files.readAllBytes(jobInfo.toPath());
}

public long getExpiredAt() {
@@ -68,16 +104,10 @@ public long getCommitSeq() {
return commitSeq;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

@Override
public String toString() {
return "Snapshot{"
+ "label='" + label + '\''
+ ", meta=" + meta
+ ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ ", commitSeq=" + commitSeq
+ '}';
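Editor's note: Snapshot.isCompressed and the readFromBytes / readFromCompressedBytes pair rely on telling a gzip payload apart from plain bytes. Assuming GZIPUtils.isGZIPCompressed checks the standard RFC 1952 magic bytes (0x1f 0x8b), the dispatch looks roughly like this self-contained sketch (class and method names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipSniffDemo {
    // The first two bytes of any gzip stream are the magic values 0x1f, 0x8b
    // (RFC 1952), which is what makes this kind of sniffing reliable.
    static boolean isGzipCompressed(byte[] data) {
        return data != null && data.length >= 2
                && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b;
    }

    static String readPayload(byte[] data) throws IOException {
        if (isGzipCompressed(data)) {
            // Compressed path: inflate lazily off the stream.
            try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        }
        // Plain path: the bytes are already the payload.
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = "job info".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buf)) {
            gzip.write(plain);
        }
        System.out.println(readPayload(plain));             // job info
        System.out.println(readPayload(buf.toByteArray())); // job info
    }
}
```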
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,8 @@
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupJobInfo;
import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
@@ -61,7 +63,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@@ -3141,24 +3142,38 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
result.getStatus().addToErrorMsgs(String.format("snapshot %s is expired", label));
} else {
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();
long metaSize = snapshot.getMetaSize();
long jobInfoSize = snapshot.getJobInfoSize();
long snapshotSize = metaSize + jobInfoSize;
if (snapshotSize >= Integer.MAX_VALUE && !request.isEnableCompress()) {
String msg = String.format(
"Snapshot %s is too large (%d bytes > 2GB). Please enable compression to continue.",
label, snapshotSize);
LOG.warn("get snapshot failed: {}", msg);
result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
result.getStatus().addToErrorMsgs(msg);
return result;
}

long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();

LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, "
+ "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq);
+ "expired at: {}, commit seq: {}", label, metaSize, jobInfoSize, expiredAt, commitSeq);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
byte[] meta = snapshot.getCompressedMeta();
byte[] jobInfo = snapshot.getCompressedJobInfo();
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setCompressed(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta "
+ "size {}, compressed job info size {}", label, meta.length, jobInfo.length);
}
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
}
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}
@@ -3271,6 +3286,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request
}
}

BackupMeta backupMeta;
BackupJobInfo backupJobInfo;
byte[] meta = request.getMeta();
byte[] jobInfo = request.getJobInfo();
if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) {
@@ -3279,18 +3296,29 @@
meta.length, jobInfo.length);
}
try {
meta = GZIPUtils.decompress(meta);
jobInfo = GZIPUtils.decompress(jobInfo);
Pair<BackupMeta, BackupJobInfo> pair = Snapshot.readFromCompressedBytes(meta, jobInfo);
backupMeta = pair.first;
backupJobInfo = pair.second;
} catch (Exception e) {
LOG.warn("decompress meta and job info failed", e);
throw new UserException("decompress meta and job info failed", e);
}
} else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) {
} else if (Snapshot.isCompressed(meta, jobInfo)) {
throw new UserException("The request is compressed, but the config "
+ "`enable_restore_snapshot_rpc_compressed` is not enabled.");
} else {
try {
Pair<BackupMeta, BackupJobInfo> pair = Snapshot.readFromBytes(meta, jobInfo);
backupMeta = pair.first;
backupJobInfo = pair.second;
} catch (Exception e) {
LOG.warn("deserialize meta and job info failed", e);
throw new UserException("deserialize meta and job info failed", e);
}
}

RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo);
RestoreStmt restoreStmt = new RestoreStmt(
label, repoName, restoreTableRefClause, properties, backupMeta, backupJobInfo);
restoreStmt.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
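Editor's note: taken together, the service-side changes defer file reads until the response is assembled and refuse to build an uncompressed response that cannot fit in a Java byte[]. A minimal sketch of just that size guard (illustrative names; the real code reports the error through TStatus rather than throwing):

```java
public class SnapshotSizeGuard {
    // A Java byte[] (and hence a Thrift binary field materialized in the FE)
    // holds at most Integer.MAX_VALUE bytes, so an uncompressed snapshot at or
    // past that bound can only be returned compressed.
    static void checkSize(long metaSize, long jobInfoSize, boolean enableCompress) {
        long snapshotSize = metaSize + jobInfoSize;
        if (snapshotSize >= Integer.MAX_VALUE && !enableCompress) {
            throw new IllegalStateException(String.format(
                    "Snapshot is too large (%d bytes > 2GB). Please enable compression.",
                    snapshotSize));
        }
    }

    public static void main(String[] args) {
        checkSize(1L << 20, 1L << 10, false); // small snapshot: fine
        checkSize(3L << 30, 0, true);         // 3 GB but compression enabled: fine
        try {
            checkSize(3L << 30, 0, false);    // 3 GB, no compression: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The "2GB" in the message is really Integer.MAX_VALUE bytes, the maximum length of a Java array; compression does not lift that limit, it just keeps realistic snapshots under it on the wire.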