From 30cc35632d58d9d9dd4ccaf44d0a71ffd35cd82e Mon Sep 17 00:00:00 2001 From: Matt Bostock Date: Thu, 28 Sep 2017 20:02:52 +0100 Subject: [PATCH 1/2] write: Set partition key salt using HTTP header Allow an optional HTTP header, `X-AthensDB-Partition-Key-Salt` to be set in HTTP requests sending data to the cluster that adds a salt to the partition key which can be used to better distribute data across the cluster. One use for the partition key salt is to support multiple tenants; by using the tenant ID as the salt, data will be more evenly distributed if multiple tenants try to write the same time-series. --- internal/write/write.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/write/write.go b/internal/write/write.go index 0668f40e..32cbd80d 100644 --- a/internal/write/write.go +++ b/internal/write/write.go @@ -24,6 +24,7 @@ import ( const ( HttpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version" HttpHeaderInternalWriteVersion = "0.0.1" + HttpHeaderPartitionKeySalt = "X-AthensDB-Partition-Key-Salt" Route = "/receive" httpHeaderRemoteWrite = "X-Prometheus-Remote-Write-Version" @@ -116,6 +117,7 @@ func (wr *writer) Handler(w http.ResponseWriter, r *http.Request) { seriesToNodes[*n] = make(seriesMap, numPreallocTimeseries) } + pSalt := []byte(r.Header.Get(HttpHeaderPartitionKeySalt)) for _, ts := range req.Timeseries { m := make(labels.Labels, 0, len(ts.Labels)) for _, l := range ts.Labels { @@ -131,7 +133,7 @@ func (wr *writer) Handler(w http.ResponseWriter, r *http.Request) { for _, s := range ts.Samples { timestamp := time.Unix(s.Timestamp/1000, (s.Timestamp-s.Timestamp/1000)*1e6) // FIXME: Avoid panic if the cluster is not yet initialised - pKey := cluster.PartitionKey([]byte{}, timestamp) + pKey := cluster.PartitionKey(pSalt, timestamp) for _, n := range wr.clstr.NodesByPartitionKey(pKey) { if _, ok := seriesToNodes[*n][mHash]; !ok { // FIXME handle change in cluster size From 4da88ae24e4835d21ee747b8cfdc0f5914056f1b Mon Sep 17 00:00:00 2001 From: Matt Bostock Date: Thu, 28 Sep 2017 20:21:47 +0100 Subject: [PATCH 2/2] write: Make HTTP all caps HTTP is an acronym, make it all caps. --- internal/write/write.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/write/write.go b/internal/write/write.go index 32cbd80d..a4efd22e 100644 --- a/internal/write/write.go +++ b/internal/write/write.go @@ -22,9 +22,9 @@ import ( ) const ( - HttpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version" - HttpHeaderInternalWriteVersion = "0.0.1" - HttpHeaderPartitionKeySalt = "X-AthensDB-Partition-Key-Salt" + HTTPHeaderInternalWrite = "X-AthensDB-Internal-Write-Version" + HTTPHeaderInternalWriteVersion = "0.0.1" + HTTPHeaderPartitionKeySalt = "X-AthensDB-Partition-Key-Salt" Route = "/receive" httpHeaderRemoteWrite = "X-Prometheus-Remote-Write-Version" @@ -80,7 +80,7 @@ func (wr *writer) Handler(w http.ResponseWriter, r *http.Request) { // This is an internal write, so don't replicate it to other nodes // This case is very common, to make it fast - if r.Header.Get(HttpHeaderInternalWrite) != "" { + if r.Header.Get(HTTPHeaderInternalWrite) != "" { wr.mu.Lock() appender, err := wr.store.Appender() if err != nil { @@ -117,7 +117,7 @@ func (wr *writer) Handler(w http.ResponseWriter, r *http.Request) { seriesToNodes[*n] = make(seriesMap, numPreallocTimeseries) } - pSalt := []byte(r.Header.Get(HttpHeaderPartitionKeySalt)) + pSalt := []byte(r.Header.Get(HTTPHeaderPartitionKeySalt)) for _, ts := range req.Timeseries { m := make(labels.Labels, 0, len(ts.Labels)) for _, l := range ts.Labels { @@ -241,7 +241,7 @@ func (wr *writer) remoteWrite(sNodeMap seriesNodeMap) error { nodeReq.Header.Add("Content-Encoding", "snappy") nodeReq.Header.Set("Content-Type", "application/x-protobuf") nodeReq.Header.Set(httpHeaderRemoteWrite, httpHeaderRemoteWriteVersion) - nodeReq.Header.Set(HttpHeaderInternalWrite, HttpHeaderInternalWriteVersion) + nodeReq.Header.Set(HTTPHeaderInternalWrite, HTTPHeaderInternalWriteVersion) // FIXME set timeout using context httpResp, err := ctxhttp.Do(context.TODO(), http.DefaultClient, nodeReq)