Skip to content

Commit

Permalink
Merge pull request #473 from metrico/micro_gc
Browse files Browse the repository at this point in the history
Micro-GC
  • Loading branch information
akvlad authored Mar 11, 2024
2 parents c53d66e + 1d30e44 commit c2640d1
Show file tree
Hide file tree
Showing 9 changed files with 753 additions and 108 deletions.
3 changes: 2 additions & 1 deletion promql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ module.exports.PSQLError = PSQLError
* @param stepMs {number}
*/
module.exports.rangeQuery = async (query, startMs, endMs, stepMs) => {
let resp
try {
const resp = await prometheus.pqlRangeQuery(query, startMs, endMs, stepMs, module.exports.getData)
resp = await prometheus.pqlRangeQuery(query, startMs, endMs, stepMs, module.exports.getData)
return JSON.parse(resp)
} catch (e) {
if (e instanceof prometheus.WasmError) {
Expand Down
7 changes: 5 additions & 2 deletions wasm_parts/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
module wasm_parts

go 1.21

toolchain go1.21.3

replace (
cloud.google.com/go v0.65.0 => cloud.google.com/go v0.102.1
github.com/InfluxCommunity/influxdb3-go v0.2.0 => github.com/akvlad/influxdb3-go v0.0.1
Expand All @@ -10,6 +14,7 @@ replace (

require (
github.com/alecthomas/participle/v2 v2.1.0
github.com/metrico/micro-gc v0.0.4
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7
github.com/prometheus/prometheus v1.8.2-0.20220714142409-b41e0750abf5
)
Expand Down Expand Up @@ -56,5 +61,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.20
88 changes: 88 additions & 0 deletions wasm_parts/go.sum

Large diffs are not rendered by default.

84 changes: 64 additions & 20 deletions wasm_parts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package main
import (
"context"
"fmt"
gcContext "github.com/metrico/micro-gc/context"
"sync"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand All @@ -18,6 +21,11 @@ import (
"wasm_parts/types"
)

//go:linkname get sync.(*Pool).Get
func get(p *sync.Pool) any {
panic("GET POOL")
}

var maxSamples = 5000000

type ctx struct {
Expand All @@ -35,13 +43,17 @@ func createCtx(id uint32) {

//export alloc
func alloc(id uint32, size int) *byte {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
data[id].request = make([]byte, size)
gcContext.SetContext(ctxId)
return &data[id].request[0]
}

//export dealloc
func dealloc(id uint32) {
delete(data, id)
gcContext.ReleaseContext(id)
}

//export getCtxRequest
Expand All @@ -66,6 +78,10 @@ func getCtxResponseLen(id uint32) uint32 {

//export transpileTraceQL
func transpileTraceQL(id uint32) int {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

request := types.TraceQLRequest{}
err := request.UnmarshalJSON(data[id].request)
if err != nil {
Expand Down Expand Up @@ -102,6 +118,8 @@ func transpileTraceQL(id uint32) int {
options = append(options, sql.STRING_OPT_INLINE_WITH)
}
str, err := sel.String(request.Ctx.CHSqlCtx, options...)
print(str)
print("\n")
if err != nil {
data[id].response = []byte(err.Error())
return 1
Expand All @@ -110,24 +128,30 @@ func transpileTraceQL(id uint32) int {
return 0
}

var eng *promql.Engine = nil
var engC = 0
var eng *promql.Engine = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
var engC = func() *promql.Engine {
return promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
}()

func getEng() *promql.Engine {
if eng == nil || engC > 5 {
eng = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
engC = 0
}
engC++
return eng
}

Expand All @@ -143,7 +167,11 @@ func stats() {

//export pqlRangeQuery
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

return pql(id, data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
return getEng().NewRangeQuery(
queriable,
Expand All @@ -158,7 +186,11 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint

//export pqlInstantQuery
func pqlInstantQuery(id uint32, timeMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

return pql(id, data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id, stepMs: 15000}
return getEng().NewInstantQuery(
queriable,
Expand All @@ -170,6 +202,10 @@ func pqlInstantQuery(id uint32, timeMS float64) uint32 {

//export pqlSeries
func pqlSeries(id uint32) uint32 {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

queriable := &TestQueryable{id: id, stepMs: 15000}
query, err := getEng().NewRangeQuery(
queriable,
Expand Down Expand Up @@ -220,7 +256,7 @@ func wrapErrorStr(err error) string {
return err.Error()
}

func pql(c *ctx, query func() (promql.Query, error)) uint32 {
func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
rq, err := query()

if err != nil {
Expand All @@ -238,6 +274,10 @@ func pql(c *ctx, query func() (promql.Query, error)) uint32 {

c.response = []byte(matchersJSON)
c.onDataLoad = func(c *ctx) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

res := rq.Exec(context.Background())
c.response = []byte(writeResponse(res))
return
Expand Down Expand Up @@ -318,7 +358,11 @@ func writeVector(v promql.Vector) string {
return jsonBuilder.String()
}

func main() {}
func main() {
p := sync.Pool{}
a := p.Get()
_ = a
}

type TestLogger struct{}

Expand Down
9 changes: 4 additions & 5 deletions wasm_parts/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { gunzipSync } = require('zlib')
class WasmError extends Error {}
module.exports.WasmError = WasmError

let counter = 0
let counter = 1

const getWasm = (() => {
const _Go = Go
Expand All @@ -20,6 +20,9 @@ const getWasm = (() => {
run = true
const _wasm = await WebAssembly.instantiate(
gunzipSync(fs.readFileSync(WASM_URL)), go.importObject)
setInterval(() => {
console.log(`WASM SIZE: ${Math.floor(wasm.exports.memory.buffer.byteLength / 1024 / 1024)} MB`)
}, 5000)
go.run(_wasm.instance)
wasm = _wasm.instance
wasm.exports.setMaxSamples(process.env.ADVANCED_PROMETHEUS_MAX_SAMPLES || 5000000)
Expand All @@ -28,10 +31,6 @@ const getWasm = (() => {
}
init()
return () => {
if (cnt >= 20 && !run) {
init()
}
cnt++
return wasm
}
})()
Expand Down
Binary file modified wasm_parts/main.wasm.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion wasm_parts/traceql/parser/lexer_rules v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ var TraceQLLexerRulesV2 = []lexer.SimpleRule{
{"space", `\s+`},
}

var TraceQLLexerDefinition lexer.Definition = lexer.MustSimple(TraceQLLexerRulesV2)
var TraceQLLexerDefinition = func() lexer.Definition { return lexer.MustSimple(TraceQLLexerRulesV2) }
2 changes: 1 addition & 1 deletion wasm_parts/traceql/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func Parse(str string) (*TraceQLScript, error) {
res := &TraceQLScript{}
parser, err := participle.Build[TraceQLScript](participle.Lexer(TraceQLLexerDefinition), participle.UseLookahead(2))
parser, err := participle.Build[TraceQLScript](participle.Lexer(TraceQLLexerDefinition()), participle.UseLookahead(2))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit c2640d1

Please sign in to comment.