Adding write compliant proto parquet support, but from new library 1.… #3

Open

wants to merge 5 commits into master
4 changes: 2 additions & 2 deletions pom.xml
@@ -43,7 +43,7 @@
<maven.compiler.target>1.6</maven.compiler.target>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<parquet.version>1.11.1</parquet.version>
+<parquet.version>1.12.2</parquet.version>

<!-- Whether we will use the legacy old consumer kafka-legacy or the new consumer kafka-2.0.0 -->
<!-- Name of the directory below src/main/config that contains kafka-version-specific settings -->
@@ -399,7 +399,7 @@
<dependency>
<groupId>com.fullcontact.platform</groupId>
<artifactId>protobufs</artifactId>
-<version>21.10.36</version>
+<version>22.3.17</version>
</dependency>
</dependencies>

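For context on what the parquet-mr bump is presumably after: recent parquet-protobuf releases expose a spec-compliant write mode (the `parquet.proto.writeSpecsCompliant` flag), which emits the standard three-level list/map layout instead of the legacy proto-specific encoding, so other readers (Hive, Presto, etc.) can consume the files. Below is a minimal sketch of writing the `contact_proto.FieldsMessage$Fields` records used later in this PR with that flag enabled, assuming the `ProtoParquetWriter` builder API available in these releases; the output path, codec choice, and the empty record are illustrative assumptions, not code from this PR:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;

import contact_proto.FieldsMessage.Fields;

public class CompliantProtoWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt in to the spec-compliant (three-level) list/map layout;
        // equivalent to setting parquet.proto.writeSpecsCompliant=true.
        ProtoWriteSupport.setWriteSpecsCompliant(conf, true);

        // Hypothetical output path; Secor derives real paths from its config.
        Path out = new Path("/tmp/fields-sample.gz.parquet");

        try (ParquetWriter<Fields> writer = ProtoParquetWriter.<Fields>builder(out)
                .withMessage(Fields.class)
                .withConf(conf)
                .withCompressionCodec(CompressionCodecName.GZIP)
                .build()) {
            // Empty record purely for illustration (assumes no required fields).
            writer.write(Fields.newBuilder().build());
        }
    }
}
```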
170 changes: 170 additions & 0 deletions src/main/config/quassi.properties
@@ -0,0 +1,170 @@
#include=secor.kubernetes-prod.properties
secor.gs.bucket=
secor.gs.path=
#secor.file.reader.writer.factory=com.pinterest.secor.io.impl.ProtobufParquetFileReaderWriterFactory
#secor.swift.path=
#secor.local.path=/mnt/secor_data/message_logs/partition
#ostrich.port=9998
#secor.kubernetes-prod.properties: |+
cloud.service=S3
aws.proxy.isEnabled=false
aws.proxy.http.host=
aws.proxy.http.port=
#kafka.seed.broker.host=b-1.databus.5ycvu3.c5.kafka.us-east-1.amazonaws.com
aws.region=us-east-1
aws.endpoint=
aws.client.pathstyleaccess=false
aws.sse.type=S3
aws.sse.customer.key=
aws.sse.kms.key=
#secor.s3.filesystem=s3n
swift.use.get.auth=true
swift.auth.url=
swift.tenant=
swift.username=
swift.port=8080
swift.public=true
swift.password=
swift.api.key=
secor.gs.upload.direct=false
secor.gs.tasks.ratelimit.pr.second=2
secor.gs.threadpool.fixed.size=32
secor.gs.credentials.path=
zookeeper.session.timeout.ms=3000
zookeeper.sync.time.ms=200
secor.zookeeper.path=/
kafka.consumer.timeout.ms=10000
kafka.consumer.auto.offset.reset=smallest
kafka.partition.assignment.strategy=range
kafka.rebalance.max.retries=
kafka.rebalance.backoff.ms=
kafka.socket.receive.buffer.bytes=
kafka.fetch.message.max.bytes=
kafka.fetch.min.bytes=
kafka.fetch.wait.max.ms=
#kafka.seed.broker.port=9092
kafka.zookeeper.path=/
#schema.registry.url=http://kafka-schema-registry.kafka.svc.cluster.local:8081
kafka.dual.commit.enabled=false
kafka.offsets.storage=zookeeper
kafka.useTimestamp=false
kafka.message.timestamp.className=com.pinterest.secor.timestamp.Kafka10MessageTimestamp
kafka.message.iterator.className=com.pinterest.secor.reader.SecorKafkaMessageIterator
secor.generation=1
secor.consumer.threads=6
secor.messages.per.second=400000
secor.offsets.per.partition=10000000
secor.offsets.prefix=offset=
secor.topic_partition.forget.seconds=600
partitioner.granularity.hour=false
partitioner.granularity.minute=false
partitioner.granularity.date.prefix=dt=
partitioner.granularity.hour.prefix=hr=
partitioner.granularity.minute.prefix=min=
partitioner.granularity.date.format=yyyy-MM-dd
partitioner.granularity.hour.format=HH
partitioner.granularity.minute.format=mm
partitioner.finalizer.delay.seconds=3600
secor.local.log.delete.age.hours=1
qubole.api.token=
hive.table.prefix=
tsdb.hostport=
monitoring.blacklist.topics=
monitoring.prefix=qaasi-secor
monitoring.interval.seconds=30
statsd.hostport=localhost:9125
secor.thrift.protocol.class=
secor.thrift.message.class.*=
statsd.prefixWithConsumerGroup=true
message.timestamp.name=last_updated
message.timestamp.name.separator=
message.timestamp.id=1
message.timestamp.type=i64
message.timestamp.input.pattern=
message.timestamp.required=true
secor.compression.codec=org.apache.hadoop.io.compress.GzipCodec
secor.file.extension=.gz.parquet
secor.file.reader.Delimiter=\n
secor.file.writer.Delimiter=\n
secor.max.message.size.bytes=1000000
secor.upload.manager.class=com.pinterest.secor.uploader.S3UploadManager
secor.parser.timezone=UTC
secor.message.transformer.class=com.pinterest.secor.transformer.IdentityMessageTransformer
secor.s3.prefix.md5hash=false
secor.s3.alter.path.date=
secor.s3.alternative.path=
secor.enable.qubole=false
secor.qubole.timeout.ms=300000
secor.kafka.upload_at_minute_mark.topic_filter=
secor.upload.minute_mark=0
secor.file.age.youngest=true
secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector
secor.monitoring.metrics.collector.micrometer.prometheus.enabled=true
parquet.block.size=134217728
parquet.page.size=1048576
parquet.enable.dictionary=true
parquet.validation=false
secor.orc.message.schema.*=struct<a:int\,b:int\,c:struct<d:int\,e:string>\,f:array<string>\,g:int>
secor.orc.schema.provider=com.pinterest.secor.util.orc.schema.DefaultORCSchemaProvider
#ostrich.port=9998
secor.swift.path=
#secor.max.file.size.bytes=200000000
#secor.max.file.age.seconds=400

#secor.qaasi.properties: |+
#include=secor.kubernetes-prod.partition.properties
kafka.new.consumer.topic.list=
secor.kafka.topic_filter=databus.qaasi.v0
secor.kafka.topic_blacklist=
secor.s3.bucket=fullcontact-databus-msk
secor.s3.path=
secor.s3.filesystem=s3a
aws.access.key=
aws.secret.key=
aws.session.token=
aws.role=
secor.swift.containers.for.each.topic=false
secor.swift.container=logsContainer
secor.protobuf.message.class.databus.qaasi.v0=contact_proto.FieldsMessage$Fields
secor.message.parser.class=com.pinterest.secor.parser.ProtobufMessageParser
secor.file.reader.writer.factory=com.pinterest.secor.io.impl.ProtobufParquetFileReaderWriterFactory
schema.registry.url=http://kafka-schema-registry.kafka.svc.cluster.local:8081
kafka.seed.broker.host=b-1.databus.5ycvu3.c5.kafka.us-east-1.amazonaws.com
kafka.seed.broker.port=9092
zookeeper.quorum=z-3.databus.5ycvu3.c5.kafka.us-east-1.amazonaws.com:2181,z-2.databus.5ycvu3.c5.kafka.us-east-1.amazonaws.com:2181,z-1.databus.5ycvu3.c5.kafka.us-east-1.amazonaws.com:2181
secor.max.file.size.bytes=10000
secor.max.file.age.seconds=10
ostrich.port=9998
secor.kafka.group=qaasi
secor.local.path=/tmp/secor
secor.upload.deterministic=false
secor.upload.on.shutdown=true
kafka.client.className=com.pinterest.secor.common.SecorKafkaClient
kafka.new.consumer.poll.timeout.seconds=10
kafka.new.consumer.auto.offset.reset=earliest
kafka.new.consumer.request.timeout.ms=
kafka.new.consumer.ssl.key.password=
kafka.new.consumer.ssl.keystore.location=
kafka.new.consumer.ssl.keystore.password=
kafka.new.consumer.ssl.truststore.location=
kafka.new.consumer.ssl.truststore.password=
kafka.new.consumer.isolation.level=
kafka.new.consumer.max.poll.records=
kafka.new.consumer.sasl.client.callback.handler.class=
kafka.new.consumer.sasl.jaas.config=
kafka.new.consumer.sasl.kerberos.service.name=
kafka.new.consumer.sasl.login.callback.handler.class=
kafka.new.consumer.sasl.login.class=
kafka.new.consumer.sasl.mechanism=
kafka.new.consumer.security.protocol=
kafka.new.consumer.ssl.enabled.protocols=
kafka.new.consumer.ssl.keystore.type=
kafka.new.consumer.ssl.protocol=
kafka.new.consumer.ssl.provider=
kafka.new.consumer.ssl.truststore.type=
kafka.new.consumer.partition.assignment.strategy.class=
kafka.new.consumer.max.poll.interval.ms=300000
kafka.fetch.max.bytes=
#statsd.prefixWithConsumerGroup=true
statsd.dogstatsd.tags.enabled=false
statsd.dogstatsd.constant.tags=
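For anyone wiring this config up: the two properties that activate the new writer path are `secor.file.reader.writer.factory=com.pinterest.secor.io.impl.ProtobufParquetFileReaderWriterFactory` and the per-topic mapping `secor.protobuf.message.class.<topic>=<generated class>`, where `$` in the class name marks a nested generated class. Below is a minimal sketch of how such a mapping can be resolved at runtime; the lookup helper is illustrative, not Secor's actual code, and running it requires the protobufs artifact from the pom on the classpath:

```java
import java.util.Properties;
import com.google.protobuf.Message;

public class ProtoClassLookupSketch {
    // Resolve the protobuf class configured for a topic, mirroring the
    // secor.protobuf.message.class.<topic> convention used above.
    static Class<? extends Message> messageClassFor(Properties props, String topic)
            throws ClassNotFoundException {
        String className = props.getProperty("secor.protobuf.message.class." + topic);
        // '$' separates outer and nested generated classes (FieldsMessage$Fields).
        return Class.forName(className).asSubclass(Message.class);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("secor.protobuf.message.class.databus.qaasi.v0",
                "contact_proto.FieldsMessage$Fields");
        System.out.println(messageClassFor(props, "databus.qaasi.v0").getName());
    }
}
```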