Skip to content

Commit

Permalink
Add ability to choose chunk time interval and wrap Pgx Connections
Browse files Browse the repository at this point in the history
The new wrap will allow for better testing coverage
  • Loading branch information
Blagoj Atanasovski authored and atanasovskib committed Aug 8, 2019
1 parent 975d891 commit 3cd49a4
Show file tree
Hide file tree
Showing 27 changed files with 522 additions and 254 deletions.
47 changes: 17 additions & 30 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ language: go

services:
- docker

env:
global:
- GO111MODULE=on
jobs:
include:
- stage: test
Expand All @@ -14,14 +16,11 @@ jobs:
- docker run -d --name ts1 -p 5433:5432 -e POSTGRES_PASSWORD=postgres timescale/timescaledb
# Run specific version of influx
- docker run -d --name influx1_0 -p 8086:8086 influxdb:1.0
install:
# Setup dependency management tool
- curl -L -s https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 -o $GOPATH/bin/dep
- chmod +x $GOPATH/bin/dep
- dep ensure
install:
- echo 'do not run default install step'
script:
# Unit and integration tests
- go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
- GO111MODULE=on go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- stage: test
Expand All @@ -33,14 +32,11 @@ jobs:
- docker run -d --name ts1 -p 5433:5432 -e POSTGRES_PASSWORD=postgres timescale/timescaledb
# Run specific version of influx
- docker run -d --name influx1_5 -p 8086:8086 influxdb:1.5
install:
# Setup dependency management tool
- curl -L -s https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 -o $GOPATH/bin/dep
- chmod +x $GOPATH/bin/dep
- dep ensure
install:
- echo 'do not run default install step'
script:
# Unit and integration tests
- go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
- GO111MODULE=on go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- stage: test
Expand All @@ -52,14 +48,11 @@ jobs:
- docker run -d --name ts1 -p 5433:5432 -e POSTGRES_PASSWORD=postgres timescale/timescaledb
# Run specific version of influx
- docker run -d --name influx1_6 -p 8086:8086 influxdb:1.6
install:
# Setup dependency management tool
- curl -L -s https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 -o $GOPATH/bin/dep
- chmod +x $GOPATH/bin/dep
- dep ensure
install:
- echo 'do not run default install step'
script:
# Unit and integration tests
- go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
- GO111MODULE=on go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- stage: test
Expand All @@ -72,13 +65,10 @@ jobs:
# Run specific version of influx
- docker run -d --name influx1_7 -p 8086:8086 influxdb:1.7
install:
# Setup dependency management tool
- curl -L -s https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 -o $GOPATH/bin/dep
- chmod +x $GOPATH/bin/dep
- dep ensure
- echo 'do not run default install step'
script:
# Unit and integration tests
- go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
- GO111MODULE=on go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- stage: test
Expand All @@ -90,13 +80,10 @@ jobs:
- docker run -d --name ts1 -p 5433:5432 -e POSTGRES_PASSWORD=postgres timescale/timescaledb
# Run specific version of influx
- docker run -d --name influx_l -p 8086:8086 influxdb
install:
# Setup dependency management tool
- curl -L -s https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 -o $GOPATH/bin/dep
- chmod +x $GOPATH/bin/dep
- dep ensure
install:
- echo 'do not run default install step'
script:
# Unit and integration tests
- go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
- GO111MODULE=on go test -race -tags=integration -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
5 changes: 3 additions & 2 deletions cmd/outflux/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"
"github.com/spf13/cobra"
"github.com/timescale/outflux/internal/cli"
"github.com/timescale/outflux/internal/cli/flagparsers"
Expand Down Expand Up @@ -56,6 +55,8 @@ func initMigrateCmd() *cobra.Command {
migrateCmd.PersistentFlags().String(flagparsers.FieldsColumnFlag, flagparsers.DefaultFieldsColumn, "When "+flagparsers.FieldsAsJSONFlag+" is set, this column specifies the name of the JSON column for the fields")
migrateCmd.PersistentFlags().String(flagparsers.OutputSchemaFlag, flagparsers.DefaultOutputSchema, "The schema of the output database that the data will be inserted into")
migrateCmd.PersistentFlags().Bool(flagparsers.MultishardIntFloatCast, flagparsers.DefaultMultishardIntFloatCast, "If a field is Int64 in one shard, and Float64 in another, with this flag it will be cast to Float64 despite possible data loss")
migrateCmd.PersistentFlags().String(flagparsers.ChunkTimeIntervalFlag, flagparsers.DefaultChunkTimeInterval, "chunk_time_interval of the hypertables created by Outflux")

return migrateCmd
}

