diff --git a/airbyte-integrations/connectors/source-mongodb/.gitignore b/airbyte-integrations/connectors/source-mongodb/.gitignore deleted file mode 100644 index 10bd420c0524..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -NEW_SOURCE_CHECKLIST.md -.ruby-gemset -.byebug_history - -tmp diff --git a/airbyte-integrations/connectors/source-mongodb/.ruby-version b/airbyte-integrations/connectors/source-mongodb/.ruby-version deleted file mode 100644 index 4a36342fcab7..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/.ruby-version +++ /dev/null @@ -1 +0,0 @@ -3.0.0 diff --git a/airbyte-integrations/connectors/source-mongodb/Dockerfile b/airbyte-integrations/connectors/source-mongodb/Dockerfile deleted file mode 100644 index 7d38acfba3ce..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -FROM ruby:3.0-alpine - -RUN apk update -RUN apk add --update build-base libffi-dev - -WORKDIR /airbyte - -COPY . ./ - -RUN gem install bundler -RUN bundle install - -ENV AIRBYTE_ENTRYPOINT "ruby /airbyte/source.rb" -ENTRYPOINT ["ruby", "/airbyte/source.rb"] - -LABEL io.airbyte.name=airbyte/source-mongodb -LABEL io.airbyte.version=0.3.3 diff --git a/airbyte-integrations/connectors/source-mongodb/Dockerfile.test b/airbyte-integrations/connectors/source-mongodb/Dockerfile.test deleted file mode 100644 index 080bd7062f8f..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/Dockerfile.test +++ /dev/null @@ -1,8 +0,0 @@ -FROM mongo:4.0.23 - -COPY ./integration_tests /integration_tests - -RUN echo "mongorestore --archive=integration_tests/dump/analytics.archive" > /docker-entrypoint-initdb.d/init.sh - -LABEL io.airbyte.version=0.1.0 -LABEL io.airbyte.name=airbyte/mongodb-integration-test-seed diff --git a/airbyte-integrations/connectors/source-mongodb/Gemfile b/airbyte-integrations/connectors/source-mongodb/Gemfile deleted file mode 100644 index 411ef252d7dc..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/Gemfile +++ /dev/null @@ -1,8 +0,0 @@ -source 'https://rubygems.org' - -gem 'mongo' -gem 'slop' -gem 'dry-types' -gem 'dry-struct' - -# gem 'byebug' diff --git a/airbyte-integrations/connectors/source-mongodb/Gemfile.lock b/airbyte-integrations/connectors/source-mongodb/Gemfile.lock deleted file mode 100644 index 1c26567c4ab0..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/Gemfile.lock +++ /dev/null @@ -1,44 +0,0 @@ -GEM - remote: https://rubygems.org/ - specs: - bson (4.12.0) - concurrent-ruby (1.1.8) - dry-configurable (0.12.1) - concurrent-ruby (~> 1.0) - dry-core (~> 0.5, >= 0.5.0) - dry-container (0.7.2) - concurrent-ruby (~> 1.0) - dry-configurable (~> 0.1, >= 0.1.3) - dry-core (0.5.0) - concurrent-ruby (~> 1.0) - dry-inflector (0.2.0) - dry-logic (1.1.0) - concurrent-ruby (~> 1.0) - dry-core (~> 0.5, >= 0.5) - dry-struct (1.4.0) - dry-core (~> 0.5, >= 0.5) - dry-types (~> 1.5) - ice_nine (~> 0.11) - dry-types (1.5.1) - concurrent-ruby (~> 1.0) - dry-container (~> 0.3) - dry-core (~> 0.5, >= 0.5) - dry-inflector (~> 0.1, >= 0.1.2) - dry-logic (~> 1.0, >= 1.0.2) - ice_nine (0.11.2) - mongo (2.14.0) - bson (>= 4.8.2, < 5.0.0) - slop (4.8.2) - -PLATFORMS - x86_64-darwin-19 - x86_64-linux - -DEPENDENCIES - dry-struct - dry-types - mongo - slop - -BUNDLED WITH - 2.2.3 diff --git a/airbyte-integrations/connectors/source-mongodb/README.md b/airbyte-integrations/connectors/source-mongodb/README.md deleted file mode 100644 index 7f108fb5b591..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/README.md +++ /dev/null @@ -1,61 +0,0 @@ -# Mongodb Source - -This is the repository for the Mongodb source connector, written in Ruby. - -## Local development -### Requirements - -#### Ruby Version -This module uses `rbenv` to manage its Ruby version. If you have `rbenv` installed, you should be running the correct Ruby version. - -While it is _highly_ recommended to use `rbenv`, if you don't want to, just make sure your system is running whatever ruby version is present in the file `.ruby-version`. - -#### Install dependencies -1. Install the correct `bundle` version (found at the bottom of `Gemfile`). Currently this is `gem install bundle:2.2.3`. -2. `bundle install` - -### Local iteration -1. Change code -2. `ruby source.rb ` - -For example, to verify if your provided credentials are valid and can be used to connect to a mongo DB, run: -``` -ruby source.rb check --config -``` - -The full list of commands are: - -1. `ruby source.rb spec` -2. `ruby source.rb check --config ` -3. `ruby source.rb discover --config ` -4. `ruby source.rb read --config --catalog [--state ]` - -These commands correspond to the ones in the [Airbyte Protocol](). - -### Build connector Docker image -First, build the module by running the following from the airbyte project root directory: -``` -cd airbyte-integrations/connectors/source-mongodb/ -docker build . -t airbyte/source-mongodb:dev -``` - -### Integration Tests -From the airbyte project root, run: -``` -./gradlew clean :airbyte-integrations:connectors:source-mongodb:integrationTest -``` - -## Configure credentials -Create a `secrets` folder (which is gitignored by default) and place your credentials as a JSON file in it. An example of the needed credentials is available in `integration_tests/valid_config.json`. - -## Discover phase -MongoDB does not have anything like table definition, thus we have to define column types from actual attributes and their values. Discover phase have two steps: - -### Step 1. Find all unique properties -Connector runs the map-reduce command which returns all unique document props in the collection. Map-reduce approach should be sufficient even for large clusters. - -### Step 2. Determine property types -For each property found, connector selects 10k documents from the collection where this property is not empty. If all the selected values have the same type - connector will set appropriate type to the property. In all other cases connector will fallback to `string` type. - -## Author -This connector was authored by [Yury Koleda](https://github.com/FUT). diff --git a/airbyte-integrations/connectors/source-mongodb/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-mongodb/integration_tests/configured_catalog.json deleted file mode 100644 index a32da7f833e3..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/integration_tests/configured_catalog.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "streams": [ - { - "sync_mode": "incremental", - "destination_sync_mode": "append", - "cursor_field": ["bucket_end_date"], - "stream": { - "name": "transactions", - "supported_sync_modes": ["full_refresh", "incremental"], - "json_schema": { - "properties": { - "_id": { - "type": "string" - }, - "account_id": { - "type": "integer" - }, - "transaction_count": { - "type": "integer" - }, - "bucket_start_date": { - "type": "string" - }, - "bucket_end_date": { - "type": "string" - }, - "transactions": { - "type": "array" - } - } - } - } - }, - { - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite", - "stream": { - "name": "customers", - "supported_sync_modes": ["full_refresh", "incremental"], - "json_schema": { - "properties": { - "_id": { - "type": "string" - }, - "username": { - "type": "string" - }, - "name": { - "type": "string" - }, - "address": { - "type": "string" - }, - "birthdate": { - "type": "string" - }, - "active": { - "type": "boolean" - }, - "accounts": { - "type": "array" - }, - "tier_and_details": { - "type": "string" - } - } - } - } - } - ] -} diff --git a/airbyte-integrations/connectors/source-mongodb/integration_tests/dump/analytics.archive b/airbyte-integrations/connectors/source-mongodb/integration_tests/dump/analytics.archive deleted file mode 100644 index c491368bcf47..000000000000 Binary files a/airbyte-integrations/connectors/source-mongodb/integration_tests/dump/analytics.archive and /dev/null differ diff --git a/airbyte-integrations/connectors/source-mongodb/integration_tests/valid_config.json b/airbyte-integrations/connectors/source-mongodb/integration_tests/valid_config.json deleted file mode 100644 index 0bff6cfb1fb2..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/integration_tests/valid_config.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "host": "127.0.0.1", - "port": "27888", - "database": "sample_analytics", - "user": "user", - "password": "password", - "auth_source": "admin", - "ssl": false -} diff --git a/airbyte-integrations/connectors/source-mongodb/lib/airbyte_logger.rb b/airbyte-integrations/connectors/source-mongodb/lib/airbyte_logger.rb deleted file mode 100644 index a519e42cb58b..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/airbyte_logger.rb +++ /dev/null @@ -1,27 +0,0 @@ -require_relative './airbyte_protocol.rb' - -class AirbyteLogger - def self.format_log(text, log_level=Level::Info) - alm = AirbyteLogMessage.from_dynamic!({ - 'level' => log_level, - 'message' => text - }) - - AirbyteMessage.from_dynamic!({ - 'type' => Type::Log, - 'log' => alm.to_dynamic - }).to_json - end - - def self.logger_formatter - proc { |severity, datetime, progname, msg| - format_log("[#{datetime}] #{severity} : #{progname} | #{msg.dump}\n\n") - } - end - - def self.log(text, log_level=Level::Info) - message = format_log(text, log_level=Level::Info) - - puts message - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/airbyte_protocol.rb b/airbyte-integrations/connectors/source-mongodb/lib/airbyte_protocol.rb deleted file mode 100644 index 61c402405970..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/airbyte_protocol.rb +++ /dev/null @@ -1,462 +0,0 @@ -# The file was generated in several steps. -# -# 1. Convert airbyte_protocol.yaml to JSON -# 2. Use https://app.quicktype.io/?l=ruby to generate code. Parameters: -# Type strictness: Coercible -# Plain types only: Disabled -# 3. Fix module `Types` according to https://dry-rb.org/gems/dry-types/master/built-in-types/ -# 4. Replace all instance variable calls to just method calls (remove all characters) -# 4. Add `.compact` call to resulting object in every `to_dynamic` method -# -# -# -# This code may look unusually verbose for Ruby (and it is), but -# it performs some subtle and complex validation of JSON data. -# -# To parse this JSON, add 'dry-struct' and 'dry-types' gems, then do: -# -# airbyte = Airbyte.from_json! "{…}" -# puts airbyte.configured_airbyte_catalog&.streams.first.stream.supported_sync_modes&.first -# -# If from_json! succeeds, the value returned matches the schema. - -require 'json' -require 'dry-types' -require 'dry-struct' - -module Types - include Dry::Types() - - Int = Coercible::Integer - Bool = Strict::Bool - Hash = Coercible::Hash - String = Coercible::String - Type = Coercible::String.enum("CATALOG", "CONNECTION_STATUS", "LOG", "RECORD", "SPEC", "STATE") - SyncMode = Coercible::String.enum("full_refresh", "incremental") - Status = Coercible::String.enum("FAILED", "SUCCEEDED") - Level = Coercible::String.enum("DEBUG", "ERROR", "FATAL", "INFO", "TRACE", "WARN") -end - -# Message type -module Type - Catalog = "CATALOG" - ConnectionStatus = "CONNECTION_STATUS" - Log = "LOG" - Record = "RECORD" - Spec = "SPEC" - State = "STATE" -end - -module SyncMode - FullRefresh = "full_refresh" - Incremental = "incremental" -end - -class AirbyteStream < Dry::Struct - - # Path to the field that will be used to determine if a record is new or modified since the - # last sync. If not provided by the source, the end user will have to specify the - # comparable themselves. - attribute :default_cursor_field, Types.Array(Types::String).optional - - # Stream schema using Json Schema specs. - attribute :json_schema, Types::Hash.meta(of: Types::Any) - - # Stream's name. - attribute :airbyte_stream_name, Types::String - - # If the source defines the cursor field, then it does any other cursor field inputs will - # be ignored. If it does not either the user_provided one is used or as a backup the - # default one is used. - attribute :source_defined_cursor, Types::Bool.optional - - attribute :supported_sync_modes, Types.Array(Types::SyncMode).optional - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - default_cursor_field: d["default_cursor_field"], - json_schema: Types::Hash[d.fetch("json_schema")].map { |k, v| [k, Types::Any[v]] }.to_h, - airbyte_stream_name: d.fetch("name"), - source_defined_cursor: d["source_defined_cursor"], - supported_sync_modes: d["supported_sync_modes"], - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "default_cursor_field" => default_cursor_field, - "json_schema" => json_schema, - "name" => airbyte_stream_name, - "source_defined_cursor" => source_defined_cursor, - "supported_sync_modes" => supported_sync_modes, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# log message: any kind of logging you want the platform to know about. -# -# Airbyte stream schema catalog -class AirbyteCatalog < Dry::Struct - attribute :streams, Types.Array(AirbyteStream) - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - streams: d.fetch("streams").map { |x| AirbyteStream.from_dynamic!(x) }, - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "streams" => streams.map { |x| x.to_dynamic }, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -module Status - Failed = "FAILED" - Succeeded = "SUCCEEDED" -end - -# Airbyte connection status -class AirbyteConnectionStatus < Dry::Struct - attribute :message, Types::String.optional - attribute :status, Types::Status - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - message: d["message"], - status: d.fetch("status"), - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "message" => message, - "status" => status, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# the type of logging -module Level - Debug = "DEBUG" - Error = "ERROR" - Fatal = "FATAL" - Info = "INFO" - Trace = "TRACE" - Warn = "WARN" -end - -# log message: any kind of logging you want the platform to know about. -class AirbyteLogMessage < Dry::Struct - - # the type of logging - attribute :level, Types::Level - - # the log message - attribute :message, Types::String - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - level: d.fetch("level"), - message: d.fetch("message"), - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "level" => level, - "message" => message, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# record message: the record -class AirbyteRecordMessage < Dry::Struct - - # the record data - attribute :data, Types::Hash.meta(of: Types::Any) - - # when the data was emitted from the source. epoch in millisecond. - attribute :emitted_at, Types::Int - - # the name of the stream for this record - attribute :stream, Types::String - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - data: Types::Hash[d.fetch("data")].map { |k, v| [k, Types::Any[v]] }.to_h, - emitted_at: d.fetch("emitted_at"), - stream: d.fetch("stream"), - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "data" => data, - "emitted_at" => emitted_at, - "stream" => stream, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# Specification of a connector (source/destination) -class ConnectorSpecification < Dry::Struct - attribute :changelog_url, Types::String.optional - - # ConnectorDefinition specific blob. Must be a valid JSON string. - attribute :connection_specification, Types::Hash.meta(of: Types::Any) - - attribute :documentation_url, Types::String.optional - - # If the connector supports incremental mode or not. - attribute :supports_incremental, Types::Bool.optional - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - changelog_url: d["changelogUrl"], - connection_specification: Types::Hash[d.fetch("connectionSpecification")].map { |k, v| [k, Types::Any[v]] }.to_h, - documentation_url: d["documentationUrl"], - supports_incremental: d["supportsIncremental"], - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "changelogUrl" => changelog_url, - "connectionSpecification" => connection_specification, - "documentationUrl" => documentation_url, - "supportsIncremental" => supports_incremental, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# schema message: the state. Must be the last message produced. The platform uses this -# information -class AirbyteStateMessage < Dry::Struct - - # the state data - attribute :data, Types::Hash.meta(of: Types::Any) - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - data: Types::Hash[d.fetch("data")].map { |k, v| [k, Types::Any[v]] }.to_h, - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "data" => data, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -class AirbyteMessage < Dry::Struct - - # log message: any kind of logging you want the platform to know about. - attribute :catalog, AirbyteCatalog.optional - - attribute :connection_status, AirbyteConnectionStatus.optional - - # log message: any kind of logging you want the platform to know about. - attribute :log, AirbyteLogMessage.optional - - # record message: the record - attribute :record, AirbyteRecordMessage.optional - - attribute :spec, ConnectorSpecification.optional - - # schema message: the state. Must be the last message produced. The platform uses this - # information - attribute :state, AirbyteStateMessage.optional - - # Message type - attribute :airbyte_message_type, Types::Type - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - catalog: d["catalog"] ? AirbyteCatalog.from_dynamic!(d["catalog"]) : nil, - connection_status: d["connectionStatus"] ? AirbyteConnectionStatus.from_dynamic!(d["connectionStatus"]) : nil, - log: d["log"] ? AirbyteLogMessage.from_dynamic!(d["log"]) : nil, - record: d["record"] ? AirbyteRecordMessage.from_dynamic!(d["record"]) : nil, - spec: d["spec"] ? ConnectorSpecification.from_dynamic!(d["spec"]) : nil, - state: d["state"] ? AirbyteStateMessage.from_dynamic!(d["state"]) : nil, - airbyte_message_type: d.fetch("type"), - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "catalog" => catalog&.to_dynamic, - "connectionStatus" => connection_status&.to_dynamic, - "log" => log&.to_dynamic, - "record" => record&.to_dynamic, - "spec" => spec&.to_dynamic, - "state" => state&.to_dynamic, - "type" => airbyte_message_type, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -class ConfiguredAirbyteStream < Dry::Struct - - # Path to the field that will be used to determine if a record is new or modified since the - # last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is - # ignored. - attribute :cursor_field, Types.Array(Types::String).optional - - attribute :stream, AirbyteStream - attribute :sync_mode, Types::SyncMode.optional - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - cursor_field: d["cursor_field"], - stream: AirbyteStream.from_dynamic!(d.fetch("stream")), - sync_mode: d["sync_mode"], - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "cursor_field" => cursor_field, - "stream" => stream.to_dynamic, - "sync_mode" => sync_mode, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# Airbyte stream schema catalog -class ConfiguredAirbyteCatalog < Dry::Struct - attribute :streams, Types.Array(ConfiguredAirbyteStream) - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - streams: d.fetch("streams").map { |x| ConfiguredAirbyteStream.from_dynamic!(x) }, - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "streams" => streams.map { |x| x.to_dynamic }, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end - -# AirbyteProtocol structs -class Airbyte < Dry::Struct - attribute :airbyte_message, AirbyteMessage.optional - attribute :configured_airbyte_catalog, ConfiguredAirbyteCatalog.optional - - def self.from_dynamic!(d) - d = Types::Hash[d] - new( - airbyte_message: d["airbyte_message"] ? AirbyteMessage.from_dynamic!(d["airbyte_message"]) : nil, - configured_airbyte_catalog: d["configured_airbyte_catalog"] ? ConfiguredAirbyteCatalog.from_dynamic!(d["configured_airbyte_catalog"]) : nil, - ) - end - - def self.from_json!(json) - from_dynamic!(JSON.parse(json)) - end - - def to_dynamic - { - "airbyte_message" => airbyte_message&.to_dynamic, - "configured_airbyte_catalog" => configured_airbyte_catalog&.to_dynamic, - }.compact - end - - def to_json(options = nil) - JSON.generate(to_dynamic, options) - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/base.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/base.rb deleted file mode 100644 index a06ee2fd6f3e..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/base.rb +++ /dev/null @@ -1,43 +0,0 @@ -require_relative '../airbyte_logger.rb' - -module MongodbConfiguredStream - class Base - attr_reader :processed_count, :configured_stream - - def initialize(configured_stream:, state:, client:) - @configured_stream = configured_stream - @state = state - @client = client - - @processed_count = 0 - end - - def stream - @stream ||= configured_stream['stream'] - end - - def stream_name - @stream_name ||= configured_stream['stream']['name'] - end - - def sync_mode - @sync_mode ||= configured_stream['sync_mode'] - end - - def compose_query - {} - end - - def valid? - true - end - - def after_item_processed(item) - @processed_count += 1 - end - - def after_stream_processed - AirbyteLogger.log("Stream #{stream_name} successfully processed!") - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/factory.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/factory.rb deleted file mode 100644 index 189f4b195ff6..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/factory.rb +++ /dev/null @@ -1,17 +0,0 @@ -require_relative '../airbyte_logger.rb' - -require_relative './full_refresh.rb' -require_relative './incremental.rb' - -class MongodbConfiguredStream::Factory - def self.build(configured_stream:, state:, client:) - case configured_stream['sync_mode'] - when SyncMode::FullRefresh - MongodbConfiguredStream::FullRefresh.new(configured_stream: configured_stream, state: state, client: client) - when SyncMode::Incremental - MongodbConfiguredStream::Incremental.new(configured_stream: configured_stream, state: state, client: client) - else - AirbyteLogger.log("Sync mode #{configured_stream['sync_mode']} is not supported!", Level::Fatal) - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/full_refresh.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/full_refresh.rb deleted file mode 100644 index 0ad730cfd9ad..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/full_refresh.rb +++ /dev/null @@ -1,6 +0,0 @@ -require_relative '../airbyte_logger.rb' - -require_relative './base.rb' - -class MongodbConfiguredStream::FullRefresh < MongodbConfiguredStream::Base -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/incremental.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/incremental.rb deleted file mode 100644 index 2fba51fc6ee5..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_configured_stream/incremental.rb +++ /dev/null @@ -1,93 +0,0 @@ -require_relative '../airbyte_logger.rb' - -require_relative './base.rb' -require_relative '../mongodb_types_converter.rb' -require_relative '../mongodb_types_explorer.rb' - -class MongodbConfiguredStream::Incremental < MongodbConfiguredStream::Base - DATETIME_TYPES = [Date, Time, DateTime] - CURSOR_TYPES = { - datetime: 'DATETIME', - integer: 'integer', - } - - attr_reader :cursor_field_type - - def initialize(configured_stream:, state:, client:) - super - - @cursor_field_type = determine_cursor_field_type - AirbyteLogger.log("Cursor type was determined as: #{@cursor_field_type}") - - value = @state.get(stream_name: stream_name, cursor_field: cursor_field) - @cursor = value && convert_cursor(value) - end - - def cursor_field - @cursor_field ||= configured_stream['cursor_field']&.first - end - - def compose_query - if @cursor - { - cursor_field => { - "$gt": @cursor - } - } - else - {} - end - end - - def valid? - if configured_stream['cursor_field'].count != 1 - AirbyteLogger.log("Stream #{stream_name} has invalid configuration. Cursor field #{wrapper['cursor_field']} configuration is invalid. Should contain exactly one document property name.", Level::Fatal) - return false - end - - true - end - - def after_item_processed(item) - super - - if item[cursor_field] - converted_cursor = convert_cursor(item[cursor_field]) - if !@cursor || converted_cursor && converted_cursor > @cursor - @cursor = converted_cursor - end - else - AirbyteLogger.log("Cursor is empty! Incremental sync results might be unpredictable! Item: #{item}", Level::Fatal) - end - end - - def after_stream_processed - super - - @state.set(stream_name: stream_name, cursor_field: cursor_field, cursor: @cursor) - @state.dump_state! - end - - private - - def determine_cursor_field_type - MongodbTypesExplorer.run(collection: @client[stream_name], field: cursor_field) do |type| - if DATETIME_TYPES.include?(type) - CURSOR_TYPES[:datetime] - else - CURSOR_TYPES[:integer] - end - end - end - - def convert_cursor(value) - if cursor_field_type == CURSOR_TYPES[:datetime] - Time.parse(value) - elsif cursor_field_type == CURSOR_TYPES[:integer] - value.to_i - else - AirbyteLogger.log("Cursor type #{cursor_field_type} is not supported!", Level::Fatal) - end - end - -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_reader.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_reader.rb deleted file mode 100644 index b0bb8a477655..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_reader.rb +++ /dev/null @@ -1,68 +0,0 @@ -require_relative './airbyte_protocol.rb' -require_relative './airbyte_logger.rb' - -require_relative './mongodb_stream.rb' -require_relative './mongodb_types_converter.rb' -require_relative './mongodb_configured_stream/factory.rb' - -class MongodbReader - BATCH_SIZE = 10_000 - LOG_BATCH_SIZE = 10_000 - - def initialize(client:, catalog:, state:) - @client = client - @catalog = catalog - @state = state - end - - def read - @catalog['streams'].each do |configured_stream| - wrapper = MongodbConfiguredStream::Factory.build(configured_stream: configured_stream, state: @state, client: @client) - - AirbyteLogger.log("Reading stream #{wrapper.stream_name} in #{wrapper.sync_mode} mode") - - if wrapper.valid? - read_configured_stream(wrapper) - end - end - end - - private - - def read_configured_stream(wrapper) - collection = @client[wrapper.stream_name] - - projection_config = wrapper.stream['json_schema']['properties'].keys.each_with_object({}) do |key, obj| - obj[key] = 1 - end - - full_count = collection.count - - collection.find(wrapper.compose_query).projection(projection_config).batch_size(BATCH_SIZE).each do |item| - item.each_pair do |key, value| - item[key] = MongodbTypesConverter.convert_value_to_type(value, wrapper.stream['json_schema']['properties'][key]['type']) - end - - record = AirbyteRecordMessage.from_dynamic!({ - "data" => item, - "emitted_at" => Time.now.to_i * 1000, - "stream" => wrapper.stream_name, - }) - - message = AirbyteMessage.from_dynamic!({ - 'type' => Type::Record, - 'record' => record.to_dynamic, - }) - - puts message.to_json - - wrapper.after_item_processed(item) - - if wrapper.processed_count % LOG_BATCH_SIZE == 0 - AirbyteLogger.log("[#{wrapper.processed_count}/#{full_count}}] Reading stream #{wrapper.stream_name} is in progress") - end - end - - wrapper.after_stream_processed - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_source.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_source.rb deleted file mode 100644 index ad30fcd83916..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_source.rb +++ /dev/null @@ -1,84 +0,0 @@ -require_relative './airbyte_protocol.rb' -require_relative './airbyte_logger.rb' - -require_relative './mongodb_stream.rb' -require_relative './mongodb_reader.rb' -require_relative './mongodb_state.rb' - -class MongodbSource - def spec - spec = JSON.parse(File.read(__dir__ + '/spec.json')) - - message = AirbyteMessage.from_dynamic!({ - 'type' => Type::Spec, - 'spec' => spec, - }) - - puts message.to_json - end - - def check(config:) - @config = JSON.parse(File.read(config)) - - result = begin - client.collections.first.find.limit(1).first - {'status' => Status::Succeeded} - rescue Exception => e - AirbyteLogger.log(e.backtrace.join("\n"), Level::Fatal) - {'status' => Status::Failed, 'message' => 'Authentication failed.'} - end - - message = AirbyteMessage.from_dynamic!({ - 'type' => Type::ConnectionStatus, - 'connectionStatus' => result, - }) - - puts message.to_json - end - - def discover(config:) - @config = JSON.parse(File.read(config)) - - streams = client.collections.map do |collection| - AirbyteLogger.log("Discovering stream #{collection.name}") - MongodbStream.new(collection: collection).discover - end - - catalog = AirbyteCatalog.from_dynamic!({ - 'streams' => streams, - }) - - puts AirbyteMessage.from_dynamic!({ - 'type' => Type::Catalog, - 'catalog' => catalog.to_dynamic - }).to_json - end - - def read(config:, catalog:, state: nil) - @config = JSON.parse(File.read(config)) - @catalog = JSON.parse(File.read(catalog)) - @state = MongodbState.new(state_file: state) - - MongodbReader.new(client: client, catalog: @catalog, state: @state).read - end - - def method_missing(m, *args, &block) - AirbyteLogger.log("There's no method called #{m}", Level::Fatal) - end - - private - - def client - @client ||= begin - uri = "mongodb://#{@config['user']}:#{@config['password']}@#{@config['host']}:#{@config['port']}/#{@config['database']}?authSource=#{@config['auth_source']}" - if !@config.fetch(:"replica_set", "").strip.empty? - uri += "&replicaSet=#{@config['replica_set']}&ssl=true" - elsif ["true", true].include?(@config.fetch("ssl", false)) - uri += "&ssl=true" - end - @client = Mongo::Client.new(uri) - @client.logger.formatter = AirbyteLogger.logger_formatter - @client - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_state.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_state.rb deleted file mode 100644 index 5c95985ee721..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_state.rb +++ /dev/null @@ -1,43 +0,0 @@ -require_relative './airbyte_protocol.rb' -require_relative './airbyte_logger.rb' - -require 'json' - -class MongodbState - - def initialize(state_file:) - @state = if state_file - JSON.parse(File.read(state_file)) - else - {} - end - - AirbyteLogger.log("Initialized with state:\n#{JSON.pretty_generate(@state)}") - end - - def get(stream_name:, cursor_field:) - @state.dig(stream_name, cursor_field) - end - - def set(stream_name:, cursor_field:, cursor:) - @state[stream_name] ||= {} - @state[stream_name][cursor_field] = cursor - end - - def dump_state! - json = @state.to_json - - AirbyteLogger.log("Saving state:\n#{JSON.pretty_generate(@state)}") - - asm = AirbyteStateMessage.from_dynamic!({ - 'data' => @state, - }) - - message = AirbyteMessage.from_dynamic!({ - 'type' => Type::State, - 'state' => asm.to_dynamic, - }) - - puts message.to_json - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_stream.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_stream.rb deleted file mode 100644 index 4d16e4a2f48a..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_stream.rb +++ /dev/null @@ -1,76 +0,0 @@ -require_relative './airbyte_protocol.rb' -require_relative './airbyte_logger.rb' - -require_relative './mongodb_types_explorer.rb' - -class MongodbStream - DISCOVER_LIMIT = 10_000 - - AIRBYTE_TYPES = { - boolean: 'boolean', - number: 'number', - integer: 'integer', - string: 'string', - object: 'object', - array: 'array', - } - - TYPES_MAPPING = { - Float => AIRBYTE_TYPES[:number], - Integer => AIRBYTE_TYPES[:integer], - String => AIRBYTE_TYPES[:string], - DateTime => AIRBYTE_TYPES[:string], - TrueClass => AIRBYTE_TYPES[:boolean], - FalseClass => AIRBYTE_TYPES[:boolean], - Array => AIRBYTE_TYPES[:array], - Hash => AIRBYTE_TYPES[:object], - } - FALLBACK_TYPE = AIRBYTE_TYPES[:string] - - def initialize(collection:) - @collection = collection - @properties = {} - end - - def discover - discover_properties - - AirbyteStream.from_dynamic!({ - "name" => @collection.name, - "supported_sync_modes" => [SyncMode::FullRefresh, SyncMode::Incremental], - "source_defined_cursor" => false, - "json_schema" => { - "properties": @properties - } - }).to_dynamic - end - - private - - - def discover_property_type(property) - MongodbTypesExplorer.run(collection: @collection, field: property) do |type| - TYPES_MAPPING[type] || FALLBACK_TYPE - end || FALLBACK_TYPE - end - - def discover_properties - map = "function() { for (var key in this) { emit(key, null); } }" - reduce = "function(key, stuff) { return null; }" - - opts = { - out: {inline: 1}, - raw: true, - } - - view = Mongo::Collection::View.new(@collection, {}, limit: DISCOVER_LIMIT) - props = view.map_reduce(map, reduce, opts).map do |obj| - obj['_id'] - end - - props.each do |prop| - @properties[prop] = { 'type' => discover_property_type(prop) } - AirbyteLogger.log(" #{@collection.name}.#{prop} TYPE IS #{@properties[prop]['type']}") - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_converter.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_converter.rb deleted file mode 100644 index b36bd4372913..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_converter.rb +++ /dev/null @@ -1,24 +0,0 @@ -require_relative './airbyte_protocol.rb' - -require_relative './mongodb_stream.rb' - -class MongodbTypesConverter - def self.convert_value_to_type(value, type) - case type - when MongodbStream::AIRBYTE_TYPES[:boolean] - !!value - when MongodbStream::AIRBYTE_TYPES[:number] - value.to_f - when MongodbStream::AIRBYTE_TYPES[:integer] - value.to_i - when MongodbStream::AIRBYTE_TYPES[:string] - value.to_s - when MongodbStream::AIRBYTE_TYPES[:object] - value.is_a?(Hash) ? value : { 'value' => value.to_s } - when MongodbStream::AIRBYTE_TYPES[:array] - value.is_a?(Array) ? value : [ value.to_s ] - else - value.to_s - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_explorer.rb b/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_explorer.rb deleted file mode 100644 index 44668f5b34e1..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/mongodb_types_explorer.rb +++ /dev/null @@ -1,40 +0,0 @@ -require_relative './airbyte_protocol.rb' - -require_relative './mongodb_stream.rb' - -class MongodbTypesExplorer - EXPLORE_LIMIT = 1_000 - - @@cache = {} - - def self.run(collection:, field:, limit: EXPLORE_LIMIT, &type_mapping_block) - determine_field_types_for_collection(collection: collection, limit: limit, &type_mapping_block) - - @@cache[collection.name][field] - end - - private - - def self.determine_field_types_for_collection(collection:, limit:, &type_mapping_block) - return if @@cache[collection.name] - - airbyte_types = {} - - collection.find.limit(limit).each do |item| - item.each_pair do |key, value| - mapped_value = type_mapping_block[value.class] - - airbyte_types[key] ||= Set[] - airbyte_types[key].add(mapped_value) - end - end - - @@cache[collection.name] = {} - airbyte_types.each_pair do |field, types| - # Has one specific type - if types.count == 1 - @@cache[collection.name][field] = types.first - end - end - end -end diff --git a/airbyte-integrations/connectors/source-mongodb/lib/spec.json b/airbyte-integrations/connectors/source-mongodb/lib/spec.json deleted file mode 100644 index 390f196ccdfe..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/lib/spec.json +++ /dev/null @@ -1,70 +0,0 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/sources/mongodb", - "changelogUrl": "https://docs.airbyte.com/integrations/sources/mongodb", - "connectionSpecification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Mongodb Source Spec", - "type": "object", - "required": ["host", "port", "database", "user", "password", "auth_source"], - "additionalProperties": false, - "properties": { - "host": { - "title": "Host", - "type": "string", - "description": "Host of a Mongo database to be replicated.", - "order": 0 - }, - "port": { - "title": "Port", - "type": "integer", - "description": "Port of a Mongo database to be replicated.", - "minimum": 0, - "maximum": 65536, - "default": 27017, - "examples": ["27017"], - "order": 1 - }, - "database": { - "title": "Database name", - "type": "string", - "description": "Database to be replicated.", - "order": 2 - }, - "user": { - "title": "User", - "type": "string", - "description": "User", - "order": 3 - }, - "password": { - "title": "Password", - "type": "string", - "description": "Password", - "airbyte_secret": true, - "order": 4 - }, - "auth_source": { - "title": "Authentication source", - "type": "string", - "description": "Authentication source where user information is stored. See the Mongo docs for more info.", - "default": "admin", - "examples": ["admin"], - "order": 5 - }, - "replica_set": { - "title": "Replica Set", - "type": "string", - "description": "The name of the set to filter servers by, when connecting to a replica set (Under this condition, the 'TLS connection' value automatically becomes 'true'). See the Mongo docs for more info.", - "default": "", - "order": 6 - }, - "ssl": { - "title": "TLS connection", - "type": "boolean", - "description": "If this switch is enabled, TLS connections will be used to connect to MongoDB.", - "default": false, - "order": 7 - } - } - } -} diff --git a/airbyte-integrations/connectors/source-mongodb/source.rb b/airbyte-integrations/connectors/source-mongodb/source.rb deleted file mode 100644 index 5aac0c5b1ae4..000000000000 --- a/airbyte-integrations/connectors/source-mongodb/source.rb +++ /dev/null @@ -1,16 +0,0 @@ -require 'optparse' -require 'mongo' -require 'slop' -require_relative './lib/mongodb_source' - -# require 'byebug' - -parsed = Slop.parse do |o| - o.string '--config', 'Config file path' - o.string '--catalog', 'Catalog file path' - o.string '--state', 'State file path' -end - -opts = parsed.to_hash.select { |_, value| value } - -MongodbSource.new.public_send(parsed.arguments.first, **opts)