Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DataDrivenDBInputFormat & Add a JDBCScheme builder. #44

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 138 additions & 1 deletion src/jvm/com/twitter/maple/jdbc/JDBCScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import cascading.util.Util;
import com.twitter.maple.jdbc.db.DBInputFormat;
import com.twitter.maple.jdbc.db.DBOutputFormat;
import com.twitter.maple.jdbc.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -46,6 +49,8 @@
*/
public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
{
private static final Logger LOG = LoggerFactory.getLogger(JDBCScheme.class);

private Class<? extends DBInputFormat> inputFormatClass;
private Class<? extends DBOutputFormat> outputFormatClass;
private String[] columns;
Expand All @@ -60,6 +65,7 @@ public class JDBCScheme extends Scheme<JobConf, RecordReader, OutputCollector, O
private String countQuery;
private long limit = -1;
private Boolean tableAlias = true;
private Boolean dataDrivenSplits = false;

/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
Expand Down Expand Up @@ -113,6 +119,26 @@ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? exte
* @param tableAlias of type Boolean
*/
public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy, Boolean tableAlias)
{
// Delegate to the full constructor with dataDrivenSplits disabled, preserving
// the pre-existing behavior for callers that predate data-driven split support.
this(inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, limit, updateByFields, updateBy, tableAlias, false);
}

