From f89d27d118eff1fd2c4f04faf1667c6b52860751 Mon Sep 17 00:00:00 2001 From: thorfour Date: Tue, 26 Sep 2023 14:01:26 -0500 Subject: [PATCH 1/2] Remove ExperimentalArrow flag Arrow ingestion is now the only supported type of profile ingestion --- go.mod | 2 +- go.sum | 4 +- pkg/parca/parca.go | 5 - pkg/parcacol/ingest.go | 50 +++----- pkg/parcacol/ingest_test.go | 232 +++++++++++++++--------------------- 5 files changed, 116 insertions(+), 177 deletions(-) diff --git a/go.mod b/go.mod index 8afee25e90b..f72fefd1286 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/parquet-go/parquet-go v0.18.0 - github.com/polarsignals/frostdb v0.0.0-20230913161601-5b173f20ed72 + github.com/polarsignals/frostdb v0.0.0-20230926084601-c9100f2ac9c7 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/common v0.44.0 github.com/prometheus/prometheus v0.47.0 diff --git a/go.sum b/go.sum index ab8ec85fd17..464c6178e5d 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarsignals/frostdb v0.0.0-20230913161601-5b173f20ed72 h1:L3y6vPOusdrYnh5GKSH4DpQdwSeFcaJITNfOF13/1TQ= -github.com/polarsignals/frostdb v0.0.0-20230913161601-5b173f20ed72/go.mod h1:fT7khtHWo/cEHSi1PgAnBJFMoBwB4T2NG6YCqVY0T1s= +github.com/polarsignals/frostdb v0.0.0-20230926084601-c9100f2ac9c7 h1:w1H1GpnYgbk5WddP7IljZajFlwOIHqUShNXz0IDMGXI= +github.com/polarsignals/frostdb v0.0.0-20230926084601-c9100f2ac9c7/go.mod h1:fT7khtHWo/cEHSi1PgAnBJFMoBwB4T2NG6YCqVY0T1s= github.com/polarsignals/wal v0.0.0-20230809151629-4d4e3eac6d40 h1:3kD5F5BBrnv2SAnBV7LGrXAhhAL+pZRkk++D4wrAH2c= github.com/polarsignals/wal v0.0.0-20230809151629-4d4e3eac6d40/go.mod h1:EVDHAAe+7GQ33A1/x+/gE+sBPN4toQ0XG5RoLD49xr8= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= diff --git a/pkg/parca/parca.go b/pkg/parca/parca.go index cbbfde9d4ef..624f74ee829 100644 --- a/pkg/parca/parca.go +++ b/pkg/parca/parca.go @@ -119,8 +119,6 @@ type Flags struct { InsecureSkipVerify bool `kong:"help='Skip TLS certificate verification.'"` ExternalLabel map[string]string `kong:"help='Label(s) to attach to all profiles in scraper-only mode.'"` - ExperimentalArrow bool `default:"false" help:"EXPERIMENTAL: Enables Arrow ingestion, this will reduce CPU usage but will increase memory usage."` - Hidden FlagsHidden `embed:"" prefix:""` } @@ -192,9 +190,6 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags } } - // Enable arrow ingestion - parcacol.ExperimentalArrow = flags.ExperimentalArrow - if flags.Port != "" { level.Warn(logger).Log("msg", "flag --port is deprecated, use --http-address instead") flags.HTTPAddress = flags.Port diff --git a/pkg/parcacol/ingest.go b/pkg/parcacol/ingest.go index 9fb7957d300..f2833051354 100644 --- a/pkg/parcacol/ingest.go +++ b/pkg/parcacol/ingest.go @@ -41,13 +41,10 @@ import ( "github.com/parca-dev/parca/pkg/profile" ) -var ExperimentalArrow bool - var ErrMissingNameLabel = errors.New("missing __name__ label") type Table interface { Schema() *dynparquet.Schema - Insert(context.Context, []byte) (tx uint64, err error) InsertRecord(context.Context, arrow.Record) (tx uint64, err error) } @@ -131,43 +128,26 @@ func (ing NormalizedIngester) Ingest(ctx context.Context, series []Series) error pBuf.Sort() - // Experimental feature that ingests profiles as arrow records. - if ExperimentalArrow { - // Read sorted rows into an arrow record - records, err := ParquetBufToArrowRecord(ctx, pBuf.Buffer, 0) - if err != nil { - return err - } - defer func() { - for _, record := range records { - record.Release() - } - }() - + // Read sorted rows into an arrow record + records, err := ParquetBufToArrowRecord(ctx, pBuf.Buffer, 0) + if err != nil { + return err + } + defer func() { for _, record := range records { - if record.NumRows() == 0 { - return nil - } - - if _, err := ing.table.InsertRecord(ctx, record); err != nil { - return err - } + record.Release() } - return nil - } - - buf := ing.bufferPool.Get().(*bytes.Buffer) - buf.Reset() - defer ing.bufferPool.Put(buf) + }() - if err := ing.schema.SerializeBuffer(buf, pBuf.Buffer); err != nil { - return err - } + for _, record := range records { + if record.NumRows() == 0 { + return nil + } - if _, err := ing.table.Insert(ctx, buf.Bytes()); err != nil { - return err + if _, err := ing.table.InsertRecord(ctx, record); err != nil { + return err + } } - return nil } diff --git a/pkg/parcacol/ingest_test.go b/pkg/parcacol/ingest_test.go index 7b754c07d87..93a325c9a67 100644 --- a/pkg/parcacol/ingest_test.go +++ b/pkg/parcacol/ingest_test.go @@ -96,81 +96,63 @@ func TestPprofToParquet(t *testing.T) { fileContent, err := os.ReadFile("../query/testdata/alloc_objects.pb.gz") require.NoError(t, err) - tests := map[string]struct { - arrow bool - }{ - "parquet": {false}, - "arrow": {true}, + table := &fakeTable{ + schema: schema, } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - if test.arrow { - ExperimentalArrow = true - t.Cleanup(func() { - ExperimentalArrow = false - }) - } - - table := &fakeTable{ - schema: schema, - } - req := &profilestorepb.WriteRawRequest{ - Series: []*profilestorepb.RawProfileSeries{{ - Labels: &profilestorepb.LabelSet{ - Labels: []*profilestorepb.Label{ - { - Name: "__name__", - Value: "memory", - }, - { - Name: "job", - Value: "default", - }, - }, + req := &profilestorepb.WriteRawRequest{ + Series: []*profilestorepb.RawProfileSeries{{ + Labels: &profilestorepb.LabelSet{ + Labels: []*profilestorepb.Label{ + { + Name: "__name__", + Value: "memory", }, - Samples: []*profilestorepb.RawSample{{ - RawProfile: fileContent, - }}, - }}, - } - err := NormalizedIngest( - ctx, - counter, - req, - logger, - table, - schema, - metastore, - &sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) + { + Name: "job", + Value: "default", }, }, - true, - ) - require.NoError(t, err) + }, + Samples: []*profilestorepb.RawSample{{ + RawProfile: fileContent, + }}, + }}, + } + err = NormalizedIngest( + ctx, + counter, + req, + logger, + table, + schema, + metastore, + &sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, + }, + true, + ) + require.NoError(t, err) - for i, insert := range table.inserts { - serBuf, err := dynparquet.ReaderFromBytes(insert) - require.NoError(t, err) + for i, insert := range table.inserts { + serBuf, err := dynparquet.ReaderFromBytes(insert) + require.NoError(t, err) - rows := serBuf.Reader() - rowBuf := []parquet.Row{{}} - for { - _, err := rows.ReadRows(rowBuf) - if err == io.EOF { - break - } - if err != io.EOF { - if err != nil { - require.NoError(t, os.WriteFile(fmt.Sprintf("test-%d.parquet", i), insert, 0o777)) - } - require.NoError(t, err) - } + rows := serBuf.Reader() + rowBuf := []parquet.Row{{}} + for { + _, err := rows.ReadRows(rowBuf) + if err == io.EOF { + break + } + if err != io.EOF { + if err != nil { + require.NoError(t, os.WriteFile(fmt.Sprintf("test-%d.parquet", i), insert, 0o777)) } + require.NoError(t, err) } - }) + } } } @@ -205,81 +187,63 @@ func TestUncompressedPprofToParquet(t *testing.T) { require.NoError(t, err) require.NoError(t, r.Close()) - tests := map[string]struct { - arrow bool - }{ - "parquet": {false}, - "arrow": {true}, + table := &fakeTable{ + schema: schema, } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - if test.arrow { - ExperimentalArrow = true - t.Cleanup(func() { - ExperimentalArrow = false - }) - } - - table := &fakeTable{ - schema: schema, - } - req := &profilestorepb.WriteRawRequest{ - Series: []*profilestorepb.RawProfileSeries{{ - Labels: &profilestorepb.LabelSet{ - Labels: []*profilestorepb.Label{ - { - Name: "__name__", - Value: "memory", - }, - { - Name: "job", - Value: "default", - }, - }, + req := &profilestorepb.WriteRawRequest{ + Series: []*profilestorepb.RawProfileSeries{{ + Labels: &profilestorepb.LabelSet{ + Labels: []*profilestorepb.Label{ + { + Name: "__name__", + Value: "memory", }, - Samples: []*profilestorepb.RawSample{{ - RawProfile: fileContent, - }}, - }}, - } - err := NormalizedIngest( - ctx, - counter, - req, - logger, - table, - schema, - metastore, - &sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) + { + Name: "job", + Value: "default", }, }, - true, - ) - require.NoError(t, err) + }, + Samples: []*profilestorepb.RawSample{{ + RawProfile: fileContent, + }}, + }}, + } + err = NormalizedIngest( + ctx, + counter, + req, + logger, + table, + schema, + metastore, + &sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, + }, + true, + ) + require.NoError(t, err) - for i, insert := range table.inserts { - serBuf, err := dynparquet.ReaderFromBytes(insert) - require.NoError(t, err) + for i, insert := range table.inserts { + serBuf, err := dynparquet.ReaderFromBytes(insert) + require.NoError(t, err) - rows := serBuf.Reader() - rowBuf := []parquet.Row{{}} - for { - _, err := rows.ReadRows(rowBuf) - if err == io.EOF { - break - } - if err != io.EOF { - if err != nil { - require.NoError(t, os.WriteFile(fmt.Sprintf("test-%d.parquet", i), insert, 0o777)) - } - require.NoError(t, err) - } + rows := serBuf.Reader() + rowBuf := []parquet.Row{{}} + for { + _, err := rows.ReadRows(rowBuf) + if err == io.EOF { + break + } + if err != io.EOF { + if err != nil { + require.NoError(t, os.WriteFile(fmt.Sprintf("test-%d.parquet", i), insert, 0o777)) } + require.NoError(t, err) } - }) + } } } From 269f350649297eec3174ba322a7809c536f0397c Mon Sep 17 00:00:00 2001 From: thorfour Date: Tue, 26 Sep 2023 14:04:10 -0500 Subject: [PATCH 2/2] README --- README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/README.md b/README.md index c13a989024b..711ccea0501 100644 --- a/README.md +++ b/README.md @@ -160,9 +160,6 @@ Flags: --external-label=KEY=VALUE;... Label(s) to attach to all profiles in scraper-only mode. - --experimental-arrow EXPERIMENTAL: Enables Arrow ingestion, this - will reduce CPU usage but will increase memory - usage. ```