Skip to content
This repository has been archived by the owner on Dec 22, 2020. It is now read-only.

Update Mongo driver to 2.0+ #132

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/mosql/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ def parse_args
end

def connect_mongo
@mongo = Mongo::MongoClient.from_uri(options[:mongo])
config = @mongo['admin'].command(:ismaster => 1)
@mongo = Mongo::Client.new(options[:mongo])
config = @mongo.use('admin').command(:ismaster => 1).documents.first
if !config['setName'] && !options[:skip_tail]
log.warn("`#{options[:mongo]}' is not a replset.")
log.warn("Will run the initial import, then stop.")
Expand Down
12 changes: 6 additions & 6 deletions lib/mosql/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ def transform_primitive(v, type=nil)
v.to_s
when BSON::Binary
if type.downcase == 'uuid'
v.to_s.unpack("H*").first
v.data.to_s.unpack("H*").first
else
Sequel::SQL::Blob.new(v.to_s)
Sequel::SQL::Blob.new(v.data.to_s)
end
when BSON::DBRef
v.object_id.to_s
when Mongo::DBRef
v.id.to_s
else
v
end
Expand All @@ -209,7 +209,7 @@ def transform(ns, obj, schema=nil)

# Do a deep clone, because we're potentially going to be
# mutating embedded objects.
obj = BSON.deserialize(BSON.serialize(obj))
obj = Marshal.load(Marshal.dump(obj))

row = []
schema[:columns].each do |col|
Expand Down Expand Up @@ -259,7 +259,7 @@ def sanitize(value)
when Array
value.map {|v| sanitize(v)}
when BSON::Binary
Base64.encode64(value.to_s)
Base64.encode64(value.data.to_s)
when Float
# NaN is illegal in JSON. Translate into null.
value.nan? ? nil : value
Expand Down
32 changes: 15 additions & 17 deletions lib/mosql/streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def import

def collection_for_ns(ns)
dbname, collection = ns.split(".", 2)
@mongo.db(dbname).collection(collection)
@mongo.use(dbname)[collection]
end

def unsafe_handle_exceptions(ns, obj)
Expand Down Expand Up @@ -114,7 +114,7 @@ def initial_import
end

log.info("Importing for Mongo DB #{dbname}...")
db = @mongo.db(dbname)
db = @mongo.use(dbname)
collections = db.collections.select { |c| spec.key?(c.name) }

collections.each do |collection|
Expand All @@ -141,21 +141,19 @@ def import_collection(ns, collection, filter)

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
batch.clear
exit(0) if @done
with_retries do
collection.find(filter, :batch_size => BATCH).each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
batch.clear
exit(0) if @done
end
end
end
Expand All @@ -179,7 +177,7 @@ def optail
end

def sync_object(ns, selector)
obj = collection_for_ns(ns).find_one(selector)
obj = collection_for_ns(ns).find(selector).limit(1).first
if obj
unsafe_handle_exceptions(ns, obj) do
@sql.upsert_ns(ns, obj)
Expand Down
2 changes: 1 addition & 1 deletion lib/mosql/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module MoSQL
VERSION = "0.4.3"
VERSION = "0.5.0"
end
8 changes: 4 additions & 4 deletions mosql.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "log4r"
gem.add_runtime_dependency "json"

gem.add_runtime_dependency "mongoriver", "0.4"
gem.add_runtime_dependency "mongoriver", "0.5"

gem.add_runtime_dependency "mongo", "~> 1.10"
gem.add_runtime_dependency "bson", "~> 1.10"
gem.add_runtime_dependency "bson_ext", "~> 1.10"
gem.add_runtime_dependency "mongo", "~> 2.0"
gem.add_runtime_dependency "bson", "~> 4.0"
gem.add_runtime_dependency "bson_ext"

gem.add_development_dependency "minitest"
gem.add_development_dependency "mocha"
Expand Down
8 changes: 2 additions & 6 deletions test/functional/_lib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def connect_sql

def connect_mongo
begin
Mongo::Connection.from_uri(mongo_test_uri)
rescue Mongo::ConnectionFailure, Mongo::ConnectionError
mongo = Mongo::Client.new(mongo_test_uri, { database: mongo_test_dbname } )
rescue Mongo::Error
$stderr.puts <<EOF

*********************************************************************
Expand All @@ -53,10 +53,6 @@ def connect_mongo
end
end

def mongo_db
mongo.db(mongo_test_dbname)
end

