Skip to content

Commit

Permalink
Don't Panic on Missing Dirs
Browse files Browse the repository at this point in the history
Issue: 1753
Figure out what's missing, create it, and keep going.
  • Loading branch information
rothrock committed Mar 6, 2015
1 parent f957036 commit 771ef1c
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 33 deletions.
65 changes: 36 additions & 29 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
if config == nil {
config = NewConfig()
}
initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir())

var initBroker, initServer bool
if initBroker = !fileExists(config.BrokerDir()); initBroker {
log.Printf("Broker directory missing. Need to create a broker.")
}

if initServer = !fileExists(config.DataDir()); initServer {
log.Printf("Data directory missing. Need to create data directory.")
}
initServer = initServer || initBroker

This comment has been minimized.

Copy link
@sitano

sitano Mar 17, 2015

Actually, it is convenient to have this flag in the code with such a simple edge conditions. But it is really impossible to detect what is wrong with the influxd if those dirs existed but empty before first run. The only message you will have is: log closed. Raft log opener will unable to detect server is not initializing but configuration is missing in place. It is worth of adding the check into raft.log.open() and influxdb.run.openBroker near if (initialization) happening.

May be it is better to separate logic and checks of initialization of raft log and server broker. Those dirs existing check is not covering all cases anyway.


// Parse join urls from the --join flag.
var joinURLs []*url.URL
Expand All @@ -42,7 +51,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
}

// Open broker, initialize or join as necessary.
b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter)
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter)

// Start the broker handler.
var h *Handler
Expand All @@ -65,7 +74,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
}

// Open server, initialize or join as necessary.
s := openServer(config, b, initializing, configExists, joinURLs, logWriter)
s := openServer(config, b, initServer, initBroker, configExists, joinURLs, logWriter)
s.SetAuthenticationEnabled(config.Authentication.Enabled)

// Enable retention policy enforcement if requested.
Expand Down Expand Up @@ -155,8 +164,11 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B

// unless disabled, start the loop to report anonymous usage stats every 24h
if !config.ReportingDisabled {
clusterID := b.Broker.Log().Config().ClusterID
go s.StartReportingLoop(version, clusterID)
// Make sure we have a config object b4 we try to use it.
if configObj := b.Broker.Log().Config(); configObj != nil {
clusterID := configObj.ClusterID
go s.StartReportingLoop(version, clusterID)
}
}

return b.Broker, s
Expand All @@ -169,7 +181,7 @@ func writePIDFile(path string) {
}

// Ensure the required directory structure exists.
err := os.MkdirAll(filepath.Dir(path), 0700)
err := os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -204,11 +216,6 @@ func parseConfig(path, hostname string) *Config {

// creates and initializes a broker.
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
// Ignore if there's no existing broker and we're not initializing or joining.
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
return nil
}

// Create broker.
b := influxdb.NewBroker()
b.SetLogOutput(w)
Expand All @@ -217,7 +224,7 @@ func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL,
log.Fatalf("failed to open broker: %s", err)
}

// If this is a new broker then we can initialie two ways:
// If this is a new broker then we can initialize two ways:
// 1) Start a brand new cluster.
// 2) Join an existing cluster.
if initializing {
Expand Down Expand Up @@ -253,12 +260,7 @@ func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
}

// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
// Ignore if there's no existing server and we're not initializing or joining.
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
return nil
}

func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
Expand All @@ -272,14 +274,15 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
}

// If the server is uninitialized then initialize or join it.
if initializing {
if initServer {
if len(joinURLs) == 0 {
initializeServer(config.DataURL(), s, b, w)
initializeServer(config.DataURL(), s, b, w, initBroker)
} else {
joinServer(s, config.DataURL(), joinURLs)
openServerClient(s, joinURLs, w)
}
} else if !configExists {
}

if !configExists {
// We are spining up a server that has no config,
// but already has an initialized data directory
joinURLs = []*url.URL{b.URL()}
Expand All @@ -297,12 +300,14 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
}

// initializes a new server that does not yet have an ID.
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
// TODO: Create replica using the messaging client.

// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
if initBroker {
// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
}
}

// Create messaging client.
Expand All @@ -315,9 +320,11 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W
log.Fatalf("set client error: %s", err)
}

// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
if initBroker {
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func (t *topic) open() error {
assert(t.file == nil, "topic already open: %d", t.id)

// Ensure the parent directory exists.
if err := os.MkdirAll(filepath.Dir(t.path), 0700); err != nil {
if err := os.MkdirAll(filepath.Dir(t.path), 0755); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (l *Log) Open(path string) error {
}

// Create directory, if not exists.
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
l.path = path
Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ func (s *Server) Open(path string) error {
s.path = path

// Create required directories.
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}
if err := os.MkdirAll(filepath.Join(path, "shards"), 0700); err != nil {
if err := os.MkdirAll(filepath.Join(path, "shards"), 0755); err != nil {
return err
}

Expand Down

0 comments on commit 771ef1c

Please sign in to comment.