From 2a23bc7a594e2e518bea5df93adffb17fa3060cf Mon Sep 17 00:00:00 2001 From: Rory Crispin Date: Tue, 5 Sep 2023 10:39:19 +0100 Subject: [PATCH 1/4] ClickHouse patches --- .github/workflows/build-test-publish.yaml | 531 ++++++++++++++++++ cmd/otelcontribcol/processors_test.go | 4 + exporter/clickhouseexporter/exporter_logs.go | 213 +++++-- .../clickhouseexporter/exporter_logs_test.go | 33 +- .../clickhouseexporter/exporter_metrics.go | 8 +- .../clickhouseexporter/exporter_traces.go | 48 +- 6 files changed, 747 insertions(+), 90 deletions(-) create mode 100644 .github/workflows/build-test-publish.yaml diff --git a/.github/workflows/build-test-publish.yaml b/.github/workflows/build-test-publish.yaml new file mode 100644 index 000000000000..067128085d6c --- /dev/null +++ b/.github/workflows/build-test-publish.yaml @@ -0,0 +1,531 @@ +name: build-test-publish +on: + push: + branches: [main] + tags: + - "v[0-9]+.[0-9]+.[0-9]+*" + pull_request: +env: + TEST_RESULTS: testbed/tests/results/junit/results.xml + # See: https://github.com/actions/cache/issues/810#issuecomment-1222550359 + # Cache downloads for this workflow consistently run in under 1 minute + SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5 + +# Do not cancel this workflow on main. See https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/16616 +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +jobs: + setup-environment: + timeout-minutes: 30 + runs-on: ubuntu-latest + if: ${{ github.actor != 'dependabot[bot]' }} + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + check-collector-module-version: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Check Collector Module Version + run: ./.github/workflows/scripts/check-collector-module-version.sh + lint-matrix: + strategy: + matrix: + group: + - receiver-0 + - receiver-1 + - processor + - exporter + - extension + - connector + - internal + - other + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Cache Lint Build + uses: actions/cache@v3 + with: + path: ~/.cache/go-build + key: go-lint-build-${{ matrix.group }}-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Lint + run: make -j2 golint GROUP=${{ matrix.group }} + lint: + if: ${{ github.actor != 'dependabot[bot]' && always() }} + runs-on: ubuntu-latest + needs: [setup-environment, lint-matrix] + steps: + - name: Print result + run: echo ${{ needs.lint-matrix.result }} + - name: Interpret result + run: | + if [[ success == ${{ needs.lint-matrix.result }} ]] + then + echo "All matrix jobs passed!" + else + echo "One or more matrix jobs failed." + false + fi + checks: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: CheckDoc + run: make checkdoc + - name: Porto + run: | + make -j2 goporto + git diff --exit-code || (echo 'Porto links are out of date, please run "make goporto" and commit the changes in this PR.' && exit 1) + - name: crosslink + run: | + make crosslink + git diff --exit-code || (echo 'Replace statements are out of date, please run "make crosslink" and commit the changes in this PR.' && exit 1) + - name: Check for go mod dependency changes + run: | + make gotidy + git diff --exit-code || (echo 'go.mod/go.sum deps changes detected, please run "make gotidy" and commit the changes in this PR.' && exit 1) + - name: Gen genotelcontribcol + run: | + make genotelcontribcol + git diff -s --exit-code || (echo 'Generated code is out of date, please run "make genotelcontribcol" and commit the changes in this PR.' && exit 1) + - name: Gen genoteltestbedcol + run: | + make genoteltestbedcol + git diff -s --exit-code || (echo 'Generated code is out of date, please run "make genoteltestbedcol" and commit the changes in this PR.' && exit 1) + - name: CodeGen + run: | + make -j2 generate + git diff --exit-code ':!*go.sum' || (echo 'Generated code is out of date, please run "make generate" and commit the changes in this PR.' && exit 1) + - name: Check gendependabot + run: | + make -j2 gendependabot + git diff --exit-code ':!*go.sum' || (echo 'dependabot.yml is out of date, please run "make gendependabot" and commit the changes in this PR.' && exit 1) + - name: MultimodVerify + run: make multimod-verify + - name: Components dropdown in issue templates + run: | + make generate-gh-issue-templates + git diff --exit-code '.github/ISSUE_TEMPLATE' || (echo 'Dropdowns in issue templates are out of date, please run "make generate-gh-issue-templates" and commit the changes in this PR.' && exit 1) + unittest-matrix: + strategy: + matrix: + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes + group: + - receiver-0 + - receiver-1 + - processor + - exporter + - extension + - connector + - internal + - other + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Collect Workflow Telemetry + if: always() + uses: runforesight/foresight-workflow-kit-action@v1 + with: + api_key: ${{ secrets.FORESIGHT_API_KEY }} + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go-version }} + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Cache Test Build + uses: actions/cache@v3 + with: + path: ~/.cache/go-build + key: go-test-build-${{ runner.os }}-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }} + - name: Run Unit Tests + run: make gotest GROUP=${{ matrix.group }} + unittest: + if: ${{ github.actor != 'dependabot[bot]' && always() }} + strategy: + matrix: + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes + runs-on: ubuntu-latest + needs: [setup-environment, unittest-matrix] + steps: + - name: Print result + run: echo ${{ needs.unittest-matrix.result }} + - name: Interpret result + run: | + if [[ success == ${{ needs.unittest-matrix.result }} ]] + then + echo "All matrix jobs passed!" + else + echo "One or more matrix jobs failed." + false + fi + + integration-tests: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Run Integration Tests + run: make integration-tests-with-cover + + correctness-traces: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Correctness + run: make -C testbed run-correctness-traces-tests + + correctness-metrics: + runs-on: ubuntu-latest + needs: [setup-environment] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Correctness + run: make -C testbed run-correctness-metrics-tests + + cross-compile: + runs-on: ubuntu-latest + needs: [unittest, integration-tests, lint] + strategy: + matrix: + os: + #- darwin + - linux + #- windows + arch: + #- 386 + - amd64 + #- arm + #- arm64 + #- ppc64le + exclude: + - os: darwin + arch: 386 + - os: darwin + arch: arm + - os: darwin + arch: ppc64le + - os: windows + arch: arm + - os: windows + arch: arm64 + - os: windows + arch: ppc64le + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Build Collector ${{ matrix.binary }} + run: make GOOS=${{ matrix.os }} GOARCH=${{ matrix.arch }} otelcontribcol + - name: Upload Collector Binaries + uses: actions/upload-artifact@v3 + with: + name: collector-binaries + path: ./bin/* + + build-package: + # Use 20.04.5 until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16450 is resolved + runs-on: ubuntu-20.04 + needs: [cross-compile] + strategy: + fail-fast: false + matrix: + package_type: ["deb", "rpm"] + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Install Ruby + uses: ruby/setup-ruby@v1 + with: + ruby-version: "2.6" + - name: Install fpm + run: gem install --no-document fpm -v 1.11.0 + - name: Download Collector Binaries + uses: actions/download-artifact@v3 + with: + name: collector-binaries + path: bin/ + - run: chmod +x bin/* + # - name: Set Release Tag + # id: github_tag + # run: ./.github/workflows/scripts/set_release_tag.sh + - name: Build ${{ matrix.package_type }} amd64 package + run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "amd64" "./dist/" + # - name: Build ${{ matrix.package_type }} arm64 package + # run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "arm64" "./dist/" + # - name: Build ${{ matrix.package_type }} ppc64le package + # run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "ppc64le" "./dist/" + - name: Test ${{ matrix.package_type }} package + run: | + if [[ "${{ matrix.package_type }}" = "deb" ]]; then + ./internal/buildscripts/packaging/fpm/test.sh dist/otel-contrib-collector*amd64.deb examples/tracing/otel-collector-config.yml + else + ./internal/buildscripts/packaging/fpm/test.sh dist/otel-contrib-collector*x86_64.rpm examples/tracing/otel-collector-config.yml + fi + - name: Upload Packages + uses: actions/upload-artifact@v3 + with: + name: collector-packages + path: ./dist/* + + publish-dev: + runs-on: ubuntu-latest + needs: [lint, unittest, integration-tests, build-package] + if: (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/v')) + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: 1.21 + - name: Mkdir bin and dist + run: | + mkdir bin/ dist/ + - name: Cache Go + id: go-cache + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Install Tools + if: steps.go-cache.outputs.cache-hit != 'true' + run: make install-tools + - name: Download Binaries + uses: actions/download-artifact@v3 + with: + name: collector-binaries + path: ./bin/ + - run: chmod +x bin/* + - name: Download Packages + uses: actions/download-artifact@v3 + with: + name: collector-packages + path: ./dist/ + - name: Add Permissions to Tool Binaries + run: chmod -R +x ./dist + - name: Sanitize branch name and create version + id: create-version + env: + BRANCH: ${{github.ref_name}} + RUN_NUMBER: ${{github.run_number}} + BASE_VERSION: "0.0.0" + run: | + # let's simply use the k8s namespace rules (even stricter) and have the same version(-suffix) for everything + # lowercase everything and replace all invalid characters with '-' and trim to 60 characters + SANITIZED_BRANCH=$(echo -n "${BRANCH}" | tr '[:upper:]' '[:lower:]' | tr -C 'a-z0-9' '-') + SANITIZED_BRANCH="${SANITIZED_BRANCH:0:60}" + + BUILD_VERSION="${BASE_VERSION}-${SANITIZED_BRANCH}-${RUN_NUMBER}" + echo "BUILD_VERSION=${BUILD_VERSION}" | tee -a $GITHUB_ENV $GITHUB_OUTPUT + - name: Build Docker Image + run: | + make docker-otelcontribcol + docker tag otelcontribcol:latest 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Validate Docker Image + run: | + docker run 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} --version + - id: login-gcp + name: Authenticate with Google Cloud + uses: google-github-actions/auth@v1 + with: + token_format: access_token + workload_identity_provider: ${{secrets.GCR_WORKLOAD_IDENTITY_PROVIDER}} + service_account: ${{secrets.GCR_SERVICE_ACCOUNT}} + access_token_lifetime: 1800s + - name: Login to us Artifact Registry + uses: docker/login-action@v2 + with: + registry: us-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Login to eu Artifact Registry + uses: docker/login-action@v2 + with: + registry: europe-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Login to asia Artifact Registry + uses: docker/login-action@v2 + with: + registry: asia-docker.pkg.dev + username: oauth2accesstoken + password: ${{ steps.login-gcp.outputs.access_token }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + aws-access-key-id: ${{secrets.ECR_AWS_ACCESS_KEY_ID}} + aws-secret-access-key: ${{secrets.ECR_AWS_SECRET_ACCESS_KEY}} + aws-region: us-west-2 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Push Docker Image + run: | + docker push 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Push image to GCP + env: + DOCKER_IMAGE: otelcontribcol + BUILD_VERSION: ${{steps.create-version.outputs.BUILD_VERSION}} + GCR_ASIA_IMAGE: ${{secrets.GCR_ASIA_IMAGE}} + GCR_EUROPE_IMAGE: ${{secrets.GCR_EUROPE_IMAGE}} + GCR_US_IMAGE: ${{secrets.GCR_US_IMAGE}} + run: | + docker tag $DOCKER_IMAGE:latest ${GCR_ASIA_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_EUROPE_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_US_IMAGE}:${BUILD_VERSION} + docker push -a ${GCR_ASIA_IMAGE} + docker push -a ${GCR_EUROPE_IMAGE} + docker push -a ${GCR_US_IMAGE} diff --git a/cmd/otelcontribcol/processors_test.go b/cmd/otelcontribcol/processors_test.go index 868ddc2a74f4..8e9329fc84c0 100644 --- a/cmd/otelcontribcol/processors_test.go +++ b/cmd/otelcontribcol/processors_test.go @@ -74,6 +74,10 @@ func TestDefaultProcessors(t *testing.T) { processor: "k8sattributes", skipLifecycle: true, // Requires a k8s API to communicate with }, + { + processor: "logstransform", + skipLifecycle: true, + }, { processor: "memory_limiter", getConfigFn: func() component.Config { diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index a970bdb26c96..f60d21ee57f2 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -7,9 +7,11 @@ import ( "context" "database/sql" "fmt" + "net/url" + "strings" "time" - _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. + "github.com/ClickHouse/clickhouse-go/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -28,7 +30,7 @@ type logsExporter struct { } func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseConn(cfg) if err != nil { return nil, err } @@ -57,35 +59,72 @@ func (e *logsExporter) shutdown(_ context.Context) error { return nil } -func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { +func (e *logsExporter) pushLogsData(_ context.Context, ld plog.Logs) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + scope, err := e.client.Begin() if err != nil { - return fmt.Errorf("PrepareContext:%w", err) + return fmt.Errorf("Begin:%w", err) } - defer func() { - _ = statement.Close() - }() + + batch, err := scope.Prepare(e.insertSQL) + if err != nil { + return fmt.Errorf("Prepare:%w", err) + } + var serviceName string - for i := 0; i < ld.ResourceLogs().Len(); i++ { - logs := ld.ResourceLogs().At(i) + var podName string + var containerName string + var region string + var cloudProvider string + var cell string + + resAttr := make(map[string]string) + + resourceLogs := ld.ResourceLogs() + for i := 0; i < resourceLogs.Len(); i++ { + logs := resourceLogs.At(i) res := logs.Resource() resURL := logs.SchemaUrl() - resAttr := attributesToMap(res.Attributes()) - if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { - serviceName = v.Str() - } + + attrs := res.Attributes() + attributesToMap(attrs, resAttr) + + attrs.Range(func(key string, value pcommon.Value) bool { + switch key { + case conventions.AttributeServiceName: + serviceName = value.Str() + case conventions.AttributeK8SPodName: + podName = value.AsString() + case conventions.AttributeK8SContainerName: + containerName = value.AsString() + // TODO use AttributeCloudRegion 'cloud.region' + // https://github.com/ClickHouse/data-plane-application/issues/4155 + case "region": + fallthrough + case conventions.AttributeCloudRegion: + region = value.AsString() + case conventions.AttributeCloudProvider: + cloudProvider = value.AsString() + case "cell": + cell = value.AsString() + } + return true + }) for j := 0; j < logs.ScopeLogs().Len(); j++ { rs := logs.ScopeLogs().At(j).LogRecords() scopeURL := logs.ScopeLogs().At(j).SchemaUrl() scopeName := logs.ScopeLogs().At(j).Scope().Name() scopeVersion := logs.ScopeLogs().At(j).Scope().Version() - scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) + scopeAttr := make(map[string]string, attrs.Len()) + attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes(), scopeAttr) for k := 0; k < rs.Len(); k++ { r := rs.At(k) - logAttr := attributesToMap(r.Attributes()) - _, err = statement.ExecContext(ctx, + + logAttr := make(map[string]string, attrs.Len()) + attributesToMap(r.Attributes(), logAttr) + + _, err = batch.Exec( r.Timestamp().AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), traceutil.SpanIDToHexOrEmptyString(r.SpanID()), @@ -95,6 +134,11 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { serviceName, r.Body().AsString(), resURL, + podName, + containerName, + region, + cloudProvider, + cell, resAttr, scopeURL, scopeName, @@ -103,26 +147,31 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { logAttr, ) if err != nil { - return fmt.Errorf("ExecContext:%w", err) + return fmt.Errorf("Append:%w", err) } } } + + // clear map for reuse + for k := range resAttr { + delete(resAttr, k) + } } - return nil - }) + + return scope.Commit() + }() + duration := time.Since(start) e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()), zap.String("cost", duration.String())) return err } -func attributesToMap(attributes pcommon.Map) map[string]string { - m := make(map[string]string, attributes.Len()) +func attributesToMap(attributes pcommon.Map, dest map[string]string) { attributes.Range(func(k string, v pcommon.Value) bool { - m[k] = v.AsString() + dest[k] = v.AsString() return true }) - return m } const ( @@ -136,7 +185,12 @@ CREATE TABLE IF NOT EXISTS %s ( SeverityText LowCardinality(String) CODEC(ZSTD(1)), SeverityNumber Int32 CODEC(ZSTD(1)), ServiceName LowCardinality(String) CODEC(ZSTD(1)), - Body String CODEC(ZSTD(1)), + Body LowCardinality(String) CODEC(ZSTD(1)), + PodName LowCardinality(String), + ContainerName LowCardinality(String), + Region LowCardinality(String), + CloudProvider LowCardinality(String), + Cell LowCardinality(String), ResourceSchemaUrl String CODEC(ZSTD(1)), ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), ScopeSchemaUrl String CODEC(ZSTD(1)), @@ -154,10 +208,11 @@ CREATE TABLE IF NOT EXISTS %s ( INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 ) ENGINE MergeTree() %s -PARTITION BY toDate(Timestamp) -ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId) +PARTITION BY toYYYYMM(Timestamp) +ORDER BY (PodName, ContainerName, SeverityText, Timestamp) SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ` + // language=ClickHouse SQL insertLogsSQLTemplate = `INSERT INTO %s ( Timestamp, @@ -169,35 +224,47 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ServiceName, Body, ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, + ResourceAttributes, + ScopeSchemaUrl, + ScopeName, + ScopeVersion, + ScopeAttributes, + LogAttributes + )` + inlineinsertLogsSQLTemplate = `INSERT INTO %s SETTINGS async_insert=1, wait_for_async_insert=0 ( + Timestamp, + TraceId, + SpanId, + TraceFlags, + SeverityText, + SeverityNumber, + ServiceName, + Body, + ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, ResourceAttributes, ScopeSchemaUrl, ScopeName, ScopeVersion, ScopeAttributes, LogAttributes - ) VALUES ( - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ? - )` + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) var driverName = "clickhouse" // for testing -// newClickhouseClient create a clickhouse client. -func newClickhouseClient(cfg *Config) (*sql.DB, error) { +// newClickHouseClient create a clickhouse client. +// used by metrics and traces: +func newClickHouseClient(cfg *Config) (*sql.DB, error) { db, err := cfg.buildDB(cfg.Database) if err != nil { return nil, err @@ -205,6 +272,41 @@ func newClickhouseClient(cfg *Config) (*sql.DB, error) { return db, nil } +// used by logs: +func newClickHouseConn(cfg *Config) (*sql.DB, error) { + endpoint := cfg.Endpoint + + if len(cfg.ConnectionParams) > 0 { + values := make(url.Values, len(cfg.ConnectionParams)) + for k, v := range cfg.ConnectionParams { + values.Add(k, v) + } + + if !strings.Contains(endpoint, "?") { + endpoint += "?" + } else if !strings.HasSuffix(endpoint, "&") { + endpoint += "&" + } + + endpoint += values.Encode() + } + + opts, err := clickhouse.ParseDSN(endpoint) + if err != nil { + return nil, fmt.Errorf("unable to parse endpoint: %w", err) + } + + opts.Auth = clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.Username, + Password: string(cfg.Password), + } + + // can return a "bad" connection if misconfigured, we won't know + // until a Ping, Exec, etc.. is done + return clickhouse.OpenDB(opts), nil +} + func createDatabase(ctx context.Context, cfg *Config) error { // use default database to create new database if cfg.Database == defaultDatabase { @@ -242,19 +344,8 @@ func renderCreateLogsTableSQL(cfg *Config) string { } func renderInsertLogsSQL(cfg *Config) string { - return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) -} - -func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("db.Begin: %w", err) + if strings.HasPrefix(cfg.Endpoint, "tcp") && cfg.ConnectionParams["async_insert"] == "1" { + return fmt.Sprintf(inlineinsertLogsSQLTemplate, cfg.LogsTableName) } - defer func() { - _ = tx.Rollback() - }() - if err := fn(tx); err != nil { - return err - } - return tx.Commit() + return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) } diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index c2d2bc44f2d7..92297a101dcf 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -51,7 +51,7 @@ func TestLogsExporter_New(t *testing.T) { }{ "no dsn": { config: withDefaultConfig(), - want: failWithMsg("exec create logs table sql: parse dsn address failed"), + want: failWithMsg("parse dsn address failed"), }, } @@ -78,6 +78,7 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("push success", func(t *testing.T) { var items int initClickhouseTestServer(t, func(query string, values []driver.Value) error { + t.Logf(query) t.Logf("%d, values:%+v", items, values) if strings.HasPrefix(query, "INSERT") { items++ @@ -97,7 +98,7 @@ func TestExporter_pushLogsData(t *testing.T) { require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8]) require.Equal(t, map[string]string{ "service.name": "test-service", - }, values[9]) + }, values[14]) } return nil }) @@ -107,12 +108,12 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("test check scope metadata", func(t *testing.T) { initClickhouseTestServer(t, func(query string, values []driver.Value) error { if strings.HasPrefix(query, "INSERT") { - require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10]) - require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11]) - require.Equal(t, "1.0.0", values[12]) + require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[15]) + require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[16]) + require.Equal(t, "1.0.0", values[17]) require.Equal(t, map[string]string{ "lib": "clickhouse", - }, values[13]) + }, values[18]) } return nil }) @@ -122,7 +123,12 @@ func TestExporter_pushLogsData(t *testing.T) { } func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter { - exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) + cfg := withTestExporterConfig(fns...)(dsn) + exporter, err := newLogsExporter(zaptest.NewLogger(t), cfg) + require.NoError(t, err) + + // need to use the dummy driver driver for testing + exporter.client, err = newClickHouseClient(cfg) require.NoError(t, err) require.NoError(t, exporter.start(context.TODO(), nil)) @@ -216,7 +222,18 @@ func (*testClickhouseDriverStmt) Close() error { } func (t *testClickhouseDriverStmt) NumInput() int { - return strings.Count(t.query, "?") + if !strings.HasPrefix(t.query, `INSERT`) { + return 0 + } + + n := strings.Count(t.query, "?") + if n > 0 { + return n + } + + // no ? in batched queries but column are separated with "," + // except for the last one + return strings.Count(t.query, ",") + 1 } func (t *testClickhouseDriverStmt) Exec(args []driver.Value) (driver.Result, error) { diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go index 916b9381e201..ccc978032bd0 100644 --- a/exporter/clickhouseexporter/exporter_metrics.go +++ b/exporter/clickhouseexporter/exporter_metrics.go @@ -24,7 +24,7 @@ type metricsExporter struct { } func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -57,7 +57,11 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName) for i := 0; i < md.ResourceMetrics().Len(); i++ { metrics := md.ResourceMetrics().At(i) - resAttr := attributesToMap(metrics.Resource().Attributes()) + res := metrics.Resource() + + resAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(metrics.Resource().Attributes(), resAttr) + for j := 0; j < metrics.ScopeMetrics().Len(); j++ { rs := metrics.ScopeMetrics().At(j).Metrics() scopeInstr := metrics.ScopeMetrics().At(j).Scope() diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go index 6f03ee295833..0cb4cecb3c6e 100644 --- a/exporter/clickhouseexporter/exporter_traces.go +++ b/exporter/clickhouseexporter/exporter_traces.go @@ -28,7 +28,7 @@ type tracesExporter struct { } func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -59,8 +59,8 @@ func (e *tracesExporter) shutdown(_ context.Context) error { func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + statement, err := e.client.PrepareContext(ctx, e.insertSQL) if err != nil { return fmt.Errorf("PrepareContext:%w", err) } @@ -70,7 +70,10 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er for i := 0; i < td.ResourceSpans().Len(); i++ { spans := td.ResourceSpans().At(i) res := spans.Resource() - resAttr := attributesToMap(res.Attributes()) + attr := res.Attributes() + resAttr := make(map[string]string, attr.Len()) + attributesToMap(attr, resAttr) + var serviceName string if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() @@ -81,7 +84,9 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er scopeVersion := spans.ScopeSpans().At(j).Scope().Version() for k := 0; k < rs.Len(); k++ { r := rs.At(k) - spanAttr := attributesToMap(r.Attributes()) + + spanAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(r.Attributes(), spanAttr) status := r.Status() eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) @@ -116,7 +121,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } } return nil - }) + }() duration := time.Since(start) e.logger.Debug("insert traces", zap.Int("records", td.SpanCount()), zap.String("cost", duration.String())) @@ -124,33 +129,38 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { - var ( - times []time.Time - names []string - attrs []map[string]string - ) + times := make([]time.Time, events.Len()) + names := make([]string, events.Len()) + attrs := make([]map[string]string, events.Len()) + for i := 0; i < events.Len(); i++ { event := events.At(i) times = append(times, event.Timestamp().AsTime()) names = append(names, event.Name()) - attrs = append(attrs, attributesToMap(event.Attributes())) + + eventAttrs := event.Attributes() + dest := make(map[string]string, eventAttrs.Len()) + attributesToMap(eventAttrs, dest) + attrs = append(attrs, dest) } return times, names, attrs } func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { - var ( - traceIDs []string - spanIDs []string - states []string - attrs []map[string]string - ) + traceIDs := make([]string, links.Len()) + spanIDs := make([]string, links.Len()) + states := make([]string, links.Len()) + attrs := make([]map[string]string, links.Len()) + for i := 0; i < links.Len(); i++ { link := links.At(i) traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID())) spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID())) states = append(states, link.TraceState().AsRaw()) - attrs = append(attrs, attributesToMap(link.Attributes())) + + linkAttrs := link.Attributes() + dest := make(map[string]string, linkAttrs.Len()) + attrs = append(attrs, dest) } return traceIDs, spanIDs, states, attrs } From 2a916916cb95f5f843f9e5b6ec00d157d71a727e Mon Sep 17 00:00:00 2001 From: Rory Crispin Date: Wed, 6 Sep 2023 10:51:19 +0100 Subject: [PATCH 2/4] Fix auth to GCP, drop building RPM/DEBs --- .github/workflows/build-test-publish.yaml | 52 ++--------------------- 1 file changed, 4 insertions(+), 48 deletions(-) diff --git a/.github/workflows/build-test-publish.yaml b/.github/workflows/build-test-publish.yaml index 067128085d6c..672debb5be12 100644 --- a/.github/workflows/build-test-publish.yaml +++ b/.github/workflows/build-test-publish.yaml @@ -368,56 +368,12 @@ jobs: name: collector-binaries path: ./bin/* - build-package: - # Use 20.04.5 until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16450 is resolved - runs-on: ubuntu-20.04 - needs: [cross-compile] - strategy: - fail-fast: false - matrix: - package_type: ["deb", "rpm"] - steps: - - name: Checkout Repo - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - name: Install Ruby - uses: ruby/setup-ruby@v1 - with: - ruby-version: "2.6" - - name: Install fpm - run: gem install --no-document fpm -v 1.11.0 - - name: Download Collector Binaries - uses: actions/download-artifact@v3 - with: - name: collector-binaries - path: bin/ - - run: chmod +x bin/* - # - name: Set Release Tag - # id: github_tag - # run: ./.github/workflows/scripts/set_release_tag.sh - - name: Build ${{ matrix.package_type }} amd64 package - run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "amd64" "./dist/" - # - name: Build ${{ matrix.package_type }} arm64 package - # run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "arm64" "./dist/" - # - name: Build ${{ matrix.package_type }} ppc64le package - # run: ./internal/buildscripts/packaging/fpm/${{ matrix.package_type }}/build.sh "0.0.${{ github.run_id }}" "ppc64le" "./dist/" - - name: Test ${{ matrix.package_type }} package - run: | - if [[ "${{ matrix.package_type }}" = "deb" ]]; then - ./internal/buildscripts/packaging/fpm/test.sh dist/otel-contrib-collector*amd64.deb examples/tracing/otel-collector-config.yml - else - ./internal/buildscripts/packaging/fpm/test.sh dist/otel-contrib-collector*x86_64.rpm examples/tracing/otel-collector-config.yml - fi - - name: Upload Packages - uses: actions/upload-artifact@v3 - with: - name: collector-packages - path: ./dist/* - publish-dev: + permissions: + contents: 'read' + id-token: 'write' runs-on: ubuntu-latest - needs: [lint, unittest, integration-tests, build-package] + needs: [lint, unittest, integration-tests, cross-compile] if: (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/v')) steps: - name: Checkout Repo From d6da6f4f2d0ed23f8afd6f539ea6f52d5c201903 Mon Sep 17 00:00:00 2001 From: Rory Crispin Date: Wed, 6 Sep 2023 12:05:53 +0100 Subject: [PATCH 3/4] Drop RPM/DEB from build flow --- .github/workflows/build-test-publish.yaml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/build-test-publish.yaml b/.github/workflows/build-test-publish.yaml index 672debb5be12..a03e97d1d709 100644 --- a/.github/workflows/build-test-publish.yaml +++ b/.github/workflows/build-test-publish.yaml @@ -405,13 +405,6 @@ jobs: name: collector-binaries path: ./bin/ - run: chmod +x bin/* - - name: Download Packages - uses: actions/download-artifact@v3 - with: - name: collector-packages - path: ./dist/ - - name: Add Permissions to Tool Binaries - run: chmod -R +x ./dist - name: Sanitize branch name and create version id: create-version env: From ae223460c7a1dae01e0acc69a6f623248ca2bcf7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 7 Sep 2023 13:07:35 +0000 Subject: [PATCH 4/4] Bump github.com/cyphar/filepath-securejoin from 0.2.3 to 0.2.4 Bumps [github.com/cyphar/filepath-securejoin](https://github.com/cyphar/filepath-securejoin) from 0.2.3 to 0.2.4. - [Release notes](https://github.com/cyphar/filepath-securejoin/releases) - [Commits](https://github.com/cyphar/filepath-securejoin/compare/v0.2.3...v0.2.4) --- updated-dependencies: - dependency-name: github.com/cyphar/filepath-securejoin dependency-type: indirect ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index b06fa9192675..6d7fd943ec55 100644 --- a/go.mod +++ b/go.mod @@ -315,7 +315,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cskr/pubsub v1.0.2 // indirect - github.com/cyphar/filepath-securejoin v0.2.3 // indirect + github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/denisenkom/go-mssqldb v0.12.2 // indirect diff --git a/go.sum b/go.sum index 72661c20159b..d14c903f4833 100644 --- a/go.sum +++ b/go.sum @@ -1296,8 +1296,9 @@ github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= -github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= +github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= +github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1SMSibvLzxjeJLnrYEVLULFNiHY9YfQ= github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s= github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=