Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C] Handle log buffer files with term_length == AERON_LOGBUFFER_TERM_MAX_LENGTH on Windows. #1360

Merged
merged 2 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions aeron-client/src/main/c/util/aeron_fileutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
#define S_IROTH 0
#define S_IWOTH 0

static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, off_t offset, bool pre_touch)
static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, bool pre_touch)
{
HANDLE hmap = CreateFileMapping((HANDLE)_get_osfhandle(fd), 0, PAGE_READWRITE, 0, 0, 0);

Expand All @@ -72,7 +72,7 @@ static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, off_t offset, bool p
return -1;
}

mapping->addr = MapViewOfFileEx(hmap, FILE_MAP_WRITE, 0, offset, mapping->length, NULL);
mapping->addr = MapViewOfFileEx(hmap, FILE_MAP_WRITE, 0, 0, mapping->length, NULL);

if (!CloseHandle(hmap))
{
Expand Down Expand Up @@ -113,11 +113,11 @@ int aeron_unmap(aeron_mapped_file_t *mapped_file)
return 0;
}

int aeron_ftruncate(int fd, off_t length)
int aeron_ftruncate(int fd, size_t length)
{
HANDLE hfile = (HANDLE)_get_osfhandle(fd);
LARGE_INTEGER file_size;
file_size.QuadPart = length;
file_size.QuadPart = (long long)length;

if (!SetFilePointerEx(hfile, file_size, NULL, FILE_BEGIN))
{
Expand Down Expand Up @@ -223,7 +223,7 @@ int aeron_is_directory(const char *path)
#include <stdio.h>
#include <pwd.h>

static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, off_t offset, bool pre_touch)
static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, bool pre_touch)
{
int flags = MAP_SHARED;

Expand All @@ -236,7 +236,7 @@ static int aeron_mmap(aeron_mapped_file_t *mapping, int fd, off_t offset, bool p
(void)pre_touch;
#endif

mapping->addr = mmap(NULL, mapping->length, PROT_READ | PROT_WRITE, flags, fd, offset);
mapping->addr = mmap(NULL, mapping->length, PROT_READ | PROT_WRITE, flags, fd, 0);
close(fd);

return MAP_FAILED == mapping->addr ? -1 : 0;
Expand Down Expand Up @@ -320,9 +320,9 @@ int aeron_map_new_file(aeron_mapped_file_t *mapped_file, const char *path, bool
int fd = aeron_create_file(path);
if (fd >= 0)
{
if (aeron_ftruncate(fd, (off_t)mapped_file->length) >= 0)
if (0 == aeron_ftruncate(fd, mapped_file->length))
{
if (aeron_mmap(mapped_file, fd, 0, fill_with_zeroes) == 0)
if (0 == aeron_mmap(mapped_file, fd, fill_with_zeroes))
{
#ifndef AERON_NATIVE_PRETOUCH
if (fill_with_zeroes)
Expand All @@ -340,12 +340,12 @@ int aeron_map_new_file(aeron_mapped_file_t *mapped_file, const char *path, bool
}
else
{
AERON_SET_ERR(errno, "Failed to stat file: %s", path);
AERON_SET_ERR(errno, "Failed to truncate file: %s", path);
}
}
else
{
AERON_SET_ERR(errno, "Failed to open file: %s", path);
AERON_SET_ERR(errno, "Failed to create file: %s", path);
}

return result;
Expand All @@ -358,13 +358,12 @@ int aeron_map_existing_file(aeron_mapped_file_t *mapped_file, const char *path)
int fd = open(path, O_RDWR);
if (fd >= 0)
{
struct stat sb;

if (fstat(fd, &sb) == 0)
const int64_t file_length = aeron_file_length(path);
if (-1 != file_length)
{
mapped_file->length = (size_t)sb.st_size;
mapped_file->length = (size_t)file_length;

if (aeron_mmap(mapped_file, fd, 0, false) == 0)
if (0 == aeron_mmap(mapped_file, fd, false))
{
result = 0;
}
Expand Down Expand Up @@ -451,17 +450,17 @@ int aeron_raw_log_map(
uint64_t page_size)
{
int result = -1;
uint64_t log_length = aeron_logbuffer_compute_log_length(term_length, page_size);
const uint64_t log_length = aeron_logbuffer_compute_log_length(term_length, page_size);

int fd = open(path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
int fd = aeron_create_file(path);
if (fd >= 0)
{
if (aeron_ftruncate(fd, (off_t)log_length) >= 0)
if (0 == aeron_ftruncate(fd, (size_t)log_length))
{
mapped_raw_log->mapped_file.length = (size_t)log_length;
mapped_raw_log->mapped_file.addr = NULL;

if (aeron_mmap(&mapped_raw_log->mapped_file, fd, 0, !use_sparse_files) < 0)
if (0 != aeron_mmap(&mapped_raw_log->mapped_file, fd, !use_sparse_files))
{
AERON_SET_ERR(errno, "Failed to map raw log, filename: %s", path);
return -1;
Expand Down Expand Up @@ -495,7 +494,7 @@ int aeron_raw_log_map(
}
else
{
AERON_SET_ERR(errno, "Failed to open raw log, filename: %s", path);
AERON_SET_ERR(errno, "Failed to create raw log, filename: %s", path);
}

return result;
Expand All @@ -508,13 +507,12 @@ int aeron_raw_log_map_existing(aeron_mapped_raw_log_t *mapped_raw_log, const cha
int fd = open(path, O_RDWR);
if (fd >= 0)
{
struct stat sb;

if (fstat(fd, &sb) == 0)
const int64_t file_length = aeron_file_length(path);
if (-1 != file_length)
{
mapped_raw_log->mapped_file.length = (size_t)sb.st_size;
mapped_raw_log->mapped_file.length =file_length;

if (aeron_mmap(&mapped_raw_log->mapped_file, fd, 0, pre_touch) < 0)
if (0 != aeron_mmap(&mapped_raw_log->mapped_file, fd, pre_touch))
{
AERON_SET_ERR(errno, "Failed to mmap existing raw log, filename: %s", path);
return -1;
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/c/util/aeron_fileutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ int aeron_unmap(aeron_mapped_file_t *mapped_file);
#define S_IRWXG 0
#define S_IRWXO 0

int aeron_ftruncate(int fd, off_t length);
int aeron_ftruncate(int fd, size_t length);
int aeron_mkdir(const char *path, int permission);
#endif

Expand Down
139 changes: 128 additions & 11 deletions aeron-client/src/test/c/util/aeron_fileutil_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ TEST_F(FileUtilTest, rawLogCloseShouldUnmapAndDeleteLogFile)
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_close_unused_file.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
const size_t term_length = 4096;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, term_length, 4096)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));
EXPECT_EQ(term_length, mapped_raw_log.term_length);
EXPECT_NE(nullptr, mapped_raw_log.log_meta_data.addr);
EXPECT_EQ(AERON_LOGBUFFER_META_DATA_LENGTH, mapped_raw_log.log_meta_data.length);
for (size_t i = 0; i < AERON_LOGBUFFER_PARTITION_COUNT; i++)
{
EXPECT_NE(nullptr, mapped_raw_log.term_buffers[i].addr);
EXPECT_EQ(term_length, mapped_raw_log.term_buffers[i].length);
}

ASSERT_EQ(0, aeron_raw_log_close(&mapped_raw_log, file));
ASSERT_EQ(0, aeron_raw_log_close(&mapped_raw_log, file)) << aeron_errmsg();

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ((size_t)0, mapped_raw_log.mapped_file.length);
Expand All @@ -53,13 +62,13 @@ TEST_F(FileUtilTest, rawLogFreeShouldUnmapAndDeleteLogFile)
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_free_unused_file.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file));
ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file)) << aeron_errmsg();

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ((size_t)0, mapped_raw_log.mapped_file.length);
Expand All @@ -71,11 +80,11 @@ TEST_F(FileUtilTest, rawLogCloseShouldNotDeleteFileIfUnmapFails)
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_close_unmap_fails.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096)) << aeron_errmsg();
const auto mapped_addr = mapped_raw_log.mapped_file.addr;
mapped_raw_log.mapped_file.addr = reinterpret_cast<void *>(-1);

ASSERT_EQ(-1, aeron_raw_log_close(&mapped_raw_log, file));
ASSERT_EQ(-1, aeron_raw_log_close(&mapped_raw_log, file)) << aeron_errmsg();
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

mapped_raw_log.mapped_file.addr = mapped_addr;
Expand All @@ -88,15 +97,15 @@ TEST_F(FileUtilTest, rawLogFreeShouldNotDeleteFileIfUnmapFails)
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_free_unmap_fails.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096)) << aeron_errmsg();
const auto mapped_addr = mapped_raw_log.mapped_file.addr;
mapped_raw_log.mapped_file.addr = reinterpret_cast<void *>(-1);

ASSERT_EQ(false, aeron_raw_log_free(&mapped_raw_log, file));
ASSERT_EQ(false, aeron_raw_log_free(&mapped_raw_log, file)) << aeron_errmsg();
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

mapped_raw_log.mapped_file.addr = mapped_addr;
ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file));
ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file)) << aeron_errmsg();
EXPECT_EQ((int64_t)-1, aeron_file_length(file));
}

