diff --git a/.ci/Dockerfile-plugins b/.ci/Dockerfile-plugins
index f18c9cfe09..46f9e62bd1 100644
--- a/.ci/Dockerfile-plugins
+++ b/.ci/Dockerfile-plugins
@@ -19,11 +19,14 @@ RUN set -e -u -x \
;; \
tdengine ) \
if [ "$(uname -m)" = "x86_64" ]; then \
- wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.3.1-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
- && tar -zxvf /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \
- && cd TDengine-client && ./install_client.sh && cd - \
- && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go; \
- fi \
+ wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \
+ fi; \
+ if [ "$(uname -m)" = "aarch64" ]; then \
+ wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-aarch64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \
+ fi; \
+ tar -zxvf /tmp/TDengine-client-2.0.6.0.tar.gz \
+ && cd TDengine-client-2.0.6.0 && ./install_client.sh && cd - \
+ && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
;; \
* ) \
go build --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml
index fe40bd95ee..90c6d8fbe8 100644
--- a/.github/workflows/build_packages.yaml
+++ b/.github/workflows/build_packages.yaml
@@ -262,13 +262,12 @@ jobs:
cd _packages && for var in $( ls |grep -v sha256); do
echo "$(cat $var.sha256) $var" | sha256sum -c || exit 1
done
- - name: update github release
+ - uses: zhanghongtong/upload-release-asset@v1
if: github.event_name == 'release'
- run: |
- version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g")
- for var in $(ls _packages) ; do
- .github/workflows/script/upload_github_release_asset.sh owner=emqx repo=kuiper tag=$version filename=_packages/$var github_api_token=$(echo ${{ secrets.AccessToken }})
- done
+ with:
+ repo: kuiper
+ path: "_packages/kuiper-*"
+ token: ${{ secrets.AccessToken }}
- name: create invalidation for cloudfront
if: github.event_name == 'release'
run: |
@@ -287,7 +286,13 @@ jobs:
if: github.event_name == 'release'
run: |
version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g")
- curl -w %{http_code} --insecure -H ${{ secrets.EmqxHeader }} https://admin.emqx.io/admin_api/v1/kuiper_github_release_callback?tag=$version
+ curl -w %{http_code} \
+ --insecure \
+ -H "Content-Type: application/json" \
+ -H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
+ -X POST \
+ -d "{\"repo\":\"emqx/kuiper\", \"tag\": \"${version}\" }" \
+ ${{ secrets.EMQX_IO_RELEASE_API }}
- name: update helm packages
if: github.event_name == 'release'
run: |
diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml
index 0b718a3815..13b2a33079 100644
--- a/.github/workflows/run_fvt_tests.yaml
+++ b/.github/workflows/run_fvt_tests.yaml
@@ -23,9 +23,9 @@ jobs:
- name: install jmeter
timeout-minutes: 10
env:
- JMETER_VERSION: 5.2.1
+ JMETER_VERSION: 5.3
run: |
- wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz --no-check-certificate
+ wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
@@ -84,9 +84,9 @@ jobs:
- name: install jmeter
timeout-minutes: 10
env:
- JMETER_VERSION: 5.2.1
+ JMETER_VERSION: 5.3
run: |
- wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
+ wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
diff --git a/.github/workflows/script/upload_github_release_asset.sh b/.github/workflows/script/upload_github_release_asset.sh
deleted file mode 100755
index 4e92fdb583..0000000000
--- a/.github/workflows/script/upload_github_release_asset.sh
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env bash
-#
-# Author: Stefan Buck
-# License: MIT
-# https://gist.github.com/stefanbuck/ce788fee19ab6eb0b4447a85fc99f447
-#
-#
-# This script accepts the following parameters:
-#
-# * owner
-# * repo
-# * tag
-# * filename
-# * github_api_token
-#
-# Script to upload a release asset using the GitHub API v3.
-#
-# Example:
-#
-# upload_github_release_asset.sh github_api_token=TOKEN owner=stefanbuck repo=playground tag=v0.1.0 filename=./build.zip
-#
-
-# Check dependencies.
-set -e
-xargs=$(which gxargs || which xargs)
-
-# Validate settings.
-[ "$TRACE" ] && set -x
-
-CONFIG=$@
-
-for line in $CONFIG; do
- eval "$line"
-done
-
-# Define variables.
-GH_API="https://api.github.com"
-GH_REPO="$GH_API/repos/$owner/$repo"
-GH_TAGS="$GH_REPO/releases/tags/$tag"
-AUTH="Authorization: token $github_api_token"
-WGET_ARGS="--content-disposition --auth-no-challenge --no-cookie"
-CURL_ARGS="-LJO#"
-
-if [[ "$tag" == 'LATEST' ]]; then
- GH_TAGS="$GH_REPO/releases/latest"
-fi
-
-# Validate token.
-curl -o /dev/null -sH "$AUTH" $GH_REPO || { echo "Error: Invalid repo, token or network issue!"; exit 1; }
-
-# Read asset tags.
-response=$(curl -sH "$AUTH" $GH_TAGS)
-
-# Get ID of the asset based on given filename.
-eval $(echo "$response" | grep -m 1 "id.:" | grep -w id | tr : = | tr -cd '[[:alnum:]]=')
-[ "$id" ] || { echo "Error: Failed to get release id for tag: $tag"; echo "$response" | awk 'length($0)<100' >&2; exit 1; }
-
-# Upload asset
-echo "Uploading asset... "
-
-# Construct url
-GH_ASSET="https://uploads.github.com/repos/$owner/$repo/releases/$id/assets?name=$(basename $filename)"
-
-curl "$GITHUB_OAUTH_BASIC" --data-binary @"$filename" -H "Authorization: token $github_api_token" -H "Content-Type: application/octet-stream" $GH_ASSET
diff --git a/Makefile b/Makefile
index 6dfa17938c..c9edbce88a 100644
--- a/Makefile
+++ b/Makefile
@@ -182,7 +182,7 @@ PLUGINS := sinks/file \
plugins: cross_prepare sinks/tdengine $(PLUGINS)
sinks/tdengine:
@docker buildx build --no-cache \
- --platform=linux/amd64 \
+ --platform=linux/amd64,linux/arm64 \
-t cross_build \
--build-arg VERSION=$(VERSION) \
--build-arg PLUGIN_TYPE=sinks \
@@ -191,9 +191,12 @@ sinks/tdengine:
-f .ci/Dockerfile-plugins .
@mkdir -p _plugins/debian/sinks
- @tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip" \
- && mv go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip _plugins/debian/sinks
+ @for arch in amd64 arm64; do \
+ tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip" \
+ && mv $$(ls linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip) _plugins/debian/sinks; \
+ done
@rm -f /tmp/cross_build_plugins_sinks_tdengine.tar
+
$(PLUGINS): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS):
diff --git a/common/util.go b/common/util.go
index 9c07c40b01..1ba9f3f5c1 100644
--- a/common/util.go
+++ b/common/util.go
@@ -15,19 +15,22 @@ import (
"path"
"path/filepath"
//"runtime"
+ logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
+ "log/syslog"
"sort"
"strings"
"sync"
)
const (
- logFileName = "stream.log"
- etc_dir = "/etc/"
- data_dir = "/data/"
- log_dir = "/log/"
- StreamConf = "kuiper.yaml"
- KuiperBaseKey = "KuiperBaseKey"
- MetaKey = "__meta"
+ logFileName = "stream.log"
+ etc_dir = "/etc/"
+ data_dir = "/data/"
+ log_dir = "/log/"
+ StreamConf = "kuiper.yaml"
+ KuiperBaseKey = "KuiperBaseKey"
+ KuiperSyslogKey = "KuiperSyslogKey"
+ MetaKey = "__meta"
)
var (
@@ -79,21 +82,19 @@ type KuiperConf struct {
}
func init() {
+ Log = logrus.New()
+ if "true" == os.Getenv(KuiperSyslogKey) {
+ if hook, err := logrus_syslog.NewSyslogHook("", "", syslog.LOG_INFO, ""); err != nil {
+ Log.Error("Unable to connect to local syslog daemon")
+ } else {
+ Log.AddHook(hook)
+ }
+ }
+
filenameHook := filename.NewHook()
filenameHook.Field = "file"
- Log = logrus.New()
Log.AddHook(filenameHook)
- /*
- Log.SetReportCaller(true)
- Log.SetFormatter(&logrus.TextFormatter{
- CallerPrettyfier: func(f *runtime.Frame) (string, string) {
- filename := path.Base(f.File)
- return "", fmt.Sprintf("%s:%d", filename, f.Line)
- },
- DisableColors: true,
- FullTimestamp: true,
- })
- */
+
Log.SetFormatter(&logrus.TextFormatter{
TimestampFormat: "2006-01-02 15:04:05",
DisableColors: true,
diff --git a/docs/en_US/edgex/edgex_rule_engine_tutorial.md b/docs/en_US/edgex/edgex_rule_engine_tutorial.md
index 331ab0187f..c73825b9ea 100644
--- a/docs/en_US/edgex/edgex_rule_engine_tutorial.md
+++ b/docs/en_US/edgex/edgex_rule_engine_tutorial.md
@@ -71,6 +71,10 @@ f69e9c4d6cc8 nexus3.edgexfoundry.org:10004/docker-core-data-go:master
ed7ad5ae08b2 nexus3.edgexfoundry.org:10004/docker-edgex-volume:master "/bin/sh -c '/usr/bi…" 37 minutes ago Up 37 minutes edgex-files
```
+#### Run with native
+
+For performance reason, reader probably wants to run Kuiper with native approach. But you may find that [EdgeX cannot be used](https://github.com/emqx/kuiper/issues/596) with the downloaded Kuiper binary packages. It's because that EdgeX message bus relies on `zeromq` library. If `zeromq` library cannot be found in the library search path, it cannot be started. So it will have those Kuiper users who do not want to use EdgeX install the `zeromq` library as well. For this reason, the default downloaded Kuiper package **does NOT have embedded support** for `EdgeX`. If reader wants to support `EdgeX` in native packages, you can either make a native package by running command `make pkg_with_edgex`, or just copy the binary package from docker container.
+
### Create a stream
There are two approaches to manage stream, you can use your preferred approach.
diff --git a/docs/en_US/operation/configuration_file.md b/docs/en_US/operation/configuration_file.md
index 1d0789992b..9d0487304c 100644
--- a/docs/en_US/operation/configuration_file.md
+++ b/docs/en_US/operation/configuration_file.md
@@ -12,7 +12,8 @@ basic:
# true|false, if it's set to true, then the log will be print to log file
fileLog: true
```
-
+## system log
+When the user sets the value of the environment variable named KuiperSyslogKey to true, the log will be printed to the syslog.
## Cli Port
```yaml
basic:
diff --git a/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md b/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md
index ab7a32be69..8686e492e7 100644
--- a/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md
+++ b/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md
@@ -67,6 +67,10 @@ f69e9c4d6cc8 nexus3.edgexfoundry.org:10004/docker-core-data-go:master
ed7ad5ae08b2 nexus3.edgexfoundry.org:10004/docker-edgex-volume:master "/bin/sh -c '/usr/bi…" 37 minutes ago Up 37 minutes edgex-files
```
+#### 原生 (native) 方式运行
+
+出于运行效率考虑,读者可能需要直接以原生方式运行 Kuiper,但是可能会发现直接使用下载的 Kuiper 软件包启动后[无法直接使用 Edgex](https://github.com/emqx/kuiper/issues/596),这是因为 EdgeX 缺省消息总线依赖于 `zeromq` 库,如果 Kuiper 启动的时候在库文件寻找路径下无法找到 `zeromq` 库,它将无法启动。这导致对于不需要使用 EdgeX 的 Kuiper 用户也不得不去安装 `zeromq` 库 ,因此缺省提供的下载安装包中**内置不支持 Edgex** 。如果读者需要以原生方式运行 Kuiper 并且支持 `EdgeX`,可以通过命令 `make pkg_with_edgex` 自己来编译原生安装包,或者从容器中直接拷贝出安装包。
+
### 创建流
该步骤是创建一个可以从 EdgeX 消息总线进行数据消费的流。有两种方法来支持管理流,你可以选择喜欢的方式。
diff --git a/docs/zh_CN/operation/configuration_file.md b/docs/zh_CN/operation/configuration_file.md
index 012f39f70c..6b5b87689e 100755
--- a/docs/zh_CN/operation/configuration_file.md
+++ b/docs/zh_CN/operation/configuration_file.md
@@ -12,6 +12,8 @@ basic:
# true|false, if it's set to true, then the log will be print to log file
fileLog: true
```
+## 系统日志
+用户将名为 KuiperSyslogKey 的环境变量的值设置为 true 时,日志将打印到系统日志中。
## Cli 端口
```yaml
basic:
diff --git a/etc/sinks/rest.json b/etc/sinks/rest.json
index b20a2c6fcd..688e64fb7a 100644
--- a/etc/sinks/rest.json
+++ b/etc/sinks/rest.json
@@ -95,10 +95,10 @@
},
{
"name": "headers",
- "default": "",
+ "default": [],
"optional": true,
- "control": "text",
- "type": "string",
+ "control": "list",
+ "type": "list_object",
"hint": {
"en_US": "The additional headers to be set for the HTTP request.",
"zh_CN": "要为 HTTP 请求设置的其他标头"
diff --git a/plugins/sinks/tdengine/install.sh b/plugins/sinks/tdengine/install.sh
index 5185cda2c3..2c0f52c7fd 100644
--- a/plugins/sinks/tdengine/install.sh
+++ b/plugins/sinks/tdengine/install.sh
@@ -8,6 +8,9 @@ then
fi
url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-x64.tar.gz"
+if [ "$(uname -m)" = "aarch64" ]; then \
+ url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-aarch64.tar.gz"
+fi
zip="TDengine-client.tar.gz"
wget -T 280 -O "$zip" "$url"
@@ -18,7 +21,8 @@ then
fi
dir="TDengine-client"
-tar -zxvf "$zip"
+mkdir "$dir"
+tar -xzvf "$zip" -C ./"$dir" --strip-components 1
rm "$zip"
if ! [ -e $dir ]
diff --git a/xsql/ast.go b/xsql/ast.go
index 5ee815968e..a760d73bc0 100644
--- a/xsql/ast.go
+++ b/xsql/ast.go
@@ -314,10 +314,10 @@ type StreamField struct {
func (u *StreamField) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
- FieldType string
+ FieldType interface{}
Name string
}{
- FieldType: PrintFieldType(u.FieldType),
+ FieldType: PrintFieldTypeForJson(u.FieldType),
Name: u.Name,
})
}
diff --git a/xsql/ast_test.go b/xsql/ast_test.go
index 220e168047..2897741775 100644
--- a/xsql/ast_test.go
+++ b/xsql/ast_test.go
@@ -1,6 +1,7 @@
package xsql
import (
+ "encoding/json"
"fmt"
"reflect"
"testing"
@@ -108,3 +109,42 @@ func Test_MessageValTest(t *testing.T) {
}
}
}
+
+func Test_StreamFieldsMarshall(t *testing.T) {
+ var tests = []struct {
+ sf StreamFields
+ r string
+ }{{
+ sf: []StreamField{
+ {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+ {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
+ {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
+ {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
+ {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
+ {Name: "ADDRESS", FieldType: &RecType{
+ StreamFields: []StreamField{
+ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+ {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+ },
+ }},
+ },
+ r: `[{"FieldType":"bigint","Name":"USERID"},{"FieldType":"string","Name":"FIRST_NAME"},{"FieldType":"string","Name":"LAST_NAME"},{"FieldType":{"Type":"array","ElementType":"string"},"Name":"NICKNAMES"},{"FieldType":"boolean","Name":"Gender"},{"FieldType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]},"Name":"ADDRESS"}]`,
+ }, {
+ sf: []StreamField{
+ {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+ },
+ r: `[{"FieldType":"bigint","Name":"USERID"}]`,
+ }}
+ fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+ for i, tt := range tests {
+ r, err := json.Marshal(tt.sf)
+ if err != nil {
+ t.Errorf("%d. \nmarshall error: %v", i, err)
+ t.FailNow()
+ }
+ result := string(r)
+ if !reflect.DeepEqual(tt.r, result) {
+ t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.r, result)
+ }
+ }
+}
diff --git a/xsql/processors/common_test.go b/xsql/processors/common_test.go
index 1bc1da5dc5..3a3c2fdda4 100644
--- a/xsql/processors/common_test.go
+++ b/xsql/processors/common_test.go
@@ -71,7 +71,7 @@ func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err erro
)
for index, key = range keys {
if k == key {
- if strings.HasSuffix(k, "process_latency_ms") {
+ if strings.HasSuffix(k, "process_latency_us") {
if values[index].(int64) >= v.(int64) {
matched = true
continue
diff --git a/xsql/processors/extension_test.go b/xsql/processors/extension_test.go
index bd9afbe376..79484f1f0a 100644
--- a/xsql/processors/extension_test.go
+++ b/xsql/processors/extension_test.go
@@ -180,12 +180,12 @@ func TestFuncState(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_text_0_exceptions_total": int64(0),
- "op_preprocessor_text_0_process_latency_ms": int64(0),
+ "op_preprocessor_text_0_process_latency_us": int64(0),
"op_preprocessor_text_0_records_in_total": int64(8),
"op_preprocessor_text_0_records_out_total": int64(8),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(8),
"op_project_0_records_out_total": int64(8),
@@ -245,12 +245,12 @@ func TestFuncStateCheckpoint(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_text_0_exceptions_total": int64(0),
- "op_preprocessor_text_0_process_latency_ms": int64(0),
+ "op_preprocessor_text_0_process_latency_us": int64(0),
"op_preprocessor_text_0_records_in_total": int64(6),
"op_preprocessor_text_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(6),
"op_project_0_records_out_total": int64(6),
@@ -267,12 +267,12 @@ func TestFuncStateCheckpoint(t *testing.T) {
cc: 1,
pauseMetric: map[string]interface{}{
"op_preprocessor_text_0_exceptions_total": int64(0),
- "op_preprocessor_text_0_process_latency_ms": int64(0),
+ "op_preprocessor_text_0_process_latency_us": int64(0),
"op_preprocessor_text_0_records_in_total": int64(3),
"op_preprocessor_text_0_records_out_total": int64(3),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(3),
"op_project_0_records_out_total": int64(3),
diff --git a/xsql/processors/rule_test.go b/xsql/processors/rule_test.go
index 03204ddf3d..c46c43057b 100644
--- a/xsql/processors/rule_test.go
+++ b/xsql/processors/rule_test.go
@@ -45,12 +45,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -85,12 +85,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
@@ -103,7 +103,7 @@ func TestSingleSQL(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(2),
},
@@ -122,12 +122,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
@@ -140,7 +140,7 @@ func TestSingleSQL(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(2),
},
@@ -164,12 +164,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoError_0_exceptions_total": int64(3),
- "op_preprocessor_demoError_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoError_0_process_latency_us": int64(0),
"op_preprocessor_demoError_0_records_in_total": int64(5),
"op_preprocessor_demoError_0_records_out_total": int64(2),
"op_project_0_exceptions_total": int64(3),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(1),
@@ -182,7 +182,7 @@ func TestSingleSQL(t *testing.T) {
"source_demoError_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(3),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(1),
},
@@ -206,12 +206,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoError_0_exceptions_total": int64(3),
- "op_preprocessor_demoError_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoError_0_process_latency_us": int64(0),
"op_preprocessor_demoError_0_records_in_total": int64(5),
"op_preprocessor_demoError_0_records_out_total": int64(2),
"op_project_0_exceptions_total": int64(3),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(1),
@@ -224,7 +224,7 @@ func TestSingleSQL(t *testing.T) {
"source_demoError_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(3),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(1),
},
@@ -255,12 +255,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -287,12 +287,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
@@ -305,7 +305,7 @@ func TestSingleSQL(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(2),
},
@@ -331,12 +331,12 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo1_0_exceptions_total": int64(0),
- "op_preprocessor_demo1_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1_0_process_latency_us": int64(0),
"op_preprocessor_demo1_0_records_in_total": int64(5),
"op_preprocessor_demo1_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -367,17 +367,17 @@ func TestSingleSQL(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo1_0_exceptions_total": int64(0),
- "op_preprocessor_demo1_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1_0_process_latency_us": int64(0),
"op_preprocessor_demo1_0_records_in_total": int64(5),
"op_preprocessor_demo1_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(2),
@@ -433,12 +433,12 @@ func TestSingleSQLError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(3),
"op_project_0_records_out_total": int64(2),
@@ -451,7 +451,7 @@ func TestSingleSQLError(t *testing.T) {
"source_ldemo_0_records_out_total": int64(5),
"op_filter_0_exceptions_total": int64(1),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(5),
"op_filter_0_records_out_total": int64(2),
},
@@ -475,12 +475,12 @@ func TestSingleSQLError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(4),
@@ -533,12 +533,12 @@ func TestSingleSQLTemplate(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -591,12 +591,12 @@ func TestNoneSingleSQLTemplate(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
diff --git a/xsql/processors/window_rule_test.go b/xsql/processors/window_rule_test.go
index 6723a8e273..5df81d0a8e 100644
--- a/xsql/processors/window_rule_test.go
+++ b/xsql/processors/window_rule_test.go
@@ -58,12 +58,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(4),
@@ -76,7 +76,7 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(4),
},
@@ -98,12 +98,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
@@ -116,12 +116,12 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(4),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(4),
"op_filter_0_records_out_total": int64(2),
},
@@ -173,17 +173,17 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_preprocessor_demo1_0_exceptions_total": int64(0),
- "op_preprocessor_demo1_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1_0_process_latency_us": int64(0),
"op_preprocessor_demo1_0_records_in_total": int64(5),
"op_preprocessor_demo1_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(8),
"op_project_0_records_out_total": int64(8),
@@ -200,12 +200,12 @@ func TestWindow(t *testing.T) {
"source_demo1_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(10),
"op_window_0_records_out_total": int64(10),
"op_join_0_exceptions_total": int64(0),
- "op_join_0_process_latency_ms": int64(0),
+ "op_join_0_process_latency_us": int64(0),
"op_join_0_records_in_total": int64(10),
"op_join_0_records_out_total": int64(8),
},
@@ -249,12 +249,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -267,17 +267,17 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(5),
"op_aggregate_0_exceptions_total": int64(0),
- "op_aggregate_0_process_latency_ms": int64(0),
+ "op_aggregate_0_process_latency_us": int64(0),
"op_aggregate_0_records_in_total": int64(5),
"op_aggregate_0_records_out_total": int64(5),
"op_order_0_exceptions_total": int64(0),
- "op_order_0_process_latency_ms": int64(0),
+ "op_order_0_process_latency_us": int64(0),
"op_order_0_records_in_total": int64(5),
"op_order_0_records_out_total": int64(5),
},
@@ -311,12 +311,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
- "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_sessionDemo_0_process_latency_us": int64(0),
"op_preprocessor_sessionDemo_0_records_in_total": int64(11),
"op_preprocessor_sessionDemo_0_records_out_total": int64(11),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(4),
@@ -329,7 +329,7 @@ func TestWindow(t *testing.T) {
"source_sessionDemo_0_records_out_total": int64(11),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(11),
"op_window_0_records_out_total": int64(4),
},
@@ -365,17 +365,17 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_preprocessor_demo1_0_exceptions_total": int64(0),
- "op_preprocessor_demo1_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1_0_process_latency_us": int64(0),
"op_preprocessor_demo1_0_records_in_total": int64(5),
"op_preprocessor_demo1_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(8),
"op_project_0_records_out_total": int64(8),
@@ -392,12 +392,12 @@ func TestWindow(t *testing.T) {
"source_demo1_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(10),
"op_window_0_records_out_total": int64(10),
"op_join_0_exceptions_total": int64(0),
- "op_join_0_process_latency_ms": int64(0),
+ "op_join_0_process_latency_us": int64(0),
"op_join_0_records_in_total": int64(10),
"op_join_0_records_out_total": int64(8),
},
@@ -436,12 +436,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoError_0_exceptions_total": int64(3),
- "op_preprocessor_demoError_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoError_0_process_latency_us": int64(0),
"op_preprocessor_demoError_0_records_in_total": int64(5),
"op_preprocessor_demoError_0_records_out_total": int64(2),
"op_project_0_exceptions_total": int64(3),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(6),
"op_project_0_records_out_total": int64(3),
@@ -454,7 +454,7 @@ func TestWindow(t *testing.T) {
"source_demoError_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(3),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(3),
},
@@ -470,12 +470,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(1),
"op_project_0_records_out_total": int64(1),
@@ -488,22 +488,22 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(4),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(4),
"op_filter_0_records_out_total": int64(2),
"op_aggregate_0_exceptions_total": int64(0),
- "op_aggregate_0_process_latency_ms": int64(0),
+ "op_aggregate_0_process_latency_us": int64(0),
"op_aggregate_0_records_in_total": int64(2),
"op_aggregate_0_records_out_total": int64(2),
"op_having_0_exceptions_total": int64(0),
- "op_having_0_process_latency_ms": int64(0),
+ "op_having_0_process_latency_us": int64(0),
"op_having_0_records_in_total": int64(2),
"op_having_0_records_out_total": int64(1),
},
@@ -542,12 +542,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(4),
@@ -560,7 +560,7 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(3),
"op_window_0_records_out_total": int64(4),
},
@@ -574,12 +574,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(1),
"op_project_0_records_out_total": int64(1),
@@ -592,7 +592,7 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(1),
},
@@ -610,12 +610,12 @@ func TestWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demo_0_exceptions_total": int64(0),
- "op_preprocessor_demo_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo_0_process_latency_us": int64(0),
"op_preprocessor_demo_0_records_in_total": int64(5),
"op_preprocessor_demo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -628,7 +628,7 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(5),
},
@@ -700,12 +700,12 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoE_0_exceptions_total": int64(0),
- "op_preprocessor_demoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoE_0_process_latency_us": int64(0),
"op_preprocessor_demoE_0_records_in_total": int64(6),
"op_preprocessor_demoE_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -718,7 +718,7 @@ func TestEventWindow(t *testing.T) {
"source_demoE_0_records_out_total": int64(6),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(6),
"op_window_0_records_out_total": int64(5),
},
@@ -737,12 +737,12 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoE_0_exceptions_total": int64(0),
- "op_preprocessor_demoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoE_0_process_latency_us": int64(0),
"op_preprocessor_demoE_0_records_in_total": int64(6),
"op_preprocessor_demoE_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(2),
@@ -755,12 +755,12 @@ func TestEventWindow(t *testing.T) {
"source_demoE_0_records_out_total": int64(6),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(6),
"op_window_0_records_out_total": int64(4),
"op_filter_0_exceptions_total": int64(0),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(4),
"op_filter_0_records_out_total": int64(2),
},
@@ -800,17 +800,17 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoE_0_exceptions_total": int64(0),
- "op_preprocessor_demoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoE_0_process_latency_us": int64(0),
"op_preprocessor_demoE_0_records_in_total": int64(6),
"op_preprocessor_demoE_0_records_out_total": int64(6),
"op_preprocessor_demo1E_0_exceptions_total": int64(0),
- "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1E_0_process_latency_us": int64(0),
"op_preprocessor_demo1E_0_records_in_total": int64(6),
"op_preprocessor_demo1E_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -827,12 +827,12 @@ func TestEventWindow(t *testing.T) {
"source_demo1E_0_records_out_total": int64(6),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(12),
"op_window_0_records_out_total": int64(5),
"op_join_0_exceptions_total": int64(0),
- "op_join_0_process_latency_ms": int64(0),
+ "op_join_0_process_latency_us": int64(0),
"op_join_0_records_in_total": int64(5),
"op_join_0_records_out_total": int64(5),
},
@@ -860,12 +860,12 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoE_0_exceptions_total": int64(0),
- "op_preprocessor_demoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoE_0_process_latency_us": int64(0),
"op_preprocessor_demoE_0_records_in_total": int64(6),
"op_preprocessor_demoE_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(4),
@@ -878,17 +878,17 @@ func TestEventWindow(t *testing.T) {
"source_demoE_0_records_out_total": int64(6),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(6),
"op_window_0_records_out_total": int64(4),
"op_aggregate_0_exceptions_total": int64(0),
- "op_aggregate_0_process_latency_ms": int64(0),
+ "op_aggregate_0_process_latency_us": int64(0),
"op_aggregate_0_records_in_total": int64(4),
"op_aggregate_0_records_out_total": int64(4),
"op_order_0_exceptions_total": int64(0),
- "op_order_0_process_latency_ms": int64(0),
+ "op_order_0_process_latency_us": int64(0),
"op_order_0_records_in_total": int64(4),
"op_order_0_records_out_total": int64(4),
},
@@ -920,12 +920,12 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
- "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
"op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
"op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(4),
@@ -938,7 +938,7 @@ func TestEventWindow(t *testing.T) {
"source_sessionDemoE_0_records_out_total": int64(12),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(12),
"op_window_0_records_out_total": int64(4),
},
@@ -965,17 +965,17 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoE_0_exceptions_total": int64(0),
- "op_preprocessor_demoE_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoE_0_process_latency_us": int64(0),
"op_preprocessor_demoE_0_records_in_total": int64(6),
"op_preprocessor_demoE_0_records_out_total": int64(6),
"op_preprocessor_demo1E_0_exceptions_total": int64(0),
- "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
+ "op_preprocessor_demo1E_0_process_latency_us": int64(0),
"op_preprocessor_demo1E_0_records_in_total": int64(6),
"op_preprocessor_demo1E_0_records_out_total": int64(6),
"op_project_0_exceptions_total": int64(0),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(5),
@@ -996,7 +996,7 @@ func TestEventWindow(t *testing.T) {
"op_window_0_records_out_total": int64(5),
"op_join_0_exceptions_total": int64(0),
- "op_join_0_process_latency_ms": int64(0),
+ "op_join_0_process_latency_us": int64(0),
"op_join_0_records_in_total": int64(5),
"op_join_0_records_out_total": int64(5),
},
@@ -1037,12 +1037,12 @@ func TestEventWindow(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_demoErr_0_exceptions_total": int64(1),
- "op_preprocessor_demoErr_0_process_latency_ms": int64(0),
+ "op_preprocessor_demoErr_0_process_latency_us": int64(0),
"op_preprocessor_demoErr_0_records_in_total": int64(6),
"op_preprocessor_demoErr_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(6),
"op_project_0_records_out_total": int64(5),
@@ -1055,7 +1055,7 @@ func TestEventWindow(t *testing.T) {
"source_demoErr_0_records_out_total": int64(6),
"op_window_0_exceptions_total": int64(1),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(6),
"op_window_0_records_out_total": int64(5),
},
@@ -1103,12 +1103,12 @@ func TestWindowError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(1),
@@ -1121,7 +1121,7 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(2),
},
@@ -1137,12 +1137,12 @@ func TestWindowError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(2),
"op_project_0_records_out_total": int64(1),
@@ -1155,12 +1155,12 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(4),
"op_filter_0_exceptions_total": int64(1),
- "op_filter_0_process_latency_ms": int64(0),
+ "op_filter_0_process_latency_us": int64(0),
"op_filter_0_records_in_total": int64(4),
"op_filter_0_records_out_total": int64(1),
},
@@ -1196,17 +1196,17 @@ func TestWindowError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_preprocessor_ldemo1_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo1_0_process_latency_us": int64(0),
"op_preprocessor_ldemo1_0_records_in_total": int64(5),
"op_preprocessor_ldemo1_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(3),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(8),
"op_project_0_records_out_total": int64(5),
@@ -1223,12 +1223,12 @@ func TestWindowError(t *testing.T) {
"source_ldemo1_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(10),
"op_window_0_records_out_total": int64(10),
"op_join_0_exceptions_total": int64(3),
- "op_join_0_process_latency_ms": int64(0),
+ "op_join_0_process_latency_us": int64(0),
"op_join_0_records_in_total": int64(10),
"op_join_0_records_out_total": int64(5),
},
@@ -1250,12 +1250,12 @@ func TestWindowError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(3),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(5),
"op_project_0_records_out_total": int64(2),
@@ -1268,17 +1268,17 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(5),
"op_aggregate_0_exceptions_total": int64(0),
- "op_aggregate_0_process_latency_ms": int64(0),
+ "op_aggregate_0_process_latency_us": int64(0),
"op_aggregate_0_records_in_total": int64(5),
"op_aggregate_0_records_out_total": int64(5),
"op_having_0_exceptions_total": int64(3),
- "op_having_0_process_latency_ms": int64(0),
+ "op_having_0_process_latency_us": int64(0),
"op_having_0_records_in_total": int64(5),
"op_having_0_records_out_total": int64(2),
},
@@ -1299,12 +1299,12 @@ func TestWindowError(t *testing.T) {
},
m: map[string]interface{}{
"op_preprocessor_ldemo_0_exceptions_total": int64(0),
- "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+ "op_preprocessor_ldemo_0_process_latency_us": int64(0),
"op_preprocessor_ldemo_0_records_in_total": int64(5),
"op_preprocessor_ldemo_0_records_out_total": int64(5),
"op_project_0_exceptions_total": int64(1),
- "op_project_0_process_latency_ms": int64(0),
+ "op_project_0_process_latency_us": int64(0),
"op_project_0_records_in_total": int64(4),
"op_project_0_records_out_total": int64(3),
@@ -1317,12 +1317,12 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_out_total": int64(5),
"op_window_0_exceptions_total": int64(0),
- "op_window_0_process_latency_ms": int64(0),
+ "op_window_0_process_latency_us": int64(0),
"op_window_0_records_in_total": int64(5),
"op_window_0_records_out_total": int64(4),
"op_order_0_exceptions_total": int64(1),
- "op_order_0_process_latency_ms": int64(0),
+ "op_order_0_process_latency_us": int64(0),
"op_order_0_records_in_total": int64(4),
"op_order_0_records_out_total": int64(3),
},
diff --git a/xsql/processors/xsql_processor.go b/xsql/processors/xsql_processor.go
index ea9731f39f..02caa195f0 100644
--- a/xsql/processors/xsql_processor.go
+++ b/xsql/processors/xsql_processor.go
@@ -77,6 +77,33 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, statement stri
}
}
+func (p *StreamProcessor) ExecReplaceStream(statement string) (string, error) {
+ parser := xsql.NewParser(strings.NewReader(statement))
+ stmt, err := xsql.Language.Parse(parser)
+ if err != nil {
+ return "", err
+ }
+
+ switch s := stmt.(type) {
+ case *xsql.StreamStmt:
+ if err = p.db.Open(); nil != err {
+ return "", fmt.Errorf("Replace stream fails, error when opening db: %v.", err)
+ }
+ defer p.db.Close()
+
+ if err = p.db.Replace(string(s.Name), statement); nil != err {
+ return "", fmt.Errorf("Replace stream fails: %v.", err)
+ } else {
+ info := fmt.Sprintf("Stream %s is replaced.", s.Name)
+ log.Printf("%s", info)
+ return info, nil
+ }
+ default:
+ return "", fmt.Errorf("Invalid stream statement: %s", statement)
+ }
+ return "", nil
+}
+
func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) {
r, err := p.ExecStmt(statement)
if err != nil {
@@ -228,6 +255,27 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
return rule, nil
}
+func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
+ rule, err := p.getRuleByJson(name, ruleJson)
+ if err != nil {
+ return nil, err
+ }
+
+ err = p.db.Open()
+ if err != nil {
+ return nil, err
+ }
+ defer p.db.Close()
+
+ err = p.db.Replace(rule.Id, ruleJson)
+ if err != nil {
+ return nil, err
+ } else {
+ log.Infof("Rule %s is update.", rule.Id)
+ }
+
+ return rule, nil
+}
func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
rule, err := p.GetRuleByName(name)
diff --git a/xsql/util.go b/xsql/util.go
index c37ef52d81..ffbc483c9e 100644
--- a/xsql/util.go
+++ b/xsql/util.go
@@ -1,6 +1,10 @@
package xsql
-import "strings"
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+)
func PrintFieldType(ft FieldType) (result string) {
switch t := ft.(type) {
@@ -30,6 +34,56 @@ func PrintFieldType(ft FieldType) (result string) {
return
}
+func PrintFieldTypeForJson(ft FieldType) (result interface{}) {
+ r, q := doPrintFieldTypeForJson(ft)
+ if q {
+ return r
+ } else {
+ return json.RawMessage(r)
+ }
+}
+
+func doPrintFieldTypeForJson(ft FieldType) (result string, isLiteral bool) {
+ switch t := ft.(type) {
+ case *BasicType:
+ return t.Type.String(), true
+ case *ArrayType:
+ var (
+ fieldType string
+ q bool
+ )
+ if t.FieldType != nil {
+ fieldType, q = doPrintFieldTypeForJson(t.FieldType)
+ } else {
+ fieldType, q = t.Type.String(), true
+ }
+ if q {
+ result = fmt.Sprintf(`{"Type":"array","ElementType":"%s"}`, fieldType)
+ } else {
+ result = fmt.Sprintf(`{"Type":"array","ElementType":%s}`, fieldType)
+ }
+
+ case *RecType:
+ result = `{"Type":"struct","Fields":[`
+ isFirst := true
+ for _, f := range t.StreamFields {
+ if isFirst {
+ isFirst = false
+ } else {
+ result += ","
+ }
+ fieldType, q := doPrintFieldTypeForJson(f.FieldType)
+ if q {
+ result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
+ } else {
+ result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
+ }
+ }
+ result += `]}`
+ }
+ return result, false
+}
+
func GetStreams(stmt *SelectStatement) (result []string) {
if stmt == nil {
return nil
diff --git a/xsql/util_test.go b/xsql/util_test.go
index 4fcf487f46..77323d09de 100644
--- a/xsql/util_test.go
+++ b/xsql/util_test.go
@@ -61,10 +61,54 @@ func TestLowercaseKeyMap(t *testing.T) {
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
for i, tt := range tests {
- //fmt.Printf("Parsing SQL %q.\n", tt.s)
result := LowercaseKeyMap(tt.src)
if !reflect.DeepEqual(tt.dest, result) {
t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result)
}
}
}
+
+func TestPrintFieldType(t *testing.T) {
+ var tests = []struct {
+ ft FieldType
+ printed string
+ }{{
+ ft: &RecType{
+ StreamFields: []StreamField{
+ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+ {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+ },
+ },
+ printed: `{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}`,
+ }, {
+ ft: &ArrayType{
+ Type: STRUCT,
+ FieldType: &RecType{
+ StreamFields: []StreamField{
+ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+ {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+ },
+ },
+ },
+ printed: `{"Type":"array","ElementType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}}`,
+ }, {
+ ft: &ArrayType{
+ Type: STRUCT,
+ FieldType: &BasicType{Type: STRINGS},
+ },
+ printed: `{"Type":"array","ElementType":"string"}`,
+ }, {
+ ft: &BasicType{
+ Type: STRINGS,
+ },
+ printed: `string`,
+ }}
+ fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+ for i, tt := range tests {
+ //fmt.Printf("Parsing SQL %q.\n",tt.s)
+ result, _ := doPrintFieldTypeForJson(tt.ft)
+ if !reflect.DeepEqual(tt.printed, result) {
+ t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.printed, result)
+ }
+ }
+}
diff --git a/xstream/nodes/dynamic_channel_buffer.go b/xstream/nodes/dynamic_channel_buffer.go
index 0ff01f1186..e228d0406b 100644
--- a/xstream/nodes/dynamic_channel_buffer.go
+++ b/xstream/nodes/dynamic_channel_buffer.go
@@ -7,10 +7,10 @@ import (
)
type DynamicChannelBuffer struct {
+ limit int64
In chan api.SourceTuple
Out chan api.SourceTuple
buffer []api.SourceTuple
- limit int64
}
func NewDynamicChannelBuffer() *DynamicChannelBuffer {
diff --git a/xstream/nodes/prometheus.go b/xstream/nodes/prometheus.go
index 39c0477323..ded5969cc7 100644
--- a/xstream/nodes/prometheus.go
+++ b/xstream/nodes/prometheus.go
@@ -8,12 +8,12 @@ import (
const RecordsInTotal = "records_in_total"
const RecordsOutTotal = "records_out_total"
const ExceptionsTotal = "exceptions_total"
-const ProcessLatencyMs = "process_latency_ms"
+const ProcessLatencyUs = "process_latency_us"
const LastInvocation = "last_invocation"
const BufferLength = "buffer_length"
var (
- MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyMs, BufferLength, LastInvocation}
+ MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
prometheuseMetrics *PrometheusMetrics
mutex sync.RWMutex
)
@@ -60,7 +60,7 @@ func newPrometheusMetrics() *PrometheusMetrics {
Help: "Total number of user exceptions of " + prefix,
}, labelNames)
processLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
- Name: prefix + "_" + ProcessLatencyMs,
+ Name: prefix + "_" + ProcessLatencyUs,
Help: "Process latency in millisecond of " + prefix,
}, labelNames)
bufferLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{
diff --git a/xstream/nodes/stats_manager.go b/xstream/nodes/stats_manager.go
index d59ca9e9bc..62f78c46b5 100644
--- a/xstream/nodes/stats_manager.go
+++ b/xstream/nodes/stats_manager.go
@@ -111,7 +111,7 @@ func (sm *DefaultStatManager) ProcessTimeStart() {
func (sm *DefaultStatManager) ProcessTimeEnd() {
if !sm.processTimeStart.IsZero() {
- sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
+ sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
}
}
@@ -138,7 +138,7 @@ func (sm *PrometheusStatManager) IncTotalExceptions() {
func (sm *PrometheusStatManager) ProcessTimeEnd() {
if !sm.processTimeStart.IsZero() {
- sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
+ sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
sm.pProcessLatency.Set(float64(sm.processLatency))
}
}
diff --git a/xstream/server/server/rest.go b/xstream/server/server/rest.go
index 280c3ad6b0..beb106e11e 100644
--- a/xstream/server/server/rest.go
+++ b/xstream/server/server/rest.go
@@ -76,9 +76,9 @@ func createRestServer(port int) *http.Server {
r := mux.NewRouter()
r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
+ r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
+ r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
@@ -191,6 +191,19 @@ func streamHandler(w http.ResponseWriter, r *http.Request) {
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(content))
+ case http.MethodPut:
+ v, err := decodeStatementDescriptor(r.Body)
+ if err != nil {
+ handleError(w, err, "Invalid body", logger)
+ return
+ }
+ content, err := streamProcessor.ExecReplaceStream(v.Sql)
+ if err != nil {
+ handleError(w, err, "Stream command error", logger)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(content))
}
}
@@ -258,6 +271,35 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(content))
+ case http.MethodPut:
+ _, err := ruleProcessor.GetRuleByName(name)
+ if err != nil {
+ handleError(w, err, "not found this rule", logger)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ handleError(w, err, "Invalid body", logger)
+ return
+ }
+
+ r, err := ruleProcessor.ExecUpdate(name, string(body))
+ var result string
+ if err != nil {
+ handleError(w, err, "Update rule error", logger)
+ return
+ } else {
+ result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
+ }
+
+ err = restartRule(name)
+ if err != nil {
+ handleError(w, err, "restart rule error", logger)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(result))
}
}