diff --git a/docs/plugins/klogs.md b/docs/plugins/klogs.md index 356c04cfe..03ec020ff 100644 --- a/docs/plugins/klogs.md +++ b/docs/plugins/klogs.md @@ -16,8 +16,10 @@ To use the klogs plugin the following configuration is needed in the satellites | options.database | string | The name of the database. | Yes | | options.username | string | Username to access a ClickHouse instance. | No | | options.password | string | Password to access a ClickHouse instance. | No | -| options.readTimeout | string | The read timeout for operations. | No | -| options.writeTimeout | string | The write timeout for operations. | No | +| options.dialTimeout | string | ClickHouse dial timeout. The default value is `10s`. | No | +| options.connMaxLifetime | string | ClickHouse maximum connection lifetime. The default value is `1h`. | No | +| options.maxIdleConns | number | ClickHouse maximum number of idle connections. The default value is `5`. | No | +| options.maxOpenConns | number | ClickHouse maximum number of open connections. The default value is `10`. | No | | options.materializedColumns | []string | A list of materialized columns. See [kobsio/klogs](https://github.com/kobsio/klogs#configuration) for more information. | No | ```yaml @@ -27,10 +29,12 @@ plugins: options: address: database: - writeTimeout: - readTimeout: username: password: + dialTimeout: + connMaxLifetime: + maxIdleConns: + maxOpenConns: materializedColumns: ``` @@ -107,16 +111,16 @@ kobs supports multiple operators which can be used in a query to retrieve logs f | `_not_` | Exclude the term from the query. | `cluster='kobs-demo' _and_ _not_ namespace='bookinfo'` | | `_and_` | Both terms must be included in the results. | `namespace='bookinfo' _and_ app='bookinfo'` | | `_or_` | The result can contain one of the given terms. | `namespace='bookinfo' _or_ namespace='istio-system'` | -| `_exists_` | The field can not be `null` | `container_name='istio-proxy' _and_ _exists_ content.request_id` | +| `_exists_` | The field can not be `null` | `container_name='istio-proxy' _and_ _exists_ content_request_id` | | `=` | The field must have this value. | `namespace='bookinfo'` | | `!=` | The field should not have this value. | `namespace!='bookinfo'` | -| `>` | The value of the field must be greater than the specified value. | `content.response_code>499` | -| `>=` | The value of the field must be greater than or equal to the specified value. | `content.response_code>=500` | -| `<` | The value of the field must be lower than the specified value. | `content.response_code<500` | -| `<=` | The value of the field must be lower than or equal to the specified value. | `content.response_code<=499` | -| `=~` | The value of the field is compared using `ILIKE`. | `content.upstream_cluster=~'inbound%'` | -| `!~` | The value of the field is compared using `NOT ILIKE`. | `content.upstream_cluster!~'inbound%'` | -| `~` | The value of the field must match the regular expression. The syntax of the `re2` regular expressions can be found [here](https://github.com/google/re2/wiki/Syntax). | `content.upstream_cluster~'inbound.*'` | +| `>` | The value of the field must be greater than the specified value. | `content_response_code>499` | +| `>=` | The value of the field must be greater than or equal to the specified value. | `content_response_code>=500` | +| `<` | The value of the field must be lower than the specified value. | `content_response_code<500` | +| `<=` | The value of the field must be lower than or equal to the specified value. 
| `content_response_code<=499` | +| `=~` | The value of the field is compared using `ILIKE`. | `content_upstream_cluster=~'inbound%'` | +| `!~` | The value of the field is compared using `NOT ILIKE`. | `content_upstream_cluster!~'inbound%'` | +| `~` | The value of the field must match the regular expression. The syntax of the `re2` regular expressions can be found [here](https://github.com/google/re2/wiki/Syntax). | `content_upstream_cluster~'inbound.*'` | #### Default Fields @@ -133,7 +137,7 @@ In the following you can find a list of fields which are available for each log #### Examples -- `namespace='bookinfo' _and_ app='bookinfo' _and_ container_name='istio-proxy' _and_ content.upstream_cluster~'inbound.*'`: Select all inbound Istio logs from the bookinfo app in the bookinfo namespace. +- `namespace='bookinfo' _and_ app='bookinfo' _and_ container_name='istio-proxy' _and_ content_upstream_cluster~'inbound.*'`: Select all inbound Istio logs from the bookinfo app in the bookinfo namespace. ### Logs Dashboard @@ -154,18 +158,18 @@ spec: type: logs queries: - name: Istio Logs - query: "namespace='bookinfo' _and_ app='bookinfo' _and_ container_name='istio-proxy' _and_ content.upstream_cluster~'inbound.*'" + query: "namespace='bookinfo' _and_ app='bookinfo' _and_ container_name='istio-proxy' _and_ content_upstream_cluster~'inbound.*'" fields: - "pod_name" - - "content.authority" - - "content.route_name" - - "content.protocol" - - "content.method" - - "content.path" - - "content.response_code" - - "content.upstream_service_time" - - "content.bytes_received" - - "content.bytes_sent" + - "content_authority" + - "content_route_name" + - "content_protocol" + - "content_method" + - "content_path" + - "content_response_code" + - "content_upstream_service_time" + - "content_bytes_received" + - "content_bytes_sent" ``` ### Aggregation Dashboard @@ -210,7 +214,7 @@ spec: query: "cluster='kobs-demo' _and_ app='myapplication' _and_ container_name='myapplication'" chart: pie options: - sliceBy: content.level + sliceBy: content_level sizeByOperation: count - size: 3 panels: @@ -222,14 +226,14 @@ spec: options: type: aggregation aggregation: - query: "cluster='kobs-demo' _and_ app='myapplication' _and_ container_name='istio-proxy' _and_ content.response_code>0" + query: "cluster='kobs-demo' _and_ app='myapplication' _and_ container_name='istio-proxy' _and_ content_response_code>0" chart: line options: horizontalAxisOperation: time verticalAxisOperation: avg - verticalAxisField: content.duration + verticalAxisField: content_duration breakDownByFields: - - content.response_code + - content_response_code ``` ![Aggregation Example](assets/klogs-aggregation.png) diff --git a/docs/plugins/sql.md b/docs/plugins/sql.md index 85c586362..88d72b2e5 100644 --- a/docs/plugins/sql.md +++ b/docs/plugins/sql.md @@ -111,8 +111,8 @@ The following options can be used for a panel with the SQL plugin: query: | SELECT toStartOfInterval(timestamp, INTERVAL 60 second) AS time, - avg(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as avg_duration, - avg(fields_number.value[indexOf(fields_number.key, 'content.upstream_service_time')]) as avg_ust, + avg(fields_number['content_duration']) as avg_duration, + avg(fields_number['content_upstream_service_time']) as avg_ust, avg_duration - avg_ust as avg_diff FROM logs.logs @@ -122,7 +122,7 @@ The following options can be used for a panel with the SQL plugin: AND namespace='myservice' AND app='myservice' AND container_name='istio-proxy' - AND 
match(fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')], '^inbound.*') + AND match(fields_string['content_upstream_cluster'], '^inbound.*') GROUP BY time ORDER BY @@ -153,7 +153,7 @@ The following options can be used for a panel with the SQL plugin: query: | SELECT toStartOfInterval(timestamp, INTERVAL 60 second) AS time, - avg(fields_number.value[indexOf(fields_number.key, 'content.duration')]) - avg(fields_number.value[indexOf(fields_number.key, 'content.upstream_service_time')]) as avg_diff + avg(fields_number['content_duration']) - avg(fields_number['content_upstream_service_time']) as avg_diff FROM logs.logs WHERE @@ -162,7 +162,7 @@ The following options can be used for a panel with the SQL plugin: AND namespace='myservice' AND app='myservice' AND container_name='istio-proxy' - AND match(fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')], '^inbound.*') + AND match(fields_string['content_upstream_cluster'], '^inbound.*') GROUP BY time ORDER BY @@ -188,8 +188,8 @@ The following options can be used for a panel with the SQL plugin: query: | SELECT toStartOfInterval(timestamp, INTERVAL 60 second) AS time, - avg(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as avg_duration, - avg(fields_number.value[indexOf(fields_number.key, 'content.upstream_service_time')]) as avg_ust + avg(fields_number['content_duration']) as avg_duration, + avg(fields_number['content_upstream_service_time']) as avg_ust FROM logs.logs WHERE @@ -198,7 +198,7 @@ The following options can be used for a panel with the SQL plugin: AND namespace='myservice' AND app='myservice' AND container_name='istio-proxy' - AND match(fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')], '^inbound.*') + AND match(fields_string['content_upstream_cluster'], '^inbound.*') GROUP BY time ORDER BY @@ -230,8 +230,8 @@ The following options can be used for a panel with the SQL plugin: - name: Log Levels query: | SELECT - content.level, - count(content.level) as count_data + content_level, + count(content_level) as count_data FROM logs.logs WHERE @@ -241,9 +241,9 @@ The following options can be used for a panel with the SQL plugin: AND app='myservice' AND container_name='myservice' GROUP BY - content.level + content_level columns: - content.level: + content_level: title: Level count_data: title: Count @@ -258,8 +258,8 @@ The following options can be used for a panel with the SQL plugin: type: pie query: | SELECT - content.level, - count(content.level) as count_data + content_level, + count(content_level) as count_data FROM logs.logs WHERE @@ -269,8 +269,8 @@ The following options can be used for a panel with the SQL plugin: AND app='myservice' AND container_name='myservice' GROUP BY - content.level - pieLabelColumn: content.level + content_level + pieLabelColumn: content_level pieValueColumn: count_data ``` diff --git a/go.mod b/go.mod index 85ee06627..d9f726378 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/Azure/go-autorest/autorest v0.11.28 github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 github.com/Azure/go-autorest/autorest/date v0.3.0 - github.com/ClickHouse/clickhouse-go v1.5.4 + github.com/ClickHouse/clickhouse-go/v2 v2.3.0 github.com/DataDog/datadog-api-client-go/v2 v2.3.1 github.com/andygrunwald/go-jira v1.16.0 github.com/coreos/go-oidc/v3 v3.3.0 @@ -72,16 +72,17 @@ require ( github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect 
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect + github.com/ClickHouse/ch-go v0.47.3 // indirect github.com/DataDog/zstd v1.5.0 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/PuerkitoBio/goquery v1.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/ajg/form v1.5.1 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/andybalholm/cascadia v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/docker/distribution v2.8.0+incompatible // indirect @@ -94,6 +95,8 @@ require ( github.com/fatih/structs v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fluxcd/pkg/apis/kustomize v0.5.0 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect @@ -111,7 +114,7 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.1 // indirect github.com/hashicorp/go-retryablehttp v0.5.1 // indirect - github.com/hashicorp/go-version v1.4.0 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -135,6 +138,8 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f // indirect github.com/openzipkin/zipkin-go v0.4.0 // indirect + github.com/paulmach/orb v0.7.1 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -142,6 +147,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/rs/xid v1.4.0 // indirect github.com/rs/zerolog v1.20.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -153,11 +159,11 @@ require ( github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.opentelemetry.io/otel/metric v0.31.0 // indirect - go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.7.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.8.0 // indirect golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect - golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect + golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect diff --git a/go.sum b/go.sum index 45d3f459d..ba157345b 100644 --- a/go.sum +++ b/go.sum @@ -105,8 +105,10 @@ github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7 
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= -github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/ch-go v0.47.3 h1:bBKid8DRELKRf4/oXqrEks7Cc4DLb5Giwm9uazM6h3M= +github.com/ClickHouse/ch-go v0.47.3/go.mod h1:m3LHc5FeQ1Jjee5EEay5e7hQmSk4SuKyMfifNUz8l3g= +github.com/ClickHouse/clickhouse-go/v2 v2.3.0 h1:v0iT0yZspjjNgnLyPUa0WoGMme0Y/sNjCtOAFcyBkkA= +github.com/ClickHouse/clickhouse-go/v2 v2.3.0/go.mod h1:f2kb1LPopJdIyt0Y0vxNk9aiQCyhCmeVcyvOOaPCT4Q= github.com/DataDog/datadog-api-client-go/v2 v2.3.1 h1:+0FHme5n4AuJEGmzaN8+n3OWKFLiJoBP+FNI60EqvuU= github.com/DataDog/datadog-api-client-go/v2 v2.3.1/go.mod h1:98b/MtTwSAr/yhTfhCR1oxAqQ/4tMkdrgKH7fYiDA0g= github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= @@ -129,6 +131,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/cascadia v1.1.0 h1:BuuO6sSfQNFRu1LppgbD25Hr2vLYW25JvxHs5zzsLTo= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= @@ -140,8 +144,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk= -github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -151,8 +153,6 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= 
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -227,6 +227,10 @@ github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= github.com/go-chi/render v1.0.2 h1:4ER/udB0+fMWB2Jlf15RV3F4A2FDuYi/9f+lFttR/Lg= github.com/go-chi/render v1.0.2/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -254,7 +258,6 @@ github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.21.1 h1:wm0rhTb5z7qpJRHBdPOMuY4QjVUMbF6/kwoYeRAOrKU= github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= -github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -375,8 +378,8 @@ github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng github.com/hashicorp/go-retryablehttp v0.5.1 h1:Vsx5XKPqPs3M6sM4U4GWyUqFS8aBiL9U5gkgvpkg4SE= github.com/hashicorp/go-retryablehttp v0.5.1/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/go-version v1.4.0 h1:aAQzgqIrRKRa7w75CKpbBxYsmUoPjzVm1W59ca1L0J4= -github.com/hashicorp/go-version v1.4.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -392,7 +395,6 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/jmoiron/sqlx 
v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= @@ -430,7 +432,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -438,7 +439,6 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -504,9 +504,12 @@ github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.13 h1:nV98dkBpqaYbDnhefmOQ+Rn4hE+jD6 github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.13/go.mod h1:4OjcxgwdXzezqytxN534MooNmrxRD50geWZxTD7845s= github.com/orlangure/gnomock v0.21.1 h1:ODD/okHK6l9rZw+VODexhJQRFmfGp6GyoIi9dGBcs7Q= github.com/orlangure/gnomock v0.21.1/go.mod h1:fwPi+PJan1wXILHQVlM6BrBB+jForjpREZ26Nozn7to= -github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/paulmach/orb v0.7.1 h1:Zha++Z5OX/l168sqHK3k4z18LDvr+YAO/VjK0ReQ9rU= +github.com/paulmach/orb v0.7.1/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -551,6 +554,8 @@ github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -637,11 +642,11 @@ go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47 go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= -go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -861,8 +866,9 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= diff --git a/plugins/plugin-istio/pkg/instance/instance.go b/plugins/plugin-istio/pkg/instance/instance.go index 359939b69..b586f5455 100644 --- a/plugins/plugin-istio/pkg/instance/instance.go +++ b/plugins/plugin-istio/pkg/instance/instance.go @@ -315,13 +315,13 @@ func (i *instance) GetTopology(ctx context.Context, namespace, application strin func (i *instance) Tap(ctx context.Context, namespace, application, filterUpstreamCluster, filterMethod, filterPath string, timeStart int64, timeEnd int64) 
([]map[string]any, error) { var filters string if filterUpstreamCluster != "" { - filters = filters + fmt.Sprintf(" _and_ content.upstream_cluster~'%s'", filterUpstreamCluster) + filters = filters + fmt.Sprintf(" _and_ content_upstream_cluster~'%s'", filterUpstreamCluster) } if filterMethod != "" { - filters = filters + fmt.Sprintf(" _and_ content.method~'%s'", filterMethod) + filters = filters + fmt.Sprintf(" _and_ content_method~'%s'", filterMethod) } if filterPath != "" { - filters = filters + fmt.Sprintf(" _and_ content.path~'%s'", filterPath) + filters = filters + fmt.Sprintf(" _and_ content_path~'%s'", filterPath) } logs, _, _, _, _, err := i.klogs.GetLogs(ctx, fmt.Sprintf("namespace='%s' _and_ app='%s' _and_ container_name='istio-proxy' %s", namespace, application, filters), "", "", 100, timeStart, timeEnd) @@ -336,31 +336,31 @@ func (i *instance) Tap(ctx context.Context, namespace, application, filterUpstre func (i *instance) Top(ctx context.Context, namespace, application, filterUpstreamCluster, filterMethod, filterPath, sortBy, sortDirection string, timeStart int64, timeEnd int64) ([][]any, error) { var filters string if filterUpstreamCluster != "" { - filters = filters + fmt.Sprintf(" AND match(fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')], '%s')", filterUpstreamCluster) + filters = filters + fmt.Sprintf(" AND match(fields_string['content_upstream_cluster'], '%s')", filterUpstreamCluster) } if filterMethod != "" { - filters = filters + fmt.Sprintf(" AND match(fields_string.value[indexOf(fields_string.key, 'content.method')], '%s')", filterMethod) + filters = filters + fmt.Sprintf(" AND match(fields_string['content_method'], '%s')", filterMethod) } if filterPath != "" { - filters = filters + fmt.Sprintf(" AND match(fields_string.value[indexOf(fields_string.key, 'content.path')], '%s')", filterPath) + filters = filters + fmt.Sprintf(" AND match(fields_string['content_path'], '%s')", filterPath) } rows, _, err := i.klogs.GetRawQueryResults(ctx, fmt.Sprintf(`SELECT - fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')] as upstream, - fields_string.value[indexOf(fields_string.key, 'content.method')] as method, - path(fields_string.value[indexOf(fields_string.key, 'content.path')]) as path, + fields_string['content_upstream_cluster'] as upstream, + fields_string['content_method'] as method, + path(fields_string['content_path']) as path, count(*) as count, - min(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as min, - max(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as max, - avg(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as avg, - anyLast(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as last, - countIf(fields_number.value[indexOf(fields_number.key, 'content.response_code')] < 500) / count * 100 as sr + min(fields_number['content_duration']) as min, + max(fields_number['content_duration']) as max, + avg(fields_number['content_duration']) as avg, + anyLast(fields_number['content_duration']) as last, + countIf(fields_number['content_response_code'] < 500) / count * 100 as sr FROM logs.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) AND namespace = '%s' AND app = '%s' AND container_name = 'istio-proxy' %s GROUP BY - fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')], - fields_string.value[indexOf(fields_string.key, 'content.method')], - path(fields_string.value[indexOf(fields_string.key, 
'content.path')]) + fields_string['content_upstream_cluster'], + fields_string['content_method'], + path(fields_string['content_path']) ORDER BY %s %s LIMIT 100 SETTINGS skip_unavailable_shards = 1`, timeStart, timeEnd, namespace, application, filters, sortBy, sortDirection)) @@ -377,22 +377,22 @@ func (i *instance) TopDetails(ctx context.Context, namespace, application, upstr filters := fmt.Sprintf(" AND namespace = '%s' AND app = '%s' AND container_name = 'istio-proxy'", namespace, application) if upstreamCluster != "" { - filters = filters + fmt.Sprintf(" AND fields_string.value[indexOf(fields_string.key, 'content.upstream_cluster')] = '%s'", upstreamCluster) + filters = filters + fmt.Sprintf(" AND fields_string['content_upstream_cluster'] = '%s'", upstreamCluster) } if method != "" { - filters = filters + fmt.Sprintf(" AND fields_string.value[indexOf(fields_string.key, 'content.method')] = '%s'", method) + filters = filters + fmt.Sprintf(" AND fields_string['content_method'] = '%s'", method) } if path != "" { - filters = filters + fmt.Sprintf(" AND path(fields_string.value[indexOf(fields_string.key, 'content.path')]) = '%s'", path) + filters = filters + fmt.Sprintf(" AND path(fields_string['content_path']) = '%s'", path) } rows, _, err := i.klogs.GetRawQueryResults(ctx, fmt.Sprintf(`SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data, count(*) AS count_data, - countIf(fields_number.value[indexOf(fields_number.key, 'content.response_code')] < 500) / count_data * 100 as sr_data, - quantile(0.5)(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as p50_data, - quantile(0.9)(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as p90_data, - quantile(0.99)(fields_number.value[indexOf(fields_number.key, 'content.duration')]) as p99_data + countIf(fields_number['content_response_code'] < 500) / count_data * 100 as sr_data, + quantile(0.5)(fields_number['content_duration']) as p50_data, + quantile(0.9)(fields_number['content_duration']) as p90_data, + quantile(0.99)(fields_number['content_duration']) as p99_data FROM logs.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) %s GROUP BY interval_data diff --git a/plugins/plugin-istio/src/components/panel/Tap.tsx b/plugins/plugin-istio/src/components/panel/Tap.tsx index 2c4a622bc..f4bcf50a5 100644 --- a/plugins/plugin-istio/src/components/panel/Tap.tsx +++ b/plugins/plugin-istio/src/components/panel/Tap.tsx @@ -147,23 +147,23 @@ const Tap: React.FunctionComponent = ({ onClick={setDetails ? (): void => handleRowClick(index, line) : undefined} > - {line.hasOwnProperty('content.upstream_cluster') ? getDirection(line['content.upstream_cluster']) : '-'} + {line.hasOwnProperty('content_upstream_cluster') ? getDirection(line['content_upstream_cluster']) : '-'} - {line.hasOwnProperty('content.upstream_cluster') ? line['content.upstream_cluster'] : '-'} + {line.hasOwnProperty('content_upstream_cluster') ? line['content_upstream_cluster'] : '-'} - {line.hasOwnProperty('content.method') ? line['content.method'] : '-'} + {line.hasOwnProperty('content_method') ? line['content_method'] : '-'} - {line.hasOwnProperty('content.path') ? line['content.path'] : '-'} + {line.hasOwnProperty('content_path') ? line['content_path'] : '-'} - {line.hasOwnProperty('content.duration') ? `${line['content.duration']} ms` : '-'} + {line.hasOwnProperty('content_duration') ? `${line['content_duration']} ms` : '-'} - {line.hasOwnProperty('content.response_code') ? 
line['content.response_code'] : '-'} + {line.hasOwnProperty('content_response_code') ? line['content_response_code'] : '-'} - {line.hasOwnProperty('content.grpc_status') ? line['content.grpc_status'] : '-'} + {line.hasOwnProperty('content_grpc_status') ? line['content_grpc_status'] : '-'} ))} diff --git a/plugins/plugin-istio/src/components/panel/details/DetailsTap.tsx b/plugins/plugin-istio/src/components/panel/details/DetailsTap.tsx index 81185ab7a..5782885b9 100644 --- a/plugins/plugin-istio/src/components/panel/details/DetailsTap.tsx +++ b/plugins/plugin-istio/src/components/panel/details/DetailsTap.tsx @@ -25,8 +25,8 @@ const DetailsTap: React.FunctionComponent = ({ line, close }: {`${ - line.hasOwnProperty('content.upstream_cluster') ? getDirection(line['content.upstream_cluster']) : '-' - }: ${line.hasOwnProperty('content.authority') ? line['content.authority'] : '-'}`} + line.hasOwnProperty('content_upstream_cluster') ? getDirection(line['content_upstream_cluster']) : '-' + }: ${line.hasOwnProperty('content_authority') ? line['content_authority'] : '-'}`} <span className="pf-u-pl-sm pf-u-font-size-sm pf-u-color-400">{formatTime(line['timestamp'])}</span> diff --git a/plugins/plugin-klogs/cmd/klogs_test.go b/plugins/plugin-klogs/cmd/klogs_test.go deleted file mode 100644 index 1c69bf9f7..000000000 --- a/plugins/plugin-klogs/cmd/klogs_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package klogs - -import ( - "testing" - - "github.com/kobsio/kobs/pkg/satellite/plugins/plugin" - "github.com/kobsio/kobs/plugins/plugin-klogs/pkg/instance" - - "github.com/go-chi/chi/v5" - "github.com/stretchr/testify/require" -) - -func TestGetInstance(t *testing.T) { - mockInstance := &instance.MockInstance{} - mockInstance.On("GetName").Return("klogs") - - router := Router{chi.NewRouter(), []instance.Instance{mockInstance}} - instance1 := router.getInstance("default") - require.NotNil(t, instance1) - - instance2 := router.getInstance("klogs") - require.NotNil(t, instance2) - - instance3 := router.getInstance("invalidname") - require.Nil(t, instance3) -} - -func TestMount(t *testing.T) { - router1, err := Mount([]plugin.Instance{{Name: "klogs", Options: map[string]any{}}}, nil) - require.NoError(t, err) - require.NotNil(t, router1) - - router2, err := Mount([]plugin.Instance{{Name: "klogs", Options: map[string]any{"username": []string{"token"}}}}, nil) - require.Error(t, err) - require.Nil(t, router2) -} diff --git a/plugins/plugin-klogs/pkg/instance/aggregation.go b/plugins/plugin-klogs/pkg/instance/aggregation.go index e925c75de..f8f1401d3 100644 --- a/plugins/plugin-klogs/pkg/instance/aggregation.go +++ b/plugins/plugin-klogs/pkg/instance/aggregation.go @@ -57,16 +57,16 @@ func generateFieldName(fieldName string, materializedColumns []string, customFie } if mustNumber { - return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')]", fieldName) + return fmt.Sprintf("fields_number['%s']", fieldName) } for _, field := range customFields.Number { if field == fieldName { - return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')]", fieldName) + return fmt.Sprintf("fields_number['%s']", fieldName) } } - return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')]", fieldName) + return fmt.Sprintf("fields_string['%s']", fieldName) } // getOrderBy returns the SQL keyword for the user defined order in the aggregation. 
diff --git a/plugins/plugin-klogs/pkg/instance/aggregation_test.go b/plugins/plugin-klogs/pkg/instance/aggregation_test.go
index 0d48077e8..b8fab9628 100644
--- a/plugins/plugin-klogs/pkg/instance/aggregation_test.go
+++ b/plugins/plugin-klogs/pkg/instance/aggregation_test.go
@@ -14,12 +14,12 @@ func TestGenerateFieldName(t *testing.T) {
 	}{
 		{field: "namespace", mustNumber: false, expect: "namespace"},
 		{field: "namespace", mustNumber: false, expect: "namespace"},
-		{field: "content.method", mustNumber: false, expect: "fields_string.value[indexOf(fields_string.key, 'content.method')]"},
-		{field: "content.duration", mustNumber: true, expect: "fields_number.value[indexOf(fields_number.key, 'content.duration')]"},
-		{field: "content.duration", mustNumber: false, expect: "fields_number.value[indexOf(fields_number.key, 'content.duration')]"},
+		{field: "content_method", mustNumber: false, expect: "fields_string['content_method']"},
+		{field: "content_duration", mustNumber: true, expect: "fields_number['content_duration']"},
+		{field: "content_duration", mustNumber: false, expect: "fields_number['content_duration']"},
 	} {
 		t.Run(tt.field, func(t *testing.T) {
-			actual := generateFieldName(tt.field, nil, Fields{String: nil, Number: []string{"content.duration"}}, tt.mustNumber)
+			actual := generateFieldName(tt.field, nil, Fields{String: nil, Number: []string{"content_duration"}}, tt.mustNumber)
 			require.Equal(t, tt.expect, actual)
 		})
 	}
diff --git a/plugins/plugin-klogs/pkg/instance/helpers.go b/plugins/plugin-klogs/pkg/instance/helpers.go
index e0878a008..c16ede49c 100644
--- a/plugins/plugin-klogs/pkg/instance/helpers.go
+++ b/plugins/plugin-klogs/pkg/instance/helpers.go
@@ -1,5 +1,231 @@
 package instance
 
+import (
+	"fmt"
+	"strings"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	defaultFields  = []string{"timestamp", "cluster", "namespace", "app", "pod_name", "container_name", "host", "log"}
+	defaultColumns = "timestamp, cluster, namespace, app, pod_name, container_name, host, fields_string, fields_number, log"
+
+	fieldsMetric = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "kobs",
+		Name:      "klogs_fields_total",
+		Help:      "Number how often a field was used in a query.",
+	}, []string{"field"})
+)
+
+// parseLogsQuery parses the given query string and returns the conditions for the where statement in the sql query.
+// We are providing a very simple query language where the user can use "(", ")", "_not_", "_and_" and "_or_" operators.
+// Then we are splitting the string again for the other operators "=", "!=", ">", ">=", "<", "<=", "=~", "!~" and "~"
+// which are used to check the value of a field.
+// Once we have built all the conditions we concatenate all the strings to the final sql statement for the where clause.
+func parseLogsQuery(query string, materializedColumns []string) (string, error) {
+	var newOpenBrackets []string
+	openBrackets := strings.Split(query, "(")
+	for _, openBracket := range openBrackets {
+		var newCloseBrackets []string
+		closeBrackets := strings.Split(openBracket, ")")
+		for _, closeBracket := range closeBrackets {
+			var newNots []string
+			nots := strings.Split(closeBracket, "_not_")
+			for _, not := range nots {
+				var newAnds []string
+				ands := strings.Split(not, "_and_")
+				for _, and := range ands {
+					var newOrs []string
+					ors := strings.Split(and, "_or_")
+					for _, or := range ors {
+						condition, err := splitOperator(or, materializedColumns)
+						if err != nil {
+							return "", err
+						}
+
+						newOrs = append(newOrs, condition)
+					}
+					newAnds = append(newAnds, strings.Join(newOrs, " OR "))
+				}
+				newNots = append(newNots, strings.Join(newAnds, " AND "))
+			}
+			newCloseBrackets = append(newCloseBrackets, strings.Join(newNots, " NOT "))
+		}
+		newOpenBrackets = append(newOpenBrackets, strings.Join(newCloseBrackets, ")"))
+	}
+
+	return strings.Join(newOpenBrackets, "("), nil
+}
+
+// splitOperator splits the given string by the following operators "=", "!=", ">", ">=", "<", "<=", "=~", "!~" and "~".
+// If the result is a slice with two items we found the operator which was used by the user to check the value of a
+// field. We then pass the key (first item), the value (second item) and the operator to handleConditionParts to build
+// the where condition.
+func splitOperator(condition string, materializedColumns []string) (string, error) {
+	greaterThanOrEqual := strings.Split(condition, ">=")
+	if len(greaterThanOrEqual) == 2 {
+		return handleConditionParts(greaterThanOrEqual[0], greaterThanOrEqual[1], ">=", materializedColumns)
+	}
+
+	greaterThan := strings.Split(condition, ">")
+	if len(greaterThan) == 2 {
+		return handleConditionParts(greaterThan[0], greaterThan[1], ">", materializedColumns)
+	}
+
+	lessThanOrEqual := strings.Split(condition, "<=")
+	if len(lessThanOrEqual) == 2 {
+		return handleConditionParts(lessThanOrEqual[0], lessThanOrEqual[1], "<=", materializedColumns)
+	}
+
+	lessThan := strings.Split(condition, "<")
+	if len(lessThan) == 2 {
+		return handleConditionParts(lessThan[0], lessThan[1], "<", materializedColumns)
+	}
+
+	ilike := strings.Split(condition, "=~")
+	if len(ilike) == 2 {
+		return handleConditionParts(ilike[0], ilike[1], "=~", materializedColumns)
+	}
+
+	notEqual := strings.Split(condition, "!=")
+	if len(notEqual) == 2 {
+		return handleConditionParts(notEqual[0], notEqual[1], "!=", materializedColumns)
+	}
+
+	notIlike := strings.Split(condition, "!~")
+	if len(notIlike) == 2 {
+		return handleConditionParts(notIlike[0], notIlike[1], "!~", materializedColumns)
+	}
+
+	regex := strings.Split(condition, "~")
+	if len(regex) == 2 {
+		return handleConditionParts(regex[0], regex[1], "~", materializedColumns)
+	}
+
+	equal := strings.Split(condition, "=")
+	if len(equal) == 2 {
+		return handleConditionParts(equal[0], equal[1], "=", materializedColumns)
+	}
+
+	if strings.Contains(condition, "_exists_ ") {
+		return handleExistsCondition(strings.TrimLeft(strings.TrimSpace(condition), "_exists_ "), materializedColumns), nil
+	}
+
+	if strings.TrimSpace(condition) == "" {
+		return "", nil
+	}
+
+	return "", fmt.Errorf("invalid operator: %s", condition)
+}
+
+// handleConditionParts converts the given key, value and operator to its sql representation. This is required because
+// some fields like "timestamp", "cluster", "namespace", etc. are a separate column in the sql table, where others like
+// "content_level" or "content_response_code" are only available via the fields_string / fields_number columns. For
+// these nested columns we have to use a special query syntax. We also have to use the match function when the operator
+// is "~" which says that the user checks the field value against a regular expression.
+//
+// See: https://gist.github.com/alexey-milovidov/d6ffc9e0bc0bc72dd7bca90e76e3b83b
+// See: https://clickhouse.tech/docs/en/sql-reference/functions/string-search-functions/#matchhaystack-pattern
+func handleConditionParts(key, value, operator string, materializedColumns []string) (string, error) {
+	key = strings.TrimSpace(key)
+	value = strings.TrimSpace(value)
+
+	// The kobs_klogs_fields_total metric can be used to determine how often a field is used. This information can
+	// then be used to create an additional column for this field via the following SQL commands:
+	// ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN <field> String DEFAULT fields_string['<field>'];
+	// ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN <field> Float64 DEFAULT fields_number['<field>'];
+	fieldsMetric.WithLabelValues(key).Inc()
+
+	if contains(defaultFields, key) || contains(materializedColumns, key) {
+		if operator == "=~" {
+			return fmt.Sprintf("%s ILIKE %s", key, value), nil
+		}
+
+		if operator == "!~" {
+			return fmt.Sprintf("%s NOT ILIKE %s", key, value), nil
+		}
+
+		if operator == "~" {
+			return fmt.Sprintf("match(%s, %s)", key, value), nil
+		}
+
+		return fmt.Sprintf("%s%s%s", key, operator, value), nil
+	}
+
+	if value != "" && string(value[0]) == "'" && string(value[len(value)-1]) == "'" {
+		if operator == "=~" {
+			return fmt.Sprintf("fields_string['%s'] ILIKE %s", key, value), nil
+		}
+
+		if operator == "!~" {
+			return fmt.Sprintf("fields_string['%s'] NOT ILIKE %s", key, value), nil
+		}
+
+		if operator == "~" {
+			return fmt.Sprintf("match(fields_string['%s'], %s)", key, value), nil
+		}
+
+		return fmt.Sprintf("fields_string['%s'] %s %s", key, operator, value), nil
+	}
+
+	if operator == "=~" {
+		return fmt.Sprintf("fields_number['%s'] ILIKE %s", key, value), nil
+	}
+
+	if operator == "!~" {
+		return fmt.Sprintf("fields_number['%s'] NOT ILIKE %s", key, value), nil
+	}
+
+	if operator == "~" {
+		return fmt.Sprintf("match(fields_number['%s'], %s)", key, value), nil
+	}
+
+	return fmt.Sprintf("fields_number['%s'] %s %s", key, operator, value), nil
+}
+
+func handleExistsCondition(key string, materializedColumns []string) string {
+	if contains(defaultFields, key) || contains(materializedColumns, key) {
+		return fmt.Sprintf("%s IS NOT NULL", key)
+	}
+
+	return fmt.Sprintf("(mapContains(fields_string, '%s') = 1 OR mapContains(fields_number, '%s') = 1)", key, key)
+}
+
+func parseOrder(order, orderBy string, materializedColumns []string) string {
+	if order == "" || orderBy == "" {
+		return "timestamp DESC"
+	}
+
+	if order == "ascending" {
+		order = "ASC"
+	} else {
+		order = "DESC"
+	}
+
+	orderBy = strings.TrimSpace(orderBy)
+	if contains(defaultFields, orderBy) || contains(materializedColumns, orderBy) {
+		return fmt.Sprintf("%s %s", orderBy, order)
+	}
+
+	return fmt.Sprintf("fields_string['%s'] %s, fields_number['%s'] %s", orderBy, order, orderBy, order)
+}
+
+// getBucketTimes determines the start and end time of a bucket. This is necessary because the first and last bucket
+// time can be outside of the user defined time range.
+func getBucketTimes(interval, bucketTime, timeStart, timeEnd int64) (int64, int64) {
+	if bucketTime < timeStart {
+		return timeStart, timeStart + interval - (timeStart - bucketTime)
+	}
+
+	if bucketTime+interval > timeEnd {
+		return bucketTime, bucketTime + timeEnd - bucketTime
+	}
+
+	return bucketTime, bucketTime + interval
+}
+
 // appendIfMissing appends a value to a slice, when this values doesn't exist in the slice already.
 func appendIfMissing(items []string, item string) []string {
 	for _, ele := range items {
diff --git a/plugins/plugin-klogs/pkg/instance/helpers_test.go b/plugins/plugin-klogs/pkg/instance/helpers_test.go
index 0e4741372..917ccd579 100644
--- a/plugins/plugin-klogs/pkg/instance/helpers_test.go
+++ b/plugins/plugin-klogs/pkg/instance/helpers_test.go
@@ -1,11 +1,149 @@
 package instance
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/stretchr/testify/require"
 )
 
+func TestParseLogsQuery(t *testing.T) {
+	for _, tt := range []struct {
+		query     string
+		where     string
+		isInvalid bool
+	}{
+		{query: "cluster = 'foo' _and_ namespace = 'bar'", where: "cluster='foo' AND namespace='bar'", isInvalid: false},
+		{query: "cluster = 'foo' _and_ (namespace='hello' _or_ namespace='world')", where: "cluster='foo' AND (namespace='hello' OR namespace='world')", isInvalid: false},
+		{query: "kubernetes_label_foo = 'bar'", where: "fields_string['kubernetes_label_foo'] = 'bar'", isInvalid: false},
+		{query: "kubernetes_label_foo_bar =~ '\\%hellow\\%world\\%'", where: "fields_string['kubernetes_label_foo_bar'] ILIKE '\\%hellow\\%world\\%'", isInvalid: false},
+		{query: "kubernetes_label_foo_bar ~ 'hello.*'", where: "match(fields_string['kubernetes_label_foo_bar'], 'hello.*')", isInvalid: false},
+		{query: "kubernetes_label_foo_bar / 'hello.*'", isInvalid: true},
+	} {
+		t.Run(tt.query, func(t *testing.T) {
+			parsedWhere, err := parseLogsQuery(tt.query, nil)
+			if tt.isInvalid {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, tt.where, parsedWhere)
+			}
+		})
+	}
+}
+
+func TestSplitOperator(t *testing.T) {
+	for _, tt := range []struct {
+		query             string
+		expectedCondition string
+		isInvalid         bool
+	}{
+		{query: "cluster >= 'foo'", expectedCondition: "cluster>='foo'", isInvalid: false},
+		{query: "cluster > 'foo'", expectedCondition: "cluster>'foo'", isInvalid: false},
+		{query: "cluster <= 'foo'", expectedCondition: "cluster<='foo'", isInvalid: false},
+		{query: "cluster < 'foo'", expectedCondition: "cluster<'foo'", isInvalid: false},
+		{query: "cluster =~ 'foo'", expectedCondition: "cluster ILIKE 'foo'", isInvalid: false},
+		{query: "cluster != 'foo'", expectedCondition: "cluster!='foo'", isInvalid: false},
+		{query: "cluster !~ 'foo'", expectedCondition: "cluster NOT ILIKE 'foo'", isInvalid: false},
+		{query: "cluster ~ 'foo'", expectedCondition: "match(cluster, 'foo')", isInvalid: false},
+		{query: "cluster = 'foo'", expectedCondition: "cluster='foo'", isInvalid: false},
+		{query: "_exists_ cluster", expectedCondition: "cluster IS NOT NULL", isInvalid: false},
+		{query: " ", expectedCondition: "", isInvalid: false},
+		{query: "cluster / 'foo'", isInvalid: true},
+	} {
+		t.Run(tt.query, func(t *testing.T) {
+			actualCondition, err := splitOperator(tt.query, nil)
+			if tt.isInvalid {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, tt.expectedCondition, actualCondition)
+			}
+		})
+	}
+}
+
+func TestHandleConditionParts(t *testing.T) {
+	for _, tt := range []struct {
+		key               string
+		value             string
+		operator          string
+		expectedCondition string
+	}{
+ {key: "cluster", value: "'foobar'", operator: "=~", expectedCondition: "cluster ILIKE 'foobar'"}, + {key: "cluster", value: "'foobar'", operator: "!~", expectedCondition: "cluster NOT ILIKE 'foobar'"}, + {key: "cluster", value: "'foobar'", operator: "~", expectedCondition: "match(cluster, 'foobar')"}, + {key: "cluster", value: "'foobar'", operator: "=", expectedCondition: "cluster='foobar'"}, + {key: "helloworld", value: "'foobar'", operator: "=~", expectedCondition: "fields_string['helloworld'] ILIKE 'foobar'"}, + {key: "helloworld", value: "'foobar'", operator: "!~", expectedCondition: "fields_string['helloworld'] NOT ILIKE 'foobar'"}, + {key: "helloworld", value: "'foobar'", operator: "~", expectedCondition: "match(fields_string['helloworld'], 'foobar')"}, + {key: "helloworld", value: "'foobar'", operator: "=", expectedCondition: "fields_string['helloworld'] = 'foobar'"}, + {key: "helloworld", value: "42", operator: "=~", expectedCondition: "fields_number['helloworld'] ILIKE 42"}, + {key: "helloworld", value: "42", operator: "!~", expectedCondition: "fields_number['helloworld'] NOT ILIKE 42"}, + {key: "helloworld", value: "42", operator: "~", expectedCondition: "match(fields_number['helloworld'], 42)"}, + {key: "helloworld", value: "42", operator: "=", expectedCondition: "fields_number['helloworld'] = 42"}, + } { + t.Run(tt.key, func(t *testing.T) { + actualCondition, _ := handleConditionParts(tt.key, tt.value, tt.operator, nil) + require.Equal(t, tt.expectedCondition, actualCondition) + }) + } +} + +func TestHandleExistsCondition(t *testing.T) { + for _, tt := range []struct { + key string + expectedCondition string + }{ + {key: "cluster", expectedCondition: "cluster IS NOT NULL"}, + {key: "foobar", expectedCondition: "(mapContains(fields_string, 'foobar') = 1 OR mapContains(fields_number, 'foobar') = 1)"}, + } { + t.Run(tt.key, func(t *testing.T) { + actualCondition := handleExistsCondition(tt.key, nil) + require.Equal(t, tt.expectedCondition, actualCondition) + }) + } +} + +func TestParseOrder(t *testing.T) { + for _, tt := range []struct { + order string + orderBy string + expectedCondition string + }{ + {order: "", orderBy: "", expectedCondition: "timestamp DESC"}, + {order: "ascending", orderBy: "cluster", expectedCondition: "cluster ASC"}, + {order: "descending", orderBy: "cluster", expectedCondition: "cluster DESC"}, + {order: "ascending", orderBy: "foobar", expectedCondition: "fields_string['foobar'] ASC, fields_number['foobar'] ASC"}, + } { + t.Run(tt.order+tt.orderBy, func(t *testing.T) { + actualCondition := parseOrder(tt.order, tt.orderBy, nil) + require.Equal(t, tt.expectedCondition, actualCondition) + }) + } +} + +func TestGetInterval(t *testing.T) { + for _, tt := range []struct { + interval int64 + bucketTime int64 + timeStart int64 + timeEnd int64 + expectedTimeStart int64 + expectedTimeEnd int64 + }{ + {interval: 124, bucketTime: 1640188920, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640189016, expectedTimeEnd: 1640189044}, + {interval: 124, bucketTime: 1640190780, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640190780, expectedTimeEnd: 1640190904}, + {interval: 124, bucketTime: 1640192640, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640192640, expectedTimeEnd: 1640192745}, + } { + t.Run(fmt.Sprintf("%d", tt.bucketTime), func(t *testing.T) { + actualTimeStart, actualTimeEnd := getBucketTimes(tt.interval, tt.bucketTime, tt.timeStart, tt.timeEnd) + require.Equal(t, tt.expectedTimeStart, actualTimeStart) + 
require.Equal(t, tt.expectedTimeEnd, actualTimeEnd) + }) + } +} + func TestAppendIfMissing(t *testing.T) { items := []string{"foo", "bar"} diff --git a/plugins/plugin-klogs/pkg/instance/instance.go b/plugins/plugin-klogs/pkg/instance/instance.go index 7c6e6f4ee..c8edcdb7e 100644 --- a/plugins/plugin-klogs/pkg/instance/instance.go +++ b/plugins/plugin-klogs/pkg/instance/instance.go @@ -4,13 +4,12 @@ import ( "context" "database/sql" "fmt" - "sort" "strings" "time" "github.com/kobsio/kobs/pkg/log" - _ "github.com/ClickHouse/clickhouse-go" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/mitchellh/mapstructure" "go.uber.org/zap" ) @@ -21,8 +20,10 @@ type Config struct { Database string `json:"database"` Username string `json:"username"` Password string `json:"password"` - WriteTimeout string `json:"writeTimeout"` - ReadTimeout string `json:"readTimeout"` + DialTimeout string `json:"dialTimeout"` + ConnMaxLifetime string `json:"connMaxLifetime"` + MaxIdleConns int `json:"maxIdleConns"` + MaxOpenConns int `json:"maxOpenConns"` MaterializedColumns []string `json:"materializedColumns"` } @@ -54,16 +55,16 @@ func (i *instance) getFields(ctx context.Context) (Fields, error) { now := time.Now().Unix() for _, fieldType := range []string{"string", "number"} { - rowsFieldsString, err := i.client.QueryContext(ctx, fmt.Sprintf("SELECT DISTINCT arrayJoin(fields_%s.key) FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) SETTINGS skip_unavailable_shards = 1", fieldType, i.database, now-3600, now)) + rowsFieldKeys, err := i.client.QueryContext(ctx, fmt.Sprintf("SELECT DISTINCT arrayJoin(mapKeys(fields_%s)) FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) SETTINGS skip_unavailable_shards = 1", fieldType, i.database, now-3600, now)) if err != nil { return fields, err } - defer rowsFieldsString.Close() + defer rowsFieldKeys.Close() - for rowsFieldsString.Next() { + for rowsFieldKeys.Next() { var field string - if err := rowsFieldsString.Scan(&field); err != nil { + if err := rowsFieldKeys.Scan(&field); err != nil { return fields, err } @@ -74,7 +75,7 @@ func (i *instance) getFields(ctx context.Context) (Fields, error) { } } - if err := rowsFieldsString.Err(); err != nil { + if err := rowsFieldKeys.Err(); err != nil { return fields, err } } @@ -123,7 +124,7 @@ func (i *instance) refreshCachedFields() []string { } } -// GetFields returns all cahced fields which are containing the filter term. The cached fields are refreshed every 24. +// GetFields returns all cached fields which are containing the filter term. The cached fields are refreshed every 24. func (i *instance) GetFields(filter string, fieldType string) []string { var fields []string @@ -148,199 +149,6 @@ func (i *instance) GetFields(filter string, fieldType string) []string { return fields } -// GetLogs parses the given query into the sql syntax, which is then run against the ClickHouse instance. The returned -// rows are converted into a document schema which can be used by our UI. -func (i *instance) GetLogs(ctx context.Context, query, order, orderBy string, limit, timeStart, timeEnd int64) ([]map[string]any, []string, int64, int64, []Bucket, error) { - var count int64 - var buckets []Bucket - var documents []map[string]any - var timeConditions string - var interval int64 - - fields := defaultFields - queryStartTime := time.Now() - - // When the user provides a query, we have to build the additional conditions for the sql query. 
This is done via - // the parseLogsQuery which is responsible for parsing our simple query language and returning the corresponding - // where statement. These conditions are the added as additional AND to our sql query. - conditions := "" - if query != "" { - parsedQuery, err := parseLogsQuery(query, i.materializedColumns) - if err != nil { - return nil, nil, 0, 0, nil, err - } - - conditions = fmt.Sprintf("AND %s", parsedQuery) - } - - parsedOrder := parseOrder(order, orderBy, i.materializedColumns) - - // We check that the time range if not 0 or lower then 0, because this would mean that the end time is equal to the - // start time or before the start time, which results in an error for the following SQL queries. - if timeEnd-timeStart <= 0 { - return nil, nil, 0, 0, nil, fmt.Errorf("invalid time range") - } - - // We have to define the interval for the selected time range. By default we are creating 30 buckets in the - // following SQL query, but for time ranges with less then 30 seconds we have to create less buckets. - switch seconds := timeEnd - timeStart; { - case seconds <= 2: - interval = (timeEnd - timeStart) / 1 - case seconds <= 10: - interval = (timeEnd - timeStart) / 5 - case seconds <= 30: - interval = (timeEnd - timeStart) / 10 - default: - interval = (timeEnd - timeStart) / 30 - } - - // Now we are creating 30 buckets for the selected time range and count the documents in each bucket. This is used - // to render the distribution chart, which shows how many documents/rows are available within a bucket. - sqlQueryBuckets := fmt.Sprintf(`SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data, count(*) AS count_data FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) %s GROUP BY interval_data ORDER BY interval_data WITH FILL FROM toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) TO toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) STEP %d SETTINGS skip_unavailable_shards = 1`, interval, i.database, timeStart, timeEnd, conditions, timeStart, interval, timeEnd, interval, interval) - log.Debug(ctx, "SQL query buckets", zap.String("query", sqlQueryBuckets)) - rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets) - if err != nil { - return nil, nil, 0, 0, nil, err - } - defer rowsBuckets.Close() - - for rowsBuckets.Next() { - var intervalData time.Time - var countData int64 - - if err := rowsBuckets.Scan(&intervalData, &countData); err != nil { - return nil, nil, 0, 0, nil, err - } - - buckets = append(buckets, Bucket{ - Interval: intervalData.Unix(), - Count: countData, - }) - } - - if err := rowsBuckets.Err(); err != nil { - return nil, nil, 0, 0, nil, err - } - - sort.Slice(buckets, func(i, j int) bool { - return buckets[i].Interval < buckets[j].Interval - }) - - // To optimize the query to get the raw logs we are creating a new time condition for the where statement. In that - // way we only have to look into the buckets which are containing some documents only have to include the first N - // buckets until the limit is reached. - // When provided a custom order (not "timestamp DESC") we can also optimize the search based on the limit when the - // user wants to sort the returned documents via "timestamp ASC". For all other order conditions we can only check - // if the bucket contains some documents, but we can not optimize the results based on the limit. 
- if parsedOrder == "timestamp DESC" { - for i := len(buckets) - 1; i >= 0; i-- { - if count < limit && buckets[i].Count > 0 { - bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) - - if timeConditions == "" { - timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) - } else { - timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) - } - } - - count = count + buckets[i].Count - } - } else if parsedOrder == "timestamp ASC" { - for i := 0; i < len(buckets); i++ { - if count < limit && buckets[i].Count > 0 { - bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) - - if timeConditions == "" { - timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) - } else { - timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) - } - } - - count = count + buckets[i].Count - } - } else { - for i := 0; i < len(buckets); i++ { - if buckets[i].Count > 0 { - bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) - - if timeConditions == "" { - timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) - } else { - timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) - } - } - - count = count + buckets[i].Count - } - } - - log.Debug(ctx, "SQL result buckets", zap.Int64("count", count), zap.Any("buckets", buckets)) - - // If the count of documents is 0 we can already return the result, because the following query wouldn't return any - // documents. - if count == 0 { - return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil - } - - // Now we are building and executing our sql query. We always return all fields from the logs table, where the - // timestamp of a row is within the selected query range and the parsed query. We also order all the results by the - // timestamp field and limiting the results / using a offset for pagination. - sqlQueryRawLogs := fmt.Sprintf("SELECT %s FROM %s.logs WHERE (%s) %s ORDER BY %s LIMIT %d SETTINGS skip_unavailable_shards = 1", defaultColumns, i.database, timeConditions, conditions, parsedOrder, limit) - log.Debug(ctx, "SQL query raw logs", zap.String("query", sqlQueryRawLogs)) - rowsRawLogs, err := i.client.QueryContext(ctx, sqlQueryRawLogs) - if err != nil { - return nil, nil, 0, 0, nil, err - } - defer rowsRawLogs.Close() - - // Now we are going throw all the returned rows and passing them to the Row struct. After that we are converting - // each row to a JSON document for the React UI, which contains all the default fields and all the items from the - // fields_string / fields_number array. - // When the offset is 0 (user starts a new query) we are also checking all the fields from the nested fields_string - // and fields_number array and adding them to the fields slice. This slice can then be used by the user in our React - // UI to show only a list of selected fields in the table. 
-	for rowsRawLogs.Next() {
-		var r Row
-		if err := rowsRawLogs.Scan(&r.Timestamp, &r.Cluster, &r.Namespace, &r.App, &r.Pod, &r.Container, &r.Host, &r.FieldsString.Key, &r.FieldsString.Value, &r.FieldsNumber.Key, &r.FieldsNumber.Value, &r.Log); err != nil {
-			return nil, nil, 0, 0, nil, err
-		}
-
-		var document map[string]any
-		document = make(map[string]any)
-		document["timestamp"] = r.Timestamp
-		document["cluster"] = r.Cluster
-		document["namespace"] = r.Namespace
-		document["app"] = r.App
-		document["pod_name"] = r.Pod
-		document["container_name"] = r.Container
-		document["host"] = r.Host
-		document["log"] = r.Log
-
-		for index, field := range r.FieldsNumber.Key {
-			document[field] = r.FieldsNumber.Value[index]
-			fields = appendIfMissing(fields, field)
-		}
-
-		for index, field := range r.FieldsString.Key {
-			document[field] = r.FieldsString.Value[index]
-			fields = appendIfMissing(fields, field)
-		}
-
-		documents = append(documents, document)
-	}
-
-	if err := rowsRawLogs.Err(); err != nil {
-		return nil, nil, 0, 0, nil, err
-	}
-
-	sort.Strings(fields)
-	log.Debug(ctx, "SQL result raw logs", zap.Int("documentsCount", len(documents)))
-
-	return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil
-}
-
 // GetRawQueryResults returns all rows for the user provided SQL query. This function should only be used by other
 // plugins. If users should be able to directly access a Clickhouse instance you can expose the instance using the SQL
 // plugin.
@@ -388,22 +196,43 @@ func New(name string, options map[string]any) (Instance, error) {
 		return nil, err
 	}
 
-	if config.WriteTimeout == "" {
-		config.WriteTimeout = "30"
+	parsedDialTimeout := 10 * time.Second
+	if config.DialTimeout != "" {
+		tmpParsedDialTimeout, err := time.ParseDuration(config.DialTimeout)
+		if err == nil {
+			parsedDialTimeout = tmpParsedDialTimeout
+		}
 	}
 
-	if config.ReadTimeout == "" {
-		config.ReadTimeout = "30"
+	parsedConnMaxLifetime := 1 * time.Hour
+	if config.ConnMaxLifetime != "" {
+		tmpParsedConnMaxLifetime, err := time.ParseDuration(config.ConnMaxLifetime)
+		if err == nil {
+			parsedConnMaxLifetime = tmpParsedConnMaxLifetime
+		}
 	}
 
-	dns := "tcp://" + config.Address + "?username=" + config.Username + "&password=" + config.Password + "&database=" + config.Database + "&write_timeout=" + config.WriteTimeout + "&read_timeout=" + config.ReadTimeout
+	if config.MaxIdleConns == 0 {
+		config.MaxIdleConns = 5
+	}
 
-	client, err := sql.Open("clickhouse", dns)
-	if err != nil {
-		log.Error(nil, "Could not initialize database connection", zap.Error(err))
-		return nil, err
+	if config.MaxOpenConns == 0 {
+		config.MaxOpenConns = 10
 	}
 
+	client := clickhouse.OpenDB(&clickhouse.Options{
+		Addr: strings.Split(config.Address, ","),
+		Auth: clickhouse.Auth{
+			Database: config.Database,
+			Username: config.Username,
+			Password: config.Password,
+		},
+		DialTimeout: parsedDialTimeout,
+	})
+	client.SetMaxIdleConns(config.MaxIdleConns)
+	client.SetMaxOpenConns(config.MaxOpenConns)
+	client.SetConnMaxLifetime(parsedConnMaxLifetime)
+
 	instance := &instance{
 		name: name,
 		database: config.Database,
diff --git a/plugins/plugin-klogs/pkg/instance/instance_mock.go b/plugins/plugin-klogs/pkg/instance/instance_mock.go
deleted file mode 100644
index 6678a2e65..000000000
--- a/plugins/plugin-klogs/pkg/instance/instance_mock.go
+++ /dev/null
@@ -1,215 +0,0 @@
-// Code generated by mockery v2.12.3. DO NOT EDIT.
- -package instance - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) - -// MockInstance is an autogenerated mock type for the Instance type -type MockInstance struct { - mock.Mock -} - -// GetAggregation provides a mock function with given fields: ctx, aggregation -func (_m *MockInstance) GetAggregation(ctx context.Context, aggregation Aggregation) ([]map[string]interface{}, []string, error) { - ret := _m.Called(ctx, aggregation) - - var r0 []map[string]interface{} - if rf, ok := ret.Get(0).(func(context.Context, Aggregation) []map[string]interface{}); ok { - r0 = rf(ctx, aggregation) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]map[string]interface{}) - } - } - - var r1 []string - if rf, ok := ret.Get(1).(func(context.Context, Aggregation) []string); ok { - r1 = rf(ctx, aggregation) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).([]string) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context, Aggregation) error); ok { - r2 = rf(ctx, aggregation) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// GetFields provides a mock function with given fields: filter, fieldType -func (_m *MockInstance) GetFields(filter string, fieldType string) []string { - ret := _m.Called(filter, fieldType) - - var r0 []string - if rf, ok := ret.Get(0).(func(string, string) []string); ok { - r0 = rf(filter, fieldType) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - return r0 -} - -// GetLogs provides a mock function with given fields: ctx, query, order, orderBy, limit, timeStart, timeEnd -func (_m *MockInstance) GetLogs(ctx context.Context, query string, order string, orderBy string, limit int64, timeStart int64, timeEnd int64) ([]map[string]interface{}, []string, int64, int64, []Bucket, error) { - ret := _m.Called(ctx, query, order, orderBy, limit, timeStart, timeEnd) - - var r0 []map[string]interface{} - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, int64, int64, int64) []map[string]interface{}); ok { - r0 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]map[string]interface{}) - } - } - - var r1 []string - if rf, ok := ret.Get(1).(func(context.Context, string, string, string, int64, int64, int64) []string); ok { - r1 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).([]string) - } - } - - var r2 int64 - if rf, ok := ret.Get(2).(func(context.Context, string, string, string, int64, int64, int64) int64); ok { - r2 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - r2 = ret.Get(2).(int64) - } - - var r3 int64 - if rf, ok := ret.Get(3).(func(context.Context, string, string, string, int64, int64, int64) int64); ok { - r3 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - r3 = ret.Get(3).(int64) - } - - var r4 []Bucket - if rf, ok := ret.Get(4).(func(context.Context, string, string, string, int64, int64, int64) []Bucket); ok { - r4 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - if ret.Get(4) != nil { - r4 = ret.Get(4).([]Bucket) - } - } - - var r5 error - if rf, ok := ret.Get(5).(func(context.Context, string, string, string, int64, int64, int64) error); ok { - r5 = rf(ctx, query, order, orderBy, limit, timeStart, timeEnd) - } else { - r5 = ret.Error(5) - } - - return r0, r1, r2, r3, r4, r5 -} - -// GetName provides a mock function with given fields: -func (_m *MockInstance) GetName() 
string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// GetRawQueryResults provides a mock function with given fields: ctx, query -func (_m *MockInstance) GetRawQueryResults(ctx context.Context, query string) ([][]interface{}, []string, error) { - ret := _m.Called(ctx, query) - - var r0 [][]interface{} - if rf, ok := ret.Get(0).(func(context.Context, string) [][]interface{}); ok { - r0 = rf(ctx, query) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([][]interface{}) - } - } - - var r1 []string - if rf, ok := ret.Get(1).(func(context.Context, string) []string); ok { - r1 = rf(ctx, query) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).([]string) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { - r2 = rf(ctx, query) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// getFields provides a mock function with given fields: ctx -func (_m *MockInstance) getFields(ctx context.Context) (Fields, error) { - ret := _m.Called(ctx) - - var r0 Fields - if rf, ok := ret.Get(0).(func(context.Context) Fields); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(Fields) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// refreshCachedFields provides a mock function with given fields: -func (_m *MockInstance) refreshCachedFields() []string { - ret := _m.Called() - - var r0 []string - if rf, ok := ret.Get(0).(func() []string); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - return r0 -} - -type NewMockInstanceT interface { - mock.TestingT - Cleanup(func()) -} - -// NewMockInstance creates a new instance of MockInstance. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockInstance(t NewMockInstanceT) *MockInstance { - mock := &MockInstance{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/plugins/plugin-klogs/pkg/instance/logs.go b/plugins/plugin-klogs/pkg/instance/logs.go index 2b9ca2279..af80639cf 100644 --- a/plugins/plugin-klogs/pkg/instance/logs.go +++ b/plugins/plugin-klogs/pkg/instance/logs.go @@ -1,227 +1,204 @@ package instance import ( + "context" "fmt" - "strings" + "sort" + "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/kobsio/kobs/pkg/log" + "go.uber.org/zap" ) -var ( - defaultFields = []string{"timestamp", "cluster", "namespace", "app", "pod_name", "container_name", "host", "log"} - defaultColumns = "timestamp, cluster, namespace, app, pod_name, container_name, host, fields_string.key, fields_string.value, fields_number.key, fields_number.value, log" - - fieldsMetric = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "kobs", - Name: "klogs_fields_total", - Help: "Number how often a field was used in a query.", - }, []string{"field"}) -) - -// parseLogsQuery parses the given query string and return the conditions for the where statement in the sql query. We -// are providing a very simple query language where the user can use "(", ")", "_not_", "_and_" and "_or_" operators. -// Then we are splitting the string again for the other operators "=", "!=", ">", ">=", "<", "<=" and "~" which are used -// to check the value of a field. 
-// Once we have build all the conditions we concate all the strings to the final sql statement for the where clause. -func parseLogsQuery(query string, materializedColumns []string) (string, error) { - var newOpenBrackets []string - openBrackets := strings.Split(query, "(") - for _, openBracket := range openBrackets { - var newCloseBrackets []string - closeBrackets := strings.Split(openBracket, ")") - for _, closeBracket := range closeBrackets { - var newNots []string - nots := strings.Split(closeBracket, "_not_") - for _, not := range nots { - var newAnds []string - ands := strings.Split(not, "_and_") - for _, and := range ands { - var newOrs []string - ors := strings.Split(and, "_or_") - for _, or := range ors { - condition, err := splitOperator(or, materializedColumns) - if err != nil { - return "", err - } - - newOrs = append(newOrs, condition) - } - newAnds = append(newAnds, strings.Join(newOrs, " OR ")) - } - newNots = append(newNots, strings.Join(newAnds, " AND ")) - } - newCloseBrackets = append(newCloseBrackets, strings.Join(newNots, " NOT ")) +// GetLogs parses the given query into the sql syntax, which is then run against the ClickHouse instance. The returned +// rows are converted into a document schema which can be used by our UI. +func (i *instance) GetLogs(ctx context.Context, query, order, orderBy string, limit, timeStart, timeEnd int64) ([]map[string]any, []string, int64, int64, []Bucket, error) { + var count int64 + var buckets []Bucket + var documents []map[string]any + var timeConditions string + var interval int64 + + fields := defaultFields + queryStartTime := time.Now() + + // When the user provides a query, we have to build the additional conditions for the sql query. This is done via + // the parseLogsQuery which is responsible for parsing our simple query language and returning the corresponding + // where statement. These conditions are the added as additional AND to our sql query. + conditions := "" + if query != "" { + parsedQuery, err := parseLogsQuery(query, i.materializedColumns) + if err != nil { + return nil, nil, 0, 0, nil, err } - newOpenBrackets = append(newOpenBrackets, strings.Join(newCloseBrackets, ")")) - } - return strings.Join(newOpenBrackets, "("), nil -} - -// splitOperator splits the given string by the following operators "=", "!=", ">", ">=", "<", "<=" and "~". If the -// result is a slice with two items we found the operator which was used by the user to check the value of a field. So -// that we pass the key (first item), value (second item) and the operator to the handleConditionParts to build the -// where condition. 
-func splitOperator(condition string, materializedColumns []string) (string, error) { - greaterThanOrEqual := strings.Split(condition, ">=") - if len(greaterThanOrEqual) == 2 { - return handleConditionParts(greaterThanOrEqual[0], greaterThanOrEqual[1], ">=", materializedColumns) - } - - greaterThan := strings.Split(condition, ">") - if len(greaterThan) == 2 { - return handleConditionParts(greaterThan[0], greaterThan[1], ">", materializedColumns) + conditions = fmt.Sprintf("AND %s", parsedQuery) } - lessThanOrEqual := strings.Split(condition, "<=") - if len(lessThanOrEqual) == 2 { - return handleConditionParts(lessThanOrEqual[0], lessThanOrEqual[1], "<=", materializedColumns) - } + parsedOrder := parseOrder(order, orderBy, i.materializedColumns) - lessThan := strings.Split(condition, "<") - if len(lessThan) == 2 { - return handleConditionParts(lessThan[0], lessThan[1], "<", materializedColumns) + // We check that the time range if not 0 or lower then 0, because this would mean that the end time is equal to the + // start time or before the start time, which results in an error for the following SQL queries. + if timeEnd-timeStart <= 0 { + return nil, nil, 0, 0, nil, fmt.Errorf("invalid time range") } - ilike := strings.Split(condition, "=~") - if len(ilike) == 2 { - return handleConditionParts(ilike[0], ilike[1], "=~", materializedColumns) + // We have to define the interval for the selected time range. By default we are creating 30 buckets in the + // following SQL query, but for time ranges with less then 30 seconds we have to create less buckets. + switch seconds := timeEnd - timeStart; { + case seconds <= 2: + interval = (timeEnd - timeStart) / 1 + case seconds <= 10: + interval = (timeEnd - timeStart) / 5 + case seconds <= 30: + interval = (timeEnd - timeStart) / 10 + default: + interval = (timeEnd - timeStart) / 30 } - notEqual := strings.Split(condition, "!=") - if len(notEqual) == 2 { - return handleConditionParts(notEqual[0], notEqual[1], "!=", materializedColumns) + // Now we are creating 30 buckets for the selected time range and count the documents in each bucket. This is used + // to render the distribution chart, which shows how many documents/rows are available within a bucket. 
+ sqlQueryBuckets := fmt.Sprintf(`SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data, count(*) AS count_data FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) %s GROUP BY interval_data ORDER BY interval_data WITH FILL FROM toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) TO toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) STEP %d SETTINGS skip_unavailable_shards = 1`, interval, i.database, timeStart, timeEnd, conditions, timeStart, interval, timeEnd, interval, interval) + log.Debug(ctx, "SQL query buckets", zap.String("query", sqlQueryBuckets)) + rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets) + if err != nil { + return nil, nil, 0, 0, nil, err } + defer rowsBuckets.Close() - notIlike := strings.Split(condition, "!~") - if len(notIlike) == 2 { - return handleConditionParts(notIlike[0], notIlike[1], "!~", materializedColumns) - } + for rowsBuckets.Next() { + var intervalData time.Time + var countData int64 - regex := strings.Split(condition, "~") - if len(regex) == 2 { - return handleConditionParts(regex[0], regex[1], "~", materializedColumns) - } - - equal := strings.Split(condition, "=") - if len(equal) == 2 { - return handleConditionParts(equal[0], equal[1], "=", materializedColumns) - } + if err := rowsBuckets.Scan(&intervalData, &countData); err != nil { + return nil, nil, 0, 0, nil, err + } - if strings.Contains(condition, "_exists_ ") { - return handleExistsCondition(strings.TrimLeft(strings.TrimSpace(condition), "_exists_ "), materializedColumns), nil + buckets = append(buckets, Bucket{ + Interval: intervalData.Unix(), + Count: countData, + }) } - if strings.TrimSpace(condition) == "" { - return "", nil + if err := rowsBuckets.Err(); err != nil { + return nil, nil, 0, 0, nil, err } - return "", fmt.Errorf("invalid operator: %s", condition) -} + sort.Slice(buckets, func(i, j int) bool { + return buckets[i].Interval < buckets[j].Interval + }) -// handleConditionParts converts the given key, value and operator to it's sql representation. This is required because -// some fields like "timestamp", "cluster", "namespace", etc. are a seperate column in the sql table, where others like -// "content.level" or "content.response_code" are only available via the fields_strings / fields_numbers column. For -// these nested columns we have to use a special query syntax. We also have to use the match function when the operator -// is "~" which says that the user checks the field value against a regular expression. -// -// See: https://gist.github.com/alexey-milovidov/d6ffc9e0bc0bc72dd7bca90e76e3b83b -// See: https://clickhouse.tech/docs/en/sql-reference/functions/string-search-functions/#matchhaystack-pattern -func handleConditionParts(key, value, operator string, materializedColumns []string) (string, error) { - key = strings.TrimSpace(key) - value = strings.TrimSpace(value) - - // The kobs_klogs_fields_total metric can be used to determine how often a field is used. 
This information can - // then be used to create an additional column for this field via the following SQL commands: - // ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN String DEFAULT fields_string.value[indexOf(fields_string.key, '')]; - // ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN String DEFAULT fields_number.value[indexOf(fields_number.key, '')]; - fieldsMetric.WithLabelValues(key).Inc() - - if contains(defaultFields, key) || contains(materializedColumns, key) { - if operator == "=~" { - return fmt.Sprintf("%s ILIKE %s", key, value), nil - } + // To optimize the query to get the raw logs we are creating a new time condition for the where statement. In that + // way we only have to look into the buckets which are containing some documents only have to include the first N + // buckets until the limit is reached. + // When provided a custom order (not "timestamp DESC") we can also optimize the search based on the limit when the + // user wants to sort the returned documents via "timestamp ASC". For all other order conditions we can only check + // if the bucket contains some documents, but we can not optimize the results based on the limit. + if parsedOrder == "timestamp DESC" { + for i := len(buckets) - 1; i >= 0; i-- { + if count < limit && buckets[i].Count > 0 { + bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) - if operator == "!~" { - return fmt.Sprintf("%s NOT ILIKE %s", key, value), nil - } - - if operator == "~" { - return fmt.Sprintf("match(%s, %s)", key, value), nil - } - - return fmt.Sprintf("%s%s%s", key, operator, value), nil - } + if timeConditions == "" { + timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) + } else { + timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) + } + } - if value != "" && string(value[0]) == "'" && string(value[len(value)-1]) == "'" { - if operator == "=~" { - return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')] ILIKE %s", key, value), nil + count = count + buckets[i].Count } + } else if parsedOrder == "timestamp ASC" { + for i := 0; i < len(buckets); i++ { + if count < limit && buckets[i].Count > 0 { + bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) + + if timeConditions == "" { + timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) + } else { + timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) + } + } - if operator == "!~" { - return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')] NOT ILIKE %s", key, value), nil + count = count + buckets[i].Count } + } else { + for i := 0; i < len(buckets); i++ { + if buckets[i].Count > 0 { + bucketTimeStart, bucketTimeEnd := getBucketTimes(interval, buckets[i].Interval, timeStart, timeEnd) + + if timeConditions == "" { + timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", bucketTimeStart, bucketTimeEnd) + } else { + timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, bucketTimeStart, bucketTimeEnd) + } + } - if operator == "~" { - return 
fmt.Sprintf("match(fields_string.value[indexOf(fields_string.key, '%s')], %s)", key, value), nil + count = count + buckets[i].Count } - - return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')] %s %s", key, operator, value), nil } - if operator == "=~" { - return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')] ILIKE %s", key, value), nil - } + log.Debug(ctx, "SQL result buckets", zap.Int64("count", count), zap.Any("buckets", buckets)) - if operator == "!~" { - return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')] NOT ILIKE %s", key, value), nil + // If the count of documents is 0 we can already return the result, because the following query wouldn't return any + // documents. + if count == 0 { + return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil } - if operator == "~" { - return fmt.Sprintf("match(fields_number.value[indexOf(fields_number.key, '%s')], %s)", key, value), nil + // Now we are building and executing our sql query. We always return all fields from the logs table, where the + // timestamp of a row is within the selected query range and the parsed query. We also order all the results by the + // timestamp field and limiting the results / using a offset for pagination. + sqlQueryRawLogs := fmt.Sprintf("SELECT %s FROM %s.logs WHERE (%s) %s ORDER BY %s LIMIT %d SETTINGS skip_unavailable_shards = 1", defaultColumns, i.database, timeConditions, conditions, parsedOrder, limit) + log.Debug(ctx, "SQL query raw logs", zap.String("query", sqlQueryRawLogs)) + rowsRawLogs, err := i.client.QueryContext(ctx, sqlQueryRawLogs) + if err != nil { + return nil, nil, 0, 0, nil, err } + defer rowsRawLogs.Close() - return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')] %s %s", key, operator, value), nil -} - -func handleExistsCondition(key string, materializedColumns []string) string { - if contains(defaultFields, key) || contains(materializedColumns, key) { - return fmt.Sprintf("%s IS NOT NULL", key) - } - - return fmt.Sprintf("(has(fields_string.key, '%s') = 1 OR has(fields_number.key, '%s') = 1)", key, key) -} + // Now we are going throw all the returned rows and passing them to the Row struct. After that we are converting + // each row to a JSON document for the React UI, which contains all the default fields and all the items from the + // fields_string / fields_number array. + // When the offset is 0 (user starts a new query) we are also checking all the fields from the nested fields_string + // and fields_number array and adding them to the fields slice. This slice can then be used by the user in our React + // UI to show only a list of selected fields in the table. 
+ for rowsRawLogs.Next() { + var r Row + if err := rowsRawLogs.Scan(&r.Timestamp, &r.Cluster, &r.Namespace, &r.App, &r.Pod, &r.Container, &r.Host, &r.FieldsString, &r.FieldsNumber, &r.Log); err != nil { + return nil, nil, 0, 0, nil, err + } -func parseOrder(order, orderBy string, materializedColumns []string) string { - if order == "" || orderBy == "" { - return "timestamp DESC" - } + var document map[string]any + document = make(map[string]any) + document["timestamp"] = r.Timestamp + document["cluster"] = r.Cluster + document["namespace"] = r.Namespace + document["app"] = r.App + document["pod_name"] = r.Pod + document["container_name"] = r.Container + document["host"] = r.Host + document["log"] = r.Log + + for k, v := range r.FieldsNumber { + document[k] = v + fields = appendIfMissing(fields, k) + } - if order == "ascending" { - order = "ASC" - } else { - order = "DESC" - } + for k, v := range r.FieldsString { + document[k] = v + fields = appendIfMissing(fields, k) + } - orderBy = strings.TrimSpace(orderBy) - if contains(defaultFields, orderBy) || contains(materializedColumns, orderBy) { - return fmt.Sprintf("%s %s", orderBy, order) + documents = append(documents, document) } - return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')] %s, fields_number.value[indexOf(fields_number.key, '%s')] %s", orderBy, order, orderBy, order) -} - -// getBucketTimes determines the start and end time of an bucket. This is necessary, because the first and last bucket -// time can be outside of the user defined time range. -func getBucketTimes(interval, bucketTime, timeStart, timeEnd int64) (int64, int64) { - if bucketTime < timeStart { - return timeStart, timeStart + interval - (timeStart - bucketTime) + if err := rowsRawLogs.Err(); err != nil { + return nil, nil, 0, 0, nil, err } - if bucketTime+interval > timeEnd { - return bucketTime, bucketTime + timeEnd - bucketTime - } + sort.Strings(fields) + log.Debug(ctx, "SQL result raw logs", zap.Int("documentsCount", len(documents))) - return bucketTime, bucketTime + interval + return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil } diff --git a/plugins/plugin-klogs/pkg/instance/logs_test.go b/plugins/plugin-klogs/pkg/instance/logs_test.go deleted file mode 100644 index 590c6ff3f..000000000 --- a/plugins/plugin-klogs/pkg/instance/logs_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package instance - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestParseLogsQuery(t *testing.T) { - for _, tt := range []struct { - query string - where string - isInvalid bool - }{ - {query: "cluster = 'foo' _and_ namespace = 'bar'", where: "cluster='foo' AND namespace='bar'", isInvalid: false}, - {query: "cluster = 'foo' _and_ (namespace='hello' _or_ namespace='world')", where: "cluster='foo' AND (namespace='hello' OR namespace='world')", isInvalid: false}, - {query: "kubernetes.label_foo = 'bar'", where: "fields_string.value[indexOf(fields_string.key, 'kubernetes.label_foo')] = 'bar'", isInvalid: false}, - {query: "kubernetes.label_foo_bar =~ '\\%hellow\\%world\\%'", where: "fields_string.value[indexOf(fields_string.key, 'kubernetes.label_foo_bar')] ILIKE '\\%hellow\\%world\\%'", isInvalid: false}, - {query: "kubernetes.label_foo_bar ~ 'hello.*'", where: "match(fields_string.value[indexOf(fields_string.key, 'kubernetes.label_foo_bar')], 'hello.*')", isInvalid: false}, - {query: "kubernetes.label_foo_bar / 'hello.*'", isInvalid: true}, - } { - t.Run(tt.query, func(t *testing.T) { - parsedWhere, 
err := parseLogsQuery(tt.query, nil) - if tt.isInvalid { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, tt.where, parsedWhere) - } - }) - } -} - -func TestSplitOperator(t *testing.T) { - for _, tt := range []struct { - query string - expectedCondition string - isInvalid bool - }{ - {query: "cluster >= 'foo'", expectedCondition: "cluster>='foo'", isInvalid: false}, - {query: "cluster > 'foo'", expectedCondition: "cluster>'foo'", isInvalid: false}, - {query: "cluster <= 'foo'", expectedCondition: "cluster<='foo'", isInvalid: false}, - {query: "cluster < 'foo'", expectedCondition: "cluster<'foo'", isInvalid: false}, - {query: "cluster =~ 'foo'", expectedCondition: "cluster ILIKE 'foo'", isInvalid: false}, - {query: "cluster != 'foo'", expectedCondition: "cluster!='foo'", isInvalid: false}, - {query: "cluster !~ 'foo'", expectedCondition: "cluster NOT ILIKE 'foo'", isInvalid: false}, - {query: "cluster ~ 'foo'", expectedCondition: "match(cluster, 'foo')", isInvalid: false}, - {query: "cluster = 'foo'", expectedCondition: "cluster='foo'", isInvalid: false}, - {query: "_exists_ cluster", expectedCondition: "cluster IS NOT NULL", isInvalid: false}, - {query: " ", expectedCondition: "", isInvalid: false}, - {query: "cluster / 'foo'", isInvalid: true}, - } { - t.Run(tt.query, func(t *testing.T) { - actualCondition, err := splitOperator(tt.query, nil) - if tt.isInvalid { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, tt.expectedCondition, actualCondition) - } - }) - } -} - -func TestHandleConditionParts(t *testing.T) { - for _, tt := range []struct { - key string - value string - operator string - expectedCondition string - }{ - {key: "cluster", value: "'foobar'", operator: "=~", expectedCondition: "cluster ILIKE 'foobar'"}, - {key: "cluster", value: "'foobar'", operator: "!~", expectedCondition: "cluster NOT ILIKE 'foobar'"}, - {key: "cluster", value: "'foobar'", operator: "~", expectedCondition: "match(cluster, 'foobar')"}, - {key: "cluster", value: "'foobar'", operator: "=", expectedCondition: "cluster='foobar'"}, - {key: "helloworld", value: "'foobar'", operator: "=~", expectedCondition: "fields_string.value[indexOf(fields_string.key, 'helloworld')] ILIKE 'foobar'"}, - {key: "helloworld", value: "'foobar'", operator: "!~", expectedCondition: "fields_string.value[indexOf(fields_string.key, 'helloworld')] NOT ILIKE 'foobar'"}, - {key: "helloworld", value: "'foobar'", operator: "~", expectedCondition: "match(fields_string.value[indexOf(fields_string.key, 'helloworld')], 'foobar')"}, - {key: "helloworld", value: "'foobar'", operator: "=", expectedCondition: "fields_string.value[indexOf(fields_string.key, 'helloworld')] = 'foobar'"}, - {key: "helloworld", value: "42", operator: "=~", expectedCondition: "fields_number.value[indexOf(fields_number.key, 'helloworld')] ILIKE 42"}, - {key: "helloworld", value: "42", operator: "!~", expectedCondition: "fields_number.value[indexOf(fields_number.key, 'helloworld')] NOT ILIKE 42"}, - {key: "helloworld", value: "42", operator: "~", expectedCondition: "match(fields_number.value[indexOf(fields_number.key, 'helloworld')], 42)"}, - {key: "helloworld", value: "42", operator: "=", expectedCondition: "fields_number.value[indexOf(fields_number.key, 'helloworld')] = 42"}, - } { - t.Run(tt.key, func(t *testing.T) { - actualCondition, _ := handleConditionParts(tt.key, tt.value, tt.operator, nil) - require.Equal(t, tt.expectedCondition, actualCondition) - }) - } -} - -func TestHandleExistsCondition(t 
*testing.T) { - for _, tt := range []struct { - key string - expectedCondition string - }{ - {key: "cluster", expectedCondition: "cluster IS NOT NULL"}, - {key: "foobar", expectedCondition: "(has(fields_string.key, 'foobar') = 1 OR has(fields_number.key, 'foobar') = 1)"}, - } { - t.Run(tt.key, func(t *testing.T) { - actualCondition := handleExistsCondition(tt.key, nil) - require.Equal(t, tt.expectedCondition, actualCondition) - }) - } -} - -func TestParseOrder(t *testing.T) { - for _, tt := range []struct { - order string - orderBy string - expectedCondition string - }{ - {order: "", orderBy: "", expectedCondition: "timestamp DESC"}, - {order: "ascending", orderBy: "cluster", expectedCondition: "cluster ASC"}, - {order: "descending", orderBy: "cluster", expectedCondition: "cluster DESC"}, - {order: "ascending", orderBy: "foobar", expectedCondition: "fields_string.value[indexOf(fields_string.key, 'foobar')] ASC, fields_number.value[indexOf(fields_number.key, 'foobar')] ASC"}, - } { - t.Run(tt.order+tt.orderBy, func(t *testing.T) { - actualCondition := parseOrder(tt.order, tt.orderBy, nil) - require.Equal(t, tt.expectedCondition, actualCondition) - }) - } -} - -func TestGetInterval(t *testing.T) { - for _, tt := range []struct { - interval int64 - bucketTime int64 - timeStart int64 - timeEnd int64 - expectedTimeStart int64 - expectedTimeEnd int64 - }{ - {interval: 124, bucketTime: 1640188920, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640189016, expectedTimeEnd: 1640189044}, - {interval: 124, bucketTime: 1640190780, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640190780, expectedTimeEnd: 1640190904}, - {interval: 124, bucketTime: 1640192640, timeStart: 1640189016, timeEnd: 1640192745, expectedTimeStart: 1640192640, expectedTimeEnd: 1640192745}, - } { - t.Run(fmt.Sprintf("%d", tt.bucketTime), func(t *testing.T) { - actualTimeStart, actualTimeEnd := getBucketTimes(tt.interval, tt.bucketTime, tt.timeStart, tt.timeEnd) - require.Equal(t, tt.expectedTimeStart, actualTimeStart) - require.Equal(t, tt.expectedTimeEnd, actualTimeEnd) - }) - } -} diff --git a/plugins/plugin-klogs/pkg/instance/structs.go b/plugins/plugin-klogs/pkg/instance/structs.go index ca9da5b86..81ef7b381 100644 --- a/plugins/plugin-klogs/pkg/instance/structs.go +++ b/plugins/plugin-klogs/pkg/instance/structs.go @@ -10,18 +10,6 @@ type Fields struct { Number []string } -// FieldString is the struct for the nested fields for all JSON fields of a log line, which are containing a string. -type FieldString struct { - Key []string - Value []string -} - -// FieldNumber is the struct for the nested fields for all JSON fields of a log line, which are containing a number. -type FieldNumber struct { - Key []string - Value []float64 -} - // Row is the struct which represents a single row in the logs table of ClickHouse. 
type Row struct { Timestamp time.Time @@ -31,8 +19,8 @@ type Row struct { Pod string Container string Host string - FieldsString FieldString - FieldsNumber FieldNumber + FieldsString map[string]string + FieldsNumber map[string]float64 Log string } diff --git a/plugins/plugin-klogs/src/components/panel/LogsDocument.tsx b/plugins/plugin-klogs/src/components/panel/LogsDocument.tsx index fbea46fb4..704a50af5 100644 --- a/plugins/plugin-klogs/src/components/panel/LogsDocument.tsx +++ b/plugins/plugin-klogs/src/components/panel/LogsDocument.tsx @@ -82,7 +82,7 @@ const LogsDocument: React.FunctionComponent = ({ {Object.keys(document) - .filter((key) => key.startsWith('content.') && document[key].length < 128) + .filter((key) => key.startsWith('content_') && document[key].length < 128) .map((key) => ( {key}: @@ -95,7 +95,7 @@ const LogsDocument: React.FunctionComponent = ({ {document['log']} - {Object.keys(document).filter((key) => key.startsWith('content.') && document[key].length < 128) + {Object.keys(document).filter((key) => key.startsWith('content_') && document[key].length < 128) .length === 0 ? Object.keys(document) .filter((key) => key.startsWith('kubernetes.')) diff --git a/plugins/plugin-sql/pkg/instance/instance.go b/plugins/plugin-sql/pkg/instance/instance.go index 2c5ee9452..f47d87ec6 100644 --- a/plugins/plugin-sql/pkg/instance/instance.go +++ b/plugins/plugin-sql/pkg/instance/instance.go @@ -6,7 +6,7 @@ import ( "fmt" "math" - _ "github.com/ClickHouse/clickhouse-go" + _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" "github.com/mitchellh/mapstructure"
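
Below is a minimal, hypothetical sketch of how the new connection options in `New` above fit together: the `dialTimeout` and `connMaxLifetime` strings are parsed with `time.ParseDuration`, falling back to the `10s` and `1h` defaults used in the constructor, and the `*sql.DB` returned by `clickhouse.OpenDB` is tuned with the standard pool settings. The address, database, and credential values are placeholders and not taken from any real configuration.

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Hypothetical satellite options for the klogs plugin; none of these
	// values come from a real cluster.
	address := "clickhouse-0:9000,clickhouse-1:9000"
	dialTimeout := "10s"
	connMaxLifetime := "1h"
	maxIdleConns := 5
	maxOpenConns := 10

	// Durations are plain Go duration strings; fall back to the defaults used
	// in the New constructor above when parsing fails.
	parsedDialTimeout, err := time.ParseDuration(dialTimeout)
	if err != nil {
		parsedDialTimeout = 10 * time.Second
	}
	parsedConnMaxLifetime, err := time.ParseDuration(connMaxLifetime)
	if err != nil {
		parsedConnMaxLifetime = 1 * time.Hour
	}

	// clickhouse.OpenDB returns a *sql.DB, so the usual connection pool knobs
	// apply on top of the ClickHouse specific options.
	client := clickhouse.OpenDB(&clickhouse.Options{
		Addr: strings.Split(address, ","),
		Auth: clickhouse.Auth{
			Database: "logs",
			Username: "default",
			Password: "",
		},
		DialTimeout: parsedDialTimeout,
	})
	client.SetMaxIdleConns(maxIdleConns)
	client.SetMaxOpenConns(maxOpenConns)
	client.SetConnMaxLifetime(parsedConnMaxLifetime)

	fmt.Println(client.Ping())
}
```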
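The tests added at the top of this diff show how the query language is rewritten for the new `Map` columns (`fields_string['key']`, `mapContains(...)`). As an illustration only, a small helper in the same package could print the WHERE clause that `parseLogsQuery` produces for a typical query; the commented output below is an expectation based on those test cases, not captured output.

```go
package instance

import "fmt"

// sketchQueryTranslation is an illustrative helper (not part of this change) showing
// how a typical query is rewritten for the Map based schema.
func sketchQueryTranslation() {
	condition, err := parseLogsQuery("namespace='bookinfo' _and_ _exists_ content_request_id", nil)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Expected to print something along the lines of:
	//   namespace='bookinfo' AND (mapContains(fields_string, 'content_request_id') = 1 OR mapContains(fields_number, 'content_request_id') = 1)
	fmt.Println(condition)
}
```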
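The `TestGetInterval` cases added above exercise the bucket clamping behaviour of `getBucketTimes`: the first and last buckets of the distribution chart may start before or end after the selected time range, so their boundaries are cut to the range. The standalone sketch below restates that logic for illustration; it mirrors the behaviour the tests expect rather than the exact in-tree implementation.

```go
package main

import "fmt"

// clampBucket restates the clamping behaviour the tests expect from getBucketTimes:
// a bucket that begins before the selected range, or ends after it, is cut to the range.
func clampBucket(interval, bucketTime, timeStart, timeEnd int64) (int64, int64) {
	if bucketTime < timeStart {
		return timeStart, timeStart + interval - (timeStart - bucketTime)
	}

	if bucketTime+interval > timeEnd {
		return bucketTime, timeEnd
	}

	return bucketTime, bucketTime + interval
}

func main() {
	// First test case from TestGetInterval: the bucket starts 96 seconds before
	// the selected range, so only the remaining 28 seconds of the bucket are kept.
	start, end := clampBucket(124, 1640188920, 1640189016, 1640192745)
	fmt.Println(start, end) // 1640189016 1640189044
}
```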