Expand All @@ -111,7 +120,7 @@ TEST_F(FileUtilTest, resolveShouldConcatPaths)
#endif
char result[AERON_MAX_PATH];

ASSERT_LT(0, aeron_file_resolve(parent, child, result, sizeof(result)));
ASSERT_LT(0, aeron_file_resolve(parent, child, result, sizeof(result))) << aeron_errmsg();
ASSERT_STREQ(expected, result);
}

Expand All @@ -121,7 +130,115 @@ TEST_F(FileUtilTest, resolveShouldReportTruncatedPaths)
const char *child = "this_is_the_child";
char result[10];

ASSERT_EQ(-1, aeron_file_resolve(parent, child, result, sizeof(result)));
ASSERT_EQ(-1, aeron_file_resolve(parent, child, result, sizeof(result))) << aeron_errmsg();
ASSERT_EQ(EINVAL, aeron_errcode());
ASSERT_EQ('\0', result[sizeof(result) - 1]);
}


TEST_F(FileUtilTest, mapNewFileShouldHandleFilesBiggerThan2GB)
{
aeron_mapped_file_t mapped_file = {};
const char *file = "test_map_new_file_big_size.log";
const size_t file_length = 3221225472;
mapped_file.length = file_length;
ASSERT_EQ(0, aeron_map_new_file(&mapped_file, file, false)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_file.addr);
EXPECT_EQ(file_length, mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

ASSERT_EQ(0, aeron_unmap(&mapped_file)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_file.addr);
EXPECT_EQ(file_length, mapped_file.length);
EXPECT_EQ(0, remove(file));
EXPECT_EQ((int64_t)-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, mapExistingFileShouldHandleFilesBiggerThan2GB)
{
aeron_mapped_file_t mapped_file = {};
const char *file = "test_map_existing_file_big_size.log";
const size_t file_length = 2500000000;
mapped_file.length = file_length;
ASSERT_EQ(0, aeron_map_new_file(&mapped_file, file, false)) << aeron_errmsg();
ASSERT_EQ(0, aeron_unmap(&mapped_file)) << aeron_errmsg();

ASSERT_EQ(0, aeron_map_existing_file(&mapped_file, file)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_file.addr);
EXPECT_EQ(file_length, mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

ASSERT_EQ(0, aeron_unmap(&mapped_file)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_file.addr);
EXPECT_EQ(file_length, mapped_file.length);
EXPECT_EQ(0, remove(file));
EXPECT_EQ((int64_t)-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogMapShouldHandleMaxTermBufferLengthAndMaxPageSize)
{
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_raw_log_map_new_file_max_buffer_length.log";
const size_t file_length = 4294967296;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, AERON_LOGBUFFER_TERM_MAX_LENGTH, AERON_PAGE_MAX_SIZE)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));
EXPECT_EQ(AERON_LOGBUFFER_TERM_MAX_LENGTH, mapped_raw_log.term_length);
EXPECT_NE(nullptr, mapped_raw_log.log_meta_data.addr);
EXPECT_EQ(AERON_LOGBUFFER_META_DATA_LENGTH, mapped_raw_log.log_meta_data.length);
for (size_t i = 0; i < AERON_LOGBUFFER_PARTITION_COUNT; i++)
{
EXPECT_NE(nullptr, mapped_raw_log.term_buffers[i].addr);
EXPECT_EQ(AERON_LOGBUFFER_TERM_MAX_LENGTH, mapped_raw_log.term_buffers[i].length);
}

ASSERT_TRUE(aeron_raw_log_free(&mapped_raw_log, file)) << aeron_errmsg();

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ((size_t)0, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogMapExistingShouldHandleMaxTermBufferLength)
{
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_raw_log_map_existing_file_max_buffer_length.log";
const size_t file_length = 3223322624;
const size_t term_length = (size_t)AERON_LOGBUFFER_TERM_MAX_LENGTH;
const size_t page_size = 2 * 1024 * 1024;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, term_length, page_size)) << aeron_errmsg();
auto logbuffer_metadata = (aeron_logbuffer_metadata_t *)(mapped_raw_log.log_meta_data.addr);
logbuffer_metadata->term_length = (int32_t)term_length;
logbuffer_metadata->page_size = (int32_t)page_size;
ASSERT_EQ(0, aeron_unmap(&mapped_raw_log.mapped_file)) << aeron_errmsg();

mapped_raw_log = {};
ASSERT_EQ(0, aeron_raw_log_map_existing(&mapped_raw_log, file, false)) << aeron_errmsg();

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));
EXPECT_EQ(term_length, mapped_raw_log.term_length);
EXPECT_NE(nullptr, mapped_raw_log.log_meta_data.addr);
EXPECT_EQ(AERON_LOGBUFFER_META_DATA_LENGTH, mapped_raw_log.log_meta_data.length);
logbuffer_metadata = (aeron_logbuffer_metadata_t *)(mapped_raw_log.log_meta_data.addr);
EXPECT_EQ(term_length, (size_t)logbuffer_metadata->term_length);
EXPECT_EQ(page_size, (size_t)logbuffer_metadata->page_size);
for (size_t i = 0; i < AERON_LOGBUFFER_PARTITION_COUNT; i++)
{
EXPECT_NE(nullptr, mapped_raw_log.term_buffers[i].addr);
EXPECT_EQ(term_length, mapped_raw_log.term_buffers[i].length);
}

ASSERT_TRUE(aeron_raw_log_free(&mapped_raw_log, file)) << aeron_errmsg();

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ((size_t)0, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)-1, aeron_file_length(file));
}