Skip to content

Commit

Permalink
Merge pull request #1874 from influxdb/1753
Browse files Browse the repository at this point in the history
Don't Panic on Missing Dirs
  • Loading branch information
rothrock committed Mar 6, 2015
2 parents 58ef65e + 9b63eed commit 98aee8c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 34 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
## v0.9.0-rc9 [2015-03-06]

### Bugfixes

- [1872](https://github.com/influxdb/influxdb/pull/1872): Fix "stale term" errors with raft
- [#1867](https://github.com/influxdb/influxdb/pull/1867): Fix race accessing topic replicas map
- [#1864](https://github.com/influxdb/influxdb/pull/1864): fix race in startStateLoop
- [#1753] (https://github.com/influxdb/influxdb/pull/1874): Do Not Panic on Missing Dirs

## v0.9.0-rc8 [2015-03-05]

Expand Down
65 changes: 36 additions & 29 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
if config == nil {
config = NewConfig()
}
initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir())

var initBroker, initServer bool
if initBroker = !fileExists(config.BrokerDir()); initBroker {
log.Printf("Broker directory missing. Need to create a broker.")
}

if initServer = !fileExists(config.DataDir()); initServer {
log.Printf("Data directory missing. Need to create data directory.")
}
initServer = initServer || initBroker

// Parse join urls from the --join flag.
var joinURLs []*url.URL
Expand All @@ -42,7 +51,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
}

// Open broker, initialize or join as necessary.
b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter)
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter)

// Start the broker handler.
var h *Handler
Expand All @@ -65,7 +74,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
}

// Open server, initialize or join as necessary.
s := openServer(config, b, initializing, configExists, joinURLs, logWriter)
s := openServer(config, b, initServer, initBroker, configExists, joinURLs, logWriter)
s.SetAuthenticationEnabled(config.Authentication.Enabled)

// Enable retention policy enforcement if requested.
Expand Down Expand Up @@ -158,8 +167,11 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B

// unless disabled, start the loop to report anonymous usage stats every 24h
if !config.ReportingDisabled {
clusterID := b.Broker.Log().Config().ClusterID
go s.StartReportingLoop(version, clusterID)
// Make sure we have a config object b4 we try to use it.
if configObj := b.Broker.Log().Config(); configObj != nil {
clusterID := configObj.ClusterID
go s.StartReportingLoop(version, clusterID)
}
}

return b.Broker, s
Expand All @@ -172,7 +184,7 @@ func writePIDFile(path string) {
}

// Ensure the required directory structure exists.
err := os.MkdirAll(filepath.Dir(path), 0700)
err := os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -207,11 +219,6 @@ func parseConfig(path, hostname string) *Config {

// creates and initializes a broker.
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
// Ignore if there's no existing broker and we're not initializing or joining.
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
return nil
}

// Create broker.
b := influxdb.NewBroker()
b.SetLogOutput(w)
Expand All @@ -220,7 +227,7 @@ func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL,
log.Fatalf("failed to open broker: %s", err)
}

// If this is a new broker then we can initialie two ways:
// If this is a new broker then we can initialize two ways:
// 1) Start a brand new cluster.
// 2) Join an existing cluster.
if initializing {
Expand Down Expand Up @@ -256,12 +263,7 @@ func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
}

// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
// Ignore if there's no existing server and we're not initializing or joining.
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
return nil
}

func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
Expand All @@ -276,14 +278,15 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
}

// If the server is uninitialized then initialize or join it.
if initializing {
if initServer {
if len(joinURLs) == 0 {
initializeServer(config.DataURL(), s, b, w)
initializeServer(config.DataURL(), s, b, w, initBroker)
} else {
joinServer(s, config.DataURL(), joinURLs)
openServerClient(s, joinURLs, w)
}
} else if !configExists {
}

if !configExists {
// We are spining up a server that has no config,
// but already has an initialized data directory
joinURLs = []*url.URL{b.URL()}
Expand All @@ -301,12 +304,14 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
}

// initializes a new server that does not yet have an ID.
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
// TODO: Create replica using the messaging client.

// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
if initBroker {
// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
}
}

// Create messaging client.
Expand All @@ -319,9 +324,11 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W
log.Fatalf("set client error: %s", err)
}

// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
if initBroker {
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (t *topic) open() error {
assert(t.file == nil, "topic already open: %d", t.id)

// Ensure the parent directory exists.
if err := os.MkdirAll(filepath.Dir(t.path), 0700); err != nil {
if err := os.MkdirAll(filepath.Dir(t.path), 0755); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (l *Log) Open(path string) error {
}

// Create directory, if not exists.
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
l.path = path
Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func (s *Server) Open(path string) error {
s.path = path

// Create required directories.
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
if err := os.MkdirAll(filepath.Join(path, "shards"), 0700); err != nil {
if err := os.MkdirAll(filepath.Join(path, "shards"), 0755); err != nil {
return err
}

Expand Down

0 comments on commit 98aee8c

Please sign in to comment.