Skip to content

Commit

Permalink
feat: Added support for maxPayload, raftHeartbeatTimeout, raftElectio…
Browse files Browse the repository at this point in the history
…nTimeout, raftLeaseTimeout, raftCommitTimeout native nats settings (argoproj#1402)

* feat: Added support for maxPayload, raftHeartbeatTimeout, raftElectionTimeout, raftLeaseTimeout, raftCommitTimeout native nats settings

Signed-off-by: daniel-codefresh <daniel.soifer@codefresh.io>

* moved hard coded values to constants

Signed-off-by: daniel-codefresh <daniel.soifer@codefresh.io>

* ident

Signed-off-by: daniel-codefresh <daniel.soifer@codefresh.io>

* removed unnecessary quotes

Signed-off-by: daniel-codefresh <daniel.soifer@codefresh.io>

* revert last commit

Signed-off-by: Daniel Soifer <daniel.soifer@codefresh.io>
  • Loading branch information
daniel-codefresh authored Nov 2, 2021
1 parent 5212bad commit 73bafd4
Show file tree
Hide file tree
Showing 11 changed files with 633 additions and 93 deletions.
55 changes: 55 additions & 0 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@
"format": "int64",
"type": "integer"
},
"maxPayload": {
"description": "Maximum number of bytes in a message payload, 0 means unlimited. Defaults to 1MB",
"type": "string"
},
"maxSubs": {
"description": "Maximum number of subscriptions per channel, 0 means unlimited. Defaults to 1000",
"format": "int64",
Expand Down Expand Up @@ -468,6 +472,22 @@
"description": "If specified, indicates the EventSource pod's priority. \"system-node-critical\" and \"system-cluster-critical\" are two special keywords which indicate the highest priorities with the former being the highest priority. Any other name must be defined by creating a PriorityClass object with that name. If not specified, the pod priority will be default or zero if there is no default. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"type": "string"
},
"raftCommitTimeout": {
"description": "Specifies the time without an Apply() operation before sending an heartbeat to ensure timely commit, i.e. \"72h\", “4h35m”. Defaults to 100ms",
"type": "string"
},
"raftElectionTimeout": {
"description": "Specifies the time in candidate state without a leader before attempting an election, i.e. \"72h\", “4h35m”. Defaults to 2s",
"type": "string"
},
"raftHeartbeatTimeout": {
"description": "Specifies the time in follower state without a leader before attempting an election, i.e. \"72h\", “4h35m”. Defaults to 2s",
"type": "string"
},
"raftLeaseTimeout": {
"description": "Specifies how long a leader waits without being able to contact a quorum of nodes before stepping down as leader, i.e. \"72h\", “4h35m”. Defaults to 1s",
"type": "string"
},
"replicas": {
"description": "Size is the NATS StatefulSet size",
"format": "int32",
Expand Down Expand Up @@ -2331,6 +2351,10 @@
"description": "Region is AWS region",
"type": "string"
},
"roleARN": {
"description": "RoleARN is the Amazon Resource Name (ARN) of the role to assume.",
"type": "string"
},
"secretKey": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "SecretKey refers K8s secret containing aws secret key"
Expand Down
20 changes: 20 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ const (
EventBusAuthFileMountPath = "/etc/eventbus/auth"
// Default NATS Streaming messages max age
NATSStreamingMaxAge = "72h"
// Default NATS Streaming max messages per channel
NATSStreamingMaxMsgs = uint64(1000000)
// Default NATS Streaming max subscriptions per channel
NATSStreamingMaxSubs = uint64(1000)
// Default NATS Streaming max total size of messages per channel
NATSStreamingMaxBytes = "1GB"
// Default NATS Streaming max size of message payload
NATSStreamingMaxPayload = "1MB"
// Default NATS Streaming RAFT heartbeat timeout
NATSStreamingRaftHeartbeatTimeout = "2s"
// Default NATS Streaming RAFT election timeout
NATSStreamingRaftElectionTimeout = "2s"
// Default NATS Streaming RAFT lease timeout
NATSStreamingRaftLeaseTimeout = "1s"
// Default NATS Streaming RAFT commit timeout
NATSStreamingRaftCommitTimeout = "100ms"
// Default EventBus name
DefaultEventBusName = "default"
)
Expand Down
49 changes: 45 additions & 4 deletions controllers/eventbus/installer/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,18 +449,54 @@ func (i *natsInstaller) buildConfigMap() (*corev1.ConfigMap, error) {
if err != nil {
return nil, err
}
maxMsgs := uint64(1000000)
maxMsgs := common.NATSStreamingMaxMsgs
if i.eventBus.Spec.NATS.Native.MaxMsgs != nil {
maxMsgs = *i.eventBus.Spec.NATS.Native.MaxMsgs
}
maxSubs := uint64(1000)
maxSubs := common.NATSStreamingMaxSubs
if i.eventBus.Spec.NATS.Native.MaxSubs != nil {
maxSubs = *i.eventBus.Spec.NATS.Native.MaxSubs
}
maxBytes := "1GB"
maxBytes := common.NATSStreamingMaxBytes
if i.eventBus.Spec.NATS.Native.MaxBytes != nil {
maxBytes = *i.eventBus.Spec.NATS.Native.MaxBytes
}
maxPayload := common.NATSStreamingMaxPayload
if i.eventBus.Spec.NATS.Native.MaxPayload != nil {
maxPayload = *i.eventBus.Spec.NATS.Native.MaxPayload
}
raftHeartbeatTimeout := common.NATSStreamingRaftHeartbeatTimeout
if i.eventBus.Spec.NATS.Native.RaftHeartbeatTimeout != nil {
raftHeartbeatTimeout = *i.eventBus.Spec.NATS.Native.RaftHeartbeatTimeout
}
_, err = time.ParseDuration(raftHeartbeatTimeout)
if err != nil {
return nil, err
}
raftElectionTimeout := common.NATSStreamingRaftElectionTimeout
if i.eventBus.Spec.NATS.Native.RaftElectionTimeout != nil {
raftElectionTimeout = *i.eventBus.Spec.NATS.Native.RaftElectionTimeout
}
_, err = time.ParseDuration(raftElectionTimeout)
if err != nil {
return nil, err
}
raftLeaseTimeout := common.NATSStreamingRaftLeaseTimeout
if i.eventBus.Spec.NATS.Native.RaftLeaseTimeout != nil {
raftLeaseTimeout = *i.eventBus.Spec.NATS.Native.RaftLeaseTimeout
}
_, err = time.ParseDuration(raftLeaseTimeout)
if err != nil {
return nil, err
}
raftCommitTimeout := common.NATSStreamingRaftCommitTimeout
if i.eventBus.Spec.NATS.Native.RaftCommitTimeout != nil {
raftCommitTimeout = *i.eventBus.Spec.NATS.Native.RaftCommitTimeout
}
_, err = time.ParseDuration(raftCommitTimeout)
if err != nil {
return nil, err
}
peers := []string{}
routes := []string{}
for j := 0; j < replicas; j++ {
Expand All @@ -475,6 +511,7 @@ cluster {
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 10
}
max_payload: %s
streaming {
id: %s
store: file
Expand All @@ -483,14 +520,18 @@ streaming {
node_id: $POD_NAME
peers: [%s]
log_path: /data/stan/logs
raft_heartbeat_timeout: "%s"
raft_election_timeout: "%s"
raft_lease_timeout: "%s"
raft_commit_timeout: "%s"
}
store_limits {
max_age: %s
max_msgs: %v
max_bytes: %s
max_subs: %v
}
}`, strconv.Itoa(int(monitorPort)), strconv.Itoa(int(clusterPort)), strings.Join(routes, ","), clusterID, strings.Join(peers, ","), maxAge, maxMsgs, maxBytes, maxSubs)
}`, strconv.Itoa(int(monitorPort)), strconv.Itoa(int(clusterPort)), strings.Join(routes, ","), maxPayload, clusterID, strings.Join(peers, ","), raftHeartbeatTimeout, raftElectionTimeout, raftLeaseTimeout, raftCommitTimeout, maxAge, maxMsgs, maxBytes, maxSubs)
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: i.eventBus.Namespace,
Expand Down
Loading

0 comments on commit 73bafd4

Please sign in to comment.