Skip to content

Commit 120c3aa

Browse files
committed
storage: Restore the original state of chunks
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 110953a commit 120c3aa

File tree

1 file changed

+39
-9
lines changed

1 file changed

+39
-9
lines changed

src/flb_storage.c

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,24 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
747747
*fs_chunks = storage_st.chunks_fs;
748748
}
749749

750+
/* Replace '/', '\\' and ':' with '_' to make filename components safe */
751+
static inline void sanitize_name_component(const char *in, char *out, size_t out_sz)
752+
{
753+
size_t i;
754+
755+
if (out_sz == 0) {
756+
return;
757+
}
758+
759+
if (!in) {
760+
in = "no-tag";
761+
}
762+
763+
for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) {
764+
out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i];
765+
}
766+
out[i] = '\0';
767+
}
750768

751769
static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
752770
{
@@ -793,10 +811,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
793811
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
794812
struct cio_stream *dlq;
795813
void *buf = NULL;
814+
int was_up = 0;
796815
size_t size = 0;
797816
int err = 0;
798817
char name[256];
799818
struct cio_chunk *dst;
819+
char safe_tag[128];
820+
char safe_out[64];
800821

801822
if (!ctx || !src) {
802823
return -1;
@@ -806,26 +827,28 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
806827
return -1;
807828
}
808829

809-
if (cio_chunk_is_up(src) != CIO_TRUE) {
830+
/* Remember original state and bring the chunk up if needed */
831+
was_up = (cio_chunk_is_up(src) == CIO_TRUE);
832+
if (!was_up) {
810833
if (cio_chunk_up_force(src) != CIO_OK) {
811834
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
812835
return -1;
813836
}
814837
}
815838

839+
sanitize_name_component(tag, safe_tag, sizeof(safe_tag));
840+
sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out));
841+
842+
/* Compose a simple, unique-ish file name with sanitized pieces */
843+
snprintf(name, sizeof(name),
844+
"%s_%d_%s_%p.flb",
845+
safe_tag, status_code, safe_out, (void *) src);
846+
816847
if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
817848
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
818849
return -1;
819850
}
820851

821-
/* Compose a simple, unique-ish file name */
822-
snprintf(name, sizeof(name),
823-
"%s_%d_%s_%p.flb",
824-
tag ? tag : "no-tag",
825-
status_code,
826-
out_name ? out_name : "out",
827-
(void *) src);
828-
829852
/* Create + write the DLQ copy */
830853
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
831854
if (!dst) {
@@ -846,6 +869,13 @@ int flb_storage_quarantine_chunk(struct flb_config *ctx,
846869

847870
flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);
848871

872+
/* Restore original state if we brought the chunk up */
873+
if (!was_up) {
874+
if (cio_chunk_down(src) != CIO_OK) {
875+
flb_debug("[storage] failed to bring chunk back down after DLQ copy");
876+
}
877+
}
878+
849879
return 0;
850880
#else
851881
FLB_UNUSED(ctx);

0 commit comments

Comments
 (0)