-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't Panic on Missing Dirs #1874
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,16 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B | |
if config == nil { | ||
config = NewConfig() | ||
} | ||
initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir()) | ||
|
||
var initBroker, initServer bool | ||
if initBroker = !fileExists(config.BrokerDir()); initBroker { | ||
log.Printf("Broker directory missing. Need to create a broker.") | ||
} | ||
|
||
if initServer = !fileExists(config.DataDir()); initServer { | ||
log.Printf("Data directory missing. Need to create data directory.") | ||
} | ||
initServer = initServer || initBroker | ||
|
||
// Parse join urls from the --join flag. | ||
var joinURLs []*url.URL | ||
|
@@ -42,7 +51,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B | |
} | ||
|
||
// Open broker, initialize or join as necessary. | ||
b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter) | ||
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter) | ||
|
||
// Start the broker handler. | ||
var h *Handler | ||
|
@@ -65,7 +74,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B | |
} | ||
|
||
// Open server, initialize or join as necessary. | ||
s := openServer(config, b, initializing, configExists, joinURLs, logWriter) | ||
s := openServer(config, b, initServer, initBroker, configExists, joinURLs, logWriter) | ||
s.SetAuthenticationEnabled(config.Authentication.Enabled) | ||
|
||
// Enable retention policy enforcement if requested. | ||
|
@@ -158,8 +167,11 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B | |
|
||
// unless disabled, start the loop to report anonymous usage stats every 24h | ||
if !config.ReportingDisabled { | ||
clusterID := b.Broker.Log().Config().ClusterID | ||
go s.StartReportingLoop(version, clusterID) | ||
// Make sure we have a config object b4 we try to use it. | ||
if configObj := b.Broker.Log().Config(); configObj != nil { | ||
clusterID := configObj.ClusterID | ||
go s.StartReportingLoop(version, clusterID) | ||
} | ||
} | ||
|
||
return b.Broker, s | ||
|
@@ -172,7 +184,7 @@ func writePIDFile(path string) { | |
} | ||
|
||
// Ensure the required directory structure exists. | ||
err := os.MkdirAll(filepath.Dir(path), 0700) | ||
err := os.MkdirAll(filepath.Dir(path), 0755) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
@@ -207,11 +219,6 @@ func parseConfig(path, hostname string) *Config { | |
|
||
// creates and initializes a broker. | ||
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker { | ||
// Ignore if there's no existing broker and we're not initializing or joining. | ||
if !fileExists(path) && !initializing && len(joinURLs) == 0 { | ||
return nil | ||
} | ||
|
||
// Create broker. | ||
b := influxdb.NewBroker() | ||
b.SetLogOutput(w) | ||
|
@@ -220,7 +227,7 @@ func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, | |
log.Fatalf("failed to open broker: %s", err) | ||
} | ||
|
||
// If this is a new broker then we can initialie two ways: | ||
// If this is a new broker then we can initialize two ways: | ||
// 1) Start a brand new cluster. | ||
// 2) Join an existing cluster. | ||
if initializing { | ||
|
@@ -256,12 +263,7 @@ func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) { | |
} | ||
|
||
// creates and initializes a server. | ||
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server { | ||
// Ignore if there's no existing server and we're not initializing or joining. | ||
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 { | ||
return nil | ||
} | ||
|
||
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server { | ||
// Create and open the server. | ||
s := influxdb.NewServer() | ||
s.SetLogOutput(w) | ||
|
@@ -276,14 +278,15 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b | |
} | ||
|
||
// If the server is uninitialized then initialize or join it. | ||
if initializing { | ||
if initServer { | ||
if len(joinURLs) == 0 { | ||
initializeServer(config.DataURL(), s, b, w) | ||
initializeServer(config.DataURL(), s, b, w, initBroker) | ||
} else { | ||
joinServer(s, config.DataURL(), joinURLs) | ||
openServerClient(s, joinURLs, w) | ||
} | ||
} else if !configExists { | ||
} | ||
|
||
if !configExists { | ||
// We are spining up a server that has no config, | ||
// but already has an initialized data directory | ||
joinURLs = []*url.URL{b.URL()} | ||
|
@@ -301,12 +304,14 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b | |
} | ||
|
||
// initializes a new server that does not yet have an ID. | ||
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) { | ||
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) { | ||
// TODO: Create replica using the messaging client. | ||
|
||
// Create replica on broker. | ||
if err := b.CreateReplica(1, u); err != nil { | ||
log.Fatalf("replica creation error: %s", err) | ||
if initBroker { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A server needs to create replicas on the broker regardless of whether this node is initializing as a broker. In other words, this won't work if the node is running as a server only. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does every node need to have a raft directory? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, only Broker nodes. |
||
// Create replica on broker. | ||
if err := b.CreateReplica(1, u); err != nil { | ||
log.Fatalf("replica creation error: %s", err) | ||
} | ||
} | ||
|
||
// Create messaging client. | ||
|
@@ -319,9 +324,11 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W | |
log.Fatalf("set client error: %s", err) | ||
} | ||
|
||
// Initialize the server. | ||
if err := s.Initialize(b.URL()); err != nil { | ||
log.Fatalf("server initialization error: %s", err) | ||
if initBroker { | ||
// Initialize the server. | ||
if err := s.Initialize(b.URL()); err != nil { | ||
log.Fatalf("server initialization error: %s", err) | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How could this happen? The
nil
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's what was causing the panics initially seen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, I think I've defended this from ever actually happening.