Skip to content

Commit

Permalink
Adds subscriber service for creating/dropping subscriptions to the
Browse files Browse the repository at this point in the history
InfluxDB data stream.
  • Loading branch information
nathanielc committed Oct 9, 2015
1 parent 131d67b commit fb4486d
Show file tree
Hide file tree
Showing 23 changed files with 1,716 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#4348](https://github.com/influxdb/influxdb/pull/4348): Public ApplyTemplate function for graphite parser.
- [#4178](https://github.com/influxdb/influxdb/pull/4178): Support fields in graphite parser. Thanks @roobert!
- [#4291](https://github.com/influxdb/influxdb/pull/4291): Added ALTER DATABASE RENAME. Thanks @linearb
- [#4375](https://github.com/influxdb/influxdb/pull/4375): Add Subscriptions so data can be 'forked' out of InfluxDB to another third party.

### Bugfixes
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
Expand Down
7 changes: 7 additions & 0 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ type PointsWriter struct {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}

Subscriber interface {
WritePoints(p *WritePointsRequest)
}

statMap *expvar.Map
}

Expand Down Expand Up @@ -233,6 +237,9 @@ func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
}

// Fire and forget points to Subscriber
go w.Subscriber.WritePoints(p)

for range shardMappings.Points {
select {
case <-w.closing:
Expand Down
26 changes: 26 additions & 0 deletions cluster/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,19 @@ func TestPointsWriter_WritePoints(t *testing.T) {
return nil, nil
}
ms.NodeIDFn = func() uint64 { return 1 }

subCalled := make(chan *cluster.WritePointsRequest, 1)
sub := Subcriber{}
sub.WritePointsFn = func(p *cluster.WritePointsRequest) {
subCalled <- p
}

c := cluster.NewPointsWriter()
c.MetaStore = ms
c.ShardWriter = sw
c.TSDBStore = store
c.HintedHandoff = hh
c.Subscriber = sub

err := c.WritePoints(pr)
if err == nil && test.expErr != nil {
Expand All @@ -325,6 +333,16 @@ func TestPointsWriter_WritePoints(t *testing.T) {
if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() {
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
if test.expErr == nil {
select {
case p := <-subCalled:
if p != pr {
t.Errorf("PointsWriter.WritePoints(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr)
}
default:
t.Errorf("PointsWriter.WritePoints(): '%s' error: Subscriber.WritePoints not called", test.name)
}
}
}
}

Expand Down Expand Up @@ -406,6 +424,14 @@ func (m MetaStore) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupI
return m.ShardOwnerFn(shardID)
}

type Subcriber struct {
WritePointsFn func(p *cluster.WritePointsRequest)
}

func (s Subcriber) WritePoints(p *cluster.WritePointsRequest) {
s.WritePointsFn(p)
}

func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
shards := []meta.ShardInfo{}
owners := []meta.ShardOwner{}
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Server struct {
ShardWriter *cluster.ShardWriter
ShardMapper *cluster.ShardMapper
HintedHandoff *hh.Service
Subscriber *subscriber.Service

Services []Service

Expand Down Expand Up @@ -127,13 +129,18 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)

// Create the Subscriber service
s.Subscriber = subscriber.NewService()
s.Subscriber.MetaStore = s.MetaStore

// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
s.PointsWriter.MetaStore = s.MetaStore
s.PointsWriter.TSDBStore = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff
s.PointsWriter.Subscriber = s.Subscriber

// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
Expand Down Expand Up @@ -371,6 +378,11 @@ func (s *Server) Open() error {
return fmt.Errorf("open hinted handoff: %s", err)
}

// Open the subcriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}

for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
Expand Down Expand Up @@ -415,6 +427,10 @@ func (s *Server) Close() error {
s.HintedHandoff.Close()
}

if s.Subscriber != nil {
s.Subscriber.Close()
}

// Close the TSDBStore, no more reads or writes at this point
if s.TSDBStore != nil {
s.TSDBStore.Close()
Expand Down
65 changes: 55 additions & 10 deletions influxql/INFLUXQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ _cpu_stats
## Keywords

```
ALL ALTER AS ASC BEGIN BY
CREATE CONTINUOUS DATABASE DATABASES DEFAULT DELETE
DESC DROP DURATION END EXISTS EXPLAIN
FIELD FROM GRANT GROUP IF IN
INNER INSERT INTO KEY KEYS LIMIT
SHOW MEASUREMENT MEASUREMENTS NOT OFFSET ON
ORDER PASSWORD POLICY POLICIES PRIVILEGES QUERIES
QUERY READ REPLICATION RETENTION REVOKE SELECT
SERIES SLIMIT SOFFSET TAG TO USER
USERS VALUES WHERE WITH WRITE
ALL ALTER ANY AS ASC BEGIN
BY CREATE CONTINUOUS DATABASE DATABASES DEFAULT
DELETE DESC DESTINATIONS DROP DURATION END
EXISTS EXPLAIN FIELD FROM GRANT GROUP
IF IN INNER INSERT INTO KEY
KEYS LIMIT SHOW MEASUREMENT MEASUREMENTS NOT
OFFSET ON ORDER PASSWORD POLICY POLICIES
PRIVILEGES QUERIES QUERY READ REPLICATION RETENTION
REVOKE SELECT SERIES SLIMIT SOFFSET SUBSCRIPTION
SUBSCRIPTIONS TAG TO USER USERS VALUES
WHERE WITH WRITE
```

## Literals
Expand Down Expand Up @@ -174,12 +175,14 @@ statement = alter_retention_policy_stmt |
create_database_stmt |
create_retention_policy_stmt |
create_user_stmt |
create_subscription_stmt |
delete_stmt |
drop_continuous_query_stmt |
drop_database_stmt |
drop_measurement_stmt |
drop_retention_policy_stmt |
drop_series_stmt |
drop_subscription_stmt |
drop_user_stmt |
grant_stmt |
show_continuous_queries_stmt |
Expand All @@ -189,6 +192,7 @@ statement = alter_retention_policy_stmt |
show_retention_policies |
show_series_stmt |
show_shards_stmt |
show_subscriptions_stmt|
show_tag_keys_stmt |
show_tag_values_stmt |
show_users_stmt |
Expand Down Expand Up @@ -292,6 +296,22 @@ CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2;
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2 DEFAULT;
```

### CREATE SUBSCRIPTION

```
create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" ("ANY"|"ALL") host { "," host} .
```

#### Examples:

```sql
-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'default' that send data to 'example.com:9090' via UDP.
CREATE SUBSCRIPTION sub0 ON "mydb"."default" DESTINATIONS ALL 'udp://example.com:9090' ;

-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'default' that round robins the data to 'h1.example.com:9090' and 'h2.example.com:9090'.
CREATE SUBSCRIPTION sub0 ON "mydb"."default" DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090';
```

### CREATE USER

```
Expand Down Expand Up @@ -382,6 +402,19 @@ drop_series_stmt = "DROP SERIES" [ from_clause ] [ where_clause ]

```

### DROP SUBSCRIPTION

```
drop_subscription_stmt = "DROP SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy .
```

#### Example:

```sql
DROP SUBSCRIPTION sub0 ON "mydb"."default";

```

### DROP USER

```
Expand Down Expand Up @@ -502,6 +535,18 @@ show_shards_stmt = "SHOW SHARDS" .
SHOW SHARDS;
```

### SHOW SUBSCRIPTIONS

```
show_subscriptions_stmt = "SHOW SUBSCRIPTIONS" .
```

#### Example:

```sql
SHOW SUBSCRIPTIONS;
```

### SHOW TAG KEYS

```
Expand Down
65 changes: 65 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (*AlterRetentionPolicyStatement) node() {}
func (*CreateContinuousQueryStatement) node() {}
func (*CreateDatabaseStatement) node() {}
func (*CreateRetentionPolicyStatement) node() {}
func (*CreateSubscriptionStatement) node() {}
func (*CreateUserStatement) node() {}
func (*Distinct) node() {}
func (*DeleteStatement) node() {}
Expand All @@ -94,6 +95,7 @@ func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropServerStatement) node() {}
func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
Expand All @@ -111,6 +113,7 @@ func (*ShowMeasurementsStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowShardsStatement) node() {}
func (*ShowStatsStatement) node() {}
func (*ShowSubscriptionsStatement) node() {}
func (*ShowDiagnosticsStatement) node() {}
func (*ShowTagKeysStatement) node() {}
func (*ShowTagValuesStatement) node() {}
Expand Down Expand Up @@ -194,6 +197,7 @@ func (*AlterRetentionPolicyStatement) stmt() {}
func (*CreateContinuousQueryStatement) stmt() {}
func (*CreateDatabaseStatement) stmt() {}
func (*CreateRetentionPolicyStatement) stmt() {}
func (*CreateSubscriptionStatement) stmt() {}
func (*CreateUserStatement) stmt() {}
func (*DeleteStatement) stmt() {}
func (*DropContinuousQueryStatement) stmt() {}
Expand All @@ -202,6 +206,7 @@ func (*DropMeasurementStatement) stmt() {}
func (*DropRetentionPolicyStatement) stmt() {}
func (*DropSeriesStatement) stmt() {}
func (*DropServerStatement) stmt() {}
func (*DropSubscriptionStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
Expand All @@ -215,6 +220,7 @@ func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowShardsStatement) stmt() {}
func (*ShowStatsStatement) stmt() {}
func (*ShowSubscriptionsStatement) stmt() {}
func (*ShowDiagnosticsStatement) stmt() {}
func (*ShowTagKeysStatement) stmt() {}
func (*ShowTagValuesStatement) stmt() {}
Expand Down Expand Up @@ -2128,6 +2134,65 @@ func (s *ShowDiagnosticsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// CreateSubscriptionStatement represents a command to add a subscription to the incoming data stream
type CreateSubscriptionStatement struct {
Name string
Database string
RetentionPolicy string
Destinations []string
Mode string
}

// String returns a string representation of the CreateSubscriptionStatement.
func (s *CreateSubscriptionStatement) String() string {
var destinations bytes.Buffer
for i, dest := range s.Destinations {
if i != 0 {
destinations.Write([]byte(`, `))
}
destinations.Write([]byte(`'`))
destinations.Write([]byte(dest))
destinations.Write([]byte(`'`))
}
return fmt.Sprintf(`CREATE SUBSCRIPTION "%s" ON "%s"."%s" DESTINATIONS %s %s `, s.Name, s.Database, s.RetentionPolicy, s.Mode, string(destinations.Bytes()))
}

// RequiredPrivileges returns the privilege required to execute a CreateSubscriptionStatement
func (s *CreateSubscriptionStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// DropSubscriptionStatement represents a command to drop a subscription to the incoming data stream.
type DropSubscriptionStatement struct {
Name string
Database string
RetentionPolicy string
}

// String returns a string representation of the DropSubscriptionStatement.
func (s *DropSubscriptionStatement) String() string {
return fmt.Sprintf(`DROP SUBSCRIPTION "%s" ON "%s"."%s"`, s.Name, s.Database, s.RetentionPolicy)
}

// RequiredPrivileges returns the privilege required to execute a DropSubscriptionStatement
func (s *DropSubscriptionStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// ShowSubscriptionsStatement represents a command to show a list of subscriptions.
type ShowSubscriptionsStatement struct {
}

// String returns a string representation of the ShowSubscriptionStatement.
func (s *ShowSubscriptionsStatement) String() string {
return "SHOW SUBSCRIPTIONS"
}

// RequiredPrivileges returns the privilege required to execute a ShowSubscriptionStatement
func (s *ShowSubscriptionsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// ShowTagKeysStatement represents a command for listing tag keys.
type ShowTagKeysStatement struct {
// Data sources that fields are extracted from.
Expand Down
Loading

0 comments on commit fb4486d

Please sign in to comment.