-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SDFAB-189] Add UPF programmable behaviour (#276)
* Initial move of upf programmable * Cleanup tests for upf programmable * Fix checkstyle * Fix pom file * Add UPF behaviour when pipeconf requires it Also, use do not manage UE limit on physical pipeline. * Force to use local artifacs * Fix read all counters usage and new tests * Update full profile suffix. * Make mocks more generic and independent from upf constants * Add dependency to fabric v1model and remove redundant interfaces * Add missing license headers * Remove dependency to fabric-v1model * Use different name for distributed structures * Rename upf store interface name * Build using snapshots * Fix MockReadRequest * Fix failure to find onos-build-conf Co-authored-by: Carmelo Cascone <carmelo@opennetworking.org>
- Loading branch information
1 parent
bcbb0e7
commit 5dc6523
Showing
27 changed files
with
3,538 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
234 changes: 234 additions & 0 deletions
234
src/main/java/org/stratumproject/fabric/tna/behaviour/upf/DistributedFabricUpfStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,234 @@ | ||
// Copyright 2020-present Open Networking Foundation | ||
// SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0 | ||
package org.stratumproject.fabric.tna.behaviour.upf; | ||
|
||
import com.google.common.collect.BiMap; | ||
import com.google.common.collect.ImmutableBiMap; | ||
import com.google.common.collect.Maps; | ||
import org.onosproject.net.behaviour.upf.PacketDetectionRule; | ||
import org.onlab.packet.Ip4Address; | ||
import org.onlab.util.ImmutableByteSequence; | ||
import org.onlab.util.KryoNamespace; | ||
import org.onosproject.store.serializers.KryoNamespaces; | ||
import org.onosproject.store.service.ConsistentMap; | ||
import org.onosproject.store.service.DistributedSet; | ||
import org.onosproject.store.service.MapEvent; | ||
import org.onosproject.store.service.MapEventListener; | ||
import org.onosproject.store.service.Serializer; | ||
import org.onosproject.store.service.StorageService; | ||
import org.osgi.service.component.annotations.Activate; | ||
import org.osgi.service.component.annotations.Component; | ||
import org.osgi.service.component.annotations.Deactivate; | ||
import org.osgi.service.component.annotations.Reference; | ||
import org.osgi.service.component.annotations.ReferenceCardinality; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
|
||
/** | ||
* Distributed implementation of FabricUpfStore. | ||
*/ | ||
// FIXME: this store is generic and not tied to a single device, should we have a store based on deviceId? | ||
@Component(immediate = true, service = DistributedFabricUpfStore.class) | ||
public final class DistributedFabricUpfStore implements FabricUpfStore { | ||
|
||
private final Logger log = LoggerFactory.getLogger(getClass()); | ||
|
||
@Reference(cardinality = ReferenceCardinality.MANDATORY) | ||
protected StorageService storageService; | ||
|
||
protected static final String FAR_ID_MAP_NAME = "fabric-upf-far-id-tna"; | ||
protected static final String BUFFER_FAR_ID_SET_NAME = "fabric-upf-buffer-far-id-tna"; | ||
protected static final String FAR_ID_UE_MAP_NAME = "fabric-upf-far-id-ue-tna"; | ||
protected static final KryoNamespace.Builder SERIALIZER = KryoNamespace.newBuilder() | ||
.register(KryoNamespaces.API) | ||
.register(UpfRuleIdentifier.class); | ||
|
||
// Mapping between scheduling priority ranges with Tofino priority queues | ||
// i.e., default queues are 8 in Tofino | ||
private static final BiMap<Integer, Integer> SCHEDULING_PRIORITY_MAP | ||
= new ImmutableBiMap.Builder<Integer, Integer>() | ||
// Highest scheduling priority for 3GPP is 1 and highest Tofino queue priority is 7 | ||
.put(1, 5) | ||
.put(6, 4) | ||
.put(7, 3) | ||
.put(8, 2) | ||
.put(9, 1) | ||
.build(); | ||
|
||
// Distributed local FAR ID to global FAR ID mapping | ||
protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap; | ||
private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener; | ||
// Local, reversed copy of farIdMapper for better reverse lookup performance | ||
protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap; | ||
private int nextGlobalFarId = 1; | ||
|
||
protected DistributedSet<UpfRuleIdentifier> bufferFarIds; | ||
protected ConsistentMap<UpfRuleIdentifier, Set<Ip4Address>> farIdToUeAddrs; | ||
|
||
@Activate | ||
protected void activate() { | ||
// Allow unit test to inject farIdMap here. | ||
if (storageService != null) { | ||
this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder() | ||
.withName(FAR_ID_MAP_NAME) | ||
.withRelaxedReadConsistency() | ||
.withSerializer(Serializer.using(SERIALIZER.build())) | ||
.build(); | ||
this.bufferFarIds = storageService.<UpfRuleIdentifier>setBuilder() | ||
.withName(BUFFER_FAR_ID_SET_NAME) | ||
.withRelaxedReadConsistency() | ||
.withSerializer(Serializer.using(SERIALIZER.build())) | ||
.build().asDistributedSet(); | ||
this.farIdToUeAddrs = storageService.<UpfRuleIdentifier, Set<Ip4Address>>consistentMapBuilder() | ||
.withName(FAR_ID_UE_MAP_NAME) | ||
.withRelaxedReadConsistency() | ||
.withSerializer(Serializer.using(SERIALIZER.build())) | ||
.build(); | ||
|
||
} | ||
farIdMapListener = new FarIdMapListener(); | ||
farIdMap.addListener(farIdMapListener); | ||
|
||
reverseFarIdMap = Maps.newHashMap(); | ||
farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey())); | ||
|
||
log.info("Started"); | ||
} | ||
|
||
@Deactivate | ||
protected void deactivate() { | ||
farIdMap.removeListener(farIdMapListener); | ||
farIdMap.destroy(); | ||
reverseFarIdMap.clear(); | ||
|
||
log.info("Stopped"); | ||
} | ||
|
||
@Override | ||
public void reset() { | ||
farIdMap.clear(); | ||
reverseFarIdMap.clear(); | ||
bufferFarIds.clear(); | ||
farIdToUeAddrs.clear(); | ||
nextGlobalFarId = 0; | ||
} | ||
|
||
@Override | ||
public Map<UpfRuleIdentifier, Integer> getFarIdMap() { | ||
return Map.copyOf(farIdMap.asJavaMap()); | ||
} | ||
|
||
@Override | ||
public int globalFarIdOf(UpfRuleIdentifier farIdPair) { | ||
int globalFarId = farIdMap.compute(farIdPair, | ||
(k, existingId) -> { | ||
return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++); | ||
}).value(); | ||
log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId); | ||
return globalFarId; | ||
} | ||
|
||
@Override | ||
public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) { | ||
UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId); | ||
return globalFarIdOf(farId); | ||
|
||
} | ||
|
||
@Override | ||
public String queueIdOf(int schedulingPriority) { | ||
return (SCHEDULING_PRIORITY_MAP.get(schedulingPriority)).toString(); | ||
} | ||
|
||
@Override | ||
public String schedulingPriorityOf(int queueId) { | ||
return (SCHEDULING_PRIORITY_MAP.inverse().get(queueId)).toString(); | ||
} | ||
|
||
@Override | ||
public UpfRuleIdentifier localFarIdOf(int globalFarId) { | ||
return reverseFarIdMap.get(globalFarId); | ||
} | ||
|
||
public void learnFarIdToUeAddrs(PacketDetectionRule pdr) { | ||
UpfRuleIdentifier ruleId = UpfRuleIdentifier.of(pdr.sessionId(), pdr.farId()); | ||
farIdToUeAddrs.compute(ruleId, (k, set) -> { | ||
if (set == null) { | ||
set = new HashSet<>(); | ||
} | ||
set.add(pdr.ueAddress()); | ||
return set; | ||
}); | ||
} | ||
|
||
@Override | ||
public boolean isFarIdBuffering(UpfRuleIdentifier farId) { | ||
checkNotNull(farId); | ||
return bufferFarIds.contains(farId); | ||
} | ||
|
||
@Override | ||
public void learBufferingFarId(UpfRuleIdentifier farId) { | ||
checkNotNull(farId); | ||
bufferFarIds.add(farId); | ||
} | ||
|
||
@Override | ||
public void forgetBufferingFarId(UpfRuleIdentifier farId) { | ||
checkNotNull(farId); | ||
bufferFarIds.remove(farId); | ||
} | ||
|
||
@Override | ||
public void forgetUeAddr(Ip4Address ueAddr) { | ||
farIdToUeAddrs.keySet().forEach( | ||
farId -> farIdToUeAddrs.computeIfPresent(farId, (farIdz, ueAddrs) -> { | ||
ueAddrs.remove(ueAddr); | ||
return ueAddrs; | ||
})); | ||
} | ||
|
||
@Override | ||
public Set<Ip4Address> ueAddrsOfFarId(UpfRuleIdentifier farId) { | ||
return farIdToUeAddrs.getOrDefault(farId, Set.of()).value(); | ||
} | ||
|
||
@Override | ||
public Set<UpfRuleIdentifier> getBufferFarIds() { | ||
return Set.copyOf(bufferFarIds); | ||
} | ||
|
||
@Override | ||
public Map<UpfRuleIdentifier, Set<Ip4Address>> getFarIdToUeAddrs() { | ||
return Map.copyOf(farIdToUeAddrs.asJavaMap()); | ||
} | ||
|
||
// NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that | ||
// reverseFarIdMap update always finishes right after farIdMap is updated | ||
private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> { | ||
@Override | ||
public void event(MapEvent<UpfRuleIdentifier, Integer> event) { | ||
switch (event.type()) { | ||
case INSERT: | ||
reverseFarIdMap.put(event.newValue().value(), event.key()); | ||
break; | ||
case UPDATE: | ||
reverseFarIdMap.remove(event.oldValue().value()); | ||
reverseFarIdMap.put(event.newValue().value(), event.key()); | ||
break; | ||
case REMOVE: | ||
reverseFarIdMap.remove(event.oldValue().value()); | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.