Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire up drop series parsing #1629

Merged
merged 32 commits into from
Feb 22, 2015
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd9c19a
wire up drop series parsing
corylanou Feb 17, 2015
918c5c8
one more err test condition for drop series
corylanou Feb 17, 2015
27d7f45
support drop series without from but with where
corylanou Feb 18, 2015
6370a4e
wip in progress for actually dropping the series data
corylanou Feb 18, 2015
7cfc2a5
wip
corylanou Feb 20, 2015
6d5be3d
wip. compiles now. tests next
corylanou Feb 20, 2015
8306057
actually drop shard data
corylanou Feb 20, 2015
c2daa18
first test for dropping series
corylanou Feb 20, 2015
4948b62
first working version of drop series. more testing to come
corylanou Feb 20, 2015
495cedb
move where we unlock
corylanou Feb 20, 2015
0d6b761
accidentally lost this in my rebase
corylanou Feb 21, 2015
697e972
fixing rebase conflicts
corylanou Feb 21, 2015
5ab3731
delete -> drop for series naming
corylanou Feb 21, 2015
9a6e29e
collect all series IDs then unlock
dgnorton Feb 21, 2015
61352f7
pass all series IDs to Server.DropSeries at once
dgnorton Feb 21, 2015
6114c81
refactoring drop series
corylanou Feb 21, 2015
2752ada
moving more responsibility to database.dropSeries
corylanou Feb 21, 2015
5df9726
fix deleting from crazy map
corylanou Feb 22, 2015
7d74cca
addressing nit
corylanou Feb 22, 2015
c575ee9
fixing a fat fingering rebase
corylanou Feb 22, 2015
d35e2a9
locking is now done in process messages
corylanou Feb 22, 2015
da10fa8
in depth drop series test
corylanou Feb 22, 2015
b53d02f
delete correct map
corylanou Feb 22, 2015
d5ac049
remove bogus method
corylanou Feb 22, 2015
63b6719
better basic drop series test
corylanou Feb 22, 2015
3e94c14
no need to resort
corylanou Feb 22, 2015
d059b3e
update test name
corylanou Feb 22, 2015
113fcea
no need to cursor
corylanou Feb 22, 2015
29910c3
better transaction batching for drop series
corylanou Feb 22, 2015
8637a10
start with seriesByMeasurement
corylanou Feb 22, 2015
c8afd62
deleteing a key that does not exist is ok
corylanou Feb 22, 2015
0f42be3
check that the store exists
corylanou Feb 22, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,54 @@ func (m *Measurement) addSeries(s *Series) bool {
return true
}

// removeSeries will remove a series from the measurementIndex. Returns true if already removed
func (m *Measurement) dropSeries(seriesID uint32) bool {
if _, ok := m.seriesByID[seriesID]; !ok {
return true
}
s := m.seriesByID[seriesID]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these two lookups of seriesByID not be combined? I think it would be more pleasing if there were.

tagset := string(marshalTags(s.Tags))

delete(m.series, tagset)
delete(m.seriesByID, seriesID)

var ids []uint32
for _, id := range m.seriesIDs {
if id != seriesID {
ids = append(ids, id)
}
}
m.seriesIDs = ids

// remove this series id to the tag index on the measurement
// s.seriesByTagKeyValue is defined as map[string]map[string]seriesIDs
for k, v := range m.seriesByTagKeyValue {
values := v
for kk, vv := range values {
var ids []uint32
for _, id := range vv {
if id != seriesID {
ids = append(ids, id)
}
}
// Check to see if we have any ids, if not, remove the key
if len(ids) == 0 {
delete(values, kk)
} else {
values[kk] = ids
}
}
// If we have no values, then we delete the key
if len(values) == 0 {
delete(m.seriesByTagKeyValue, k)
} else {
m.seriesByTagKeyValue[k] = values
}
}

return true
}

// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
Expand Down Expand Up @@ -1002,6 +1050,17 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
return nil
}

// dropSeries will delete all data with the seriesID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this done by Retention Policy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that is the only way to walk through to get the shards to delete the series. Is there a faster way to isolate the shards I need?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, retention policy is one of the right places to have it. If you're dropping a series, you should go from database -> retention policy -> shard group -> shard. Each of those will have a method dropSeries

func (rp *RetentionPolicy) dropSeries(seriesID uint32) error {
for _, g := range rp.shardGroups {
err := g.dropSeries(seriesID)
if err != nil {
return err
}
}
return nil
}

