Add Streaming Plan Impl (#1068)
Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo authored Nov 23, 2022
1 parent 0f5d628 · commit 42143b9
Showing 36 changed files with 1,349 additions and 214 deletions.
7 changes: 1 addition & 6 deletions core/build.gradle
@@ -26,17 +26,12 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}
//
//configurations.all {
// resolutionStrategy.dependencySubstitution {
// substitute module('com.google.guava:guava:26.0-jre') with module('com.google.guava:guava:29.0-jre')
// }
//}

dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
@@ -26,7 +26,7 @@
@RequiredArgsConstructor
public class Query extends Statement {

private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
@@ -6,5 +6,7 @@
package org.opensearch.sql.datasource.model;

public enum DataSourceType {
PROMETHEUS,OPENSEARCH
PROMETHEUS,
OPENSEARCH,
FILESYSTEM
}

This file was deleted.

@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import java.util.Optional;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

/**
* Execution context holds planning-related information.
*/
public class ExecutionContext {
@Getter
private final Optional<Split> split;

public ExecutionContext(Split split) {
this.split = Optional.of(split);
}

private ExecutionContext(Optional<Split> split) {
this.split = split;
}

public static ExecutionContext emptyExecutionContext() {
return new ExecutionContext(Optional.empty());
}
}
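
The new context is meant to be consumed through its Optional-valued getter. A minimal sketch of that pattern, using only the API added in this commit; the helper class itself is illustrative, not part of the change:

import java.util.Optional;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.split.Split;

// Illustrative helper: applies the (optional) split carried by an ExecutionContext
// to a physical plan before the engine starts pulling results from it.
final class ExecutionContextUsage {

  static void applySplit(ExecutionContext context, PhysicalPlan plan) {
    Optional<Split> split = context.getSplit();
    // PhysicalPlan#add(Split), added later in this commit, propagates the split
    // down to every child operator.
    split.ifPresent(plan::add);
  }

  private ExecutionContextUsage() {
  }
}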
@@ -23,12 +23,19 @@ public interface ExecutionEngine {

/**
* Execute physical plan and call back response listener.
* TODO: deprecate this interface once {@link ExecutionContext} is finalized.
*
* @param plan executable physical plan
* @param listener response listener
*/
void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener);

/**
* Execute physical plan with {@link ExecutionContext} and call back response listener.
*/
void execute(PhysicalPlan plan, ExecutionContext context,
ResponseListener<QueryResponse> listener);

/**
* Explain physical plan and call back response listener. The reason why this has to
* be part of execution engine interface is that the physical plan probably needs to
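
A hedged sketch of how an ExecutionEngine implementation could keep the two execute() overloads consistent by routing the legacy entry point through the context-aware one; the delegation and the doExecute hook are assumptions, not something the interface mandates:

import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.planner.physical.PhysicalPlan;

// Illustrative skeleton, not part of this commit. The legacy execute() falls back
// to an empty context; explain() and the actual execution remain abstract.
abstract class SketchExecutionEngine implements ExecutionEngine {

  @Override
  public void execute(PhysicalPlan plan,
                      ResponseListener<ExecutionEngine.QueryResponse> listener) {
    // No split available: route through the context-aware overload.
    execute(plan, ExecutionContext.emptyExecutionContext(), listener);
  }

  @Override
  public void execute(PhysicalPlan plan, ExecutionContext context,
                      ResponseListener<ExecutionEngine.QueryResponse> listener) {
    // Attach the split, if any, before running the plan (see PhysicalPlan#add(Split)).
    context.getSplit().ifPresent(plan::add);
    doExecute(plan, listener);
  }

  // Engine-specific execution of an already prepared plan (hypothetical hook).
  protected abstract void doExecute(PhysicalPlan plan,
                                    ResponseListener<ExecutionEngine.QueryResponse> listener);
}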
@@ -22,4 +22,13 @@ public interface QueryManager {
* @return {@link QueryId}.
*/
QueryId submit(AbstractPlan queryPlan);

/**
* Cancel submitted {@link AbstractPlan} by {@link QueryId}.
*
* @return true if cancellation succeeded.
*/
default boolean cancel(QueryId queryId) {
throw new UnsupportedOperationException();
}
}
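
A minimal sketch of a QueryManager that overrides the new default cancel(); the in-memory map, and the assumption that AbstractPlan exposes its id via getQueryId(), are illustrative only — real cancellation would also have to interrupt the running plan, which is engine-specific and omitted here:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryManager;
import org.opensearch.sql.executor.execution.AbstractPlan;

// Illustrative in-memory QueryManager, not part of this commit.
class SketchQueryManager implements QueryManager {

  private final Map<QueryId, AbstractPlan> submitted = new ConcurrentHashMap<>();

  @Override
  public QueryId submit(AbstractPlan queryPlan) {
    submitted.put(queryPlan.getQueryId(), queryPlan);   // getQueryId() is assumed here
    queryPlan.execute();
    return queryPlan.getQueryId();
  }

  @Override
  public boolean cancel(QueryId queryId) {
    // Return true only when the id refers to a plan this manager is tracking.
    return submitted.remove(queryId) != null;
  }
}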
@@ -59,7 +59,12 @@ public void executePlan(LogicalPlan plan,
PlanContext planContext,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(plan(plan), listener);
planContext
.getSplit()
.ifPresentOrElse(
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
() -> executionEngine.execute(
plan(plan), ExecutionContext.emptyExecutionContext(), listener));
} catch (Exception e) {
listener.onFailure(e);
}
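
For readers less familiar with ifPresentOrElse: the dispatch above could equivalently be written as a single call by mapping the optional split onto an ExecutionContext. This alternative form is only an illustration, not part of the commit:

// Equivalent single-call form of the dispatch above (illustrative only).
executionEngine.execute(
    plan(plan),
    planContext.getSplit()
        .map(ExecutionContext::new)                          // split present -> split-backed context
        .orElseGet(ExecutionContext::emptyExecutionContext), // otherwise -> empty context
    listener);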
@@ -24,14 +24,14 @@ public class QueryPlan extends AbstractPlan {
/**
* The query plan ast.
*/
private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

/**
* Query service.
*/
private final QueryService queryService;
protected final QueryService queryService;

private final ResponseListener<ExecutionEngine.QueryResponse> listener;
protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** constructor. */
public QueryPlan(
@@ -0,0 +1,137 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalRelation;

/**
* Streaming Query Plan.
*/
public class StreamingQueryPlan extends QueryPlan {

private static final Logger log = LogManager.getLogger(StreamingQueryPlan.class);

private final ExecutionStrategy executionStrategy;

private MicroBatchStreamingExecution streamingExecution;

/**
* constructor.
*/
public StreamingQueryPlan(QueryId queryId,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
ExecutionStrategy executionStrategy) {
super(queryId, plan, queryService, listener);

this.executionStrategy = executionStrategy;
}

@Override
public void execute() {
try {
LogicalPlan logicalPlan = queryService.analyze(plan);
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
streamingExecution =
new MicroBatchStreamingExecution(
streamingSource,
logicalPlan,
queryService,
new DefaultMetadataLog<>(),
new DefaultMetadataLog<>());
executionStrategy.execute(streamingExecution::execute);
} catch (UnsupportedOperationException | IllegalArgumentException e) {
listener.onFailure(e);
} catch (InterruptedException e) {
log.error(e);
// todo, update async task status.
}
}

interface ExecutionStrategy {
/**
* execute task.
*/
void execute(Runnable task) throws InterruptedException;
}

/**
* Execute the task at a fixed interval.
* If task run time < interval, trigger the next task at the next interval.
* If task run time >= interval, trigger the next task immediately.
*/
@RequiredArgsConstructor
public static class IntervalTriggerExecution implements ExecutionStrategy {

private final long intervalInSeconds;

@Override
public void execute(Runnable runnable) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
try {
Instant start = Instant.now();
runnable.run();
Instant end = Instant.now();
long took = Duration.between(start, end).toSeconds();
TimeUnit.SECONDS.sleep(intervalInSeconds > took ? intervalInSeconds - took : 0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

private StreamingSource buildStreamingSource(LogicalPlan logicalPlan) {
return logicalPlan.accept(new StreamingSourceBuilder(), null);
}

static class StreamingSourceBuilder extends LogicalPlanNodeVisitor<StreamingSource, Void> {
@Override
public StreamingSource visitNode(LogicalPlan plan, Void context) {
List<LogicalPlan> children = plan.getChild();
if (children.isEmpty()) {
String errorMsg =
String.format(
"Could find relation plan, %s does not have child node.",
plan.getClass().getSimpleName());
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
return children.get(0).accept(this, context);
}

@Override
public StreamingSource visitRelation(LogicalRelation plan, Void context) {
try {
return plan.getTable().asStreamingSource();
} catch (UnsupportedOperationException e) {
String errorMsg =
String.format(
"table %s could not been used as streaming source.", plan.getRelationName());
log.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
}
}
}
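
Putting the pieces together, a streaming query is wired up roughly as below. With an interval of 5 seconds, a batch that finishes in 2 seconds is followed by a 3-second sleep, while a batch that takes 7 seconds triggers the next one immediately. The snippet assumes it lives in the same package (ExecutionStrategy is package-private) and that queryId, unresolvedPlan, queryService, listener, and queryManager come from the surrounding submission code:

// Illustrative wiring of a streaming query; the surrounding identifiers are assumptions.
StreamingQueryPlan streamingPlan =
    new StreamingQueryPlan(
        queryId,
        unresolvedPlan,
        queryService,
        listener,
        // re-run the micro-batch every 5 seconds, or immediately if a batch overruns
        new StreamingQueryPlan.IntervalTriggerExecution(5));
QueryId id = queryManager.submit(streamingPlan);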
@@ -105,7 +105,7 @@ public void onResponse(ExecutionEngine.QueryResponse response) {

@Override
public void onFailure(Exception e) {
log.error("streaming processing failed. source = {}", source);
log.error("streaming processing failed. source = {} {}", source, e);
}
});
}
@@ -10,6 +10,7 @@
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.planner.PlanNode;
import org.opensearch.sql.storage.split.Split;

/**
* Physical plan.
@@ -36,6 +37,10 @@ public void close() {
getChild().forEach(PhysicalPlan::close);
}

public void add(Split split) {
getChild().forEach(child -> child.add(split));
}

public ExecutionEngine.Schema schema() {
throw new IllegalStateException(String.format("[BUG] schema can only be applied to "
+ "ProjectOperator, instead of %s", toString()));
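
The default add(Split) above only fans out to children, so a split pushed into the root of the plan tree eventually reaches a leaf operator, which is expected to keep it rather than forward it. A hypothetical leaf (scan) operator sketch; the set of abstract methods is inferred from the surrounding code and may differ slightly:

import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
import org.opensearch.sql.storage.split.Split;

// Hypothetical leaf (scan) operator: it keeps the split instead of forwarding it,
// since it has no children; how the split is actually read is out of scope here.
class SketchScanOperator extends PhysicalPlan {

  private Split split;

  @Override
  public void add(Split split) {
    this.split = split;               // remember which slice of the source to read
  }

  @Override
  public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
    return null;                      // visitor support omitted in this sketch
  }

  @Override
  public List<PhysicalPlan> getChild() {
    return Collections.emptyList();   // leaf node: nothing to forward the split to
  }

  @Override
  public boolean hasNext() {
    return false;
  }

  @Override
  public ExprValue next() {
    throw new NoSuchElementException();
  }
}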
7 changes: 7 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
@@ -8,6 +8,7 @@

import java.util.Map;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

@@ -58,4 +59,10 @@ default LogicalPlan optimize(LogicalPlan plan) {
return plan;
}

/**
* Translate {@link Table} to {@link StreamingSource} if possible.
*/
default StreamingSource asStreamingSource() {
throw new UnsupportedOperationException();
}
}
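
A hedged fragment of how a streaming-capable Table (for example, one backing the new FILESYSTEM data source type) would override this default instead of throwing; FileSystemStreamSource and the path field are assumptions for illustration:

// Inside a hypothetical streaming-capable Table implementation:
@Override
public StreamingSource asStreamingSource() {
  // Hand the micro-batch executor a source that can enumerate offsets/batches
  // instead of throwing UnsupportedOperationException.
  return new FileSystemStreamSource(path);   // class and field are illustrative only
}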

This file was deleted.