Expand Down Expand Up @@ -163,7 +164,7 @@ func preparePipeErrors(errors []error) error {
return fmt.Errorf(errString)
}

func openConnections(app *appContext, connArgs *cli.ConnectionConfig) (influx.Client, *pgx.Conn, error) {
func openConnections(app *appContext, connArgs *cli.ConnectionConfig) (influx.Client, connections.PgxWrap, error) {
influxConn, err := app.ics.NewConnection(influxConnParams(connArgs))
if err != nil {
return nil, nil, fmt.Errorf("could not open connection to Influx Server\n%v", err)
Expand Down
9 changes: 4 additions & 5 deletions cmd/outflux/mocks_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"
"github.com/timescale/outflux/internal/cli"
"github.com/timescale/outflux/internal/connections"
"github.com/timescale/outflux/internal/pipeline"
Expand All @@ -20,7 +19,7 @@ type mockService struct {
inflSchemMngr schemamanagement.SchemaManager
}

func (m *mockService) Create(infConn influx.Client, tsConn *pgx.Conn, measure, inputDb string, conf *cli.MigrationConfig) (pipeline.Pipe, error) {
func (m *mockService) Create(infConn influx.Client, tsConn connections.PgxWrap, measure, inputDb string, conf *cli.MigrationConfig) (pipeline.Pipe, error) {
return m.pipe, m.pipeErr
}

Expand All @@ -32,16 +31,16 @@ func (m *mockService) Influx(c influx.Client, db, rp string, convertIntToFloat b
return m.inflSchemMngr
}

func (m *mockService) TimeScale(dbConn *pgx.Conn, schema string) schemamanagement.SchemaManager {
func (m *mockService) TimeScale(dbConn connections.PgxWrap, schema, chunkInterval string) schemamanagement.SchemaManager {
return nil
}

type mockTsConnSer struct {
tsConn *pgx.Conn
tsConn connections.PgxWrap
tsConnErr error
}

func (m *mockTsConnSer) NewConnection(connStr string) (*pgx.Conn, error) {
func (m *mockTsConnSer) NewConnection(connStr string) (connections.PgxWrap, error) {
return m.tsConn, m.tsConnErr
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/outflux/schema_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"
"github.com/timescale/outflux/internal/cli"
"github.com/timescale/outflux/internal/connections"

"github.com/spf13/cobra"
"github.com/timescale/outflux/internal/cli/flagparsers"
Expand Down Expand Up @@ -44,6 +44,7 @@ func initSchemaTransferCmd() *cobra.Command {
schemaTransferCmd.PersistentFlags().String(flagparsers.FieldsColumnFlag, flagparsers.DefaultFieldsColumn, "When "+flagparsers.FieldsAsJSONFlag+" is set, this column specifies the name of the JSON column for the fields")
schemaTransferCmd.PersistentFlags().String(flagparsers.OutputSchemaFlag, flagparsers.DefaultOutputSchema, "The schema of the output database that the data will be inserted into")
schemaTransferCmd.PersistentFlags().Bool(flagparsers.MultishardIntFloatCast, flagparsers.DefaultMultishardIntFloatCast, "If a field is Int64 in one shard, and Float64 in another, with this flag it will be cast to Float64 despite possible data loss")
schemaTransferCmd.PersistentFlags().String(flagparsers.ChunkTimeIntervalFlag, flagparsers.DefaultChunkTimeInterval, "chunk_time_interval of the hypertables created by Outflux")
return schemaTransferCmd
}

Expand Down Expand Up @@ -101,7 +102,7 @@ func transfer(
inputDb string,
args *cli.MigrationConfig,
infConn influx.Client,
pgConn *pgx.Conn,
pgConn connections.PgxWrap,
measure string) error {

pipe, err := app.pipeService.Create(infConn, pgConn, measure, inputDb, args)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ require (
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.2.2
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMe
github.com/containerd/continuity v0.0.0-20181027224239-bea7585dbfac/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.0.0-20181014144952-4e0d7dc8888f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down Expand Up @@ -197,6 +198,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down Expand Up @@ -230,7 +232,9 @@ github.com/stevvooe/resumable v0.0.0-20180830230917-22b14a53ba50/go.mod h1:1pdIZ
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8/go.mod h1:IlWNj9v/13q7xFbaK4mbyzMNwrZLaWSHx/aibKIZuIg=
github.com/testcontainers/testcontainer-go v0.0.0-20181115231424-8e868ca12c0f/go.mod h1:SrG3IY071gtmZJjGbKO+POJ57a/MMESerYNWt6ZRtKs=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/flagparsers/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
TagsColumnFlag = "tags-column"
FieldsAsJSONFlag = "fields-as-json"
FieldsColumnFlag = "fields-column"
ChunkTimeIntervalFlag = "chunk-time-interval"
// InfluxDB can have different data types for the same field accross
// different shards. If a field is discovered with an Int64 and a Float64 type
// and this flag is TRUE it will allow the field to be converted to float,
Expand All @@ -56,4 +57,5 @@ const (
DefaultFieldsAsJSON = false
DefaultFieldsColumn = "fields"
DefaultMultishardIntFloatCast = false
DefaultChunkTimeInterval = ""
)
3 changes: 2 additions & 1 deletion internal/cli/flagparsers/migrate_args_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func FlagsToMigrateConfig(flags *pflag.FlagSet, args []string) (*cli.ConnectionC
outputSchema, _ := flags.GetString(OutputSchemaFlag)
rp, _ := flags.GetString(RetentionPolicyFlag)
intToFloat, _ := flags.GetBool(MultishardIntFloatCast)

chunkTimeInterval, _ := flags.GetString(ChunkTimeIntervalFlag)
migrateArgs := &cli.MigrationConfig{
RetentionPolicy: rp,
OutputSchemaStrategy: strategy,
Expand All @@ -100,6 +100,7 @@ func FlagsToMigrateConfig(flags *pflag.FlagSet, args []string) (*cli.ConnectionC
FieldsAsJSON: fieldsAsJSON,
FieldsCol: fieldsColumn,
OnConflictConvertIntToFloat: intToFloat,
ChunkTimeInterval: chunkTimeInterval,
}

return connectionArgs, migrateArgs, nil
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/flagparsers/schema_transfer_args_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func FlagsToSchemaTransferConfig(flags *pflag.FlagSet, args []string) (*cli.Conn
}
outputSchema, _ := flags.GetString(OutputSchemaFlag)
intToFloat, _ := flags.GetBool(MultishardIntFloatCast)
chunkTimeInterval, _ := flags.GetString(ChunkTimeIntervalFlag)
return connectionArgs, &cli.MigrationConfig{
RetentionPolicy: retentionPolicy,
OutputSchema: outputSchema,
Expand All @@ -52,5 +53,6 @@ func FlagsToSchemaTransferConfig(flags *pflag.FlagSet, args []string) (*cli.Conn
FieldsAsJSON: fieldsAsJSON,
FieldsCol: fieldsColumn,
OnConflictConvertIntToFloat: intToFloat,
ChunkTimeInterval: chunkTimeInterval,
}, nil
}
1 change: 1 addition & 0 deletions internal/cli/ingestion_conf_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ func (s *defaultIngestionConfCreator) create(pipeID string, conf *MigrationConfi
CommitStrategy: conf.CommitStrategy,
SchemaStrategy: conf.OutputSchemaStrategy,
Schema: conf.OutputSchema,
ChunkTimeInterval: conf.ChunkTimeInterval,
}
}
1 change: 1 addition & 0 deletions internal/cli/migration_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ type MigrationConfig struct {
FieldsAsJSON bool
FieldsCol string
OnConflictConvertIntToFloat bool
ChunkTimeInterval string
}
6 changes: 3 additions & 3 deletions internal/cli/pipe_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"

"github.com/timescale/outflux/internal/connections"
"github.com/timescale/outflux/internal/extraction"
"github.com/timescale/outflux/internal/ingestion"
"github.com/timescale/outflux/internal/pipeline"
Expand All @@ -17,7 +17,7 @@ const (

// PipeService defines methods for creating pipelines
type PipeService interface {
Create(infConn influx.Client, pgConn *pgx.Conn, measure, inputDb string, conf *MigrationConfig) (pipeline.Pipe, error)
Create(infConn influx.Client, pgConn connections.PgxWrap, measure, inputDb string, conf *MigrationConfig) (pipeline.Pipe, error)
}

type pipeService struct {
Expand All @@ -42,7 +42,7 @@ func NewPipeService(
}
}

func (s *pipeService) Create(infConn influx.Client, tsConn *pgx.Conn, measure, inputDb string, conf *MigrationConfig) (pipeline.Pipe, error) {
func (s *pipeService) Create(infConn influx.Client, tsConn connections.PgxWrap, measure, inputDb string, conf *MigrationConfig) (pipeline.Pipe, error) {
pipeID := fmt.Sprintf(pipeIDTemplate, measure)
extractionConf := s.extractionConfCreator.create(pipeID, inputDb, measure, conf)
ingestionConf := s.ingestionConfCreator.create(pipeID, conf)
Expand Down
5 changes: 3 additions & 2 deletions internal/cli/pipe_service_create_elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"fmt"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"
"github.com/timescale/outflux/internal/extraction"
extrConfig "github.com/timescale/outflux/internal/extraction/config"
"github.com/timescale/outflux/internal/ingestion"
ingConfig "github.com/timescale/outflux/internal/ingestion/config"
"github.com/timescale/outflux/internal/connections"

)

func (p *pipeService) createElements(
infConn influx.Client,
tsConn *pgx.Conn,
tsConn connections.PgxWrap,
extrConf *extrConfig.ExtractionConfig,
ingConf *ingConfig.IngestorConfig) (extraction.Extractor, ingestion.Ingestor, error) {
extractor, err := p.extractorService.InfluxExtractor(infConn, extrConf)
Expand Down
37 changes: 37 additions & 0 deletions internal/connections/pgx_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package connections

import "github.com/jackc/pgx"

// PgxWrap represents a wrapper interface around pgx.Conn, for easier testing.
type PgxWrap interface {
Begin() (*pgx.Tx, error)
CopyFrom(tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int, error)
Exec(sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error)
Query(sql string, args ...interface{}) (*pgx.Rows, error)
Close() error
}

type defaultPgxWrapper struct {
db *pgx.Conn
}

// NewPgxWrapper creates a new pgx.Conn wrapper.
func NewPgxWrapper(db *pgx.Conn) PgxWrap {
return &defaultPgxWrapper{db}
}

func (d *defaultPgxWrapper) CopyFrom(tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int, error) {
return d.db.CopyFrom(tableName, columnNames, rowSrc)
}
func (d *defaultPgxWrapper) Exec(sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error) {
return d.db.Exec(sql, arguments...)
}
func (d *defaultPgxWrapper) Query(sql string, args ...interface{}) (*pgx.Rows, error) {
return d.db.Query(sql, args...)
}
func (d *defaultPgxWrapper) Close() error {
return d.db.Close()
}
func (d *defaultPgxWrapper) Begin() (*pgx.Tx, error) {
return d.db.Begin()
}
11 changes: 8 additions & 3 deletions internal/connections/timescale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

// TSConnectionService creates new timescale db connections
type TSConnectionService interface {
NewConnection(connectionString string) (*pgx.Conn, error)
NewConnection(connectionString string) (PgxWrap, error)
}

type defaultTSConnectionService struct{}
Expand All @@ -19,7 +19,7 @@ func NewTSConnectionService() TSConnectionService {
return &defaultTSConnectionService{}
}

func (s *defaultTSConnectionService) NewConnection(connectionString string) (*pgx.Conn, error) {
func (s *defaultTSConnectionService) NewConnection(connectionString string) (PgxWrap, error) {
log.Printf("Overriding PG environment variables for connection with: %s", connectionString)
envConnConfig, err := pgx.ParseEnvLibpq()
if err != nil {
Expand All @@ -38,5 +38,10 @@ func (s *defaultTSConnectionService) NewConnection(connectionString string) (*pg
}

connConfig = envConnConfig.Merge(connConfig)
return pgx.Connect(connConfig)
pgxConn, err := pgx.Connect(connConfig)
if err != nil {
return nil, err
}

return NewPgxWrapper(pgxConn), nil
}
Loading

0 comments on commit 3cd49a4

Please sign in to comment.