Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Refactor the retry logic in the monitor
Update monitor env variables to be consistent
Update doc

Signed-off-by: Yanjun Zhou <zhouya@vmware.com>
  • Loading branch information
yanjunz97 committed Mar 15, 2022
1 parent 7fce28d commit 0bd6b6f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 46 deletions.
10 changes: 4 additions & 6 deletions build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4947,20 +4947,18 @@ spec:
spec:
containers:
- env:
- name: CH_USERNAME
- name: CLICKHOUSE_USERNAME
valueFrom:
secretKeyRef:
key: username
name: clickhouse-secret
- name: CH_PASSWORD
- name: CLICKHOUSE_PASSWORD
valueFrom:
secretKeyRef:
key: password
name: clickhouse-secret
- name: SVC_HOST
value: clickhouse-clickhouse.flow-visibility.svc.cluster.local
- name: SVC_PORT
value: "9000"
- name: DB_URL
value: tcp://clickhouse-clickhouse.flow-visibility.svc:9000
- name: TABLE_NAME
value: default.flows
- name: MV_NAMES
Expand Down
10 changes: 4 additions & 6 deletions build/yamls/flow-visibility/base/clickhouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,18 @@ spec:
- name: clickhouse-monitor
image: flow-visibility-clickhouse-monitor
env:
- name: CH_USERNAME
- name: CLICKHOUSE_USERNAME
valueFrom:
secretKeyRef:
name: clickhouse-secret
key: username
- name: CH_PASSWORD
- name: CLICKHOUSE_PASSWORD
valueFrom:
secretKeyRef:
name: clickhouse-secret
key: password
- name: SVC_HOST
value: "clickhouse-clickhouse.flow-visibility.svc.cluster.local"
- name: SVC_PORT
value: "9000"
- name: DB_URL
value: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
- name: TABLE_NAME
value: "default.flows"
- name: MV_NAMES
Expand Down
3 changes: 2 additions & 1 deletion docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ Grafana flow collector supports the ClickHouse in-memory deployment with limited
storage size. This is specified in [clickhouse.yml][clickhouse_manifest_yaml].
The default value of storage size for the ClickHouse server is 8 GiB. Users
can expect a linear growth in the ClickHouse throughput when they enlarge the
storage size. To deploy the ClickHouse with a different storage size, please
storage size. For a development or testing environment, you can decrease the
storage size to 2 GiB. To deploy ClickHouse with a different storage size, please
modify the `sizeLimit` in the following section.

