Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup #490

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public static void main(String[] args){

/* Get a plan builder */
WayangContext wayangContext = new WayangContext(new Configuration())
// .withPlugin(Java.basicPlugin())
// .withPlugin(Spark.basicPlugin());
.withPlugin(Flink.basicPlugin());
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin());
// .withPlugin(Flink.basicPlugin());

JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("WordCount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ public KafkaTopicSink(String topicName, Class<T> typeClass) {
)
)
);
System.out.println("### 11 ... ");

}


Expand All @@ -92,8 +90,6 @@ public KafkaTopicSink(String topicName,
topicName,
new TransformationDescriptor<>(formattingFunction, typeClass, String.class)
);
System.out.println("### 12 ... ");

}

/**
Expand All @@ -106,7 +102,6 @@ public KafkaTopicSink(String topicName, TransformationDescriptor<T, String> form
super(DataSetType.createDefault(formattingDescriptor.getInputType()));
this.topicName = topicName;
this.formattingDescriptor = formattingDescriptor;
System.out.println("### 13 ... ");
}

/**
Expand All @@ -118,7 +113,6 @@ public KafkaTopicSink(KafkaTopicSink<T> that) {
super(that);
this.topicName = that.topicName;
this.formattingDescriptor = that.formattingDescriptor;
System.out.println("### 14 ... ");
}

boolean isInitialized = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,12 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval

logger.info("---> WRITE TO KAFKA SINK...");

logger.info("### 9 ... ");

JavaChannelInstance input = (JavaChannelInstance) inputs[0];

initProducer( (KafkaTopicSink<T>) this );

final Function<T, String> formatter = javaExecutor.getCompiler().compile(this.formattingDescriptor);

logger.info("### 10 ... ");

try ( KafkaProducer<String,String> producer = getProducer() ) {
input.<T>provideStream().forEach(
dataQuantum -> {
Expand Down Expand Up @@ -121,8 +117,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
throw new WayangException("Writing to Kafka topic failed.", e);
}

logger.info("### 11 ... ");

return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ProtocolException;
import java.net.URL;
import java.net.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -45,7 +44,6 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Stream;

Expand Down Expand Up @@ -82,26 +80,15 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();


String urlStr = this.getInputUrl().trim();
URL sourceUrl = null;

try {

FileSystem fs = FileSystems.getFileSystem(urlStr).get(); //.orElseThrow(
//() -> new WayangException(String.format("FileSystems.getFileSystem( urlStr ).get() => Cannot access file system of %s. ", urlStr))
//);

final InputStream inputStream = fs.open(urlStr);
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
((StreamChannel.Instance) outputs[0]).accept(lines);

}
catch (Exception e) {

try {

URL url = new URL(urlStr);

HttpURLConnection connection2 = (HttpURLConnection) url.openConnection();
sourceUrl = new URL(urlStr);
String protocol = sourceUrl.getProtocol();
if ( protocol.startsWith("https") || protocol.startsWith("http") ) {
HttpURLConnection connection2 = (HttpURLConnection) sourceUrl.openConnection();
connection2.setRequestMethod("GET");

// Check if the response code indicates success (HTTP status code 200)
Expand All @@ -112,12 +99,21 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
((StreamChannel.Instance) outputs[0]).accept(lines2);
}
}
catch (IOException ioException) {
ioException.printStackTrace();
throw new WayangException(String.format("Reading from URL: %s failed.", urlStr), ioException);
else {
FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
() -> new WayangException(String.format("Cannot access file system of %s.", urlStr))
);

final InputStream inputStream = fs.open(urlStr);
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
((StreamChannel.Instance) outputs[0]).accept(lines);
}

// connection2.disconnect();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
} catch (ProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new WayangException(String.format("Reading %s failed.", urlStr), e);
}

ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.tensorflow.Tensorflow;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -38,6 +39,7 @@

/**
* Test the Tensorflow integration with Wayang.
* Note: this test fails on M1 Macs because of Tensorflow-Java incompatibility.
*/
public class TensorflowIntegrationIT {

Expand Down Expand Up @@ -66,7 +68,7 @@ public class TensorflowIntegrationIT {

public static String[] LABELS = new String[]{"Iris-setosa", "Iris-versicolor", "Iris-virginica"};

@Test
@Ignore
public void test() {
/* training features */
CollectionSource<float[]> trainXSource = new CollectionSource<>(trainX, float[].class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.java.Java;
import org.apache.wayang.tensorflow.Tensorflow;
import org.junit.Ignore;
import org.junit.Test;

import java.net.URI;
Expand All @@ -42,6 +43,7 @@

/**
* Test the Tensorflow integration with Wayang.
* Note: this test fails on M1 Macs because of Tensorflow-Java incompatibility.
*/
public class TensorflowIrisIT {

Expand All @@ -54,7 +56,7 @@ public class TensorflowIrisIT {
"Iris-virginica", 2
);

@Test
@Ignore
public void test() {
final Tuple<Operator, Operator> trainSource = fileOperation(TRAIN_PATH, true);
final Tuple<Operator, Operator> testSource = fileOperation(TEST_PATH, false);
Expand Down
Loading