Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #109

Merged
merged 8 commits into from
Oct 19, 2023
Merged

Dev #109

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .github/values/dev.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
volumes:
- name: kafka-cert
secret:
secretName: chainindexing.kafka.dev.tls.astranet.services
volumeMounts:
- name: kafka-cert
mountPath: /certs/chainindexing.kafka.dev
specAnnotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8081"
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ jobs:
with:
workloadName: astra-indexing-worker-dev
replicas: "1"
kafkaCert: "true"
kafkaEnv: "dev"
secrets: inherit
4 changes: 3 additions & 1 deletion bootstrap/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type KafkaService struct {
User string `yaml:"user" toml:"user" xml:"user" json:"user,omitempty"`
Password string `yaml:"password" toml:"password" xml:"password" json:"password,omitempty"`
AuthenticationType string `yaml:"authentication_type" toml:"authentication_type" xml:"authentication_type" json:"authentication_type,omitempty"`
Env string `yaml:"env" toml:"env" xml:"env" json:"env,omitempty"`
CaCertPath string `yaml:"ca_cert_path" toml:"ca_cert_path" xml:"ca_cert_path" json:"ca_cert_path,omitempty"`
TlsCertPath string `yaml:"tls_cert_path" toml:"tls_cert_path" xml:"tls_cert_path" json:"tls_cert_path,omitempty"`
TlsKeyPath string `yaml:"tls_key_path" toml:"tls_key_path" xml:"tls_key_path" json:"tls_key_path,omitempty"`
}