/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class<? extends DBInputFormat>
* @param outputFormatClass of type Class<? extends DBOutputFormat>
* @param columnFields of type Fields
* @param columns of type String[]
* @param orderBy of type String[]
* @param conditions of type String
* @param limit of type long
* @param updateByFields of type Fields
* @param updateBy of type String[]
* @param tableAlias of type Boolean
* @param dataDrivenSplits of type Boolean
*/
public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, Fields columnFields, String[] columns, String[] orderBy, String conditions, long limit, Fields updateByFields, String[] updateBy, Boolean tableAlias, Boolean dataDrivenSplits)
{
this.columnFields = columnFields;

Expand Down Expand Up @@ -141,6 +167,7 @@ public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? exte
this.conditions = conditions;
this.limit = limit;
this.tableAlias = tableAlias;
this.dataDrivenSplits = dataDrivenSplits;

this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
Expand Down Expand Up @@ -550,6 +577,109 @@ public JDBCScheme( Fields columnFields, String[] columns, String selectQuery, St
this( null, columnFields, columns, selectQuery, countQuery, -1 );
}

/**
* Builder for JDBCScheme instances.
*/
/**
 * Fluent builder that accumulates the many optional arguments of a
 * {@link JDBCScheme} and selects the appropriate constructor overload in
 * {@link #build()}. All setters return {@code this} so calls can be chained.
 */
public static class Builder {
    private Class<? extends DBInputFormat> inputFormatClass = null;
    private Class<? extends DBOutputFormat> outputFormatClass = null;
    private String[] columns;
    private String[] orderBy = null;
    private String conditions = null;
    private String[] updateBy = null;
    private Fields updateByFields = null;
    private Fields columnFields = null;
    private Boolean customQuery = false;
    private String selectQuery;
    private String countQuery;
    private long limit = -1;
    private Boolean tableAlias = true;
    private Boolean dataDrivenSplits = false;

    public Builder setInputFormatClass(Class<? extends DBInputFormat> inputFormatClass) {
        this.inputFormatClass = inputFormatClass; return this;
    }

    public Builder setOutputFormatClass(Class<? extends DBOutputFormat> outputFormatClass) {
        this.outputFormatClass = outputFormatClass; return this;
    }

    public Builder setColumns(String[] columns) {
        this.columns = columns; return this;
    }

    public Builder setOrderBy(String[] orderBy) {
        this.orderBy = orderBy; return this;
    }

    public Builder setConditions(String conditions) {
        this.conditions = conditions; return this;
    }

    public Builder setUpdateBy(String[] updateBy) {
        this.updateBy = updateBy; return this;
    }

    public Builder setUpdateByFields(Fields updateByFields) {
        this.updateByFields = updateByFields; return this;
    }

    public Builder setColumnFields(Fields columnFields) {
        this.columnFields = columnFields; return this;
    }

    /**
     * Switches the scheme to custom-query mode; both queries have surrounding
     * whitespace and any trailing semicolon removed.
     */
    public Builder setCustomQuery(String selectQuery, String countQuery) {
        this.customQuery = true;
        this.selectQuery = stripTrailingSemicolon(selectQuery);
        this.countQuery = stripTrailingSemicolon(countQuery);
        return this;
    }

    public Builder setLimit(long limit) {
        this.limit = limit; return this;
    }

    public Builder setTableAlias(Boolean tableAlias) {
        this.tableAlias = tableAlias; return this;
    }

    public Builder setDataDrivenSplits(Boolean dataDrivenSplits) {
        this.dataDrivenSplits = dataDrivenSplits; return this;
    }

    // Drops surrounding whitespace and a single trailing semicolon, which
    // would otherwise break the SQL the scheme wraps around the query.
    private static String stripTrailingSemicolon(String query) {
        return query.trim().replaceAll(";$", "");
    }

    /**
     * Constructs a new instance using the values set in the Builder.
     * @return a new JDBCScheme instance using values set in the Builder.
     */
    public JDBCScheme build() {
        // Derive the Fields views from the raw names unless the caller
        // supplied them explicitly.
        if (columnFields == null && columns != null) {
            columnFields = new Fields(columns);
        }
        if (updateByFields == null && updateBy != null) {
            updateByFields = new Fields(updateBy);
        }

        if (!customQuery) {
            return new JDBCScheme(inputFormatClass, outputFormatClass, columnFields, columns, orderBy, conditions, limit, updateByFields, updateBy, tableAlias, dataDrivenSplits);
        }

        LOG.warn("A custom query is set. The following fields will not be used: outputFormatClass, orderBy, conditions, updateByFields, updateBy, dataDrivenSplits");
        return new JDBCScheme(inputFormatClass, columnFields, columns, selectQuery, countQuery, limit, tableAlias);
    }
}

/**
* Method getColumns returns the columns of this JDBCScheme object.
*
Expand Down Expand Up @@ -578,7 +708,14 @@ public void sourceConfInit( FlowProcess<JobConf> process, Tap<JobConf, RecordRea
else {
String tableName = ( (JDBCTap) tap ).getTableName();
String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, tableAlias, columns );
if(dataDrivenSplits) {
if(orderBy == null || orderBy.length != 1) {
throw new IllegalArgumentException("OrderBy must be defined to split on when using dataDrivenSplits. Only one field is allowed");
}
DataDrivenDBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, tableAlias, columns );
} else {
DBInputFormat.setInput( conf, TupleRecord.class, tableName, conditions, joinedOrderBy, limit, concurrentReads, tableAlias, columns );
}
}

if( inputFormatClass != null )
Expand Down
157 changes: 157 additions & 0 deletions src/jvm/com/twitter/maple/jdbc/db/BigDecimalSplitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2009 Concurrent, Inc.
*
* This work has been released into the public domain
* by the copyright holder. This applies worldwide.
*
* In case this is not legally possible:
* The copyright holder grants any entity the right
* to use this work for any purpose, without any
* conditions, unless such conditions are required by law.
*/
/**
* Copyright 2012 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.maple.jdbc.db;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;

/**
* Implement DBSplitter over BigDecimal values.
*/
/**
 * Implement DBSplitter over BigDecimal values.
 *
 * Partitions the [min, max] range of the split column into one bounded
 * WHERE-clause interval per requested concurrent read.
 */
public class BigDecimalSplitter implements DBSplitter {
  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);

  /**
   * Builds the list of input splits covering the column's value range.
   *
   * @param conf job configuration; DBConfiguration.CONCURRENT_READS_PROPERTY
   *     (default 1) is used as the split-count hint
   * @param results a ResultSet whose columns 1 and 2 hold MIN(col) and MAX(col)
   * @param colName name of the column being split on
   * @return the splits, or null when exactly one end of the range is NULL
   * @throws SQLException if reading the min/max values fails
   */
  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
      throws SQLException {

    BigDecimal minVal = results.getBigDecimal(1);
    BigDecimal maxVal = results.getBigDecimal(2);

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    BigDecimal numSplits = new BigDecimal(conf.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 1));

    if (minVal == null && maxVal == null) {
      // Range is null to null. Return a single split matching only NULL rows.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return splits;
    }

    if (minVal == null || maxVal == null) {
      // Don't know what is a reasonable min/max value for interpolation. Fail.
      LOG.error("Cannot find a range for NUMERIC or DECIMAL fields with one end NULL.");
      return null;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval so maxVal itself is included.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            colName + " <= " + end.toString()));
      } else {
        // Normal open-interval case.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            highClausePrefix + end.toString()));
      }

      start = end;
    }

    return splits;
  }

  // Smallest allowed step between adjacent split points; prevents a zero or
  // negligible split size from producing an effectively unbounded split list.
  private static final BigDecimal MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);

  /**
   * Divide numerator by denominator. If impossible in exact mode, use rounding.
   */
  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
    try {
      return numerator.divide(denominator);
    } catch (ArithmeticException ae) {
      // Non-terminating decimal expansion: retry with HALF_UP rounding at the
      // numerator's scale. RoundingMode.HALF_UP replaces the deprecated
      // BigDecimal.ROUND_HALF_UP int constant with identical semantics.
      return numerator.divide(denominator, RoundingMode.HALF_UP);
    }
  }

  /**
   * Returns a list of BigDecimals one element longer than the list of input splits.
   * This represents the boundaries between input splits.
   * All splits are open on the top end, except the last one.
   *
   * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
   *
   * [0, 5)
   * [5, 8)
   * [8, 12)
   * [12, 18] note the closed interval for the last split.
   *
   * @param numSplits hint for the number of intervals; an extra boundary is
   *     appended when the range does not divide cleanly
   * @param minVal inclusive lower bound of the range
   * @param maxVal inclusive upper bound of the range
   * @return the ordered list of split boundaries, always ending at maxVal
   * @throws SQLException declared for interface symmetry with callers
   */
  List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal, BigDecimal maxVal)
      throws SQLException {

    List<BigDecimal> splits = new ArrayList<BigDecimal>();

    // Use numSplits as a hint. May need an extra task if the size doesn't
    // divide cleanly.
    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), numSplits);
    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
      splitSize = MIN_INCREMENT;
      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
    }

    BigDecimal curVal = minVal;

    while (curVal.compareTo(maxVal) <= 0) {
      splits.add(curVal);
      curVal = curVal.add(splitSize);
    }

    if (splits.get(splits.size() - 1).compareTo(maxVal) != 0 || splits.size() == 1) {
      // We didn't end on the maxVal. Add that to the end of the list.
      splits.add(maxVal);
    }

    return splits;
  }
}
Loading