Skip to content

Commit

Permalink
Update storage write functions to normalize table and db names
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Keesey authored and mkeesey committed Nov 4, 2024
1 parent a6ed44f commit e1f07ad
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,37 @@ func (s *LocalStore) WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSerie
}

func (s *LocalStore) WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error {
sanitizedDB := schema.NormalizeAdxIdentifier(database)
sanitizedTable := schema.NormalizeAdxIdentifier(table)

if sanitizedDB == "" || sanitizedTable == "" {
logger.Warnf("Invalid database or table name: %s.%s", database, table)
return nil // Do not retry - move on
}

enc := csvWriterPool.Get(8 * 1024).(*transform2.CSVWriter)
defer csvWriterPool.Put(enc)

key := gbp.Get(256)
defer gbp.Put(key)

if logger.IsDebug() {
logger.Debugf("Store received %d logs for %s.%s", len(logs.Logs), database, table)
logger.Debugf("Store received %d logs for %s.%s", len(logs.Logs), sanitizedDB, sanitizedTable)
for _, log := range logs.Logs {
if l, err := protojson.Marshal(log); err == nil {
logger.Debugf("Log: %s", l)
}
}
}

key = fmt.Appendf(key[:0], "%s_%s", database, table)
key = fmt.Appendf(key[:0], "%s_%s", sanitizedDB, sanitizedTable)

w, err := s.GetWAL(ctx, key)
if err != nil {
return err
}

metrics.SamplesStored.WithLabelValues(table).Add(float64(len(logs.Logs)))
metrics.SamplesStored.WithLabelValues(sanitizedTable).Add(float64(len(logs.Logs)))

enc.Reset()
if err := enc.MarshalLog(logs); err != nil {
Expand Down Expand Up @@ -211,14 +219,24 @@ func (s *LocalStore) WriteNativeLogs(ctx context.Context, logs *types.LogBatch)
noDestinationCount++
continue
}
sanitizedDB := schema.NormalizeAdxIdentifier(database)
if sanitizedDB == "" {
noDestinationCount++
continue
}

table, ok := log.Attributes[types.AttributeTableName].(string)
if !ok || table == "" {
noDestinationCount++
continue
}
sanitizedTable := schema.NormalizeAdxIdentifier(table)
if sanitizedTable == "" {
noDestinationCount++
continue
}

key = fmt.Appendf(key[:0], "%s_%s_", database, table)
key = fmt.Appendf(key[:0], "%s_%s_", sanitizedDB, sanitizedTable)
key = strconv.AppendUint(key, enc.SchemaHash(), 36)

wal, err := s.GetWAL(ctx, key)
Expand Down Expand Up @@ -351,7 +369,7 @@ func SegmentKey(dst []byte, labels []*prompb.Label, hash uint64) ([]byte, error)
return nil, fmt.Errorf("name label not found")
}

dst = append(dst, database...)
dst = schema.AppendNormalizeAdxIdentifier(dst, database)
dst = append(dst, delim...)
dst = schema.AppendNormalizeMetricName(dst, name)
dst = append(dst, delim...)
Expand Down

0 comments on commit e1f07ad

Please sign in to comment.