Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made /samples code descriptive and use new aws-sdk gem #38

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

# for a library or gem, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# Gemfile.lock
# .ruby-version
# .ruby-gemset
Gemfile.lock
.ruby-gemset
.ruby-version

# unless supporting rvm < 1.11.0 or doing something fancy, ignore this:
.rvmrc

.byebug_history
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ To run the data processor, run the following commands:

```sh
cd samples
rake run properties_file=sample.properties
rake run_consumer properties_file=sample.properties
```

#### Notes
Expand Down Expand Up @@ -157,7 +157,7 @@ Amazon Linux can be found at `/usr/bin/java` and should be 1.7 or greater.
cd kclrb/samples
rake run_producer
# ... and in another terminal
rake run properties_file=sample.properties
rake run_consumer properties_file=sample.properties
```

## Under the Hood - What You Should Know about Amazon KCL's [MultiLangDaemon][multi-lang-daemon]
Expand Down
2 changes: 1 addition & 1 deletion aws-kclrb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Gem::Specification.new do |spec|
spec.files += ['README.md', 'LICENSE.txt', 'VERSION', 'NOTICE.txt', '.yardopts', '.rspec']
spec.licenses = ['Apache-2.0']
spec.platform = Gem::Platform::RUBY
spec.homepage = 'http://github.com/aws/amazon-kinesis-client-ruby'
spec.homepage = 'https://github.com/awslabs/amazon-kinesis-client-ruby'
spec.require_paths = ['lib']

spec.add_dependency('multi_json', '~> 1.0')
Expand Down
8 changes: 8 additions & 0 deletions samples/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
source 'https://rubygems.org'

gem 'aws-sdk', '~> 3.0', '>= 3.0.1'
gem 'multi_json', '~> 1.13', '>= 1.13.1'
gem 'pry', '~> 0.12.2'
gem 'pry-byebug', '~> 3.7'
gem 'byebug', '~> 11.0', '>= 11.0.1'
gem 'aws-kclrb', '~> 2.0'
22 changes: 18 additions & 4 deletions samples/Rakefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

require 'open-uri'
require 'byebug'
require 'pathname'

SAMPLES_DIR = File.dirname(__FILE__)
JAR_DIR = File.join(SAMPLES_DIR, 'jars')
Expand Down Expand Up @@ -90,10 +91,12 @@ MAVEN_PACKAGES = [
['commons-collections', 'commons-collections', '3.2.2']
]

desc "download jar files"
task :download_jars => [JAR_DIR]

MAVEN_PACKAGES.each do |jar|
_, _, local_jar_file = get_maven_jar_info(*jar)
#desc "downlods jar file"
file local_jar_file do
puts "Downloading '#{local_jar_file}' from maven..."
download_maven_jar(*jar)
Expand All @@ -110,12 +113,23 @@ task :run_producer do |t|
sh *commands
end

desc "Run KCL sample processor"
task :run => :download_jars do |t|
desc "adds a single record to the stream"
task :put_record, [:data] do |t, args|
fail "you must provide 'data' for the record you want to add" unless args.data
puts "adding record with data: #{args.data}"
commands = %W(
#{SAMPLES_DIR}/sample_kcl_producer.rb -o #{args.data}
)
sh *commands
end

desc "Run KCL sample processor aka consumer"
task :run_consumer => :download_jars do |t|
java_home = ENV['JAVA_HOME']
fail "JAVA_HOME environment variable not set." unless java_home
properties_file = ENV['properties_file']
unless properties_file
properties_file = './sample.properties' if properties_file.nil?
unless Pathname.new(properties_file).exist?
fail "Properties file not provided. Use \"rake run properties_file=<PATH_TO_FILE> to provide it.\""
end
log_configuration = ENV['log_configuration']
Expand Down
2 changes: 1 addition & 1 deletion samples/sample_kcl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def process_record(record, data)
else
length = data.length
end
STDERR.puts("ShardId: #{@shard_id}, Partition Key: #{record['partitionKey']}, Sequence Number:#{record['sequenceNumber']}, Length of data: #{length}")
STDERR.puts("ShardId: #{@shard_id}, Partition Key: #{record['partitionKey']}, Sequence Number:#{record['sequenceNumber']}, Length of data: #{length}, data: #{data}")
rescue => e
STDERR.puts "#{e}: Failed to process record '#{record}'"
end
Expand Down
22 changes: 19 additions & 3 deletions samples/sample_kcl_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

require 'aws-sdk-core'
require 'multi_json'
require 'optparse'
require 'aws-sdk'
require 'byebug'

# @api private
class SampleProducer
Expand Down Expand Up @@ -63,6 +64,11 @@ def put_record
puts "Put record to shard '#{r[:shard_id]}' (#{r[:sequence_number]}): '#{MultiJson.dump(data)}'"
end

def only_record_to_put(data)
r = @kinesis.put_record(:stream_name => @stream_name, :data => data.to_s, :partition_key => to_s)
puts "Put record to shard '#{r[:shard_id]}' (#{r[:sequence_number]}): '#{MultiJson.dump(data)}'"
end

private
def get_data
{
Expand Down Expand Up @@ -94,6 +100,7 @@ def wait_for_stream_to_become_active
shard_count = nil
sleep_between_puts = 0.25
timeout = 0
only_record_to_put = nil
# Get and parse options
option_parser = OptionParser.new do |opts|
opts.banner = "Usage: #{File.basename($0)} [options]"
Expand All @@ -118,6 +125,9 @@ def wait_for_stream_to_become_active
puts opts
exit
end
opts.on("-o DATA", "--only_record_to_put DATA", "provide data for the only record to put ont he stream") do |d|
only_record_to_put = d
end
end
begin
option_parser.parse!
Expand All @@ -133,7 +143,13 @@ def wait_for_stream_to_become_active
kconfig = {}
kconfig[:region] = aws_region if aws_region
kinesis = Aws::Kinesis::Client.new(kconfig)

producer = SampleProducer.new(kinesis, stream_name, sleep_between_puts, shard_count)
producer.run(timeout)

if only_record_to_put
producer.only_record_to_put(only_record_to_put)
else
producer.run(timeout)
end


end