Skip to content

Commit

Permalink
pretty much done?
Browse files Browse the repository at this point in the history
  • Loading branch information
juliannguyen4 committed Jan 5, 2024
1 parent 4d411e3 commit 01c380c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
13 changes: 10 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Cluster struct {

metricsPolicy *MetricsPolicy
metricsEnabled bool
metricsListener *MetricsListener
metricsListener MetricsListener

tranCount iatomic.Int
retryCount iatomic.Int
Expand Down Expand Up @@ -176,10 +176,10 @@ func (clstr *Cluster) enableMetrics(policy *MetricsPolicy) {
clstr.metricsListener.onDisable(clstr)
}

listener := policy.listener
var listener MetricsListener = *policy.listener

if listener == nil {
listener = newMetricsWriter(policy.reportDir)
listener = &MetricsWriter{dir: policy.reportDir}
}

clstr.metricsListener = listener
Expand Down Expand Up @@ -829,6 +829,13 @@ func (clstr *Cluster) removeNodes(nodesToRemove []*Node) {
return nodesMap, nil
})

if clstr.metricsEnabled {
err := (*clstr.metricsListener).onNodeClose(node)
if err != nil {
logger.Logger.Warn("Write metrics failed on " + node.name + ": " + err.Error())
}
}

node.Close()
}

Expand Down
7 changes: 4 additions & 3 deletions metrics_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package aerospike

type MetricsListener interface {
// Takes in a cluster object to collect data about it
onEnable(cluster *Cluster, policy *MetricsPolicy)
onSnapshot(cluster *Cluster)
onDisable(cluster *Cluster)
onEnable(cluster *Cluster, policy *MetricsPolicy) error
onSnapshot(cluster *Cluster) error
onNodeClose(node *Node) error
onDisable(cluster *Cluster) error
}
29 changes: 20 additions & 9 deletions metrics_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type MetricsWriter struct {
}

func (mw *MetricsWriter) onEnable(clstr *Cluster, policy *MetricsPolicy) error {
// TODO: format time properly
mw.sb.Reset()
now := time.Now()

Expand All @@ -48,8 +47,7 @@ func (mw *MetricsWriter) onEnable(clstr *Cluster, policy *MetricsPolicy) error {
mw.sb.WriteString(" header(1)")
// TODO: need to update this
mw.sb.WriteString(" cluster[name,cpu,mem,tranCount,retryCount,node[]]")
mw.sb.WriteString(" node[name,address,port,syncConn,errors,timeouts,latency[]]")
mw.sb.WriteString(" conn[inUse,inPool,opened,closed]")
mw.sb.WriteString(" node[name,address,port,errors,latency[]]")
mw.sb.WriteString(" latency(")
mw.sb.WriteString(string(policy.latencyColumns))
mw.sb.WriteString(",")
Expand All @@ -72,24 +70,37 @@ func (mw *MetricsWriter) writeLine() error {
return nil
}

func (mw *MetricsWriter) onSnapshot(clstr *Cluster) {
func (mw *MetricsWriter) onSnapshot(clstr *Cluster) error {
if mw.enabled {
mw.writeCluster(clstr)
err := mw.writeCluster(clstr)
if err != nil {
return err
}
}
return nil
}

func (mw *MetricsWriter) onNodeClose(node *Node) {
func (mw *MetricsWriter) onNodeClose(node *Node) error {
var err error = nil
if mw.enabled {
mw.sb.Reset()

mw.sb.WriteString(time.Now().Format(TIMESTAMP_FORMAT))
mw.sb.WriteString(" node")
mw.writeNode(node)
err = mw.writeLine()
}
return err
}

func (mw *MetricsWriter) onDisable(clstr *Cluster) {
func (mw *MetricsWriter) onDisable(clstr *Cluster) error {
if mw.enabled {
mw.enabled = false
mw.writeCluster(clstr)
err := mw.writeCluster(clstr)
if err != nil {
return err
}
}
return nil
}

func (mw *MetricsWriter) writeCluster(clstr *Cluster) error {
Expand Down
4 changes: 4 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func newNode(cluster *Cluster, nv *nodeValidator) *Node {
return newNode
}

func (nd *Node) enableMetrics(policy *MetricsPolicy) {
nd.metrics = newNodeMetrics(policy)
}

// SupportsBatchAny returns true if the node supports the feature.
func (nd *Node) SupportsBatchAny() bool {
return (nd.features & _SUPPORTS_BATCH_ANY) != 0
Expand Down
4 changes: 0 additions & 4 deletions node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,3 @@ func (ns *nodeStats) UnmarshalJSON(data []byte) error {

return nil
}

func (ns *nodeStats) String() String {

}

0 comments on commit 01c380c

Please sign in to comment.