Skip to content

Commit

Permalink
Merge pull request #1 from grafana/salvacorts/add-secondary-index-lab…
Browse files Browse the repository at this point in the history
…els-to-push-request

Add secondary index labels to push request
  • Loading branch information
salvacorts authored Mar 10, 2023
2 parents e2ac2d5 + df84161 commit 2cce257
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 76 deletions.
4 changes: 4 additions & 0 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,22 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
"lineA",
"",
},
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
"lineB",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineC",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineD",
"",
},
},
})
Expand Down
27 changes: 23 additions & 4 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ func init() {

// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
Timestamp time.Time
Line string
IndexLabels string
}

func (e *Entry) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -46,6 +47,13 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
return
}
e.Line = v
case 2: // indexLabels
il, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.IndexLabels = il
}
i++
})
Expand All @@ -67,6 +75,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var indexLabels string
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
Expand All @@ -81,15 +90,23 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
return false
}
return true
case 2:
indexLabels = iter.ReadString()
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
return false
}
})
if ok {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
Timestamp: ts,
Line: line,
IndexLabels: indexLabels,
})
return true
}
Expand Down Expand Up @@ -126,6 +143,8 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
stream.WriteMore()
stream.WriteString(e.IndexLabels)
stream.WriteArrayEnd()
}

Expand Down
117 changes: 87 additions & 30 deletions pkg/push/push.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/push/push.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ message EntryAdapter {
(gogoproto.jsontag) = "ts"
];
string line = 2 [(gogoproto.jsontag) = "line"];
string indexLabels = 3 [(gogoproto.jsontag) = "indexLabels"];
}
51 changes: 49 additions & 2 deletions pkg/push/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type Stream struct {

// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
IndexLabels string `protobuf:"bytes,3,opt,name=indexLabels,proto3" json:"indexLabels"`
}

func (m *Stream) Marshal() (dAtA []byte, err error) {
Expand Down Expand Up @@ -90,6 +91,13 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.IndexLabels) > 0 {
i -= len(m.IndexLabels)
copy(dAtA[i:], m.IndexLabels)
i = encodeVarintPush(dAtA, i, uint64(len(m.IndexLabels)))
i--
dAtA[i] = 0x1a
}
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
Expand Down Expand Up @@ -341,6 +349,38 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field IndexLabels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.IndexLabels = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
Expand Down Expand Up @@ -400,6 +440,10 @@ func (m *Entry) Size() (n int) {
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
l = len(m.IndexLabels)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}

Expand Down Expand Up @@ -461,5 +505,8 @@ func (m *Entry) Equal(that interface{}) bool {
if m.Line != that1.Line {
return false
}
if m.IndexLabels != that1.IndexLabels {
return false
}
return true
}
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,22 +1091,22 @@ var (
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
[ "123456789012345", "super line", "" ]
]
},
{
"stream": {
"test": "test2"
},
"values":[
[ "123456789012346", "super line2" ]
[ "123456789012346", "super line2", "" ]
]
}
]
}
}`
streamsStringLegacy = `{
` + statsResultString + `"streams":[{"labels":"{test=\"test\"}","entries":[{"ts":"1970-01-02T10:17:36.789012345Z","line":"super line"}]},{"labels":"{test=\"test2\"}","entries":[{"ts":"1970-01-02T10:17:36.789012346Z","line":"super line2"}]}]}`
` + statsResultString + `"streams":[{"labels":"{test=\"test\"}","entries":[{"ts":"1970-01-02T10:17:36.789012345Z","line":"super line","indexLabels":""}]},{"labels":"{test=\"test2\"}","entries":[{"ts":"1970-01-02T10:17:36.789012346Z","line":"super line2","indexLabels":""}]}]}`
logStreams = []logproto.Stream{
{
Labels: `{test="test"}`,
Expand Down
Loading

0 comments on commit 2cce257

Please sign in to comment.