Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ private synchronized void releaseIntern() {

//cache status maintenance (pass cacheNoWrite flag)
release(_isAcquireFromEmpty && !_requiresLocalWrite);

if( isCachingActive() //only if caching is enabled (otherwise keep everything in mem)
&& isCached(true) //not empty and not read/modify
&& !isBelowCachingThreshold() ) //min size for caching
Expand Down Expand Up @@ -876,37 +876,39 @@ public synchronized void exportData (String fName, String outputFormat, int repl
boolean eqFormat = isEqualOutputFormat(outputFormat);
boolean eqBlksize = (getBlocksize() != blen)
&& (outputFormat == null || outputFormat.equals("binary"));

//actual export (note: no direct transfer of local copy in order to ensure blocking (and hence, parallelism))
if( isDirty() || !eqScheme || isFederated() ||
(pWrite && (!eqFormat | !eqBlksize)) )
{

// CASE 1: dirty in-mem matrix or pWrite w/ different format (write matrix to fname; load into memory if evicted)
// a) get the matrix
boolean federatedWrite = (outputFormat != null ) && outputFormat.contains("federated");

if( isEmpty(true) && !federatedWrite)
{
//read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats
//note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here)
try {
if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
_data = readBlobFromHDFS( _hdfsFileName );
else if( getRDDHandle() != null )
_data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
else if(!federatedWrite)
_data = readBlobFromFederated( getFedMapping() );
setDirty(false);
refreshMetaData(); //e.g., after unknown csv read
}
catch (IOException e) {
throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
}
}
//get object from cache
if(!federatedWrite) {
if( _data == null )
getCache();

if(_data == null && isEmpty(false)){
// read data from HDFS if required (never read before),
// this applies only to pWrite w/ different output formats
// note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here)
try {
if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
_data = readBlobFromHDFS( _hdfsFileName );
else if( getRDDHandle() != null )
_data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
else if(!federatedWrite) // pull back from federated site.
_data = readBlobFromFederated( getFedMapping() );
setDirty(false);
refreshMetaData(); //e.g., after unknown csv read
}
catch (IOException e) {
throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
}
}

acquire( false, _data==null ); //incl. read matrix if evicted
}

Expand All @@ -927,7 +929,7 @@ else if(!federatedWrite)
}
else if( pWrite ) // pwrite with same output format
{
//CASE 2: matrix already in same format but different file on hdfs (copy matrix to fname)
//CASE 2: matrix already in same format but different file on hdfs (copy to fname)
try
{
HDFSTool.deleteFileIfExistOnHDFS(fName);
Expand Down
Loading