Skip to content

Commit

Permalink
Merge pull request #2881 from build-trust/davide-baldo/encrypt-specif…
Browse files Browse the repository at this point in the history
…ic-fields
  • Loading branch information
rockwotj authored Sep 21, 2024
2 parents a44acad + 6306c6f commit df46e90
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 5 deletions.
11 changes: 11 additions & 0 deletions docs/modules/components/pages/inputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ input:
allow_producer: self
relay: "" # No default (optional)
node_address: 127.0.0.1:6262
encrypted_fields: []
```
--
Expand Down Expand Up @@ -100,6 +101,7 @@ input:
allow_producer: self
relay: "" # No default (optional)
node_address: 127.0.0.1:6262
encrypted_fields: []
```
--
Expand Down Expand Up @@ -802,4 +804,13 @@ Sorry! This field is missing documentation.
*Default*: `"127.0.0.1:6262"`
=== `encrypted_fields`
The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. By default, the whole record is encrypted.
*Type*: `array`
*Default*: `[]`
11 changes: 11 additions & 0 deletions docs/modules/components/pages/outputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ output:
route_to_kafka_outlet: self
allow_consumer: self
route_to_consumer: /ip4/127.0.0.1/tcp/6262
encrypted_fields: []
```
--
Expand Down Expand Up @@ -111,6 +112,7 @@ output:
route_to_kafka_outlet: self
allow_consumer: self
route_to_consumer: /ip4/127.0.0.1/tcp/6262
encrypted_fields: []
```
--
Expand Down Expand Up @@ -896,4 +898,13 @@ Sorry! This field is missing documentation.
*Default*: `"/ip4/127.0.0.1/tcp/6262"`
=== `encrypted_fields`
The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. By default, the whole record is encrypted.
*Type*: `array`
*Default*: `[]`
13 changes: 11 additions & 2 deletions internal/impl/ockam/input_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func ockamKafkaInputConfig() *service.ConfigSpec {
Field(service.NewStringField("route_to_kafka_outlet").Default("self")).
Field(service.NewStringField("allow_producer").Default("self")).
Field(service.NewStringField("relay").Optional()).
Field(service.NewStringField("node_address").Default("127.0.0.1:6262"))
Field(service.NewStringField("node_address").Default("127.0.0.1:6262")).
Field(service.NewStringListField("encrypted_fields").
Description("The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. By default, the whole record is encrypted.").
Default([]string{}))
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -143,7 +146,13 @@ func newOckamKafkaInput(conf *service.ParsedConfig, mgr *service.Resources) (*oc
return nil, err
}

err = n.createKafkaInlet("redpanda-connect-kafka-inlet", kafkaInletAddress, routeToKafkaOutlet, true, "self", allowOutlet, allowProducer, "", disableContentEncryption)
var encryptedFields []string
encryptedFields, err = conf.FieldStringList("encrypted_fields")
if err != nil {
return nil, err
}

err = n.createKafkaInlet("redpanda-connect-kafka-inlet", kafkaInletAddress, routeToKafkaOutlet, true, "self", allowOutlet, allowProducer, "", disableContentEncryption, encryptedFields)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion internal/impl/ockam/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (n *node) delete() error {
}

// TODO: improve this function's interface
func (n *node) createKafkaInlet(name string, from string, to string, avoidPublishing bool, routeToConsumer string, allowOutlet string, allowProducer string, allowConsumer string, disableContentEncryption bool) error {
func (n *node) createKafkaInlet(name string, from string, to string, avoidPublishing bool, routeToConsumer string, allowOutlet string, allowProducer string, allowConsumer string, disableContentEncryption bool, encryptedFields []string) error {
args := []string{"kafka-inlet", "create", "--addr", name, "--at", n.name, "--from", from, "--to", to}
if routeToConsumer != "" {
args = append(args, "--consumer", routeToConsumer)
Expand All @@ -91,6 +91,11 @@ func (n *node) createKafkaInlet(name string, from string, to string, avoidPublis
args = append(args, "--disable-content-encryption")
}

for _, encryptedField := range encryptedFields {
args = append(args, "--encrypted-field")
args = append(args, encryptedField)
}

args = appendAllowArgs(args, "--allow", allowOutlet, n.identifier)
args = appendAllowArgs(args, "--allow-producer", allowProducer, n.identifier)
args = appendAllowArgs(args, "--allow-consumer", allowConsumer, n.identifier)
Expand Down
13 changes: 11 additions & 2 deletions internal/impl/ockam/output_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func ockamKafkaOutputConfig() *service.ConfigSpec {
Field(service.NewStringField("allow").Default("self").Optional()).
Field(service.NewStringField("route_to_kafka_outlet").Default("self")).
Field(service.NewStringField("allow_consumer").Default("self")).
Field(service.NewStringField("route_to_consumer").Default("/ip4/127.0.0.1/tcp/6262"))
Field(service.NewStringField("route_to_consumer").Default("/ip4/127.0.0.1/tcp/6262")).
Field(service.NewStringListField("encrypted_fields").
Description("The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. By default, the whole record is encrypted.").
Default([]string{}))
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -144,7 +147,13 @@ func newOckamKafkaOutput(conf *service.ParsedConfig, log *service.Logger) (*ocka
return nil, err
}

err = n.createKafkaInlet("redpanda-connect-kafka-inlet", kafkaInletAddress, routeToKafkaOutlet, true, routeToConsumer, allowOutlet, "", allowConsumer, disableContentEncryption)
var encryptedFields []string
encryptedFields, err = conf.FieldStringList("encrypted_fields")
if err != nil {
return nil, err
}

err = n.createKafkaInlet("redpanda-connect-kafka-inlet", kafkaInletAddress, routeToKafkaOutlet, true, routeToConsumer, allowOutlet, "", allowConsumer, disableContentEncryption, encryptedFields)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit df46e90

Please sign in to comment.