Skip to content

Commit

Permalink
replay: add support for decompressing ZST log files (commaai#32910)
Browse files Browse the repository at this point in the history
* Add Support for Decompressing ZST Log Files

* 2 space and check magic number

* match BZ2

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
old-commit-hash: ade1372
  • Loading branch information
deanlee and sshane authored Jul 29, 2024
1 parent 0fe0c60 commit 55ccea2
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tools/cabana/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ else:
qt_libs = ['qt_util'] + base_libs

cabana_env = qt_env.Clone()
cabana_libs = [widgets, cereal, messaging, visionipc, replay_lib, 'panda', 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'usb-1.0'] + qt_libs
cabana_libs = [widgets, cereal, messaging, visionipc, replay_lib, 'panda', 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'usb-1.0'] + qt_libs
opendbc_path = '-DOPENDBC_FILE_PATH=\'"%s"\'' % (cabana_env.Dir("../../opendbc").abspath)
cabana_env['CXXFLAGS'] += [opendbc_path]

Expand Down
1 change: 1 addition & 0 deletions tools/install_ubuntu_dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function install_ubuntu_common_requirements() {
libssl-dev \
libusb-1.0-0-dev \
libzmq3-dev \
libzstd-dev \
libsqlite3-dev \
libsystemd-dev \
locales \
Expand Down
2 changes: 1 addition & 1 deletion tools/replay/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ else:
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + base_libs
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs
qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)

if GetOption('extras'):
Expand Down
10 changes: 8 additions & 2 deletions tools/replay/logreader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
#include <utility>
#include "tools/replay/filereader.h"
#include "tools/replay/util.h"
#include "common/util.h"

bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
std::string data = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (!data.empty() && url.find(".bz2") != std::string::npos)
data = decompressBZ2(data, abort);
if (!data.empty()) {
if (url.find(".bz2") != std::string::npos || util::starts_with(data, "BZh9")) {
data = decompressBZ2(data, abort);
} else if (url.find(".zst") != std::string::npos || util::starts_with(data, "\x28\xB5\x2F\xFD")) {
data = decompressZST(data, abort);
}
}

bool success = !data.empty() && load(data.data(), data.size(), abort);
if (filters_.empty())
Expand Down
4 changes: 2 additions & 2 deletions tools/replay/route.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ void Route::addFileToSegment(int n, const QString &file) {
const int pos = name.lastIndexOf("--");
name = pos != -1 ? name.mid(pos + 2) : name;

if (name == "rlog.bz2" || name == "rlog") {
if (name == "rlog.bz2" || name == "rlog.zst" || name == "rlog") {
segments_[n].rlog = file;
} else if (name == "qlog.bz2" || name == "qlog") {
} else if (name == "qlog.bz2" || name == "qlog.zst" || name == "qlog") {
segments_[n].qlog = file;
} else if (name == "fcamera.hevc") {
segments_[n].road_cam = file;
Expand Down
37 changes: 36 additions & 1 deletion tools/replay/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <mutex>
#include <numeric>
#include <utility>
#include <zstd.h>

#include "common/timing.h"
#include "common/util.h"
Expand Down Expand Up @@ -300,7 +301,7 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
if (bzerror == BZ_OK && prev_write_pos == strm.next_out) {
// content is corrupt
bzerror = BZ_STREAM_END;
rWarning("decompressBZ2 error : content is corrupt");
rWarning("decompressBZ2 error: content is corrupt");
break;
}

Expand All @@ -318,6 +319,40 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
return {};
}

std::string decompressZST(const std::string &in, std::atomic<bool> *abort) {
return decompressZST((std::byte *)in.data(), in.size(), abort);
}

std::string decompressZST(const std::byte *in, size_t in_size, std::atomic<bool> *abort) {
ZSTD_DCtx *dctx = ZSTD_createDCtx();
assert(dctx != nullptr);

// Initialize input and output buffers
ZSTD_inBuffer input = {in, in_size, 0};
std::string decompressedData;
const size_t bufferSize = ZSTD_DStreamOutSize(); // recommended output buffer size
std::string outputBuffer(bufferSize, '\0');

while (input.pos < input.size && !(abort && *abort)) {
ZSTD_outBuffer output = {outputBuffer.data(), bufferSize, 0};

size_t result = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(result)) {
rWarning("decompressZST error: content is corrupt");
break;
}

decompressedData.append(outputBuffer.data(), output.pos);
}

ZSTD_freeDCtx(dctx);
if (!(abort && *abort)) {
decompressedData.shrink_to_fit();
return decompressedData;
}
return {};
}

void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit) {
struct timespec req, rem;
req.tv_sec = nanoseconds / 1000000000;
Expand Down
2 changes: 2 additions & 0 deletions tools/replay/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ std::string sha256(const std::string &str);
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
std::string decompressZST(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressZST(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
std::string getUrlWithoutQuery(const std::string &url);
size_t getRemoteFileSize(const std::string &url, std::atomic<bool> *abort = nullptr);
std::string httpGet(const std::string &url, size_t chunk_size = 0, std::atomic<bool> *abort = nullptr);
Expand Down

0 comments on commit 55ccea2

Please sign in to comment.