Skip to content

Commit

Permalink
Add Streaming source impl (#994)
Browse files Browse the repository at this point in the history
* Add Streaming Source Impl

Signed-off-by: Peng Huo <penghuo@gmail.com>

* update build.gradle

Signed-off-by: Peng Huo <penghuo@gmail.com>

* change to hadoop-fs

Signed-off-by: Peng Huo <penghuo@gmail.com>

* exclude FileSystemStreamSource from jacoco

Signed-off-by: Peng Huo <penghuo@gmail.com>

* exclude unnecessary depedency

Signed-off-by: Peng Huo <penghuo@gmail.com>

* Update integ-test depedency

Signed-off-by: Peng Huo <penghuo@gmail.com>

* change from splits to split in batch

Signed-off-by: Peng Huo <penghuo@gmail.com>

Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Nov 8, 2022
1 parent 48eeb0e commit 5105022
Show file tree
Hide file tree
Showing 14 changed files with 542 additions and 4 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/sql-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ jobs:
matrix:
entry:
- { os: ubuntu-latest, java: 11 }
- { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc}
- { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
- { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: ubuntu-latest, java: 17 }
- { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
- { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
runs-on: ${{ matrix.entry.os }}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import lombok.Data;
import org.opensearch.sql.storage.split.Split;

/**
* A batch of streaming execution.
*/
@Data
public class Batch {
private final Split split;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import lombok.Data;

/**
* Offset.
*/
@Data
public class Offset {

private final Long offset;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.streaming;

import java.util.Optional;

/**
* Streaming source.
*/
public interface StreamingSource {
/**
* Get current {@link Offset} of stream data.
*
* @return empty if the stream does not has new data.
*/
Optional<Offset> getLatestOffset();

/**
* Get a {@link Batch} from source between (start, end].
*
* @param start start offset.
* @param end end offset.
* @return @link Batch}.
*/
Batch getBatch(Optional<Offset> start, Offset end);
}
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/split/Split.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.split;

import org.opensearch.sql.storage.StorageEngine;

/**
* Split is a sections of a data set. Each {@link StorageEngine} should have specific
* implementation of Split.
*/
public interface Split {

/**
* Get the split id.
* @return split id.
*/
String getSplitId();
}
2 changes: 1 addition & 1 deletion doctest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ doctest.dependsOn startOpenSearch
startOpenSearch.dependsOn startPrometheus
doctest.finalizedBy stopOpenSearch
stopOpenSearch.finalizedBy stopPrometheus
build.dependsOn doctest
check.dependsOn doctest
clean.dependsOn(cleanBootstrap)

// 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT
Expand Down
129 changes: 129 additions & 0 deletions filesystem/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
}

ext {
hadoop = "3.3.4"
aws = "1.12.330"
}

configurations.all {
resolutionStrategy.force "commons-io:commons-io:2.8.0"
}

dependencies {
implementation project(':core')
// required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
implementation("org.apache.hadoop:hadoop-common:${hadoop}") {
exclude group: 'org.apache.zookeeper'
exclude group: 'org.eclipse.jetty'
exclude group: 'com.sun.jersey'
exclude group: 'javax.servlet.jsp'
exclude group: 'javax.servlet'
exclude group: 'org.apache.kerby'
exclude group: 'org.apache.curator'
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt'
// enforce version.
exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core'
exclude group: 'commons-io', module: 'commons-io'
exclude group: 'ch.qos.reload4j', module: 'reload4j'
exclude group: 'org.apache.httpcomponents', module: 'httpcore'
}
implementation('com.fasterxml.woodstox:woodstox-core')
constraints {
implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') {
because 'https://www.mend.io/vulnerability-database/CVE-2022-40156'
}
}
implementation('commons-io:commons-io')
constraints {
implementation('commons-io:commons-io:2.8.0') {
because 'between versions 2.8.0 and 2.5'
}
}
implementation('ch.qos.reload4j:reload4j')
constraints {
implementation('ch.qos.reload4j:reload4j:1.2.22') {
because 'between versions 1.2.22 and 1.2.19'
}
}
implementation('org.apache.httpcomponents:httpcore')
constraints {
implementation('org.apache.httpcomponents:httpcore:4.4.15') {
because 'between versions 4.4.15 and 4.4.13'
}
}

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}

// hadoop-fs depend on native library which is missing on windows.
// https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
excludes = [
'**/FileSystemStreamSourceTest.class'
]
}
}

jacocoTestReport {
reports {
html.enabled true
xml.enabled true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
// hadoop-fs depend on native library which is missing on windows.
// https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
excludes = [
'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource'
]
}
element = 'CLASS'
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.storage.split;

import java.util.Set;
import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.hadoop.fs.Path;
import org.opensearch.sql.storage.split.Split;

@Data
public class FileSystemSplit implements Split {

@Getter
@EqualsAndHashCode.Exclude
private final String splitId = UUID.randomUUID().toString();

private final Set<Path> paths;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import java.util.Set;
import lombok.Data;
import org.apache.hadoop.fs.Path;

/**
* File metadata. Batch id associate with the set of {@link Path}.
*/
@Data
public class FileMetaData {

private final Long batchId;

private final Set<Path> paths;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.filesystem.streaming;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MetadataLog;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;

/**
* FileSystem Streaming Source use Hadoop FileSystem.
*/
public class FileSystemStreamSource implements StreamingSource {

private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class);

private final MetadataLog<FileMetaData> fileMetaDataLog;

private Set<Path> seenFiles;

private final FileSystem fs;

private final Path basePath;

/**
* Constructor of FileSystemStreamSource.
*/
public FileSystemStreamSource(FileSystem fs, Path basePath) {
this.fs = fs;
this.basePath = basePath;
// todo, need to add state recovery
this.fileMetaDataLog = new DefaultMetadataLog<>();
// todo, need to add state recovery
this.seenFiles = new HashSet<>();
}

@SneakyThrows(value = IOException.class)
@Override
public Optional<Offset> getLatestOffset() {
// list all files. todo. improvement list performance.
Set<Path> allFiles =
Arrays.stream(fs.listStatus(basePath))
.filter(status -> !status.isDirectory())
.map(FileStatus::getPath)
.collect(Collectors.toSet());

// find unread files.
log.debug("all files {}", allFiles);
Set<Path> unread = Sets.difference(allFiles, seenFiles);

// update seenFiles.
seenFiles = allFiles;
log.debug("seen files {}", seenFiles);

Optional<Long> latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey);
if (!unread.isEmpty()) {
long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L);
fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread));
log.debug("latestBatchId {}", latestBatchId);
return Optional.of(new Offset(latestBatchId));
} else {
log.debug("no unread data");
Optional<Offset> offset =
latestBatchIdOptional.isEmpty()
? Optional.empty()
: Optional.of(new Offset(latestBatchIdOptional.get()));
log.debug("return empty offset {}", offset);
return offset;
}
}

@Override
public Batch getBatch(Optional<Offset> start, Offset end) {
Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L);
Long endBatchId = end.getOffset();

Set<Path> paths =
fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream()
.map(FileMetaData::getPaths)
.flatMap(Set::stream)
.collect(Collectors.toSet());

log.debug("fetch files {} with id from: {} to: {}.", paths, start, end);
return new Batch(new FileSystemSplit(paths));
}
}
Loading

0 comments on commit 5105022

Please sign in to comment.