type Blockchain struct {
Expand Down
14 changes: 11 additions & 3 deletions cmd/astra-indexing/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ type CLIConfig struct {
KafkaUser string
KafkaPassword string
KafkaAuthenticationType string
KafkaEnv string
CaCertPath string
TlsCertPath string
TlsKeyPath string
}

func OverrideByCLIConfig(config *config.Config, cliConfig *CLIConfig) {
Expand Down Expand Up @@ -135,7 +137,13 @@ func OverrideByCLIConfig(config *config.Config, cliConfig *CLIConfig) {
if cliConfig.KafkaAuthenticationType != "" {
config.KafkaService.AuthenticationType = cliConfig.KafkaAuthenticationType
}
if cliConfig.KafkaEnv != "" {
config.KafkaService.Env = cliConfig.KafkaEnv
if cliConfig.CaCertPath != "" {
config.KafkaService.CaCertPath = cliConfig.CaCertPath
}
if cliConfig.TlsCertPath != "" {
config.KafkaService.TlsCertPath = cliConfig.TlsCertPath
}
if cliConfig.TlsKeyPath != "" {
config.KafkaService.TlsKeyPath = cliConfig.TlsKeyPath
}
}
26 changes: 21 additions & 5 deletions cmd/astra-indexing/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,19 @@ func run(args []string) error {
EnvVars: []string{"KAFKA_AUTHEN_TYPE"},
},
&cli.StringFlag{
Name: "kafkaEnv",
Usage: "Kafka Env",
EnvVars: []string{"KAFKA_ENV"},
Name: "caCertPath",
Usage: "Ca Cert Path",
EnvVars: []string{"KAFKA_CA_CERT_PATH"},
},
&cli.StringFlag{
Name: "tlsCertPath",
Usage: "Tls Cert Path",
EnvVars: []string{"KAFKA_TLS_CERT_PATH"},
},
&cli.StringFlag{
Name: "tlsKeyPath",
Usage: "Tls Key Path",
EnvVars: []string{"KAFKA_TLS_KEY_PATH"},
},
},
Action: func(ctx *cli.Context) error {
Expand Down Expand Up @@ -243,8 +253,14 @@ func run(args []string) error {
if ctx.IsSet("kafkaAuthenticationType") {
cliConfig.KafkaAuthenticationType = ctx.String("kafkaAuthenticationType")
}
if ctx.IsSet("kafkaEnv") {
cliConfig.KafkaEnv = ctx.String("kafkaEnv")
if ctx.IsSet("caCertPath") {
cliConfig.CaCertPath = ctx.String("caCertPath")
}
if ctx.IsSet("tlsCertPath") {
cliConfig.TlsCertPath = ctx.String("tlsCertPath")
}
if ctx.IsSet("tlsKeyPath") {
cliConfig.TlsKeyPath = ctx.String("tlsKeyPath")
}

OverrideByCLIConfig(&config, &cliConfig)
Expand Down
21 changes: 14 additions & 7 deletions infrastructure/kafka/consumer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type Consumer[T any] struct {
Password string
AuthenticationType string
Sigchan chan os.Signal
Env string
CaCertPath string
TlsCertPath string
TlsKeyPath string
}

func (c *Consumer[T]) CreateConnection() error {
Expand Down Expand Up @@ -147,14 +149,19 @@ func (c *Consumer[T]) getDialer() (*kafka.Dialer, error) {
}
return dialer, nil
case "SSL":
tlsCertPath := utils.TLS_CERT_PATH
tlsKeyPath := utils.TLS_KEY_PATH
caCertPath := utils.CA_CERT_PATH
if c.CaCertPath != "" {
caCertPath = c.CaCertPath
}

if c.Env == "dev" {
tlsCertPath = utils.TLS_CERT_PATH_DEV
tlsKeyPath = utils.TLS_KEY_PATH_DEV
caCertPath = utils.CA_CERT_PATH_DEV
tlsCertPath := utils.TLS_CERT_PATH
if c.TlsCertPath != "" {
tlsCertPath = c.TlsCertPath
}

tlsKeyPath := utils.TLS_KEY_PATH
if c.TlsKeyPath != "" {
tlsKeyPath = c.TlsKeyPath
}

keypair, err := tls.LoadX509KeyPair(
Expand Down
4 changes: 3 additions & 1 deletion infrastructure/kafka/consumer/worker/evm_txs_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func RunEvmTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger appl
Password: config.KafkaService.Password,
AuthenticationType: config.KafkaService.AuthenticationType,
Sigchan: sigchan,
Env: config.KafkaService.Env,
CaCertPath: config.KafkaService.CaCertPath,
TlsCertPath: config.KafkaService.TlsCertPath,
TlsKeyPath: config.KafkaService.TlsKeyPath,
}
errConn := evmTxsConsumer.CreateConnection()
if errConn != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func RunInternalTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger
Password: config.KafkaService.Password,
AuthenticationType: config.KafkaService.AuthenticationType,
Sigchan: sigchan,
Env: config.KafkaService.Env,
CaCertPath: config.KafkaService.CaCertPath,
TlsCertPath: config.KafkaService.TlsCertPath,
TlsKeyPath: config.KafkaService.TlsKeyPath,
}
errConn := internalTxsConsumer.CreateConnection()
if errConn != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func RunTokenTransfersConsumer(rdbHandle *rdb.Handle, config *config.Config, log
Password: config.KafkaService.Password,
AuthenticationType: config.KafkaService.AuthenticationType,
Sigchan: sigchan,
Env: config.KafkaService.Env,
CaCertPath: config.KafkaService.CaCertPath,
TlsCertPath: config.KafkaService.TlsCertPath,
TlsKeyPath: config.KafkaService.TlsKeyPath,
}
errConn := tokenTransfersConsumer.CreateConnection()
if errConn != nil {
Expand Down
4 changes: 0 additions & 4 deletions infrastructure/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ const CA_CERT_PATH = "/certs/chainindexing.kafka.prod/ca.crt"
const TLS_CERT_PATH = "/certs/chainindexing.kafka.prod/tls.crt"
const TLS_KEY_PATH = "/certs/chainindexing.kafka.prod/tls.key"

const CA_CERT_PATH_DEV = "/certs/chainindexing.kafka.dev/ca.crt"
const TLS_CERT_PATH_DEV = "/certs/chainindexing.kafka.dev/tls.crt"
const TLS_KEY_PATH_DEV = "/certs/chainindexing.kafka.dev/tls.key"

const EVM_TXS_TOPIC = "evm-txs"
const INTERNAL_TXS_TOPIC = "internal-txs"
const TOKEN_TRANSFERS_TOPIC = "token-transfers"
Loading