Skip to content

Commit

Permalink
[ADDED] Missing fields on StreamConfig for 2.10.0 release (#1405)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Sep 20, 2023
1 parent bee172d commit 8ce44e0
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 48 deletions.
108 changes: 84 additions & 24 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,32 @@ type (
}

StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
Expand All @@ -71,6 +73,9 @@ type (
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`

// Limits for consumers on this stream.
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`

// Metadata is additional metadata for the Stream.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Expand Down Expand Up @@ -159,6 +164,13 @@ type (
DeliverPrefix string `json:"deliver"`
}

// StreamConsumerLimits are the limits for a consumer on a stream.
// These can be overridden on a per consumer basis.
StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}

// DiscardPolicy determines how to proceed when limits of messages or bytes are
// reached.
DiscardPolicy int
Expand All @@ -168,6 +180,9 @@ type (

// StorageType determines how messages are stored for retention.
StorageType int

// StoreCompression determines how messages are compressed.
StoreCompression uint8
)

const (
Expand Down Expand Up @@ -318,3 +333,48 @@ func (st *StorageType) UnmarshalJSON(data []byte) error {
func jsonString(s string) string {
return "\"" + s + "\""
}

const (
NoCompression StoreCompression = iota
S2Compression
)

func (alg StoreCompression) String() string {
switch alg {
case NoCompression:
return "None"
case S2Compression:
return "S2"
default:
return "Unknown StoreCompression"
}
}

func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}

func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}
47 changes: 47 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -3740,6 +3740,53 @@ func (st *StorageType) UnmarshalJSON(data []byte) error {
return nil
}

type StoreCompression uint8

const (
NoCompression StoreCompression = iota
S2Compression
)

func (alg StoreCompression) String() string {
switch alg {
case NoCompression:
return "None"
case S2Compression:
return "S2"
default:
return "Unknown StoreCompression"
}
}

func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}

func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}

// Length of our hash used for named consumers.
const nameHashLen = 8

Expand Down
60 changes: 36 additions & 24 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,32 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
Expand All @@ -138,6 +140,9 @@ type StreamConfig struct {
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`

// Limits for consumers on this stream.
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`

// Metadata is additional metadata for the Stream.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Expand Down Expand Up @@ -182,6 +187,13 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver,omitempty"`
}

// StreamConsumerLimits are the limits for a consumer on a stream.
// These can be overridden on a per consumer basis.
type StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}

// Helper for copying when we do not want to change user's version.
func (ss *StreamSource) copy() *StreamSource {
nss := *ss
Expand Down

0 comments on commit 8ce44e0

Please sign in to comment.