Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Part 10]: Task Executor Starter and other related classes #160

Merged
merged 4 commits into from
Apr 6, 2022

Conversation

sundargates
Copy link
Collaborator

TaskExecutorStarter is the starting point for the task executor from the worker side. Initializing this class and starting it as part of the runtime framework (guice/springboot) would start the task executor thread capable of running any task from the client side.

Context

This is the last missing piece to start the task executor from the runtime framework.

Checklist

  • ./gradlew build compiles code correctly
  • Added new tests where applicable
  • ./gradlew test passes all tests
  • Extended README or added javadocs where applicable
  • Added copyright headers for new files from CONTRIBUTING.md

TaskExecutorStarter is the starting point for the task executor from the worker side. Initializing this class and starting it as part of the runtime framework (guice/springboot) would start the task executor thread capable of running any task from the client side.
api "com.typesafe.akka:akka-http-jackson_$scalaBinaryVersion:$akkaHttpVersion"
api "com.typesafe.akka:akka-http-caching_$scalaBinaryVersion:$akkaHttpVersion"
api "com.typesafe.akka:akka-stream_$scalaBinaryVersion:$akkaVersion"
compile group: 'com.typesafe.akka', name: "akka-stream_$scalaBinaryVersion", version: "$akkaVersion"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this line the same as the one above?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right. let me clean this up.

});
}

public static org.apache.hadoop.fs.FileSystem create(URI fsUri) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's quite confusing that this returns a hadoop filesystem. consider renaming this class to FileSystemUtils or something else.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return this;
}

public RpcSystem getRpcSystem() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may want to make these getXXX() methods private.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion. done

}

public static RpcSystem load(Configuration configuration) {
return new MantisAkkaRpcSystemLoader().loadRpcSystem(configuration);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm, would it better to have this as a singleton?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it does. made the change.

return factory.create(fsUri);
} else {
throw new IllegalArgumentException(
String.format("Unknown schema", fsUri.getScheme().toString()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: pl add a %s here for the scheme.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for spotting this. done.

});
}

public static org.apache.hadoop.fs.FileSystem create(URI fsUri) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@@ -17,34 +17,34 @@
apply plugin: 'application'

ext {
akkaVersion = '2.5.23'
akkaHttpVersion = '10.1.8'
akkaVersion = '2.6.15'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newer ones are 2.6.18 and 10.2.9. Do you think we can advance to those ones instead (asking since I have those version upgrade in some of my code already).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the versions that the flink RPC library brings in. I want the versions to match that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Let me use these ones on my change too then.

@sundargates sundargates self-assigned this Apr 5, 2022
@sundargates sundargates merged commit ecf0d4e into Netflix:master Apr 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants