Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#3353): Support sign and encrypt security modes in OPC-UA adapte… #3354

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<amqp-client.version>5.21.0</amqp-client.version>
<apache-sis-referencing.version>1.2</apache-sis-referencing.version>
<boofcv.version>1.1.0</boofcv.version>
<bcprov-jdk18on.version>1.78.1</bcprov-jdk18on.version>
<classindex.version>3.9</classindex.version>
<checker-qual.version>3.43.0</checker-qual.version>
<commons-codec.version>1.17.0</commons-codec.version>
Expand All @@ -54,7 +55,7 @@
<commons-pool2.version>2.12.0</commons-pool2.version>
<commons-text.version>1.12.0</commons-text.version>
<ditto-client.version>1.0.0</ditto-client.version>
<eclipse.milo.version>0.6.9</eclipse.milo.version>
<eclipse.milo.version>0.6.14</eclipse.milo.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
Expand Down Expand Up @@ -398,6 +399,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bcprov-jdk18on.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcutil-jdk18on</artifactId>
<version>${bcprov-jdk18on.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-turtle</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,18 @@ public enum Envs {

// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),

// OPC-UA security
SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/streampipes-security/opcua"),
SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
SP_OPCUA_KEYSTORE_TYPE("SP_OPCUA_KEYSTORE_TYPE", "PKCS12"),
SP_OPCUA_KEYSTORE_ALIAS("SP_OPCUA_KEYSTORE_ALIAS", "apache-streampipes"),
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
);

private final String envVariableName;
private String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,33 @@ public StringEnvironmentVariable getAllowedUploadFiletypes() {
return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
}

@Override
public StringEnvironmentVariable getOpcUaSecurityDir() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreFile() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystorePassword() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getOpcUaApplicationUri() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
}

@Override
public StringEnvironmentVariable getOPcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public interface Environment {
IntEnvironmentVariable getServicePort();

StringEnvironmentVariable getSpCoreScheme();

StringEnvironmentVariable getSpCoreHost();

IntEnvironmentVariable getSpCorePort();

// Time series storage env variables
Expand Down Expand Up @@ -144,12 +146,15 @@ public interface Environment {

// Broker defaults
StringEnvironmentVariable getKafkaHost();

IntEnvironmentVariable getKafkaPort();

StringEnvironmentVariable getMqttHost();

IntEnvironmentVariable getMqttPort();

StringEnvironmentVariable getNatsHost();

IntEnvironmentVariable getNatsPort();

StringEnvironmentVariable getPulsarUrl();
Expand All @@ -158,4 +163,15 @@ public interface Environment {

StringEnvironmentVariable getAllowedUploadFiletypes();

StringEnvironmentVariable getOpcUaSecurityDir();

StringEnvironmentVariable getOpcUaKeystoreFile();

StringEnvironmentVariable getOpcUaKeystorePassword();

StringEnvironmentVariable getOpcUaApplicationUri();

StringEnvironmentVariable getOPcUaKeystoreType();

StringEnvironmentVariable getOpcUaKeystoreAlias();
}
4 changes: 2 additions & 2 deletions streampipes-extensions/streampipes-connectors-opcua/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-parent</artifactId>
<artifactId>streampipes-extensions</artifactId>
<version>0.97.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>streampipes-connectors-opcua</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,35 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;

import java.util.List;

public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {

private final OpcUaClientProvider clientProvider;

public OpcUaConnectorsModuleExport() {
this.clientProvider = new OpcUaClientProvider();
}

@Override
public List<StreamPipesAdapter> adapters() {
return List.of(
new OpcUaAdapter()
new OpcUaAdapter(clientProvider)
);
}

@Override
public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
new OpcUaSink()
new OpcUaSink(clientProvider)
);
}

Expand All @@ -50,7 +60,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
new OpcUaAdapterMigrationV1(),
new OpcUaAdapterMigrationV2(),
new OpcUaAdapterMigrationV3()
new OpcUaAdapterMigrationV3(),
new OpcUaAdapterMigrationV4(),
new OpcUaSinkMigrationV1()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
Expand All @@ -42,7 +43,6 @@
import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
Expand All @@ -66,7 +66,6 @@
import java.util.stream.Collectors;

import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
Expand All @@ -78,7 +77,9 @@ public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsR
private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);