def setup
Sequel.default_timezone = :utc
@sequel = connect_sql
Expand Down
38 changes: 20 additions & 18 deletions test/functional/streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def build_streamer

# $set's are currently a bit of a hack where we read the object
# from the db, so make sure the new object exists in mongo
connect_mongo['mosql_test']['collection'].insert(o.merge('var' => 100),
:w => 1)
mongo.use('mosql_test')['collection'].insert_one(o.merge('var' => 100),
:w => 1)

@streamer.handle_op({ 'ns' => 'mosql_test.collection',
'op' => 'u',
Expand Down Expand Up @@ -172,8 +172,10 @@ def build_streamer

# $set's are currently a bit of a hack where we read the object
# from the db, so make sure the new object exists in mongo
connect_mongo['mosql_test']['renameid'].insert(o.merge('goats' => 0),
:w => 1)
#connect_mongo['mosql_test'].insert(o.merge('goats' => 0),
#mongo['mosql_test'].insert_one(o.merge('goats' => 0),
mongo.use('mosql_test')['renameid'].insert_one(o.merge('goats' => 0),
:w => 1)

@streamer.handle_op({ 'ns' => 'mosql_test.renameid',
'op' => 'u',
Expand All @@ -197,9 +199,9 @@ def build_streamer
it 'filters unwanted records' do
data = [{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 1)), :var => 2},
{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 2)), :var => 3}]
collection = mongo["filter_test"]["collection"]
collection = mongo.use('filter_test')['collection']
collection.drop
data.map { |rec| collection.insert(rec)}
data.map { |rec| collection.insert_one(rec)}

@streamer.options[:skip_tail] = true
@streamer.initial_import
Expand All @@ -214,14 +216,14 @@ def build_streamer
it 'handles "u" ops with a compsite key' do
date = Time.utc(2014, 7, 1)
o = {'_id' => {'s' => 'asdf', 't' => date}, 'var' => 'data'}
collection = mongo["composite_key_test"]["collection"]
collection = mongo.use('composite_key_test')['collection']
collection.drop
collection.insert(o)
collection.insert_one(o)

@streamer.options[:skip_tail] = true
@streamer.initial_import

collection.update({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}})
collection.update_one({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}})
@streamer.handle_op({'ns' => 'composite_key_test.collection',
'op' => 'u',
'o2' => { '_id' => { 's' => 'asdf', 't' => date}},
Expand All @@ -234,9 +236,9 @@ def build_streamer

it 'handles composite keys' do
o = {'_id' => {'s' => 'asdf', 't' => Time.new}, 'var' => 'data'}
collection = mongo["composite_key_test"]["collection"]
collection = mongo.use('composite_key_test')['collection']
collection.drop
collection.insert(o)
collection.insert_one(o)

@streamer.options[:skip_tail] = true
@streamer.initial_import
Expand Down Expand Up @@ -333,9 +335,9 @@ def build_streamer
it 'imports from all dbs' do
ids = (1.upto(4)).map { BSON::ObjectId.new }
ids.each_with_index do |_id, i|
collection = mongo["test_#{i}"]['collection']
collection = mongo.use("test_#{i}")['collection']
collection.drop
collection.insert({:_id => _id, :var => i}, :w => 1)
collection.insert_one({:_id => _id, :var => i}, :w => 1)
end

@streamer.options[:skip_tail] = true
Expand All @@ -361,7 +363,7 @@ def build_streamer
@map = MoSQL::Schema.new(YAML.load(TIMESTAMP_MAP))
@adapter = MoSQL::SQLAdapter.new(@map, sql_test_uri)

mongo['db']['has_timestamp'].drop
mongo.use('db')['has_timestamp'].drop
@sequel.drop_table?(:has_timestamp)
@map.create_schema(@sequel)

Expand All @@ -370,7 +372,7 @@ def build_streamer

it 'preserves milliseconds on import' do
ts = Time.utc(2014, 8, 7, 6, 54, 32, 123000)
mongo['db']['has_timestamp'].insert({ts: ts})
mongo.use('db')['has_timestamp'].insert_one({ts: ts})
@streamer.options[:skip_tail] = true
@streamer.initial_import

Expand All @@ -382,19 +384,19 @@ def build_streamer

it 'preserves milliseconds on tailing' do
ts = Time.utc(2006,01,02, 15,04,05,678000)
id = mongo['db']['has_timestamp'].insert({ts: ts})
id = mongo.use('db')['has_timestamp'].insert_one({ts: ts}).inserted_id
@streamer.handle_op(
{
"ts" => {"t" => 1408647630, "i" => 4},
"h" => -965650193548512059,
"v" => 2,
"op" => "i",
"ns" => "db.has_timestamp",
"o" => mongo['db']['has_timestamp'].find_one({_id: id})
"o" => mongo.use('db')['has_timestamp'].find({_id: id}).first
})
got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
assert_equal(ts.to_i, got.to_i)
assert_equal(ts.tv_usec, got.tv_usec)
end
end
end
end
24 changes: 11 additions & 13 deletions test/functional/transform.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
'stringy'
],
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
'TEXT',
'5405fae77c584947fc000001'
],
Expand All @@ -41,29 +41,27 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
],
[
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002'))
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id
],
'TEXT ARRAY',
['5405fae77c584947fc000001', '5405fae77c584947fc000002']
],
[
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002'))
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id
],
'TEXT',
['5405fae77c584947fc000001', '5405fae77c584947fc000002'].to_json
],
[
BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"),
BSON::Binary::SUBTYPE_UUID),
BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"), :uuid),
'UUID',
"2d931510-d99f-494a-8c67-87feb05e1594"
],
[
BSON::Binary.new(["deadbeefcafebabe"].pack("H*"),
BSON::Binary::SUBTYPE_SIMPLE),
BSON::Binary.new(["deadbeefcafebabe"].pack("H*"), :generic),
'BYTEA',
["deadbeefcafebabe"].pack("H*")
]
Expand All @@ -84,7 +82,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
schema = MoSQL::Schema.new(map)
adapter = MoSQL::SQLAdapter.new(schema, sql_test_uri)
@sequel.drop_table?(:test_transform)
collection = @mongo['test']['test_transform']
collection = @mongo.use('test')['test_transform']
collection.drop

