From 0f92733c9948d617c3f2bb5a7a15611a3d474d98 Mon Sep 17 00:00:00 2001 From: Wazir Ahmed Date: Mon, 4 Jul 2022 18:54:36 +0530 Subject: [PATCH 1/2] refactoring: Removed unnecessary arguments from function calls Signed-off-by: Wazir Ahmed --- src/networkpolicy/helperFunctions.go | 8 ++++---- src/networkpolicy/networkPolicy.go | 2 +- src/plugin/cilium.go | 6 +++--- src/plugin/cilium_test.go | 4 +--- src/server/grpcServer.go | 2 +- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/networkpolicy/helperFunctions.go b/src/networkpolicy/helperFunctions.go index d45a10fd..5a216ee1 100644 --- a/src/networkpolicy/helperFunctions.go +++ b/src/networkpolicy/helperFunctions.go @@ -746,7 +746,7 @@ func clearTrackFlowIDMaps() { // == File Outputs == // // ================== // -func WriteNetworkPoliciesToFile(cluster, namespace string, services []types.Service) { +func WriteNetworkPoliciesToFile(cluster, namespace string) { // retrieve the latest policies from the db latestPolicies := libs.GetNetworkPolicies(CfgDB, cluster, namespace, "latest", "", "") @@ -754,16 +754,16 @@ func WriteNetworkPoliciesToFile(cluster, namespace string, services []types.Serv // libs.WriteKnoxNetPolicyToYamlFile(namespace, latestPolicies) // convert knoxPolicy to CiliumPolicy - ciliumPolicies := plugin.ConvertKnoxPoliciesToCiliumPolicies(services, latestPolicies) + ciliumPolicies := plugin.ConvertKnoxPoliciesToCiliumPolicies(latestPolicies) // write discovered policies to files libs.WriteCiliumPolicyToYamlFile(namespace, ciliumPolicies) } func GetNetPolicy(cluster, namespace string) *wpb.WorkerResponse { - var services []types.Service latestPolicies := libs.GetNetworkPolicies(CfgDB, cluster, namespace, "latest", "", "") - ciliumPolicies := plugin.ConvertKnoxPoliciesToCiliumPolicies(services, latestPolicies) + log.Info().Msgf("No. of latestPolicies - %d", len(latestPolicies)) + ciliumPolicies := plugin.ConvertKnoxPoliciesToCiliumPolicies(latestPolicies) var response wpb.WorkerResponse for i := range ciliumPolicies { diff --git a/src/networkpolicy/networkPolicy.go b/src/networkpolicy/networkPolicy.go index 6493ab78..9a9f1128 100644 --- a/src/networkpolicy/networkPolicy.go +++ b/src/networkpolicy/networkPolicy.go @@ -1643,7 +1643,7 @@ func PopulateNetworkPoliciesFromNetworkLogs(networkLogs []types.KnoxNetworkLog) // write discovered policies to file if strings.Contains(NetworkPolicyTo, "file") { - WriteNetworkPoliciesToFile(clusterName, namespace, services) + WriteNetworkPoliciesToFile(clusterName, namespace) } log.Info().Msgf("-> Network policy discovery done for namespace: [%s], [%d] policies discovered", namespace, len(newNetPolicies)) diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index 168cfd08..4da22852 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -537,7 +537,7 @@ func buildNewCiliumNetworkPolicy(inPolicy types.KnoxNetworkPolicy) types.CiliumN return ciliumPolicy } -func ConvertKnoxNetworkPolicyToCiliumPolicy(services []types.Service, inPolicy types.KnoxNetworkPolicy) types.CiliumNetworkPolicy { +func ConvertKnoxNetworkPolicyToCiliumPolicy(inPolicy types.KnoxNetworkPolicy) types.CiliumNetworkPolicy { ciliumPolicy := buildNewCiliumNetworkPolicy(inPolicy) // ====== // @@ -738,11 +738,11 @@ func ConvertKnoxNetworkPolicyToCiliumPolicy(services []types.Service, inPolicy t return ciliumPolicy } -func ConvertKnoxPoliciesToCiliumPolicies(services []types.Service, policies []types.KnoxNetworkPolicy) []types.CiliumNetworkPolicy { +func ConvertKnoxPoliciesToCiliumPolicies(policies []types.KnoxNetworkPolicy) []types.CiliumNetworkPolicy { ciliumPolicies := []types.CiliumNetworkPolicy{} for _, policy := range policies { - ciliumPolicy := ConvertKnoxNetworkPolicyToCiliumPolicy(services, policy) + ciliumPolicy := ConvertKnoxNetworkPolicyToCiliumPolicy(policy) ciliumPolicies = append(ciliumPolicies, ciliumPolicy) } diff --git a/src/plugin/cilium_test.go b/src/plugin/cilium_test.go index 7d7c7783..a687fe32 100644 --- a/src/plugin/cilium_test.go +++ b/src/plugin/cilium_test.go @@ -177,9 +177,7 @@ func TestConvertKnoxPolicyToCiliumPolicy(t *testing.T) { expected := &types.CiliumNetworkPolicy{} json.Unmarshal(ciliumBytes, expected) - svcs := []types.Service{} - - actual := ConvertKnoxNetworkPolicyToCiliumPolicy(svcs, *knoxPolicy) + actual := ConvertKnoxNetworkPolicyToCiliumPolicy(*knoxPolicy) if !cmp.Equal(*expected, actual) { t.Errorf("they should be equal %v %v", expected, actual) } diff --git a/src/server/grpcServer.go b/src/server/grpcServer.go index 71e898d3..d61e1c9f 100644 --- a/src/server/grpcServer.go +++ b/src/server/grpcServer.go @@ -107,7 +107,7 @@ func (s *workerServer) Convert(ctx context.Context, in *wpb.WorkerRequest) (*wpb if in.GetPolicytype() == "network" { log.Info().Msg("Convert network policy called") networker.InitNetPolicyDiscoveryConfiguration() - networker.WriteNetworkPoliciesToFile(in.GetClustername(), in.GetNamespace(), []types.Service{}) + networker.WriteNetworkPoliciesToFile(in.GetClustername(), in.GetNamespace()) return networker.GetNetPolicy(in.Clustername, in.Namespace), nil } else if in.GetPolicytype() == "system" { log.Info().Msg("Convert system policy called") From f5a3b2e98fda613b97e4019773d8133dfbe093ab Mon Sep 17 00:00:00 2001 From: Wazir Ahmed Date: Wed, 6 Jul 2022 04:10:27 +0530 Subject: [PATCH 2/2] cilium: Refactored code to aggregate policy per deployment/endpoint - Generate one policy per deployment/endpoint - Added `updatedTime` flag in the DB `network_policy` table Signed-off-by: Wazir Ahmed --- src/libs/dbHandler.go | 12 + src/libs/dbHandler_test.go | 111 +++--- src/libs/mysqlHandler.go | 49 ++- src/libs/sqliteHandler.go | 48 ++- src/networkpolicy/deduplicator.go | 106 ++---- src/networkpolicy/helperFunctions.go | 4 + src/networkpolicy/networkPolicy.go | 512 +++++++++++++++++++++++++-- src/types/policyData.go | 39 ++ 8 files changed, 723 insertions(+), 158 deletions(-) diff --git a/src/libs/dbHandler.go b/src/libs/dbHandler.go index 878e9ccd..9dcb1211 100644 --- a/src/libs/dbHandler.go +++ b/src/libs/dbHandler.go @@ -87,6 +87,18 @@ func UpdateOutdatedNetworkPolicy(cfg types.ConfigDB, outdatedPolicy string, late } } +func UpdateNetworkPolicy(cfg types.ConfigDB, policy types.KnoxNetworkPolicy) { + if cfg.DBDriver == "mysql" { + if err := UpdateNetworkPolicyToMySQL(cfg, policy); err != nil { + log.Error().Msg(err.Error()) + } + } else if cfg.DBDriver == "sqlite3" { + if err := UpdateNetworkPolicyToSQLite(cfg, policy); err != nil { + log.Error().Msg(err.Error()) + } + } +} + func InsertNetworkPolicies(cfg types.ConfigDB, policies []types.KnoxNetworkPolicy) { if cfg.DBDriver == "mysql" { if err := InsertNetworkPoliciesToMySQL(cfg, policies); err != nil { diff --git a/src/libs/dbHandler_test.go b/src/libs/dbHandler_test.go index b7043504..b37af9b7 100644 --- a/src/libs/dbHandler_test.go +++ b/src/libs/dbHandler_test.go @@ -41,9 +41,10 @@ func TestGetNetworkPolicies(t *testing.T) { "status", // str "outdated", // str "spec", // []byte - "generatedTime", // int + "generatedTime", // uint64 + "updatedTime", // uint64 }). - AddRow("", "test", flowID, "", "", "", "", "", "", "", spec, 0) + AddRow("", "test", flowID, "", "", "", "", "", "", "", spec, 0, 0) mock.ExpectQuery("^SELECT (.+) FROM network_policy*"). WillReturnRows(rows) @@ -71,18 +72,19 @@ func TestInsertNetworkPolicies(t *testing.T) { prep := mock.ExpectPrepare("INSERT INTO network_policy") prep.ExpectExec(). WithArgs( - "", // str - "kind", // str - flowID, // []byte - "", // str - "", // str - "", // str - "", // str - "", // str - "", // str - "", // str - spec, // []byte - 0, // int + "", // str + "kind", // str + flowID, // []byte + "", // str + "", // str + "", // str + "", // str + "", // str + "", // str + "", // str + spec, // []byte + sqlmock.AnyArg(), // uint64 + sqlmock.AnyArg(), // uint64 ).WillReturnResult(sqlmock.NewResult(0, 1)) nfe := []types.KnoxNetworkPolicy{ @@ -100,44 +102,45 @@ func TestInsertNetworkPolicies(t *testing.T) { } func TestInsertNetworkPoliciesSQLite(t *testing.T) { - // prepare mock sqlite - _, mock := NewMock() - - policy := types.KnoxNetworkPolicy{} - - specPtr := &policy.Spec - spec, _ := json.Marshal(specPtr) - - flowIDsPrt := &policy.FlowIDs - flowID, _ := json.Marshal(flowIDsPrt) - - prep := mock.ExpectPrepare("INSERT INTO network_policy") - prep.ExpectExec(). - WithArgs( - "", // str - "kind", // str - flowID, // []byte - "", // str - "", // str - "", // str - "", // str - "", // str - "", // str - "", // str - spec, // []byte - 0, // int - ).WillReturnResult(sqlmock.NewResult(0, 1)) - - nfe := []types.KnoxNetworkPolicy{ - types.KnoxNetworkPolicy{ - Kind: "kind", - }, - } - - err := InsertNetworkPoliciesToSQLite(types.ConfigDB{DBDriver: "sqlite3"}, nfe) - assert.NoError(t, err) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf(Unmet+"%s", err) - } + // prepare mock sqlite + _, mock := NewMock() + + policy := types.KnoxNetworkPolicy{} + + specPtr := &policy.Spec + spec, _ := json.Marshal(specPtr) + + flowIDsPrt := &policy.FlowIDs + flowID, _ := json.Marshal(flowIDsPrt) + + prep := mock.ExpectPrepare("INSERT INTO network_policy") + prep.ExpectExec(). + WithArgs( + "", // str + "kind", // str + flowID, // []byte + "", // str + "", // str + "", // str + "", // str + "", // str + "", // str + "", // str + spec, // []byte + sqlmock.AnyArg(), // uint64 + sqlmock.AnyArg(), // uint64 + ).WillReturnResult(sqlmock.NewResult(0, 1)) + + nfe := []types.KnoxNetworkPolicy{ + types.KnoxNetworkPolicy{ + Kind: "kind", + }, + } + + err := InsertNetworkPoliciesToSQLite(types.ConfigDB{DBDriver: "sqlite3"}, nfe) + assert.NoError(t, err) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf(Unmet+"%s", err) + } } diff --git a/src/libs/mysqlHandler.go b/src/libs/mysqlHandler.go index 79e96543..1a7ee9f6 100644 --- a/src/libs/mysqlHandler.go +++ b/src/libs/mysqlHandler.go @@ -83,7 +83,7 @@ func GetNetworkPoliciesFromMySQL(cfg types.ConfigDB, cluster, namespace, status, var results *sql.Rows var err error - query := "SELECT apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime FROM " + TableNetworkPolicy_TableName + query := "SELECT apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime,updatedTime FROM " + TableNetworkPolicy_TableName var whereClause string var args []interface{} @@ -140,6 +140,7 @@ func GetNetworkPoliciesFromMySQL(cfg types.ConfigDB, cluster, namespace, status, &policy.Outdated, &specByte, &policy.GeneratedTime, + &policy.UpdatedTime, ); err != nil { return nil, err } @@ -170,6 +171,42 @@ func GetNetworkPoliciesFromMySQL(cfg types.ConfigDB, cluster, namespace, status, return policies, nil } +func UpdateNetworkPolicyToMySQL(cfg types.ConfigDB, policy types.KnoxNetworkPolicy) error { + db := connectMySQL(cfg) + defer db.Close() + + // set status -> outdated + stmt, err := db.Prepare("UPDATE " + TableNetworkPolicy_TableName + + " SET apiVersion=?,kind=?,cluster_name=?,namespace=?,type=?,status=?,outdated=?,spec=?,updatedTime=? WHERE name = ?") + if err != nil { + return err + } + defer stmt.Close() + + specPointer := &policy.Spec + spec, err := json.Marshal(specPointer) + if err != nil { + return err + } + + _, err = stmt.Exec( + policy.APIVersion, + policy.Kind, + policy.Metadata["cluster_name"], + policy.Metadata["namespace"], + policy.Metadata["type"], + policy.Metadata["status"], + policy.Outdated, + spec, + ConvertStrToUnixTime("now"), + policy.Metadata["name"]) + if err != nil { + return err + } + + return nil +} + func UpdateOutdatedNetworkPolicyFromMySQL(cfg types.ConfigDB, outdatedPolicy string, latestPolicy string) error { db := connectMySQL(cfg) defer db.Close() @@ -204,7 +241,7 @@ func UpdateOutdatedNetworkPolicyFromMySQL(cfg types.ConfigDB, outdatedPolicy str } func insertNetworkPolicy(cfg types.ConfigDB, db *sql.DB, policy types.KnoxNetworkPolicy) error { - stmt, err := db.Prepare("INSERT INTO " + TableNetworkPolicy_TableName + "(apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime) values(?,?,?,?,?,?,?,?,?,?,?,?)") + stmt, err := db.Prepare("INSERT INTO " + TableNetworkPolicy_TableName + "(apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime,updatedTime) values(?,?,?,?,?,?,?,?,?,?,?,?,?)") if err != nil { return err } @@ -222,6 +259,8 @@ func insertNetworkPolicy(cfg types.ConfigDB, db *sql.DB, policy types.KnoxNetwor return err } + currTime := ConvertStrToUnixTime("now") + _, err = stmt.Exec(policy.APIVersion, policy.Kind, flowids, @@ -233,7 +272,8 @@ func insertNetworkPolicy(cfg types.ConfigDB, db *sql.DB, policy types.KnoxNetwor policy.Metadata["status"], policy.Outdated, spec, - policy.GeneratedTime) + currTime, + currTime) if err != nil { return err } @@ -506,7 +546,8 @@ func CreateTableNetworkPolicyMySQL(cfg types.ConfigDB) error { " `status` varchar(10) DEFAULT NULL," + " `outdated` varchar(50) DEFAULT NULL," + " `spec` JSON DEFAULT NULL," + - " `generatedTime` int DEFAULT NULL," + + " `generatedTime` bigint NOT NULL," + + " `updatedTime` bigint NOT NULL," + " PRIMARY KEY (`id`)" + " );" diff --git a/src/libs/sqliteHandler.go b/src/libs/sqliteHandler.go index 5d9f29ab..fc84e6b9 100644 --- a/src/libs/sqliteHandler.go +++ b/src/libs/sqliteHandler.go @@ -83,7 +83,7 @@ func GetNetworkPoliciesFromSQLite(cfg types.ConfigDB, cluster, namespace, status var results *sql.Rows var err error - query := "SELECT apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime FROM " + TableNetworkPolicySQLite_TableName + query := "SELECT apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime,updatedTime FROM " + TableNetworkPolicySQLite_TableName if cluster != "" && namespace != "" && status != "" { query = query + " WHERE cluster_name = ? and namespace = ? and status = ? " results, err = db.Query(query, cluster, namespace, status) @@ -130,6 +130,7 @@ func GetNetworkPoliciesFromSQLite(cfg types.ConfigDB, cluster, namespace, status &policy.Outdated, &specByte, &policy.GeneratedTime, + &policy.UpdatedTime, ); err != nil { return nil, err } @@ -160,6 +161,41 @@ func GetNetworkPoliciesFromSQLite(cfg types.ConfigDB, cluster, namespace, status return policies, nil } +func UpdateNetworkPolicyToSQLite(cfg types.ConfigDB, policy types.KnoxNetworkPolicy) error { + db := connectSQLite(cfg) + defer db.Close() + + stmt, err := db.Prepare("UPDATE " + TableNetworkPolicySQLite_TableName + + " SET apiVersion=?,kind=?,cluster_name=?,namespace=?,type=?,status=?,outdated=?,spec=?,updatedTime=? WHERE name = ?") + if err != nil { + return err + } + defer stmt.Close() + + specPointer := &policy.Spec + spec, err := json.Marshal(specPointer) + if err != nil { + return err + } + + _, err = stmt.Exec( + policy.APIVersion, + policy.Kind, + policy.Metadata["cluster_name"], + policy.Metadata["namespace"], + policy.Metadata["type"], + policy.Metadata["status"], + policy.Outdated, + spec, + ConvertStrToUnixTime("now"), + policy.Metadata["name"]) + if err != nil { + return err + } + + return nil +} + func UpdateOutdatedNetworkPolicyFromSQLite(cfg types.ConfigDB, outdatedPolicy string, latestPolicy string) error { db := connectSQLite(cfg) defer db.Close() @@ -194,7 +230,7 @@ func UpdateOutdatedNetworkPolicyFromSQLite(cfg types.ConfigDB, outdatedPolicy st } func insertNetworkPolicySQLite(cfg types.ConfigDB, db *sql.DB, policy types.KnoxNetworkPolicy) error { - stmt, err := db.Prepare("INSERT INTO " + TableNetworkPolicySQLite_TableName + "(apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime) values(?,?,?,?,?,?,?,?,?,?,?,?)") + stmt, err := db.Prepare("INSERT INTO " + TableNetworkPolicySQLite_TableName + "(apiVersion,kind,flow_ids,name,cluster_name,namespace,type,rule,status,outdated,spec,generatedTime,updatedTime) values(?,?,?,?,?,?,?,?,?,?,?,?,?)") if err != nil { return err } @@ -212,6 +248,8 @@ func insertNetworkPolicySQLite(cfg types.ConfigDB, db *sql.DB, policy types.Knox return err } + currTime := ConvertStrToUnixTime("now") + _, err = stmt.Exec(policy.APIVersion, policy.Kind, flowids, @@ -223,7 +261,8 @@ func insertNetworkPolicySQLite(cfg types.ConfigDB, db *sql.DB, policy types.Knox policy.Metadata["status"], policy.Outdated, spec, - policy.GeneratedTime) + currTime, + currTime) if err != nil { return err } @@ -484,7 +523,8 @@ func CreateTableNetworkPolicySQLite(cfg types.ConfigDB) error { " `status` varchar(10) DEFAULT NULL," + " `outdated` varchar(50) DEFAULT NULL," + " `spec` JSON DEFAULT NULL," + - " `generatedTime` int DEFAULT NULL," + + " `generatedTime` bigint NOT NULL," + + " `updatedTime` bigint NOT NULL," + " PRIMARY KEY (`id`)" + " );" diff --git a/src/networkpolicy/deduplicator.go b/src/networkpolicy/deduplicator.go index d66f55f5..a0a26888 100644 --- a/src/networkpolicy/deduplicator.go +++ b/src/networkpolicy/deduplicator.go @@ -1,9 +1,7 @@ package networkpolicy import ( - "sort" "strings" - "time" "github.com/accuknox/auto-policy-discovery/src/libs" types "github.com/accuknox/auto-policy-discovery/src/types" @@ -922,85 +920,55 @@ func IsExistingPolicySpec(existingPolicies []types.KnoxNetworkPolicy, newPolicy func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discoveredPolicies []types.KnoxNetworkPolicy, dnsToIPs map[string][]string, clusterName string) []types.KnoxNetworkPolicy { newPolicies := []types.KnoxNetworkPolicy{} - // update policy name map - policyNamesMap := map[string]bool{} - for _, exist := range existingPolicies { - policyNamesMap[exist.Metadata["name"]] = true - } - - // enumerate discovered network policy - for _, policy := range discoveredPolicies { - // step 1: compare the total network policy spec - if IsExistingPolicySpec(existingPolicies, policy) { - continue - } + existIngressPolicies := map[string]types.KnoxNetworkPolicy{} + existEgressPolicies := map[string]types.KnoxNetworkPolicy{} - // step 2: generate policy name - namedPolicy := GeneratePolicyName(policyNamesMap, policy, clusterName) + policyNamesMap := map[string]bool{} + for _, existPolicy := range existingPolicies { + policyNamesMap[existPolicy.Metadata["name"]] = true - // step 3: update existing HTTP rules: egress or ingress - if strings.Contains(policy.Metadata["rule"], "toHTTPs") { - updatedPolicy, updated := UpdateHTTP(namedPolicy, existingPolicies) - if updated { - namedPolicy = updatedPolicy - } + lblArr := getLabelArrayFromMap(existPolicy.Spec.Selector.MatchLabels) + selector := strings.Join(lblArr, ",") + if existPolicy.Metadata["type"] == PolicyTypeIngress { + existIngressPolicies[selector] = existPolicy } else { - // step 4: update existing matchLabels+toPorts rules: egress or ingress - if strings.Contains(policy.Metadata["rule"], "matchLabels") { - updatedPolicy, updated := UpdateMatchLabels(policy, existingPolicies) - if updated { - namedPolicy = updatedPolicy - } - } - - // step 5: update existing CIDR(+toPorts) rules: egress - if strings.Contains(policy.Metadata["rule"], "toCIDRs") && policy.Metadata["type"] == PolicyTypeEgress { - updatedPolicy, updated := UpdateToPorts(namedPolicy, existingPolicies) - if updated { - namedPolicy = updatedPolicy - } - } - - // step 6: update existing FQDN+toPorts rules: egress - if strings.Contains(policy.Metadata["rule"], "toFQDNs") && policy.Metadata["type"] == PolicyTypeEgress { - updatedPolicy, updated := UpdateToPorts(namedPolicy, existingPolicies) - if updated { - namedPolicy = updatedPolicy - } - } + existEgressPolicies[selector] = existPolicy + } + } - // step 7: update existing Entities rules: egress or ingress - if strings.Contains(policy.Metadata["rule"], "Entities") { - updatedPolicy, updated := UpdateEntity(namedPolicy, existingPolicies) + for _, newPolicy := range discoveredPolicies { + lblArr := getLabelArrayFromMap(newPolicy.Spec.Selector.MatchLabels) + selector := strings.Join(lblArr, ",") + if newPolicy.Metadata["type"] == PolicyTypeIngress { + existPolicy, ok := existIngressPolicies[selector] + if ok { + // Ingress policy for this endpoint exists already + mergedPolicy, updated := mergeIngressPolicies(existPolicy, []types.KnoxNetworkPolicy{newPolicy}) if updated { - namedPolicy = updatedPolicy + existIngressPolicies[selector] = mergedPolicy + libs.UpdateNetworkPolicy(CfgDB, mergedPolicy) } + } else { + // Ingress policy for this endpoint does not exists previously + namedPolicy := GeneratePolicyName(policyNamesMap, newPolicy, clusterName) + newPolicies = append(newPolicies, namedPolicy) } - - // step 8: update existing Entities rules: egress - if strings.Contains(policy.Metadata["rule"], "toServices") && policy.Metadata["type"] == PolicyTypeEgress { - updatedPolicy, updated := UpdateService(namedPolicy, existingPolicies) + } else { + existPolicy, ok := existEgressPolicies[selector] + if ok { + // Egress policy for this endpoint exists already + mergedPolicy, updated := mergeEgressPolicies(existPolicy, []types.KnoxNetworkPolicy{newPolicy}) if updated { - namedPolicy = updatedPolicy + existEgressPolicies[selector] = mergedPolicy + libs.UpdateNetworkPolicy(CfgDB, mergedPolicy) } + } else { + // Egress policy for this endpoint does not exists previously + namedPolicy := GeneratePolicyName(policyNamesMap, newPolicy, clusterName) + newPolicies = append(newPolicies, namedPolicy) } } - - // step 9: update status - namedPolicy.Metadata["status"] = "latest" - - // step 10: update generated time - namedPolicy.GeneratedTime = time.Now().Unix() - - newPolicies = append(newPolicies, namedPolicy) } - // step 11: check if existing cidr matchs new fqdn - updateExistCIDRtoNewFQDN(existingPolicies, newPolicies, dnsToIPs) - - sort.Slice(newPolicies, func(i, j int) bool { - return newPolicies[i].Metadata["name"] < newPolicies[j].Metadata["name"] - }) - return newPolicies } diff --git a/src/networkpolicy/helperFunctions.go b/src/networkpolicy/helperFunctions.go index 5a216ee1..1889f6ef 100644 --- a/src/networkpolicy/helperFunctions.go +++ b/src/networkpolicy/helperFunctions.go @@ -127,6 +127,10 @@ func FilterNetworkLogsByConfig(logs []types.KnoxNetworkLog, pods []types.Pod) [] continue } + if log.Protocol == libs.IPProtocolUDP && log.Direction == "TRAFFIC_DIRECTION_UNKNOWN" { + continue + } + if log.Protocol == libs.IPProtocolUDP && log.IsReply { continue } diff --git a/src/networkpolicy/networkPolicy.go b/src/networkpolicy/networkPolicy.go index 9a9f1128..ab9c41e8 100644 --- a/src/networkpolicy/networkPolicy.go +++ b/src/networkpolicy/networkPolicy.go @@ -1495,42 +1495,500 @@ func DiscoverNetworkPolicy(namespace string, services []types.Service, pods []types.Pod) []types.KnoxNetworkPolicy { - // step 1: [network logs] -> {dst: [network logs (src+dst)]} - originLogsPerDst := groupNetworkLogPerDst(networkLogs, services, CIDRBits) - - /* - step 2: {dst: [network logs (src+dst)]} -> {dst: [srcs (labeled)]} - +++ here, we start to track flow IDs +++ - we keep LabeledSrcsPerDst map for aggregating the merged policy set in the future - */ - labeledSrcsPerDst := map[Dst][]SrcSimple{} - if val, ok := LabeledSrcsPerDst[namespace]; ok { - labeledSrcsPerDst = extractSrcByLabel(val, originLogsPerDst, pods) + networkPolicies := []types.KnoxNetworkPolicy{} + + ingressPolicies := map[string][]types.KnoxNetworkPolicy{} + egressPolicies := map[string][]types.KnoxNetworkPolicy{} + + for i := range networkLogs { + ingress, egress := convertKnoxNetworkLogToKnoxNetworkPolicy(&networkLogs[i], pods) + + if ingress != nil { + endpointSelector := getLabelArrayFromMap(ingress.Spec.Selector.MatchLabels) + k := strings.Join(endpointSelector, ",") + ingressPolicies[k] = append(ingressPolicies[k], *ingress) + } + if egress != nil { + endpointSelector := getLabelArrayFromMap(egress.Spec.Selector.MatchLabels) + k := strings.Join(endpointSelector, ",") + egressPolicies[k] = append(egressPolicies[k], *egress) + } + } + + for selector, policies := range ingressPolicies { + if len(policies) > 1 { + mergedPolicy, _ := mergeNetworkPolicies(policies[0], policies[1:]) + ingressPolicies[selector] = []types.KnoxNetworkPolicy{mergedPolicy} + } + } + + for selector, policies := range egressPolicies { + if len(policies) > 1 { + mergedPolicy, _ := mergeNetworkPolicies(policies[0], policies[1:]) + egressPolicies[selector] = []types.KnoxNetworkPolicy{mergedPolicy} + } + } + + for _, p := range ingressPolicies { + networkPolicies = append(networkPolicies, p...) + } + for _, p := range egressPolicies { + networkPolicies = append(networkPolicies, p...) + } + return networkPolicies +} + +func mergeNetworkPolicies(existPolicy types.KnoxNetworkPolicy, policies []types.KnoxNetworkPolicy) (types.KnoxNetworkPolicy, bool) { + if existPolicy.Metadata["type"] == PolicyTypeIngress { + return mergeIngressPolicies(existPolicy, policies) + } + return mergeEgressPolicies(existPolicy, policies) +} + +func mergeIngressPolicies(existPolicy types.KnoxNetworkPolicy, policies []types.KnoxNetworkPolicy) (types.KnoxNetworkPolicy, bool) { + mergedPolicy := existPolicy + updated := false + + var ingressMatched bool + + for _, policy := range policies { + for _, newIngress := range policy.Spec.Ingress { + ingressMatched = false + + if len(newIngress.MatchLabels) > 0 { + lblArr := getLabelArrayFromMap(newIngress.MatchLabels) + newSelector := strings.Join(lblArr, ",") + + for i, existIngress := range mergedPolicy.Spec.Ingress { + if len(existIngress.MatchLabels) == 0 { + continue + } + lblArr = getLabelArrayFromMap(existIngress.MatchLabels) + existSelector := strings.Join(lblArr, ",") + + if newSelector == existSelector { + ingressMatched, updated, mergedPolicy.Spec.Ingress[i].ToHTTPs = mergeHttpRules(existIngress, newIngress) + if ingressMatched { + break + } + } + } + } else if len(newIngress.FromEntities) > 0 { + newEntity := newIngress.FromEntities[0] + + for i, existIngress := range mergedPolicy.Spec.Ingress { + if len(existIngress.FromEntities) == 0 { + continue + } + existEntity := existIngress.FromEntities[0] + + if newEntity == existEntity { + ingressMatched, updated, mergedPolicy.Spec.Ingress[i].ToHTTPs = mergeHttpRules(existIngress, newIngress) + if ingressMatched { + break + } + } + } + } + if !ingressMatched { + mergedPolicy.Spec.Ingress = append(mergedPolicy.Spec.Ingress, newIngress) + updated = true + } + } + } + return mergedPolicy, updated +} + +func mergeEgressPolicies(existPolicy types.KnoxNetworkPolicy, policies []types.KnoxNetworkPolicy) (types.KnoxNetworkPolicy, bool) { + mergedPolicy := existPolicy + updated := false + + var egressMatched bool + + for _, policy := range policies { + for _, newEgress := range policy.Spec.Egress { + egressMatched = false + + if len(newEgress.MatchLabels) > 0 { + lblArr := getLabelArrayFromMap(newEgress.MatchLabels) + newSelector := strings.Join(lblArr, ",") + + for i, existEgress := range mergedPolicy.Spec.Egress { + if len(existEgress.MatchLabels) == 0 { + continue + } + lblArr = getLabelArrayFromMap(existEgress.MatchLabels) + existSelector := strings.Join(lblArr, ",") + + if newSelector == existSelector { + egressMatched, updated, mergedPolicy.Spec.Egress[i].ToHTTPs = mergeHttpRules(existEgress, newEgress) + if egressMatched { + break + } + } + } + } else if len(newEgress.ToEntities) > 0 { + newEntity := newEgress.ToEntities[0] + + for i, existEgress := range mergedPolicy.Spec.Egress { + if len(existEgress.ToEntities) == 0 { + continue + } + existEntity := existEgress.ToEntities[0] + + if newEntity == existEntity { + egressMatched, updated, mergedPolicy.Spec.Egress[i].ToHTTPs = mergeHttpRules(existEgress, newEgress) + if egressMatched { + break + } + } + } + } else if len(newEgress.ToFQDNs) > 0 { + newFQDN := newEgress.ToFQDNs[0].MatchNames[0] + + for i, existEgress := range mergedPolicy.Spec.Egress { + if len(existEgress.ToFQDNs) == 0 { + continue + } + existFQDN := existEgress.ToFQDNs[0].MatchNames[0] + + if newFQDN == existFQDN { + egressMatched, updated, mergedPolicy.Spec.Egress[i].ToHTTPs = mergeHttpRules(existEgress, newEgress) + if egressMatched { + break + } + } + } + } + if !egressMatched { + mergedPolicy.Spec.Egress = append(mergedPolicy.Spec.Egress, newEgress) + updated = true + } + } + } + + return mergedPolicy, updated +} + +func mergeHttpRules(existRule types.L47Rule, newRule types.L47Rule) (bool, bool, []types.SpecHTTP) { + existIcmpRule := existRule.GetICMPRules() + newIcmpRule := newRule.GetICMPRules() + + existPortRule := existRule.GetPortRules() + newPortRule := newRule.GetPortRules() + + existHttpRule := existRule.GetHTTPRules() + newHttpRule := newRule.GetHTTPRules() + + existIsICMP := false + if len(existIcmpRule) > 0 { + existIsICMP = true + } + + newIsICMP := false + if len(newIcmpRule) > 0 { + existIsICMP = true + } + + // Handle Ingress/Egress with ICMP rules + if existIsICMP && !newIsICMP { + return false, false, nil + } else if !existIsICMP && newIsICMP { + return false, false, nil + } else if existIsICMP && newIsICMP { + if existIcmpRule[0].Equal(newIcmpRule[0]) { + return true, false, nil + } + return false, false, nil + } + + // Handling Ingress/Egress with L4 and HTTP rules + mergedHttpRule := existHttpRule + updated := false + if existPortRule[0].Equal(newPortRule[0]) { + for _, h := range newHttpRule { + if !libs.ContainsElement(existHttpRule, h) { + mergedHttpRule = append(mergedHttpRule, h) + updated = true + } + } + return true, updated, mergedHttpRule + } + return false, false, nil +} + +func convertKnoxNetworkLogToKnoxNetworkPolicy(log *types.KnoxNetworkLog, pods []types.Pod) (_, _ *types.KnoxNetworkPolicy) { + var ingressPolicy, egressPolicy *types.KnoxNetworkPolicy = nil, nil + + if log.SrcPodName != "" && log.DstPodName != "" { + // 1. Generate egress policy for the src + // and ingress policy for the dst + + // Ingress/Egress Policy + ePolicy, iPolicy := buildNewKnoxEgressPolicy(), buildNewKnoxIngressPolicy() + + // 1.1 Set the endpoint selector + ePolicy.Spec.Selector.MatchLabels = getEndpointMatchLabels(log.SrcPodName, pods) + iPolicy.Spec.Selector.MatchLabels = getEndpointMatchLabels(log.DstPodName, pods) + + // 1.2 Set the to/from Endpoint selector + egress := types.Egress{} + ingress := types.Ingress{} + egress.MatchLabels = getEndpointMatchLabels(log.DstPodName, pods) + ingress.MatchLabels = getEndpointMatchLabels(log.SrcPodName, pods) + + if log.SrcNamespace != log.DstNamespace { + // cross namespace policy + egress.MatchLabels["io.kubernetes.pod.namespace"] = log.DstNamespace + ingress.MatchLabels["io.kubernetes.pod.namespace"] = log.SrcNamespace + } + + // 1.3 Set the dst port/protocol + if !libs.IsICMP(log.Protocol) { + egress.ToPorts = []types.SpecPort{{Port: strconv.Itoa(log.DstPort), Protocol: libs.GetProtocol(log.Protocol)}} + ingress.ToPorts = append(ingress.ToPorts, egress.ToPorts...) + } else { + // 1.4 Set the icmp code/type + family := "IPv4" + if log.Protocol == libs.IPProtocolICMPv6 { + family = "IPv6" + } + egress.ICMPs = []types.SpecICMP{{Family: family, Type: uint8(log.ICMPType)}} + ingress.ICMPs = append(ingress.ICMPs, egress.ICMPs...) + } + + if log.L7Protocol == libs.L7ProtocolHTTP { + httpRule := types.SpecHTTP{Method: log.HTTPMethod, Path: log.HTTPPath} + egress.ToHTTPs = []types.SpecHTTP{httpRule} + ingress.ToHTTPs = []types.SpecHTTP{httpRule} + } + + ePolicy.Spec.Egress = append(ePolicy.Spec.Egress, egress) + iPolicy.Spec.Ingress = append(iPolicy.Spec.Ingress, ingress) + + ePolicy.Metadata["namespace"] = log.SrcNamespace + iPolicy.Metadata["namespace"] = log.DstNamespace + + egressPolicy = &ePolicy + ingressPolicy = &iPolicy + } else if len(log.SrcReservedLabels) > 0 { + // 2. Generate ingress policy only for the dst + + // Ingress Policy + iPolicy := buildNewKnoxIngressPolicy() + + // 2.1 Set the endpoint selector + iPolicy.Spec.Selector.MatchLabels = getEndpointMatchLabels(log.DstPodName, pods) + + // 2.2 Set the fromEntities selector + ingress := types.Ingress{} + srcEntity := getEntityFromReservedLabels(log.SrcReservedLabels) + if srcEntity != "" { + ingress.FromEntities = append(ingress.FromEntities, srcEntity) + + // 2.3 Set the dst port/protocol + if !libs.IsICMP(log.Protocol) { + ingress.ToPorts = []types.SpecPort{{Port: strconv.Itoa(log.DstPort), Protocol: libs.GetProtocol(log.Protocol)}} + } else { + // 2.4 Set the icmp code/type + family := "IPv4" + if log.Protocol == libs.IPProtocolICMPv6 { + family = "IPv6" + } + ingress.ICMPs = []types.SpecICMP{{Family: family, Type: uint8(log.ICMPType)}} + } + + if log.L7Protocol == libs.L7ProtocolHTTP { + httpRule := types.SpecHTTP{Method: log.HTTPMethod, Path: log.HTTPPath} + ingress.ToHTTPs = []types.SpecHTTP{httpRule} + } + + iPolicy.Spec.Ingress = append(iPolicy.Spec.Ingress, ingress) + iPolicy.Metadata["namespace"] = log.DstNamespace + + ingressPolicy = &iPolicy + } + } else if len(log.DstReservedLabels) > 0 { + // 3. Generate egress policy only for the src + + // Egress Policy + ePolicy := buildNewKnoxEgressPolicy() + + // 3.1 Set the endpoint selector + ePolicy.Spec.Selector.MatchLabels = getEndpointMatchLabels(log.SrcPodName, pods) + + // 3.2 Set the toEntities/ToFQDNs + egress := types.Egress{} + dstEntity := getEntityFromReservedLabels(log.DstReservedLabels) + if dstEntity != "" { + if dstEntity == "world" && log.DNSQuery != "" { + fqdn := types.SpecFQDN{[]string{log.DNSQuery}} + egress.ToFQDNs = append(egress.ToFQDNs, fqdn) + } else { + egress.ToEntities = append(egress.ToEntities, dstEntity) + } + + // 3.3 Set the dst port/protocol + if !libs.IsICMP(log.Protocol) { + egress.ToPorts = []types.SpecPort{{Port: strconv.Itoa(log.DstPort), Protocol: libs.GetProtocol(log.Protocol)}} + } else { + // 3.4 Set the icmp code/type + family := "IPv4" + if log.Protocol == libs.IPProtocolICMPv6 { + family = "IPv6" + } + egress.ICMPs = []types.SpecICMP{{Family: family, Type: uint8(log.ICMPType)}} + } + + if log.L7Protocol == libs.L7ProtocolHTTP { + httpRule := types.SpecHTTP{Method: log.HTTPMethod, Path: log.HTTPPath} + egress.ToHTTPs = []types.SpecHTTP{httpRule} + } + + ePolicy.Spec.Egress = append(ePolicy.Spec.Egress, egress) + ePolicy.Metadata["namespace"] = log.SrcNamespace + egressPolicy = &ePolicy + } + } + + if !isValidPolicy(ingressPolicy) { + ingressPolicy = nil + } + + if !isValidPolicy(egressPolicy) { + egressPolicy = nil + } + + return ingressPolicy, egressPolicy +} + +func isValidPolicy(policy *types.KnoxNetworkPolicy) bool { + if policy == nil { + return false + } + + if len(policy.Spec.Selector.MatchLabels) == 0 { + return false + } + + if policy.Metadata["type"] == PolicyTypeIngress { + if len(policy.Spec.Ingress) == 0 { + return false + } + + for _, ingress := range policy.Spec.Ingress { + if len(ingress.MatchLabels) == 0 && + len(ingress.FromEntities) == 0 && + len(ingress.FromCIDRs) == 0 { + return false + } + + if len(ingress.ToPorts) == 0 && + len(ingress.ICMPs) == 0 { + return false + } + + if len(ingress.ToHTTPs) > 0 { + if len(ingress.ToPorts) == 0 { + return false + } + for _, p := range ingress.ToPorts { + if p.Protocol != "TCP" { + return false + } + } + } + } + } + + if policy.Metadata["type"] == PolicyTypeEgress { + if len(policy.Spec.Egress) == 0 { + return false + } + + for _, egress := range policy.Spec.Egress { + if len(egress.MatchLabels) == 0 && + len(egress.ToEntities) == 0 && + len(egress.ToFQDNs) == 0 && + len(egress.ToCIDRs) == 0 { + return false + } + + if len(egress.ToPorts) == 0 && + len(egress.ICMPs) == 0 { + return false + } + + if len(egress.ToHTTPs) > 0 { + if len(egress.ToPorts) == 0 { + return false + } + for _, p := range egress.ToPorts { + if p.Protocol != "TCP" { + return false + } + } + } + } + } + + return true +} + +func getEntityFromReservedLabels(reservedLabels []string) string { + entities := []string{} + + for _, label := range reservedLabels { + entity := strings.TrimPrefix(label, "reserved:") + entities = append(entities, entity) + } + + // reservedLabels might contain more than one entity name. + // Choose the label which is more unique/specific to + // the src/dst entity and return the entity name + if libs.ContainsElement(entities, "kube-apiserver") { + return "kube-apiserver" + } else if libs.ContainsElement(entities, "world") { + return "world" + } else if libs.ContainsElement(entities, "remote-node") { + return "remote-node" + } else if libs.ContainsElement(entities, "host") { + return "host" } else { - labeledSrcsPerDst = extractSrcByLabel(labeledSrcsPerDst, originLogsPerDst, pods) + return "" } +} - // step 3: {dst: [srcs (labeled)]} -> {dst: [merged srcs (labeled + merged)]} - aggregatedSrcsPerDst := aggregateSrcByLabel(labeledSrcsPerDst, pods) +func getEndpointMatchLabels(podName string, pods []types.Pod) map[string]string { + podLabels := getLabelsFromPod(podName, pods) + matchLabels := getLabelMapFromArray(podLabels) + return matchLabels +} - // step 4: {aggregated_src: [dsts (merged proto/port)]} merging protocols and ports for the same destinations - aggregatedSrcPerMergedDst := mergeDstByProtoPort(aggregatedSrcsPerDst) +func getLabelMapFromArray(labels []string) map[string]string { + labelMap := map[string]string{} - // step 5: {aggregated_src: [dsts (merged proto/port + aggregated_label)] - aggregatedSrcPerAggregatedDst := aggregateDstByLabel(aggregatedSrcPerMergedDst, pods) + for _, label := range labels { + kvPair := strings.Split(label, "=") + if len(kvPair) != 2 { + continue + } + labelMap[kvPair[0]] = kvPair[1] + } - // TODO: This needs to be revisited after issues with L7 policies are fixed - // ----- - // step 6: aggregate HTTP rule (method+path) - // AggregateHTTPRule(aggregatedSrcPerAggregatedDst) + return labelMap +} - // step 7: building network policies - networkPolicies := buildNetworkPolicy(namespace, services, aggregatedSrcPerAggregatedDst) +func getLabelArrayFromMap(labelMap map[string]string) []string { + labels := []string{} - // step 8: update labeledSrcsPerDst map (remove cidr dst/additionals) - LabeledSrcsPerDst[namespace] = updateLabeledSrcPerDst(labeledSrcsPerDst) + for k, v := range labelMap { + labels = append(labels, (k + "=" + v)) + } - return networkPolicies + sort.Strings(labels) + + return labels } func applyPolicyFilter(discoveredPolicies map[string][]types.KnoxNetworkPolicy) map[string][]types.KnoxNetworkPolicy { diff --git a/src/types/policyData.go b/src/types/policyData.go index ac75489f..6cdfe93f 100644 --- a/src/types/policyData.go +++ b/src/types/policyData.go @@ -16,12 +16,20 @@ type SpecICMP struct { Type uint8 `json:"type" yaml:"type,omitempty" bson:"type,omitempty"` } +func (x SpecICMP) Equal(y SpecICMP) bool { + return x.Family == y.Family && x.Type == y.Type +} + // SpecPort Structure type SpecPort struct { Port string `json:"port,omitempty" yaml:"port,omitempty" bson:"port,omitempty"` Protocol string `json:"protocol,omitempty" yaml:"protocol,omitempty" bson:"protocol,omitempty"` } +func (x SpecPort) Equal(y SpecPort) bool { + return x.Port == y.Port && x.Protocol == y.Protocol +} + // SpecService Structure type SpecService struct { ServiceName string `json:"serviceName,omitempty" yaml:"serviceName,omitempty" bson:"serviceName,omitempty"` @@ -69,6 +77,36 @@ type Egress struct { ToHTTPs []SpecHTTP `json:"toHTTPs,omitempty" yaml:"toHTTPs,omitempty" bson:"toHTTPs,omitempty"` } +type L47Rule interface { + GetICMPRules() []SpecICMP + GetPortRules() []SpecPort + GetHTTPRules() []SpecHTTP +} + +func (x Ingress) GetICMPRules() []SpecICMP { + return x.ICMPs +} + +func (x Ingress) GetPortRules() []SpecPort { + return x.ToPorts +} + +func (x Ingress) GetHTTPRules() []SpecHTTP { + return x.ToHTTPs +} + +func (x Egress) GetICMPRules() []SpecICMP { + return x.ICMPs +} + +func (x Egress) GetPortRules() []SpecPort { + return x.ToPorts +} + +func (x Egress) GetHTTPRules() []SpecHTTP { + return x.ToHTTPs +} + // Spec Structure type Spec struct { Selector Selector `json:"selector,omitempty" yaml:"selector,omitempty" bson:"selector,omitempty"` @@ -90,6 +128,7 @@ type KnoxNetworkPolicy struct { Spec Spec `json:"spec,omitempty" yaml:"spec,omitempty" bson:"spec,omitempty"` GeneratedTime int64 `json:"generatedTime,omitempty" yaml:"generatedTime,omitempty" bson:"generatedTime,omitempty"` + UpdatedTime int64 `json:"updatedTime,omitempty" yaml:"updatedTime,omitempty" bson:"updatedTime,omitempty"` } // =========================== //