Release 0.6.1
BigQuery:
- Added the MetaFields field to the webresource table for source-specific metadata fields, accessible via SAFE_OFFSET (0-based index); see the query sketch below.
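For context, a minimal sketch (not part of this commit) of how the new REPEATED MetaFields column can be read with SAFE_OFFSET in BigQuery Standard SQL. The table reference and alias are placeholders, and which field sits behind a given index is source-specific.

```java
// Sketch only: builds a Standard SQL query over the webresource table.
// The `myproject.opinions.webresource` reference is a placeholder, and the
// field behind index 0 depends on the source's index mapping.
public class MetaFieldsQuerySketch {
  public static void main(String[] args) {
    String query =
        "SELECT\n"
      + "  Url,\n"
      + "  MetaFields[SAFE_OFFSET(0)] AS metaField0  -- 0-based index\n"
      + "FROM `myproject.opinions.webresource`";
    System.out.println(query);
  }
}
```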

Java code:
- Added propagation of the MetaFields field from sources all the way to the BigQuery dataset.
- Added the solutions package; its first solution, FileIndexerPipeline, takes a CSV file, indexes it, and writes a CSV file.
- Added a dependency on the Commons CSV package, which helps with CSV processing (see the sketch after this list).
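To illustrate what the new Commons CSV dependency is used for, here is a minimal, self-contained sketch of the same CSV-in, CSV-out pattern. It is not the actual FileIndexerPipeline (which runs as a Beam pipeline in the solutions package and whose diff is not expanded on this page); the column names "url" and "text" and the trivial stand-in for indexing are assumptions.

```java
import java.io.FileReader;
import java.io.FileWriter;
import java.io.Reader;
import java.io.Writer;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;

// Sketch only: reads the CSV at args[0], writes a CSV to args[1].
public class CsvIndexSketch {
  public static void main(String[] args) throws Exception {
    try (Reader in = new FileReader(args[0]);
         Writer out = new FileWriter(args[1]);
         CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
         CSVPrinter printer = new CSVPrinter(out,
             CSVFormat.DEFAULT.withHeader("url", "textLength"))) {
      for (CSVRecord record : parser) {
        // Placeholder for real indexing: emit one derived value per input row.
        printer.printRecord(record.get("url"), record.get("text").length());
      }
    }
  }
}
```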
datancoffee committed Dec 13, 2017
1 parent dd1a69d commit 3bd2344
Showing 9 changed files with 856 additions and 76 deletions.
6 changes: 6 additions & 0 deletions bigquery/webresourceSchema.json
@@ -74,5 +74,11 @@
"name": "ParentWebResourceHash",
"type": "STRING",
"description" : "(Optional) In threaded conversations, the parent is the previous comment, email, or document"
},
{
"mode": "REPEATED",
"name": "MetaFields",
"type": "STRING",
"description" : "(Optional) Source-specific metadata fields accessible via SAFE_OFFSET(0-base index). Review docs for source-specific index mappings."
}
]
9 changes: 7 additions & 2 deletions pom.xml
@@ -14,7 +14,7 @@

<groupId>com.google.cloud.dataflow</groupId>
<artifactId>examples-opinionanalysis</artifactId>
<version>0.5.1</version>
<version>0.6.1</version>

<packaging>jar</packaging>

@@ -360,7 +360,7 @@
<dependency>
<groupId>sirocco.sirocco-sa</groupId>
<artifactId>sirocco-sa</artifactId>
<version>1.0.6</version>
<version>1.0.7</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
@@ -533,6 +533,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.5</version>
</dependency>

<!-- compile dependencies -->
<dependency>
IndexerPipeline.java
@@ -14,17 +14,13 @@
* limitations under the License.
*******************************************************************************/
package com.google.cloud.dataflow.examples.opinionanalysis;

import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -220,7 +216,9 @@ public static Pipeline createIndexerPipeline(IndexerPipelineOptions options) thr
}

//PHASE: Enrich with CloudNLP entities
filteredIndexes = enrichWithCNLP(filteredIndexes);

if (options.getRatioEnrichWithCNLP() > 0)
filteredIndexes = enrichWithCNLP(filteredIndexes, options.getRatioEnrichWithCNLP());

// PHASE: Write to BigQuery
// For the Indexes that are unique ("filteredIndexes"), create records in webresource, document, and sentiment.
@@ -240,10 +238,10 @@ public static Pipeline createIndexerPipeline(IndexerPipelineOptions options) thr
* @return
*/
private static PCollection<ContentIndexSummary> enrichWithCNLP(
PCollection<ContentIndexSummary> filteredIndexes) {
PCollection<ContentIndexSummary> filteredIndexes, Float ratio) {

PCollectionTuple splitAB = filteredIndexes
.apply(ParDo.of(new SplitAB(0.01F))
.apply(ParDo.of(new SplitAB(ratio))
.withOutputTags(PipelineTags.BranchA,
TupleTagList.of(PipelineTags.BranchB)));

@@ -276,25 +274,31 @@ private static PCollection<ContentIndexSummary> indexDocuments(
.apply(ParDo.of(new IndexDocument())
.withOutputTags(PipelineTags.successfullyIndexed, // main output
TupleTagList.of(PipelineTags.unsuccessfullyIndexed))); // side output

PCollection<ContentIndexSummary> indexes = alldocuments
.get(PipelineTags.successfullyIndexed)
.setCoder(AvroCoder.of(ContentIndexSummary.class));

PCollection<InputContent> unprocessedDocuments = alldocuments
.get(PipelineTags.unsuccessfullyIndexed);

BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder()
.setProjectId(options.getProject())
.setInstanceId(options.getBigtableIndexerAdminDB());
BigtableOptions bigtableOptions = optionsBuilder.build();

unprocessedDocuments
.apply(ParDo.of(new CreateDeadLetterEntries()))
.apply("Write to Dead Letter table in Bigtable", BigtableIO.write()
.withBigtableOptions(bigtableOptions)
.withTableId(IndexerPipelineUtils.DEAD_LETTER_TABLE));
.get(PipelineTags.successfullyIndexed)
.setCoder(AvroCoder.of(ContentIndexSummary.class));

// if the Bigtable admin DB is set, write into dead letter table
if (options.getBigtableIndexerAdminDB() != null) {


PCollection<InputContent> unprocessedDocuments = alldocuments
.get(PipelineTags.unsuccessfullyIndexed);

BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder()
.setProjectId(options.getProject())
.setInstanceId(options.getBigtableIndexerAdminDB());
BigtableOptions bigtableOptions = optionsBuilder.build();

unprocessedDocuments
.apply(ParDo.of(new CreateDeadLetterEntries()))
.apply("Write to Dead Letter table in Bigtable", BigtableIO.write()
.withBigtableOptions(bigtableOptions)
.withTableId(IndexerPipelineUtils.DEAD_LETTER_TABLE));
}

return indexes;
}

@@ -482,8 +486,11 @@ else if (options.getRedditCommentsTableName() != null)
PCollection<KV<String, TableRow>> postInfo = posts.apply(ParDo.of(new ExtractPostDataFn()));
PCollection<KV<String, TableRow>> commentInfo = comments.apply(ParDo.of(new ExtractCommentInfoFn()));

PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple.of(postInfoTag, postInfo)
.and(commentInfoTag, commentInfo).apply(CoGroupByKey.<String>create());
PCollection<KV<String, CoGbkResult>> kvpCollection =
KeyedPCollectionTuple
.of(postInfoTag, postInfo)
.and(commentInfoTag, commentInfo)
.apply(CoGroupByKey.<String>create());

// Process the CoGbkResult elements generated by the CoGroupByKey
// transform.
@@ -512,19 +519,20 @@ public void processElement(ProcessContext c) {
Long postPubTime = IndexerPipelineUtils.extractRedditTime(post.get("created_utc").toString());

/*
* sso 7/10/2017: Changing postUrl to the "url" field, which will contain the external
* url or the article being discussed
* sso 11/20/2017: Create two webresource records per post record
* The first WR record will have the external post URL,
* the second one will have the reddit post URL
*/
//String postUrl = buildRedditPostUrl(postPermalink);
String postUrl = post.get("url").toString();
String postUrl = IndexerPipelineUtils.buildRedditPostUrl(postPermalink);
String[] postMetaFields = IndexerPipelineUtils.extractRedditPostMetaFields(post);

// Create the first InputContent for the post item itself
InputContent icPost = new InputContent(/* url */ postUrl, /* pubTime */ postPubTime,
/* title */ post.get("title").toString(), /* author */ post.get("author").toString(),
/* language */ null, /* text */ post.get("selftext").toString(),
/* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY, /* collectionItemId */ postId,
/* skipIndexing */ 0, /* parentUrl */ null, // the post record will become the beginning of the thread
/* parentPubTime */ null);
/* parentPubTime */ null, /* metaFields */ postMetaFields);

postAndCommentList.add(icPost);

@@ -549,14 +557,17 @@ public void processElement(ProcessContext c) {
String parentId = comment.get("parent_id").toString();
String parentUrl = (parentId.startsWith("t1_"))
? IndexerPipelineUtils.buildRedditCommentUrl(postPermalink, comment.get("id").toString()) : postUrl;


String[] commentMetaFields = IndexerPipelineUtils.extractRedditCommentMetaFields(comment);

InputContent icComment = new InputContent(/* url */ commentUrl,
/* pubTime */ commentPubTime, /* title */ null,
/* author */ comment.get("author").toString(), /* language */ null,
/* text */ comment.get("body").toString(),
/* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY,
/* collectionItemId */ commentId, /* skipIndexing */ 0, /* parentUrl */ parentUrl,
/* parentPubTime */ null // don't set time yet, because we might not have read that record yet
/* pubTime */ commentPubTime, /* title */ null,
/* author */ comment.get("author").toString(), /* language */ null,
/* text */ comment.get("body").toString(),
/* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY,
/* collectionItemId */ commentId, /* skipIndexing */ 0, /* parentUrl */ parentUrl,
/* parentPubTime */ null, // don't set time yet, because we might not have read that record yet
/* metaFields */ commentMetaFields
);

pubTimes.put(commentUrl, commentPubTime); // save the pub time of the current comment
@@ -593,35 +604,45 @@ private static void writeAllTablesToBigQuery(PCollectionTuple bqrows,

// Now write to BigQuery
WriteDisposition dispo = options.getWriteTruncate() ?
WriteDisposition.WRITE_TRUNCATE: WriteDisposition.WRITE_APPEND;
WriteDisposition.WRITE_TRUNCATE: WriteDisposition.WRITE_APPEND;

//Merge all collections with WebResource table records
PCollectionList<TableRow> webresourceRowsList = (webresourceDeduped == null) ?
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed) :
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed).and(webresourceDeduped);
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed) :
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed).and(webresourceDeduped);

PCollection<TableRow> allWebresourceRows = webresourceRowsList.apply(Flatten.<TableRow>pCollections());
PCollection<TableRow> allWebresourceRows =
webresourceRowsList.apply(Flatten.<TableRow>pCollections());

allWebresourceRows = !options.isStreaming() ?
allWebresourceRows.apply("Reshuffle Webresources", new Reshuffle<TableRow>()) :
allWebresourceRows;

allWebresourceRows
.apply("Reshuffle Webresources", new Reshuffle<TableRow>())
.apply("Write to webresource",
BigQueryIO.writeTableRows()
.to(getWebResourcePartitionedTableRef(options))
.withSchema(getWebResourceSchema())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(dispo));

documentRows = !options.isStreaming() ?
documentRows.apply("Reshuffle Documents", new Reshuffle<TableRow>()):
documentRows;

documentRows
.apply("Reshuffle Documents", new Reshuffle<TableRow>())
.apply("Write to document",
BigQueryIO.writeTableRows()
.to(getDocumentPartitionedTableRef(options))
.withSchema(getDocumentTableSchema())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(dispo));

sentimentRows = !options.isStreaming() ?
sentimentRows.apply("Reshuffle Sentiments", new Reshuffle<TableRow>()):
sentimentRows;

sentimentRows
.apply("Reshuffle Sentiments", new Reshuffle<TableRow>())
.apply("Write to sentiment",
BigQueryIO.writeTableRows()
.to(getSentimentPartitionedTableRef(options))
@@ -651,6 +672,7 @@ private static TableSchema getWebResourceSchema() {
fields.add(new TableFieldSchema().setName("Domain").setType("STRING"));
fields.add(new TableFieldSchema().setName("Author").setType("STRING"));
fields.add(new TableFieldSchema().setName("ParentWebResourceHash").setType("STRING"));
fields.add(new TableFieldSchema().setName("MetaFields").setType("STRING").setMode("REPEATED"));

TableSchema schema = new TableSchema().setFields(fields);
return schema;
@@ -961,12 +983,9 @@ public void processElement(ProcessContext c) {
IndexingConsts.ContentType contentType = options.getIndexAsShorttext() ? IndexingConsts.ContentType.SHORTTEXT: IndexingConsts.ContentType.ARTICLE;

try {
ic = c.element();

if (ic == null || ic.text == null || ic.text.isEmpty())
throw new Exception("null or empty document");

long processingTime = System.currentTimeMillis();

ic = c.element();

contentindex = new ContentIndex(
ic.text,
Expand All @@ -980,9 +999,11 @@ public void processElement(ProcessContext c) {
ic.documentCollectionId,
ic.collectionItemId,
ic.parentUrl,
ic.parentPubTime);
ic.parentPubTime,
ic.metaFields);

Indexer.index(contentindex); // Call to the NLP package

if (!contentindex.IsIndexingSuccessful)
throw new Exception(contentindex.IndexingErrors + ". Text: "+ic.text);

@@ -1108,7 +1129,8 @@ public void processElement(ProcessContext c) {
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Domain", summary.wr.domain);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Author", summary.wr.author);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "ParentWebResourceHash", summary.wr.parentWebResourceHash);

IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "MetaFields", summary.wr.metaFields);

c.output(wrrow);

// Create the document entry
@@ -1234,6 +1256,7 @@ public void processElement(ProcessContext c) {
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Domain", summary.wr.domain);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Author", summary.wr.author);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "ParentWebResourceHash", summary.wr.parentWebResourceHash);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "MetaFields", summary.wr.metaFields);

c.output(wrrow);
}
@@ -1253,7 +1276,7 @@ public void processElement(ProcessContext c) {

wr.initialize(ic.url, ic.pubTime, processingTime,
documentHash, ic.documentCollectionId, ic.collectionItemId,
ic.title, ic.author, parentWebResourceHash);
ic.title, ic.author, parentWebResourceHash, ic.metaFields);

Instant pubTime = new Instant(wr.publicationTime);
Instant proTime = new Instant(wr.processingTime);
Expand All @@ -1273,6 +1296,7 @@ public void processElement(ProcessContext c) {
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"Domain", wr.domain);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"Author", wr.author);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"ParentWebResourceHash", wr.parentWebResourceHash);
IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"MetaFields", wr.metaFields);

c.output(wrrow);

@@ -1288,8 +1312,8 @@ static class GetUrlFn extends DoFn<TableRow, KV<String,Long>> {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
String url = row.get("Url").toString();
String processingTime = row.get("ProcessingTime").toString();
String url = IndexerPipelineUtils.getTableRowStringFieldIfNotNull(row,"Url");
String processingTime = IndexerPipelineUtils.getTableRowStringFieldIfNotNull(row,"ProcessingTime");
if (url != null && !url.isEmpty())
{
Long l = IndexerPipelineUtils.parseDateToLong(IndexerPipelineUtils.dateTimeFormatYMD_HMS_MSTZ, processingTime);
IndexerPipelineOptions.java
@@ -106,6 +106,10 @@ public interface IndexerPipelineOptions extends DataflowPipelineOptions {
String getBigQueryDataset();
void setBigQueryDataset(String dataset);

@Description("Sink GCS output file")
String getOutputFile();
void setOutputFile(String value);

@Description("Deduplicate based on text content")
Boolean getDedupeText();
void setDedupeText(Boolean dedupeText);
@@ -144,9 +148,13 @@ public interface IndexerPipelineOptions extends DataflowPipelineOptions {
String[] getStatsCalcTables();
void setStatsCalcTables(String[] tables);

@Description("Bigtable IndexerAdminDB instance for Dead Letter log and config")
@Default.String("indexer-admindb-01")
@Description("Set the Bigtable IndexerAdminDB instance for Dead Letter log and config")
String getBigtableIndexerAdminDB();
void setBigtableIndexerAdminDB(String instance);

@Description("Ratio of elements to enrich with CNLP data")
Float getRatioEnrichWithCNLP();
void setRatioEnrichWithCNLP(Float ratio);


}
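For orientation, a sketch (not part of the commit) of how the options added or changed in this release might be set programmatically; every value below is a placeholder.

```java
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.examples.opinionanalysis.IndexerPipelineOptions;

// Sketch only: placeholder values illustrating the 0.6.1 options.
public class IndexerOptionsSketch {
  public static void main(String[] args) {
    IndexerPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(IndexerPipelineOptions.class);

    // Per createIndexerPipeline, a ratio of 0 skips the CloudNLP enrichment phase.
    options.setRatioEnrichWithCNLP(0.01F);

    // Sink GCS output file, used by the CSV-based FileIndexerPipeline.
    options.setOutputFile("gs://my-bucket/output/indexed.csv");

    // Dead Letter entries are written to Bigtable only when this instance is set.
    options.setBigtableIndexerAdminDB("indexer-admindb-01");
  }
}
```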
(Diffs for the remaining changed files are not shown.)
