From 649bbcb7977cbe04934de501c4fe3bb38e38afd4 Mon Sep 17 00:00:00 2001 From: Navin Singh Date: Tue, 5 Jul 2022 11:42:36 +0530 Subject: [PATCH 1/2] inmemorypipe accepts pandas df --- python/zingg/pipes/pipes.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/python/zingg/pipes/pipes.py b/python/zingg/pipes/pipes.py index e44dff141..e6b5a5649 100644 --- a/python/zingg/pipes/pipes.py +++ b/python/zingg/pipes/pipes.py @@ -48,9 +48,18 @@ def setDbTable(self, dbtable): Pipe.addProperty(self, "dbtable", dbtable) class InMemoryPipe(Pipe): - def __init__(self, name): + def __init__(self, name, df = None): Pipe.__init__(self, name, Format.INMEMORY.type()) - def setDataset(self, ds): - Pipe.getPipe(self).setDataset(ds) + if (df is not None): + self.setDataset(df) + def setDataset(self, df): + if (isinstance(df, pd.DataFrame)): + ds = spark.createDataFrame(df) + Pipe.getPipe(self).setDataset(ds._jdf) + elif (isinstance(df, DataFrame)): + Pipe.getPipe(self).setDataset(df._jdf) + else: + LOG.error(" setDataset(): Null or Unsupported type: %s", type(df)) + def getDataset(self): return Pipe.getPipe(self).getDataset() From db0f6aad518536e8b3405dfd23e916844fe556fc Mon Sep 17 00:00:00 2001 From: Navin Singh Date: Tue, 5 Jul 2022 17:15:49 +0530 Subject: [PATCH 2/2] added line breaks between python functions --- python/zingg/pipes/pipes.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/zingg/pipes/pipes.py b/python/zingg/pipes/pipes.py index e6b5a5649..3cb38474e 100644 --- a/python/zingg/pipes/pipes.py +++ b/python/zingg/pipes/pipes.py @@ -11,8 +11,10 @@ class CsvPipe(Pipe): def __init__(self, name): Pipe.__init__(self, name, Format.CSV.type()) Pipe.addProperty(self, FilePipe.HEADER,"true") + def setDelimiter(self, delimiter): Pipe.addProperty(self, "delimiter", delimiter) + def setLocation(self, location): Pipe.addProperty(self, FilePipe.LOCATION, location) @@ -20,30 +22,41 @@ class BigQueryPipe(Pipe): def 
__init__(self,name): Pipe.__init__(self, name, "bigquery") Pipe.addProperty(self, "viewsEnabled", "true") + def setCredentialFile(self, file): Pipe.addProperty(self, "credentialsFile", file) + def setTable(self, table): Pipe.addProperty(self, "table", table) + def setTemporaryGcsBucket(self, bucket): Pipe.addProperty(self, "temporaryGcsBucket", bucket) + def setViewsEnabled(self, isEnabled): Pipe.addProperty(self, "viewsEnabled", isEnabled) class SnowflakePipe(Pipe): def __init__(self,name): Pipe.__init__(self, name, Format.SNOWFLAKE.type()) + def setURL(self, url): Pipe.addProperty(self, "sfUrl", url) + def setUser(self, user): Pipe.addProperty(self, "sfUser", user) + def setPassword(self, passwd): Pipe.addProperty(self, "sfPassword", passwd) + def setDatabase(self, db): Pipe.addProperty(self, "sfDatabase", db) + def setSFSchema(self, schema): Pipe.addProperty(self, "sfSchema", schema) + def setWarehouse(self, warehouse): Pipe.addProperty(self, "sfWarehouse", warehouse) + def setDbTable(self, dbtable): Pipe.addProperty(self, "dbtable", dbtable) @@ -52,6 +65,7 @@ def __init__(self, name, df = None): Pipe.__init__(self, name, Format.INMEMORY.type()) if (df is not None): self.setDataset(df) + def setDataset(self, df): if (isinstance(df, pd.DataFrame)): ds = spark.createDataFrame(df)