
An overwrite option for all sink types that write to HDFS #224

Open
hannesmiller opened this issue Jan 23, 2017 · 10 comments
@hannesmiller
Contributor

An overwrite option for all sink types that write to HDFS

Proposal

Sink.withOverwrite

Affected sinks

  • Parquet
  • AvroParquet
  • Avro
  • Orc
  • Csv
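
For illustration, a rough usage sketch of the proposed option (the withOverwrite name comes from this proposal; the path, sink and data stream here are placeholders, not a final API):

      // Hypothetical call site for the proposed overwrite option (sketch only).
      // With overwrite enabled the sink would replace any existing file at the path
      // instead of failing or leaving the old file in place.
      val sink = ParquetSink(new Path("/data/out/people.pq")).withOverwrite(true)
      ds.to(sink)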
@sksamuel sksamuel self-assigned this Jan 27, 2017
@sksamuel sksamuel added this to the 1.2 milestone Jan 27, 2017
sksamuel added a commit that referenced this issue Jan 27, 2017
@mycaule

mycaule commented Jun 19, 2018

Hello,

Is it possible to append to the content of an existing file?
https://hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path)

I tested withOverwrite(true) and withOverwrite(false) for the ParquetSink and CsvSink sinks, and it seems the file is always overwritten either way.

By comparison, JdbcSink just inserts rows into the table without deleting the previous ones.
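
For reference, this is roughly what a raw append through the linked FileSystem API looks like (a sketch only; the URI and path are placeholders, and the call only succeeds if the underlying FileSystem implementation supports append):

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val conf = new Configuration()
  val fs = FileSystem.get(new URI("hdfs://namenode:8020"), conf)
  // Opens an existing file for appending; throws if the FileSystem does not support it
  val out = fs.append(new Path("/tmp/myfile.csv"))
  out.writeBytes("another,row\n")
  out.close()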

@hannesmiller
Contributor Author

@mycaule Thank you - I have reopened this issue and we will look into this.

@hannesmiller
Contributor Author

@mycaule Unfortunately I cannot reproduce the problem - do you have a test case that demonstrates it?

  • I have enhanced the CSVSinkTest and ParquetSinkTest to be a bit more thorough...
  it should "support overwrite" in {
    val path = new Path(s"target/${UUID.randomUUID().toString}", s"${UUID.randomUUID().toString}.pq")
    val schema = StructType(Field("a", StringType))
    val ds = DataStream.fromRows(
      schema,
      Seq(
        Row(schema, Vector("x")),
        Row(schema, Vector("y"))
      )
    )

    // Write twice to test overwrite
    ds.to(ParquetSink(path))
    ds.to(ParquetSink(path).withOverwrite(true))

    var parentStatus = fs.listStatus(path.getParent)
    println("Parquet Overwrite:")
    parentStatus.foreach(p => println(p.getPath))
    parentStatus.length shouldBe 1
    parentStatus.head.getPath.getName shouldBe path.getName

    // Write a second file to the same directory without overwrite
    val appendPath = new Path(path.getParent, s"${UUID.randomUUID().toString}.pq")
    ds.to(ParquetSink(appendPath).withOverwrite(false))
    parentStatus = fs.listStatus(path.getParent)
    println("Parquet Append:")
    parentStatus.foreach(p => println(p.getPath))
    parentStatus.length shouldBe 2
  }
  • Snippet of the output
Parquet Overwrite:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
Parquet Append:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/1f0c810a-f788-46eb-b019-a17cceed1695.pq

@mycaule

mycaule commented Jun 20, 2018

Hello, my code looks like this. The file path is constant, and I would like the file to be appended to each time writeCSV is called:

  implicit val conf = new Configuration()
  implicit val fs = FileSystem.get(new URI("hdfs://hdfsServer:8020"), conf)

  def writeCSV(params...) = {
    val path = "hdfs://tmp/myfile.csv"

    val ds = ... // Create ds based on params
    // Second argument true or false gives the same result: the file is overwritten
    val sink = CsvSink(new Path(path), false)
    ds.to(sink)
  }

  // first call
  writeCSV(params1...)

  // second call
  writeCSV(params2...)

In my opinion, the sink may need to call the Hadoop fs.append method for this file.

https://github.com/51zero/eel-sdk/blob/master/eel-core/src/main/scala/io/eels/component/csv/CsvSink.scala#L34-L35
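
For what it's worth, a minimal sketch of what an append-aware stream open could look like inside a sink (this is not the actual CsvSink code; the open helper and its parameters are made up for illustration):

  import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

  // Hypothetical helper: append when requested and possible, otherwise create.
  def open(fs: FileSystem, path: Path, overwrite: Boolean, append: Boolean): FSDataOutputStream =
    if (append && fs.exists(path)) fs.append(path) // requires append support in the FileSystem
    else fs.create(path, overwrite)                // create, optionally overwriting an existing file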

@mycaule

mycaule commented Jun 20, 2018

I think the problem occurs when testing against HDFS with a particular cluster configuration.
https://stackoverflow.com/questions/22997137/append-data-to-existing-file-in-hdfs-java

I checked my HDFS cluster configuration, and it does support appending:

<property>
   <name>dfs.support.append</name>
   <value>true</value>
</property>
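
If it helps, the same flag can also be read from code (a small sketch, assuming conf is the Configuration used to obtain the FileSystem; the default value here is arbitrary):

  // Read the append flag from the cluster configuration (sketch only)
  val appendSupported = conf.getBoolean("dfs.support.append", false)
  println(s"dfs.support.append = $appendSupported")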

@hannesmiller
Contributor Author

hannesmiller commented Jun 21, 2018

@mycaule
The CSV/Avro/Orc/Parquet sinks were not designed to append to existing files, for a few reasons:

  1. For binary formats like Parquet and Orc it’s not possible to append even if HDFS allows it: for Parquet, for example, stats (averages, counts, distinct values) are written to the footer of the file when it is closed, and Orc has a similar concept.

  2. The config option you mentioned is not enabled on most HDFS cluster installations I have worked on.

  3. This could in theory work for text-based formats like CSV, but there have hardly been any requests for this feature given the immutable nature of HDFS.

@hannesmiller
Contributor Author

@mycaule In addition, the Hadoop FileSystem API has an append method as opposed to create. I tried this out by adding a withAppend(true) method to CsvSink; here's a snippet of my test:

      // Now write to the same file in append mode and test that we have double the amount of rows
      ds.to(CsvSink(path).withOverwrite(true))
      ds.to(CsvSink(path).withAppend(true))
      using(fs.open(path)) { inputStream =>
        using(new BufferedReader(new InputStreamReader(inputStream))) { reader =>
          val lines = reader.lines().toArray
          println(lines.mkString("\n"))
          lines.length shouldBe 4
        }
      }

Which unfortunately yields the following exception:

Not supported
java.io.IOException: Not supported
	at org.apache.hadoop.fs.ChecksumFileSystem.append(ChecksumFileSystem.java:357)
	at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)

The Hadoop ChecksumFileSystem code says:

  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }
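
Note that the exception above comes from ChecksumFileSystem, i.e. the local filesystem wrapper used in the unit test, rather than from HDFS itself; DistributedFileSystem does implement append when the cluster allows it. For a purely local experiment one could bypass the checksum wrapper (a hedged sketch using standard Hadoop APIs; the path is a placeholder and the file must already exist):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // LocalFileSystem wraps RawLocalFileSystem with checksums; the raw filesystem implements append.
  val local = FileSystem.getLocal(new Configuration())
  val raw = local.getRawFileSystem
  val out = raw.append(new Path("target/append-test.csv"))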

@hannesmiller
Contributor Author

@mycaule I would like to close this issue if you have no objections.

@mycaule

mycaule commented Jun 22, 2018

Ok, thanks for investigating anyway!

@mycaule

mycaule commented Sep 5, 2018

Please close this.