private int pullingIntervalMilliSeconds;
private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
private final OpcUaClientProvider clientProvider;
private ConnectedOpcUaClient connectedClient;
private OpcUaAdapterConfig opcUaAdapterConfig;
private List<OpcNode> allNodes;
private List<NodeId> allNodeIds;
private int numberProperties;
Expand All @@ -92,15 +93,14 @@ public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsR
*/
private final Map<String, String> nodeIdToLabelMapping;

public OpcUaAdapter() {
super();
public OpcUaAdapter(OpcUaClientProvider clientProvider) {
this.clientProvider = clientProvider;
this.numberProperties = 0;
this.event = new HashMap<>();
this.nodeIdToLabelMapping = new HashMap<>();
}

private void prepareAdapter(IAdapterParameterExtractor extractor) throws AdapterException {

this.allNodeIds = new ArrayList<>();
List<String> deleteKeys = extractor
.getAdapterDescription()
Expand All @@ -111,21 +111,21 @@ private void prepareAdapter(IAdapterParameterExtractor extractor) throws Adapter
.collect(Collectors.toList());

try {
this.spOpcUaClient.connect();
this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
OpcUaNodeBrowser browserClient =
new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
new OpcUaNodeBrowser(this.connectedClient.getClient(), this.opcUaAdapterConfig);
this.allNodes = browserClient.findNodes(deleteKeys);


for (OpcNode node : this.allNodes) {
this.allNodeIds.add(node.getNodeId());
}

if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
this.pullingIntervalMilliSeconds = spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
if (opcUaAdapterConfig.inPullMode()) {
this.pullingIntervalMilliSeconds = opcUaAdapterConfig.getPullIntervalMilliSeconds();
} else {
this.numberProperties = this.allNodeIds.size();
this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
this.connectedClient.createListSubscription(this.allNodeIds, this);
}

this.allNodes.forEach(node -> this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
Expand All @@ -139,7 +139,7 @@ private void prepareAdapter(IAdapterParameterExtractor extractor) throws Adapter
@Override
public void pullData() throws ExecutionException, RuntimeException, InterruptedException, TimeoutException {
var response =
this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
this.connectedClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
boolean badStatusCodeReceived = false;
boolean emptyValueReceived = false;
List<DataValue> returnValues =
Expand Down Expand Up @@ -168,7 +168,7 @@ public void pullData() throws ExecutionException, RuntimeException, InterruptedE

private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
return badStatusCodeReceived
&& this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
&& this.opcUaAdapterConfig.getIncompleteEventStrategy()
.equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
}

Expand Down Expand Up @@ -208,13 +208,13 @@ public PollingSettings getPollingInterval() {
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
this.spOpcUaClient = new SpOpcUaClient<>(
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
);
this.opcUaAdapterConfig =
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
//this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
this.collector = collector;
this.prepareAdapter(extractor);

if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(this, extractor.getAdapterDescription().getElementId());
}
Expand All @@ -223,22 +223,22 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
this.spOpcUaClient.disconnect();
clientProvider.releaseClient(this.opcUaAdapterConfig);

if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler.shutdown();
}
}

@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor) throws SpConfigurationException {
return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
}

@Override
public IAdapterConfiguration declareConfig() {
var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new OpcUaAdapter(clientProvider))
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
Expand All @@ -255,6 +255,6 @@ public IAdapterConfiguration declareConfig() {
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
return getSchema(extractor);
return getSchema(clientProvider, extractor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.eclipse.milo.opcua.sdk.client.AddressSpace;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
Expand All @@ -45,13 +46,13 @@

public class OpcUaNodeBrowser {

private final OpcUaClient client;
private final UaClient client;
private final OpcUaConfig spOpcConfig;

private static final Logger LOG = LoggerFactory.getLogger(OpcUaNodeBrowser.class);

public OpcUaNodeBrowser(
OpcUaClient client,
UaClient client,
OpcUaConfig spOpcUaClientConfig
) {
this.client = client;
Expand Down Expand Up @@ -127,7 +128,7 @@ private OpcNode toOpcNode(String nodeName) throws UaException {
}

private List<TreeInputNode> findChildren(
OpcUaClient client,
UaClient client,
NodeId nodeId
) throws UaException {
return client
Expand Down
Loading
Loading