diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..f9f60d0 Binary files /dev/null and b/.DS_Store differ diff --git a/lib/.DS_Store b/lib/.DS_Store new file mode 100644 index 0000000..6557e23 Binary files /dev/null and b/lib/.DS_Store differ diff --git a/lib/fluent/.DS_Store b/lib/fluent/.DS_Store new file mode 100644 index 0000000..a8f1fcc Binary files /dev/null and b/lib/fluent/.DS_Store differ diff --git a/lib/fluent/plugin/out_redshift_v2.rb b/lib/fluent/plugin/out_redshift_v2.rb index 639dec6..9178c33 100644 --- a/lib/fluent/plugin/out_redshift_v2.rb +++ b/lib/fluent/plugin/out_redshift_v2.rb @@ -125,12 +125,12 @@ def insert_logs(chunk) def create_gz_file(chunk) tmp = Tempfile.new("s3-") - tmp.binmode - if json? || msgpack? - tmp = create_gz_file_from_structured_data(tmp, chunk) - else - tmp = create_gz_file_from_flat_data(tmp, chunk) - end + tmp = + if json? || msgpack? + create_gz_file_from_structured_data(tmp, chunk) + else + create_gz_file_from_flat_data(tmp, chunk) + end if tmp key = next_gz_path @@ -400,24 +400,44 @@ def create_redshift_connection raise RedshiftError.new("Unable to create a new connection.") unless conn raise RedshiftError.new("Connection failed: %s" % [ conn.error_message ]) if conn.status == PG::CONNECTION_BAD - socket = conn.socket_io + output_progress = "Starting connection..." + poll_status = PG::PGRES_POLLING_WRITING until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED case poll_status when PG::PGRES_POLLING_READING - io = IO.select([socket], nil, nil, db_conf[:connect_timeout]) - raise RedshiftError.new("Asynchronous connection timed out!(READING)") unless io + output_progress = " waiting for socket to become readable" + select( [conn.socket_io], nil, nil, 5.0 ) or + raise "Asynchronous connection timed out!" when PG::PGRES_POLLING_WRITING - io = IO.select(nil, [socket], nil, db_conf[:connect_timeout]) - raise RedshiftError.new("Asynchronous connection timed out!(WRITING)") unless io + output_progress = " waiting for socket to become writable" + select( nil, [conn.socket_io], nil, 5.0 ) or + raise "Asynchronous connection timed out!" + end + # Output a status message about the progress + case conn.status + when PG::CONNECTION_STARTED + output_progress = " waiting for connection to be made." + when PG::CONNECTION_MADE + output_progress = " connection OK; waiting to send." + when PG::CONNECTION_AWAITING_RESPONSE + output_progress = " waiting for a response from the server." + when PG::CONNECTION_AUTH_OK + output_progress = " received authentication; waiting for backend start-up to finish." + when PG::CONNECTION_SSL_STARTUP + output_progress = " negotiating SSL encryption." + when PG::CONNECTION_SETENV + output_progress = " negotiating environment-driven parameter settings." + when PG::CONNECTION_NEEDED + output_progress = " internal state: connect() needed." end + poll_status = conn.connect_poll end unless conn.status == PG::CONNECTION_OK raise RedshiftError, ("Connect failed: %s" % [conn.error_message.to_s.lines.uniq.join(" ")]) end - conn rescue => e conn.close rescue nil if conn