diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java index 7408e2888cc3a5..4500c76f638792 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -21,7 +21,10 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -39,10 +42,28 @@ public static byte[] compress(byte[] data) throws IOException { return bytesStream.toByteArray(); } + public static byte[] compress(File file) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (FileInputStream fileInputStream = new FileInputStream(file); + GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + + byte[] buffer = new byte[8192]; // 8KB buffer + int bytesRead; + while ((bytesRead = fileInputStream.read(buffer)) != -1) { + gzipStream.write(buffer, 0, bytesRead); + } + } + return bytesStream.toByteArray(); + } + public static byte[] decompress(byte[] data) throws IOException { ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { return IOUtils.toByteArray(gzipStream); } } + + public static InputStream lazyDecompress(byte[] data) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(data)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index cad0f42e027ff3..ddbf9110593d0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -17,6 +17,8 @@ package org.apache.doris.analysis; +import org.apache.doris.backup.BackupJobInfo; +import org.apache.doris.backup.BackupMeta; import org.apache.doris.backup.Repository; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; @@ -66,8 +68,8 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars private boolean isCleanPartitions = false; private boolean isAtomicRestore = false; private boolean isForceReplace = false; - private byte[] meta = null; - private byte[] jobInfo = null; + private BackupMeta meta = null; + private BackupJobInfo jobInfo = null; private String storageVaultName = null; public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, @@ -76,7 +78,7 @@ public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefC } public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, - Map properties, byte[] meta, byte[] jobInfo) { + Map properties, BackupMeta meta, BackupJobInfo jobInfo) { super(labelName, repoName, restoreTableRefClause, properties); this.meta = meta; this.jobInfo = jobInfo; @@ -110,11 +112,11 @@ public boolean isLocal() { return isLocal; } - public byte[] getMeta() { + public BackupMeta getMeta() { return meta; } - public byte[] getJobInfo() { + public BackupJobInfo getJobInfo() { return jobInfo; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index ed6f135cbc08de..5db2e28fd070c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -584,8 +584,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not supported now."); } if (stmt.isLocal()) { - String jobInfoString = new String(stmt.getJobInfo()); - jobInfo = BackupJobInfo.genFromJson(jobInfoString); + jobInfo = stmt.getJobInfo(); if (jobInfo.extraInfo == null) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty"); @@ -622,13 +621,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw metaVersion = jobInfo.metaVersion; } - BackupMeta backupMeta; - try { - backupMeta = BackupMeta.fromBytes(stmt.getMeta(), metaVersion); - } catch (IOException e) { - LOG.warn("read backup meta failed, current meta version {}", Env.getCurrentEnvJournalVersion(), e); - throw new DdlException("read backup meta failed", e); - } + BackupMeta backupMeta = stmt.getMeta(); String backupTimestamp = TimeUtils.longToTimeString( jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone()); restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 947d7f06109612..f7e6eedc4dcea9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -1175,20 +1175,12 @@ public synchronized Snapshot getSnapshot() { // Avoid loading expired meta. long expiredAt = createTime + timeoutMs; if (System.currentTimeMillis() >= expiredAt) { - return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq); + return new Snapshot(label, null, null, expiredAt, commitSeq); } - try { - File metaInfoFile = new File(localMetaInfoFilePath); - File jobInfoFile = new File(localJobInfoFilePath); - byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); - byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); - return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq); - } catch (IOException e) { - LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", - localMetaInfoFilePath, localJobInfoFilePath, e); - return null; - } + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, commitSeq); } public synchronized List getInfo() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index 554a21c44080f7..7e77426a82a6d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -57,6 +57,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -763,6 +765,12 @@ public static BackupJobInfo genFromJson(String json) { return jobInfo; } + public static BackupJobInfo fromInputStream(InputStream inputStream) throws IOException { + try (InputStreamReader reader = new InputStreamReader(inputStream)) { + return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class); + } + } + public void writeToFile(File jobInfoFile) throws FileNotFoundException { PrintWriter printWriter = new PrintWriter(jobInfoFile); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index 850d6f92e83c0f..124fa802ef90a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -103,7 +103,7 @@ public static BackupMeta fromBytes(byte[] bytes, int metaVersion) throws IOExcep return fromInputStream(new ByteArrayInputStream(bytes), metaVersion); } - protected static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException { + public static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException { MetaContext metaContext = new MetaContext(); metaContext.setMetaVersion(metaVersion); metaContext.setThreadLocalInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java index a9f734dbc99220..2fc3ca6d146ee6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java @@ -17,19 +17,22 @@ package org.apache.doris.backup; -import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.common.GZIPUtils; +import org.apache.doris.common.Pair; import com.google.gson.annotations.SerializedName; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + public class Snapshot { @SerializedName(value = "label") private String label = null; - @SerializedName(value = "meta") - private byte[] meta = null; + private File meta = null; - @SerializedName(value = "jobInfo") - private byte[] jobInfo = null; + private File jobInfo = null; @SerializedName(value = "expired_at") private long expiredAt = 0; @@ -40,7 +43,7 @@ public class Snapshot { public Snapshot() { } - public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) { + public Snapshot(String label, File meta, File jobInfo, long expiredAt, long commitSeq) { this.label = label; this.meta = meta; this.jobInfo = jobInfo; @@ -48,12 +51,45 @@ public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long this.commitSeq = commitSeq; } - public byte[] getMeta() { - return meta; + public static Pair readFromBytes(byte[] meta, byte[] jobInfo) throws IOException { + BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new String(jobInfo)); + BackupMeta backupMeta = BackupMeta.fromBytes(meta, backupJobInfo.metaVersion); + return Pair.of(backupMeta, backupJobInfo); + } + + public static Pair readFromCompressedBytes(byte[] meta, byte[] jobInfo) + throws IOException { + BackupJobInfo backupJobInfo = BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo)); + BackupMeta backupMeta = BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), backupJobInfo.metaVersion); + return Pair.of(backupMeta, backupJobInfo); + } + + public static boolean isCompressed(byte[] meta, byte[] jobInfo) { + return GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta); + } + + public long getMetaSize() { + return meta != null ? meta.length() : 0; + } + + public long getJobInfoSize() { + return jobInfo != null ? jobInfo.length() : 0; } - public byte[] getJobInfo() { - return jobInfo; + public byte[] getCompressedMeta() throws IOException { + return GZIPUtils.compress(meta); + } + + public byte[] getCompressedJobInfo() throws IOException { + return GZIPUtils.compress(jobInfo); + } + + public byte[] getMeta() throws IOException { + return Files.readAllBytes(meta.toPath()); + } + + public byte[] getJobInfo() throws IOException { + return Files.readAllBytes(jobInfo.toPath()); } public long getExpiredAt() { @@ -68,16 +104,10 @@ public long getCommitSeq() { return commitSeq; } - public String toJson() { - return GsonUtils.GSON.toJson(this); - } - @Override public String toString() { return "Snapshot{" + "label='" + label + '\'' - + ", meta=" + meta - + ", jobInfo=" + jobInfo + ", expiredAt=" + expiredAt + ", commitSeq=" + commitSeq + '}'; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6057a20d9f389f..262f64938cc913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -28,6 +28,8 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.backup.BackupJobInfo; +import org.apache.doris.backup.BackupMeta; import org.apache.doris.backup.Snapshot; import org.apache.doris.binlog.BinlogLagInfo; import org.apache.doris.catalog.AutoIncrementGenerator; @@ -61,7 +63,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; -import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -3141,24 +3142,38 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED); result.getStatus().addToErrorMsgs(String.format("snapshot %s is expired", label)); } else { - byte[] meta = snapshot.getMeta(); - byte[] jobInfo = snapshot.getJobInfo(); + long metaSize = snapshot.getMetaSize(); + long jobInfoSize = snapshot.getJobInfoSize(); + long snapshotSize = snapshot.getMetaSize() + snapshot.getJobInfoSize(); + if (metaSize + jobInfoSize >= Integer.MAX_VALUE && !request.isEnableCompress()) { + String msg = String.format( + "Snapshot %s is too large (%d bytes > 2GB). Please enable compression to continue.", + label, snapshotSize); + LOG.warn("get snapshot failed: {}", msg); + result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR); + result.getStatus().addToErrorMsgs(msg); + return result; + } + long expiredAt = snapshot.getExpiredAt(); long commitSeq = snapshot.getCommitSeq(); LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, " - + "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq); + + "expired at: {}, commit seq: {}", label, metaSize, jobInfoSize, expiredAt, commitSeq); if (request.isEnableCompress()) { - meta = GZIPUtils.compress(meta); - jobInfo = GZIPUtils.compress(jobInfo); + byte[] meta = snapshot.getCompressedMeta(); + byte[] jobInfo = snapshot.getCompressedJobInfo(); + result.setMeta(meta); + result.setJobInfo(jobInfo); result.setCompressed(true); if (LOG.isDebugEnabled()) { LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); } + } else { + result.setMeta(snapshot.getMeta()); + result.setJobInfo(snapshot.getJobInfo()); } - result.setMeta(meta); - result.setJobInfo(jobInfo); result.setExpiredAt(expiredAt); result.setCommitSeq(commitSeq); } @@ -3271,6 +3286,8 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque } } + BackupMeta backupMeta; + BackupJobInfo backupJobInfo; byte[] meta = request.getMeta(); byte[] jobInfo = request.getJobInfo(); if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { @@ -3279,18 +3296,29 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque meta.length, jobInfo.length); } try { - meta = GZIPUtils.decompress(meta); - jobInfo = GZIPUtils.decompress(jobInfo); + Pair pair = Snapshot.readFromCompressedBytes(meta, jobInfo); + backupMeta = pair.first; + backupJobInfo = pair.second; } catch (Exception e) { LOG.warn("decompress meta and job info failed", e); throw new UserException("decompress meta and job info failed", e); } - } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + } else if (Snapshot.isCompressed(meta, jobInfo)) { throw new UserException("The request is compressed, but the config " + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } else { + try { + Pair pair = Snapshot.readFromBytes(meta, jobInfo); + backupMeta = pair.first; + backupJobInfo = pair.second; + } catch (Exception e) { + LOG.warn("deserialize meta and job info failed", e); + throw new UserException("deserialize meta and job info failed", e); + } } - RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo); + RestoreStmt restoreStmt = new RestoreStmt( + label, repoName, restoreTableRefClause, properties, backupMeta, backupJobInfo); restoreStmt.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt); try {