From 691e61b8afc8791f0426eaedc4edce922763205e Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Tue, 7 Aug 2018 11:07:39 -0400
Subject: [PATCH 1/6] wip

---
 expr/func_removeabovebelowpercentile.go      |  88 ++++++
 expr/func_removeabovebelowpercentile_test.go | 272 +++++++++++++++++++
 expr/funcs.go                                |   2 +
 3 files changed, 362 insertions(+)
 create mode 100644 expr/func_removeabovebelowpercentile.go
 create mode 100644 expr/func_removeabovebelowpercentile_test.go

diff --git a/expr/func_removeabovebelowpercentile.go b/expr/func_removeabovebelowpercentile.go
new file mode 100644
index 0000000000..339d49bb12
--- /dev/null
+++ b/expr/func_removeabovebelowpercentile.go
@@ -0,0 +1,88 @@
+package expr
+
+import (
+	"fmt"
+	"math"
+	"sort"
+
+	schema "gopkg.in/raintank/schema.v1"
+
+	"github.com/grafana/metrictank/api/models"
+)
+
+type FuncRemoveAboveBelowPercentile struct {
+	in    GraphiteFunc
+	n     float64
+	above bool
+}
+
+func NewRemoveAboveBelowPercentileConstructor(above bool) func() GraphiteFunc {
+	return func() GraphiteFunc {
+		return &FuncRemoveAboveBelowPercentile{above: above}
+	}
+}
+
+func (s *FuncRemoveAboveBelowPercentile) Signature() ([]Arg, []Arg) {
+
+	return []Arg{
+		ArgSeriesList{val: &s.in},
+		ArgFloat{key: "n", val: &s.n},
+	}, []Arg{ArgSeriesList{}}
+}
+
+func (s *FuncRemoveAboveBelowPercentile) Context(context Context) Context {
+	return context
+}
+
+func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
+	series, err := s.in.Exec(cache)
+	if err != nil {
+		return nil, err
+	}
+
+	var output []models.Series
+	for _, serie := range series {
+		if s.above {
+			serie.Target = fmt.Sprintf("removeAbovePercentile(%s, %g)", serie.Target, s.n)
+		} else {
+			serie.Target = fmt.Sprintf("removeBelowPercentile(%s, %g)", serie.Target, s.n)
+		}
+		serie.QueryPatt = serie.Target
+
+		out := pointSlicePool.Get().([]schema.Point)
+		for _, p := range serie.Datapoints {
+			if s.above {
+				if p.Val > s.n {
+					p.Val = math.NaN()
+				}
+			} else {
+				if p.Val < s.n {
+					p.Val = math.NaN()
+				}
+			}
+			out = append(out, p)
+		}
+		serie.Datapoints = out
+		output = append(output, serie)
+	}
+
+	cache[Req{}] = append(cache[Req{}], output...)
+
+	return output, nil
+}
+
+func getPercentileValue(datapoints []schema.Point, n float64) float64 {
+	sortedDatapoints := make([]schema.Point, len(datapoints))
+	for _, p := range datapoints {
+		if !math.IsNaN(p.Val) {
+			sortedDatapoints = append(sortedDatapoints, p)
+		}
+	}
+
+	sort.Slice(sortedDatapoints, func(i, j int) bool {
+		return sortedDatapoints[i].Val < sortedDatapoints[j].Val
+	})
+
+	index := math.Ceil(n/100.0*float64(len(sortedDatapoints)+1)) - 1
+	return sortedDatapoints[int(index)].Val
+}
diff --git a/expr/func_removeabovebelowpercentile_test.go b/expr/func_removeabovebelowpercentile_test.go
new file mode 100644
index 0000000000..a4cdc2e7c3
--- /dev/null
+++ b/expr/func_removeabovebelowpercentile_test.go
@@ -0,0 +1,272 @@
+package expr
+
+import (
+	"math"
+	"math/rand"
+	"strconv"
+	"testing"
+
+	"github.com/grafana/metrictank/api/models"
+	"github.com/grafana/metrictank/test"
+	"gopkg.in/raintank/schema.v1"
+)
+
+func TestRemoveAbovePercentileSingleAllNonNull(t *testing.T) {
+	testRemoveAboveBelowPercentile(
+		"removeAbovePercentile",
+		true,
+		199,
+		[]models.Series{
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "a",
+				Datapoints: getCopy(a),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "b",
+				Datapoints: getCopy(b),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "c",
+				Datapoints: getCopy(c),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "d",
+				Datapoints: getCopy(d),
+			},
+		},
+		[]models.Series{
+			{
+				Interval:  10,
+				QueryPatt: "removeAbovePercentile(a, 199)",
+				Datapoints: []schema.Point{
+					{Val: 0, Ts: 10},
+					{Val: 0, Ts: 20},
+					{Val: 5.5, Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: math.NaN(), Ts: 50},
+					{Val: math.NaN(), Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeAbovePercentile(b, 199)",
+				Datapoints: []schema.Point{
+					{Val: 0, Ts: 10},
+					{Val: math.NaN(), Ts: 20},
+					{Val: math.NaN(), Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: math.NaN(), Ts: 50},
+					{Val: math.NaN(), Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeAbovePercentile(c, 199)",
+				Datapoints: []schema.Point{
+					{Val: 0, Ts: 10},
+					{Val: 0, Ts: 20},
+					{Val: 1, Ts: 30},
+					{Val: 2, Ts: 40},
+					{Val: 3, Ts: 50},
+					{Val: 4, Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeAbovePercentile(d, 199)",
+				Datapoints: []schema.Point{
+					{Val: 0, Ts: 10},
+					{Val: 33, Ts: 20},
+					{Val: 199, Ts: 30},
+					{Val: 29, Ts: 40},
+					{Val: 80, Ts: 50},
+					{Val: math.NaN(), Ts: 60},
+				},
+			},
+		},
+		t,
+	)
+}
+
+func TestRemoveBelowPercentileSingleAllNonNull(t *testing.T) {
+	testRemoveAboveBelowPercentile(
+		"removeBelowPercentile",
+		false,
+		199,
+		[]models.Series{
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "a",
+				Datapoints: getCopy(a),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "b",
+				Datapoints: getCopy(b),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "c",
+				Datapoints: getCopy(c),
+			},
+			{
+				Interval:   10,
+				QueryPatt:  "abcd",
+				Target:     "d",
+				Datapoints: getCopy(d),
+			},
+		},
+		[]models.Series{
+			{
+				Interval:  10,
+				QueryPatt: "removeBelowPercentile(a, 199)",
+				Datapoints: []schema.Point{
+					{Val: math.NaN(), Ts: 10},
+					{Val: math.NaN(), Ts: 20},
+					{Val: math.NaN(), Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: math.NaN(), Ts: 50},
+					{Val: 1234567890, Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeBelowPercentile(b, 199)",
+				Datapoints: []schema.Point{
+					{Val: math.NaN(), Ts: 10},
+					{Val: math.MaxFloat64, Ts: 20},
+					{Val: math.MaxFloat64 - 20, Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: 1234567890, Ts: 50},
+					{Val: math.NaN(), Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeBelowPercentile(c, 199)",
+				Datapoints: []schema.Point{
+					{Val: math.NaN(), Ts: 10},
+					{Val: math.NaN(), Ts: 20},
+					{Val: math.NaN(), Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: math.NaN(), Ts: 50},
+					{Val: math.NaN(), Ts: 60},
+				},
+			},
+			{
+				Interval:  10,
+				QueryPatt: "removeBelowPercentile(d, 199)",
+				Datapoints: []schema.Point{
+					{Val: math.NaN(), Ts: 10},
+					{Val: math.NaN(), Ts: 20},
+					{Val: 199, Ts: 30},
+					{Val: math.NaN(), Ts: 40},
+					{Val: math.NaN(), Ts: 50},
+					{Val: 250, Ts: 60},
+				},
+			},
+		},
+		t,
+	)
+}
+
+func testRemoveAboveBelowPercentile(name string, above bool, n float64, in []models.Series, out []models.Series, t *testing.T) {
+	f := NewRemoveAboveBelowPercentileConstructor(above)()
+	f.(*FuncRemoveAboveBelowPercentile).in = NewMock(in)
+	f.(*FuncRemoveAboveBelowPercentile).n = n
+	gots, err := f.Exec(make(map[Req][]models.Series))
+	if err != nil {
+		t.Fatalf("case %q (%f): err should be nil. got %q", name, n, err)
+	}
+	if len(gots) != len(out) {
+		t.Fatalf("case %q (%f): isNonNull len output expected %d, got %d", name, n, len(out), len(gots))
+	}
+	for i, g := range gots {
+		exp := out[i]
+		if g.QueryPatt != exp.QueryPatt {
+			t.Fatalf("case %q (%f): expected target %q, got %q", name, n, exp.QueryPatt, g.QueryPatt)
+		}
+		if len(g.Datapoints) != len(exp.Datapoints) {
+			t.Fatalf("case %q (%f) len output expected %d, got %d", name, n, len(exp.Datapoints), len(g.Datapoints))
+		}
+		for j, p := range g.Datapoints {
+			bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
+			if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
+				continue
+			}
+			t.Fatalf("case %q (%f): output point %d - expected %v got %v", name, n, j, exp.Datapoints[j], p)
+		}
+	}
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1NoNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_10NoNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 10, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_100NoNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 100, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1000NoNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1000, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_10SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_100SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1000SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1AllSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_10AllSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_100AllSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkRemoveAboveBelowPercentile10k_1000AllSeriesHalfNulls(b *testing.B) {
+	benchmarkRemoveAboveBelowPercentile(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func benchmarkRemoveAboveBelowPercentile(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
+	var input []models.Series
+	for i := 0; i < numSeries; i++ {
+		series := models.Series{
+			QueryPatt: strconv.Itoa(i),
+		}
+		if i%2 == 0 {
+			series.Datapoints = fn0()
+		} else {
+			series.Datapoints = fn1()
+		}
+		input = append(input, series)
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		f := NewRemoveAboveBelowPercentileConstructor(rand.Int()%2 == 0)()
+		f.(*FuncRemoveAboveBelowPercentile).in = NewMock(input)
+		f.(*FuncRemoveAboveBelowPercentile).n = rand.Float64()
+		got, err := f.Exec(make(map[Req][]models.Series))
+		if err != nil {
+			b.Fatalf("%s", err)
+		}
+		results = got
+	}
+}
diff --git a/expr/funcs.go b/expr/funcs.go
index 5313a1f327..dc6d10ef34 100644
--- a/expr/funcs.go
+++ b/expr/funcs.go
@@ -82,7 +82,9 @@ func init() {
 		"nonNegativeDerivative": {NewNonNegativeDerivative, true},
 		"perSecond":             {NewPerSecond, true},
 		"rangeOfSeries":         {NewAggregateConstructor("rangeOf", crossSeriesRange), true},
+		"removeAbovePercentile": {NewRemoveAboveBelowPercentileConstructor(true), true},
 		"removeAboveValue":      {NewRemoveAboveBelowValueConstructor(true), true},
+		"removeBelowPercentile": {NewRemoveAboveBelowPercentileConstructor(false), true},
 		"removeBelowValue":      {NewRemoveAboveBelowValueConstructor(false), true},
 		"scale":                 {NewScale, true},
 		"scaleToSeconds":        {NewScaleToSeconds, true},

From e8bf0d39b26ac06f43eb855e7c6602d51a1ec32f Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Tue, 7 Aug 2018 15:35:13 -0400
Subject: [PATCH 2/6] changed readme and tests

---
 docs/graphite.md                             | 6 +++---
 expr/func_removeabovebelowpercentile_test.go | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/graphite.md b/docs/graphite.md
index 45c1ced0b2..26404cc36a 100644
--- a/docs/graphite.md
+++ b/docs/graphite.md
@@ -35,7 +35,7 @@ See also:
 
 
 | Function name and signature                                    | Alias        | Metrictank |
-| -------------------------------------------------------------- | -----------  | ---------- |
+| -------------------------------------------------------------- | ------------ | ---------- |
 | absolute                                                       |              | No         |
 | aggregate                                                      |              | No         |
 | aggregateLine                                                  |              | No         |
@@ -136,9 +136,9 @@ See also:
 | randomWalkFunction                                             | randomWalk   | No         |
 | rangeOfSeries(seriesList) series                               |              | Stable     |
 | reduceSeries                                                   | reduce       | No         |
-| removeAbovePercentile                                          |              | No         |
+| removeAbovePercentile(seriesList, n) seriesList                |              | No         |
 | removeAboveValue(seriesList, n) seriesList                     |              | Stable     |
-| removeBelowPercentile                                          |              | No         |
+| removeBelowPercentile(seriesList, n) seriesList                |              | No         |
 | removeBelowValue(seriesList, n) seriesList                     |              | Stable     |
 | removeBetweenPercentile                                        |              | No         |
 | removeEmptySeries                                              |              | No         |
diff --git a/expr/func_removeabovebelowpercentile_test.go b/expr/func_removeabovebelowpercentile_test.go
index a4cdc2e7c3..6bb0946063 100644
--- a/expr/func_removeabovebelowpercentile_test.go
+++ b/expr/func_removeabovebelowpercentile_test.go
@@ -15,7 +15,7 @@ func TestRemoveAbovePercentileSingleAllNonNull(t *testing.T) {
 	testRemoveAboveBelowPercentile(
 		"removeAbovePercentile",
 		true,
-		199,
+		60,
 		[]models.Series{
 			{
 				Interval:   10,
@@ -100,7 +100,7 @@ func TestRemoveBelowPercentileSingleAllNonNull(t *testing.T) {
 	testRemoveAboveBelowPercentile(
 		"removeBelowPercentile",
 		false,
-		199,
+		50,
 		[]models.Series{
 			{
 				Interval:   10,
@@ -262,7 +262,7 @@ func benchmarkRemoveAboveBelowPercentile(b *testing.B, numSeries int, fn0, fn1 f
 	for i := 0; i < b.N; i++ {
 		f := NewRemoveAboveBelowPercentileConstructor(rand.Int()%2 == 0)()
 		f.(*FuncRemoveAboveBelowPercentile).in = NewMock(input)
-		f.(*FuncRemoveAboveBelowPercentile).n = rand.Float64()
+		f.(*FuncRemoveAboveBelowPercentile).n = float64(rand.Int()%100 + 1)
 		got, err := f.Exec(make(map[Req][]models.Series))
 		if err != nil {
 			b.Fatalf("%s", err)

From 550eac3e74dac9e6bfb86e460225c45f5bf23f78 Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Tue, 7 Aug 2018 16:36:22 -0400
Subject: [PATCH 3/6] add error

---
 expr/func_removeabovebelowpercentile.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/expr/func_removeabovebelowpercentile.go b/expr/func_removeabovebelowpercentile.go
index 339d49bb12..a03a54008d 100644
--- a/expr/func_removeabovebelowpercentile.go
+++ b/expr/func_removeabovebelowpercentile.go
@@ -1,6 +1,7 @@
 package expr
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"sort"
@@ -40,6 +41,10 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 		return nil, err
 	}
 
+	if s.n <= 0 {
+		return nil, errors.New("The requested percent is required to be greater than 0")
+	}
+
 	var output []models.Series
 	for _, serie := range series {
 		if s.above {

From 8d6cd9be0d670928d3613c5cf8be9ad08a93cf3f Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Wed, 8 Aug 2018 16:05:28 -0400
Subject: [PATCH 4/6] added tests, fixed bug

---
 expr/func_removeabovebelowpercentile.go      | 18 +++++++---
 expr/func_removeabovebelowpercentile_test.go | 36 ++++++++++----------
 2 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/expr/func_removeabovebelowpercentile.go b/expr/func_removeabovebelowpercentile.go
index a03a54008d..1f698788e6 100644
--- a/expr/func_removeabovebelowpercentile.go
+++ b/expr/func_removeabovebelowpercentile.go
@@ -54,14 +54,23 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 		}
 		serie.QueryPatt = serie.Target
 
+		newTags := make(map[string]string, len(serie.Tags)+1)
+		for k, v := range serie.Tags {
+			newTags[k] = v
+		}
+		newTags["nPercentile"] = fmt.Sprintf("%g", s.n)
+		serie.Tags = newTags
+
+		percentile := getPercentileValue(serie.Datapoints, s.n)
+
 		out := pointSlicePool.Get().([]schema.Point)
 		for _, p := range serie.Datapoints {
 			if s.above {
-				if p.Val > s.n {
+				if p.Val > percentile {
 					p.Val = math.NaN()
 				}
 			} else {
-				if p.Val < s.n {
+				if p.Val < percentile {
 					p.Val = math.NaN()
 				}
 			}
@@ -77,7 +86,7 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 }
 
 func getPercentileValue(datapoints []schema.Point, n float64) float64 {
-	sortedDatapoints := make([]schema.Point, len(datapoints))
+	sortedDatapoints := make([]schema.Point, 0, len(datapoints))
 	for _, p := range datapoints {
 		if !math.IsNaN(p.Val) {
 			sortedDatapoints = append(sortedDatapoints, p)
@@ -88,6 +97,7 @@ func getPercentileValue(datapoints []schema.Point, n float64) float64 {
 		return sortedDatapoints[i].Val < sortedDatapoints[j].Val
 	})
 
-	index := math.Ceil(n/100.0*float64(len(sortedDatapoints)+1)) - 1
+	index := math.Min(math.Ceil(n/100.0*float64(len(sortedDatapoints)+1)), float64(len(sortedDatapoints))) - 1
+
 	return sortedDatapoints[int(index)].Val
 }
diff --git a/expr/func_removeabovebelowpercentile_test.go b/expr/func_removeabovebelowpercentile_test.go
index 6bb0946063..a4af27c96e 100644
--- a/expr/func_removeabovebelowpercentile_test.go
+++ b/expr/func_removeabovebelowpercentile_test.go
@@ -45,7 +45,7 @@ func TestRemoveAbovePercentileSingleAllNonNull(t *testing.T) {
 		[]models.Series{
 			{
 				Interval:  10,
-				QueryPatt: "removeAbovePercentile(a, 199)",
+				QueryPatt: "removeAbovePercentile(a, 60)",
 				Datapoints: []schema.Point{
 					{Val: 0, Ts: 10},
 					{Val: 0, Ts: 20},
@@ -57,31 +57,31 @@ func TestRemoveAbovePercentileSingleAllNonNull(t *testing.T) {
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeAbovePercentile(b, 199)",
+				QueryPatt: "removeAbovePercentile(b, 60)",
 				Datapoints: []schema.Point{
 					{Val: 0, Ts: 10},
-					{Val: math.NaN(), Ts: 20},
-					{Val: math.NaN(), Ts: 30},
+					{Val: math.MaxFloat64, Ts: 20},
+					{Val: math.MaxFloat64 - 20, Ts: 30},
 					{Val: math.NaN(), Ts: 40},
-					{Val: math.NaN(), Ts: 50},
+					{Val: 1234567890, Ts: 50},
 					{Val: math.NaN(), Ts: 60},
 				},
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeAbovePercentile(c, 199)",
+				QueryPatt: "removeAbovePercentile(c, 60)",
 				Datapoints: []schema.Point{
 					{Val: 0, Ts: 10},
 					{Val: 0, Ts: 20},
 					{Val: 1, Ts: 30},
 					{Val: 2, Ts: 40},
 					{Val: 3, Ts: 50},
-					{Val: 4, Ts: 60},
+					{Val: math.NaN(), Ts: 60},
 				},
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeAbovePercentile(d, 199)",
+				QueryPatt: "removeAbovePercentile(d, 60)",
 				Datapoints: []schema.Point{
 					{Val: 0, Ts: 10},
 					{Val: 33, Ts: 20},
@@ -130,11 +130,11 @@ func TestRemoveBelowPercentileSingleAllNonNull(t *testing.T) {
 		[]models.Series{
 			{
 				Interval:  10,
-				QueryPatt: "removeBelowPercentile(a, 199)",
+				QueryPatt: "removeBelowPercentile(a, 50)",
 				Datapoints: []schema.Point{
 					{Val: math.NaN(), Ts: 10},
 					{Val: math.NaN(), Ts: 20},
-					{Val: math.NaN(), Ts: 30},
+					{Val: 5.5, Ts: 30},
 					{Val: math.NaN(), Ts: 40},
 					{Val: math.NaN(), Ts: 50},
 					{Val: 1234567890, Ts: 60},
@@ -142,37 +142,37 @@ func TestRemoveBelowPercentileSingleAllNonNull(t *testing.T) {
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeBelowPercentile(b, 199)",
+				QueryPatt: "removeBelowPercentile(b, 50)",
 				Datapoints: []schema.Point{
 					{Val: math.NaN(), Ts: 10},
 					{Val: math.MaxFloat64, Ts: 20},
 					{Val: math.MaxFloat64 - 20, Ts: 30},
 					{Val: math.NaN(), Ts: 40},
-					{Val: 1234567890, Ts: 50},
+					{Val: math.NaN(), Ts: 50},
 					{Val: math.NaN(), Ts: 60},
 				},
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeBelowPercentile(c, 199)",
+				QueryPatt: "removeBelowPercentile(c, 50)",
 				Datapoints: []schema.Point{
 					{Val: math.NaN(), Ts: 10},
 					{Val: math.NaN(), Ts: 20},
 					{Val: math.NaN(), Ts: 30},
-					{Val: math.NaN(), Ts: 40},
-					{Val: math.NaN(), Ts: 50},
-					{Val: math.NaN(), Ts: 60},
+					{Val: 2, Ts: 40},
+					{Val: 3, Ts: 50},
+					{Val: 4, Ts: 60},
 				},
 			},
 			{
 				Interval:  10,
-				QueryPatt: "removeBelowPercentile(d, 199)",
+				QueryPatt: "removeBelowPercentile(d, 50)",
 				Datapoints: []schema.Point{
 					{Val: math.NaN(), Ts: 10},
 					{Val: math.NaN(), Ts: 20},
 					{Val: 199, Ts: 30},
 					{Val: math.NaN(), Ts: 40},
-					{Val: math.NaN(), Ts: 50},
+					{Val: 80, Ts: 50},
 					{Val: 250, Ts: 60},
 				},
 			},

From a71921e17473f5f35c35b1ab6cb1dd31bd838396 Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Mon, 13 Aug 2018 12:00:19 -0400
Subject: [PATCH 5/6] PR changes

---
 expr/func_removeabovebelowpercentile.go | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/expr/func_removeabovebelowpercentile.go b/expr/func_removeabovebelowpercentile.go
index 1f698788e6..8ea1679506 100644
--- a/expr/func_removeabovebelowpercentile.go
+++ b/expr/func_removeabovebelowpercentile.go
@@ -45,7 +45,12 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 		return nil, errors.New("The requested percent is required to be greater than 0")
 	}
 
+	if len(series) == 0 {
+		return series, nil
+	}
+
 	var output []models.Series
+	sortedDatapointVals := make([]float64, 0, len(series[0].Datapoints)) //reuse float64 slice
 	for _, serie := range series {
 		if s.above {
 			serie.Target = fmt.Sprintf("removeAbovePercentile(%s, %g)", serie.Target, s.n)
@@ -61,7 +66,7 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 		newTags["nPercentile"] = fmt.Sprintf("%g", s.n)
 		serie.Tags = newTags
 
-		percentile := getPercentileValue(serie.Datapoints, s.n)
+		percentile := getPercentileValue(serie.Datapoints, s.n, sortedDatapointVals)
 
 		out := pointSlicePool.Get().([]schema.Point)
 		for _, p := range serie.Datapoints {
@@ -85,19 +90,17 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 	return output, nil
 }
 
-func getPercentileValue(datapoints []schema.Point, n float64) float64 {
-	sortedDatapoints := make([]schema.Point, 0, len(datapoints))
+func getPercentileValue(datapoints []schema.Point, n float64, sortedDatapointVals []float64) float64 {
+	sortedDatapointVals = sortedDatapointVals[:0]
 	for _, p := range datapoints {
 		if !math.IsNaN(p.Val) {
-			sortedDatapoints = append(sortedDatapoints, p)
+			sortedDatapointVals = append(sortedDatapointVals, p.Val)
 		}
 	}
 
-	sort.Slice(sortedDatapoints, func(i, j int) bool {
-		return sortedDatapoints[i].Val < sortedDatapoints[j].Val
-	})
+	sort.Float64s(sortedDatapointVals)
 
-	index := math.Min(math.Ceil(n/100.0*float64(len(sortedDatapoints)+1)), float64(len(sortedDatapoints))) - 1
+	index := math.Min(math.Ceil(n/100.0*float64(len(sortedDatapointVals)+1)), float64(len(sortedDatapointVals))) - 1
 
-	return sortedDatapoints[int(index)].Val
+	return sortedDatapointVals[int(index)]
 }

From ddc1e0f0439cdc6c5fb76052f694673417719e83 Mon Sep 17 00:00:00 2001
From: Stiven Deleur <sdeleur@bloomberg.net>
Date: Wed, 15 Aug 2018 10:44:59 -0400
Subject: [PATCH 6/6] PR changes

---
 expr/func_removeabovebelowpercentile.go | 13 ++++++-------
 expr/validator.go                       |  8 ++++++++
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/expr/func_removeabovebelowpercentile.go b/expr/func_removeabovebelowpercentile.go
index 8ea1679506..c9d0466e86 100644
--- a/expr/func_removeabovebelowpercentile.go
+++ b/expr/func_removeabovebelowpercentile.go
@@ -1,7 +1,6 @@
 package expr
 
 import (
-	"errors"
 	"fmt"
 	"math"
 	"sort"
@@ -27,7 +26,7 @@ func (s *FuncRemoveAboveBelowPercentile) Signature() ([]Arg, []Arg) {
 
 	return []Arg{
 		ArgSeriesList{val: &s.in},
-		ArgFloat{key: "n", val: &s.n},
+		ArgFloat{key: "n", val: &s.n, validator: []Validator{NonNegativePercent}},
 	}, []Arg{ArgSeriesList{}}
 }
 
@@ -41,16 +40,14 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 		return nil, err
 	}
 
-	if s.n <= 0 {
-		return nil, errors.New("The requested percent is required to be greater than 0")
-	}
-
 	if len(series) == 0 {
 		return series, nil
 	}
 
 	var output []models.Series
-	sortedDatapointVals := make([]float64, 0, len(series[0].Datapoints)) //reuse float64 slice
+
+	// will be reused for each getPercentileValue call
+	sortedDatapointVals := make([]float64, 0, len(series[0].Datapoints))
 	for _, serie := range series {
 		if s.above {
 			serie.Target = fmt.Sprintf("removeAbovePercentile(%s, %g)", serie.Target, s.n)
@@ -90,6 +87,8 @@ func (s *FuncRemoveAboveBelowPercentile) Exec(cache map[Req][]models.Series) ([]
 	return output, nil
 }
 
+// sortedDatapointVals is an empty slice to be used for sorting datapoints.
+// n must be > 0. if n > 100, the largest value is returned.
 func getPercentileValue(datapoints []schema.Point, n float64, sortedDatapointVals []float64) float64 {
 	sortedDatapointVals = sortedDatapointVals[:0]
 	for _, p := range datapoints {
diff --git a/expr/validator.go b/expr/validator.go
index 07f89e13e3..0a6397d69b 100644
--- a/expr/validator.go
+++ b/expr/validator.go
@@ -9,6 +9,7 @@ import (
 
 var ErrIntPositive = errors.New("integer must be positive")
 var ErrInvalidAggFunc = errors.New("Invalid aggregation func")
+var ErrNonNegativePercent = errors.New("The requested percent is required to be greater than 0")
 
 // Validator is a function to validate an input
 type Validator func(e *expr) error
@@ -44,3 +45,10 @@ func IsOperator(e *expr) error {
 	}
 	return errors.New("Unsupported operator: " + e.str)
 }
+
+func NonNegativePercent(e *expr) error {
+	if e.float < 0 || e.int < 0 {
+		return ErrNonNegativePercent
+	}
+	return nil
+}