Commit aa8763f

Updated StanfordCoreNLPClient so that it won't fall back to local processing. Fixed a problem preventing updates to processor config properties. Simplified StanfordCoreNLPService.

Drew Kerrigan committed Jan 29, 2020
1 parent f244588 commit aa8763f
Showing 8 changed files with 906 additions and 135 deletions.
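The heart of the change is visible in the processor diff below: onScheduled no longer lazily reuses a cached service but rebuilds the StanfordCoreNLPService from the current property values every time the processor is scheduled, which is what fixes the stale-config problem, and the service is now constructed around a single AnnotationPipeline. The simplified StanfordCoreNLPService and the updated client live in files not shown in this excerpt, so the sketch below is only an inferred shape: the constructor and the two createPipeline overloads match the call sites in the diff, but the method bodies, and the use of the stock edu.stanford.nlp.pipeline.StanfordCoreNLPClient constructor, are assumptions.

// Sketch only: shape inferred from call sites in the diff below, not the actual file.
import java.util.Properties;

import edu.stanford.nlp.pipeline.AnnotationPipeline;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.pipeline.StanfordCoreNLPClient;

public class StanfordCoreNLPService {
  private final AnnotationPipeline pipeline;

  public StanfordCoreNLPService(final AnnotationPipeline pipeline) {
    this.pipeline = pipeline;
  }

  // No host configured: run the annotators in-process.
  public static AnnotationPipeline createPipeline(final Properties props) {
    return new StanfordCoreNLP(props);
  }

  // Host configured: talk to a remote CoreNLP server. Per the commit message,
  // the (vendored) client is updated so it no longer falls back to local
  // processing when the server is unreachable.
  public static AnnotationPipeline createPipeline(final Properties props, final String host,
      final int port, final String key, final String secret) {
    return new StanfordCoreNLPClient(props, host, port, key, secret);
  }

  // Used by the processor's onTrigger; implementation not shown in this excerpt:
  // public Map<String, List<String>> extractEntities(String text, String entityTypes) { ... }
}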
6 changes: 3 additions & 3 deletions nifi-stanfordcorenlp-nar/pom.xml
@@ -19,11 +19,11 @@
<parent>
<groupId>com.iss.nifi</groupId>
<artifactId>nifi-stanfordcorenlp-processor</artifactId>
-    <version>1.0</version>
+    <version>1.1</version>
</parent>

<artifactId>nifi-stanfordcorenlp-nar</artifactId>
-    <version>1.0</version>
+    <version>1.1</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -34,7 +34,7 @@
<dependency>
<groupId>com.iss.nifi</groupId>
<artifactId>nifi-stanfordcorenlp-processors</artifactId>
-      <version>1.0</version>
+      <version>1.1</version>
</dependency>
</dependencies>

2 changes: 1 addition & 1 deletion nifi-stanfordcorenlp-processors/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>com.iss.nifi</groupId>
<artifactId>nifi-stanfordcorenlp-processor</artifactId>
-    <version>1.0</version>
+    <version>1.1</version>
</parent>

<artifactId>nifi-stanfordcorenlp-processors</artifactId>
StanfordCoreNLPProcessor.java
@@ -2,7 +2,7 @@
*
* MIT License
*
- * Copyright (c) 2019 Institutional Shareholder Services. All other rights reserved.
+ * Copyright (c) 2020 Institutional Shareholder Services. All other rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -66,6 +66,8 @@
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

+import edu.stanford.nlp.pipeline.AnnotationPipeline;
+
@Tags({ "Stanford", "CoreNLP" })
@CapabilityDescription("Stanford CoreNLP Processor")
@SeeAlso({})
@@ -157,7 +159,8 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@OnScheduled
public void onScheduled(final ProcessContext context) throws Exception {
-    ensureService(context);
+    getLogger().debug("OnScheduled called for StanfordCoreNLPProcessor, refreshing StanfordCoreNLPService");
+    service = new StanfordCoreNLPService(createPipeline(context));
}

@Override
@@ -169,22 +172,22 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
flowFile = session.create();
}

-    String flowFileText = getTextFromSession(session, flowFile);
+    final String flowFileText = getTextFromSession(session, flowFile);

if (flowFileText == null || flowFileText.isEmpty()) {
getLogger().error("Empty flow file cannot be analyzed");
session.transfer(flowFile, FAILURE_RELATIONSHIP);
return;
}

-    String jsonPath = context.getProperty(PATH_ATTR).evaluateAttributeExpressions(flowFile).getValue();
-    String entityTypes = context.getProperty(ENTITIES_ATTR).evaluateAttributeExpressions(flowFile).getValue();
-    String text = getTextFromJson(flowFileText, jsonPath);
+    final String jsonPath = context.getProperty(PATH_ATTR).evaluateAttributeExpressions(flowFile).getValue();
+    final String entityTypes = context.getProperty(ENTITIES_ATTR).evaluateAttributeExpressions(flowFile).getValue();
+    final String text = getTextFromJson(flowFileText, jsonPath);
Map<String, List<String>> entityMap;

try {
entityMap = service.extractEntities(text, entityTypes);
-    } catch (Exception e) {
+    } catch (final Exception e) {
e.printStackTrace();
getLogger().error("Failed to analyze flow file text");
session.transfer(flowFile, FAILURE_RELATIONSHIP);
@@ -193,136 +196,142 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

Map<String, Object> flowFileJsonMap;

-    Gson gson = new Gson();
+    final Gson gson = new Gson();
try {
flowFileJsonMap = gson.fromJson(flowFileText, Map.class);
-    } catch (JsonSyntaxException e) {
+    } catch (final JsonSyntaxException e) {
e.printStackTrace();
getLogger().warn("Failed to parse flow file text as json, writing new flow file from blank json document");
flowFileJsonMap = new HashMap<String, Object>();
}

try {
-      for (String k : entityMap.keySet()) {
+      for (final String k : entityMap.keySet()) {
flowFileJsonMap.put(k, entityMap.get(k));
}

-      String entityJson = gson.toJson(entityMap);
-      String finalJson = gson.toJson(flowFileJsonMap);
+      final String entityJson = gson.toJson(entityMap);
+      final String finalJson = gson.toJson(flowFileJsonMap);

flowFile = session.putAttribute(flowFile, OUTPUT_ATTR, entityJson);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
-        public void process(OutputStream out) throws IOException {
+        public void process(final OutputStream out) throws IOException {
out.write(finalJson.getBytes());
}
});

session.transfer(flowFile, SUCCESS_RELATIONSHIP);
return;
-    } catch (Exception e) {
+    } catch (final Exception e) {
e.printStackTrace();
getLogger().warn("Failed to generate flow file or attributes");
}

session.transfer(flowFile, FAILURE_RELATIONSHIP);
}

-  private String getTextFromSession(final ProcessSession session, FlowFile flowFile) {
+  private String getTextFromSession(final ProcessSession session, final FlowFile flowFile) {
final AtomicReference<String> atomicText = new AtomicReference<>();

session.read(flowFile, new InputStreamCallback() {
@Override
-      public void process(InputStream in) throws IOException {
+      public void process(final InputStream in) throws IOException {
try {
-          String rawText = IOUtils.toString(in);
+          final String rawText = IOUtils.toString(in);
atomicText.set(rawText);
-        } catch (NullPointerException e) {
+        } catch (final NullPointerException e) {
e.printStackTrace();
getLogger().warn("FlowFile text was null");
-        } catch (IOException e) {
+        } catch (final IOException e) {
e.printStackTrace();
getLogger().error("FlowFile text could not be read due to IOException");
}
}
});

-    String text = atomicText.get();
+    final String text = atomicText.get();
if (text == null || text.isEmpty()) {
return null;
}

return text;
}

-  private String getTextFromJson(String flowFileText, String jsonPath) {
+  private String getTextFromJson(final String flowFileText, final String jsonPath) {
if (jsonPath == null || jsonPath.isEmpty()) {
return flowFileText;
}

try {
-      Configuration conf = Configuration.builder().options(Option.ALWAYS_RETURN_LIST).build();
-      List<String> result = JsonPath.using(conf).parse(flowFileText).read(jsonPath);
-      String combined = String.join(" ", result);
-      getLogger().info("Extracted this text from the flow file with the configured json path: " + combined);
+      final Configuration conf = Configuration.builder().options(Option.ALWAYS_RETURN_LIST).build();
+      final List<String> result = JsonPath.using(conf).parse(flowFileText).read(jsonPath);
+      final String combined = String.join(" ", result);
return combined;
-    } catch (ClassCastException e) {
-      LinkedHashMap<String, Object> resultMap = JsonPath.read(flowFileText, jsonPath);
+    } catch (final ClassCastException e) {
+      final LinkedHashMap<String, Object> resultMap = JsonPath.read(flowFileText, jsonPath);
String combined = "";
-      for (String k : resultMap.keySet()) {
+      for (final String k : resultMap.keySet()) {
combined += " " + resultMap.get(k);
}
getLogger().info("Extracted this text from the flow file with the configured json path: " + combined);
return combined;
-    } catch (Exception e) {
+    } catch (final Exception e) {
e.printStackTrace();
getLogger().warn("Failed to parse json using specified json path, analyzing flow file as text");
}

return flowFileText;
}

-  private void ensureService(final ProcessContext context) {
-    if (service != null) {
-      return;
-    }
-    String jsonProps = context.getProperty(PROPS_ATTR).getValue();
-    Properties props = jsonToProps(jsonProps);
-
-    String host = context.getProperty(HOST_ATTR).getValue();
-
-    if (host == null) {
-      service = new StanfordCoreNLPService(props);
-      return;
-    }
-
-    int port;
-    try {
-      port = context.getProperty(PORT_ATTR).asInteger();
-    } catch (NumberFormatException e) {
-      e.printStackTrace();
-      getLogger().error("Failed to read port as integer, using default 9000");
-      port = 9000;
-    }
-
-    String key = context.getProperty(KEY_ATTR).getValue();
-    String secret = context.getProperty(SECRET_ATTR).getValue();
-
-    service = new StanfordCoreNLPService(props, host, port, key, secret);
-    return;
-  }
+  private int getPort(final ProcessContext context) {
+    int port;
+    try {
+      port = context.getProperty(PORT_ATTR).asInteger();
+    } catch (final NumberFormatException e) {
+      e.printStackTrace();
+      getLogger().error("Failed to read port as integer, using default 9000");
+      port = 9000;
+    }
+    return port;
+  }
+
+  private AnnotationPipeline createPipeline(final ProcessContext context) {
+    final String jsonProps = context.getProperty(PROPS_ATTR).getValue();
+    final Properties props = jsonToProps(jsonProps);
+    final String host = context.getProperty(HOST_ATTR).getValue();
+
+    if (host == null) {
+      return StanfordCoreNLPService.createPipeline(props);
+    }
+
+    final int port = getPort(context);
+    final String key = context.getProperty(KEY_ATTR).getValue();
+    final String secret = context.getProperty(SECRET_ATTR).getValue();
+
+    return StanfordCoreNLPService.createPipeline(props, host, port, key, secret);
+  }
+
+  private void ensureService(final ProcessContext context) {
+    if (service != null) {
+      return;
+    }
+
+    service = new StanfordCoreNLPService(createPipeline(context));
+    return;
+  }

-  private Properties jsonToProps(String jsonProps) {
-    Properties props = new Properties();
+  private Properties jsonToProps(final String jsonProps) {
+    final Properties props = new Properties();
if (jsonProps == null) {
return props;
}
-    Gson gson = new Gson();
+    final Gson gson = new Gson();
try {
-      Map<String, Object> jsonMap = gson.fromJson(jsonProps, Map.class);
-      for (String k : jsonMap.keySet()) {
+      final Map<String, Object> jsonMap = gson.fromJson(jsonProps, Map.class);
+      for (final String k : jsonMap.keySet()) {
props.setProperty(k, jsonMap.get(k).toString());
}
-    } catch (JsonSyntaxException e) {
+    } catch (final JsonSyntaxException e) {
e.printStackTrace();
getLogger().error("Failed to read json string.");
}
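
For context on the config-properties fix: the processor's CoreNLP properties attribute holds a JSON object that jsonToProps flattens into java.util.Properties for the pipeline factory, and because onScheduled now rebuilds the pipeline from that property on every schedule, edits to the JSON take effect the next time the processor starts instead of being ignored by a cached service. A self-contained sketch of that conversion follows; the property value shown is illustrative, not taken from the commit.

import java.util.Map;
import java.util.Properties;

import com.google.gson.Gson;

public class JsonToPropsExample {
  public static void main(final String[] args) {
    // Illustrative value for the processor's CoreNLP properties attribute;
    // any CoreNLP configuration keys could appear here.
    final String jsonProps = "{\"annotators\": \"tokenize,ssplit,pos,ner\"}";

    // Same conversion as jsonToProps above: each JSON value is stringified
    // and copied into a java.util.Properties instance.
    final Properties props = new Properties();
    final Map<String, Object> jsonMap = new Gson().fromJson(jsonProps, Map.class);
    for (final String k : jsonMap.keySet()) {
      props.setProperty(k, jsonMap.get(k).toString());
    }

    System.out.println(props); // {annotators=tokenize,ssplit,pos,ner}
  }
}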