func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
for i, g := range rp.shardGroups {
if g.ID == shardID {
Expand Down Expand Up @@ -1075,6 +1134,35 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool {
return idx.addSeries(s)
}

// dropSeries removes the series from the in memory references
func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error {
for measurement, ids := range seriesByMeasurement {
for _, id := range ids {
// if the series is already gone, return
if db.series[id] == nil {
continue
}

delete(db.series, id)

// Remove series information from measurements
m := db.measurements[measurement]
if !m.dropSeries(id) {
return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the dropSeries method on measurement doesn't return an error, but a bool. Just ignore if its already been dropped. That's fine, it's an idempotent operation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't throw an error. If it was already removed, we're ok with that, just continue on the loop

}

// Remove shard data
for _, rp := range db.policies {
if err := rp.dropSeries(id); err != nil {
return err
}
}
}
}

return nil
}

// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
func (db *database) createMeasurementIfNotExists(name string) *Measurement {
idx := db.measurements[name]
Expand Down
21 changes: 21 additions & 0 deletions httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,27 @@ func TestHandler_ShowContinuousQueries(t *testing.T) {

}

func TestHandler_DropSeries(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
defer s.Close()

status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`)

if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}

query := map[string]string{"db": "foo", "q": "DROP SERIES FROM cpu"}
status, _ = MustHTTP("GET", s.URL+`/query`, query, nil, "")

if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
}

func TestHandler_serveWriteSeries(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
srvr.CreateDatabase("foo")
Expand Down
30 changes: 28 additions & 2 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,37 @@ func (s *ShowSeriesStatement) RequiredPrivileges() ExecutionPrivileges {

// DropSeriesStatement represents a command for removing a series from the database.
type DropSeriesStatement struct {
Name string
// The Id of the series being dropped (optional)
SeriesID uint32

// Data source that fields are extracted from (optional)
Source Source

// An expression evaluated on data point (optional)
Condition Expr
}

// String returns a string representation of the drop series statement.
func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES %s", s.Name) }
func (s *DropSeriesStatement) String() string {
var buf bytes.Buffer
i, _ := buf.WriteString("DROP SERIES")

if s.Source != nil {
_, _ = buf.WriteString(" FROM ")
_, _ = buf.WriteString(s.Source.String())
}
if s.Condition != nil {
_, _ = buf.WriteString(" WHERE ")
_, _ = buf.WriteString(s.Condition.String())
}

// If we haven't written any data since the initial statement, then this was a SeriesID statement
if len(buf.String()) == i {
_, _ = buf.WriteString(fmt.Sprintf(" %d", s.SeriesID))
}

return buf.String()
}

// RequiredPrivileges returns the privilige reqired to execute a DropSeriesStatement.
func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
Expand Down
25 changes: 21 additions & 4 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,14 +859,31 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error)
// This function assumes the "DROP SERIES" tokens have already been consumed.
func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
stmt := &DropSeriesStatement{}
var err error

// Read the name of the series to drop.
lit, err := p.parseIdent()
if err != nil {
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
// Parse source.
if stmt.Source, err = p.parseSource(); err != nil {
return nil, err
}
} else {
p.unscan()
}

// Parse condition: "WHERE EXPR".
if stmt.Condition, err = p.parseCondition(); err != nil {
return nil, err
}
stmt.Name = lit

// If they didn't provide a FROM or a WHERE, they need to provide the SeriesID
if stmt.Condition == nil && stmt.Source == nil {
var id int
id, err = p.parseInt(0, math.MaxUint32)
if err != nil {
return nil, err
}
stmt.SeriesID = uint32(id)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have thought of this earlier when we were discussing syntax but it needs to support the syntax I mentioned in the comment above, on the String() method.

return stmt, nil
}

Expand Down
89 changes: 86 additions & 3 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,33 @@ func TestParser_ParseStatement(t *testing.T) {

// DROP SERIES statement
{
s: `DROP SERIES myseries`,
stmt: &influxql.DropSeriesStatement{Name: "myseries"},
s: `DROP SERIES 1`,
stmt: &influxql.DropSeriesStatement{SeriesID: 1},
},
{
s: `DROP SERIES FROM src`,
stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}},
},
{
s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},

// SHOW CONTINUOUS QUERIES statement
Expand Down Expand Up @@ -604,7 +629,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERIES`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`},
Expand Down Expand Up @@ -933,6 +960,62 @@ func TestQuoteIdent(t *testing.T) {
}
}

// Ensure DropSeriesStatement can convert to a string
func TestDropSeriesStatement_String(t *testing.T) {
var tests = []struct {
s string
stmt influxql.Statement
}{
{
s: `DROP SERIES 1`,
stmt: &influxql.DropSeriesStatement{SeriesID: 1},
},
{
s: `DROP SERIES FROM src`,
stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
}

for _, test := range tests {
s := test.stmt.String()
if s != test.s {
t.Errorf("error rendering string. expected %s, actual: %s", test.s, s)
}
}
}

func BenchmarkParserParseStatement(b *testing.B) {
b.ReportAllocs()
s := `SELECT field FROM "series" WHERE value > 10`
Expand Down
19 changes: 16 additions & 3 deletions metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/binary"
"fmt"
"time"
"unsafe"

"github.com/boltdb/bolt"
)
Expand Down Expand Up @@ -231,15 +230,29 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (*

// store the tag map for the series
s := &Series{ID: uint32(id), Tags: tags}
idBytes := make([]byte, 4)
*(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id)
idBytes := u32tob(uint32(id))

if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil {
return nil, err
}
return s, nil
}

// dropSeries removes all seriesIDS for a given database/measurement
func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint32) error {
for measurement, ids := range seriesByMeasurement {
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(measurement))
if b != nil {
for _, id := range ids {
if err := b.Delete(u32tob(id)); err != nil {
return err
}
}
}
}
return nil
}

// loops through all the measurements and series in a database
func (tx *metatx) indexDatabase(db *database) {
// get the bucket that holds series data for the database
Expand Down
Loading