From e762e16df8b7af7b4733d04c071b52d4b032794a Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Fri, 4 Sep 2020 17:43:21 +0800 Subject: [PATCH 01/10] Implement MongoDBActivationStore Co-authored-by: Chetan Mehrotra --- ansible/README.md | 36 + ansible/group_vars/all | 11 + ansible/initMongoDB.yml | 25 + ansible/library/mongodb.py | 283 ++++++++ ansible/mongodb.yml | 22 + ansible/roles/controller/tasks/deploy.yml | 5 + ansible/roles/invoker/tasks/deploy.yml | 5 + ansible/roles/mongodb/tasks/clean.yml | 15 + ansible/roles/mongodb/tasks/deploy.yml | 24 + ansible/roles/mongodb/tasks/main.yml | 13 + ansible/tasks/db/checkDb.yml | 4 + ansible/templates/whisk.conf.j2 | 10 + common/scala/build.gradle | 2 + .../scala/src/main/resources/application.conf | 7 + .../apache/openwhisk/core/WhiskConfig.scala | 1 + .../database/mongodb/AsyncStreamSink.scala | 122 ++++ .../database/mongodb/AsyncStreamSource.scala | 104 +++ .../mongodb/MongoDBArtifactStore.scala | 663 ++++++++++++++++++ .../MongoDBArtifactStoreProvider.scala | 99 +++ .../database/mongodb/MongoDBViewMapper.scala | 224 ++++++ tests/src/test/resources/application.conf.j2 | 5 + .../mongodb/AsyncStreamGraphTests.scala | 153 ++++ .../mongodb/MongoDBArtifactStoreTests.scala | 26 + .../mongodb/MongoDBAttachmentStoreTests.scala | 33 + .../mongodb/MongoDBStoreBehaviorBase.scala | 45 ++ .../mongodb/MongoDBViewMapperTests.scala | 256 +++++++ tools/build/README.md | 1 + tools/build/redo | 7 + 28 files changed, 2201 insertions(+) create mode 100644 ansible/initMongoDB.yml create mode 100644 ansible/library/mongodb.py create mode 100644 ansible/mongodb.yml create mode 100644 ansible/roles/mongodb/tasks/clean.yml create mode 100644 ansible/roles/mongodb/tasks/deploy.yml create mode 100644 ansible/roles/mongodb/tasks/main.yml create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala diff --git a/ansible/README.md b/ansible/README.md index d08b688c362..fd5bd4722a9 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -196,6 +196,42 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml - To use the API Gateway, you'll need to run `apigateway.yml` and `routemgmt.yml`. - Use `ansible-playbook -i environments/$ENVIRONMENT openwhisk.yml` to avoid wiping the data store. This is useful to start OpenWhisk after restarting your Operating System. 
+### Deploying Using MongoDB
+
+You can use MongoDB as the database backend. There is an Ansible playbook to deploy a single-node MongoDB server for testing and development.
+
+- Deploy a MongoDB server (optional):
+
+```
+ansible-playbook -i environments/<environment> mongodb.yml -e mongodb_data_volume="/tmp/mongo-data"
+```
+
+- Then execute:
+
+```
+cd <openwhisk_home>
+./gradlew distDocker
+cd ansible
+ansible-playbook -i environments/<environment> initMongoDB.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> apigateway.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> openwhisk.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" -e database_backend="MongoDB"
+
+# installs a catalog of public packages and actions
+ansible-playbook -i environments/<environment> postdeploy.yml
+
+# to use the API gateway
+ansible-playbook -i environments/<environment> apigateway.yml
+ansible-playbook -i environments/<environment> routemgmt.yml
+```
+
+The available Ansible parameters are:
+```
+  mongodb:
+    connect_string: "{{ mongodb_connect_string }}"
+    database: "{{ mongodb_database | default('whisks') }}"
+    data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
+```
+
 ### Using ElasticSearch to Store Activations
 
 You can use ElasticSearch (ES) to store activations separately while other entities remain stored in CouchDB. There is an Ansible playbook to setup a simple ES cluster for testing and development purposes.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bcaba7a4e97..daabccdd4f1 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -256,6 +256,7 @@ db:
   port: "{{ db_port | default(lookup('ini', 'db_port section=db_creds file={{ playbook_dir }}/db_local.ini')) }}"
   host: "{{ db_host | default(lookup('ini', 'db_host section=db_creds file={{ playbook_dir }}/db_local.ini')) }}"
   persist_path: "{{ db_persist_path | default(false) }}"
+  backend: "{{ database_backend | default('CouchDB') }}"
   instances: "{{ groups['db'] | length }}"
   authkeys:
   - guest
@@ -295,6 +296,10 @@ db:
     admin:
       username: "{{ elastic_username | default('admin') }}"
       password: "{{ elastic_password | default('admin') }}"
+  mongodb:
+    connect_string: "{{ mongodb_connect_string }}"
+    database: "{{ mongodb_database | default('whisks') }}"
+    data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
 
 apigateway:
   port:
@@ -322,6 +327,12 @@ elasticsearch_connect_string: "{% set ret = [] %}\
   {{ ret.append( hostvars[host].ansible_host + ':' + ((db.elasticsearch.port+loop.index-1)|string) ) }}\
   {% endfor %}\
   {{ ret | join(',') }}"
+mongodb:
+  version: 4.4.0
+  commonEnv:
+    CONFIG_whisk_mongodb_uri: "{{ db.mongodb.connect_string }}"
+    CONFIG_whisk_mongodb_database: "{{ db.mongodb.database }}"
+    CONFIG_whisk_spi_ArtifactStoreProvider: "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider"
 
 docker:
   # The user to install docker for. Defaults to the ansible user if not set. This will be the user who is able to run
diff --git a/ansible/initMongoDB.yml b/ansible/initMongoDB.yml
new file mode 100644
index 00000000000..c790996f5fb
--- /dev/null
+++ b/ansible/initMongoDB.yml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This playbook will initialize the immortal DBs in the database account.
+# This step is usually done only once per deployment.
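+#
+# For reference (illustrative only, assuming the default "guest" auth key): the task below
+# writes one document per auth key into the "whiskauth" collection, roughly of this shape,
+# with uuid/key split from the corresponding files/auth.<subject> file:
+#   { "_id": "guest", "subject": "guest",
+#     "namespaces": [ { "name": "guest", "uuid": "<uuid>", "key": "<key>" } ] }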
+ +- hosts: ansible + tasks: + - name: create necessary auth keys + mongodb: + connect_string: "{{ db.mongodb.connect_string }}" + database: "{{ db.mongodb.database }}" + collection: "whiskauth" + doc: + _id: "{{ item }}" + subject: "{{ item }}" + namespaces: + - name: "{{ item }}" + uuid: "{{ key.split(':')[0] }}" + key: "{{ key.split(':')[1] }}" + mode: "doc" + force_update: True + vars: + key: "{{ lookup('file', 'files/auth.{{ item }}') }}" + with_items: "{{ db.authkeys }}" diff --git a/ansible/library/mongodb.py b/ansible/library/mongodb.py new file mode 100644 index 00000000000..fb610faec09 --- /dev/null +++ b/ansible/library/mongodb.py @@ -0,0 +1,283 @@ +#!/usr/bin/python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = ''' +--- +module: mongodb +short_description: A module which support some simple operations on MongoDB. +description: + - Including add user/insert document/create indexes in MongoDB +options: + connect_string: + description: + - The uri of mongodb server + required: true + database: + description: + - The name of the database you want to manipulate + required: true + user: + description: + - The name of the user to add or remove, required when use 'user' mode + required: false + default: null + password: + description: + - The password to use for the user, required when use 'user' mode + required: false + default: null + roles: + description: + - The roles of the user, it's a list of dict, each dict requires two fields: 'db' and 'role', required when use 'user' mode + required: false + default: null + collection: + required: false + description: + - The name of the collection you want to manipulate, required when use 'doc' or 'indexes' mode + doc: + required: false + description: + - The document you want to insert into MongoDB, required when use 'doc' mode + indexes: + required: false + description: + - The indexes you want to create in MongoDB, it's a list of dict, you can see the example for the usage, required when use 'index' mode + force_update: + required: false + description: + - Whether replace/update existing user or doc or raise DuplicateKeyError, default is false + mode: + required: false + default: user + choices: ['user', 'doc', 'index'] + description: + - use 'user' mode if you want to add user, 'doc' mode to insert document, 'index' mode to create indexes + +requirements: [ "pymongo" ] +author: + - "Jinag PengCheng" +''' + +EXAMPLES = ''' +# add user +- mongodb: + connect_string: mongodb://localhost:27017 + database: admin + user: test + password: 123456 + roles: + - db: test_database + role: read + force_update: true + +# add doc +- mongodb: + connect_string: mongodb://localhost:27017 + mode: doc + database: 
admin + collection: main + doc: + id: "id/document" + title: "the name of document" + content: "which doesn't matter" + force_update: true + +# add indexes +- mongodb: + connect_string: mongodb://localhost:27017 + mode: index + database: admin + collection: main + indexes: + - index: + - field: updated_at + direction: 1 + - field: name + direction: -1 + name: test-index + unique: true +''' + +import traceback + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native + +try: + from pymongo import ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED, TEXT + from pymongo import IndexModel + from pymongo import MongoClient + from pymongo.errors import DuplicateKeyError +except ImportError: + pass + + +# ========================================= +# MongoDB module specific support methods. +# + +class UnknownIndexPlugin(Exception): + pass + + +def check_params(params, mode, module): + missed_params = [] + for key in OPERATIONS[mode]['required']: + if params[key] is None: + missed_params.append(key) + + if missed_params: + module.fail_json(msg="missing required arguments: %s" % (",".join(missed_params))) + + +def _recreate_user(module, db, user, password, roles): + try: + db.command("dropUser", user) + db.command("createUser", user, pwd=password, roles=roles) + except Exception as e: + module.fail_json(msg='Unable to create user: %s' % to_native(e), exception=traceback.format_exc()) + + + +def user(module, client, db_name, **kwargs): + roles = kwargs['roles'] + if roles is None: + roles = [] + db = client[db_name] + + try: + db.command("createUser", kwargs['user'], pwd=kwargs['password'], roles=roles) + except DuplicateKeyError as e: + if kwargs['force_update']: + _recreate_user(module, db, kwargs['user'], kwargs['password'], roles) + else: + module.fail_json(msg='Unable to create user: %s' % to_native(e), exception=traceback.format_exc()) + except Exception as e: + module.fail_json(msg='Unable to create user: %s' % to_native(e), exception=traceback.format_exc()) + + module.exit_json(changed=True, user=kwargs['user']) + + +def doc(module, client, db_name, **kwargs): + coll = client[db_name][kwargs['collection']] + try: + coll.insert_one(kwargs['doc']) + except DuplicateKeyError as e: + if kwargs['force_update']: + try: + coll.replace_one({'_id': kwargs['doc']['_id']}, kwargs['doc']) + except Exception as e: + module.fail_json(msg='Unable to insert doc: %s' % to_native(e), exception=traceback.format_exc()) + else: + module.fail_json(msg='Unable to insert doc: %s' % to_native(e), exception=traceback.format_exc()) + except Exception as e: + module.fail_json(msg='Unable to insert doc: %s' % to_native(e), exception=traceback.format_exc()) + + kwargs['doc']['_id'] = str(kwargs['doc']['_id']) + module.exit_json(changed=True, doc=kwargs['doc']) + + +def _clean_index_direction(direction): + if direction in ["1", "-1"]: + direction = int(direction) + + if direction not in [ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED, TEXT]: + raise UnknownIndexPlugin("Unable to create indexes: Unknown index plugin: %s" % direction) + return direction + + +def _clean_index_options(options): + res = {} + supported_options = set(['name', 'unique', 'background', 'sparse', 'bucketSize', 'min', 'max', 'expireAfterSeconds']) + for key in set(options.keys()).intersection(supported_options): + res[key] = options[key] + if key in ['min', 'max', 'bucketSize', 'expireAfterSeconds']: + res[key] = int(res[key]) + + return res + + +def parse_indexes(idx): + keys = 
[(k['field'], _clean_index_direction(k['direction'])) for k in idx.pop('index')] + options = _clean_index_options(idx) + return IndexModel(keys, **options) + + +def index(module, client, db_name, **kwargs): + parsed_indexes = map(parse_indexes, kwargs['indexes']) + try: + coll = client[db_name][kwargs['collection']] + coll.create_indexes(parsed_indexes) + except Exception as e: + module.fail_json(msg='Unable to create indexes: %s' % to_native(e), exception=traceback.format_exc()) + + module.exit_json(changed=True, indexes=kwargs['indexes']) + + +OPERATIONS = { + 'user': { 'function': user, 'params': ['user', 'password', 'roles', 'force_update'], 'required': ['user', 'password']}, + 'doc': {'function': doc, 'params': ['doc', 'collection', 'force_update'], 'required': ['doc', 'collection']}, + 'index': {'function': index, 'params': ['indexes', 'collection'], 'required': ['indexes', 'collection']} +} + + +# ========================================= +# Module execution. +# + +def main(): + module = AnsibleModule( + argument_spec=dict( + connect_string=dict(required=True), + database=dict(required=True, aliases=['db']), + mode=dict(default='user', choices=['user', 'doc', 'index']), + user=dict(default=None), + password=dict(default=None, no_log=True), + roles=dict(default=None, type='list'), + collection=dict(default=None), + doc=dict(default=None, type='dict'), + force_update=dict(default=False, type='bool'), + indexes=dict(default=None, type='list'), + ) + ) + + mode = module.params['mode'] + + db_name = module.params['database'] + + params = {key: module.params[key] for key in OPERATIONS[mode]['params']} + check_params(params, mode, module) + + try: + client = MongoClient(module.params['connect_string']) + except NameError: + module.fail_json(msg='the python pymongo module is required') + except Exception as e: + module.fail_json(msg='unable to connect to database: %s' % to_native(e), exception=traceback.format_exc()) + + OPERATIONS[mode]['function'](module, client, db_name, **params) + + +if __name__ == '__main__': + main() diff --git a/ansible/mongodb.yml b/ansible/mongodb.yml new file mode 100644 index 00000000000..46e94828c6c --- /dev/null +++ b/ansible/mongodb.yml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. +--- +# This playbook deploys a MongoDB for Openwhisk. + +- hosts: localhost + tasks: + - name: check if db_local.ini exists? 
+ tags: ini + stat: path="{{ playbook_dir }}/db_local.ini" + register: db_check + + - name: prepare db_local.ini + tags: ini + local_action: template src="db_local.ini.j2" dest="{{ playbook_dir }}/db_local.ini" + when: not db_check.stat.exists + +# This is for test, only deploy it on the first node, please use a shard cluster mongodb for +# production env +- hosts: db[0] + roles: + - mongodb diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 64724c41d54..7b4d5de29ef 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -292,6 +292,11 @@ env: "{{ env | combine(elastic_env) }}" when: db.activation_store.backend == "ElasticSearch" +- name: merge mongodb env variables + set_fact: + env: "{{ env | combine(mongodb.commonEnv) }}" + when: db.backend == "MongoDB" + - name: populate volumes for controller set_fact: controller_volumes: diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index ea4ce48114b..c7b4eec3b75 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -307,6 +307,11 @@ env: "{{ env | combine(elastic_env) }}" when: db.activation_store.backend == "ElasticSearch" +- name: merge mongodb env variables + set_fact: + env: "{{ env | combine(mongodb.commonEnv) }}" + when: db.backend == "MongoDB" + - name: include plugins include_tasks: "{{ inv_item }}.yml" with_items: "{{ invoker_plugins | default([]) }}" diff --git a/ansible/roles/mongodb/tasks/clean.yml b/ansible/roles/mongodb/tasks/clean.yml new file mode 100644 index 00000000000..b095a7ce1fa --- /dev/null +++ b/ansible/roles/mongodb/tasks/clean.yml @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. +--- +# Remove MongoDB server + +- name: remove MongoDB + docker_container: + name: mongodb + state: absent + keep_volumes: False + +- name: remove MongoDB data volume + docker_volume: + name: "{{ db.mongodb.data_volume }}" + state: absent diff --git a/ansible/roles/mongodb/tasks/deploy.yml b/ansible/roles/mongodb/tasks/deploy.yml new file mode 100644 index 00000000000..4853b480ba0 --- /dev/null +++ b/ansible/roles/mongodb/tasks/deploy.yml @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. 
+--- +# This role will run a MongoDB server on the db group, this is only for test, please use +# shared cluster for production env + +- name: (re)start mongodb + vars: + mongodb_image: "{{ mongodb.docker_image | default('mongo:' ~ mongodb.version ) }}" + docker_container: + name: mongodb + image: "{{ mongodb_image }}" + state: started + recreate: true + restart_policy: "{{ docker.restart.policy }}" + hostname: "mongodb" + user: "mongodb" + volumes: + - "{{ db.mongodb.data_volume }}:/data/db" + ports: + - "27017:27017" + +- name: wait until the MongoDB in this host is up and running + local_action: wait_for host={{ ansible_host }} port=27017 state=started delay=5 timeout=60 diff --git a/ansible/roles/mongodb/tasks/main.yml b/ansible/roles/mongodb/tasks/main.yml new file mode 100644 index 00000000000..e402c082ed2 --- /dev/null +++ b/ansible/roles/mongodb/tasks/main.yml @@ -0,0 +1,13 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements; and to You under the Apache License, Version 2.0. +--- +# This role will deploy a database server. Use the role if you want to use CouchCB locally. +# In deploy mode it will start the MongoDB container. +# In clean mode it will remove the MongoDB container. + +- import_tasks: deploy.yml + when: mode == "deploy" + +- import_tasks: clean.yml + when: mode == "clean" + diff --git a/ansible/tasks/db/checkDb.yml b/ansible/tasks/db/checkDb.yml index 5962400cdac..922deca1d43 100644 --- a/ansible/tasks/db/checkDb.yml +++ b/ansible/tasks/db/checkDb.yml @@ -28,3 +28,7 @@ user: "{{ dbUser }}" password: "{{ dbPass }}" force_basic_auth: yes + when: db.backend == "CouchDB" + +# the collection in MongoDB doesn't need to be created in advance, so just skip it +# - name: check if {{ dbName }} on MongoDB exists diff --git a/ansible/templates/whisk.conf.j2 b/ansible/templates/whisk.conf.j2 index 9bb9d538ab6..e6da958e6f0 100644 --- a/ansible/templates/whisk.conf.j2 +++ b/ansible/templates/whisk.conf.j2 @@ -14,4 +14,14 @@ whisk { WhiskActivation = "{{ db.whisk.activations }}" } } + {% if db.backend == 'MongoDB' %} + mongodb { + uri = "{{ db.mongodb.connect_string }}" + database = "{{ db.mongodb.database }}" + } + + spi { + ArtifactStoreProvider = org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider + } + {% endif %} } diff --git a/common/scala/build.gradle b/common/scala/build.gradle index ffd7480021a..79a36251a24 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -90,6 +90,8 @@ dependencies { compile "com.microsoft.azure:azure-cosmosdb:2.6.2" compile "com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4" + //for mongo + compile "org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.6.0" compile ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index a894360cdab..cde2d859068 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -373,6 +373,13 @@ whisk { #} } + # MongoDB related configuration + # For example: + # mongodb { + # uri = mongodb://localhost:27017 # DB Uri + # database = # Database name + #} + # transaction ID related configuration transactions { header = "X-Request-ID" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index 19ad39de3c9..c3581529993 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -207,6 +207,7 @@ object ConfigKeys { val couchdb = "whisk.couchdb" val cosmosdb = "whisk.cosmosdb" + val mongodb = "whisk.mongodb" val kafka = "whisk.kafka" val kafkaCommon = s"$kafka.common" val kafkaProducer = s"$kafka.producer" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala new file mode 100644 index 00000000000..4387d23f786 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import java.nio.ByteBuffer + +import akka.Done +import akka.stream.{Attributes, IOResult, Inlet, SinkShape} +import akka.stream.scaladsl.Sink +import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler} +import akka.util.ByteString +import org.mongodb.scala.Completed +import org.mongodb.scala.gridfs.{AsyncOutputStream} + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} + +class AsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext) + extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] { + val in: Inlet[ByteString] = Inlet("AsyncStream.in") + + override val shape: SinkShape[ByteString] = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = { + val ioResultPromise = Promise[IOResult]() + val logic = new GraphStageLogic(shape) with InHandler { + handler => + var buffers: Iterator[ByteBuffer] = Iterator() + var writeCallback: AsyncCallback[Try[Int]] = _ + var closeCallback: AsyncCallback[Try[Completed]] = _ + var position: Int = _ + var writeDone = Promise[Completed] + + setHandler(in, this) + + override def preStart(): Unit = { + //close operation is async and thus requires the stage to remain open + //even after all data is read + setKeepGoing(true) + writeCallback = getAsyncCallback[Try[Int]](handleWriteResult) + closeCallback = getAsyncCallback[Try[Completed]](handleClose) + pull(in) + } + + override def onPush(): Unit = { + buffers = grab(in).asByteBuffers.iterator + writeDone = Promise[Completed] + writeNextBufferOrPull() + } + + override def onUpstreamFinish(): Unit = { + //Work done perform close + //Using async "blessed" callback does not 
work at this stage so + // need to invoke as normal callback + //TODO Revisit this + + //write of ByteBuffers from ByteString is an async operation. For last push + //the write operation may involve multiple async callbacks and by that time + //onUpstreamFinish may get invoked. So to ensure that close operation is performed + //"after" the last push writes are done we rely on writeDone promise + //and schedule the close on its completion + writeDone.future.onComplete(_ => stream.close().head().onComplete(handleClose)) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + fail(ex) + } + + private def handleWriteResult(bytesWrittenOrFailure: Try[Int]): Unit = bytesWrittenOrFailure match { + case Success(bytesWritten) => + position += bytesWritten + writeNextBufferOrPull() + case Failure(failure) => fail(failure) + } + + private def handleClose(completed: Try[Completed]): Unit = completed match { + case Success(Completed()) => + completeStage() + ioResultPromise.trySuccess(IOResult(position, Success(Done))) + case Failure(failure) => + fail(failure) + } + + private def writeNextBufferOrPull(): Unit = { + if (buffers.hasNext) { + stream.write(buffers.next()).head().onComplete(writeCallback.invoke) + } else { + writeDone.trySuccess(Completed()) + pull(in) + } + } + + private def fail(failure: Throwable) = { + failStage(failure) + ioResultPromise.trySuccess(IOResult(position, Failure(failure))) + } + + } + (logic, ioResultPromise.future) + } +} + +object AsyncStreamSink { + def apply(stream: AsyncOutputStream)(implicit ec: ExecutionContext): Sink[ByteString, Future[IOResult]] = { + Sink.fromGraph(new AsyncStreamSink(stream)) + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala new file mode 100644 index 00000000000..d5732946296 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.mongodb + +import java.nio.ByteBuffer + +import akka.Done +import akka.stream.SourceShape +import akka.stream.Attributes +import akka.stream.Outlet +import akka.stream.IOResult +import akka.stream.scaladsl.Source +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.OutHandler +import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.stage.AsyncCallback +import akka.util.ByteString +import org.mongodb.scala.Completed +import org.mongodb.scala.gridfs.AsyncInputStream + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Success +import scala.util.Try +import scala.util.Failure + +class AsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext) + extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] { + require(chunkSize > 0, "chunkSize must be greater than 0") + val out: Outlet[ByteString] = Outlet("AsyncStream.out") + + override val shape: SourceShape[ByteString] = SourceShape(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = { + val ioResultPromise = Promise[IOResult]() + val logic = new GraphStageLogic(shape) with OutHandler { + handler => + val buffer = ByteBuffer.allocate(chunkSize) + var readCallback: AsyncCallback[Try[Int]] = _ + var closeCallback: AsyncCallback[Try[Completed]] = _ + var position: Int = _ + + setHandler(out, this) + + override def preStart(): Unit = { + readCallback = getAsyncCallback[Try[Int]](handleBufferRead) + closeCallback = getAsyncCallback[Try[Completed]](handleClose) + } + + override def onPull(): Unit = { + stream.read(buffer).head().onComplete(readCallback.invoke) + } + + private def handleBufferRead(bytesReadOrFailure: Try[Int]): Unit = bytesReadOrFailure match { + case Success(bytesRead) if bytesRead >= 0 => + buffer.flip + push(out, ByteString.fromByteBuffer(buffer)) + buffer.clear + position += bytesRead + case Success(_) => + stream.close().head().onComplete(closeCallback.invoke) //Work done perform close + case Failure(failure) => + fail(failure) + } + + private def handleClose(completed: Try[Completed]): Unit = completed match { + case Success(Completed()) => + completeStage() + ioResultPromise.trySuccess(IOResult(position, Success(Done))) + case Failure(failure) => + fail(failure) + } + + private def fail(failure: Throwable) = { + failStage(failure) + ioResultPromise.trySuccess(IOResult(position, Failure(failure))) + } + } + (logic, ioResultPromise.future) + } +} + +object AsyncStreamSource { + def apply(stream: AsyncInputStream, chunkSize: Int = 512 * 1024)( + implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] = { + Source.fromGraph(new AsyncStreamSource(stream, chunkSize)) + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala new file mode 100644 index 00000000000..07f7a443c08 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import java.security.MessageDigest + +import akka.actor.ActorSystem +import akka.event.Logging.ErrorLevel +import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.mongodb.client.gridfs.model.GridFSUploadOptions +import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId} +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.database.StoreUtils._ +import org.apache.openwhisk.core.entity.Attachments.Attached +import org.apache.openwhisk.core.entity.{DocId, DocInfo, DocRevision, DocumentReader, UUID} +import org.apache.openwhisk.http.Messages +import org.bson.json.{JsonMode, JsonWriterSettings} +import org.mongodb.scala.bson.BsonString +import org.mongodb.scala.bson.collection.immutable.Document +import org.mongodb.scala.gridfs.{GridFSBucket, GridFSFile, MongoGridFSException} +import org.mongodb.scala.model._ +import org.mongodb.scala.{MongoClient, MongoCollection, MongoException} +import spray.json._ + +import scala.concurrent.Future +import scala.util.Try + +object MongoDBArtifactStore { + val _computed = "_computed" +} + +/** + * Basic client to put and delete artifacts in a data store. 
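+ *
+ * Note: this store emulates CouchDB behaviour on top of MongoDB: a CouchDB-style `_rev`
+ * field is computed on every write (see `revisionCalculate`), view queries are translated
+ * by the supplied `MongoDBViewMapper`, and attachments are stored in GridFS unless a
+ * separate `AttachmentStore` is configured.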
+ * + * @param client the mongodb client to access database + * @param dbName the name of the database to operate on + * @param collName the name of the collection to operate on + * @param documentHandler helper class help to simulate the designDoc of CouchDB + * @param viewMapper helper class help to simulate the designDoc of CouchDB + */ +class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: MongoClient, + dbName: String, + collName: String, + documentHandler: DocumentHandler, + viewMapper: MongoDBViewMapper, + val inliningConfig: InliningConfig, + val attachmentStore: Option[AttachmentStore])( + implicit system: ActorSystem, + val logging: Logging, + jsonFormat: RootJsonFormat[DocumentAbstraction], + val materializer: ActorMaterializer, + docReader: DocumentReader) + extends ArtifactStore[DocumentAbstraction] + with DocumentProvider + with DefaultJsonProtocol + with AttachmentSupport[DocumentAbstraction] { + + import MongoDBArtifactStore._ + + protected[core] implicit val executionContext = system.dispatcher + + private val mongodbScheme = "mongodb" + val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(mongodbScheme) + + private val database = client.getDatabase(dbName) + private val collection = getCollectionAndCreateIndexes + private val gridFSBucket = GridFSBucket(database, collName) + + private val jsonWriteSettings = JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build + + // MongoDB doesn't support using `$` as the first char of field name, so below two fields needs to be encoded first + private val fieldsNeedEncode = Seq("annotations", "parameters") + + override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { + val asJson = d.toDocumentRecord + + val id: String = asJson.fields.getOrElse("_id", JsString.empty).convertTo[String].trim + require(!id.isEmpty, "document id must be defined") + + val (old_rev, rev) = revisionCalculate(asJson) + val docinfoStr = s"id: $id, rev: $rev" + val start = + transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'") + + val encodedData = encodeFields(fieldsNeedEncode, asJson) + + val data = JsObject( + encodedData.fields + (_computed -> documentHandler.computedFields(asJson)) + ("_rev" -> rev.toJson)) + + val filters = + if (rev.startsWith("1-")) { + // for new document, we should get no matched document and insert new one + // if there is a matched document, that one with no _rev filed will be replaced + // if there is a document with the same id but has an _rev field, will return en E11000(conflict) error + Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev"))) + } else { + // for old document, we should find a matched document and replace it + // if no matched document find and try to insert new document, mongodb will return an E11000 error + Filters.and(Filters.eq("_id", id), Filters.eq("_rev", old_rev)) + } + + val f = + collection + .findOneAndReplace( + filters, + Document(data.compactPrint), + FindOneAndReplaceOptions().upsert(true).returnDocument(ReturnDocument.AFTER)) + .toFuture() + .map { doc => + transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr', document: '$doc'") + DocInfo(DocId(id), DocRevision(rev)) + } + .recover { + case t: MongoException if t.getCode == 11000 => + transid.finished(this, start, s"[PUT] '$dbName', document: '$docinfoStr'; conflict.") + throw DocumentConflictException("conflict on 'put'") + case t: 
MongoException => + transid.failed( + this, + start, + s"[PUT] '$dbName' failed to put document: '$docinfoStr'; return error code: '${t.getCode}'", + ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid + .failed(this, start, s"[PUT] '$collName' internal error, failure: '${failure.getMessage}'", ErrorLevel)) + } + + override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = { + require(doc != null && doc.rev.asString != null, "doc revision required for delete") + + val start = + transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] '$collName' deleting document: '$doc'") + + val f = collection + .deleteOne(Filters.and(Filters.eq("_id", doc.id.id), Filters.eq("_rev", doc.rev.rev))) + .toFuture() + .flatMap { result => + if (result.getDeletedCount == 1) { // the result can only be 1 or 0 + transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'") + Future(true) + } else { + collection.find(Filters.eq("_id", doc.id.id)).toFuture.map { result => + if (result.size == 1) { + // find the document according to _id, conflict + transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; conflict.") + throw DocumentConflictException("conflict on 'delete'") + } else { + // doesn't find the document according to _id, not found + transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; not found.") + throw NoDocumentException(s"$doc not found on 'delete'") + } + } + } + } + .recover { + case t: MongoException => + transid.failed( + this, + start, + s"[DEL] '$collName' failed to delete document: '$doc'; error code: '${t.getCode}'", + ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid.failed( + this, + start, + s"[DEL] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'", + ErrorLevel)) + } + + override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo, + attachmentHandler: Option[(A, Attached) => A] = None)( + implicit transid: TransactionId, + ma: Manifest[A]): Future[A] = { + + val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$dbName' finding document: '$doc'") + + require(doc != null, "doc undefined") + + val f = collection + .find(Filters.eq("_id", doc.id.id)) // method deserialize will check whether the _rev matched + .toFuture() + .map(result => + if (result.isEmpty) { + transid.finished(this, start, s"[GET] '$collName', document: '$doc'; not found.") + throw NoDocumentException("not found on 'get'") + } else { + transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'") + val response = result.head.toJson(jsonWriteSettings).parseJson.asJsObject + val decodeData = decodeFields(fieldsNeedEncode, response) + + val deserializedDoc = deserialize[A, DocumentAbstraction](doc, decodeData) + attachmentHandler + .map(processAttachments(deserializedDoc, decodeData, doc.id.id, _)) + .getOrElse(deserializedDoc) + }) + .recoverWith { + case t: MongoException => + transid.finished(this, start, s"[GET] '$collName' failed to get document: '$doc'; error code: '${t.getCode}'") + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity) + } + + reportFailure( + f, + failure => + transid.failed( + this, + start, + s"[GET] '$collName' internal error, doc: '$doc', failure: 
'${failure.getMessage}'", + ErrorLevel)) + } + + override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = { + val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$collName' finding document: '$id'") + val f = collection + .find(Filters.equal("_id", id.id)) + .head() + .map { + case d: Document => + transid.finished(this, start, s"[GET] '$dbName' completed: found document '$id'") + Some(decodeFields(fieldsNeedEncode, d.toJson(jsonWriteSettings).parseJson.asJsObject)) + case null => + transid.finished(this, start, s"[GET] '$dbName', document: '$id'; not found.") + None + } + .recover { + case t: MongoException => + transid.failed( + this, + start, + s"[GET] '$collName' failed to get document: '$id'; error code: '${t.getCode}'", + ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid.failed( + this, + start, + s"[GET] '$collName' internal error, doc: '$id', failure: '${failure.getMessage}'", + ErrorLevel)) + } + + override protected[core] def query(table: String, + startKey: List[Any], + endKey: List[Any], + skip: Int, + limit: Int, + includeDocs: Boolean, + descending: Boolean, + reduce: Boolean, + stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = { + require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true") + require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce + require(skip >= 0, "skip should be non negative") + require(limit >= 0, "limit should be non negative") + + val Array(ddoc, viewName) = table.split("/") + + val find = collection + .find(viewMapper.filter(ddoc, viewName, startKey, endKey)) + + viewMapper.sort(ddoc, viewName, descending).foreach(find.sort) + + find.skip(skip).limit(limit) + + val realIncludeDocs = includeDocs | documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName) + val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table") + + val f = find + .toFuture() + .map { docs => + transid.finished(this, start, s"[QUERY] '$dbName' completed: matched ${docs.size}") + docs.map { doc => + val js = decodeFields(fieldsNeedEncode, doc.toJson(jsonWriteSettings).parseJson.convertTo[JsObject]) + documentHandler.transformViewResult( + ddoc, + viewName, + startKey, + endKey, + realIncludeDocs, + JsObject(js.fields - _computed), + MongoDBArtifactStore.this) + } + } + .flatMap(Future.sequence(_)) + .map(_.flatten.toList) + .recover { + case t: MongoException => + transid.failed(this, start, s"[QUERY] '$collName' failed; error code: '${t.getCode}'", ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid + .failed(this, start, s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'", ErrorLevel)) + } + + protected[core] def count(table: String, startKey: List[Any], endKey: List[Any], skip: Int, stale: StaleParameter)( + implicit transid: TransactionId): Future[Long] = { + require(skip >= 0, "skip should be non negative") + + val Array(ddoc, viewName) = table.split("/") + val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] '$dbName' searching '$table") + + val query = viewMapper.filter(ddoc, viewName, startKey, endKey) + + val option = CountOptions().skip(skip) + val f = + collection + .countDocuments(query, option) + .toFuture() + .map { result => + transid.finished(this, start, s"[COUNT] 
'$collName' completed: count $result") + result + } + .recover { + case t: MongoException => + transid.failed(this, start, s"[COUNT] '$collName' failed; error code: '${t.getCode}'", ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid + .failed(this, start, s"[COUNT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel)) + } + + override protected[database] def putAndAttach[A <: DocumentAbstraction]( + doc: A, + update: (A, Attached) => A, + contentType: ContentType, + docStream: Source[ByteString, _], + oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { + + attachmentStore match { + case Some(as) => + attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as) + case None => + attachToMongo(doc, update, contentType, docStream, oldAttachment) + } + + } + + private def attachToMongo[A <: DocumentAbstraction]( + doc: A, + update: (A, Attached) => A, + contentType: ContentType, + docStream: Source[ByteString, _], + oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { + + for { + bytesOrSource <- inlineOrAttach(docStream) + uri = uriOf(bytesOrSource, UUID().asString) + attached <- { + bytesOrSource match { + case Left(bytes) => + Future.successful(Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))) + case Right(source) => + attach(doc, uri.path.toString, contentType, source).map { r => + Attached(uri.toString, contentType, Some(r.length), Some(r.digest)) + } + } + } + docInfo <- put(update(doc, attached)) + + //Remove old attachment if it was part of attachmentStore + _ <- oldAttachment + .map { old => + val oldUri = Uri(old.attachmentName) + if (oldUri.scheme == mongodbScheme) { + val name = oldUri.path.toString + gridFSBucket.delete(BsonString(s"${docInfo.id.id}/$name")).toFuture.map { _ => + true + } + } else { + Future.successful(true) + } + } + .getOrElse(Future.successful(true)) + } yield (docInfo, attached) + } + + private def attach(d: DocumentAbstraction, name: String, contentType: ContentType, docStream: Source[ByteString, _])( + implicit transid: TransactionId): Future[AttachResult] = { + + logging.info(this, s"Uploading attach $name") + val asJson = d.toDocumentRecord + val id: String = asJson.fields("_id").convertTo[String].trim + require(!id.isEmpty, "document id must be defined") + + val start = transid.started( + this, + LoggingMarkers.DATABASE_ATT_SAVE, + s"[ATT_PUT] '$collName' uploading attachment '$name' of document 'id: $id'") + + val document: org.bson.Document = new org.bson.Document("contentType", contentType.toString) + //add the document id to the metadata + document.append("belongsTo", id) + + val option = new GridFSUploadOptions().metadata(document) + + val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), name, option) + val sink = AsyncStreamSink(uploadStream) + + val f = docStream + .runWith(combinedSink(sink)) + .map { r => + transid + .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$id'") + AttachResult(r.digest, r.length) + } + .recover { + case t: MongoException => + transid.failed( + this, + start, + s"[ATT_PUT] '$collName' failed to upload attachment '$name' of document '$id'; error code '${t.getCode}'", + ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + t.getMessage) + } + + reportFailure( + f, + failure => + transid.failed( + 
this, + start, + s"[ATT_PUT] '$collName' internal error, name: '$name', doc: '$id', failure: '${failure.getMessage}'", + ErrorLevel)) + } + + override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] = { + + val name = attached.attachmentName + val attachmentUri = Uri(name) + + attachmentUri.scheme match { + case AttachmentSupport.MemScheme => + memorySource(attachmentUri).runWith(sink) + case s if s == mongodbScheme || attachmentUri.isRelative => + //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile' + //Compared to current approach of ':' + readAttachmentFromMongo(doc, attachmentUri, sink) + case s if attachmentStore.isDefined && attachmentStore.get.scheme == s => + attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) + case _ => + throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri") + } + } + + private def readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] = { + + val attachmentName = attachmentUri.path.toString + val start = transid.started( + this, + LoggingMarkers.DATABASE_ATT_GET, + s"[ATT_GET] '$dbName' finding attachment '$attachmentName' of document '$doc'") + + require(doc != null, "doc undefined") + require(doc.rev.rev != null, "doc revision must be specified") + + val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName")) + + def readStream(file: GridFSFile) = { + val source = AsyncStreamSource(downloadStream) + source + .runWith(sink) + .map { result => + transid + .finished( + this, + start, + s"[ATT_GET] '$collName' completed: found attachment '$attachmentName' of document '$doc'") + result + } + } + + def getGridFSFile = { + downloadStream + .gridFSFile() + .head() + .transform( + identity, { + case ex: MongoGridFSException if ex.getMessage.contains("File not found") => + transid.finished( + this, + start, + s"[ATT_GET] '$collName', retrieving attachment '$attachmentName' of document '$doc'; not found.") + NoDocumentException("Not found on 'readAttachment'.") + case ex: MongoGridFSException => + transid.failed( + this, + start, + s"[ATT_GET] '$collName' failed to get attachment '$attachmentName' of document '$doc'; error code: '${ex.getCode}'", + ErrorLevel) + throw new Exception("Unexpected mongodb server error: " + ex.getMessage) + case t => t + }) + } + + val f = for { + file <- getGridFSFile + result <- readStream(file) + } yield result + + reportFailure( + f, + failure => + transid.failed( + this, + start, + s"[ATT_GET] '$dbName' internal error, name: '$attachmentName', doc: '$doc', failure: '${failure.getMessage}'", + ErrorLevel)) + + } + + override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = + attachmentStore + .map(as => as.deleteAttachments(doc.id)) + .getOrElse(Future.successful(true)) // For MongoDB it is expected that the entire document is deleted. + + override def shutdown(): Unit = { + // MongoClient maintains the connection pool internally, we don't need to manage it + attachmentStore.foreach(_.shutdown()) + } + + private def reportFailure[T, U](f: Future[T], onFailure: Throwable => U): Future[T] = { + f.failed.foreach { + case _: ArtifactStoreException => // These failures are intentional and shouldn't trigger the catcher. 
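+      // Any other failure is unexpected: report it through the caller-supplied onFailure
+      // callback (the call sites above use it to mark the transaction as failed).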
+ case x => onFailure(x) + } + f + } + + // calculate the revision manually, to be compatible with couchdb's _rev field + private def revisionCalculate(doc: JsObject): (String, String) = { + val md: MessageDigest = MessageDigest.getInstance("MD5") + val new_rev = md.digest(doc.toString.getBytes()).map(0xFF & _).map { "%02x".format(_) }.foldLeft("") { _ + _ } + doc.fields + .get("_rev") + .map { value => + val start = value.convertTo[String].trim.split("-").apply(0).toInt + 1 + (value.convertTo[String].trim, s"$start-$new_rev") + } + .getOrElse { + ("", s"1-$new_rev") + } + } + + private def processAttachments[A <: DocumentAbstraction](doc: A, + js: JsObject, + docId: String, + attachmentHandler: (A, Attached) => A): A = { + js.fields("exec").asJsObject().fields.get("code").map { + case code: JsObject => + code.getFields("attachmentName", "attachmentType", "digest", "length") match { + case Seq(JsString(name), JsString(contentTypeValue), JsString(digest), JsNumber(length)) => + val contentType = ContentType.parse(contentTypeValue) match { + case Right(ct) => ct + case Left(_) => ContentTypes.NoContentType //Should not happen + } + attachmentHandler( + doc, + Attached(getAttachmentName(name), contentType, Some(length.longValue()), Some(digest))) + case x => + throw DeserializationException("Attachment json does not have required fields" + x) + } + case _ => doc + } getOrElse { + doc // This should not happen as an action always contain field: exec.code + } + } + + /** + * Determines if the attachment scheme confirms to new UUID based scheme or not + * and generates the name based on that + */ + private def getAttachmentName(name: String): String = { + Try(java.util.UUID.fromString(name)) + .map(_ => Uri.from(scheme = attachmentScheme, path = name).toString) + .getOrElse(name) + } + + private def getCollectionAndCreateIndexes: MongoCollection[Document] = { + val coll = database.getCollection(collName) + // create indexes in specific collection if they do not exist + coll.listIndexes().toFuture().map { idxes => + val keys = idxes.map { + _.get("key").map { fields => + Document(fields.asDocument()) + } getOrElse { + Document.empty // this should not happen + } + } + + viewMapper.indexes.foreach { idx => + if (!keys.contains(idx)) + coll.createIndex(idx).toFuture + } + } + coll + } + + // encode JsValue which has complex and arbitrary structure to JsString + private def encodeFields(fields: Seq[String], jsValue: JsObject): JsObject = { + var data = jsValue.fields + fields.foreach { field => + data.get(field).foreach { value => + data = data.updated(field, JsString(value.compactPrint)) + } + } + JsObject(data) + } + + // decode fields from JsString + private def decodeFields(fields: Seq[String], jsValue: JsObject): JsObject = { + var data = jsValue.fields + fields.foreach { field => + data.get(field).foreach { value => + Try { + data = data.updated(field, value.asInstanceOf[JsString].value.parseJson) + } + } + } + JsObject(data) + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala new file mode 100644 index 00000000000..72fe371aceb --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import org.apache.openwhisk.common.Logging +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity} +import org.mongodb.scala.MongoClient +import pureconfig._ +import pureconfig.generic.auto._ +import spray.json.RootJsonFormat + +import scala.reflect.ClassTag + +case class MongoDBConfig(uri: String, database: String) { + assume(Set(database, uri).forall(_.nonEmpty), "At least one expected property is missing") + + def collectionFor[D](implicit tag: ClassTag[D]) = tag.runtimeClass.getSimpleName.toLowerCase +} + +object MongoDBClient { + private var _client: Option[MongoClient] = None + + def client(config: MongoDBConfig): MongoClient = { + _client.getOrElse { + val client = MongoClient(config.uri) + _client = Some(client) + client + } + } +} + +object MongoDBArtifactStoreProvider extends ArtifactStoreProvider { + + def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = { + val dbConfig = loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb) + makeArtifactStore(dbConfig, getAttachmentStore()) + } + + def makeArtifactStore[D <: DocumentSerializer: ClassTag](dbConfig: MongoDBConfig, + attachmentStore: Option[AttachmentStore])( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = { + + val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db) + + val (handler, mapper) = handlerAndMapper(implicitly[ClassTag[D]]) + + new MongoDBArtifactStore[D]( + MongoDBClient.client(dbConfig), + dbConfig.database, + dbConfig.collectionFor[D], + handler, + mapper, + inliningConfig, + attachmentStore) + } + + private def handlerAndMapper[D](entityType: ClassTag[D])( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): (DocumentHandler, MongoDBViewMapper) = { + entityType.runtimeClass match { + case x if x == classOf[WhiskEntity] => + (WhisksHandler, WhisksViewMapper) + case x if x == classOf[WhiskActivation] => + (ActivationHandler, ActivationViewMapper) + case x if x == classOf[WhiskAuth] => + (SubjectHandler, SubjectViewMapper) + } + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala new file mode 100644 index 
00000000000..fe792be506a --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.entity.WhiskQueries +import org.mongodb.scala.Document +import org.mongodb.scala.bson.conversions.Bson +import org.mongodb.scala.model.Filters._ +import org.mongodb.scala.model.Sorts + +trait MongoDBViewMapper { + protected val _computed: String = "_computed" + protected val TOP: String = WhiskQueries.TOP + + val indexes: List[Document] + + def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson + + def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] + + protected def checkKeys(startKey: List[Any], endKey: List[Any]): Unit = { + require(startKey.nonEmpty) + require(endKey.nonEmpty) + require(startKey.head == endKey.head, s"First key should be same => ($startKey) - ($endKey)") + } +} + +private object ActivationViewMapper extends MongoDBViewMapper { + private val NS = "namespace" + private val NS_WITH_PATH = s"${_computed}.${ActivationHandler.NS_PATH}" + private val START = "start" + override val indexes: List[Document] = + List( + Document(s"$START" -> -1), + Document(s"$START" -> -1, s"$NS" -> -1), + Document(s"$NS_WITH_PATH" -> -1, s"$START" -> -1)) + + override def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = { + checkKeys(startKey, endKey) + view match { + //whisks-filters ddoc uses namespace + invoking action path as first key + case "activations" if ddoc.startsWith("whisks-filters") => createActivationFilter(NS_WITH_PATH, startKey, endKey) + //whisks ddoc uses namespace as first key + case "activations" if ddoc.startsWith("whisks") => createActivationFilter(NS, startKey, endKey) + case _ => throw UnsupportedView(s"$ddoc/$view") + } + } + + override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = { + view match { + case "activations" if ddoc.startsWith("whisks-filters") => + val sort = if (descending) Sorts.descending(NS_WITH_PATH, START) else Sorts.ascending(NS_WITH_PATH, START) + Some(sort) + case "activations" if ddoc.startsWith("whisks") => + val sort = if (descending) Sorts.descending(NS, START) else Sorts.ascending(NS, START) + Some(sort) + case _ => throw UnsupportedView(s"$ddoc/$view") + } + } + + private def createActivationFilter(nsPropName: String, startKey: List[Any], endKey: List[Any]) = { + require(startKey.head.isInstanceOf[String]) + val matchNS = equal(nsPropName, startKey.head) + + val filter = (startKey, endKey) match { + case (_ :: Nil, _ :: `TOP` :: Nil) => + matchNS + 
case (_ :: since :: Nil, _ :: `TOP` :: `TOP` :: Nil) => + and(matchNS, gte(START, since)) + case (_ :: since :: Nil, _ :: upto :: `TOP` :: Nil) => + and(matchNS, gte(START, since), lte(START, upto)) + case _ => throw UnsupportedQueryKeys(s"$startKey, $endKey") + } + filter + } +} + +private object WhisksViewMapper extends MongoDBViewMapper { + private val NS = "namespace" + private val ROOT_NS = s"${_computed}.${WhisksHandler.ROOT_NS}" + private val TYPE = "entityType" + private val UPDATED = "updated" + private val PUBLISH = "publish" + private val BINDING = "binding" + override val indexes: List[Document] = + List(Document(s"$NS" -> -1, s"$UPDATED" -> -1), Document(s"$ROOT_NS" -> -1, s"$UPDATED" -> -1)) + + override def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = { + checkKeys(startKey, endKey) + view match { + case "all" => listAllInNamespace(ddoc, view, startKey, endKey) + case _ => listCollectionInNamespace(ddoc, view, startKey, endKey) + } + } + + private def listCollectionInNamespace(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = { + + val entityType = getEntityType(ddoc, view) + + val matchType = equal(TYPE, entityType) + val matchNS = equal(NS, startKey.head) + val matchRootNS = equal(ROOT_NS, startKey.head) + + val filter = (startKey, endKey) match { + case (ns :: Nil, _ :: `TOP` :: Nil) => + or(and(matchType, matchNS), and(matchType, matchRootNS)) + case (ns :: since :: Nil, _ :: `TOP` :: `TOP` :: Nil) => + // @formatter:off + or( + and(matchType, matchNS, gte(UPDATED, since)), + and(matchType, matchRootNS, gte(UPDATED, since)) + ) + // @formatter:on + case (ns :: since :: Nil, _ :: upto :: `TOP` :: Nil) => + or( + and(matchType, matchNS, gte(UPDATED, since), lte(UPDATED, upto)), + and(matchType, matchRootNS, gte(UPDATED, since), lte(UPDATED, upto))) + case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)") + } + if (view == "packages-public") + and(equal(BINDING, Map.empty), equal(PUBLISH, true), filter) + else + filter + } + + private def listAllInNamespace(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = { + val matchRootNS = equal(ROOT_NS, startKey.head) + val filter = (startKey, endKey) match { + case (ns :: Nil, _ :: `TOP` :: Nil) => + and(exists(TYPE), matchRootNS) + case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)") + } + filter + } + + override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = { + view match { + case "actions" | "rules" | "triggers" | "packages" | "packages-public" | "all" + if ddoc.startsWith("whisks") || ddoc.startsWith("all-whisks") => + val sort = if (descending) Sorts.descending(UPDATED) else Sorts.ascending(UPDATED) + Some(sort) + case _ => throw UnsupportedView(s"$ddoc/$view") + } + } + + private def getEntityType(ddoc: String, view: String): String = view match { + case "actions" => "action" + case "rules" => "rule" + case "triggers" => "trigger" + case "packages" | "packages-public" => "package" + case _ => throw UnsupportedView(s"$ddoc/$view") + } +} +private object SubjectViewMapper extends MongoDBViewMapper { + private val BLOCKED = "blocked" + private val SUBJECT = "subject" + private val UUID = "uuid" + private val KEY = "key" + private val NS_NAME = "namespaces.name" + private val NS_UUID = "namespaces.uuid" + private val NS_KEY = "namespaces.key" + private val CONCURRENT_INVOCATIONS = "concurrentInvocations" + private val INVOCATIONS_PERMINUTE = "invocationsPerMinute" + 
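+  // identities are looked up either by subject/namespace name or by (uuid, key), hence the two index shapes below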
override val indexes: List[Document] = + List(Document(s"$NS_NAME" -> -1), Document(s"$NS_UUID" -> -1, s"$NS_KEY" -> -1)) + + override def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = { + require(startKey == endKey, s"startKey: $startKey and endKey: $endKey must be same for $ddoc/$view") + (ddoc, view) match { + case (s, "identities") if s.startsWith("subjects") => + filterForMatchingSubjectOrNamespace(ddoc, view, startKey, endKey) + case ("namespaceThrottlings", "blockedNamespaces") => + or(equal(BLOCKED, true), equal(CONCURRENT_INVOCATIONS, 0), equal(INVOCATIONS_PERMINUTE, 0)) + case _ => + throw UnsupportedView(s"$ddoc/$view") + } + } + + override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = { + (ddoc, view) match { + case (s, "identities") if s.startsWith("subjects") => None + case ("namespaceThrottlings", "blockedNamespaces") => None + case _ => + throw UnsupportedView(s"$ddoc/$view") + } + } + + private def filterForMatchingSubjectOrNamespace(ddoc: String, + view: String, + startKey: List[Any], + endKey: List[Any]): Bson = { + val notBlocked = notEqual(BLOCKED, true) + startKey match { + case (ns: String) :: Nil => and(notBlocked, or(equal(SUBJECT, ns), equal(NS_NAME, ns))) + case (uuid: String) :: (key: String) :: Nil => + // @formatter:off + and( + notBlocked, + or( + and(equal(UUID, uuid), equal(KEY, key)), + and(equal(NS_UUID, uuid), equal(NS_KEY, key)) + )) + // @formatter:on + case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)") + } + } + +} diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index cdae2bddaa7..04defe3b9fd 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -63,6 +63,11 @@ whisk { throughput = 400 } + mongodb { + uri = ${?MONGODB_CONNECT_STRING} + database = ${?MONGODB_DATABASE} + } + controller { protocol = {{ controller.protocol }} https { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala new file mode 100644 index 00000000000..4b289094547 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.mongodb + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream} + +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, Sink, Source, StreamConverters} +import akka.stream.testkit.TestSubscriber +import akka.util.ByteString +import common.WskActorSystem +import org.apache.commons.io.IOUtils +import org.junit.runner.RunWith +import org.mockito.ArgumentMatchers._ +import org.mockito.Mockito._ +import org.mongodb.scala.gridfs.helpers.AsyncStreamHelper +import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} +import org.scalatest.junit.JUnitRunner +import org.scalatestplus.mockito.MockitoSugar + +import scala.util.Random + +@RunWith(classOf[JUnitRunner]) +class AsyncStreamGraphTests + extends FlatSpec + with Matchers + with ScalaFutures + with WskActorSystem + with MockitoSugar + with IntegrationPatience { + + implicit val mat = ActorMaterializer() + + behavior of "AsyncStreamSource" + + it should "read all bytes" in { + val bytes = randomBytes(4000) + val asyncStream = AsyncStreamHelper.toAsyncInputStream(bytes) + + val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) + val readBytes = IOUtils.toByteArray(readStream) + + bytes shouldBe readBytes + } + + it should "close the stream when done" in { + val bytes = randomBytes(4000) + val inputStream = new ByteArrayInputStream(bytes) + val spiedStream = spy(inputStream) + val asyncStream = AsyncStreamHelper.toAsyncInputStream(spiedStream) + + val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) + val readBytes = IOUtils.toByteArray(readStream) + + bytes shouldBe readBytes + verify(spiedStream).close() + } + + it should "onError with failure and return a failed IOResult when reading from failed stream" in { + val inputStream = mock[InputStream] + + val exception = new IOException("Boom") + doThrow(exception).when(inputStream).read(any()) + val asyncStream = AsyncStreamHelper.toAsyncInputStream(inputStream) + + val (ioResult, p) = AsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run() + val c = TestSubscriber.manualProbe[ByteString]() + p.subscribe(c) + + val sub = c.expectSubscription() + sub.request(1) + + val error = c.expectError() + error.getCause should be theSameInstanceAs exception + + ioResult.futureValue.status.isFailure shouldBe true + } + + behavior of "AsyncStreamSink" + + it should "write all bytes" in { + val bytes = randomBytes(4000) + val source = StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42) + + val os = new ByteArrayOutputStream() + val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os) + + val sink = AsyncStreamSink(asyncStream) + val ioResult = source.toMat(sink)(Keep.right).run() + + ioResult.futureValue.count shouldBe bytes.length + + val writtenBytes = os.toByteArray + writtenBytes shouldBe bytes + } + + it should "close the stream when done" in { + val bytes = randomBytes(4000) + val source = StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42) + + val outputStream = new CloseRecordingStream() + val asyncStream = AsyncStreamHelper.toAsyncOutputStream(outputStream) + + val sink = AsyncStreamSink(asyncStream) + val ioResult = source.toMat(sink)(Keep.right).run() + + ioResult.futureValue.count shouldBe 4000 + outputStream.toByteArray shouldBe bytes + outputStream.closed shouldBe true + } + + it should "onError 
with failure and return a failed IOResult when writing to failed stream" in { + val os = new ByteArrayOutputStream() + val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os) + + val sink = AsyncStreamSink(asyncStream) + val ioResult = Source(1 to 10) + .map { n ⇒ + if (n == 7) throw new Error("bees!") + n + } + .map(ByteString(_)) + .runWith(sink) + ioResult.futureValue.status.isFailure shouldBe true + } + + private def randomBytes(size: Int): Array[Byte] = { + val arr = new Array[Byte](size) + Random.nextBytes(arr) + arr + } + + private class CloseRecordingStream extends ByteArrayOutputStream { + var closed: Boolean = _ + override def close() = { super.close(); closed = true } + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala new file mode 100644 index 00000000000..24c62c10d3e --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class MongoDBArtifactStoreTests extends FlatSpec with MongoDBStoreBehaviorBase with ArtifactStoreBehavior {} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala new file mode 100644 index 00000000000..31eb5932c37 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.mongodb + +import org.apache.openwhisk.core.database.DocumentSerializer +import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider +import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +import scala.reflect.ClassTag + +@RunWith(classOf[JUnitRunner]) +class MongoDBAttachmentStoreTests extends FlatSpec with MongoDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors { + override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() = + Some(MemoryAttachmentStoreProvider.makeStore[D]()) +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala new file mode 100644 index 00000000000..2dde7f163d7 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala @@ -0,0 +1,45 @@ +package org.apache.openwhisk.core.database.mongodb + +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehaviorBase +import org.apache.openwhisk.core.database.{ArtifactStore, AttachmentStore, DocumentSerializer} +import org.apache.openwhisk.core.entity._ +import org.scalatest.FlatSpec +import pureconfig.loadConfigOrThrow +import pureconfig.generic.auto._ + +import scala.reflect.{classTag, ClassTag} +import scala.util.Try + +trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase { + override def storeType = "MongoDB" + + override lazy val storeAvailableCheck: Try[Any] = storeConfigTry + + val storeConfigTry = Try { loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb) } + + override lazy val authStore = { + implicit val docReader: DocumentReader = WhiskDocumentReader + MongoDBArtifactStoreProvider.makeArtifactStore[WhiskAuth](storeConfigTry.get, getAttachmentStore[WhiskAuth]()) + } + + override lazy val entityStore = + MongoDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](storeConfigTry.get, getAttachmentStore[WhiskEntity]())( + classTag[WhiskEntity], + WhiskEntityJsonFormat, + WhiskDocumentReader, + actorSystem, + logging, + materializer) + + override lazy val activationStore = { + implicit val docReader: DocumentReader = WhiskDocumentReader + MongoDBArtifactStoreProvider + .makeArtifactStore[WhiskActivation](storeConfigTry.get, getAttachmentStore[WhiskActivation]()) + } + + override protected def getAttachmentStore(store: ArtifactStore[_]) = + store.asInstanceOf[MongoDBArtifactStore[_]].attachmentStore + + protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): Option[AttachmentStore] = None +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala new file mode 100644 index 00000000000..44b121b5b16 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.mongodb + +import org.apache.openwhisk.core.database.{UnsupportedQueryKeys, UnsupportedView} +import org.apache.openwhisk.core.entity.WhiskQueries.TOP +import org.bson.conversions.Bson +import org.junit.runner.RunWith +import org.mongodb.scala.MongoClient +import org.mongodb.scala.bson.BsonDocument +import org.mongodb.scala.bson.collection.immutable.Document +import org.mongodb.scala.model.Filters.{equal => meq, _} +import org.mongodb.scala.model.Sorts +import org.scalatest.{FlatSpec, Matchers, OptionValues} +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class MongoDBViewMapperTests extends FlatSpec with Matchers with OptionValues { + implicit class RichBson(val b: Bson) { + def toDoc: BsonDocument = b.toBsonDocument(classOf[Document], MongoClient.DEFAULT_CODEC_REGISTRY) + } + + behavior of "ActivationViewMapper filter" + + it should "match all activations in namespace" in { + ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"), List("ns1", TOP)).toDoc shouldBe + meq("namespace", "ns1").toDoc + ActivationViewMapper.filter("whisks-filters.v2.1.0", "activations", List("ns1"), List("ns1", TOP)).toDoc shouldBe + meq("_computed.nspath", "ns1").toDoc + } + + it should "match all activations in namespace since zero" in { + ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 0), List("ns1", TOP, TOP)).toDoc shouldBe + and(meq("namespace", "ns1"), gte("start", 0)).toDoc + + ActivationViewMapper + .filter("whisks-filters.v2.1.0", "activations", List("ns1", 0), List("ns1", TOP, TOP)) + .toDoc shouldBe + and(meq("_computed.nspath", "ns1"), gte("start", 0)).toDoc + } + + it should "match all activations in namespace since some value" in { + ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 42), List("ns1", TOP, TOP)).toDoc shouldBe + and(meq("namespace", "ns1"), gte("start", 42)).toDoc + + ActivationViewMapper + .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42), List("ns1", TOP, TOP)) + .toDoc shouldBe + and(meq("_computed.nspath", "ns1"), gte("start", 42)).toDoc + } + + it should "match all activations in namespace between 2 instants" in { + ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 42), List("ns1", 314, TOP)).toDoc shouldBe + and(meq("namespace", "ns1"), gte("start", 42), lte("start", 314)).toDoc + + ActivationViewMapper + .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42), List("ns1", 314, TOP)) + .toDoc shouldBe + and(meq("_computed.nspath", "ns1"), gte("start", 42), lte("start", 314)).toDoc + } + + it should "throw UnsupportedQueryKeys for unknown keys" in { + intercept[UnsupportedQueryKeys] { + ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"), List("ns1", "foo")) + } + } + + it should "throw UnsupportedView exception for unknown views" in { + intercept[UnsupportedView] { + 
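+      // "activation-foo" is not a view defined for the activations design docs, so the mapper is expected to reject it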
ActivationViewMapper.filter("whisks.v2.1.0", "activation-foo", List("ns1"), List("ns1", TOP)) + } + } + + behavior of "ActivationViewMapper sort" + + it should "sort descending" in { + ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations", descending = true).value.toDoc shouldBe + Sorts.descending("_computed.nspath", "start").toDoc + ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending = true).value.toDoc shouldBe + Sorts.descending("namespace", "start").toDoc + } + + it should "sort ascending" in { + ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations", descending = false).value.toDoc shouldBe + Sorts.ascending("_computed.nspath", "start").toDoc + ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending = false).value.toDoc shouldBe + Sorts.ascending("namespace", "start").toDoc + } + + it should "throw UnsupportedView" in { + intercept[UnsupportedView] { + ActivationViewMapper.sort("whisks.v2.1.0", "activation-foo", descending = true) + } + } + + behavior of "WhisksViewMapper filter" + + val whiskTypes = Seq( + ("actions", "action"), + ("packages", "package"), + ("packages-public", "package"), + ("rules", "rule"), + ("triggers", "trigger")) + + it should "match entities of specific type in namespace" in { + whiskTypes.foreach { + case (view, entityType) => + var filters = + or( + and(meq("entityType", entityType), meq("namespace", "ns1")), + and(meq("entityType", entityType), meq("_computed.rootns", "ns1"))) + if (view == "packages-public") + filters = getPublicPackageFilter(filters) + WhisksViewMapper.filter("whisks.v2.1.0", view, List("ns1"), List("ns1", TOP)).toDoc shouldBe filters.toDoc + } + } + + it should "match entities of specific type in namespace and updated since" in { + whiskTypes.foreach { + case (view, entityType) => + var filters = + or( + and(meq("entityType", entityType), meq("namespace", "ns1"), gte("updated", 42)), + and(meq("entityType", entityType), meq("_computed.rootns", "ns1"), gte("updated", 42))) + if (view == "packages-public") + filters = getPublicPackageFilter(filters) + WhisksViewMapper + .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", TOP, TOP)) + .toDoc shouldBe filters.toDoc + } + } + + it should "match all entities of specific type in namespace and between" in { + whiskTypes.foreach { + case (view, entityType) => + var filters = + or( + and(meq("entityType", entityType), meq("namespace", "ns1"), gte("updated", 42), lte("updated", 314)), + and(meq("entityType", entityType), meq("_computed.rootns", "ns1"), gte("updated", 42), lte("updated", 314))) + if (view == "packages-public") + filters = getPublicPackageFilter(filters) + WhisksViewMapper + .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", 314, TOP)) + .toDoc shouldBe filters.toDoc + } + } + + it should "match all entities in namespace" in { + WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1", TOP)).toDoc shouldBe + and(exists("entityType"), meq("_computed.rootns", "ns1")).toDoc + } + + it should "throw UnsupportedQueryKeys for unknown keys" in { + intercept[UnsupportedQueryKeys] { + WhisksViewMapper.filter("whisks.v2.1.0", "actions", List("ns1"), List("ns1", "foo")) + } + intercept[UnsupportedQueryKeys] { + WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1", "foo")) + } + } + + it should "throw UnsupportedView exception for unknown views" in { + intercept[UnsupportedView] { + WhisksViewMapper.filter("whisks.v2.1.0", "actions-foo", List("ns1"), List("ns1", TOP)) + } + } + + behavior of 
"WhisksViewMapper sort" + + it should "sort descending" in { + whiskTypes.foreach { + case (view, _) => + WhisksViewMapper.sort("whisks.v2.1.0", view, descending = true).value.toDoc shouldBe + Sorts.descending("updated").toDoc + } + } + + it should "sort ascending" in { + whiskTypes.foreach { + case (view, _) => + WhisksViewMapper.sort("whisks.v2.1.0", view, descending = false).value.toDoc shouldBe + Sorts.ascending("updated").toDoc + } + } + + it should "throw UnsupportedView" in { + intercept[UnsupportedView] { + WhisksViewMapper.sort("whisks.v2.1.0", "action-foo", descending = true) + } + } + + behavior of "SubjectViewMapper filter" + + it should "match by subject or namespace" in { + SubjectViewMapper.filter("subjects", "identities", List("foo"), List("foo")).toDoc shouldBe + and(notEqual("blocked", true), or(meq("subject", "foo"), meq("namespaces.name", "foo"))).toDoc + } + + it should "match by uuid and key" in { + SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"), List("u1", "k1")).toDoc shouldBe + and( + notEqual("blocked", true), + or(and(meq("uuid", "u1"), meq("key", "k1")), and(meq("namespaces.uuid", "u1"), meq("namespaces.key", "k1")))).toDoc + } + + it should "match by blocked or invocationsPerMinute or concurrentInvocations" in { + SubjectViewMapper + .filter("namespaceThrottlings", "blockedNamespaces", List("u1", "k1"), List("u1", "k1")) + .toDoc shouldBe + or(meq("blocked", true), meq("concurrentInvocations", 0), meq("invocationsPerMinute", 0)).toDoc + } + + it should "throw exception when keys are not same" in { + intercept[IllegalArgumentException] { + SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"), List("u1", "k2")) + } + } + + it should "throw UnsupportedQueryKeys exception when keys are not know" in { + intercept[UnsupportedQueryKeys] { + SubjectViewMapper.filter("subjects", "identities", List("u1", "k1", "foo"), List("u1", "k1", "foo")) + } + } + + it should "throw UnsupportedView exception when view is not known" in { + intercept[UnsupportedView] { + SubjectViewMapper.filter("subjects", "identities-foo", List("u1", "k1", "foo"), List("u1", "k1", "foo")) + } + } + + behavior of "SubjectViewMapper sort" + + it should "sort none" in { + SubjectViewMapper.sort("subjects", "identities", descending = true) shouldBe None + SubjectViewMapper.sort("namespaceThrottlings", "blockedNamespaces", descending = true) shouldBe None + } + + private def getPublicPackageFilter(filters: Bson): Bson = { + and(meq("binding", Map.empty), meq("publish", true), filters) + } +} diff --git a/tools/build/README.md b/tools/build/README.md index 0282482ecc9..ce5590f58a6 100644 --- a/tools/build/README.md +++ b/tools/build/README.md @@ -31,6 +31,7 @@ The script is called `redo` because for most development, one will want to "redo - initialize environment and `docker-machine` (for mac): `redo setup prereq` - start CouchDB container and initialize DB with system and guest keys: `redo couchdb initdb` - start ElasticSearch container to store activations: `redo elasticsearch` +- start MongoDB container to as database backend: `redo mongodb` - build and deploy system: `redo deploy` - run tests: `redo props tests` diff --git a/tools/build/redo b/tools/build/redo index 9ffa04ead7f..b149c1aa3fa 100755 --- a/tools/build/redo +++ b/tools/build/redo @@ -241,6 +241,13 @@ Components = [ 'deploy elasticsearch', modes = 'clean'), + makeComponent('mongodb', + 'deploy mongodb', + modes = 'clean'), + + makeComponent('initMongoDB', + 'initialize mongodb with guest/system keys'), 
+ makeComponent('build', 'build system', yaml = False, From 5caca52ceef5029eba8c8af5e44adb3c293de8c3 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Fri, 4 Sep 2020 18:19:25 +0800 Subject: [PATCH 02/10] Upgrade mongo-scala to 2.7.0 --- common/scala/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 79a36251a24..135ac8e415e 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -91,7 +91,7 @@ dependencies { compile "com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4" //for mongo - compile "org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.6.0" + compile "org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.7.0" compile ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http From 3e0a8793f0505e70fa00684cda476e607b83c011 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Mon, 7 Sep 2020 08:23:28 +0800 Subject: [PATCH 03/10] Fix test --- .../core/database/mongodb/MongoDBArtifactStore.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala index 07f7a443c08..4ad71f97c41 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala @@ -595,9 +595,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo case Right(ct) => ct case Left(_) => ContentTypes.NoContentType //Should not happen } - attachmentHandler( - doc, - Attached(getAttachmentName(name), contentType, Some(length.longValue()), Some(digest))) + attachmentHandler(doc, Attached(getAttachmentName(name), contentType, Some(length.longValue), Some(digest))) case x => throw DeserializationException("Attachment json does not have required fields" + x) } From 8290ec7b05054d14d0932502a57a12ca11db25a7 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Mon, 7 Sep 2020 09:12:37 +0800 Subject: [PATCH 04/10] Add license headers --- ansible/initMongoDB.yml | 18 ++++++++++++++++-- ansible/mongodb.yml | 18 ++++++++++++++++-- ansible/roles/mongodb/tasks/clean.yml | 18 ++++++++++++++++-- ansible/roles/mongodb/tasks/deploy.yml | 18 ++++++++++++++++-- ansible/roles/mongodb/tasks/main.yml | 18 ++++++++++++++++-- .../mongodb/MongoDBStoreBehaviorBase.scala | 17 +++++++++++++++++ 6 files changed, 97 insertions(+), 10 deletions(-) diff --git a/ansible/initMongoDB.yml b/ansible/initMongoDB.yml index c790996f5fb..58672a64c6e 100644 --- a/ansible/initMongoDB.yml +++ b/ansible/initMongoDB.yml @@ -1,5 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --- # This playbook will initialize the immortal DBs in the database account. # This step is usually done only once per deployment. diff --git a/ansible/mongodb.yml b/ansible/mongodb.yml index 46e94828c6c..2a0b4f6c377 100644 --- a/ansible/mongodb.yml +++ b/ansible/mongodb.yml @@ -1,5 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --- # This playbook deploys a MongoDB for Openwhisk. diff --git a/ansible/roles/mongodb/tasks/clean.yml b/ansible/roles/mongodb/tasks/clean.yml index b095a7ce1fa..642f18003de 100644 --- a/ansible/roles/mongodb/tasks/clean.yml +++ b/ansible/roles/mongodb/tasks/clean.yml @@ -1,5 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --- # Remove MongoDB server diff --git a/ansible/roles/mongodb/tasks/deploy.yml b/ansible/roles/mongodb/tasks/deploy.yml index 4853b480ba0..14bf146e7d1 100644 --- a/ansible/roles/mongodb/tasks/deploy.yml +++ b/ansible/roles/mongodb/tasks/deploy.yml @@ -1,5 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --- # This role will run a MongoDB server on the db group, this is only for test, please use # shared cluster for production env diff --git a/ansible/roles/mongodb/tasks/main.yml b/ansible/roles/mongodb/tasks/main.yml index e402c082ed2..2d27a9c1995 100644 --- a/ansible/roles/mongodb/tasks/main.yml +++ b/ansible/roles/mongodb/tasks/main.yml @@ -1,5 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --- # This role will deploy a database server. Use the role if you want to use CouchCB locally. # In deploy mode it will start the MongoDB container. diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala index 2dde7f163d7..d3dae92422e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.openwhisk.core.database.mongodb import org.apache.openwhisk.core.ConfigKeys From 9220ade37df3be8fbb9f77f4fe658204e38adf06 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Mon, 7 Sep 2020 09:53:18 +0800 Subject: [PATCH 05/10] Add default value for mongodb_connect_string --- ansible/group_vars/all | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index daabccdd4f1..3a23efe4477 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -297,7 +297,7 @@ db: username: "{{ elastic_username | default('admin') }}" password: "{{ elastic_password | default('admin') }}" mongodb: - connect_string: "{{ mongodb_connect_string }}" + connect_string: "{{ mongodb_connect_string | default('mongodb://172.17.0.1:27017') }}" database: "{{ mongodb_database | default('whisks') }}" data_volume: "{{ mongodb_data_volume | default('mongo-data') }}" From 3674e5359fe866a33040af2c45d96a8c3ddd641d Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Tue, 15 Sep 2020 09:42:48 +0800 Subject: [PATCH 06/10] Rename graph stage classes used for mongodb gridfs --- .../mongodb/MongoDBArtifactStore.scala | 4 ++-- ...Sink.scala => MongoDBAsyncStreamSink.scala} | 6 +++--- ...ce.scala => MongoDBAsyncStreamSource.scala} | 6 +++--- ...cala => MongoDBAsyncStreamGraphTests.scala} | 18 +++++++++--------- 4 files changed, 17 insertions(+), 17 deletions(-) rename common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/{AsyncStreamSink.scala => MongoDBAsyncStreamSink.scala} (96%) rename common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/{AsyncStreamSource.scala => MongoDBAsyncStreamSource.scala} (94%) rename tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/{AsyncStreamGraphTests.scala => MongoDBAsyncStreamGraphTests.scala} (88%) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala index 4ad71f97c41..32c0983edf0 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala @@ -434,7 +434,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo val option = new GridFSUploadOptions().metadata(document) val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), name, option) - val sink = AsyncStreamSink(uploadStream) + val sink = MongoDBAsyncStreamSink(uploadStream) val f = docStream .runWith(combinedSink(sink)) @@ -498,7 +498,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName")) def readStream(file: GridFSFile) = { - val source = AsyncStreamSource(downloadStream) + val source = MongoDBAsyncStreamSource(downloadStream) source .runWith(sink) .map { result => diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala similarity index 96% rename from common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala rename to common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala index 
4387d23f786..6a715589a98 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSink.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala @@ -30,7 +30,7 @@ import org.mongodb.scala.gridfs.{AsyncOutputStream} import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success, Try} -class AsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext) +class MongoDBAsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext) extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] { val in: Inlet[ByteString] = Inlet("AsyncStream.in") @@ -115,8 +115,8 @@ class AsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext) } } -object AsyncStreamSink { +object MongoDBAsyncStreamSink { def apply(stream: AsyncOutputStream)(implicit ec: ExecutionContext): Sink[ByteString, Future[IOResult]] = { - Sink.fromGraph(new AsyncStreamSink(stream)) + Sink.fromGraph(new MongoDBAsyncStreamSink(stream)) } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala similarity index 94% rename from common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala rename to common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala index d5732946296..1a7fad9f9e4 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamSource.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala @@ -40,7 +40,7 @@ import scala.util.Success import scala.util.Try import scala.util.Failure -class AsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext) +class MongoDBAsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] { require(chunkSize > 0, "chunkSize must be greater than 0") val out: Outlet[ByteString] = Outlet("AsyncStream.out") @@ -96,9 +96,9 @@ class AsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: E } } -object AsyncStreamSource { +object MongoDBAsyncStreamSource { def apply(stream: AsyncInputStream, chunkSize: Int = 512 * 1024)( implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] = { - Source.fromGraph(new AsyncStreamSource(stream, chunkSize)) + Source.fromGraph(new MongoDBAsyncStreamSource(stream, chunkSize)) } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala similarity index 88% rename from tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala rename to tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala index 4b289094547..90770f30ded 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/AsyncStreamGraphTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala @@ -37,7 +37,7 @@ import org.scalatestplus.mockito.MockitoSugar import scala.util.Random @RunWith(classOf[JUnitRunner]) -class AsyncStreamGraphTests 
+class MongoDBAsyncStreamGraphTests extends FlatSpec with Matchers with ScalaFutures @@ -47,13 +47,13 @@ class AsyncStreamGraphTests implicit val mat = ActorMaterializer() - behavior of "AsyncStreamSource" + behavior of "MongoDBAsyncStreamSource" it should "read all bytes" in { val bytes = randomBytes(4000) val asyncStream = AsyncStreamHelper.toAsyncInputStream(bytes) - val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) + val readStream = MongoDBAsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) val readBytes = IOUtils.toByteArray(readStream) bytes shouldBe readBytes @@ -65,7 +65,7 @@ class AsyncStreamGraphTests val spiedStream = spy(inputStream) val asyncStream = AsyncStreamHelper.toAsyncInputStream(spiedStream) - val readStream = AsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) + val readStream = MongoDBAsyncStreamSource(asyncStream, 42).runWith(StreamConverters.asInputStream()) val readBytes = IOUtils.toByteArray(readStream) bytes shouldBe readBytes @@ -79,7 +79,7 @@ class AsyncStreamGraphTests doThrow(exception).when(inputStream).read(any()) val asyncStream = AsyncStreamHelper.toAsyncInputStream(inputStream) - val (ioResult, p) = AsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run() + val (ioResult, p) = MongoDBAsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run() val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -92,7 +92,7 @@ class AsyncStreamGraphTests ioResult.futureValue.status.isFailure shouldBe true } - behavior of "AsyncStreamSink" + behavior of "MongoDBAsyncStreamSink" it should "write all bytes" in { val bytes = randomBytes(4000) @@ -101,7 +101,7 @@ class AsyncStreamGraphTests val os = new ByteArrayOutputStream() val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os) - val sink = AsyncStreamSink(asyncStream) + val sink = MongoDBAsyncStreamSink(asyncStream) val ioResult = source.toMat(sink)(Keep.right).run() ioResult.futureValue.count shouldBe bytes.length @@ -117,7 +117,7 @@ class AsyncStreamGraphTests val outputStream = new CloseRecordingStream() val asyncStream = AsyncStreamHelper.toAsyncOutputStream(outputStream) - val sink = AsyncStreamSink(asyncStream) + val sink = MongoDBAsyncStreamSink(asyncStream) val ioResult = source.toMat(sink)(Keep.right).run() ioResult.futureValue.count shouldBe 4000 @@ -129,7 +129,7 @@ class AsyncStreamGraphTests val os = new ByteArrayOutputStream() val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os) - val sink = AsyncStreamSink(asyncStream) + val sink = MongoDBAsyncStreamSink(asyncStream) val ioResult = Source(1 to 10) .map { n ⇒ if (n == 7) throw new Error("bees!") From 68120f2170dc9f9b53361ab0cb51c4e9458dbe29 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Wed, 7 Apr 2021 17:04:40 +0800 Subject: [PATCH 07/10] Update readme --- ansible/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ansible/README.md b/ansible/README.md index fd5bd4722a9..71b95ea270f 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -198,9 +198,9 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml ### Deploying Using MongoDB -You can use MongoDB as the database backend, and there is an ansible task to deploy a single node MongoDB server for testing and developing +You can choose MongoDB instead of CouchDB as the database backend to store entities and activations. 
-- Deploy mongodb server(Optional) +- Deploy a mongodb server(Optional, for test and develop only, use an external MongoDB server in production) ``` ansible-playbook -i environments/ mongodb.yml -e mongodb_data_volume="/tmp/mongo-data" From 747de8fd7aec7f75c0ee8192f5c325adfc9902de Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Fri, 16 Apr 2021 14:52:28 +0800 Subject: [PATCH 08/10] Update based on comments --- ansible/README.md | 2 +- ansible/group_vars/all | 4 ---- ansible/roles/controller/tasks/deploy.yml | 12 ++++++++++-- ansible/roles/invoker/tasks/deploy.yml | 12 ++++++++++-- ansible/roles/mongodb/tasks/clean.yml | 5 ----- .../core/database/mongodb/MongoDBArtifactStore.scala | 12 ++++++------ .../mongodb/MongoDBArtifactStoreProvider.scala | 2 +- 7 files changed, 28 insertions(+), 21 deletions(-) diff --git a/ansible/README.md b/ansible/README.md index 71b95ea270f..849c0879660 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -198,7 +198,7 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml ### Deploying Using MongoDB -You can choose MongoDB instead of CouchDB as the database backend to store entities and activations. +You can choose MongoDB instead of CouchDB as the database backend to store entities. - Deploy a mongodb server(Optional, for test and develop only, use an external MongoDB server in production) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 3a23efe4477..c2ef44bfcda 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -329,10 +329,6 @@ elasticsearch_connect_string: "{% set ret = [] %}\ {{ ret | join(',') }}" mongodb: version: 4.4.0 - commonEnv: - CONFIG_whisk_mongodb_uri: "{{ db.mongodb.connect_string }}" - CONFIG_whisk_mongodb_database: "{{ db.mongodb.database }}" - CONFIG_whisk_spi_ArtifactStoreProvider: "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider" docker: # The user to install docker for. Defaults to the ansible user if not set. 
This will be the user who is able to run diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 7b4d5de29ef..642368fe4c5 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -292,9 +292,17 @@ env: "{{ env | combine(elastic_env) }}" when: db.activation_store.backend == "ElasticSearch" -- name: merge mongodb env variables +- name: setup mongodb artifact store env set_fact: - env: "{{ env | combine(mongodb.commonEnv) }}" + mongodb_env: + "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}" + "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}" + "CONFIG_whisk_spi_ArtifactStoreProvider": "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider" + when: db.backend == "MongoDB" + +- name: merge mongodb artifact store env + set_fact: + env: "{{ env | combine(mongodb_env) }}" when: db.backend == "MongoDB" - name: populate volumes for controller diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index c7b4eec3b75..227ceb5af0f 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -307,9 +307,17 @@ env: "{{ env | combine(elastic_env) }}" when: db.activation_store.backend == "ElasticSearch" -- name: merge mongodb env variables +- name: setup mongodb artifact store env set_fact: - env: "{{ env | combine(mongodb.commonEnv) }}" + mongodb_env: + "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}" + "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}" + "CONFIG_whisk_spi_ArtifactStoreProvider": "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider" + when: db.backend == "MongoDB" + +- name: merge mongodb artifact store env + set_fact: + env: "{{ env | combine(mongodb_env) }}" when: db.backend == "MongoDB" - name: include plugins diff --git a/ansible/roles/mongodb/tasks/clean.yml b/ansible/roles/mongodb/tasks/clean.yml index 642f18003de..248ee5563f1 100644 --- a/ansible/roles/mongodb/tasks/clean.yml +++ b/ansible/roles/mongodb/tasks/clean.yml @@ -22,8 +22,3 @@ name: mongodb state: absent keep_volumes: False - -- name: remove MongoDB data volume - docker_volume: - name: "{{ db.mongodb.data_volume }}" - state: absent diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala index 32c0983edf0..05921b1202e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala @@ -17,8 +17,6 @@ package org.apache.openwhisk.core.database.mongodb -import java.security.MessageDigest - import akka.actor.ActorSystem import akka.event.Logging.ErrorLevel import akka.http.scaladsl.model._ @@ -81,7 +79,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(mongodbScheme) private val database = client.getDatabase(dbName) - private val collection = getCollectionAndCreateIndexes + private val collection = getCollectionAndCreateIndexes() private val gridFSBucket = GridFSBucket(database, collName) private val jsonWriteSettings = JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build @@ -129,6 +127,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](client: Mo DocInfo(DocId(id), DocRevision(rev)) } .recover { + // E11000 means a duplicate key error case t: MongoException if t.getCode == 11000 => transid.finished(this, start, s"[PUT] '$dbName', document: '$docinfoStr'; conflict.") throw DocumentConflictException("conflict on 'put'") @@ -570,8 +569,9 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo // calculate the revision manually, to be compatible with couchdb's _rev field private def revisionCalculate(doc: JsObject): (String, String) = { - val md: MessageDigest = MessageDigest.getInstance("MD5") - val new_rev = md.digest(doc.toString.getBytes()).map(0xFF & _).map { "%02x".format(_) }.foldLeft("") { _ + _ } + val md = StoreUtils.emptyDigest() + val new_rev = + md.digest(doc.toString.getBytes()).map(0xFF & _).map { "%02x".format(_) }.foldLeft("") { _ + _ }.take(32) doc.fields .get("_rev") .map { value => @@ -615,7 +615,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo .getOrElse(name) } - private def getCollectionAndCreateIndexes: MongoCollection[Document] = { + private def getCollectionAndCreateIndexes(): MongoCollection[Document] = { val coll = database.getCollection(collName) // create indexes in specific collection if they do not exist coll.listIndexes().toFuture().map { idxes => diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala index 72fe371aceb..555b27484ef 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala @@ -32,7 +32,7 @@ import spray.json.RootJsonFormat import scala.reflect.ClassTag case class MongoDBConfig(uri: String, database: String) { - assume(Set(database, uri).forall(_.nonEmpty), "At least one expected property is missing") + assume(Set(database, uri).forall(_.trim.nonEmpty), "At least one expected property is missing") def collectionFor[D](implicit tag: ClassTag[D]) = tag.runtimeClass.getSimpleName.toLowerCase } From fb2aada7cdf4d0d223894a220092818d5819ebef Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Tue, 25 May 2021 17:03:31 +0800 Subject: [PATCH 09/10] Fix typo and update README --- ansible/README.md | 3 ++- .../openwhisk/core/database/mongodb/MongoDBArtifactStore.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ansible/README.md b/ansible/README.md index 849c0879660..16e593a8621 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -200,7 +200,8 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml You can choose MongoDB instead of CouchDB as the database backend to store entities. -- Deploy a mongodb server(Optional, for test and develop only, use an external MongoDB server in production) +- Deploy a mongodb server(Optional, for test and develop only, use an external MongoDB server in production). 
+ You need to execute `pip install pymongo` first ``` ansible-playbook -i environments/ mongodb.yml -e mongodb_data_volume="/tmp/mongo-data" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala index 05921b1202e..bab4a1f9e6c 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala @@ -106,7 +106,7 @@ class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: Mo val filters = if (rev.startsWith("1-")) { // for new document, we should get no matched document and insert new one - // if there is a matched document, that one with no _rev filed will be replaced + // if there is a matched document, that one with no _rev field will be replaced // if there is a document with the same id but has an _rev field, will return en E11000(conflict) error Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev"))) } else { From d62917ddad6e7bbb78c073cd47f594b6ea80e6fd Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Tue, 25 May 2021 18:05:13 +0800 Subject: [PATCH 10/10] Rename db.backend to db.artifact_store.backend --- ansible/README.md | 2 +- ansible/group_vars/all | 3 ++- ansible/roles/controller/tasks/deploy.yml | 4 ++-- ansible/roles/invoker/tasks/deploy.yml | 4 ++-- ansible/tasks/db/checkDb.yml | 2 +- ansible/templates/whisk.conf.j2 | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ansible/README.md b/ansible/README.md index 16e593a8621..7cc05f88b75 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -215,7 +215,7 @@ cd cd ansible ansible-playbook -i environments/ initMongodb.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" ansible-playbook -i environments/ apigateway.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" -ansible-playbook -i environments/ openwhisk.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" -e database_backend="MongoDB" +ansible-playbook -i environments/ openwhisk.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" -e db_artifact_backend="MongoDB" # installs a catalog of public packages and actions ansible-playbook -i environments/ postdeploy.yml diff --git a/ansible/group_vars/all b/ansible/group_vars/all index c2ef44bfcda..fe578efbf7d 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -256,7 +256,6 @@ db: port: "{{ db_port | default(lookup('ini', 'db_port section=db_creds file={{ playbook_dir }}/db_local.ini')) }}" host: "{{ db_host | default(lookup('ini', 'db_host section=db_creds file={{ playbook_dir }}/db_local.ini')) }}" persist_path: "{{ db_persist_path | default(false) }}" - backend: "{{ database_backend | default('CouchDB') }}" instances: "{{ groups['db'] | length }}" authkeys: - guest @@ -275,6 +274,8 @@ db: invoker: user: "{{ db_invoker_user | default(lookup('ini', 'db_username section=invoker file={{ playbook_dir }}/db_local.ini')) }}" pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password section=invoker file={{ playbook_dir }}/db_local.ini')) }}" + artifact_store: + backend: "{{ db_artifact_backend | default('CouchDB') }}" activation_store: backend: "{{ db_activation_backend | default('CouchDB') }}" elasticsearch: diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 
642368fe4c5..b4ac09bd094 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -298,12 +298,12 @@ "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}" "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}" "CONFIG_whisk_spi_ArtifactStoreProvider": "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider" - when: db.backend == "MongoDB" + when: db.artifact_store.backend == "MongoDB" - name: merge mongodb artifact store env set_fact: env: "{{ env | combine(mongodb_env) }}" - when: db.backend == "MongoDB" + when: db.artifact_store.backend == "MongoDB" - name: populate volumes for controller set_fact: diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 227ceb5af0f..cf67f97acd6 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -313,12 +313,12 @@ "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}" "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}" "CONFIG_whisk_spi_ArtifactStoreProvider": "org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider" - when: db.backend == "MongoDB" + when: db.artifact_store.backend == "MongoDB" - name: merge mongodb artifact store env set_fact: env: "{{ env | combine(mongodb_env) }}" - when: db.backend == "MongoDB" + when: db.artifact_store.backend == "MongoDB" - name: include plugins include_tasks: "{{ inv_item }}.yml" diff --git a/ansible/tasks/db/checkDb.yml b/ansible/tasks/db/checkDb.yml index 922deca1d43..bb78a66a06e 100644 --- a/ansible/tasks/db/checkDb.yml +++ b/ansible/tasks/db/checkDb.yml @@ -28,7 +28,7 @@ user: "{{ dbUser }}" password: "{{ dbPass }}" force_basic_auth: yes - when: db.backend == "CouchDB" + when: db.artifact_store.backend == "CouchDB" # the collection in MongoDB doesn't need to be created in advance, so just skip it # - name: check if {{ dbName }} on MongoDB exists diff --git a/ansible/templates/whisk.conf.j2 b/ansible/templates/whisk.conf.j2 index e6da958e6f0..c6ef5599ebf 100644 --- a/ansible/templates/whisk.conf.j2 +++ b/ansible/templates/whisk.conf.j2 @@ -14,7 +14,7 @@ whisk { WhiskActivation = "{{ db.whisk.activations }}" } } - {% if db.backend == 'MongoDB' %} + {% if db.artifact_store.backend == 'MongoDB' %} mongodb { uri = "{{ db.mongodb.connect_string }}" database = "{{ db.mongodb.database }}"
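
---

For readers following the series, the optimistic-concurrency behaviour described by the comment hunks in `MongoDBArtifactStore.scala` above can be summarised with a small sketch. This is not code from the patch: the `putFilter` helper and its else-branch are illustrative assumptions built around the `Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev")))` branch visible in the diff, using the MongoDB Scala driver's `Filters` builders.

```scala
import org.bson.conversions.Bson
import org.mongodb.scala.model.Filters

// Illustrative sketch (not part of the patch) of how a CouchDB-style revision drives the
// replace filter on put():
//  - a first-generation rev ("1-<digest>") may only replace a document that was never
//    committed, i.e. one without a "_rev" field; if a committed document with the same _id
//    already exists, the write surfaces as duplicate-key error E11000, which the store maps
//    to DocumentConflictException("conflict on 'put'");
//  - for later generations the filter is assumed to match the exact (_id, _rev) pair, so a
//    stale revision matches nothing and is likewise reported as a conflict.
def putFilter(id: String, rev: String): Bson =
  if (rev.startsWith("1-"))
    Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev")))
  else
    Filters.and(Filters.eq("_id", id), Filters.eq("_rev", rev)) // assumed else-branch
```

The revision string itself comes from `revisionCalculate`, which hashes the serialized document (hex digest truncated to 32 characters) and, per the "compatible with couchdb's _rev field" comment, prefixes a generation counter derived from the document's previous `_rev`; that is what keeps `_rev` values wire-compatible for existing CouchDB-oriented clients.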