diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index e9f0148cd83..ad8b3cc391b 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -115,6 +115,10 @@ jobs: psql -h localhost -U postgres -f core/scripts/sql/texera_lakefs.sql env: PGPASSWORD: postgres + - name: Create texera_db_for_test_cases + run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f core/scripts/sql/texera_ddl.sql + env: + PGPASSWORD: postgres - name: Compile with sbt run: cd core && sbt clean package - name: Run backend tests diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 2cf6e53febd..4d3a6ad9d7f 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -22,23 +22,27 @@ package edu.uci.ics.amber.engine.e2e import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} -import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} -import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} +import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity +import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller._ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient -import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow +import edu.uci.ics.amber.engine.e2e.TestUtils.{ + buildWorkflow, + cleanupWorkflowExecutionData, + initiateTexeraDBForTestCases, + setUpWorkflowExecutionData +} import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -54,14 +58,22 @@ class DataProcessingSpec implicit val timeout: Timeout = Timeout(5.seconds) - var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() + override protected def beforeEach(): Unit = { + setUpWorkflowExecutionData() + } + + override protected def afterEach(): Unit = { + cleanupWorkflowExecutionData() + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initiateTexeraDBForTestCases() } override def afterAll(): Unit = { @@ -88,30 +100,21 @@ class DataProcessingSpec if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds .filter(terminalOpId => { - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) + terminalOpId, + PortIdentity() ) - // expecting the first output port only. - ExecutionResourcesMapping - .getResourceURIs(workflowContext.executionId) - .contains(uri) + uri.nonEmpty }) .map(terminalOpId => { //TODO: remove the delay after fixing the issue of reporting "completed" status too early. Thread.sleep(1000) - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) - ) + terminalOpId, + PortIdentity() + ).get terminalOpId -> DocumentFactory .openDocument(uri) ._1 diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index bd4164798de..db28c9f4597 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -31,10 +31,15 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient +import edu.uci.ics.amber.engine.e2e.TestUtils.{ + cleanupWorkflowExecutionData, + initiateTexeraDBForTestCases, + setUpWorkflowExecutionData +} import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} import edu.uci.ics.texera.workflow.LogicalLink -import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import scala.concurrent.duration._ @@ -42,17 +47,27 @@ class PauseSpec extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) with ImplicitSender with AnyFlatSpecLike - with BeforeAndAfterAll { + with BeforeAndAfterAll + with BeforeAndAfterEach { implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("PauseSpecLogger") + override protected def beforeEach(): Unit = { + setUpWorkflowExecutionData() + } + + override protected def afterEach(): Unit = { + cleanupWorkflowExecutionData() + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initiateTexeraDBForTestCases() } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala index b20c74cb282..c80ece82e2c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala @@ -19,9 +19,24 @@ package edu.uci.ics.amber.engine.e2e +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.texera.dao.SqlServer +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.workflow.{LogicalLink, WorkflowCompiler} @@ -40,4 +55,78 @@ object TestUtils { ) } + /** + * If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a + * workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases. + * Note such test cases need to clean up the database at the end of running each test case. + */ + def initiateTexeraDBForTestCases(): Unit = { + SqlServer.initConnection( + StorageConfig.jdbcUrlForTestCases, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) + } + + val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + def setUpWorkflowExecutionData(): Unit = { + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + def cleanupWorkflowExecutionData(): Unit = { + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) + } + } diff --git a/core/config/src/main/resources/storage.conf b/core/config/src/main/resources/storage.conf index 7aba655b461..85a62b77a3b 100644 --- a/core/config/src/main/resources/storage.conf +++ b/core/config/src/main/resources/storage.conf @@ -125,6 +125,12 @@ storage { url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public" url = ${?STORAGE_JDBC_URL} + # Some e2e test cases require the user system. To make sure running those test cases can pass the CI, and that + # running them locally do not contaminate developers' own texera_db, we use another database with a different + # name for running these test cases. + url-for-test-cases = "jdbc:postgresql://localhost:5432/texera_db_for_test_cases?currentSchema=texera_db,public" + url-for-test-cases = ${?STORAGE_JDBC_URL_FOR_TEST_CASES} + username = "postgres" username = ${?STORAGE_JDBC_USERNAME} diff --git a/core/config/src/main/resources/user-system.conf b/core/config/src/main/resources/user-system.conf index aa7d65a3332..72f5e23d234 100644 --- a/core/config/src/main/resources/user-system.conf +++ b/core/config/src/main/resources/user-system.conf @@ -17,7 +17,7 @@ # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines. user-sys { - enabled = false + enabled = true enabled = ${?USER_SYS_ENABLED} admin-username = "texera" diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala index fd369fbb5b6..92a8e15d3f4 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala @@ -34,6 +34,7 @@ object StorageConfig { // JDBC specifics val jdbcUrl: String = conf.getString("storage.jdbc.url") + val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases") val jdbcUsername: String = conf.getString("storage.jdbc.username") val jdbcPassword: String = conf.getString("storage.jdbc.password") diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 18eb92b9f34..dd30bc0bda1 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -62,6 +62,10 @@ object SqlServer { instance.get } + def clearInstance(): Unit = { + instance = None + } + /** * A utility function for create a transaction block using given sql context * @param dsl the sql context diff --git a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala index 1bb49518163..cc291f94927 100644 --- a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala +++ b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala @@ -67,6 +67,7 @@ trait MockTexeraDB { value.close() dbInstance = None dslContext = None + SqlServer.clearInstance() case None => // do nothing } @@ -92,12 +93,15 @@ trait MockTexeraDB { } finally { source.close() } - val parts: Array[String] = content.split("(?m)^\\\\c texera_db") + val parts: Array[String] = content.split("(?m)^CREATE DATABASE :\"DB_NAME\";") def removeCCommands(sql: String): String = sql.linesIterator .filterNot(_.trim.startsWith("\\c")) .mkString("\n") - executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, removeCCommands(parts(0))) + val createDBStatement = + """DROP DATABASE IF EXISTS texera_db; + |CREATE DATABASE texera_db;""".stripMargin + executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, createDBStatement) val texeraDB = embedded.getDatabase(username, database) var tablesAndIndexCreation = removeCCommands(parts(1)) diff --git a/core/scripts/sql/texera_ddl.sql b/core/scripts/sql/texera_ddl.sql index b9cac6f5457..7b0f9b9063d 100644 --- a/core/scripts/sql/texera_ddl.sql +++ b/core/scripts/sql/texera_ddl.sql @@ -15,18 +15,29 @@ -- specific language governing permissions and limitations -- under the License. +-- ============================================ +-- 0. Specify the database name +-- (defaults to texera_db) +-- Override the name with: +-- psql -v DB_NAME= ... +-- ============================================ +\if :{?DB_NAME} +\else + \set DB_NAME 'texera_db' +\endif + -- ============================================ -- 1. Drop and recreate the database (psql only) -- Remove if you already created texera_db -- ============================================ \c postgres -DROP DATABASE IF EXISTS texera_db; -CREATE DATABASE texera_db; +DROP DATABASE IF EXISTS :"DB_NAME"; +CREATE DATABASE :"DB_NAME"; -- ============================================ -- 2. Connect to the new database (psql only) -- ============================================ -\c texera_db +\c :"DB_NAME" CREATE SCHEMA IF NOT EXISTS texera_db; SET search_path TO texera_db, public;