Skip to content

Commit

Permalink
Issue #28: first pass at support for attachments
Browse files Browse the repository at this point in the history
  • Loading branch information
sriramkrishnan committed Sep 18, 2013
1 parent 4e8cd11 commit 78d8d4f
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@

package com.netflix.genie.client.sample;

import java.io.File;
import java.io.PrintWriter;

import javax.activation.DataHandler;
import javax.activation.FileDataSource;

import com.netflix.config.ConfigurationManager;
import com.netflix.genie.client.ExecutionServiceClient;
import com.netflix.genie.common.messages.JobStatusResponse;
import com.netflix.genie.common.model.FileAttachment;
import com.netflix.genie.common.model.JobInfoElement;
import com.netflix.genie.common.model.Types.Configuration;
import com.netflix.genie.common.model.Types.JobStatus;
Expand Down Expand Up @@ -78,7 +85,17 @@ public static void main(String[] args) throws Exception {
jobInfo.setDescription("This is a test");
jobInfo.setConfiguration(Configuration.TEST.name());
jobInfo.setSchedule(Schedule.ADHOC.name());
jobInfo.setCmdArgs("-e \"select count(*) from counters where dateint=20120430 and hour=10\"");
// send the query as an attachment
File query = File.createTempFile("hive", ".q");
PrintWriter pw = new PrintWriter(query);
pw.println("select count(*) from counters where dateint=20120430 and hour=10;");
pw.close();
FileAttachment[] attachments = new FileAttachment[1];
attachments[0] = new FileAttachment();
attachments[0].setName("hive.q");
attachments[0].setData(new DataHandler(new FileDataSource(query.getAbsolutePath())));
jobInfo.setAttachments(attachments);
jobInfo.setCmdArgs("-f hive.q");
jobInfo = client.submitJob(jobInfo);

String jobID = jobInfo.getJobID();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
*
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.genie.common.model;

import javax.activation.DataHandler;

/**
* Representation of a file attachment sent as part of the job request.
*
* @author skrishnan
*
*/
public class FileAttachment {

/**
* Name of the file.
*/
private String name;

/**
* The data for the attachment.
*/
private DataHandler data;

/**
* Get the name of the file for this attachment.
*
* @return name of file for this attachment
*/
public String getName() {
return name;
}

/**
* Set the name of the file for this attachment.
*
* @param name name of the file for this attachment
*/
public void setName(String name) {
this.name = name;
}

/**
* Get the data handler containing the data for the attachment.
*
* @return the data handler containing data for the attachment
*/
public DataHandler getData() {
return data;
}

/**
* Set the data handler for the attachment.
*
* @param data the data handler for the attachment.
*/
public void setData(DataHandler data) {
this.data = data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.Transient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,6 +118,13 @@ public class JobInfoElement implements Serializable {
@Lob
private String fileDependencies;

/**
* Set of file dependencies, sent as MIME attachments.
* This is not persisted in the DB for space reasons.
*/
@Transient
private FileAttachment[] attachments;

/**
* Location of logs being archived to s3.
*/
Expand Down Expand Up @@ -582,6 +590,25 @@ public void setFileDependencies(String fileDependencies) {
this.fileDependencies = fileDependencies;
}


/**
* Get the set of attachments for this job.
*
* @return the set of attachments for this job
*/
public FileAttachment[] getAttachments() {
return attachments;
}

/**
* Set the attachments for this job.
*
* @param attachments the attachments for this job
*/
public void setAttachments(FileAttachment[] attachments) {
this.attachments = attachments;
}

/**
* Get location of pig on s3/hdfs to override default.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@
package com.netflix.genie.server.jobmanager.impl;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;

import javax.activation.DataHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.config.ConfigurationManager;
import com.netflix.genie.common.exceptions.CloudServiceException;
import com.netflix.genie.common.messages.ClusterConfigResponse;
import com.netflix.genie.common.model.ClusterConfigElement;
import com.netflix.genie.common.model.FileAttachment;
import com.netflix.genie.common.model.JobInfoElement;
import com.netflix.genie.common.model.Types;
import com.netflix.genie.common.model.Types.JobStatus;
Expand Down Expand Up @@ -162,6 +166,24 @@ public void launch(JobInfoElement ji) throws CloudServiceException {
}
pb.directory(userJobDir);

// copy over the attachments if they exist
if ((ji.getAttachments() != null) && (ji.getAttachments().length > 0)) {
for (int i = 0; i < ji.getAttachments().length; i++) {
FileAttachment attachment = ji.getAttachments()[i];
try {
FileOutputStream output = new FileOutputStream(cWorkingDir + File.separator + attachment.getName());
DataHandler inputHandler = attachment.getData();
inputHandler.writeTo(output);
output.close();
} catch (Exception e) {
String msg = "Unable to copy attachment correctly: " + attachment.getName();
logger.error(msg);
throw new CloudServiceException(
HttpURLConnection.HTTP_INTERNAL_ERROR, msg);
}
}
}

// set environment variables for the process
Map<String, String> penv = pb.environment();
penv.putAll(env);
Expand Down

0 comments on commit 78d8d4f

Please sign in to comment.