From 7f0869508b9688522da23af3a2280a04ab43222a Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Mon, 25 Feb 2019 16:02:04 +0100 Subject: [PATCH 1/7] exception handling in multithread chunk processing --- src/cube.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/cube.cpp b/src/cube.cpp index 8af1fcf7..d2ed9d83 100644 --- a/src/cube.cpp +++ b/src/cube.cpp @@ -441,8 +441,16 @@ void chunk_processor_multithread::apply(std::shared_ptr c, for (uint16_t it = 0; it < _nthreads; ++it) { workers.push_back(std::thread([this, &c, f, it, &mutex](void) { for (uint32_t i = it; i < c->count_chunks(); i += _nthreads) { - std::shared_ptr dat = c->read_chunk(i); - f(i, dat, mutex); + try { + std::shared_ptr dat = c->read_chunk(i); + f(i, dat, mutex); + } catch (std::string s) { + GCBS_ERROR(s); + continue; + } catch (...) { + GCBS_ERROR("unexpected exception while processing chunk " + std::to_string(i)); + continue; + } } })); } From 8b3397552359a5d35c96a149780c84e8236264ce Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Mon, 25 Feb 2019 17:07:47 +0100 Subject: [PATCH 2/7] thread-safe random filename generation --- src/utils.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/utils.h b/src/utils.h index 18508b50..a917e768 100644 --- a/src/utils.h +++ b/src/utils.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -40,11 +41,14 @@ class utils { static std::mt19937 gen(time(NULL)); //Standard mersenne_twister_engine seeded with rd() static const std::string LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; static std::uniform_int_distribution<> dis(0, LETTERS.length() - 1); + static std::mutex mtx; + mtx.lock(); std::stringstream ss; for (uint16_t i = 0; i < n; ++i) { ss << LETTERS[dis(gen)]; } std::string out = prefix + ss.str() + suffix; + mtx.unlock(); return out; } From 89bf827f18695215e1d4d30855af39b9b3ef716d Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Tue, 26 Feb 2019 10:39:57 +0100 Subject: [PATCH 3/7] fix race conditions in streaming due to non thread-safe setenv / putenv --- src/stream.cpp | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/stream.cpp b/src/stream.cpp index dd18bf38..ed521174 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -134,18 +134,6 @@ std::shared_ptr stream_cube::stream_chunk_file(std::shared_ptrget_streaming_dir(), utils::generate_unique_filename(12, ".stream_", "_in")); std::string f_out = filesystem::join(config::instance()->get_streaming_dir(), utils::generate_unique_filename(12, ".stream_", "_out")); -#ifdef _WIN32 - _putenv("GDALCUBES_STREAMING=1"); - //_putenv((std::string("GDALCUBES_STREAMING_DIR") + "=" + config::instance()->get_streaming_dir().c_str()).c_str()); - _putenv((std::string("GDALCUBES_STREAMING_FILE_IN") + "=" + f_in.c_str()).c_str()); - _putenv((std::string("GDALCUBES_STREAMING_FILE_OUT") + "=" + f_out.c_str()).c_str()); -#else - setenv("GDALCUBES_STREAMING", "1", 1); - // setenv("GDALCUBES_STREAMING_DIR", config::instance()->get_streaming_dir().c_str(), 1); - setenv("GDALCUBES_STREAMING_FILE_IN", f_in.c_str(), 1); - setenv("GDALCUBES_STREAMING_FILE_OUT", f_out.c_str(), 1); -#endif - std::string errstr; // capture error string // write input data @@ -181,10 +169,26 @@ std::shared_ptr stream_cube::stream_chunk_file(std::shared_ptrbuf())), sizeof(double) * data->size()[0] * data->size()[1] * data->size()[2] * data->size()[3]); f_in_stream.close(); + /* setenv / _putenv is not thread-safe, we need to get a mutex until the child process has been started. */ + static std::mutex mtx; + mtx.lock(); +#ifdef _WIN32 + _putenv("GDALCUBES_STREAMING=1"); + //_putenv((std::string("GDALCUBES_STREAMING_DIR") + "=" + config::instance()->get_streaming_dir().c_str()).c_str()); + _putenv((std::string("GDALCUBES_STREAMING_FILE_IN") + "=" + f_in.c_str()).c_str()); + _putenv((std::string("GDALCUBES_STREAMING_FILE_OUT") + "=" + f_out.c_str()).c_str()); +#else + setenv("GDALCUBES_STREAMING", "1", 1); + // setenv("GDALCUBES_STREAMING_DIR", config::instance()->get_streaming_dir().c_str(), 1); + setenv("GDALCUBES_STREAMING_FILE_IN", f_in.c_str(), 1); + setenv("GDALCUBES_STREAMING_FILE_OUT", f_out.c_str(), 1); +#endif + // start process TinyProcessLib::Process process(_cmd, "", [](const char *bytes, std::size_t n) {}, [&errstr](const char *bytes, std::size_t n) { errstr = std::string(bytes, n); GCBS_DEBUG(errstr); }, false); + mtx.unlock(); auto exit_status = process.get_exit_status(); filesystem::remove(f_in); if (exit_status != 0) { From a8d237b4f9685ef715b394d7c90bac6ac5c52830 Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Tue, 26 Feb 2019 12:06:54 +0100 Subject: [PATCH 4/7] fix window_time for empty chunk time series --- src/window_time.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/window_time.cpp b/src/window_time.cpp index a568f7a0..8c95d9d5 100644 --- a/src/window_time.cpp +++ b/src/window_time.cpp @@ -149,6 +149,7 @@ std::shared_ptr window_time_cube::read_chunk(chunkid_t id) { // buffer for a single time series including data from adjacent chunks for all used input bands uint32_t cur_ts_length = _win_size_l + size_tyx[0] + _win_size_r; double* cur_ts = (double*)std::calloc(cur_ts_length * _bands.count(), sizeof(double)); + std::fill(cur_ts, cur_ts + cur_ts_length * _bands.count(), NAN); for (uint32_t ixy = 0; ixy < size_tyx[1] * size_tyx[2]; ++ixy) { // fill values from l chunks From 24f2f8ebcb57e6ce864817a4f2c0ba4f1b96d436 Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Tue, 26 Feb 2019 13:39:35 +0100 Subject: [PATCH 5/7] add ncdf compression --- src/cube.cpp | 7 ++++++- src/cube.h | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/cube.cpp b/src/cube.cpp index d2ed9d83..e4414951 100644 --- a/src/cube.cpp +++ b/src/cube.cpp @@ -245,7 +245,7 @@ void cube::write_gtiff_directory(std::string dir, std::shared_ptrfinalize(); //} -void cube::write_netcdf_file(std::string path, std::shared_ptr p) { +void cube::write_netcdf_file(std::string path, uint8_t compression_level, std::shared_ptr p) { std::string op = filesystem::make_absolute(path); if (filesystem::is_directory(op)) { @@ -371,6 +371,11 @@ void cube::write_netcdf_file(std::string path, std::shared_ptr for (uint16_t i = 0; i < bands().count(); ++i) { int v; nc_def_var(ncout, bands().get(i).name.c_str(), NC_DOUBLE, 3, d_all, &v); + std::size_t csize[3] = {_chunk_size[0], _chunk_size[1], _chunk_size[2]}; + nc_def_var_chunking(ncout, v, NC_CHUNKED, csize); + if (compression_level > 0) { + nc_def_var_deflate(ncout, v, 1, 1, compression_level); // TODO: experiment with shuffling + } if (!bands().get(i).unit.empty()) nc_put_att_text(ncout, v, "units", strlen(bands().get(i).unit.c_str()), bands().get(i).unit.c_str()); diff --git a/src/cube.h b/src/cube.h index a2271413..58019e6f 100644 --- a/src/cube.h +++ b/src/cube.h @@ -646,9 +646,10 @@ class cube : public std::enable_shared_from_this { /** * Export a cube to a single NetCDF file * @param path path of the target file + * @param compression_level deflate level, 0=no compression, 1= fast, 9 = small * @param p chunk processor instance, defaults to the global configuration */ - void write_netcdf_file(std::string path, std::shared_ptr p = config::instance()->get_default_chunk_processor()); + void write_netcdf_file(std::string path, uint8_t compression_level = 0, std::shared_ptr p = config::instance()->get_default_chunk_processor()); /** * Get the cube's bands From dc8bb05a809cc9349f282165f745da38bfc2f144 Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Tue, 26 Feb 2019 14:47:52 +0100 Subject: [PATCH 6/7] add gdal version and drivers info functions --- src/config.h | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/config.h b/src/config.h index 5339e7a2..72e33469 100644 --- a/src/config.h +++ b/src/config.h @@ -232,6 +232,18 @@ class config { _collection_format_preset_dirs.push_back(dir); } + std::string gdal_version_info() { + return GDALVersionInfo("--version"); + } + + std::vector gdal_formats() { + std::vector out; + for (int i = 0; i < GDALGetDriverCount(); ++i) { + out.push_back(GDALGetDriverShortName(GDALGetDriver(i))); + } + return out; + } + private: std::shared_ptr _chunk_processor; std::shared_ptr _progress_bar; From abdcd390608c7a2104ad0841bedacf304220eefd Mon Sep 17 00:00:00 2001 From: Marius Appel Date: Thu, 28 Feb 2019 16:49:39 +0100 Subject: [PATCH 7/7] release candidate for 0.1.0 --- CMakeLists.txt | 6 +++--- TODO.md | 7 ------- src/build_info.h | 8 ++++---- 3 files changed, 7 insertions(+), 14 deletions(-) delete mode 100644 TODO.md diff --git a/CMakeLists.txt b/CMakeLists.txt index 1cc7b5c6..3fa0aa6f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.8) -project(gdalcubes LANGUAGES CXX C VERSION 0.0.1) +project(gdalcubes LANGUAGES CXX C VERSION 0.1.0) #set(COLLECTION_FORMAT_VERSION_MAJOR 0) #set(COLLECTION_FORMAT_VERSION_MINOR 0) @@ -19,8 +19,8 @@ endif () # #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pg --coverage") # #set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -pg --coverage" ) # #set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -pg --coverage") -# #set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fsanitize=undefined -D_FORTIFY_SOURCE=0") -# #set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address -fsanitize=undefined -D_FORTIFY_SOURCE=0") +# set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fsanitize=undefined -D_FORTIFY_SOURCE=0") +# set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address -fsanitize=undefined -D_FORTIFY_SOURCE=0") #endif ( ) diff --git a/TODO.md b/TODO.md deleted file mode 100644 index fccea6b7..00000000 --- a/TODO.md +++ /dev/null @@ -1,7 +0,0 @@ -# TODO (0.0.1 release) - -* Documentation of C++ sources and API -* Documentation of gdalcubes_server API -* C++ unit tests -* R package: Swarm execution -* Swarm execution fault tolerance \ No newline at end of file diff --git a/src/build_info.h b/src/build_info.h index 5109a54d..9c4a8b4a 100644 --- a/src/build_info.h +++ b/src/build_info.h @@ -3,10 +3,10 @@ #define BUILD_INFO_H #define GDALCUBES_VERSION_MAJOR 0 -#define GDALCUBES_VERSION_MINOR 0 -#define GDALCUBES_VERSION_PATCH 1 -#define GDALCUBES_GIT_DESC "55146dc8" -#define GDALCUBES_GIT_COMMIT "55146dc8f0803822f3516cb9c5d7329280df1684" +#define GDALCUBES_VERSION_MINOR 1 +#define GDALCUBES_VERSION_PATCH 0 +#define GDALCUBES_GIT_DESC "dc8bb05a" +#define GDALCUBES_GIT_COMMIT "dc8bb05a809cc9349f282165f745da38bfc2f144" #define COLLECTION_FORMAT_VERSION_MAJOR #define COLLECTION_FORMAT_VERSION_MINOR