-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRulesBasedInsertHarperDB.groovy
38 lines (33 loc) · 1.14 KB
/
RulesBasedInsertHarperDB.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import java.io.File
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
/*
 * NiFi scripted-processor body.
 *
 * Reads the incoming flow file content as one JSON object, wraps it in an
 * insert request (operation "insert", schema "reading") and picks the target
 * table from the record's numeric `reading` field:
 *
 *   reading <= 18  -> "low"
 *   reading >= 22  -> "high"
 *   otherwise      -> "normal"
 *
 * The flow file content is replaced with the request JSON and routed to
 * REL_SUCCESS. Any failure (unreadable content, invalid JSON, missing or
 * non-numeric `reading`, write error) routes the flow file to REL_FAILURE.
 *
 * NOTE(review): the file name suggests the request is posted to HarperDB by a
 * downstream processor -- confirm against the flow definition.
 * Relies on `session`, `log`, `REL_SUCCESS` and `REL_FAILURE` being bound by
 * the hosting script processor.
 */
def flowFile = session.get()
if (!flowFile) return

try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def row = new JsonSlurper().parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))

        // Groovy evaluates `null <= 18` as true, so a record without a reading
        // would silently land in the "low" table; reject it explicitly instead.
        if (!(row instanceof Map) || !(row.reading instanceof Number)) {
            throw new IllegalArgumentException("Expected a JSON object with a numeric 'reading' field")
        }

        String table
        if (row.reading <= 18) {
            table = "low"
        } else if (row.reading >= 22) {
            table = "high"
        } else {
            table = "normal"
        }

        Map toSend = [
            records  : [row],
            operation: "insert",
            schema   : "reading",
            table    : table
        ]
        outputStream.write(JsonOutput.toJson(toSend).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)

    // Transfer only after the write callback has completed; the flow file
    // must not be transferred from inside the callback.
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    // Script boundary: log with the full stack trace and route to failure.
    // `flowFile` still refers to the pre-write flow file because the
    // assignment above never completed.
    log.error("Failed to build insert request from flow file content", e)
    session.transfer(flowFile, REL_FAILURE)
}