From 3bd234409ff5e5142bef1d205a0276304ee45a72 Mon Sep 17 00:00:00 2001
From: Sergei Sokolenko
Date: Tue, 12 Dec 2017 21:36:06 -0800
Subject: [PATCH] Release 0.6.1

BigQuery:
- Added the MetaFields field to the webresource table: source-specific metadata
  fields accessible via SAFE_OFFSET (0-based index).

Java code:
- Propagated the MetaFields field from the sources all the way to the BigQuery
  dataset.
- Added the solutions package; its first solution, FileIndexerPipeline, takes a
  CSV file, indexes it, and writes the results to a CSV file.
- Added a dependency on the Apache Commons CSV package to help with CSV
  processing.
---
 bigquery/webresourceSchema.json               |   6 +
 pom.xml                                       |   9 +-
 .../opinionanalysis/IndexerPipeline.java      | 136 ++--
 .../IndexerPipelineOptions.java               |  12 +-
 .../opinionanalysis/IndexerPipelineUtils.java |  94 ++-
 .../opinionanalysis/io/RecordFileSource.java  |   1 +
 .../opinionanalysis/model/InputContent.java   |  11 +-
 .../solutions/FileIndexerPipeline.java        | 630 ++++++++++++++++++
 .../solutions/FileIndexerPipelineOptions.java |  33 +
 9 files changed, 856 insertions(+), 76 deletions(-)
 create mode 100644 src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipeline.java
 create mode 100644 src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipelineOptions.java

diff --git a/bigquery/webresourceSchema.json b/bigquery/webresourceSchema.json
index 58c0754..632aafb 100644
--- a/bigquery/webresourceSchema.json
+++ b/bigquery/webresourceSchema.json
@@ -74,5 +74,11 @@
     "name": "ParentWebResourceHash",
     "type": "STRING",
     "description" : "(Optional) In threaded conversations, the parent is the previous comment, email, or document"
+  },
+  {
+    "mode": "REPEATED",
+    "name": "MetaFields",
+    "type": "STRING",
+    "description" : "(Optional) Source-specific metadata fields accessible via SAFE_OFFSET (0-based index). Review docs for source-specific index mappings."
   }
 ]
diff --git a/pom.xml b/pom.xml
index f44ad87..ff16d43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
   com.google.cloud.dataflow
   examples-opinionanalysis
-  0.5.1
+  0.6.1
   jar
@@ -360,7 +360,7 @@
       sirocco.sirocco-sa
       sirocco-sa
-      1.0.6
+      1.0.7
       org.apache.avro
@@ -533,6 +533,11 @@
+
+      org.apache.commons
+      commons-csv
+      1.5
+
diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipeline.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipeline.java
index f1508fa..d685297 100644
--- a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipeline.java
+++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipeline.java
@@ -14,17 +14,13 @@
  * limitations under the License.
*******************************************************************************/ package com.google.cloud.dataflow.examples.opinionanalysis; - -import java.io.IOException; import java.sql.ResultSet; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.apache.avro.reflect.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; @@ -220,7 +216,9 @@ public static Pipeline createIndexerPipeline(IndexerPipelineOptions options) thr } //PHASE: Enrich with CloudNLP entities - filteredIndexes = enrichWithCNLP(filteredIndexes); + + if (options.getRatioEnrichWithCNLP() > 0) + filteredIndexes = enrichWithCNLP(filteredIndexes, options.getRatioEnrichWithCNLP()); // PHASE: Write to BigQuery // For the Indexes that are unique ("filteredIndexes"), create records in webresource, document, and sentiment. @@ -240,10 +238,10 @@ public static Pipeline createIndexerPipeline(IndexerPipelineOptions options) thr * @return */ private static PCollection enrichWithCNLP( - PCollection filteredIndexes) { + PCollection filteredIndexes, Float ratio) { PCollectionTuple splitAB = filteredIndexes - .apply(ParDo.of(new SplitAB(0.01F)) + .apply(ParDo.of(new SplitAB(ratio)) .withOutputTags(PipelineTags.BranchA, TupleTagList.of(PipelineTags.BranchB))); @@ -276,25 +274,31 @@ private static PCollection indexDocuments( .apply(ParDo.of(new IndexDocument()) .withOutputTags(PipelineTags.successfullyIndexed, // main output TupleTagList.of(PipelineTags.unsuccessfullyIndexed))); // side output - + PCollection indexes = alldocuments - .get(PipelineTags.successfullyIndexed) - .setCoder(AvroCoder.of(ContentIndexSummary.class)); - - PCollection unprocessedDocuments = alldocuments - .get(PipelineTags.unsuccessfullyIndexed); - - BigtableOptions.Builder optionsBuilder = - new BigtableOptions.Builder() - .setProjectId(options.getProject()) - .setInstanceId(options.getBigtableIndexerAdminDB()); - BigtableOptions bigtableOptions = optionsBuilder.build(); - - unprocessedDocuments - .apply(ParDo.of(new CreateDeadLetterEntries())) - .apply("Write to Dead Letter table in Bigtable", BigtableIO.write() - .withBigtableOptions(bigtableOptions) - .withTableId(IndexerPipelineUtils.DEAD_LETTER_TABLE)); + .get(PipelineTags.successfullyIndexed) + .setCoder(AvroCoder.of(ContentIndexSummary.class)); + + // if the Bigtable admin DB is set, write into dead letter table + if (options.getBigtableIndexerAdminDB() != null) { + + + PCollection unprocessedDocuments = alldocuments + .get(PipelineTags.unsuccessfullyIndexed); + + BigtableOptions.Builder optionsBuilder = + new BigtableOptions.Builder() + .setProjectId(options.getProject()) + .setInstanceId(options.getBigtableIndexerAdminDB()); + BigtableOptions bigtableOptions = optionsBuilder.build(); + + unprocessedDocuments + .apply(ParDo.of(new CreateDeadLetterEntries())) + .apply("Write to Dead Letter table in Bigtable", BigtableIO.write() + .withBigtableOptions(bigtableOptions) + .withTableId(IndexerPipelineUtils.DEAD_LETTER_TABLE)); + } + return indexes; } @@ -482,8 +486,11 @@ else if (options.getRedditCommentsTableName() != null) PCollection> postInfo = posts.apply(ParDo.of(new ExtractPostDataFn())); PCollection> commentInfo = comments.apply(ParDo.of(new ExtractCommentInfoFn())); - PCollection> kvpCollection = KeyedPCollectionTuple.of(postInfoTag, postInfo) - .and(commentInfoTag, 
commentInfo).apply(CoGroupByKey.create()); + PCollection> kvpCollection = + KeyedPCollectionTuple + .of(postInfoTag, postInfo) + .and(commentInfoTag, commentInfo) + .apply(CoGroupByKey.create()); // Process the CoGbkResult elements generated by the CoGroupByKey // transform. @@ -512,11 +519,12 @@ public void processElement(ProcessContext c) { Long postPubTime = IndexerPipelineUtils.extractRedditTime(post.get("created_utc").toString()); /* - * sso 7/10/2017: Changing postUrl to the "url" field, which will contain the external - * url or the article being discussed + * sso 11/20/2017: Create two webresource records per post record + * The first WR record will have the external post URL, + * the second one will have the reddit post URL */ - //String postUrl = buildRedditPostUrl(postPermalink); - String postUrl = post.get("url").toString(); + String postUrl = IndexerPipelineUtils.buildRedditPostUrl(postPermalink); + String[] postMetaFields = IndexerPipelineUtils.extractRedditPostMetaFields(post); // Create the first InputContent for the post item itself InputContent icPost = new InputContent(/* url */ postUrl, /* pubTime */ postPubTime, @@ -524,7 +532,7 @@ public void processElement(ProcessContext c) { /* language */ null, /* text */ post.get("selftext").toString(), /* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY, /* collectionItemId */ postId, /* skipIndexing */ 0, /* parentUrl */ null, // the post record will become the beginning of the thread - /* parentPubTime */ null); + /* parentPubTime */ null, /* metaFields */ postMetaFields); postAndCommentList.add(icPost); @@ -549,14 +557,17 @@ public void processElement(ProcessContext c) { String parentId = comment.get("parent_id").toString(); String parentUrl = (parentId.startsWith("t1_")) ? IndexerPipelineUtils.buildRedditCommentUrl(postPermalink, comment.get("id").toString()) : postUrl; - + + String[] commentMetaFields = IndexerPipelineUtils.extractRedditCommentMetaFields(comment); + InputContent icComment = new InputContent(/* url */ commentUrl, - /* pubTime */ commentPubTime, /* title */ null, - /* author */ comment.get("author").toString(), /* language */ null, - /* text */ comment.get("body").toString(), - /* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY, - /* collectionItemId */ commentId, /* skipIndexing */ 0, /* parentUrl */ parentUrl, - /* parentPubTime */ null // don't set time yet, because we might not have read that record yet + /* pubTime */ commentPubTime, /* title */ null, + /* author */ comment.get("author").toString(), /* language */ null, + /* text */ comment.get("body").toString(), + /* documentCollectionId */ IndexerPipelineUtils.DOC_COL_ID_REDDIT_FH_BIGQUERY, + /* collectionItemId */ commentId, /* skipIndexing */ 0, /* parentUrl */ parentUrl, + /* parentPubTime */ null, // don't set time yet, because we might not have read that record yet + /* metaFields */ commentMetaFields ); pubTimes.put(commentUrl, commentPubTime); // save the pub time of the current comment @@ -593,17 +604,21 @@ private static void writeAllTablesToBigQuery(PCollectionTuple bqrows, // Now write to BigQuery WriteDisposition dispo = options.getWriteTruncate() ? - WriteDisposition.WRITE_TRUNCATE: WriteDisposition.WRITE_APPEND; + WriteDisposition.WRITE_TRUNCATE: WriteDisposition.WRITE_APPEND; //Merge all collections with WebResource table records PCollectionList webresourceRowsList = (webresourceDeduped == null) ? 
- PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed) : - PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed).and(webresourceDeduped); + PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed) : + PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed).and(webresourceDeduped); - PCollection allWebresourceRows = webresourceRowsList.apply(Flatten.pCollections()); + PCollection allWebresourceRows = + webresourceRowsList.apply(Flatten.pCollections()); + allWebresourceRows = !options.isStreaming() ? + allWebresourceRows.apply("Reshuffle Webresources", new Reshuffle()) : + allWebresourceRows; + allWebresourceRows - .apply("Reshuffle Webresources", new Reshuffle()) .apply("Write to webresource", BigQueryIO.writeTableRows() .to(getWebResourcePartitionedTableRef(options)) @@ -611,8 +626,11 @@ private static void writeAllTablesToBigQuery(PCollectionTuple bqrows, .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(dispo)); + documentRows = !options.isStreaming() ? + documentRows.apply("Reshuffle Documents", new Reshuffle()): + documentRows; + documentRows - .apply("Reshuffle Documents", new Reshuffle()) .apply("Write to document", BigQueryIO.writeTableRows() .to(getDocumentPartitionedTableRef(options)) @@ -620,8 +638,11 @@ private static void writeAllTablesToBigQuery(PCollectionTuple bqrows, .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(dispo)); + sentimentRows = !options.isStreaming() ? + sentimentRows.apply("Reshuffle Sentiments", new Reshuffle()): + sentimentRows; + sentimentRows - .apply("Reshuffle Sentiments", new Reshuffle()) .apply("Write to sentiment", BigQueryIO.writeTableRows() .to(getSentimentPartitionedTableRef(options)) @@ -651,6 +672,7 @@ private static TableSchema getWebResourceSchema() { fields.add(new TableFieldSchema().setName("Domain").setType("STRING")); fields.add(new TableFieldSchema().setName("Author").setType("STRING")); fields.add(new TableFieldSchema().setName("ParentWebResourceHash").setType("STRING")); + fields.add(new TableFieldSchema().setName("MetaFields").setType("STRING").setMode("REPEATED")); TableSchema schema = new TableSchema().setFields(fields); return schema; @@ -961,12 +983,9 @@ public void processElement(ProcessContext c) { IndexingConsts.ContentType contentType = options.getIndexAsShorttext() ? IndexingConsts.ContentType.SHORTTEXT: IndexingConsts.ContentType.ARTICLE; try { - ic = c.element(); - - if (ic == null || ic.text == null || ic.text.isEmpty()) - throw new Exception("null or empty document"); - long processingTime = System.currentTimeMillis(); + + ic = c.element(); contentindex = new ContentIndex( ic.text, @@ -980,9 +999,11 @@ public void processElement(ProcessContext c) { ic.documentCollectionId, ic.collectionItemId, ic.parentUrl, - ic.parentPubTime); + ic.parentPubTime, + ic.metaFields); Indexer.index(contentindex); // Call to the NLP package + if (!contentindex.IsIndexingSuccessful) throw new Exception(contentindex.IndexingErrors + ". 
Text: "+ic.text); @@ -1108,7 +1129,8 @@ public void processElement(ProcessContext c) { IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Domain", summary.wr.domain); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Author", summary.wr.author); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "ParentWebResourceHash", summary.wr.parentWebResourceHash); - + IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "MetaFields", summary.wr.metaFields); + c.output(wrrow); // Create the document entry @@ -1234,6 +1256,7 @@ public void processElement(ProcessContext c) { IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Domain", summary.wr.domain); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "Author", summary.wr.author); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "ParentWebResourceHash", summary.wr.parentWebResourceHash); + IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow, "MetaFields", summary.wr.metaFields); c.output(wrrow); } @@ -1253,7 +1276,7 @@ public void processElement(ProcessContext c) { wr.initialize(ic.url, ic.pubTime, processingTime, documentHash, ic.documentCollectionId, ic.collectionItemId, - ic.title, ic.author, parentWebResourceHash); + ic.title, ic.author, parentWebResourceHash, ic.metaFields); Instant pubTime = new Instant(wr.publicationTime); Instant proTime = new Instant(wr.processingTime); @@ -1273,6 +1296,7 @@ public void processElement(ProcessContext c) { IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"Domain", wr.domain); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"Author", wr.author); IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"ParentWebResourceHash", wr.parentWebResourceHash); + IndexerPipelineUtils.setTableRowFieldIfNotNull(wrrow,"MetaFields", wr.metaFields); c.output(wrrow); @@ -1288,8 +1312,8 @@ static class GetUrlFn extends DoFn> { @ProcessElement public void processElement(ProcessContext c) { TableRow row = c.element(); - String url = row.get("Url").toString(); - String processingTime = row.get("ProcessingTime").toString(); + String url = IndexerPipelineUtils.getTableRowStringFieldIfNotNull(row,"Url"); + String processingTime = IndexerPipelineUtils.getTableRowStringFieldIfNotNull(row,"ProcessingTime"); if (url != null && !url.isEmpty()) { Long l = IndexerPipelineUtils.parseDateToLong(IndexerPipelineUtils.dateTimeFormatYMD_HMS_MSTZ, processingTime); diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineOptions.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineOptions.java index 7a5e2b4..9fa6893 100644 --- a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineOptions.java +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineOptions.java @@ -106,6 +106,10 @@ public interface IndexerPipelineOptions extends DataflowPipelineOptions { String getBigQueryDataset(); void setBigQueryDataset(String dataset); + @Description("Sink GCS output file") + String getOutputFile(); + void setOutputFile(String value); + @Description("Deduplicate based on text content") Boolean getDedupeText(); void setDedupeText(Boolean dedupeText); @@ -144,9 +148,13 @@ public interface IndexerPipelineOptions extends DataflowPipelineOptions { String[] getStatsCalcTables(); void setStatsCalcTables(String[] tables); - @Description("Bigtable IndexerAdminDB instance for Dead Letter log and config") - @Default.String("indexer-admindb-01") + @Description("Set the Bigtable IndexerAdminDB instance for Dead Letter 
log and config") String getBigtableIndexerAdminDB(); void setBigtableIndexerAdminDB(String instance); + @Description("Ratio of elements to enrich with CNLP data") + Float getRatioEnrichWithCNLP(); + void setRatioEnrichWithCNLP(Float ratio); + + } diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineUtils.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineUtils.java index 774e992..5263d01 100644 --- a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineUtils.java +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/IndexerPipelineUtils.java @@ -65,6 +65,8 @@ public class IndexerPipelineUtils { public static final String DOC_COL_ID_KGA = "01"; public static final String DOC_COL_ID_REDDIT_FH_BIGQUERY = "02"; public static final String DOC_COL_ID_GDELT_BUCKET = "03"; + public static final String DOC_COL_ID_CSV_FILE = "04"; + // Reddit domain url public static final String REDDIT_URL = "https://www.reddit.com"; @@ -88,6 +90,17 @@ public class IndexerPipelineUtils { // Integration with Cloud NLP public static final String CNLP_TAG_PREFIX = "cnlp::"; + // Metafields constants + public static final int METAFIELDS_REDDIT_NUM_FIELDS = 5; + public static final int METAFIELDS_REDDIT_EXTERNALLINK_IDX = 0; + public static final int METAFIELDS_REDDIT_SUBREDDIT_IDX = 1; + public static final int METAFIELDS_REDDIT_SCORE_IDX = 2; + public static final int METAFIELDS_REDDIT_POSTID_IDX = 3; + public static final int METAFIELDS_REDDIT_DOMAIN_IDX = 4; + + + public static final String METAFIELDS_VALUE_UNVAILABLE = "unavailable"; + // TODO: Move the date parsing functions to DateUtils public static DateTime parseDateString(DateTimeFormatter formatter, String s) { DateTime result = null; @@ -139,6 +152,15 @@ public static void setTableRowFieldIfNotNull(TableRow r, String field, Object va r.set(field, value); } + public static String getTableRowStringFieldIfNotNull(TableRow r, String field) { + Object value = r.get(field); + if (value != null) + return value.toString(); + else + return null; + } + + public static String buildJdbcSourceImportQuery(IndexerPipelineOptions options) { String timeWindow = null; if (!(options.getJdbcSourceFromDate() == null || options.getJdbcSourceFromDate().isEmpty())) @@ -343,8 +365,8 @@ public static void validateIndexerPipelineOptions(IndexerPipelineOptions options options.setDedupeText(options.isSourceGDELTbucket()); } - if (options.getBigQueryDataset().isEmpty()) { - throw new IllegalArgumentException("Sink BigQuery dataset needs to be specified."); + if ( (options.getBigQueryDataset() == null) && (options.getOutputFile() == null)) { + throw new IllegalArgumentException("Either Sink BigQuery dataset or Output file need to be specified."); } if (options.isSourcePubsub()) { @@ -353,12 +375,10 @@ public static void validateIndexerPipelineOptions(IndexerPipelineOptions options options.setStreaming(false); } - /* - if (options.isSourcePubsub()) { - setupPubsubTopic(options); + if (options.getRatioEnrichWithCNLP() == null) { + Float cnlpRatio = options.isStreaming() ? 
1.0F : 0.01F; + options.setRatioEnrichWithCNLP(cnlpRatio); } - */ - //setupRunner(options); } @@ -399,7 +419,7 @@ public static void validateSocialStatsPipelineOptions(IndexerPipelineOptions opt - private static String buildRedditPostUrl(String permalink) { + public static String buildRedditPostUrl(String permalink) { return REDDIT_URL + permalink; } @@ -412,6 +432,58 @@ public static Long extractRedditTime(String createdUtcString) { return (i * 1000L); } + public static String[] extractRedditPostMetaFields(TableRow post) { + String[] result = new String[METAFIELDS_REDDIT_NUM_FIELDS]; + + String domain = post.get("domain").toString(); + if (!domain.startsWith("self.")) { + result[METAFIELDS_REDDIT_EXTERNALLINK_IDX] = post.get("url").toString(); + result[METAFIELDS_REDDIT_DOMAIN_IDX] = domain; + } else { + result[METAFIELDS_REDDIT_EXTERNALLINK_IDX] = METAFIELDS_VALUE_UNVAILABLE; + result[METAFIELDS_REDDIT_DOMAIN_IDX] = METAFIELDS_VALUE_UNVAILABLE; + } + Object oSubreddit = post.get("subreddit"); + if (oSubreddit != null) + result[METAFIELDS_REDDIT_SUBREDDIT_IDX] = oSubreddit.toString(); + else + result[METAFIELDS_REDDIT_SUBREDDIT_IDX] = METAFIELDS_VALUE_UNVAILABLE; + + result[METAFIELDS_REDDIT_SCORE_IDX] = post.get("score").toString(); + result[METAFIELDS_REDDIT_POSTID_IDX] = extractPostIdFromRedditPost(post); + + return result; + } + + public static String[] extractRedditCommentMetaFields(TableRow comment) { + + String[] result = new String[METAFIELDS_REDDIT_NUM_FIELDS]; + + result[METAFIELDS_REDDIT_EXTERNALLINK_IDX] = METAFIELDS_VALUE_UNVAILABLE; + result[METAFIELDS_REDDIT_DOMAIN_IDX] = METAFIELDS_VALUE_UNVAILABLE; + + Object oSubreddit = comment.get("subreddit"); + if (oSubreddit != null) + result[METAFIELDS_REDDIT_SUBREDDIT_IDX] = oSubreddit.toString(); + else + result[METAFIELDS_REDDIT_SUBREDDIT_IDX] = METAFIELDS_VALUE_UNVAILABLE; + + result[METAFIELDS_REDDIT_SCORE_IDX] = comment.get("score").toString(); + result[METAFIELDS_REDDIT_POSTID_IDX] = extractPostIdFromRedditComment(comment); + + return result; + + } + + public static String extractPostIdFromRedditPost(TableRow post) { + return "t3_" + (String) post.get("id"); + } + + public static String extractPostIdFromRedditComment(TableRow comment) { + return (String) comment.get("link_id"); + // link_id in comments is already in the format t3_ + } + /** * Extracts the post id from the post record. 
*/ @@ -419,8 +491,7 @@ static class ExtractPostDataFn extends DoFn> { @ProcessElement public void processElement(ProcessContext c) { TableRow row = c.element(); - String postId = (String) row.get("id"); - postId = "t3_" + postId; + String postId = IndexerPipelineUtils.extractPostIdFromRedditPost(row); c.output(KV.of(postId, row)); } } @@ -432,8 +503,7 @@ static class ExtractCommentInfoFn extends DoFn> { @ProcessElement public void processElement(ProcessContext c) { TableRow row = c.element(); - String postId = (String) row.get("link_id"); - // link_id in comments is already in the format t3_ + String postId = extractPostIdFromRedditComment(row); c.output(KV.of(postId, row)); } } diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/io/RecordFileSource.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/io/RecordFileSource.java index 2b7e301..3a720e1 100644 --- a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/io/RecordFileSource.java +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/io/RecordFileSource.java @@ -52,6 +52,7 @@ public class RecordFileSource extends FileBasedSource { private final byte separator; public static final byte DEFAULT_RECORD_SEPARATOR = '\036'; // use ASCII Record Separator RS octal number 036, decimal 30, hex 1E + public static final byte CR_RECORD_SEPARATOR = '\015'; // CR: octal number 015, decimal 13, hex 0D public RecordFileSource(ValueProvider fileSpec, Coder coder, byte separator) { diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/model/InputContent.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/model/InputContent.java index a755b13..ca2f384 100644 --- a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/model/InputContent.java +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/model/InputContent.java @@ -62,16 +62,19 @@ public class InputContent { public String expectedWebResourceHash; @Nullable public String expectedParentWebResourceHash; + @Nullable + public String[] metaFields; + public InputContent() {} public InputContent(String url, Long pubTime, String title, String author, String language, String text, String documentCollectionId, String collectionItemId, Integer skipIndexing) { - this( url, pubTime, title, author, language, text, documentCollectionId, collectionItemId, skipIndexing, null, null); + this( url, pubTime, title, author, language, text, documentCollectionId, collectionItemId, skipIndexing, null, null, null); } public InputContent(String url, Long pubTime, String title, String author, String language, String text, - String documentCollectionId, String collectionItemId, Integer skipIndexing, String parentUrl, Long parentPubTime) { + String documentCollectionId, String collectionItemId, Integer skipIndexing, String parentUrl, Long parentPubTime, String[] metaFields) { this.url = url; this.pubTime = pubTime; this.title = title; @@ -83,6 +86,7 @@ public InputContent(String url, Long pubTime, String title, String author, Strin this.skipIndexing = skipIndexing; this.parentUrl = parentUrl; this.parentPubTime = parentPubTime; + this.metaFields = metaFields; this.calculateHashFields(); } @@ -149,7 +153,6 @@ public static InputContent createInputContentFromGDELTJson(String s) throws Exce result.documentCollectionId = IndexerPipelineUtils.DOC_COL_ID_GDELT_BUCKET; result.collectionItemId = node.get("gkgoffsets").asText(); - result.skipIndexing = 0; result.calculateHashFields(); @@ 
-160,7 +163,7 @@ public static InputContent createInputContentFromGDELTJson(String s) throws Exce private void calculateHashFields() { - this.expectedDocumentHash = ((this.text !=null)) ? Document.calculateDocumentHash(this.text) : null; + this.expectedDocumentHash = Document.calculateDocumentHash(this.text,this.documentCollectionId,this.collectionItemId); this.expectedWebResourceHash = ((this.pubTime != null) && (this.url !=null)) ? HashUtils.getSHA1HashBase64(this.pubTime + this.url) : null; this.expectedParentWebResourceHash = ((this.parentUrl != null && this.parentPubTime != null)) ? HashUtils.getSHA1HashBase64(this.parentPubTime + this.parentUrl) : null; } diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipeline.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipeline.java new file mode 100644 index 0000000..bb193d9 --- /dev/null +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipeline.java @@ -0,0 +1,630 @@ +/******************************************************************************* + * Copyright 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ + +package com.google.cloud.dataflow.examples.opinionanalysis.solutions; + + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.google.bigtable.v2.Mutation; +import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.dataflow.examples.opinionanalysis.IndexerPipelineOptions; +import com.google.cloud.dataflow.examples.opinionanalysis.IndexerPipelineUtils; +import com.google.cloud.dataflow.examples.opinionanalysis.io.RecordFileSource; +import com.google.cloud.dataflow.examples.opinionanalysis.model.InputContent; +import 
com.google.cloud.dataflow.examples.opinionanalysis.util.PipelineTags; + +import com.google.cloud.language.v1.Document.Type; +import com.google.cloud.language.v1.EncodingType; +import com.google.cloud.language.v1.Entity; +import com.google.cloud.language.v1.LanguageServiceClient; +import com.google.cloud.language.v1.Sentiment; +import com.google.cloud.language.v1.AnalyzeEntitiesRequest; +import com.google.cloud.language.v1.AnalyzeEntitiesResponse; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; + +import sirocco.indexer.Indexer; +import sirocco.indexer.IndexingConsts; +import sirocco.indexer.util.LogUtils; +import sirocco.model.ContentIndex; + +import sirocco.model.summary.ContentIndexSummary; +import sirocco.model.summary.DocumentTag; + + +public class FileIndexerPipeline { + private static final Logger LOG = LoggerFactory.getLogger(FileIndexerPipeline.class); + private static final long REPORT_LONG_INDEXING_DURATION = 10000; // Report indexing duration longer than 10s. + + public static void main(String[] args) throws Exception { + + FileIndexerPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(FileIndexerPipelineOptions.class); + + Pipeline pipeline = createIndexerPipeline(options); + + pipeline.run(); + + } + + /** + * This function creates the DAG graph of transforms. It can be called from main() + * as well as from the ControlPipeline. + * @param options + * @return + * @throws Exception + */ + public static Pipeline createIndexerPipeline(FileIndexerPipelineOptions options) throws Exception { + + IndexerPipelineUtils.validateIndexerPipelineOptions(options); + Pipeline pipeline = Pipeline.create(options); + + // PHASE: Read raw content from sources + + PCollection readContent = pipeline + .apply("Read entire CSV file", org.apache.beam.sdk.io.Read.from(new RecordFileSource( + ValueProvider.StaticValueProvider.of(options.getInputFile()), + StringUtf8Coder.of(), RecordFileSource.DEFAULT_RECORD_SEPARATOR))) // + .apply("Parse CSV file into InputContent objects", ParDo.of(new ParseCSVFile())); + + // Define the accumulators of all filters + PCollection contentToIndex = readContent; + + // PHASE: Index documents (extract opinions and entities/tags). + // Return successfully indexed docs, and create a Bigtable write transform to store errors + // in Dead Letter table. + PCollection indexes = indexDocuments(options, contentToIndex); + + if (options.getRatioEnrichWithCNLP() > 0) + indexes = enrichWithCNLP(indexes, options.getRatioEnrichWithCNLP()); + + // PHASE: Write to BigQuery + // For the Indexes that are unique ("filteredIndexes"), create records in webresource, document, and sentiment. 
+ // Then, merge resulting webresources with webresourceRowsUnindexed and webresourceDeduped + indexes + .apply(ParDo.of(new CreateCSVLineFromIndexSummaryFn())) + .apply(TextIO.write().to(options.getOutputFile())); + + + return pipeline; + } + + + static class ParseCSVFile extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) { + + String rawInput = null; + InputContent iContent = null; + + try { + rawInput = c.element(); + if (rawInput == null) + throw new Exception("ParseCSVFile: null raw content"); + + + FileIndexerPipelineOptions options = c.getPipelineOptions().as(FileIndexerPipelineOptions.class); + Integer textColumnIdx = options.getTextColumnIdx(); + Integer collectionItemIdIdx = options.getCollectionItemIdIdx(); + + InputStreamReader isr = new InputStreamReader(IOUtils.toInputStream(rawInput,StandardCharsets.UTF_8.name())); + + Iterable records = CSVFormat.DEFAULT + .withFirstRecordAsHeader() + .parse(isr); + + for (CSVRecord record : records) { + + String text = record.get(textColumnIdx); + String documentCollectionId = IndexerPipelineUtils.DOC_COL_ID_CSV_FILE; + String collectionItemId = record.get(collectionItemIdIdx); + + InputContent ic = new InputContent( + null /*url*/, null /*pubTime*/, null /*title*/, null /*author*/, null /*language*/, + text, documentCollectionId, collectionItemId, 0 /*skipIndexing*/); + + c.output(ic); + } + + + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + } + + + } + + /** + * + * Use in the future, when we are able to parallelize import at the record file source + * @author sezok + * + */ + static class ParseCSVLine extends DoFn { + + /* + @Setup + public void setup(){ + } + + @Teardown + public void teardown(){ + } + */ + + @ProcessElement + public void processElement(ProcessContext c) { + + String rawInput = null; + InputContent iContent = null; + + try { + rawInput = c.element(); + if (rawInput == null) + throw new Exception("ParseCSVLine: null raw content"); + rawInput = rawInput.trim(); + if (rawInput.isEmpty()) + throw new Exception("ParseCSVLine: empty raw content or whitespace chars only"); + + FileIndexerPipelineOptions options = c.getPipelineOptions().as(FileIndexerPipelineOptions.class); + Integer textColumnIdx = options.getTextColumnIdx(); + Integer collectionItemIdIdx = options.getCollectionItemIdIdx(); + + InputStreamReader isr = new InputStreamReader(IOUtils.toInputStream(rawInput,StandardCharsets.UTF_8.name())); + + Iterable records = CSVFormat.DEFAULT.parse(isr); + + for (CSVRecord record : records) { // should only be one record, but handle multi-record case as well + + String text = record.get(textColumnIdx); + String documentCollectionId = IndexerPipelineUtils.DOC_COL_ID_CSV_FILE; + String collectionItemId = record.get(collectionItemIdIdx); + + InputContent ic = new InputContent( + null /*url*/, null /*pubTime*/, null /*title*/, null /*author*/, null /*language*/, + text, documentCollectionId, collectionItemId, 0 /*skipIndexing*/); + + c.output(ic); + } + + + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + } + + + } + + + /** + * @param indexes + * @return + */ + private static PCollection enrichWithCNLP( + PCollection indexes, Float ratio) { + + PCollectionTuple splitAB = indexes + .apply(ParDo.of(new SplitAB(ratio)) + .withOutputTags(PipelineTags.BranchA, + TupleTagList.of(PipelineTags.BranchB))); + + PCollection branchACol = splitAB.get(PipelineTags.BranchA); + PCollection branchBCol = splitAB.get(PipelineTags.BranchB); + + PCollection enrichedBCol = branchBCol.apply( + 
ParDo.of(new EnrichWithCNLPEntities())); + + //Merge all collections with WebResource table records + PCollectionList contentIndexSummariesList = + PCollectionList.of(branchACol).and(enrichedBCol); + PCollection allIndexSummaries = + contentIndexSummariesList.apply(Flatten.pCollections()); + + indexes = allIndexSummaries; + return indexes; + } + + /** + * @param options + * @param contentToIndex + * @return + */ + private static PCollection indexDocuments( + IndexerPipelineOptions options, + PCollection contentToIndex) { + + PCollectionTuple alldocuments = contentToIndex + .apply(ParDo.of(new IndexDocument()) + .withOutputTags(PipelineTags.successfullyIndexed, // main output + TupleTagList.of(PipelineTags.unsuccessfullyIndexed))); // side output + + PCollection indexes = alldocuments + .get(PipelineTags.successfullyIndexed) + .setCoder(AvroCoder.of(ContentIndexSummary.class)); + + // if the Bigtable admin DB is set, write into dead letter table + if (options.getBigtableIndexerAdminDB() != null) { + + PCollection unprocessedDocuments = alldocuments + .get(PipelineTags.unsuccessfullyIndexed); + + BigtableOptions.Builder optionsBuilder = + new BigtableOptions.Builder() + .setProjectId(options.getProject()) + .setInstanceId(options.getBigtableIndexerAdminDB()); + BigtableOptions bigtableOptions = optionsBuilder.build(); + + unprocessedDocuments + .apply(ParDo.of(new CreateDeadLetterEntries())) + .apply("Write to Dead Letter table in Bigtable", BigtableIO.write() + .withBigtableOptions(bigtableOptions) + .withTableId(IndexerPipelineUtils.DEAD_LETTER_TABLE)); + } + + return indexes; + } + + + /** + * Pipeline step 3 + * FormatAsTableRowFn - a DoFn for converting a sentiment summary into a BigQuery WebResources record + */ + + static class CreateCSVLineFromIndexSummaryFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) { + + ContentIndexSummary summary = c.element(); + + if (summary.sentiments == null) + return; + + try { + + StringWriter stringWriter = new StringWriter(); + CSVPrinter csvPrinter = new CSVPrinter(stringWriter,CSVFormat.DEFAULT); + + for (int i=0; i < summary.sentiments.length; i++) + { + + ArrayList linefields = new ArrayList(); + + addField(linefields,"RecordID",summary.doc.collectionItemId); + + ArrayList sttags = new ArrayList<>(); + if (summary.sentiments[i].tags != null) + for (int j=0; j < summary.sentiments[i].tags.length; j++) + sttags.add(summary.sentiments[i].tags[j].tag); + + addField(linefields,"Tags",sttags.toString()); // will write as [a,b,c] + + addField(linefields,"SentimentHash", summary.sentiments[i].sentimentHash); + addField(linefields,"Text", summary.sentiments[i].text); + addField(linefields,"LabelledPositions", summary.sentiments[i].labelledPositions); + addField(linefields,"AnnotatedText", summary.sentiments[i].annotatedText); + addField(linefields,"AnnotatedHtml", summary.sentiments[i].annotatedHtmlText); + addField(linefields,"SentimentTotalScore", summary.sentiments[i].sentimentTotalScore); + addField(linefields,"DominantValence", summary.sentiments[i].dominantValence.ordinal()); + addField(linefields,"StAcceptance", summary.sentiments[i].stAcceptance); + addField(linefields,"StAnger", summary.sentiments[i].stAnger); + addField(linefields,"StAnticipation", summary.sentiments[i].stAnticipation); + addField(linefields,"StAmbiguous", summary.sentiments[i].stAmbiguous); + addField(linefields,"StDisgust", summary.sentiments[i].stDisgust); + addField(linefields,"StFear", summary.sentiments[i].stFear); + 
addField(linefields,"StGuilt", summary.sentiments[i].stGuilt); + addField(linefields,"StInterest", summary.sentiments[i].stInterest); + addField(linefields,"StJoy", summary.sentiments[i].stJoy); + addField(linefields,"StSadness", summary.sentiments[i].stSadness); + addField(linefields,"StShame", summary.sentiments[i].stShame); + addField(linefields,"StSurprise", summary.sentiments[i].stSurprise); + addField(linefields,"StPositive", summary.sentiments[i].stPositive); + addField(linefields,"StNegative", summary.sentiments[i].stNegative); + addField(linefields,"StSentiment", summary.sentiments[i].stSentiment); + addField(linefields,"StProfane", summary.sentiments[i].stProfane); + addField(linefields,"StUnsafe", summary.sentiments[i].stUnsafe); + + ArrayList signalsarray = new ArrayList<>(); + if (summary.sentiments[i].signals != null) + for (int j=0; j < summary.sentiments[i].signals.length; j++) + signalsarray.add(summary.sentiments[i].signals[j]); + + addField(linefields,"Signals",signalsarray.toString()); + + csvPrinter.printRecord(linefields); + + String output = stringWriter.toString(); + csvPrinter.flush(); // will also flush the stringWriter + + c.output(output); + + + } + + csvPrinter.close(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + + private void addField(ArrayList fields, String fieldName, String value) { + fields.add(value); + // TODO: should we quote the string? + } + private void addField(ArrayList fields, String fieldName, Integer value) { + fields.add(value.toString()); + } + + + } + + + + + /** + * Create items to be stored in Bigtable dead letter table unprocessed-documents + * @author sezok + * + */ + static class CreateDeadLetterEntries extends DoFn>> { + + @ProcessElement + public void processElement(ProcessContext c) { + InputContent i = c.element(); + String jobName = c.getPipelineOptions().getJobName(); + ByteString rowkey = ByteString.copyFromUtf8(jobName + "#" + i.expectedDocumentHash); + ByteString value = ByteString.copyFromUtf8(i.text); + + Iterable mutations = + ImmutableList.of(Mutation.newBuilder() + .setSetCell( + Mutation.SetCell.newBuilder() + .setFamilyName(IndexerPipelineUtils.DEAD_LETTER_TABLE_ERR_CF) + .setColumnQualifier(ByteString.copyFromUtf8("text")) + .setValue(value) + ) + .build()); + + c.output(KV.of(rowkey, mutations)); + } + } + + + + /** + * + * IndexDocument - a ParDo that analyzes just one document at a time + * and produces its Sentiment Analysis summary + */ + + static class IndexDocument extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) { + + ContentIndex contentindex = null; + ContentIndexSummary summary = null; + InputContent ic = null; + IndexerPipelineOptions options = c.getPipelineOptions().as(IndexerPipelineOptions.class); + IndexingConsts.ContentType contentType = options.getIndexAsShorttext() ? IndexingConsts.ContentType.SHORTTEXT: IndexingConsts.ContentType.ARTICLE; + + try { + long processingTime = System.currentTimeMillis(); + + ic = c.element(); + + contentindex = new ContentIndex( + ic.text, + IndexingConsts.IndexingType.TOPSENTIMENTS, + contentType, + processingTime, + ic.url, + ic.pubTime, + ic.title, + ic.author, + ic.documentCollectionId, + ic.collectionItemId, + ic.parentUrl, + ic.parentPubTime, + ic.metaFields); + + Indexer.index(contentindex); // Call to the NLP package + + if (!contentindex.IsIndexingSuccessful) + throw new Exception(contentindex.IndexingErrors + ". 
Text: "+ic.text); + + summary = contentindex.getContentIndexSummary(); + + long indexingDuration = System.currentTimeMillis() - processingTime; + if (indexingDuration > FileIndexerPipeline.REPORT_LONG_INDEXING_DURATION) { + LOG.warn("IndexDocument.processElement: Indexing took " + indexingDuration + " milliseconds."); + StringBuilder sb = new StringBuilder(); + LogUtils.printIndex(1, contentindex, sb); + String docIndex = sb.toString(); + LOG.warn("IndexDocument.processElement: Contents of Index ["+indexingDuration+" ms]: " + docIndex); + } + + if (summary == null) + throw new Exception("null ContentIndexSummary returned"); + else + c.output(summary); + + } catch (Exception e) { + LOG.warn("IndexDocument.processElement:",e); + c.output(PipelineTags.unsuccessfullyIndexed, ic); + } + + } + } + + + + /** + * Call CloudNLP + * + */ + static class EnrichWithCNLPEntities extends DoFn { + + private LanguageServiceClient languageClient; + + @StartBundle + public void startBundle(){ + try { + this.languageClient = LanguageServiceClient.create(); + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + + } + + @FinishBundle + public void finishBundle(){ + if (this.languageClient == null) + return; + + try { + this.languageClient.close(); + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + } + + @ProcessElement + public void processElement(ProcessContext c) { + ContentIndexSummary is = c.element(); + + try { + + if (this.languageClient == null) + throw new Exception("CNLP client not initialized"); + + com.google.cloud.language.v1.Document doc = com.google.cloud.language.v1.Document.newBuilder() + .setContent(is.doc.text).setType(Type.PLAIN_TEXT).build(); + + AnalyzeEntitiesRequest request = AnalyzeEntitiesRequest.newBuilder() + .setDocument(doc).setEncodingType(EncodingType.UTF16).build(); + + AnalyzeEntitiesResponse response = languageClient.analyzeEntities(request); + + // get at most as many entities as we have tags in the Sirocco-based output + // int entitiesToGet = Math.min(is.doc.tags.length, response.getEntitiesList().size()); + int entitiesToGet = response.getEntitiesList().size(); + DocumentTag[] newTags = new DocumentTag[entitiesToGet]; + + // Create additional Document Tags and add them to the output index summary + for (int idx = 0; idx < entitiesToGet; idx++) { + // Entities are sorted by salience in the response list, so pick the first ones + Entity entity = response.getEntitiesList().get(idx); + DocumentTag dt = new DocumentTag(); + String tag = IndexerPipelineUtils.CNLP_TAG_PREFIX + entity.getName(); + Float weight = entity.getSalience(); + Boolean goodAsTopic = null; + dt.initialize(tag, weight, goodAsTopic); + newTags[idx] = dt; + } + + if (entitiesToGet>0) + { + ContentIndexSummary iscopy = is.copy(); + DocumentTag[] combinedTags = new DocumentTag[newTags.length + iscopy.doc.tags.length]; + System.arraycopy(iscopy.doc.tags, 0, combinedTags, 0, iscopy.doc.tags.length); + System.arraycopy(newTags, 0, combinedTags, iscopy.doc.tags.length, newTags.length); + iscopy.doc.tags = combinedTags; + c.output(iscopy); + } + else + c.output(is); + + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + + } + } + + + /** + * Splits incoming collection into A (main output) and B (side output) + * + * + */ + static class SplitAB extends DoFn { + + /** + * bRatio - Ratio of elements to route to "B" side output. + * Needs to be a float value between 0 and 1. 
+ */ + private final Float bRatio; + private final int threshold; + private transient ThreadLocalRandom random; + + + public SplitAB(Float bRatio) { + this.bRatio = (bRatio < 0) ? 0: (bRatio < 1)? bRatio : 1; // valid values are between 0 and 1 + this.threshold = (int) (((float) Integer.MAX_VALUE) * this.bRatio); + } + + @StartBundle + public void startBundle() { + random = ThreadLocalRandom.current(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + ContentIndexSummary i = c.element(); + int dice = random.nextInt(Integer.MAX_VALUE); + + if (dice > this.threshold) + c.output(i); + else + c.output(PipelineTags.BranchB, i); + } + } + + + + +} diff --git a/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipelineOptions.java b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipelineOptions.java new file mode 100644 index 0000000..f968861 --- /dev/null +++ b/src/main/java/com/google/cloud/dataflow/examples/opinionanalysis/solutions/FileIndexerPipelineOptions.java @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ + +package com.google.cloud.dataflow.examples.opinionanalysis.solutions; + +import org.apache.beam.sdk.options.Description; + +import com.google.cloud.dataflow.examples.opinionanalysis.IndexerPipelineOptions; + +public interface FileIndexerPipelineOptions extends IndexerPipelineOptions { + + @Description("Zero-based index of the Text column in input file") + Integer getTextColumnIdx(); + void setTextColumnIdx(Integer textColumnIdx); + + @Description("Zero-based index of the Collection Item ID column - unique identifier - in input file") + Integer getCollectionItemIdIdx(); + void setCollectionItemIdIdx(Integer collectionItemIdIdx); + +}
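
A minimal sketch of how the new MetaFields column can be queried once this change is deployed: the statement below pulls Reddit-sourced metadata out of the webresource table. The dataset and table names (opinions.webresource) are placeholders, and the 0-based positions follow the METAFIELDS_REDDIT_* constants added to IndexerPipelineUtils in this change (0 = external link, 1 = subreddit, 2 = score, 3 = post id, 4 = domain); other sources may map the indexes differently.

    -- Placeholder dataset/table: opinions.webresource
    SELECT
      Url,
      MetaFields[SAFE_OFFSET(1)] AS Subreddit,
      MetaFields[SAFE_OFFSET(2)] AS Score,
      MetaFields[SAFE_OFFSET(4)] AS LinkDomain
    FROM `opinions.webresource`
    WHERE ARRAY_LENGTH(MetaFields) > 0
    LIMIT 100;

SAFE_OFFSET returns NULL instead of raising an error when an index is out of range, which makes it the safe way to read these variable-length arrays.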