Chunksize in bytes instead of samples (#502)
* chuckSizeBytes

* fix unittest

* docs

* per filipe's review
Ariel Shtul authored Aug 26, 2020
1 parent 0a42025 commit 5896a50
Showing 16 changed files with 89 additions and 83 deletions.
27 changes: 15 additions & 12 deletions docs/commands.md
@@ -7,20 +7,21 @@
Create a new time-series.

```sql
TS.CREATE key [RETENTION retentionTime] [UNCOMPRESSED] [LABELS label value..]
TS.CREATE key [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [LABELS label value..]
```

* key - Key name for timeseries

Optional args:

* retentionTime - Maximum age for samples compared to last event time (in milliseconds)
* RETENTION - Maximum age for samples compared to last event time (in milliseconds)
* Default: The global retention secs configuration of the database (by default, `0`)
* When set to 0, the series is not trimmed at all
* labels - Set of label-value pairs that represent metadata labels of the key
* UNCOMPRESSED - since version 1.2, both timestamps and values are compressed by default.
Adding this flag will keep data in an uncompressed form. Compression not only saves
  memory but usually improve performance due to lower number of memory accesses.
* CHUNK_SIZE - amount of memory, in bytes, allocated for data. Default: 4000.
* labels - Set of label-value pairs that represent metadata labels of the key

#### Complexity

@@ -67,19 +68,20 @@ TS.ALTER temperature:2:32 LABELS sensor_id 2 area_id 32 sub_area_id 15
Append (or create and append) a new sample to the series.

```sql
TS.ADD key timestamp value [RETENTION retentionTime] [UNCOMPRESSED] [LABELS label value..]
TS.ADD key timestamp value [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [LABELS label value..]
```

* timestamp - UNIX timestamp of the sample. `*` can be used for automatic timestamp (using the system clock)
* value - numeric data value of the sample (double)

These arguments are optional because they can be set by TS.CREATE:

* retentionTime - Maximum age for samples compared to last event time (in milliseconds)
* RETENTION - Maximum age for samples compared to last event time (in milliseconds)
* Default: The global retention secs configuration of the database (by default, `0`)
* When set to 0, the series is not trimmed at all
* labels - Set of label-value pairs that represent metadata labels of the key
* UNCOMPRESSED - Changes data storage from compressed (by default) to uncompressed
* CHUNK_SIZE - amount of memory, in bytes, allocated for data. Default: 4000.
* labels - Set of label-value pairs that represent metadata labels of the key

If this command is used to add data to an existing timeseries, `retentionTime` and `labels` are ignored.

@@ -138,13 +140,13 @@ Creates a new sample that increments/decrements the latest sample's value.
> Note: TS.INCRBY/TS.DECRBY support updates for the latest sample.
```sql
TS.INCRBY key value [TIMESTAMP timestamp] [RETENTION retentionTime] [UNCOMPRESSED] [LABELS label value..]
TS.INCRBY key value [TIMESTAMP timestamp] [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [LABELS label value..]
```

or

```sql
TS.DECRBY key value [TIMESTAMP timestamp] [RETENTION retentionTime] [UNCOMPRESSED] [LABELS label value..]
TS.DECRBY key value [TIMESTAMP timestamp] [RETENTION retentionTime] [UNCOMPRESSED] [CHUNK_SIZE size] [LABELS label value..]
```

This command can be used as a counter or gauge that automatically gets history as a time series.
@@ -154,12 +156,13 @@ This command can be used as a counter or gauge that automatically gets history as a time series.

Optional args:

* timestamp - UNIX timestamp of the sample. `*` can be used for automatic timestamp (using the system clock)
* retentionTime - Maximum age for samples compared to last event time (in milliseconds)
* TIMESTAMP - UNIX timestamp of the sample. `*` can be used for automatic timestamp (using the system clock)
* RETENTION - Maximum age for samples compared to last event time (in milliseconds)
* Default: The global retention secs configuration of the database (by default, `0`)
* When set to 0, the series is not trimmed at all
* labels - Set of label-value pairs that represent metadata labels of the key
* UNCOMPRESSED - Changes data storage from compressed (by default) to uncompressed
* CHUNK_SIZE - amount of memory, in bytes, allocated for data. Default: 4000.
* labels - Set of label-value pairs that represent metadata labels of the key

If this command is used to add data to an existing timeseries, `retentionTime` and `labels` are ignored.

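For a sense of scale, the documented default of 4000 bytes maps back to the old per-sample sizing. A small standalone sketch (assuming `Sample` is a 64-bit timestamp plus a double, i.e. 16 bytes — an assumption, not taken from this diff) shows the conversion:

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical mirror of the module's Sample layout (illustrative only). */
typedef struct {
    uint64_t timestamp;
    double value;
} Sample;

int main(void) {
    size_t chunk_size_bytes = 4000; /* documented CHUNK_SIZE default, in bytes */
    printf("samples per uncompressed chunk: %zu\n",
           chunk_size_bytes / sizeof(Sample)); /* 4000 / 16 = 250 */
    /* The old default of 256 samples per chunk corresponds to
     * 256 * 16 = 4096 bytes, i.e. one memory page. */
    return 0;
}
```

This also explains the page-sized 4096-byte default that appears in consts.h further down.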
22 changes: 12 additions & 10 deletions src/chunk.c
@@ -7,11 +7,11 @@

#include "rmutil/alloc.h"

Chunk_t *Uncompressed_NewChunk(size_t sampleCount) {
Chunk_t *Uncompressed_NewChunk(size_t size) {
Chunk *newChunk = (Chunk *)malloc(sizeof(Chunk));
newChunk->num_samples = 0;
newChunk->max_samples = sampleCount;
newChunk->samples = (Sample *)malloc(sizeof(Sample) * sampleCount);
newChunk->size = size;
newChunk->samples = (Sample *)malloc(size);

return newChunk;
}
@@ -32,21 +32,22 @@ Chunk_t *Uncompressed_SplitChunk(Chunk_t *chunk) {
size_t curNumSamples = curChunk->num_samples - split;

// create chunk and copy samples
Chunk *newChunk = Uncompressed_NewChunk(split);
Chunk *newChunk = Uncompressed_NewChunk(split * SAMPLE_SIZE);
for (size_t i = 0; i < split; ++i) {
Sample *sample = &curChunk->samples[curNumSamples + i];
Uncompressed_AddSample(newChunk, sample);
}

// update current chunk
curChunk->max_samples = curChunk->num_samples = curNumSamples;
curChunk->samples = realloc(curChunk->samples, curNumSamples * sizeof(Sample));
curChunk->num_samples = curNumSamples;
curChunk->size = curNumSamples * SAMPLE_SIZE;
curChunk->samples = realloc(curChunk->samples, curChunk->size);

return newChunk;
}

static int IsChunkFull(Chunk *chunk) {
return chunk->num_samples == chunk->max_samples;
return chunk->num_samples == chunk->size / SAMPLE_SIZE;
}

u_int64_t Uncompressed_NumOfSample(Chunk_t *chunk) {
@@ -95,8 +96,9 @@ ChunkResult Uncompressed_AddSample(Chunk_t *chunk, Sample *sample) {
* @param sample
*/
static void upsertChunk(Chunk *chunk, size_t idx, Sample *sample) {
if (chunk->num_samples == chunk->max_samples) {
chunk->samples = realloc(chunk->samples, ++chunk->max_samples * sizeof(Sample));
if (chunk->num_samples == chunk->size / SAMPLE_SIZE) {
chunk->size += sizeof(Sample);
chunk->samples = realloc(chunk->samples, chunk->size);
}
if (idx < chunk->num_samples) { // sample is not last
memmove(&chunk->samples[idx + 1],
@@ -182,7 +184,7 @@ void Uncompressed_FreeChunkIterator(ChunkIter_t *iter, bool rev) {

size_t Uncompressed_GetChunkSize(Chunk_t *chunk, bool includeStruct) {
Chunk *uncompChunk = chunk;
size_t size = uncompChunk->max_samples * sizeof(Sample);
size_t size = uncompChunk->size;
size += includeStruct ? sizeof(*uncompChunk) : 0;
return size;
}
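In src/chunk.c the uncompressed chunk drops `max_samples` and derives its capacity from the byte count in `size`. A minimal standalone sketch of that bookkeeping follows (simplified; the struct and helper names here are illustrative stand-ins, not the module's exact API):

```c
#include <stdlib.h>

/* Illustrative stand-ins; the real module defines Sample, Chunk and
 * SAMPLE_SIZE itself. */
typedef struct {
    unsigned long long timestamp;
    double value;
} Sample;

typedef struct {
    Sample *samples;
    size_t num_samples;
    size_t size; /* allocated bytes, not a sample count */
} Chunk;

#define SAMPLE_SIZE sizeof(Sample)

/* A chunk is full once the sample count reaches its byte capacity. */
static int chunk_is_full(const Chunk *c) {
    return c->num_samples == c->size / SAMPLE_SIZE;
}

/* Upserting into a full chunk grows the allocation by exactly one sample. */
static void chunk_reserve_for_upsert(Chunk *c) {
    if (chunk_is_full(c)) {
        c->size += SAMPLE_SIZE;
        c->samples = realloc(c->samples, c->size);
    }
}
```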
2 changes: 1 addition & 1 deletion src/chunk.h
@@ -14,7 +14,7 @@ typedef struct Chunk {
timestamp_t base_timestamp;
Sample *samples;
short num_samples;
short max_samples;
short size;
} Chunk;

typedef struct ChunkIterator {
10 changes: 5 additions & 5 deletions src/compressed_chunk.c
@@ -22,7 +22,7 @@
*********************/
Chunk_t *Compressed_NewChunk(size_t size) {
CompressedChunk *chunk = (CompressedChunk *)calloc(1, sizeof(CompressedChunk));
chunk->size = size * sizeof(Sample);
chunk->size = size;
chunk->data = (u_int64_t *)calloc(chunk->size, sizeof(char));
chunk->prevLeading = 32;
chunk->prevTrailing = 32;
@@ -76,8 +76,8 @@ Chunk_t *Compressed_SplitChunk(Chunk_t *chunk) {
size_t i = 0;
Sample sample;
ChunkIter_t *iter = Compressed_NewChunkIterator(curChunk, false);
CompressedChunk *newChunk1 = Compressed_NewChunk(curChunk->size / sizeof(Sample));
CompressedChunk *newChunk2 = Compressed_NewChunk(curChunk->size / sizeof(Sample));
CompressedChunk *newChunk1 = Compressed_NewChunk(curChunk->size);
CompressedChunk *newChunk2 = Compressed_NewChunk(curChunk->size);
for (; i < curNumSamples; ++i) {
Compressed_ChunkIteratorGetNext(iter, &sample);
ensureAddSample(newChunk1, &sample);
@@ -103,7 +103,7 @@ ChunkResult Compressed_UpsertSample(UpsertCtx *uCtx, int *size) {
ChunkResult nextRes = CR_OK;
CompressedChunk *oldChunk = (CompressedChunk *)uCtx->inChunk;

size_t newSize = oldChunk->size / sizeof(Sample);
size_t newSize = oldChunk->size;

CompressedChunk *newChunk = Compressed_NewChunk(newSize);
Compressed_Iterator *iter = Compressed_NewChunkIterator(oldChunk, false);
@@ -168,7 +184,7 @@ size_t Compressed_GetChunkSize(Chunk_t *chunk, bool includeStruct) {
static Chunk *decompressChunk(CompressedChunk *compressedChunk) {
Sample sample;
uint64_t numSamples = compressedChunk->count;
Chunk *uncompressedChunk = Uncompressed_NewChunk(numSamples);
Chunk *uncompressedChunk = Uncompressed_NewChunk(numSamples * SAMPLE_SIZE);

ChunkIter_t *iter = Compressed_NewChunkIterator(compressedChunk, 0);
for (uint64_t i = 0; i < numSamples; ++i) {
10 changes: 5 additions & 5 deletions src/config.c
@@ -56,19 +56,19 @@ int ReadConfig(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
TSGlobalConfig.retentionPolicy = RETENTION_TIME_DEFAULT;
}

if (argc > 1 && RMUtil_ArgIndex("MAX_SAMPLE_PER_CHUNK", argv, argc) >= 0) {
if (argc > 1 && RMUtil_ArgIndex("CHUNK_SIZE_BYTES", argv, argc) >= 0) {
if (RMUtil_ParseArgsAfter(
"MAX_SAMPLE_PER_CHUNK", argv, argc, "l", &TSGlobalConfig.maxSamplesPerChunk) !=
"CHUNK_SIZE_BYTES", argv, argc, "l", &TSGlobalConfig.chunkSizeBytes) !=
REDISMODULE_OK) {
return TSDB_ERROR;
}
} else {
TSGlobalConfig.maxSamplesPerChunk = SAMPLES_PER_CHUNK_DEFAULT_SECS;
TSGlobalConfig.chunkSizeBytes = Chunk_SIZE_BYTES_SECS;
}
RedisModule_Log(ctx,
"verbose",
"loaded default MAX_SAMPLE_PER_CHUNK policy: %lld \n",
TSGlobalConfig.maxSamplesPerChunk);
"loaded default CHUNK_SIZE_BYTES policy: %lld \n",
TSGlobalConfig.chunkSizeBytes);
return TSDB_OK;
}

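On the configuration side, the module-load argument is renamed from MAX_SAMPLE_PER_CHUNK to CHUNK_SIZE_BYTES. A hedged example of supplying it when loading the module (the path and the 8192-byte value are placeholders, not values from this diff):

```
loadmodule /path/to/redistimeseries.so CHUNK_SIZE_BYTES 8192
```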
2 changes: 1 addition & 1 deletion src/config.h
@@ -14,7 +14,7 @@ typedef struct {
SimpleCompactionRule *compactionRules;
uint64_t compactionRulesCount;
long long retentionPolicy;
long long maxSamplesPerChunk;
long long chunkSizeBytes;
short options;
int hasGlobalConfig;
} TSConfig;
4 changes: 3 additions & 1 deletion src/consts.h
@@ -12,6 +12,8 @@
#define TRUE 1
#define FALSE 0

#define SAMPLE_SIZE sizeof(Sample)

#define timestamp_t u_int64_t
#define api_timestamp_t u_int64_t
#define TSDB_ERR_TIMESTAMP_TOO_OLD -1
@@ -22,7 +24,7 @@

/* TS.CREATE Defaults */
#define RETENTION_TIME_DEFAULT 0LL
#define SAMPLES_PER_CHUNK_DEFAULT_SECS 256LL // fills one page 4096
#define Chunk_SIZE_BYTES_SECS 4096LL // fills one page 4096
#define SPLIT_FACTOR 1.2

/* TS.Range Aggregation types */
14 changes: 7 additions & 7 deletions src/module.c
@@ -103,7 +103,7 @@ static int parseCreateArgs(RedisModuleCtx *ctx,
int argc,
CreateCtx *cCtx) {
cCtx->retentionTime = TSGlobalConfig.retentionPolicy;
cCtx->maxSamplesPerChunk = TSGlobalConfig.maxSamplesPerChunk;
cCtx->chunkSizeBytes = TSGlobalConfig.chunkSizeBytes;
cCtx->labelsCount = 0;
if (parseLabelsFromArgs(argv, argc, &cCtx->labelsCount, &cCtx->labels) == REDISMODULE_ERR) {
RTS_ReplyGeneralError(ctx, "TSDB: Couldn't parse LABELS");
@@ -123,13 +123,13 @@
}

if (RMUtil_ArgIndex("CHUNK_SIZE", argv, argc) > 0 &&
RMUtil_ParseArgsAfter("CHUNK_SIZE", argv, argc, "l", &cCtx->maxSamplesPerChunk) !=
RMUtil_ParseArgsAfter("CHUNK_SIZE", argv, argc, "l", &cCtx->chunkSizeBytes) !=
REDISMODULE_OK) {
RTS_ReplyGeneralError(ctx, "TSDB: Couldn't parse CHUNK_SIZE");
return REDISMODULE_ERR;
}

if (cCtx->maxSamplesPerChunk <= 0) {
if (cCtx->chunkSizeBytes <= 0) {
RTS_ReplyGeneralError(ctx, "TSDB: Couldn't parse CHUNK_SIZE");
return REDISMODULE_ERR;
}
@@ -290,8 +290,8 @@ int TSDB_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_ReplyWithLongLong(ctx, series->retentionTime);
RedisModule_ReplyWithSimpleString(ctx, "chunkCount");
RedisModule_ReplyWithLongLong(ctx, RedisModule_DictSize(series->chunks));
RedisModule_ReplyWithSimpleString(ctx, "maxSamplesPerChunk");
RedisModule_ReplyWithLongLong(ctx, series->maxSamplesPerChunk);
RedisModule_ReplyWithSimpleString(ctx, "chunkSize");
RedisModule_ReplyWithLongLong(ctx, series->chunkSizeBytes);

RedisModule_ReplyWithSimpleString(ctx, "labels");
ReplyWithSeriesLabels(ctx, series);
@@ -866,7 +866,7 @@ int TSDB_alter(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
}

if (RMUtil_ArgIndex("CHUNK_SIZE", argv, argc) > 0) {
series->maxSamplesPerChunk = cCtx.maxSamplesPerChunk;
series->chunkSizeBytes = cCtx.chunkSizeBytes;
}

if (RMUtil_ArgIndex("LABELS", argv, argc) > 0) {
@@ -1212,7 +1212,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
.mem_usage = SeriesMemUsage,
.free = FreeSeries };

SeriesType = RedisModule_CreateDataType(ctx, "TSDB-TYPE", TS_UNCOMPRESSED_VER, &tm);
SeriesType = RedisModule_CreateDataType(ctx, "TSDB-TYPE", TS_SIZE_RDB_VER, &tm);
if (SeriesType == NULL)
return REDISMODULE_ERR;
IndexInit();
9 changes: 6 additions & 3 deletions src/rdb.c
@@ -11,14 +11,17 @@
#include <rmutil/alloc.h>

void *series_rdb_load(RedisModuleIO *io, int encver) {
if (encver != TS_ENC_VER && encver != TS_UNCOMPRESSED_VER) {
if (encver < TS_ENC_VER && encver > TS_SIZE_RDB_VER) {
RedisModule_LogIOError(io, "error", "data is not in the correct encoding");
return NULL;
}
CreateCtx cCtx = { 0 };
RedisModuleString *keyName = RedisModule_LoadString(io);
cCtx.retentionTime = RedisModule_LoadUnsigned(io);
cCtx.maxSamplesPerChunk = RedisModule_LoadUnsigned(io);
cCtx.chunkSizeBytes = RedisModule_LoadUnsigned(io);
if (encver < 2) {
cCtx.chunkSizeBytes *= SAMPLE_SIZE;
}

if (encver >= TS_UNCOMPRESSED_VER) {
cCtx.options = RedisModule_LoadUnsigned(io);
@@ -85,7 +88,7 @@ void series_rdb_save(RedisModuleIO *io, void *value) {
Series *series = value;
RedisModule_SaveString(io, series->keyName);
RedisModule_SaveUnsigned(io, series->retentionTime);
RedisModule_SaveUnsigned(io, series->maxSamplesPerChunk);
RedisModule_SaveUnsigned(io, series->chunkSizeBytes);
RedisModule_SaveUnsigned(io, series->options);

RedisModule_SaveUnsigned(io, series->labelsCount);
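The bump to TS_SIZE_RDB_VER matters for existing RDB files, where this slot held a per-chunk sample count rather than a byte count. A standalone sketch of the conversion rule (simplified from the rdb.c hunk above; the 16-byte SAMPLE_SIZE is an assumption about the Sample layout):

```c
/* Sketch: encodings older than TS_SIZE_RDB_VER stored samples per chunk,
 * newer ones store bytes directly. */
#define TS_SIZE_RDB_VER 2
#define SAMPLE_SIZE 16ULL /* assumed: u_int64_t timestamp + double value */

static unsigned long long chunk_size_from_rdb(unsigned long long stored, int encver) {
    if (encver < TS_SIZE_RDB_VER) {
        return stored * SAMPLE_SIZE; /* e.g. 256 samples -> 4096 bytes */
    }
    return stored; /* already a byte count */
}
```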
1 change: 1 addition & 0 deletions src/rdb.h
@@ -11,6 +11,7 @@

#define TS_ENC_VER 0
#define TS_UNCOMPRESSED_VER 1
#define TS_SIZE_RDB_VER 2

void *series_rdb_load(RedisModuleIO *io, int encver);
void series_rdb_save(RedisModuleIO *io, void *value);
11 changes: 5 additions & 6 deletions src/tsdb.c
@@ -42,7 +42,7 @@ Series *NewSeries(RedisModuleString *keyName, CreateCtx *cCtx) {
Series *newSeries = (Series *)malloc(sizeof(Series));
newSeries->keyName = keyName;
newSeries->chunks = RedisModule_CreateDict(NULL);
newSeries->maxSamplesPerChunk = cCtx->maxSamplesPerChunk;
newSeries->chunkSizeBytes = cCtx->chunkSizeBytes;
newSeries->retentionTime = cCtx->retentionTime;
newSeries->srcKey = NULL;
newSeries->rules = NULL;
@@ -58,7 +58,7 @@
} else {
newSeries->funcs = GetChunkClass(CHUNK_COMPRESSED);
}
Chunk_t *newChunk = newSeries->funcs->NewChunk(newSeries->maxSamplesPerChunk);
Chunk_t *newChunk = newSeries->funcs->NewChunk(newSeries->chunkSizeBytes);
dictOperator(newSeries->chunks, newChunk, 0, DICT_OP_SET);
newSeries->lastChunk = newChunk;
return newSeries;
@@ -283,8 +283,7 @@ int SeriesUpsertSample(Series *series, api_timestamp_t timestamp, double value)
}

// Split chunks
if (funcs->GetChunkSize(chunk, false) >
series->maxSamplesPerChunk * sizeof(Sample) * SPLIT_FACTOR) {
if (funcs->GetChunkSize(chunk, false) > series->chunkSizeBytes * SPLIT_FACTOR) {
Chunk_t *newChunk = funcs->SplitChunk(chunk);
if (newChunk == NULL) {
return REDISMODULE_ERR;
@@ -336,7 +335,7 @@ int SeriesAddSample(Series *series, api_timestamp_t timestamp, double value) {
// When a new chunk is created trim the series
SeriesTrim(series);

Chunk_t *newChunk = series->funcs->NewChunk(series->maxSamplesPerChunk);
Chunk_t *newChunk = series->funcs->NewChunk(series->chunkSizeBytes);
dictOperator(series->chunks, newChunk, timestamp, DICT_OP_SET);
ret = series->funcs->AddSample(newChunk, &sample);
series->lastChunk = newChunk;
@@ -495,7 +494,7 @@ int SeriesCreateRulesFromGlobalConfig(RedisModuleCtx *ctx,

CreateCtx cCtx = {
.retentionTime = rule->retentionSizeMillisec,
.maxSamplesPerChunk = TSGlobalConfig.maxSamplesPerChunk,
.chunkSizeBytes = TSGlobalConfig.chunkSizeBytes,
.labelsCount = compactedRuleLabelCount,
.labels = compactedLabels,
.options = TSGlobalConfig.options & SERIES_OPT_UNCOMPRESSED,
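Because the chunk size is now a byte budget, the upsert path in tsdb.c compares the chunk's actual footprint directly against `chunkSizeBytes * SPLIT_FACTOR` instead of multiplying a sample count by `sizeof(Sample)`. A minimal sketch of that split trigger (illustrative helper name, not the module's function):

```c
#include <stdbool.h>
#include <stddef.h>

#define SPLIT_FACTOR 1.2 /* same headroom factor defined in consts.h */

/* Split once the chunk's real size exceeds the configured byte budget
 * by more than 20%, which can happen after repeated out-of-order upserts. */
static bool chunk_needs_split(size_t chunk_bytes, size_t chunk_size_bytes) {
    return chunk_bytes > chunk_size_bytes * SPLIT_FACTOR;
}
```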
4 changes: 2 additions & 2 deletions src/tsdb.h
@@ -24,7 +24,7 @@ typedef struct CompactionRule {

typedef struct CreateCtx {
long long retentionTime;
long long maxSamplesPerChunk;
long long chunkSizeBytes;
size_t labelsCount;
Label *labels;
int options;
@@ -34,7 +34,7 @@
RedisModuleDict* chunks;
Chunk_t *lastChunk;
uint64_t retentionTime;
short maxSamplesPerChunk;
short chunkSizeBytes;
short options;
CompactionRule *rules;
timestamp_t lastTimestamp;