Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added lib/.DS_Store
Binary file not shown.
Binary file added lib/fluent/.DS_Store
Binary file not shown.
44 changes: 32 additions & 12 deletions lib/fluent/plugin/out_redshift_v2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def insert_logs(chunk)

def create_gz_file(chunk)
tmp = Tempfile.new("s3-")
tmp.binmode
if json? || msgpack?
tmp = create_gz_file_from_structured_data(tmp, chunk)
else
tmp = create_gz_file_from_flat_data(tmp, chunk)
end
tmp =
if json? || msgpack?
create_gz_file_from_structured_data(tmp, chunk)
else
create_gz_file_from_flat_data(tmp, chunk)
end

if tmp
key = next_gz_path
Expand Down Expand Up @@ -400,24 +400,44 @@ def create_redshift_connection
raise RedshiftError.new("Unable to create a new connection.") unless conn
raise RedshiftError.new("Connection failed: %s" % [ conn.error_message ]) if conn.status == PG::CONNECTION_BAD

socket = conn.socket_io
output_progress = "Starting connection..."

poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED
case poll_status
when PG::PGRES_POLLING_READING
io = IO.select([socket], nil, nil, db_conf[:connect_timeout])
raise RedshiftError.new("Asynchronous connection timed out!(READING)") unless io
output_progress = " waiting for socket to become readable"
select( [conn.socket_io], nil, nil, 5.0 ) or
raise "Asynchronous connection timed out!"
when PG::PGRES_POLLING_WRITING
io = IO.select(nil, [socket], nil, db_conf[:connect_timeout])
raise RedshiftError.new("Asynchronous connection timed out!(WRITING)") unless io
output_progress = " waiting for socket to become writable"
select( nil, [conn.socket_io], nil, 5.0 ) or
raise "Asynchronous connection timed out!"
end
# Output a status message about the progress
case conn.status
when PG::CONNECTION_STARTED
output_progress = " waiting for connection to be made."
when PG::CONNECTION_MADE
output_progress = " connection OK; waiting to send."
when PG::CONNECTION_AWAITING_RESPONSE
output_progress = " waiting for a response from the server."
when PG::CONNECTION_AUTH_OK
output_progress = " received authentication; waiting for backend start-up to finish."
when PG::CONNECTION_SSL_STARTUP
output_progress = " negotiating SSL encryption."
when PG::CONNECTION_SETENV
output_progress = " negotiating environment-driven parameter settings."
when PG::CONNECTION_NEEDED
output_progress = " internal state: connect() needed."
end

poll_status = conn.connect_poll
end

unless conn.status == PG::CONNECTION_OK
raise RedshiftError, ("Connect failed: %s" % [conn.error_message.to_s.lines.uniq.join(" ")])
end

conn
rescue => e
conn.close rescue nil if conn
Expand Down