Skip to content

Commit

Permalink
[exporter/doris] Second PR of New component: Doris Exporter (#34980)
Browse files Browse the repository at this point in the history
**Description:** 

Second PR of New component: Doris Exporter. Implementation of traces.

**Link to tracking Issue:** #33479 

**Testing:** 

**Documentation:**
  • Loading branch information
joker-star-l authored Sep 11, 2024
1 parent e3a44b8 commit 601a99d
Show file tree
Hide file tree
Showing 12 changed files with 693 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/doris-traces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: traces implementation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 6 additions & 3 deletions exporter/dorisexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ The following configuration options are supported:
* `logs` (default = otel_logs) The table name for logs.
* `traces` (default = otel_traces) The table name for traces.
* `metrics` (default = otel_metrics) The table name for metrics.
* `create_schema` (default = true) Whether databases and tables are created automatically.
* `mysql_endpoint` The mysql protocol address to create the schema; ignored if `create_schema` is false.
* `create_schema` (default = true) Whether databases and tables are created automatically in doris.
* `mysql_endpoint` The mysql protocol address of doris. Only use to create the schema; ignored if `create_schema` is false.
* `history_days` (default = 0) Data older than these days will be deleted; ignored if `create_schema` is false. If set to 0, historical data will not be deleted.
* `create_history_days` (default = 0) The number of days in the history partition that was created when the table was created; ignored if `create_schema` is false. If `history_days` is not 0, `create_history_days` needs to be less than or equal to `history_days`.
* `replication_num` (default = 1) The number of replicas of the table; ignored if `create_schema` is false.
* `timezone` (default is the time zone of the opentelemetry collector) The time zone of doris.
* `timeout` (default = 5s) Time to wait per individual attempt to send data to a backend.
* `sending_queue` [details here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration)
* `enabled` (default = true)
* `num_consumers` (default = 10) Number of consumers that dequeue batches; ignored if `enabled` is false.
Expand All @@ -41,6 +40,10 @@ The following configuration options are supported:
* `max_interval` (default = 30s) The upper bound on backoff; ignored if `enabled` is false.
* `max_elapsed_time` (default = 300s) The maximum amount of time spent trying to send a batch; ignored if `enabled` is false. If set to 0, the retries are never stopped.

The Doris exporter supports common [HTTP Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md#http-configuration-settings), except for compression (all requests are uncompressed). As a consequence of supporting [confighttp](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md#http-configuration-settings), the Doris exporter also supports common [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings).

The Doris exporter sets `timeout` (HTTP request timeout) to 60s by default. All other defaults are as defined by [confighttp](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md#http-configuration-settings).

Example:
```yaml
exporters:
Expand Down
52 changes: 47 additions & 5 deletions exporter/dorisexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"errors"
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
confighttp.ClientConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// TableNames is the table name for logs, traces and metrics.
Table `mapstructure:"table"`

// Endpoint is the http stream load address.
Endpoint string `mapstructure:"endpoint"`
// Database is the database name.
Database string `mapstructure:"database"`
// Username is the authentication username.
Expand Down Expand Up @@ -93,5 +94,46 @@ func (cfg *Config) Validate() (err error) {
err = errors.Join(err, errors.New("metrics table name must be alphanumeric and underscore"))
}

_, errT := cfg.timeZone()
if errT != nil {
err = errors.Join(err, errors.New("invalid timezone"))
}

return err
}

const (
defaultStart = -2147483648 // IntMin
)

func (cfg *Config) startHistoryDays() int32 {
if cfg.HistoryDays == 0 {
return defaultStart
}
return -cfg.HistoryDays
}

func (cfg *Config) timeZone() (*time.Location, error) {
return time.LoadLocation(cfg.TimeZone)
}

const (
properties = `
PROPERTIES (
"replication_num" = "%d",
"enable_single_replica_compaction" = "true",
"compaction_policy" = "time_series",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "%d",
"dynamic_partition.history_partition_num" = "%d",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p"
)
`
)

func (cfg *Config) propertiesStr() string {
return fmt.Sprintf(properties, cfg.ReplicationNum, cfg.startHistoryDays(), cfg.CreateHistoryDays)
}
8 changes: 6 additions & 2 deletions exporter/dorisexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/confmap/confmaptest"
Expand All @@ -30,6 +31,10 @@ func TestLoadConfig(t *testing.T) {
defaultCfg.(*Config).Endpoint = "http://localhost:8030"
defaultCfg.(*Config).MySQLEndpoint = "localhost:9030"

httpClientConfig := confighttp.NewDefaultClientConfig()
httpClientConfig.Timeout = 5 * time.Second
httpClientConfig.Endpoint = "http://localhost:8030"

tests := []struct {
id component.ID
expected component.Config
Expand All @@ -41,7 +46,7 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "full"),
expected: &Config{
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
ClientConfig: httpClientConfig,
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 5 * time.Second,
Expand All @@ -60,7 +65,6 @@ func TestLoadConfig(t *testing.T) {
Traces: "otel_traces",
Metrics: "otel_metrics",
},
Endpoint: "http://localhost:8030",
Database: "otel",
Username: "admin",
Password: configopaque.String("admin"),
Expand Down
117 changes: 117 additions & 0 deletions exporter/dorisexporter/exporter_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"bytes"
"context"
"database/sql"
"fmt"
"net/http"
"time"

_ "github.com/go-sql-driver/mysql" // for register database driver
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

const timeFormat = "2006-01-02 15:04:05.999999"

type commonExporter struct {
component.TelemetrySettings

client *http.Client

logger *zap.Logger
cfg *Config
timeZone *time.Location
}

func newExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *commonExporter {
// There won't be an error because it's already been validated in the Config.Validate method.
timeZone, _ := cfg.timeZone()

return &commonExporter{
TelemetrySettings: set,
logger: logger,
cfg: cfg,
timeZone: timeZone,
}
}

func (e *commonExporter) formatTime(t time.Time) string {
return t.In(e.timeZone).Format(timeFormat)
}

type streamLoadResponse struct {
TxnID int64
Label string
Status string
ExistingJobStatus string
Message string
NumberTotalRows int64
NumberLoadedRows int64
NumberFilteredRows int64
NumberUnselectedRows int64
LoadBytes int64
LoadTimeMs int64
BeginTxnTimeMs int64
StreamLoadPutTimeMs int64
ReadDataTimeMs int64
WriteDataTimeMs int64
CommitAndPublishTimeMs int64
ErrorURL string
}

func (r *streamLoadResponse) success() bool {
return r.Status == "Success" || r.Status == "Publish Timeout"
}

func streamLoadURL(address string, db string, table string) string {
return address + "/api/" + db + "/" + table + "/_stream_load"
}

func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte) (*http.Request, error) {
url := streamLoadURL(cfg.Endpoint, cfg.Database, table)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}

req.Header.Set("format", "json")
req.Header.Set("Expect", "100-continue")
req.Header.Set("strip_outer_array", "true")
req.SetBasicAuth(cfg.Username, string(cfg.Password))

return req, nil
}

func createDorisHTTPClient(ctx context.Context, cfg *Config, host component.Host, settings component.TelemetrySettings) (*http.Client, error) {
client, err := cfg.ClientConfig.ToClient(ctx, host, settings)
if err != nil {
return nil, err
}

client.CheckRedirect = func(req *http.Request, _ []*http.Request) error {
req.SetBasicAuth(cfg.Username, string(cfg.Password))
return nil
}

return client, nil
}

func createDorisMySQLClient(cfg *Config) (*sql.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/mysql", cfg.Username, string(cfg.Password), cfg.MySQLEndpoint)
conn, err := sql.Open("mysql", dsn)
return conn, err
}

func createAndUseDatabase(ctx context.Context, conn *sql.DB, cfg *Config) error {
_, err := conn.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+cfg.Database)
if err != nil {
return err
}
_, err = conn.ExecContext(ctx, "USE "+cfg.Database)
return err
}
53 changes: 53 additions & 0 deletions exporter/dorisexporter/exporter_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
)

var testTelemetrySettings = component.TelemetrySettings{
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return nil
},
}

func TestNewCommonExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
exporter := newExporter(nil, cfg, testTelemetrySettings)
require.NotNil(t, exporter)
}

func TestCommonExporter_FormatTime(t *testing.T) {
cfg := createDefaultConfig().(*Config)
exporter := newExporter(nil, cfg, testTelemetrySettings)
require.NotNil(t, exporter)

now := time.Date(2024, 1, 1, 0, 0, 0, 1000, time.Local)
require.Equal(t, "2024-01-01 00:00:00.000001", exporter.formatTime(now))
}

func TestStreamLoadResponse_Success(t *testing.T) {
resp := &streamLoadResponse{
Status: "Success",
}
require.True(t, resp.success())

resp.Status = "Publish Timeout"
require.True(t, resp.success())

resp.Status = "Fail"
require.False(t, resp.success())
}

func TestStreamLoadUrl(t *testing.T) {
url := streamLoadURL("http://doris:8030", "otel", "otel_logs")
require.Equal(t, "http://doris:8030/api/otel/otel_logs/_stream_load", url)
}
Loading

0 comments on commit 601a99d

Please sign in to comment.