```yaml
Expand Down
85 changes: 52 additions & 33 deletions plugins/flow-visibility/clickhouse-monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ClickHouse/clickhouse-go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -41,9 +42,13 @@ const (
// The monitor stops for 3 intervals after a deletion to wait for the Clickhouse MergeTree Engine to release memory.
skipRoundsNum = 3
// Connection to Clickhouse times out if it fails for 1 minute.
connectionTimeout = time.Minute
connTimeout = time.Minute
// Retry connection to Clickhouse every 5 seconds if it fails.
connectionWait = 5 * time.Second
connRetryInterval = 5 * time.Second
// Query to Clickhouse times out if it fails for 10 seconds.
queryTimeout = 10 * time.Second
// Retry query to Clickhouse every second if it fails.
queryRetryInterval = 1 * time.Second
// Time format for timeInserted
timeFormat = "2006-01-02 15:04:05"
)
Expand Down Expand Up @@ -160,37 +165,33 @@ func getPodLogs() (string, error) {
// Connects to Clickhouse in a loop
func connectLoop() (*sql.DB, error) {
// Clickhouse configuration
userName := os.Getenv("CH_USERNAME")
password := os.Getenv("CH_PASSWORD")
host, port := os.Getenv("SVC_HOST"), os.Getenv("SVC_PORT")

ticker := time.NewTicker(connectionWait)
defer ticker.Stop()

timeoutExceeded := time.After(connectionTimeout)
for {
select {
case <-timeoutExceeded:
return nil, fmt.Errorf("failed to connect to Clickhouse after %s", connectionTimeout)

case <-ticker.C:
// Open the database and ping it
dataSourceName := fmt.Sprintf("tcp://%s:%s?debug=true&username=%s&password=%s", host, port, userName, password)
connect, err := sql.Open("clickhouse", dataSourceName)
if err != nil {
klog.ErrorS(err, "Failed to connect to Clickhouse")
}
if err := connect.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
klog.ErrorS(nil, "Failed to ping Clickhouse", "message", exception.Message)
} else {
klog.ErrorS(err, "Failed to ping Clickhouse")
}
userName := os.Getenv("CLICKHOUSE_USERNAME")
password := os.Getenv("CLICKHOUSE_PASSWORD")
databaseURL := os.Getenv("DB_URL")
var connect *sql.DB
if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) {
// Open the database and ping it
dataSourceName := fmt.Sprintf("%s?debug=true&username=%s&password=%s", databaseURL, userName, password)
var err error
connect, err = sql.Open("clickhouse", dataSourceName)
if err != nil {
klog.ErrorS(err, "Failed to connect to Clickhouse")
return false, err
}
if err := connect.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
klog.ErrorS(nil, "Failed to ping Clickhouse", "message", exception.Message)
} else {
return connect, nil
klog.ErrorS(err, "Failed to ping Clickhouse")
}
return false, err
} else {
return true, nil
}
}); err != nil {
return nil, fmt.Errorf("failed to connect to Clickhouse after %s", connTimeout)
}
return connect, nil
}

// Checks the memory usage in the Clickhouse, deletes records when it exceeds the threshold.
Expand All @@ -200,7 +201,13 @@ func monitorMemory(connect *sql.DB) bool {
totalSpace uint64
)
// Get memory usage from Clickhouse system table
if err := connect.QueryRow("SELECT free_space, total_space FROM system.disks").Scan(&freeSpace, &totalSpace); err != nil {
if err := wait.PollImmediate(queryRetryInterval, queryTimeout, func() (bool, error) {
if err := connect.QueryRow("SELECT free_space, total_space FROM system.disks").Scan(&freeSpace, &totalSpace); err != nil {
return false, err
} else {
return true, nil
}
}); err != nil {
klog.ErrorS(err, "Failed to get memory usage for Clickhouse")
return false
}
Expand Down Expand Up @@ -238,7 +245,13 @@ func getTimeBoundary(connect *sql.DB) (time.Time, error) {
return timeBoundary, err
}
command := fmt.Sprintf("SELECT timeInserted FROM %s LIMIT 1 OFFSET %d", tableName, deleteRowNum)
if err := connect.QueryRow(command).Scan(&timeBoundary); err != nil {
if err := wait.PollImmediate(queryRetryInterval, queryTimeout, func() (bool, error) {
if err := connect.QueryRow(command).Scan(&timeBoundary); err != nil {
return false, err
} else {
return true, nil
}
}); err != nil {
return timeBoundary, fmt.Errorf("failed to get timeInserted boundary from %s: %v", tableName, err)
}
return timeBoundary, nil
Expand All @@ -248,8 +261,14 @@ func getTimeBoundary(connect *sql.DB) (time.Time, error) {
func getDeleteRowNum(connect *sql.DB) (uint64, error) {
var deleteRowNum, count uint64
command := fmt.Sprintf("SELECT COUNT() FROM %s", tableName)
if err := connect.QueryRow(command).Scan(&count); err != nil {
return deleteRowNum, fmt.Errorf("Failed to get the number of records from %s: %v", tableName, err)
if err := wait.PollImmediate(queryRetryInterval, queryTimeout, func() (bool, error) {
if err := connect.QueryRow(command).Scan(&count); err != nil {
return false, err
} else {
return true, nil
}
}); err != nil {
return deleteRowNum, fmt.Errorf("failed to get the number of records from %s: %v", tableName, err)
}
deleteRowNum = uint64(float64(count) * deletePercentage)
return deleteRowNum, nil
Expand Down

0 comments on commit 0bd6b6f

Please sign in to comment.