Skip to content

Commit

Permalink
Merge pull request #340 from alikhtag/fix/carbonlink-py3-compatibility
Browse files Browse the repository at this point in the history
[carbonlink] Python 3 Pickle Compatible Metric Request Parser
  • Loading branch information
azhiltsov authored Jun 7, 2020
2 parents 48fb013 + 6a6e169 commit 0fdd9e5
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 7 deletions.
53 changes: 47 additions & 6 deletions cache/carbonlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func NewCarbonlinkRequest() *CarbonlinkRequest {
func pickleMaybeMemo(b *[]byte) bool { //"consumes" memo tokens
if len(*b) > 1 && (*b)[0] == 'q' {
*b = (*b)[2:]
} else if len(*b) > 1 && bytes.Index(*b, []byte("\x94")) == 0 { // Pickle protocol 4 or 5
*b = (*b)[1:]
}
return true
}
Expand All @@ -59,6 +61,14 @@ func pickleGetStr(buf *[]byte) (string, bool) {
return string(b[5 : 5+sLen]), true
}
}
} else if bytes.Index(b, []byte("\x8c")) == 0 { // Pickle protocol 4 or 5
if len(b) >= 2 {
sLen := int(uint8(b[1]))
if len(b) >= 2+sLen {
*buf = b[2+sLen:]
return string(b[2 : 2+sLen]), true
}
}
}
return "", false
}
Expand All @@ -72,29 +82,56 @@ func expectBytes(b *[]byte, v []byte) bool {
}
}

func protocolFourOrFiveFirstBytes(b *[]byte) bool {
// Parse and drop first 12 bytes of Pickle protocol 4 or 5, 12th byte must be "}"
if (bytes.Index(*b, []byte("\x80\x04")) == 0 ||
bytes.Index(*b, []byte("\x80\x05")) == 0) && bytes.Index((*b)[11:12], []byte("}")) == 0 {
*b = (*b)[12:]
return true
} else {
return false
}
}

var badErr error = fmt.Errorf("Bad pickle message")

