
Pipeline Parallel Ingest (CSVFileScanFragment operator) #536

Open
jortiz16 opened this issue Aug 22, 2016 · 8 comments

Comments

@jortiz16
Contributor

Currently, parallel ingest is only accessible through a REST call to the coordinator. The coordinator builds a query plan and sends a CSVFileScanFragment with the appropriate byte ranges to each worker.
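For reference, the split itself is simple arithmetic; a minimal sketch of what the coordinator does today could look like this (all names here are illustrative, not the actual coordinator code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: divide a fileSize-byte object into one contiguous,
// inclusive [start, end] range per worker; the last worker absorbs the
// remainder bytes. Class and method names are hypothetical.
final class RangeSplitter {
  static List<long[]> split(final long fileSize, final int numWorkers) {
    final List<long[]> ranges = new ArrayList<>();
    final long chunk = fileSize / numWorkers;
    for (int i = 0; i < numWorkers; i++) {
      final long start = i * chunk;
      final long end = (i == numWorkers - 1) ? fileSize - 1 : (i + 1) * chunk - 1;
      ranges.add(new long[] {start, end});
    }
    return ranges;
  }
}
```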

We need to introduce parallel ingest to MyriaL somehow. The tricky part will be figuring out how to pipeline this operator. Should Raco be the one that figures out how to split the byte ranges? I think we discussed this before, and we preferred not to introduce any AWS/S3 API dependency into Raco.

@jortiz16
Contributor Author

tagging @senderista

@senderista
Contributor

I thought an alternative we discussed in the past was to have Raco encode a pseudo-operator, run at the coordinator, which would be initialized only with the S3 URL and would dynamically create and dispatch the local operators (i.e., CSVFileScanFragment instances initialized with the appropriate ranges). Are there other examples of operators that create an operator tree at runtime?

adding @jingjingwang

@jortiz16
Contributor Author

I vaguely remember talking about this. I do remember that we agreed to give Raco only the S3 URL, but I'm not familiar with how it can create the plan from there or whether there is another existing operator that does something like this.

@jortiz16
Contributor Author

So would initializing the operator at runtime within MyriaX be the way to go? I ran into a situation where I need to use this feature, so I'm trying to tackle it at the moment.

So here was my thinking...

  1. Implement a fake operator on the Raco side.
  2. On the MyriaX side, implement the corresponding encoding for the fake operator.
  3. When initializing the operator, make sure it gets the correct byte ranges. This almost seems like it needs to be worker-specific, and the problem is that when we initialize the operator, it doesn't yet know anything about the worker it will run on.

We could make it so the fake operator's encoding has information about the worker id (Raco could probably take care of this?). That way, when we initialize the operator, we have the worker id from the encoding. We could then create a new CSVFileScanFragment constructor that takes a worker id parameter and initializes the operator with the correct byte ranges.
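A rough sketch of what I mean (the `workerId` field and the two-argument constructor are hypothetical, not existing code):

```java
// Hypothetical sketch only: the encoding carries a workerId that Raco would
// fill in per worker, and construct() forwards it to a new (also
// hypothetical) CSVFileScanFragment constructor that derives the byte range.
class CSVFileScanFragmentEncoding {
  public String s3Uri;
  public int workerId; // Raco would have to emit a different value per worker

  public CSVFileScanFragment construct() {
    // The new constructor would compute this worker's byte range from
    // (workerId, total number of workers, object size).
    return new CSVFileScanFragment(s3Uri, workerId);
  }
}
```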

Not sure if this would be the best approach.

@senderista
Contributor

FWIW I'm doing something vaguely similar to this right now in this PR: uwescience/myria#858. One question is whether we can assume that all worker IDs are contiguous. If so, then each worker operator instance could initialize its own byte range (taking the minimum partition size into account) after it determines the object size, but we probably can't assume this if we want to be fault-tolerant.

I think we can dispense with the assumption of contiguous worker IDs as long as we ensure that all workers share the same live-workers view. We should be able to do this in `OperatorEncoding.construct()` by querying the server and initializing a private serializable field (see the above PR for an example). With that information, the minimum partition size, and the object's size in bytes, the workers should be able to determine which byte range they're responsible for without coordination.

(FYI, an `Operator` can get its worker ID with `((WorkerSubQuery) getLocalSubQuery()).getWorker().getID()`.)
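Sketched out, that coordination-free computation could look something like this (purely illustrative: only the worker-ID call above comes from the actual API, and it assumes every worker sees the same sorted live-workers list):

```java
import java.util.List;

// Illustrative sketch: each worker derives its own byte range from the shared
// live-workers view, the object size, and the minimum partition size, with no
// coordination. Class and method names are hypothetical.
final class PartitionMath {
  /** Returns {start, end} (inclusive) for this worker, or null if the minimum
   *  partition size leaves no bytes for it. Assumes myWorkerId is in the list
   *  and that all workers see the same sorted list. */
  static long[] rangeFor(final int myWorkerId, final List<Integer> liveWorkers,
                         final long objectSize, final long minPartitionSize) {
    final int myIndex = liveWorkers.indexOf(myWorkerId);
    final long perWorker = Math.max(objectSize / liveWorkers.size(), minPartitionSize);
    final long start = myIndex * perWorker;
    if (start >= objectSize) {
      return null; // more workers than partitions of at least minPartitionSize
    }
    final boolean last =
        myIndex == liveWorkers.size() - 1 || (myIndex + 1) * perWorker >= objectSize;
    final long end = last ? objectSize - 1 : (myIndex + 1) * perWorker - 1;
    return new long[] {start, end};
  }
}
```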

@jortiz16
Contributor Author

This sounds good, thanks! I'll try it.

@jortiz16
Contributor Author

Sorry, I think I'm still missing something. I'm adding this logic under the `CSVFileScanFragmentEncoding.construct()` method, but I don't see how we can call `getLocalSubQuery()`, since that requires the operator object. Basically, from the operator encoding, is there a way to get the operator object itself?

@senderista
Contributor

What I meant was to pass the live-workers view to the `Operator` constructor by calling `args.getServer().getAliveWorkers()` from within `OperatorEncoding.construct()`, then to call `getLocalSubQuery()` from `Operator.init()` or elsewhere. The `Operator` instance returned from `OperatorEncoding.construct()` is only used for serialization (so any field you initialize in the constructor called from `construct()` must be serializable), while `Operator.init()` is only called on the "real" `Operator` instance when it is deserialized on the worker where it runs.
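In code, that split might look roughly like this (a sketch, not the real Myria classes: the calls to `args.getServer().getAliveWorkers()`, `getLocalSubQuery()`, and the `WorkerSubQuery` cast are the ones named above; everything else, including types and field names, is assumed):

```java
import java.util.Set;

// Coordinator side: construct() captures the live-workers view in a
// serializable field. This instance is only used for serialization.
class CSVFileScanFragmentEncoding /* extends OperatorEncoding<CSVFileScanFragment> */ {
  public String s3Uri;

  public CSVFileScanFragment construct(final ConstructArgs args) {
    final Set<Integer> liveWorkers = args.getServer().getAliveWorkers();
    return new CSVFileScanFragment(s3Uri, liveWorkers);
  }
}

// Worker side: init() runs on the deserialized "real" instance, so it can
// safely ask which worker it is on and derive its byte range there.
class CSVFileScanFragment /* extends LeafOperator */ {
  private final String s3Uri;
  private final Set<Integer> liveWorkers; // serializable, set by construct()

  CSVFileScanFragment(final String s3Uri, final Set<Integer> liveWorkers) {
    this.s3Uri = s3Uri;
    this.liveWorkers = liveWorkers;
  }

  protected void init() {
    // getLocalSubQuery() would be inherited from Operator in the real code.
    final int myId = ((WorkerSubQuery) getLocalSubQuery()).getWorker().getID();
    // Combine myId, liveWorkers, the object size, and the minimum partition
    // size to compute this worker's byte range without coordination.
  }
}
```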
