Skip to content

Commit

Permalink
feat: traces and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
joker-star-l committed Aug 14, 2024
1 parent ef6375e commit 40c7422
Show file tree
Hide file tree
Showing 9 changed files with 1,076 additions and 3 deletions.
110 changes: 110 additions & 0 deletions exporter/dorisexporter/exporter_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"bytes"
"context"
"database/sql"
"fmt"
"net/http"
"time"

_ "github.com/go-sql-driver/mysql"
"go.uber.org/zap"
)

const timeFormat = "2006-01-02 15:04:05.999999"

type commonExporter struct {
client *http.Client

logger *zap.Logger
cfg *Config
timeZone *time.Location
}

func newExporter(logger *zap.Logger, cfg *Config) (*commonExporter, error) {
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
req.SetBasicAuth(cfg.Username, string(cfg.Password))
return nil
},
}

timeZone, err := cfg.timeZone()
if err != nil {
return nil, err
}

return &commonExporter{
logger: logger,
cfg: cfg,
client: client,
timeZone: timeZone,
}, nil
}

func (e *commonExporter) formatTime(t time.Time) string {
return t.In(e.timeZone).Format(timeFormat)
}

type streamLoadResponse struct {
TxnId int64
Label string
Status string
ExistingJobStatus string
Message string
NumberTotalRows int64
NumberLoadedRows int64
NumberFilteredRows int64
NumberUnselectedRows int64
LoadBytes int64
LoadTimeMs int64
BeginTxnTimeMs int64
StreamLoadPutTimeMs int64
ReadDataTimeMs int64
WriteDataTimeMs int64
CommitAndPublishTimeMs int64
ErrorURL string
}

func (r *streamLoadResponse) success() bool {
return r.Status == "Success" || r.ExistingJobStatus == "Publish Timeout"
}

func streamLoadUrl(address string, db string, table string) string {
return address + "/api/" + db + "/" + table + "/_stream_load"
}

func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte) (*http.Request, error) {
url := streamLoadUrl(cfg.Endpoint, cfg.Database, table)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}

req.Header.Set("format", "json")
req.Header.Set("Expect", "100-continue")
req.Header.Set("strip_outer_array", "true")
req.Header.Set("timezone", cfg.TimeZone)
req.SetBasicAuth(cfg.Username, string(cfg.Password))

return req, nil
}

func createMySQLClient(cfg *Config) (*sql.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/mysql", cfg.Username, cfg.Password, cfg.MySQLEndpoint)
conn, err := sql.Open("mysql", dsn)
return conn, err
}

func createAndUseDatabase(ctx context.Context, conn *sql.DB, cfg *Config) error {
_, err := conn.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+cfg.Database)
if err != nil {
return err
}
_, err = conn.ExecContext(ctx, "USE "+cfg.Database)
return err
}
105 changes: 105 additions & 0 deletions exporter/dorisexporter/exporter_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"go.opentelemetry.io/collector/config/configopaque"
)

func TestStreamLoadUrl(t *testing.T) {
url := streamLoadUrl("http://doris:8030", "otel", "otel_logs")
fmt.Println(url)
}

func TestCreateMySQLClient(t *testing.T) {
config := createDefaultConfig().(*Config)
config.MySQLEndpoint = "127.0.0.1:9030"
config.Username = "root"
config.Password = ""

conn, err := createMySQLClient(config)
if err != nil {
t.Error(err)
return
}
defer conn.Close()
}

func TestCreateAndUseDatabase(t *testing.T) {
config := createDefaultConfig().(*Config)
config.MySQLEndpoint = "127.0.0.1:9030"
config.Username = "admin"
config.Password = "admin"
config.Database = "otel3"
config.HistoryDays = 3

conn, err := createMySQLClient(config)
if err != nil {
t.Error(err)
return
}
defer conn.Close()

ctx := context.Background()

err = createAndUseDatabase(ctx, conn, config)
if err != nil {
t.Error(err)
return
}

rows, err := conn.QueryContext(ctx, "SHOW TABLES;")
if err != nil {
t.Error(err)
return
}
defer rows.Close()

for rows.Next() {
var table string
err = rows.Scan(&table)
if err != nil {
t.Error(err)
return
}
t.Error(err)
}
}

func TestTraceJSON(t *testing.T) {
trace := &dTrace{}
marshal, err := json.Marshal(trace)
if err != nil {
t.Error(err)
return
}

fmt.Println(string(marshal))
}

func TestTimeFormat(t *testing.T) {
now := time.Now()
fmt.Printf("now.Format(TimeFormat): %v\n", now.Format(timeFormat))
}

func TestTimeZone(t *testing.T) {
timeZone, err := time.LoadLocation("America/New_York")
if err != nil {
t.Error(err)
return
}
fmt.Println(timeZone)
fmt.Println(time.Now().In(timeZone).Format(timeFormat))
}

func TestString(t *testing.T) {
s := configopaque.String("helloworld")
fmt.Println(string(s))
}
Loading

0 comments on commit 40c7422

Please sign in to comment.