// ParseCarbonlinkRequest from pickle encoded data
func ParseCarbonlinkRequest(d []byte) (*CarbonlinkRequest, error) {

if !(expectBytes(&d, []byte("\x80\x02}")) && pickleMaybeMemo(&d) && expectBytes(&d, []byte("("))) {
return nil, badErr
asciiPklMetricBytes := []byte("U\x06metric")
asciiPklTypeBytes := []byte("U\x04type")

var unicodePklMetricBytes, unicodePklTypeBytes []byte
if (expectBytes(&d, []byte("\x80\x02}")) ||
expectBytes(&d, []byte("\x80\x03}"))) && pickleMaybeMemo(&d) && expectBytes(&d, []byte("(")) {
// message is using pickle protocol 2 or 3.
// unicode bytes if Pickle request came from Python 3.0+
unicodePklMetricBytes = []byte("X\x06\x00\x00\x00metric")
unicodePklTypeBytes = []byte("X\x04\x00\x00\x00type")
} else if protocolFourOrFiveFirstBytes(&d) && pickleMaybeMemo(&d) && expectBytes(&d, []byte("(")) {
// message is using pickle protocol 4, or 5
unicodePklMetricBytes = []byte("\x8c\x06metric")
unicodePklTypeBytes = []byte("\x8c\x04type")
} else {
return nil, fmt.Errorf("Bad pickle message, unknown pickle protocol")
}

req := NewCarbonlinkRequest()

var Metric, Type string
var ok bool

if expectBytes(&d, []byte("U\x06metric")) {
if expectBytes(&d, asciiPklMetricBytes) || expectBytes(&d, unicodePklMetricBytes) {
if !pickleMaybeMemo(&d) {
return nil, badErr
}
if Metric, ok = pickleGetStr(&d); !ok {
return nil, badErr
}

if !(pickleMaybeMemo(&d) && expectBytes(&d, []byte("U\x04type")) && pickleMaybeMemo(&d)) {
if !(pickleMaybeMemo(&d) &&
(expectBytes(&d, asciiPklTypeBytes) || expectBytes(&d, unicodePklTypeBytes)) &&
pickleMaybeMemo(&d)) {
return nil, badErr
}

Expand All @@ -108,7 +145,8 @@ func ParseCarbonlinkRequest(d []byte) (*CarbonlinkRequest, error) {

req.Metric = Metric
req.Type = Type
} else if expectBytes(&d, []byte("U\x04type")) {

} else if expectBytes(&d, asciiPklTypeBytes) || expectBytes(&d, unicodePklTypeBytes) {
if !pickleMaybeMemo(&d) {
return nil, badErr
}
Expand All @@ -117,7 +155,9 @@ func ParseCarbonlinkRequest(d []byte) (*CarbonlinkRequest, error) {
return nil, badErr
}

if !(pickleMaybeMemo(&d) && expectBytes(&d, []byte("U\x06metric")) && pickleMaybeMemo(&d)) {
if !(pickleMaybeMemo(&d) &&
(expectBytes(&d, asciiPklMetricBytes) || expectBytes(&d, unicodePklMetricBytes)) &&
pickleMaybeMemo(&d)) {
return nil, badErr
}

Expand All @@ -131,6 +171,7 @@ func ParseCarbonlinkRequest(d []byte) (*CarbonlinkRequest, error) {

req.Metric = Metric
req.Type = Type

} else {
return nil, badErr
}
Expand Down
137 changes: 136 additions & 1 deletion cache/carbonlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ import (
"github.com/stretchr/testify/assert"
)

// Python 2.x ASCII protocol 2 Pickles
const sampleCacheQuery = "\x00\x00\x00Y\x80\x02}q\x01(U\x06metricq\x02U,carbon.agents.carbon_agent_server.cache.sizeq\x03U\x04typeq\x04U\x0bcache-queryq\x05u."
const sampleCacheQuery2 = "\x00\x00\x00Y\x80\x02}q\x01(U\x04typeq\x04U\x0bcache-queryq\x05U\x06metricq\x02U,carbon.agents.carbon_agent_server.param.sizeq\x03u."
const sampleCacheQuery3 = "\x00\x00\x00R\x80\x02}(U\x06metricX,\x00\x00\x00carbon.agents.carbon_agent_server.param.sizeU\x04typeU\x0bcache-queryu." // unicode metric
const sampleCacheQuery3 = "\x00\x00\x00R\x80\x02}(U\x06metricX,\x00\x00\x00carbon.agents.carbon_agent_server.param.sizeU\x04typeU\x0bcache-queryu." // unicode metric string, but not whole pickle message

// Full unicode pickle ( Python 3.0+ )
const unicodeQueryPklProtocol2 = "\x00\x00\x00h\x80\x02}q\x00(X\x04\x00\x00\x00typeq\x01X\x0b\x00\x00\x00cache-queryq\x02X\x06\x00\x00\x00metricq\x03X/\x00\x00\x00carbon.agents.carbon_agent_server.cache.metricsq\x04u."

// Pickles with protocols >2 ( Python 3.0 + )
const unicodeQueryPklProtocol3 = "\x00\x00\x00h\x80\x03}q\x00(X\x04\x00\x00\x00typeq\x01X\x0b\x00\x00\x00cache-queryq\x02X\x06\x00\x00\x00metricq\x03X/\x00\x00\x00carbon.agents.carbon_agent_server.cache.metricsq\x04u."
const unicodeQueryPklProtocol4 = "\x00\x00\x00e\x80\x04\x95Z\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x06metric\x94\x8c4operations.mining.minerals.carbon.graphite.weight.kg\x94\x8c\x04type\x94\x8c\x0bcache-query\x94u."
const unicodeQueryPklProtocol5 = "\x00\x00\x00Y\x80\x05\x95Q\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x04type\x94\x8c\x0bcache-query\x94\x8c\x06metric\x94\x8c+pogoda.goroda.dozhd.prodolzhitelnost.sekund\x94u."
const protocol4ContainsBracket = "\x00\x00\x00\x88\x80\x04\x95}\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x04type\x94\x8c\x0bcache-query\x94\x8c\x06metric\x94\x8cWcarbon.agents.gc-dev-graphite-graphite-go-carbon-5b4bc456f8-nx57q.cache.queueBuildCount\x94u."

func TestCarbonlink(t *testing.T) {
assert := assert.New(t)
Expand All @@ -38,9 +48,44 @@ func TestCarbonlink(t *testing.T) {
1422795966,
)

msgUnicodePklProtocol2 := points.OnePoint(
"carbon.agents.carbon_agent_server.cache.metrics",
1234,
1582129201,
)

msgUnicodePklProtocol3 := points.OnePoint(
"carbon.agents.carbon_agent_server.cache.metrics",
5555,
1582215601,
)

msgUnicodePklProtocol4 := points.OnePoint(
"operations.mining.minerals.carbon.graphite.weight.kg",
5431,
1582950001,
)

msgUnicodePklProtocol5 := points.OnePoint(
"pogoda.goroda.dozhd.prodolzhitelnost.sekund",
3600,
1587356401,
)

msgProtocol4ContainsBracket := points.OnePoint(
"carbon.agents.gc-dev-graphite-graphite-go-carbon-5b4bc456f8-nx57q.cache.queueBuildCount",
25,
1587356901,
)

cache.Add(msg1)
cache.Add(msg2)
cache.Add(msg3)
cache.Add(msgUnicodePklProtocol2)
cache.Add(msgUnicodePklProtocol3)
cache.Add(msgUnicodePklProtocol4)
cache.Add(msgUnicodePklProtocol5)
cache.Add(msgProtocol4ContainsBracket)

defer cache.Stop()

Expand Down Expand Up @@ -135,6 +180,96 @@ func TestCarbonlink(t *testing.T) {
assert.Equal("\x80\x02}U\ndatapoints]s.", string(data))
cleanup()

/* Unicode Python 3.0+ Pickle Protocol 2 Message */
conn, cleanup = NewClient()

_, err = conn.Write([]byte(unicodeQueryPklProtocol2))
assert.NoError(err)

err = binary.Read(conn, binary.BigEndian, &replyLength)
assert.NoError(err)

data = make([]byte, replyLength)

err = binary.Read(conn, binary.BigEndian, data)
assert.NoError(err)

// {'datapoints': [(1582129201, 1234.0), (1582215601, 5555.0)]}
assert.Equal("\x80\x02}U\ndatapoints](J1`M^G@\x93H\x00\x00\x00\x00\x00\x86J\xb1\xb1N^G@\xb5\xb3\x00\x00\x00\x00\x00\x86es.", string(data))
cleanup()

/* Pickle Protocol 3 Message */
conn, cleanup = NewClient()

_, err = conn.Write([]byte(unicodeQueryPklProtocol3))
assert.NoError(err)

err = binary.Read(conn, binary.BigEndian, &replyLength)
assert.NoError(err)

data = make([]byte, replyLength)

err = binary.Read(conn, binary.BigEndian, data)
assert.NoError(err)

// {'datapoints': [(1582129201, 1234.0), (1582215601, 5555.0)]}
assert.Equal("\x80\x02}U\ndatapoints](J1`M^G@\x93H\x00\x00\x00\x00\x00\x86J\xb1\xb1N^G@\xb5\xb3\x00\x00\x00\x00\x00\x86es.", string(data))
cleanup()

/* Pickle Protocol 4 Message */
conn, cleanup = NewClient()

_, err = conn.Write([]byte(unicodeQueryPklProtocol4))
assert.NoError(err)

err = binary.Read(conn, binary.BigEndian, &replyLength)
assert.NoError(err)

data = make([]byte, replyLength)

err = binary.Read(conn, binary.BigEndian, data)
assert.NoError(err)

// {'datapoints': [(1582950001, 5431.0)]}
assert.Equal("\x80\x02}U\ndatapoints]Jq\xe6Y^G@\xb57\x00\x00\x00\x00\x00\x86as.", string(data))
cleanup()

/* Pickle Protocol 4 Message Contains '}' in first 12 bytes */
conn, cleanup = NewClient()

_, err = conn.Write([]byte(protocol4ContainsBracket))
assert.NoError(err)

err = binary.Read(conn, binary.BigEndian, &replyLength)
assert.NoError(err)

data = make([]byte, replyLength)

err = binary.Read(conn, binary.BigEndian, data)
assert.NoError(err)

// {'datapoints': [(1582950001, 5431.0)]}
assert.Equal("\x80\x02}U\ndatapoints]J\xe5$\x9d^G@9\x00\x00\x00\x00\x00\x00\x86as.", string(data))
cleanup()

/* Pickle Protocol 5 Message */
conn, cleanup = NewClient()

_, err = conn.Write([]byte(unicodeQueryPklProtocol5))
assert.NoError(err)

err = binary.Read(conn, binary.BigEndian, &replyLength)
assert.NoError(err)

data = make([]byte, replyLength)

err = binary.Read(conn, binary.BigEndian, data)
assert.NoError(err)

// {'datapoints': [(1587356401, 3600.0)]}
assert.Equal("\x80\x02}U\ndatapoints]J\xf1\"\x9d^G@\xac \x00\x00\x00\x00\x00\x86as.", string(data))
cleanup()

/* WRONG MESSAGE TEST */
conn, cleanup = NewClient()
_, err = conn.Write([]byte("\x00\x00\x00\x05aaaaa"))
Expand Down

0 comments on commit 0fdd9e5

Please sign in to comment.