
[WIP]: Spark 27463: Cogrouped Pandas UDF POC #1

Closed
d80tb7 wants to merge 6 commits into master from SPARK-27463-poc

Conversation


@d80tb7 d80tb7 commented Jun 21, 2019

  • This includes:

  • JVM serialisation for interleaved dataframes.

  • Python deserialisation for interleaved dataframes.

  • A skeleton cogroup implementation

As this is a PoC, there are a couple of caveats:

  • This code is very rough!
  • The data passing is pretty minimal (e.g. it only supports exactly two dataframes, there's no ability to distinguish on the Python side between key and value columns, etc.)
  • The cogroup implementation I have doesn't actually work properly (the column pruning is removing all the non-key columns of the right dataframe, so the rhs passed to pandas is missing the value cols).

At this point I think I'd like to focus on:

  • Does the data passing mechanism (i.e. the deviation from Arrow streaming) make sense?
  • If we are going to introduce such a data passing mechanism how complex should it be?
  • Does the high-level implementation of the cogroup here make sense?
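For readers new to the idea, the cogroup semantics under discussion can be sketched in plain Python. This is a hypothetical, stdlib-only illustration of the data shape a cogrouped pandas UDF would receive (rows as dicts rather than Arrow batches), not the actual Spark implementation:

```python
from collections import defaultdict

def cogroup(left, right, key):
    """Group two lists of dict-rows by a key column and yield
    (key, left_rows, right_rows) triples -- the shape a cogrouped
    UDF would see one group at a time."""
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[row[key]][0].append(row)
    for row in right:
        groups[row[key]][1].append(row)
    for k in sorted(groups):
        yield k, groups[k][0], groups[k][1]

left = [{"id": 1, "v": 10}, {"id": 2, "v": 20}]
right = [{"id": 1, "w": 100}]
result = list(cogroup(left, right, "id"))
# result[0] == (1, [{"id": 1, "v": 10}], [{"id": 1, "w": 100}])
# result[1] == (2, [{"id": 2, "v": 20}], [])
```

Note that, unlike a join, each side's rows arrive intact per key, and a key present on only one side still produces a group (with the other side empty).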



abstract class BaseArrowPythonRunner[T](
funcs: Seq[ChainedPythonFunctions],
Owner Author

This is just some common stuff that I needed for both the new data passing mechanism and the existing Arrow streaming mechanism. I've broken it out here mainly because it made it easier for me to track what new functionality I'd actually added. I don't think a proper solution would really have this class hierarchy.

import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer}


class InterleavedArrowWriter(leftRoot: VectorSchemaRoot,
Owner Author


This is analogous to org.apache.arrow.vector.ipc.ArrowWriter but allows interleaved dataframes to be sent. I suspect it could all be more memory efficient if we had a different interface that allowed the left batch to be sent before the right batch is loaded.
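The interleaving idea itself can be shown with a small stdlib-only sketch. The function name and list-based "batches" are hypothetical stand-ins; the real writer works on Arrow VectorSchemaRoots and record batches:

```python
def interleave_batches(left_batches, right_batches):
    """Emit batches alternately (left, right, left, right, ...),
    tagging each with its side so the reader can reassemble the
    two streams -- the same idea the interleaved writer applies
    to Arrow record batches."""
    if len(left_batches) != len(right_batches):
        raise ValueError("cogrouped sides must produce paired batches")
    out = []
    for lb, rb in zip(left_batches, right_batches):
        out.append(("left", lb))
        out.append(("right", rb))
    return out

stream = interleave_batches([[1, 2], [3]], [[10], [20, 30]])
# stream == [("left", [1, 2]), ("right", [10]),
#            ("left", [3]), ("right", [20, 30])]
```

Under this scheme the reader must buffer a full left batch before the paired right batch arrives, which is the memory concern mentioned above.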

@@ -0,0 +1,33 @@
/*
Owner Author

ignore this!


def __init__(self, stream):
import pyarrow as pa
self._schema1 = pa.read_schema(stream)
Owner Author

I wanted to read these using the message reader as well, but for some reason pa.read_schema(self_reader.read_next_message()) didn't work.

@icexelloss

@d80tb7 This is a great start! I think this could be submitted against apache/spark master because:
(1) The code looks reasonable
(2) More people will watch that
(3) It shows progress

WDYT? If you are going to do that, then I will wait for that one and comment.


d80tb7 commented Jun 25, 2019

@icexelloss fair enough I've raised against apache/spark master here: apache#24965

@d80tb7 d80tb7 closed this Jun 25, 2019
d80tb7 pushed a commit that referenced this pull request Jul 23, 2019
…comparison assertions

## What changes were proposed in this pull request?

This PR removes a few hardware-dependent assertions which can cause a failure in `aarch64`.

**x86_64**
```
root@donotdel-openlab-allinone-l00242678:/home/ubuntu# uname -a
Linux donotdel-openlab-allinone-l00242678 4.4.0-154-generic #181-Ubuntu SMP Tue Jun 25 05:29:03 UTC
2019 x86_64 x86_64 x86_64 GNU/Linux

scala> import java.lang.Float.floatToRawIntBits
import java.lang.Float.floatToRawIntBits
scala> floatToRawIntBits(0.0f/0.0f)
res0: Int = -4194304
scala> floatToRawIntBits(Float.NaN)
res1: Int = 2143289344
```

**aarch64**
```
[root@arm-huangtianhua spark]# uname -a
Linux arm-huangtianhua 4.14.0-49.el7a.aarch64 #1 SMP Tue Apr 10 17:22:26 UTC 2018 aarch64 aarch64 aarch64 GNU/Linux

scala> import java.lang.Float.floatToRawIntBits
import java.lang.Float.floatToRawIntBits
scala> floatToRawIntBits(0.0f/0.0f)
res1: Int = 2143289344
scala> floatToRawIntBits(Float.NaN)
res2: Int = 2143289344
```
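The divergence above is exactly why asserting an exact NaN bit pattern is fragile. A small Python illustration (my own sketch, not part of the patch) of reading the raw bits and of the portable alternative, which tests NaN-ness rather than comparing bit patterns:

```python
import math
import struct

def float_bits(x):
    """Raw IEEE-754 single-precision bits of x, as a signed int
    (the Python analogue of java.lang.Float.floatToRawIntBits)."""
    return struct.unpack("<i", struct.pack("<f", x))[0]

canonical = float_bits(float("nan"))
# Typically 2143289344 (0x7FC00000) on x86, but a NaN produced by an
# actual operation such as 0.0/0.0 may carry different bits per platform.

# Portable check: any NaN has an all-ones exponent and a nonzero mantissa,
# so test is-NaN instead of comparing raw bits.
assert math.isnan(struct.unpack("<f", struct.pack("<i", canonical))[0])
```

This mirrors the fix in the patch: drop the hardware-dependent raw-bit assertions rather than special-case each architecture.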

## How was this patch tested?

Pass the Jenkins (This removes the test coverage).

Closes apache#25186 from huangtianhua/special-test-case-for-aarch64.

Authored-by: huangtianhua <huangtianhua@huawei.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
d80tb7 pushed a commit that referenced this pull request Oct 30, 2019
### What changes were proposed in this pull request?
`org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` has been failing lately. After having a look at the logs, it just shows the following fact without any details:
```
Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database
```
Since the issue is intermittent and we are not able to reproduce it, we should add more debug information and wait for reproduction with the extended logs.

### Why are the changes needed?
The failing test doesn't give enough debug information.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I've started the test manually and checked that such additional debug messages show up:
```
>>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Looking for keys for: kafka/localhost@EXAMPLE.COM
Added key: 17version: 0
Added key: 23version: 0
Added key: 16version: 0
Found unsupported keytype (3) for kafka/localhost@EXAMPLE.COM
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Using builtin default etypes for permitted_enctypes
default etypes for permitted_enctypes: 17 16 23.
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhost@EXAMPLE.COM to client/localhost@EXAMPLE.COM|kafka/localhost@EXAMPLE.COM
MemoryCache: Existing AuthList:
#3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhost@EXAMPLE.COM
#2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhost@EXAMPLE.COM
#1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhost@EXAMPLE.COM
```

Closes apache#26252 from gaborgsomogyi/SPARK-29580.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>