schema.create_schema(@sequel)
Expand All @@ -96,23 +94,23 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional

# Test initial import
id = 'imported'
collection.insert({_id: id, value: mongo})
collection.insert_one({_id: id, value: mongo})
streamer.initial_import

got = @sequel[:test_transform].where(_id: id).to_a
assert_equal(sql, got.first[:value], "was able to transform a #{typ} field on initial import")

# Test streaming an insert
id = 'inserted'
collection.insert({_id: id, value: mongo})
collection.insert_one({_id: id, value: mongo})
streamer.handle_op(
{
"ts" => {"t" => 1408647630, "i" => 4},
"h" => -965650193548512059,
"v" => 2,
"op" => "i",
"ns" => "test.test_transform",
"o" => collection.find_one(_id: id)
"o" => collection.find(_id: id).first
})

got = @sequel[:test_transform].where(_id: id).to_a
Expand Down
8 changes: 4 additions & 4 deletions test/unit/lib/mosql/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
it 'extracts object ids from a DBRef' do
oid = BSON::ObjectId.new
out = @map.transform('db.collection', {'_id' => "row 1",
'str' => BSON::DBRef.new('db.otherns', oid)})
'str' => Mongo::DBRef.new('db.otherns', oid)})
assert_equal(["row 1", nil, oid.to_s, nil], out)
end

it 'converts DBRef to object id in arrays' do
oid = [ BSON::ObjectId.new, BSON::ObjectId.new]
o = {'_id' => "row 1", "str" => [ BSON::DBRef.new('db.otherns', oid[0]), BSON::DBRef.new('db.otherns', oid[1]) ] }
o = {'_id' => "row 1", "str" => [ Mongo::DBRef.new('db.otherns', oid[0]), Mongo::DBRef.new('db.otherns', oid[1]) ] }
out = @map.transform('db.collection', o)
assert_equal(["row 1", nil, JSON.dump(oid.map! {|o| o.to_s}), nil ], out)
end
Expand All @@ -219,8 +219,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
it 'base64-encodes BSON::Binary blobs in extra_props' do
out = @map.transform('db.with_extra_props',
{'_id' => 7,
'blob' => BSON::Binary.new("\x00\x00\x00"),
'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00")}})
'blob' => BSON::Binary.new("\x00\x00\x00", :generic),
'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00", :generic)}})
extra = JSON.parse(out[1])
assert(extra.key?('blob'))
assert_equal('AAAA', extra['blob'].strip)
Expand Down