diff --git a/.gitignore b/.gitignore index f46f60a..aa4ba32 100644 --- a/.gitignore +++ b/.gitignore @@ -28,9 +28,11 @@ # for a library or gem, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: -# Gemfile.lock -# .ruby-version -# .ruby-gemset +Gemfile.lock +.ruby-gemset +.ruby-version # unless supporting rvm < 1.11.0 or doing something fancy, ignore this: .rvmrc + +.byebug_history \ No newline at end of file diff --git a/README.md b/README.md index c3bb906..81c4357 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ To run the data processor, run the following commands: ```sh cd samples - rake run properties_file=sample.properties + rake run_consumer properties_file=sample.properties ``` #### Notes @@ -157,7 +157,7 @@ Amazon Linux can be found at `/usr/bin/java` and should be 1.7 or greater. cd kclrb/samples rake run_producer # ... and in another terminal - rake run properties_file=sample.properties + rake run_consumer properties_file=sample.properties ``` ## Under the Hood - What You Should Know about Amazon KCL's [MultiLangDaemon][multi-lang-daemon] diff --git a/aws-kclrb.gemspec b/aws-kclrb.gemspec index b2958de..c7f769f 100644 --- a/aws-kclrb.gemspec +++ b/aws-kclrb.gemspec @@ -10,7 +10,7 @@ Gem::Specification.new do |spec| spec.files += ['README.md', 'LICENSE.txt', 'VERSION', 'NOTICE.txt', '.yardopts', '.rspec'] spec.licenses = ['Apache-2.0'] spec.platform = Gem::Platform::RUBY - spec.homepage = 'http://github.com/aws/amazon-kinesis-client-ruby' + spec.homepage = 'https://github.com/awslabs/amazon-kinesis-client-ruby' spec.require_paths = ['lib'] spec.add_dependency('multi_json', '~> 1.0') diff --git a/samples/Gemfile b/samples/Gemfile new file mode 100644 index 0000000..8edc1c7 --- /dev/null +++ b/samples/Gemfile @@ -0,0 +1,8 @@ +source 'https://rubygems.org' + +gem 'aws-sdk', '~> 3.0', '>= 3.0.1' +gem 'multi_json', '~> 1.13', '>= 1.13.1' +gem 'pry', '~> 0.12.2' +gem 'pry-byebug', '~> 3.7' +gem 'byebug', '~> 11.0', '>= 11.0.1' +gem 'aws-kclrb', '~> 2.0' diff --git a/samples/Rakefile b/samples/Rakefile index 0bc54ff..cfdd711 100644 --- a/samples/Rakefile +++ b/samples/Rakefile @@ -1,8 +1,9 @@ # # Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 - require 'open-uri' +require 'byebug' +require 'pathname' SAMPLES_DIR = File.dirname(__FILE__) JAR_DIR = File.join(SAMPLES_DIR, 'jars') @@ -90,10 +91,12 @@ MAVEN_PACKAGES = [ ['commons-collections', 'commons-collections', '3.2.2'] ] +desc "download jar files" task :download_jars => [JAR_DIR] MAVEN_PACKAGES.each do |jar| _, _, local_jar_file = get_maven_jar_info(*jar) + #desc "downlods jar file" file local_jar_file do puts "Downloading '#{local_jar_file}' from maven..." download_maven_jar(*jar) @@ -110,12 +113,23 @@ task :run_producer do |t| sh *commands end -desc "Run KCL sample processor" -task :run => :download_jars do |t| +desc "adds a single record to the stream" +task :put_record, [:data] do |t, args| + fail "you must provide 'data' for the record you want to add" unless args.data + puts "adding record with data: #{args.data}" + commands = %W( + #{SAMPLES_DIR}/sample_kcl_producer.rb -o #{args.data} + ) + sh *commands +end + +desc "Run KCL sample processor aka consumer" +task :run_consumer => :download_jars do |t| java_home = ENV['JAVA_HOME'] fail "JAVA_HOME environment variable not set." unless java_home properties_file = ENV['properties_file'] - unless properties_file + properties_file = './sample.properties' if properties_file.nil? + unless Pathname.new(properties_file).exist? fail "Properties file not provided. Use \"rake run properties_file= to provide it.\"" end log_configuration = ENV['log_configuration'] diff --git a/samples/sample_kcl.rb b/samples/sample_kcl.rb index aee132b..78bbe81 100755 --- a/samples/sample_kcl.rb +++ b/samples/sample_kcl.rb @@ -78,7 +78,7 @@ def process_record(record, data) else length = data.length end - STDERR.puts("ShardId: #{@shard_id}, Partition Key: #{record['partitionKey']}, Sequence Number:#{record['sequenceNumber']}, Length of data: #{length}") + STDERR.puts("ShardId: #{@shard_id}, Partition Key: #{record['partitionKey']}, Sequence Number:#{record['sequenceNumber']}, Length of data: #{length}, data: #{data}") rescue => e STDERR.puts "#{e}: Failed to process record '#{record}'" end diff --git a/samples/sample_kcl_producer.rb b/samples/sample_kcl_producer.rb index 9a1936c..d9ae4f1 100755 --- a/samples/sample_kcl_producer.rb +++ b/samples/sample_kcl_producer.rb @@ -3,9 +3,10 @@ # Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -require 'aws-sdk-core' require 'multi_json' require 'optparse' +require 'aws-sdk' +require 'byebug' # @api private class SampleProducer @@ -63,6 +64,11 @@ def put_record puts "Put record to shard '#{r[:shard_id]}' (#{r[:sequence_number]}): '#{MultiJson.dump(data)}'" end + def only_record_to_put(data) + r = @kinesis.put_record(:stream_name => @stream_name, :data => data.to_s, :partition_key => to_s) + puts "Put record to shard '#{r[:shard_id]}' (#{r[:sequence_number]}): '#{MultiJson.dump(data)}'" + end + private def get_data { @@ -94,6 +100,7 @@ def wait_for_stream_to_become_active shard_count = nil sleep_between_puts = 0.25 timeout = 0 + only_record_to_put = nil # Get and parse options option_parser = OptionParser.new do |opts| opts.banner = "Usage: #{File.basename($0)} [options]" @@ -118,6 +125,9 @@ def wait_for_stream_to_become_active puts opts exit end + opts.on("-o DATA", "--only_record_to_put DATA", "provide data for the only record to put ont he stream") do |d| + only_record_to_put = d + end end begin option_parser.parse! @@ -133,7 +143,13 @@ def wait_for_stream_to_become_active kconfig = {} kconfig[:region] = aws_region if aws_region kinesis = Aws::Kinesis::Client.new(kconfig) - producer = SampleProducer.new(kinesis, stream_name, sleep_between_puts, shard_count) - producer.run(timeout) + + if only_record_to_put + producer.only_record_to_put(only_record_to_put) + else + producer.run(timeout) + end + + end