This repository has been archived by the owner on Dec 22, 2020. It is now read-only.

Update Mongo driver to 2.0+
Connects to #107
John Nason committed Jan 5, 2018
1 parent aa76a2d commit 41371f3
Showing 8 changed files with 64 additions and 70 deletions.
4 changes: 2 additions & 2 deletions lib/mosql/cli.rb
@@ -121,8 +121,8 @@ def parse_args
end

def connect_mongo
@mongo = Mongo::MongoClient.from_uri(options[:mongo])
config = @mongo['admin'].command(:ismaster => 1)
@mongo = Mongo::Client.new(options[:mongo])
config = @mongo.use('admin').command(:ismaster => 1).documents.first
if !config['setName'] && !options[:skip_tail]
log.warn("`#{options[:mongo]}' is not a replset.")
log.warn("Will run the initial import, then stop.")
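For anyone porting similar connection code, a minimal sketch of the 2.x pattern this hunk adopts (the URI below is illustrative, not part of the commit):

require 'mongo'

# Driver 2.x drops Mongo::MongoClient.from_uri; Mongo::Client.new accepts the URI string directly.
client = Mongo::Client.new('mongodb://localhost:27017/test')

# Database commands return a result object, so the reply document is read from #documents.
config = client.use('admin').command(:ismaster => 1).documents.first
config['setName']   # nil when the server is not part of a replica set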
12 changes: 6 additions & 6 deletions lib/mosql/schema.rb
@@ -191,12 +191,12 @@ def transform_primitive(v, type=nil)
v.to_s
when BSON::Binary
if type.downcase == 'uuid'
v.to_s.unpack("H*").first
v.data.to_s.unpack("H*").first
else
Sequel::SQL::Blob.new(v.to_s)
Sequel::SQL::Blob.new(v.data.to_s)
end
when BSON::DBRef
v.object_id.to_s
when Mongo::DBRef
v.id.to_s
else
v
end
@@ -209,7 +209,7 @@ def transform(ns, obj, schema=nil)

# Do a deep clone, because we're potentially going to be
# mutating embedded objects.
obj = BSON.deserialize(BSON.serialize(obj))
obj = Marshal.load(Marshal.dump(obj))

row = []
schema[:columns].each do |col|
@@ -259,7 +259,7 @@ def sanitize(value)
when Array
value.map {|v| sanitize(v)}
when BSON::Binary
Base64.encode64(value.to_s)
Base64.encode64(value.data.to_s)
when Float
# NaN is illegal in JSON. Translate into null.
value.nan? ? nil : value
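A hedged illustration of the bson 4.x / driver 2.x accessors this file now leans on; the sample values are made up:

require 'mongo'

# A BSON::Binary no longer stringifies to its payload; the raw bytes live behind #data.
bin = BSON::Binary.new("\xDE\xAD\xBE\xEF".b, :generic)
bin.data.unpack("H*").first                # => "deadbeef"

# DBRefs are now Mongo::DBRef, and the referenced ObjectId is exposed as #id, not #object_id.
ref = Mongo::DBRef.new('db.otherns', BSON::ObjectId.new)
ref.id.to_s                                # => a 24-character hex string

# bson 4.x also removed BSON.serialize / BSON.deserialize, so the deep clone in
# transform falls back to a plain-Ruby Marshal round trip.
copy = Marshal.load(Marshal.dump({ 'nested' => [1, 2, 3] }))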
32 changes: 15 additions & 17 deletions lib/mosql/streamer.rb
@@ -31,7 +31,7 @@ def import

def collection_for_ns(ns)
dbname, collection = ns.split(".", 2)
@mongo.db(dbname).collection(collection)
@mongo.use(dbname)[collection]
end

def unsafe_handle_exceptions(ns, obj)
@@ -114,7 +114,7 @@ def initial_import
end

log.info("Importing for Mongo DB #{dbname}...")
db = @mongo.db(dbname)
db = @mongo.use(dbname)
collections = db.collections.select { |c| spec.key?(c.name) }

collections.each do |collection|
@@ -141,21 +141,19 @@ def import_collection(ns, collection, filter)

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
batch.clear
exit(0) if @done
with_retries do
collection.find(filter, :batch_size => BATCH).each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
batch.clear
exit(0) if @done
end
end
end
@@ -179,7 +177,7 @@ def optail
end

def sync_object(ns, selector)
obj = collection_for_ns(ns).find_one(selector)
obj = collection_for_ns(ns).find(selector).limit(1).first
if obj
unsafe_handle_exceptions(ns, obj) do
@sql.upsert_ns(ns, obj)
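A rough sketch of the collection access and query calls the streamer switches to; database and collection names here are placeholders:

require 'mongo'

client = Mongo::Client.new('mongodb://localhost:27017')

# db(...).collection(...) becomes Client#use plus collection lookup via #[].
collection = client.use('mosql_test')['collection']

# find_one is gone; a single-document read is a cursor limited to one result.
doc = collection.find('_id' => BSON::ObjectId.new).limit(1).first

# #find returns an Enumerable view, so batched iteration no longer needs the block-based cursor API.
collection.find({}, :batch_size => 1000).each do |obj|
  # process each document here
end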
8 changes: 4 additions & 4 deletions mosql.gemspec
@@ -22,11 +22,11 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "log4r"
gem.add_runtime_dependency "json"

gem.add_runtime_dependency "mongoriver", "0.4"
gem.add_runtime_dependency "mongoriver", "0.5"

gem.add_runtime_dependency "mongo", "~> 1.10"
gem.add_runtime_dependency "bson", "~> 1.10"
gem.add_runtime_dependency "bson_ext", "~> 1.10"
gem.add_runtime_dependency "mongo", "~> 2.0"
gem.add_runtime_dependency "bson", "~> 4.0"
gem.add_runtime_dependency "bson_ext"

gem.add_development_dependency "minitest"
gem.add_development_dependency "mocha"
8 changes: 2 additions & 6 deletions test/functional/_lib.rb
@@ -37,8 +37,8 @@ def connect_sql

def connect_mongo
begin
Mongo::Connection.from_uri(mongo_test_uri)
rescue Mongo::ConnectionFailure, Mongo::ConnectionError
mongo = Mongo::Client.new(mongo_test_uri, { database: mongo_test_dbname } )
rescue Mongo::Error
$stderr.puts <<EOF
*********************************************************************
@@ -53,10 +53,6 @@ def connect_mongo
end
end

def mongo_db
mongo.db(mongo_test_dbname)
end

def setup
Sequel.default_timezone = :utc
@sequel = connect_sql
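The narrowed rescue above relies on the driver's 2.x exception hierarchy; a small sketch, assuming a local test server:

require 'mongo'

begin
  client = Mongo::Client.new('mongodb://localhost:27017', database: 'test')
  client.database_names   # force a round trip; Client.new itself does not connect eagerly
rescue Mongo::Error => e
  # Mongo::Error is the common ancestor of 2.x driver exceptions, replacing the
  # 1.x Mongo::ConnectionFailure / Mongo::ConnectionError pair rescued before.
  warn "Could not reach MongoDB: #{e.message}"
end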
38 changes: 20 additions & 18 deletions test/functional/streamer.rb
@@ -140,8 +140,8 @@ def build_streamer

# $set's are currently a bit of a hack where we read the object
# from the db, so make sure the new object exists in mongo
connect_mongo['mosql_test']['collection'].insert(o.merge('var' => 100),
:w => 1)
mongo.use('mosql_test')['collection'].insert_one(o.merge('var' => 100),
:w => 1)

@streamer.handle_op({ 'ns' => 'mosql_test.collection',
'op' => 'u',
@@ -172,8 +172,10 @@ def build_streamer

# $set's are currently a bit of a hack where we read the object
# from the db, so make sure the new object exists in mongo
connect_mongo['mosql_test']['renameid'].insert(o.merge('goats' => 0),
:w => 1)
#connect_mongo['mosql_test'].insert(o.merge('goats' => 0),
#mongo['mosql_test'].insert_one(o.merge('goats' => 0),
mongo.use('mosql_test')['renameid'].insert_one(o.merge('goats' => 0),
:w => 1)

@streamer.handle_op({ 'ns' => 'mosql_test.renameid',
'op' => 'u',
@@ -197,9 +199,9 @@ def build_streamer
it 'filters unwanted records' do
data = [{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 1)), :var => 2},
{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 2)), :var => 3}]
collection = mongo["filter_test"]["collection"]
collection = mongo.use('filter_test')['collection']
collection.drop
data.map { |rec| collection.insert(rec)}
data.map { |rec| collection.insert_one(rec)}

@streamer.options[:skip_tail] = true
@streamer.initial_import
@@ -214,14 +216,14 @@ def build_streamer
it 'handles "u" ops with a compsite key' do
date = Time.utc(2014, 7, 1)
o = {'_id' => {'s' => 'asdf', 't' => date}, 'var' => 'data'}
collection = mongo["composite_key_test"]["collection"]
collection = mongo.use('composite_key_test')['collection']
collection.drop
collection.insert(o)
collection.insert_one(o)

@streamer.options[:skip_tail] = true
@streamer.initial_import

collection.update({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}})
collection.update_one({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}})
@streamer.handle_op({'ns' => 'composite_key_test.collection',
'op' => 'u',
'o2' => { '_id' => { 's' => 'asdf', 't' => date}},
@@ -234,9 +236,9 @@ def build_streamer

it 'handles composite keys' do
o = {'_id' => {'s' => 'asdf', 't' => Time.new}, 'var' => 'data'}
collection = mongo["composite_key_test"]["collection"]
collection = mongo.use('composite_key_test')['collection']
collection.drop
collection.insert(o)
collection.insert_one(o)

@streamer.options[:skip_tail] = true
@streamer.initial_import
@@ -333,9 +335,9 @@ def build_streamer
it 'imports from all dbs' do
ids = (1.upto(4)).map { BSON::ObjectId.new }
ids.each_with_index do |_id, i|
collection = mongo["test_#{i}"]['collection']
collection = mongo.use("test_#{i}")['collection']
collection.drop
collection.insert({:_id => _id, :var => i}, :w => 1)
collection.insert_one({:_id => _id, :var => i}, :w => 1)
end

@streamer.options[:skip_tail] = true
@@ -361,7 +363,7 @@ def build_streamer
@map = MoSQL::Schema.new(YAML.load(TIMESTAMP_MAP))
@adapter = MoSQL::SQLAdapter.new(@map, sql_test_uri)

mongo['db']['has_timestamp'].drop
mongo.use('db')['has_timestamp'].drop
@sequel.drop_table?(:has_timestamp)
@map.create_schema(@sequel)

Expand All @@ -370,7 +372,7 @@ def build_streamer

it 'preserves milliseconds on import' do
ts = Time.utc(2014, 8, 7, 6, 54, 32, 123000)
mongo['db']['has_timestamp'].insert({ts: ts})
mongo.use('db')['has_timestamp'].insert_one({ts: ts})
@streamer.options[:skip_tail] = true
@streamer.initial_import

@@ -382,19 +384,19 @@ def build_streamer

it 'preserves milliseconds on tailing' do
ts = Time.utc(2006,01,02, 15,04,05,678000)
id = mongo['db']['has_timestamp'].insert({ts: ts})
id = mongo.use('db')['has_timestamp'].insert_one({ts: ts}).inserted_id
@streamer.handle_op(
{
"ts" => {"t" => 1408647630, "i" => 4},
"h" => -965650193548512059,
"v" => 2,
"op" => "i",
"ns" => "db.has_timestamp",
"o" => mongo['db']['has_timestamp'].find_one({_id: id})
"o" => mongo.use('db')['has_timestamp'].find({_id: id}).first
})
got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
assert_equal(ts.to_i, got.to_i)
assert_equal(ts.tv_usec, got.tv_usec)
end
end
end
end
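For reference, a hedged sketch of the write-API renames these tests exercise; collection and field names are illustrative:

require 'mongo'

client = Mongo::Client.new('mongodb://localhost:27017')
collection = client.use('db')['has_timestamp']

# insert_one replaces insert and returns a result object; the generated _id is
# read from #inserted_id rather than being the method's return value.
result = collection.insert_one(ts: Time.utc(2014, 8, 7, 6, 54, 32))
id = result.inserted_id

# update becomes update_one for single-document updates.
collection.update_one({ '_id' => id }, { '$set' => { 'ts' => Time.now.utc } })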
24 changes: 11 additions & 13 deletions test/functional/transform.rb
@@ -19,7 +19,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
'stringy'
],
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
'TEXT',
'5405fae77c584947fc000001'
],
@@ -41,29 +41,27 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
],
[
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002'))
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id
],
'TEXT ARRAY',
['5405fae77c584947fc000001', '5405fae77c584947fc000002']
],
[
[
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')),
BSON::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002'))
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000001')).id,
Mongo::DBRef.new('db.otherns', BSON::ObjectId.from_string('5405fae77c584947fc000002')).id
],
'TEXT',
['5405fae77c584947fc000001', '5405fae77c584947fc000002'].to_json
],
[
BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"),
BSON::Binary::SUBTYPE_UUID),
BSON::Binary.new(["2d931510d99f494a8c6787feb05e1594"].pack("H*"), :uuid),
'UUID',
"2d931510-d99f-494a-8c67-87feb05e1594"
],
[
BSON::Binary.new(["deadbeefcafebabe"].pack("H*"),
BSON::Binary::SUBTYPE_SIMPLE),
BSON::Binary.new(["deadbeefcafebabe"].pack("H*"), :generic),
'BYTEA',
["deadbeefcafebabe"].pack("H*")
]
@@ -84,7 +82,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
schema = MoSQL::Schema.new(map)
adapter = MoSQL::SQLAdapter.new(schema, sql_test_uri)
@sequel.drop_table?(:test_transform)
collection = @mongo['test']['test_transform']
collection = @mongo.use('test')['test_transform']
collection.drop

schema.create_schema(@sequel)
@@ -96,23 +94,23 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional

# Test initial import
id = 'imported'
collection.insert({_id: id, value: mongo})
collection.insert_one({_id: id, value: mongo})
streamer.initial_import

got = @sequel[:test_transform].where(_id: id).to_a
assert_equal(sql, got.first[:value], "was able to transform a #{typ} field on initial import")

# Test streaming an insert
id = 'inserted'
collection.insert({_id: id, value: mongo})
collection.insert_one({_id: id, value: mongo})
streamer.handle_op(
{
"ts" => {"t" => 1408647630, "i" => 4},
"h" => -965650193548512059,
"v" => 2,
"op" => "i",
"ns" => "test.test_transform",
"o" => collection.find_one(_id: id)
"o" => collection.find(_id: id).first
})

got = @sequel[:test_transform].where(_id: id).to_a
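A hedged note on the Binary constructor change these fixtures depend on: bson 4.x takes a subtype symbol where the 1.x gem used SUBTYPE_* constants.

require 'bson'

raw = ['2d931510d99f494a8c6787feb05e1594'].pack('H*')

# 1.x: BSON::Binary.new(raw, BSON::Binary::SUBTYPE_UUID)
# 4.x: the subtype is a symbol such as :uuid or :generic.
uuid_blob  = BSON::Binary.new(raw, :uuid)
bytea_blob = BSON::Binary.new(['deadbeefcafebabe'].pack('H*'), :generic)

uuid_blob.type                          # => :uuid
uuid_blob.data.unpack('H*').first       # => "2d931510d99f494a8c6787feb05e1594"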
8 changes: 4 additions & 4 deletions test/unit/lib/mosql/schema.rb
@@ -198,13 +198,13 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
it 'extracts object ids from a DBRef' do
oid = BSON::ObjectId.new
out = @map.transform('db.collection', {'_id' => "row 1",
'str' => BSON::DBRef.new('db.otherns', oid)})
'str' => Mongo::DBRef.new('db.otherns', oid)})
assert_equal(["row 1", nil, oid.to_s, nil], out)
end

it 'converts DBRef to object id in arrays' do
oid = [ BSON::ObjectId.new, BSON::ObjectId.new]
o = {'_id' => "row 1", "str" => [ BSON::DBRef.new('db.otherns', oid[0]), BSON::DBRef.new('db.otherns', oid[1]) ] }
o = {'_id' => "row 1", "str" => [ Mongo::DBRef.new('db.otherns', oid[0]), Mongo::DBRef.new('db.otherns', oid[1]) ] }
out = @map.transform('db.collection', o)
assert_equal(["row 1", nil, JSON.dump(oid.map! {|o| o.to_s}), nil ], out)
end
@@ -219,8 +219,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
it 'base64-encodes BSON::Binary blobs in extra_props' do
out = @map.transform('db.with_extra_props',
{'_id' => 7,
'blob' => BSON::Binary.new("\x00\x00\x00"),
'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00")}})
'blob' => BSON::Binary.new("\x00\x00\x00", :generic),
'embedded' => {'thing' => BSON::Binary.new("\x00\x00\x00", :generic)}})
extra = JSON.parse(out[1])
assert(extra.key?('blob'))
assert_equal('AAAA', extra['blob'].strip)

1 comment on commit 41371f3

@Sergepetroff


Hi, is this still maintained? If so: I get this error on every table when trying to import, using Ruby 2.7 with your cloned MoSQL and Mongoriver:

2: from /usr/local/bundle/gems/sequel-5.30.0/lib/sequel/connection_pool/threaded.rb:92:in `hold'
1: from /usr/local/bundle/gems/mosql-0.5.0/lib/mosql/schema.rb:301:in `block in copy_data'
/usr/local/bundle/gems/mosql-0.5.0/lib/mosql/schema.rb:301:in `check': ERROR: duplicate key value violates unique constraint "games_pkey" (PG::UniqueViolation)
DETAIL: Key (_id)=(\x356366393066303331356261373630666561326537643163) already exists.
CONTEXT: COPY games, line 729
