Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Micro-GC #473

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion promql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ module.exports.PSQLError = PSQLError
* @param stepMs {number}
*/
module.exports.rangeQuery = async (query, startMs, endMs, stepMs) => {
let resp
try {
const resp = await prometheus.pqlRangeQuery(query, startMs, endMs, stepMs, module.exports.getData)
resp = await prometheus.pqlRangeQuery(query, startMs, endMs, stepMs, module.exports.getData)
return JSON.parse(resp)
} catch (e) {
if (e instanceof prometheus.WasmError) {
Expand Down
7 changes: 5 additions & 2 deletions wasm_parts/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
module wasm_parts

go 1.21

toolchain go1.21.3

replace (
cloud.google.com/go v0.65.0 => cloud.google.com/go v0.102.1
github.com/InfluxCommunity/influxdb3-go v0.2.0 => github.com/akvlad/influxdb3-go v0.0.1
Expand All @@ -10,6 +14,7 @@ replace (

require (
github.com/alecthomas/participle/v2 v2.1.0
github.com/metrico/micro-gc v0.0.4
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7
github.com/prometheus/prometheus v1.8.2-0.20220714142409-b41e0750abf5
)
Expand Down Expand Up @@ -56,5 +61,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.20
88 changes: 88 additions & 0 deletions wasm_parts/go.sum

Large diffs are not rendered by default.

84 changes: 64 additions & 20 deletions wasm_parts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package main
import (
"context"
"fmt"
gcContext "github.com/metrico/micro-gc/context"
"sync"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand All @@ -18,6 +21,11 @@ import (
"wasm_parts/types"
)

//go:linkname get sync.(*Pool).Get
func get(p *sync.Pool) any {
panic("GET POOL")
}

var maxSamples = 5000000

type ctx struct {
Expand All @@ -35,13 +43,17 @@ func createCtx(id uint32) {

//export alloc
func alloc(id uint32, size int) *byte {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
data[id].request = make([]byte, size)
gcContext.SetContext(ctxId)
return &data[id].request[0]
}

//export dealloc
func dealloc(id uint32) {
delete(data, id)
gcContext.ReleaseContext(id)
}

//export getCtxRequest
Expand All @@ -66,6 +78,10 @@ func getCtxResponseLen(id uint32) uint32 {

//export transpileTraceQL
func transpileTraceQL(id uint32) int {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

request := types.TraceQLRequest{}
err := request.UnmarshalJSON(data[id].request)
if err != nil {
Expand Down Expand Up @@ -102,6 +118,8 @@ func transpileTraceQL(id uint32) int {
options = append(options, sql.STRING_OPT_INLINE_WITH)
}
str, err := sel.String(request.Ctx.CHSqlCtx, options...)
print(str)
print("\n")
if err != nil {
data[id].response = []byte(err.Error())
return 1
Expand All @@ -110,24 +128,30 @@ func transpileTraceQL(id uint32) int {
return 0
}

var eng *promql.Engine = nil
var engC = 0
var eng *promql.Engine = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
var engC = func() *promql.Engine {
return promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
}()

func getEng() *promql.Engine {
if eng == nil || engC > 5 {
eng = promql.NewEngine(promql.EngineOpts{
Logger: TestLogger{},
MaxSamples: maxSamples,
Timeout: time.Second * 30,
ActiveQueryTracker: nil,
LookbackDelta: 0,
NoStepSubqueryIntervalFn: nil,
EnableAtModifier: false,
EnableNegativeOffset: false,
})
engC = 0
}
engC++
return eng
}

Expand All @@ -143,7 +167,11 @@ func stats() {

//export pqlRangeQuery
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

return pql(id, data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
return getEng().NewRangeQuery(
queriable,
Expand All @@ -158,7 +186,11 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint

//export pqlInstantQuery
func pqlInstantQuery(id uint32, timeMS float64) uint32 {
return pql(data[id], func() (promql.Query, error) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

return pql(id, data[id], func() (promql.Query, error) {
queriable := &TestQueryable{id: id, stepMs: 15000}
return getEng().NewInstantQuery(
queriable,
Expand All @@ -170,6 +202,10 @@ func pqlInstantQuery(id uint32, timeMS float64) uint32 {

//export pqlSeries
func pqlSeries(id uint32) uint32 {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

queriable := &TestQueryable{id: id, stepMs: 15000}
query, err := getEng().NewRangeQuery(
queriable,
Expand Down Expand Up @@ -220,7 +256,7 @@ func wrapErrorStr(err error) string {
return err.Error()
}

func pql(c *ctx, query func() (promql.Query, error)) uint32 {
func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
rq, err := query()

if err != nil {
Expand All @@ -238,6 +274,10 @@ func pql(c *ctx, query func() (promql.Query, error)) uint32 {

c.response = []byte(matchersJSON)
c.onDataLoad = func(c *ctx) {
ctxId := gcContext.GetContextID()
gcContext.SetContext(id)
defer gcContext.SetContext(ctxId)

res := rq.Exec(context.Background())
c.response = []byte(writeResponse(res))
return
Expand Down Expand Up @@ -318,7 +358,11 @@ func writeVector(v promql.Vector) string {
return jsonBuilder.String()
}

func main() {}
func main() {
p := sync.Pool{}
a := p.Get()
_ = a
}

type TestLogger struct{}

Expand Down
9 changes: 4 additions & 5 deletions wasm_parts/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { gunzipSync } = require('zlib')
class WasmError extends Error {}
module.exports.WasmError = WasmError

let counter = 0
let counter = 1

const getWasm = (() => {
const _Go = Go
Expand All @@ -20,6 +20,9 @@ const getWasm = (() => {
run = true
const _wasm = await WebAssembly.instantiate(
gunzipSync(fs.readFileSync(WASM_URL)), go.importObject)
setInterval(() => {
console.log(`WASM SIZE: ${Math.floor(wasm.exports.memory.buffer.byteLength / 1024 / 1024)} MB`)
}, 5000)
go.run(_wasm.instance)
wasm = _wasm.instance
wasm.exports.setMaxSamples(process.env.ADVANCED_PROMETHEUS_MAX_SAMPLES || 5000000)
Expand All @@ -28,10 +31,6 @@ const getWasm = (() => {
}
init()
return () => {
if (cnt >= 20 && !run) {
init()
}
cnt++
return wasm
}
})()
Expand Down
Binary file modified wasm_parts/main.wasm.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion wasm_parts/traceql/parser/lexer_rules v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ var TraceQLLexerRulesV2 = []lexer.SimpleRule{
{"space", `\s+`},
}

var TraceQLLexerDefinition lexer.Definition = lexer.MustSimple(TraceQLLexerRulesV2)
var TraceQLLexerDefinition = func() lexer.Definition { return lexer.MustSimple(TraceQLLexerRulesV2) }
2 changes: 1 addition & 1 deletion wasm_parts/traceql/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func Parse(str string) (*TraceQLScript, error) {
res := &TraceQLScript{}
parser, err := participle.Build[TraceQLScript](participle.Lexer(TraceQLLexerDefinition), participle.UseLookahead(2))
parser, err := participle.Build[TraceQLScript](participle.Lexer(TraceQLLexerDefinition()), participle.UseLookahead(2))
if err != nil {
return nil, err
}
Expand Down
Loading
Loading