Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IOT-62: Add a constructor to create RulesProcessor from json. #1

Open
wants to merge 2 commits into
base: IOT-58
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.hortonworks.iotas.common;

import org.apache.commons.lang.StringUtils;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -30,7 +29,7 @@ public IotasEventImpl(Map<String, Object> keyValues, String dataSourceId, String

@Override
public Map<String, Object> getFieldsAndValues() {
return fieldsAndValues;
return Collections.unmodifiableMap(fieldsAndValues);
}

@Override
Expand All @@ -52,7 +51,6 @@ public boolean equals(Object o) {
IotasEventImpl that = (IotasEventImpl) o;

return id.equals(that.id);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
/**
* Represents any entity that can be stored by our storage layer.
*/
public interface
Storable {
public interface Storable {
//TODO: probably we can remove getNameSpace and getPrimaryKey since we now have getStorableKey.
//TODO: Leaving it for now for discussion purposes, as well as not to break the client code
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,14 @@ public List<Field> getDeclaredInput() {
public void setDeclaredInput(List<Field> declaredInput) {
this.declaredInput = declaredInput;
}

@Override
public String toString() {
return "Component{" +
"id=" + id +
", name='" + name + '\'' +
", description='" + description + '\'' +
", declaredInput=" + declaredInput +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,11 @@ public List<Rule> getRules() {
public void setRules(List<Rule> rules) {
this.rules = rules;
}

@Override
public String toString() {
return "RulesProcessor{" +
"rules=" + rules +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.hortonworks.iotas.layout.design.component;

import com.hortonworks.iotas.layout.design.rule.Rule;

/**
* Created by pshah on 11/19/15.
*/
public interface RulesProcessorBuilder {
RulesProcessor getRulesProcessor ();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.hortonworks.iotas.layout.design.component;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Created by pshah on 11/19/15.
*/
public class RulesProcessorJsonBuilder implements RulesProcessorBuilder {
protected static final Logger log = LoggerFactory.getLogger(RulesProcessorJsonBuilder.class);
private final String json;
RulesProcessorJsonBuilder (String json) {
this.json = json;
}

@Override
public RulesProcessor getRulesProcessor () {
ObjectMapper mapper = new ObjectMapper();
RulesProcessor rulesProcessor = null;
try {
rulesProcessor = mapper.readValue(json, RulesProcessor.class);
} catch (IOException e) {
log.error("Error creating RulesProcessor from json string. Check " +
"the json");
e.printStackTrace();
}
return rulesProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.iotas.layout.design.component;

/**
* Marker class to clearly identify a {@link Sink} <br><br/>
* A {@link Sink} receives input but does not communicate with any downstream components, hence it emits no output
*/
public class Sink extends Component {
// Sink extending Component is a more accurate representation of the physical world than having Component implement
// a Sink interface because the later implies that Processor "is a" Sink, which is not correct.
// On the other hand Sink "is a" Component
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ public void setDeclaredOutput(List<Field> declaredOutput) {
public List<Field> getDeclaredOutput() {
return declaredOutput;
}

@Override
public String toString() {
return "Action{" +
"components=" + components +
", declaredOutput=" + declaredOutput +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,46 @@

package com.hortonworks.iotas.layout.runtime.processor;

import backtype.storm.topology.OutputFieldsDeclarer;
import com.hortonworks.iotas.layout.design.component.RulesProcessor;
import com.hortonworks.iotas.layout.runtime.rule.RuleRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;


/**
* Object representing a design time rules processor.
* Object representing a runtime rules processor
* @param <I> Type of runtime input to this rule, for example {@code Tuple}
* @param <E> Type of object required to execute this rule in the underlying streaming framework e.g {@code IOutputCollector}
*/
public class RuleProcessorRuntime implements Serializable {
public class RuleProcessorRuntime<I, E> implements Serializable {
protected static final Logger log = LoggerFactory.getLogger(RuleProcessorRuntime.class);

protected RulesProcessor rulesProcessor;
protected List<RuleRuntime> rulesRuntime;
protected List<RuleRuntime<I,E>> rulesRuntime;

RuleProcessorRuntime(List<RuleRuntime> rulesRuntime, RulesProcessor rulesProcessor) {
this.rulesRuntime = rulesRuntime;

this.rulesProcessor = rulesProcessor;
}

public List<RuleRuntime> getRulesRuntime() {
return rulesRuntime;
public RuleProcessorRuntime(RuleProcessorRuntimeDependenciesBuilder<I,E> builder) {
this.rulesProcessor = builder.getRulesProcessor();
this.rulesRuntime = builder.getRulesRuntime();
}

public void declareOutput(OutputFieldsDeclarer declarer) {
for (RuleRuntime ruleRuntime:rulesRuntime) {
ruleRuntime.declareOutput(declarer);
}
}

public void setRulesRuntime(List<RuleRuntime> rulesRuntime) {
this.rulesRuntime = rulesRuntime;
public RulesProcessor getRulesProcessor() {
return rulesProcessor;
}

public RulesProcessor getRuleProcessor() {
return rulesProcessor;
public List<RuleRuntime<I, E>> getRulesRuntime() {
return Collections.unmodifiableList(rulesRuntime);
}

public void setRuleProcessor(RulesProcessor rulesProcessor) {
this.rulesProcessor = rulesProcessor;
@Override
public String toString() {
return "RuleProcessorRuntime{" +
"rulesProcessor=" + rulesProcessor +
", rulesRuntime=" + rulesRuntime +
'}';
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.iotas.layout.runtime.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hortonworks.iotas.layout.design.component.RulesProcessor;
import com.hortonworks.iotas.layout.design.component.RulesProcessorBuilder;
import com.hortonworks.iotas.layout.design.component.RulesProcessorJsonBuilder;
import com.hortonworks.iotas.layout.design.rule.Rule;
import com.hortonworks.iotas.layout.runtime.rule.RuleRuntime;
import com.hortonworks.iotas.layout.runtime.rule.RuleRuntimeBuilder;
import org.apache.hadoop.mapred.OutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* @param <I> Type of runtime input to this rule, for example {@code Tuple}
* @param <E> Type of object required to execute this rule in the underlying streaming framework e.g {@code IOutputCollector}
*/
public class RuleProcessorRuntimeDependenciesBuilder<I, E> {
protected static final Logger log = LoggerFactory.getLogger(RuleProcessorRuntimeDependenciesBuilder.class);

private final RulesProcessor rulesProcessor;
private final RuleRuntimeBuilder<I,E> ruleRuntimeBuilder;

public RuleProcessorRuntimeDependenciesBuilder(RulesProcessorBuilder rulesProcessorBuilder, RuleRuntimeBuilder<I,E> ruleRuntimeBuilder) {
this.rulesProcessor = rulesProcessorBuilder.getRulesProcessor();
this.ruleRuntimeBuilder = ruleRuntimeBuilder;
}

public List<RuleRuntime<I,E>> getRulesRuntime() {
final List<Rule> rules = rulesProcessor.getRules();
final List<RuleRuntime<I, E>> rulesRuntime = new ArrayList<>();

if (rules != null) {
for (Rule rule : rules) {
ruleRuntimeBuilder.buildExpression(rule);
ruleRuntimeBuilder.buildScriptEngine();
ruleRuntimeBuilder.buildScript();
RuleRuntime<I, E> ruleRuntime = ruleRuntimeBuilder.getRuleRuntime(rule);
rulesRuntime.add(ruleRuntime);
log.trace("Added {}", ruleRuntime);
}
log.debug("Finished building. {}", this);
}
return rulesRuntime;
}

public RulesProcessor getRulesProcessor() {
return rulesProcessor;
}

@Override
public String toString() {
return "RuleProcessorRuntimeDependenciesBuilder{" + rulesProcessor +
", "+ ruleRuntimeBuilder + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.iotas.layout.runtime.rule;

import com.hortonworks.iotas.common.IotasEvent;
import com.hortonworks.iotas.layout.design.rule.Rule;
import com.hortonworks.iotas.layout.design.rule.exception.ConditionEvaluationException;
import com.hortonworks.iotas.layout.runtime.rule.condition.script.Script;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.script.ScriptException;
import java.io.Serializable;

/**
* @param <I> Type of runtime input to this rule, for example {@code Tuple}
* @param <E> Type of object required to execute this rule in the underlying streaming framework e.g {@code IOutputCollector}
*/
public abstract class RuleRuntime<I, E> implements Serializable {
protected static final Logger log = LoggerFactory.getLogger(RuleRuntime.class);

protected final Rule rule;
protected final Script<IotasEvent, ?> script; // Script used to evaluate the condition

RuleRuntime(Rule rule, Script<IotasEvent, ?> script) {
this.rule = rule;
this.script = script;
}

public boolean evaluate(IotasEvent input) {
try {
boolean evaluates = script.evaluate(input);
log.debug("Rule condition evaluated to [{}].\n\t[{}]\n\tInput[{}]", evaluates, rule, input);
return evaluates;
} catch (ScriptException e) {
throw new ConditionEvaluationException("Exception occurred when evaluating rule condition. " + this, e);
}
}

/**
* Executes a {@link Rule}'s Action
* @param input runtime input to this rule, for example, {@code Tuple} for {@code Storm}
* @param executor object required to execute this rule's action in the underlying streaming framework e.g {@code OutputCollector} for {@code Storm}
*/
public abstract void execute(I input, E executor);

@Override
public String toString() {
return "RuleRuntime{" +
"rule=" + rule +
", script=" + script +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

import java.io.Serializable;

public interface RuleRuntimeBuilder extends Serializable {
/**
* @param <I> Type of runtime input to this rule, for example {@code Tuple}
* @param <E> Type of object required to execute this rule in the underlying streaming framework e.g {@code IOutputCollector}
*/
public interface RuleRuntimeBuilder<I, E> extends Serializable {
Logger log = LoggerFactory.getLogger(RuleRuntimeBuilder.class);

void buildExpression(Rule rule);
Expand All @@ -33,5 +37,5 @@ public interface RuleRuntimeBuilder extends Serializable {

void buildScript();

RuleRuntime getRuleRuntime(Rule rule);
RuleRuntime<I, E> getRuleRuntime(Rule rule);
}
Loading