From 31fe50bd11020f907653a98fc312019f6fb8192d Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Mon, 29 Sep 2025 09:51:49 -0700 Subject: [PATCH 01/28] chore(config): enable user system by default --- core/config/src/main/resources/user-system.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/config/src/main/resources/user-system.conf b/core/config/src/main/resources/user-system.conf index aa7d65a3332..72f5e23d234 100644 --- a/core/config/src/main/resources/user-system.conf +++ b/core/config/src/main/resources/user-system.conf @@ -17,7 +17,7 @@ # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines. user-sys { - enabled = false + enabled = true enabled = ${?USER_SYS_ENABLED} admin-username = "texera" From e4ea83ab221926f491e4602e7c7ebcc0961d168a Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 16:55:14 -0700 Subject: [PATCH 02/28] wip. --- .../amber/engine/e2e/DataProcessingSpec.scala | 85 +++++++++++++++++-- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 2cf6e53febd..e260be4c356 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -26,7 +26,6 @@ import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener import edu.uci.ics.amber.core.storage.model.VirtualDocument -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} @@ -39,6 +38,20 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction +import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -50,22 +63,85 @@ class DataProcessingSpec with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll - with BeforeAndAfterEach { + with BeforeAndAfterEach + with MockTexeraDB { implicit val timeout: Timeout = Timeout(5.seconds) var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() + private val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + private val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + private val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + private val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + override protected def beforeEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + override protected def afterEach(): Unit = { + val dsl = getDSLContext + dsl.execute( + """ + TRUNCATE TABLE workflow_executions RESTART IDENTITY CASCADE; + TRUNCATE TABLE workflow_version RESTART IDENTITY CASCADE; + TRUNCATE TABLE workflow RESTART IDENTITY CASCADE; + TRUNCATE TABLE "user" RESTART IDENTITY CASCADE; + """.stripMargin + ) + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initializeDBAndReplaceDSLContext() } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) + shutdownDB() } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { @@ -96,10 +172,7 @@ class DataProcessingSpec PortIdentity() ) ) - // expecting the first output port only. - ExecutionResourcesMapping - .getResourceURIs(workflowContext.executionId) - .contains(uri) + true }) .map(terminalOpId => { //TODO: remove the delay after fixing the issue of reporting "completed" status too early. From 6934701bcf7f3d6fb1f798977071f003a81bda28 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 20:00:55 -0700 Subject: [PATCH 03/28] add mockTexeraDB to tests. --- .../amber/engine/e2e/DataProcessingSpec.scala | 48 +++++------ .../uci/ics/amber/engine/e2e/PauseSpec.scala | 81 ++++++++++++++++++- 2 files changed, 100 insertions(+), 29 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index e260be4c356..7ddbf3f6e7c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,11 +25,11 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} -import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} -import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} +import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity +import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller._ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED @@ -52,6 +52,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ WorkflowVersion, Workflow => WorkflowPojo } +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -120,15 +121,14 @@ class DataProcessingSpec } override protected def afterEach(): Unit = { - val dsl = getDSLContext - dsl.execute( - """ - TRUNCATE TABLE workflow_executions RESTART IDENTITY CASCADE; - TRUNCATE TABLE workflow_version RESTART IDENTITY CASCADE; - TRUNCATE TABLE workflow RESTART IDENTITY CASCADE; - TRUNCATE TABLE "user" RESTART IDENTITY CASCADE; - """.stripMargin - ) + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) } override def beforeAll(): Unit = { @@ -140,8 +140,8 @@ class DataProcessingSpec } override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) shutdownDB() + TestKit.shutdownActorSystem(system) } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { @@ -164,27 +164,21 @@ class DataProcessingSpec if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds .filter(terminalOpId => { - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) + terminalOpId, + PortIdentity() ) - true + uri.nonEmpty }) .map(terminalOpId => { //TODO: remove the delay after fixing the issue of reporting "completed" status too early. Thread.sleep(1000) - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) - ) + terminalOpId, + PortIdentity() + ).get terminalOpId -> DocumentFactory .openDocument(uri) ._1 diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index bd4164798de..f07b8d24591 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -32,8 +32,22 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} +import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike import scala.concurrent.duration._ @@ -42,20 +56,83 @@ class PauseSpec extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) with ImplicitSender with AnyFlatSpecLike - with BeforeAndAfterAll { + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("PauseSpecLogger") + private val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + private val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + private val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + private val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + override protected def beforeEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + override protected def afterEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initializeDBAndReplaceDSLContext() } override def afterAll(): Unit = { + shutdownDB() TestKit.shutdownActorSystem(system) } From 1eaf0021d01ac24f9b008c03a4af8e512c3146f1 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:12:15 -0700 Subject: [PATCH 04/28] wip. --- .../scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7ddbf3f6e7c..0da710accb7 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -132,7 +132,7 @@ class DataProcessingSpec } override def beforeAll(): Unit = { - system.actorOf(Props[SingleNodeListener](), "cluster-info") + system.actorOf(Props[SingleNodeListener](), "cluster-info-DataProcessingSpec") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") From f53ad2e64c9d40e6bc425ff9c35b1ca076149a52 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:20:48 -0700 Subject: [PATCH 05/28] wip. --- .../scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 0da710accb7..7ddbf3f6e7c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -132,7 +132,7 @@ class DataProcessingSpec } override def beforeAll(): Unit = { - system.actorOf(Props[SingleNodeListener](), "cluster-info-DataProcessingSpec") + system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") From 1addd2d638dcd7a2c9be2533ce8d1891c7d2dcdb Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:28:12 -0700 Subject: [PATCH 06/28] wip. --- .../amber/engine/e2e/DataProcessingSpec.scala | 42 +++++++++---------- .../uci/ics/amber/engine/e2e/PauseSpec.scala | 42 +++++++++---------- 2 files changed, 38 insertions(+), 46 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7ddbf3f6e7c..b4ada7a76c8 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} @@ -38,20 +39,10 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike @@ -110,10 +101,11 @@ class DataProcessingSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -121,10 +113,11 @@ class DataProcessingSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -136,11 +129,14 @@ class DataProcessingSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + SqlServer.initConnection( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index f07b8d24591..9f0a136de70 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest @@ -32,20 +33,10 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike @@ -102,10 +93,11 @@ class PauseSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -113,10 +105,11 @@ class PauseSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -128,11 +121,14 @@ class PauseSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + SqlServer.initConnection( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } From bd92fd895007772151de7ec7d0f0351f2200a8f7 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:32:24 -0700 Subject: [PATCH 07/28] wip. --- .../ics/amber/engine/e2e/DataProcessingSpec.scala | 14 ++++++++++++-- .../edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index b4ada7a76c8..8193dfe7884 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -41,8 +41,18 @@ import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 9f0a136de70..383fe0b3100 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -35,8 +35,18 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike From c4e4501550f311de9b05303898053bd19ab22f43 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:52:00 -0700 Subject: [PATCH 08/28] wip. --- core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 18eb92b9f34..dcda6f1aaa1 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -55,6 +55,8 @@ object SqlServer { if (instance.isEmpty) { val server = new SqlServer(url, user, password) instance = Some(server) + } else { + println(s"Reusing existing SqlServer: ${instance.get.context}") } } From 0a4c262c6dabda58d3f920c3040707ddd83daa3f Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Tue, 30 Sep 2025 21:59:21 -0700 Subject: [PATCH 09/28] wip. --- core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index dcda6f1aaa1..18eb92b9f34 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -55,8 +55,6 @@ object SqlServer { if (instance.isEmpty) { val server = new SqlServer(url, user, password) instance = Some(server) - } else { - println(s"Reusing existing SqlServer: ${instance.get.context}") } } From cf07a32b6cfaf7980ea3e8b15462f61754da4c97 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 09:58:09 -0700 Subject: [PATCH 10/28] wip. --- core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 18eb92b9f34..823de355d9e 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -53,8 +53,11 @@ object SqlServer { def initConnection(url: String, user: String, password: String): Unit = { if (instance.isEmpty) { + println(s"Initiating connection: $url") val server = new SqlServer(url, user, password) instance = Some(server) + } else { + println(s"Reusing connection: ${instance.get.dataSource.getURL}") } } From 243873218aada29f6c2b4f8e53960d6653664da1 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 10:59:20 -0700 Subject: [PATCH 11/28] wip. --- .../scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 8193dfe7884..8dfd3871119 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -112,6 +112,7 @@ class DataProcessingSpec override protected def beforeEach(): Unit = { val dslConfig = SqlServer.getInstance().context.configuration() + println(s"Using dslConfig $dslConfig") val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) From d482a6acc4cf3d86f141b53449ed8fb850e66c52 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 11:29:16 -0700 Subject: [PATCH 12/28] wip. --- .../amber/engine/e2e/DataProcessingSpec.scala | 33 +++++++------------ .../uci/ics/amber/engine/e2e/PauseSpec.scala | 33 ++++++++----------- 2 files changed, 26 insertions(+), 40 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 8dfd3871119..e61338c2044 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -41,20 +41,13 @@ import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import org.postgresql.ds.PGSimpleDataSource import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -70,7 +63,8 @@ class DataProcessingSpec implicit val timeout: Timeout = Timeout(5.seconds) - var inMemoryMySQLInstance: Option[DB] = None + val dataSource: PGSimpleDataSource = new PGSimpleDataSource() + val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES val workflowContext: WorkflowContext = new WorkflowContext() private val testUser: User = { @@ -111,8 +105,7 @@ class DataProcessingSpec } override protected def beforeEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() - println(s"Using dslConfig $dslConfig") + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) @@ -124,7 +117,7 @@ class DataProcessingSpec } override protected def afterEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) @@ -140,11 +133,9 @@ class DataProcessingSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - SqlServer.initConnection( - StorageConfig.jdbcUrl, - StorageConfig.jdbcUsername, - StorageConfig.jdbcPassword - ) + dataSource.setUrl(StorageConfig.jdbcUrl) + dataSource.setUser(StorageConfig.jdbcUsername) + dataSource.setUser(StorageConfig.jdbcPassword) } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 383fe0b3100..ac465406c7b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -35,19 +35,12 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} import edu.uci.ics.texera.workflow.LogicalLink +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import org.postgresql.ds.PGSimpleDataSource import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike @@ -65,6 +58,10 @@ class PauseSpec val logger = Logger("PauseSpecLogger") + val dataSource: PGSimpleDataSource = new PGSimpleDataSource() + val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES + val workflowContext: WorkflowContext = new WorkflowContext() + private val testUser: User = { val user = new User user.setUid(Integer.valueOf(1)) @@ -103,7 +100,7 @@ class PauseSpec } override protected def beforeEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) @@ -115,7 +112,7 @@ class PauseSpec } override protected def afterEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() val userDao = new UserDao(dslConfig) val workflowDao = new WorkflowDao(dslConfig) val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) @@ -131,11 +128,9 @@ class PauseSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - SqlServer.initConnection( - StorageConfig.jdbcUrl, - StorageConfig.jdbcUsername, - StorageConfig.jdbcPassword - ) + dataSource.setUrl(StorageConfig.jdbcUrl) + dataSource.setUser(StorageConfig.jdbcUsername) + dataSource.setUser(StorageConfig.jdbcPassword) } override def afterAll(): Unit = { From 62f36a12583172c9d8adaa554c1f5ee7c8f26170 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 11:33:22 -0700 Subject: [PATCH 13/28] wip. --- .../ics/amber/engine/e2e/DataProcessingSpec.scala | 14 ++++++++++++-- .../edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index e61338c2044..7cbebfbc2f9 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -41,8 +41,18 @@ import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.jooq.SQLDialect diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index ac465406c7b..9231f32eb70 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -35,8 +35,18 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink import org.jooq.SQLDialect import org.jooq.impl.DSL From 25da9147b48095d51a0dbeef9d8f82673a9b582a Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 11:41:51 -0700 Subject: [PATCH 14/28] wip. --- .../scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 2 +- .../src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7cbebfbc2f9..7ae9fd1f4de 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -145,7 +145,7 @@ class DataProcessingSpec Class.forName("org.postgresql.Driver") dataSource.setUrl(StorageConfig.jdbcUrl) dataSource.setUser(StorageConfig.jdbcUsername) - dataSource.setUser(StorageConfig.jdbcPassword) + dataSource.setPassword(StorageConfig.jdbcPassword) } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 9231f32eb70..a34eac561ed 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -140,7 +140,7 @@ class PauseSpec Class.forName("org.postgresql.Driver") dataSource.setUrl(StorageConfig.jdbcUrl) dataSource.setUser(StorageConfig.jdbcUsername) - dataSource.setUser(StorageConfig.jdbcPassword) + dataSource.setPassword(StorageConfig.jdbcPassword) } override def afterAll(): Unit = { From d62c9c31fff7ba84ecd97eef587777e8766f3ccb Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 11:50:50 -0700 Subject: [PATCH 15/28] wip. --- core/amber/build.sbt | 2 ++ .../edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 6 ++++++ .../test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 6 ++++++ 3 files changed, 14 insertions(+) diff --git a/core/amber/build.sbt b/core/amber/build.sbt index d6d4dc73232..f0bf9c339b9 100644 --- a/core/amber/build.sbt +++ b/core/amber/build.sbt @@ -41,6 +41,8 @@ conflictManager := ConflictManager.latestRevision // ensuring no parallel execution of multiple tasks concurrentRestrictions in Global += Tags.limit(Tags.Test, 1) +Test / parallelExecution := false +Test / fork := true // add python as an additional source Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "python" diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7ae9fd1f4de..f3a80a8955b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -146,6 +146,12 @@ class DataProcessingSpec dataSource.setUrl(StorageConfig.jdbcUrl) dataSource.setUser(StorageConfig.jdbcUsername) dataSource.setPassword(StorageConfig.jdbcPassword) + SqlServer.initConnection( + url = StorageConfig.jdbcUrl, + user = StorageConfig.jdbcUsername, + password = StorageConfig.jdbcPassword + ) + } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index a34eac561ed..cb6c1956e3d 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -141,6 +141,12 @@ class PauseSpec dataSource.setUrl(StorageConfig.jdbcUrl) dataSource.setUser(StorageConfig.jdbcUsername) dataSource.setPassword(StorageConfig.jdbcPassword) + SqlServer.initConnection( + url = StorageConfig.jdbcUrl, + user = StorageConfig.jdbcUsername, + password = StorageConfig.jdbcPassword + ) + } override def afterAll(): Unit = { From ca66a169209a48d798da9e68477a51e4e0b90085 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 16:04:24 -0700 Subject: [PATCH 16/28] revert. --- core/amber/build.sbt | 2 + .../amber/engine/e2e/DataProcessingSpec.scala | 38 ++++++++++++------ .../uci/ics/amber/engine/e2e/PauseSpec.scala | 39 +++++++++++++------ .../edu/uci/ics/texera/dao/SqlServer.scala | 3 ++ 4 files changed, 59 insertions(+), 23 deletions(-) diff --git a/core/amber/build.sbt b/core/amber/build.sbt index d6d4dc73232..f0bf9c339b9 100644 --- a/core/amber/build.sbt +++ b/core/amber/build.sbt @@ -41,6 +41,8 @@ conflictManager := ConflictManager.latestRevision // ensuring no parallel execution of multiple tasks concurrentRestrictions in Global += Tags.limit(Tags.Test, 1) +Test / parallelExecution := false +Test / fork := true // add python as an additional source Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "python" diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7ddbf3f6e7c..f3a80a8955b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} @@ -38,7 +39,7 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ UserDao, @@ -54,6 +55,9 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ } import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import org.postgresql.ds.PGSimpleDataSource import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -69,7 +73,8 @@ class DataProcessingSpec implicit val timeout: Timeout = Timeout(5.seconds) - var inMemoryMySQLInstance: Option[DB] = None + val dataSource: PGSimpleDataSource = new PGSimpleDataSource() + val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES val workflowContext: WorkflowContext = new WorkflowContext() private val testUser: User = { @@ -110,10 +115,11 @@ class DataProcessingSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -121,10 +127,11 @@ class DataProcessingSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -136,11 +143,18 @@ class DataProcessingSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + dataSource.setUrl(StorageConfig.jdbcUrl) + dataSource.setUser(StorageConfig.jdbcUsername) + dataSource.setPassword(StorageConfig.jdbcPassword) + SqlServer.initConnection( + url = StorageConfig.jdbcUrl, + user = StorageConfig.jdbcUsername, + password = StorageConfig.jdbcPassword + ) + } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index f07b8d24591..cb6c1956e3d 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest @@ -32,7 +33,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ UserDao, @@ -47,6 +48,9 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ Workflow => WorkflowPojo } import edu.uci.ics.texera.workflow.LogicalLink +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import org.postgresql.ds.PGSimpleDataSource import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike @@ -64,6 +68,10 @@ class PauseSpec val logger = Logger("PauseSpecLogger") + val dataSource: PGSimpleDataSource = new PGSimpleDataSource() + val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES + val workflowContext: WorkflowContext = new WorkflowContext() + private val testUser: User = { val user = new User user.setUid(Integer.valueOf(1)) @@ -102,10 +110,11 @@ class PauseSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -113,10 +122,11 @@ class PauseSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -128,11 +138,18 @@ class PauseSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + dataSource.setUrl(StorageConfig.jdbcUrl) + dataSource.setUser(StorageConfig.jdbcUsername) + dataSource.setPassword(StorageConfig.jdbcPassword) + SqlServer.initConnection( + url = StorageConfig.jdbcUrl, + user = StorageConfig.jdbcUsername, + password = StorageConfig.jdbcPassword + ) + } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 18eb92b9f34..823de355d9e 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -53,8 +53,11 @@ object SqlServer { def initConnection(url: String, user: String, password: String): Unit = { if (instance.isEmpty) { + println(s"Initiating connection: $url") val server = new SqlServer(url, user, password) instance = Some(server) + } else { + println(s"Reusing connection: ${instance.get.dataSource.getURL}") } } From 8ce30f46cc750ed162967f0cf495aca7ca18dc7c Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 16:06:52 -0700 Subject: [PATCH 17/28] add await and remove lazy val. --- .../user/workflow/WorkflowExecutionsResource.scala | 6 ++---- .../edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 5 +++-- .../test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 2 ++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index c3b63a2df81..c8890841521 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -51,10 +51,8 @@ import scala.collection.mutable import scala.jdk.CollectionConverters._ object WorkflowExecutionsResource { - final private lazy val context = SqlServer - .getInstance() - .createDSLContext() - final private lazy val executionsDao = new WorkflowExecutionsDao(context.configuration) + private def context: DSLContext = SqlServer.getInstance().createDSLContext() + private def executionsDao = new WorkflowExecutionsDao(context.configuration) private def getExecutionById(eId: Integer): WorkflowExecutions = { executionsDao.fetchOneByEid(eId) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index f3a80a8955b..d5fa17eea0f 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -22,7 +22,6 @@ package edu.uci.ics.amber.engine.e2e import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener import edu.uci.ics.amber.config.StorageConfig @@ -39,7 +38,6 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ UserDao, @@ -53,6 +51,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ WorkflowVersion, Workflow => WorkflowPojo } +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.jooq.SQLDialect @@ -61,6 +60,7 @@ import org.postgresql.ds.PGSimpleDataSource import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import scala.concurrent.Await import scala.concurrent.duration.DurationInt class DataProcessingSpec @@ -156,6 +156,7 @@ class DataProcessingSpec override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) + Await.ready(system.whenTerminated, 1.seconds) } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index cb6c1956e3d..ed594b64ead 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -55,6 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike import scala.concurrent.duration._ +import scala.concurrent.Await class PauseSpec extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) @@ -151,6 +152,7 @@ class PauseSpec override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) + Await.ready(system.whenTerminated, 1.seconds) } def shouldPause( From c5ea3d317035bbc1d224a3a3d4329c6506f60de3 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 16:17:13 -0700 Subject: [PATCH 18/28] revert. --- core/amber/build.sbt | 2 - .../amber/engine/e2e/DataProcessingSpec.scala | 130 ++++-------------- .../uci/ics/amber/engine/e2e/PauseSpec.scala | 100 +------------- .../edu/uci/ics/texera/dao/SqlServer.scala | 3 - 4 files changed, 26 insertions(+), 209 deletions(-) diff --git a/core/amber/build.sbt b/core/amber/build.sbt index f0bf9c339b9..d6d4dc73232 100644 --- a/core/amber/build.sbt +++ b/core/amber/build.sbt @@ -41,8 +41,6 @@ conflictManager := ConflictManager.latestRevision // ensuring no parallel execution of multiple tasks concurrentRestrictions in Global += Tags.limit(Tags.Test, 1) -Test / parallelExecution := false -Test / fork := true // add python as an additional source Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "python" diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index d5fa17eea0f..2cf6e53febd 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -22,14 +22,15 @@ package edu.uci.ics.amber.engine.e2e import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout +import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.config.StorageConfig -import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping +import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} -import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity -import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} +import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} +import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller._ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED @@ -38,29 +39,10 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} -import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink -import org.jooq.SQLDialect -import org.jooq.impl.DSL -import org.postgresql.ds.PGSimpleDataSource import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import scala.concurrent.Await import scala.concurrent.duration.DurationInt class DataProcessingSpec @@ -68,95 +50,22 @@ class DataProcessingSpec with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll - with BeforeAndAfterEach - with MockTexeraDB { + with BeforeAndAfterEach { implicit val timeout: Timeout = Timeout(5.seconds) - val dataSource: PGSimpleDataSource = new PGSimpleDataSource() - val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES + var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() - private val testUser: User = { - val user = new User - user.setUid(Integer.valueOf(1)) - user.setName("test_user") - user.setRole(UserRoleEnum.ADMIN) - user.setPassword("123") - user.setEmail("test_user@test.com") - user - } - - private val testWorkflowEntry: WorkflowPojo = { - val workflow = new WorkflowPojo - workflow.setName("test workflow") - workflow.setWid(Integer.valueOf(1)) - workflow.setContent("test workflow content") - workflow.setDescription("test description") - workflow - } - - private val testWorkflowVersionEntry: WorkflowVersion = { - val workflowVersion = new WorkflowVersion - workflowVersion.setWid(Integer.valueOf(1)) - workflowVersion.setVid(Integer.valueOf(1)) - workflowVersion.setContent("test version content") - workflowVersion - } - - private val testWorkflowExecutionEntry: WorkflowExecutions = { - val workflowExecution = new WorkflowExecutions - workflowExecution.setEid(Integer.valueOf(1)) - workflowExecution.setVid(Integer.valueOf(1)) - workflowExecution.setUid(Integer.valueOf(1)) - workflowExecution.setStatus(3.toByte) - workflowExecution.setEnvironmentVersion("test engine") - workflowExecution - } - - override protected def beforeEach(): Unit = { - val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - userDao.insert(testUser) - workflowDao.insert(testWorkflowEntry) - workflowVersionDao.insert(testWorkflowVersionEntry) - workflowExecutionsDao.insert(testWorkflowExecutionEntry) - } - - override protected def afterEach(): Unit = { - val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - workflowExecutionsDao.deleteById(1) - workflowVersionDao.deleteById(1) - workflowDao.deleteById(1) - userDao.deleteById(1) - } - override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - dataSource.setUrl(StorageConfig.jdbcUrl) - dataSource.setUser(StorageConfig.jdbcUsername) - dataSource.setPassword(StorageConfig.jdbcPassword) - SqlServer.initConnection( - url = StorageConfig.jdbcUrl, - user = StorageConfig.jdbcUsername, - password = StorageConfig.jdbcPassword - ) - } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) - Await.ready(system.whenTerminated, 1.seconds) } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { @@ -179,21 +88,30 @@ class DataProcessingSpec if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds .filter(terminalOpId => { - val uri = getResultUriByLogicalPortId( + val uri = VFSURIFactory.createResultURI( + workflowContext.workflowId, workflowContext.executionId, - terminalOpId, - PortIdentity() + GlobalPortIdentity( + PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), + PortIdentity() + ) ) - uri.nonEmpty + // expecting the first output port only. + ExecutionResourcesMapping + .getResourceURIs(workflowContext.executionId) + .contains(uri) }) .map(terminalOpId => { //TODO: remove the delay after fixing the issue of reporting "completed" status too early. Thread.sleep(1000) - val uri = getResultUriByLogicalPortId( + val uri = VFSURIFactory.createResultURI( + workflowContext.workflowId, workflowContext.executionId, - terminalOpId, - PortIdentity() - ).get + GlobalPortIdentity( + PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), + PortIdentity() + ) + ) terminalOpId -> DocumentFactory .openDocument(uri) ._1 diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index ed594b64ead..bd4164798de 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -25,7 +25,6 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest @@ -33,126 +32,31 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} -import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} import edu.uci.ics.texera.workflow.LogicalLink -import org.jooq.SQLDialect -import org.jooq.impl.DSL -import org.postgresql.ds.PGSimpleDataSource -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike import scala.concurrent.duration._ -import scala.concurrent.Await class PauseSpec extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) with ImplicitSender with AnyFlatSpecLike - with BeforeAndAfterAll - with BeforeAndAfterEach - with MockTexeraDB { + with BeforeAndAfterAll { implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("PauseSpecLogger") - val dataSource: PGSimpleDataSource = new PGSimpleDataSource() - val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES - val workflowContext: WorkflowContext = new WorkflowContext() - - private val testUser: User = { - val user = new User - user.setUid(Integer.valueOf(1)) - user.setName("test_user") - user.setRole(UserRoleEnum.ADMIN) - user.setPassword("123") - user.setEmail("test_user@test.com") - user - } - - private val testWorkflowEntry: WorkflowPojo = { - val workflow = new WorkflowPojo - workflow.setName("test workflow") - workflow.setWid(Integer.valueOf(1)) - workflow.setContent("test workflow content") - workflow.setDescription("test description") - workflow - } - - private val testWorkflowVersionEntry: WorkflowVersion = { - val workflowVersion = new WorkflowVersion - workflowVersion.setWid(Integer.valueOf(1)) - workflowVersion.setVid(Integer.valueOf(1)) - workflowVersion.setContent("test version content") - workflowVersion - } - - private val testWorkflowExecutionEntry: WorkflowExecutions = { - val workflowExecution = new WorkflowExecutions - workflowExecution.setEid(Integer.valueOf(1)) - workflowExecution.setVid(Integer.valueOf(1)) - workflowExecution.setUid(Integer.valueOf(1)) - workflowExecution.setStatus(3.toByte) - workflowExecution.setEnvironmentVersion("test engine") - workflowExecution - } - - override protected def beforeEach(): Unit = { - val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - userDao.insert(testUser) - workflowDao.insert(testWorkflowEntry) - workflowVersionDao.insert(testWorkflowVersionEntry) - workflowExecutionsDao.insert(testWorkflowExecutionEntry) - } - - override protected def afterEach(): Unit = { - val dslConfig = DSL.using(dataSource, SQL_DIALECT).configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - workflowExecutionsDao.deleteById(1) - workflowVersionDao.deleteById(1) - workflowDao.deleteById(1) - userDao.deleteById(1) - } - override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - dataSource.setUrl(StorageConfig.jdbcUrl) - dataSource.setUser(StorageConfig.jdbcUsername) - dataSource.setPassword(StorageConfig.jdbcPassword) - SqlServer.initConnection( - url = StorageConfig.jdbcUrl, - user = StorageConfig.jdbcUsername, - password = StorageConfig.jdbcPassword - ) - } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) - Await.ready(system.whenTerminated, 1.seconds) } def shouldPause( diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 823de355d9e..18eb92b9f34 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -53,11 +53,8 @@ object SqlServer { def initConnection(url: String, user: String, password: String): Unit = { if (instance.isEmpty) { - println(s"Initiating connection: $url") val server = new SqlServer(url, user, password) instance = Some(server) - } else { - println(s"Reusing connection: ${instance.get.dataSource.getURL}") } } From 633afe29a6e2a668e6c08a1e5893277e9561fd90 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 16:21:48 -0700 Subject: [PATCH 19/28] add await. --- .../amber/engine/e2e/DataProcessingSpec.scala | 113 ++++++++++++++---- .../uci/ics/amber/engine/e2e/PauseSpec.scala | 83 ++++++++++++- 2 files changed, 172 insertions(+), 24 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 2cf6e53febd..6d4ddc66b95 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,12 +25,11 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping -import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory} import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} -import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} -import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} +import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity +import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller._ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED @@ -39,6 +38,21 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction +import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -50,22 +64,86 @@ class DataProcessingSpec with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll - with BeforeAndAfterEach { + with BeforeAndAfterEach + with MockTexeraDB { implicit val timeout: Timeout = Timeout(5.seconds) var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() + private val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + private val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + private val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + private val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + override protected def beforeEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + override protected def afterEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initializeDBAndReplaceDSLContext() } override def afterAll(): Unit = { + shutdownDB() TestKit.shutdownActorSystem(system) + import scala.concurrent.Await + Await.ready(system.whenTerminated, 1.seconds) } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { @@ -88,30 +166,21 @@ class DataProcessingSpec if (evt.state == COMPLETED) { results = workflow.logicalPlan.getTerminalOperatorIds .filter(terminalOpId => { - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) + terminalOpId, + PortIdentity() ) - // expecting the first output port only. - ExecutionResourcesMapping - .getResourceURIs(workflowContext.executionId) - .contains(uri) + uri.nonEmpty }) .map(terminalOpId => { //TODO: remove the delay after fixing the issue of reporting "completed" status too early. Thread.sleep(1000) - val uri = VFSURIFactory.createResultURI( - workflowContext.workflowId, + val uri = getResultUriByLogicalPortId( workflowContext.executionId, - GlobalPortIdentity( - PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"), - PortIdentity() - ) - ) + terminalOpId, + PortIdentity() + ).get terminalOpId -> DocumentFactory .openDocument(uri) ._1 diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index bd4164798de..5fa985c4815 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -32,8 +32,22 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} +import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike import scala.concurrent.duration._ @@ -42,21 +56,86 @@ class PauseSpec extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) with ImplicitSender with AnyFlatSpecLike - with BeforeAndAfterAll { + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { implicit val timeout: Timeout = Timeout(5.seconds) val logger = Logger("PauseSpecLogger") + private val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + private val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + private val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + private val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + override protected def beforeEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + override protected def afterEach(): Unit = { + val userDao = new UserDao(getDSLContext.configuration()) + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) + } + override def beforeAll(): Unit = { system.actorOf(Props[SingleNodeListener](), "cluster-info") // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") + initializeDBAndReplaceDSLContext() } override def afterAll(): Unit = { + shutdownDB() TestKit.shutdownActorSystem(system) + import scala.concurrent.Await + Await.ready(system.whenTerminated, 1.seconds) } def shouldPause( From 440e00dc4c10b1cd71a6454a1603c6432ca471f6 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 17:34:23 -0700 Subject: [PATCH 20/28] remove await. --- .../scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 2 -- .../src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 2 -- 2 files changed, 4 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 6d4ddc66b95..7ddbf3f6e7c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -142,8 +142,6 @@ class DataProcessingSpec override def afterAll(): Unit = { shutdownDB() TestKit.shutdownActorSystem(system) - import scala.concurrent.Await - Await.ready(system.whenTerminated, 1.seconds) } def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 5fa985c4815..f07b8d24591 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -134,8 +134,6 @@ class PauseSpec override def afterAll(): Unit = { shutdownDB() TestKit.shutdownActorSystem(system) - import scala.concurrent.Await - Await.ready(system.whenTerminated, 1.seconds) } def shouldPause( From fb1e9d82ddb8763b584f3cc896a9dea076cf0d35 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 19:34:42 -0700 Subject: [PATCH 21/28] test using external database --- .../workflow/WorkflowExecutionsResource.scala | 6 ++-- .../amber/engine/e2e/DataProcessingSpec.scala | 28 +++++++++++-------- .../edu/uci/ics/texera/dao/SqlServer.scala | 6 ++-- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index c8890841521..c3b63a2df81 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -51,8 +51,10 @@ import scala.collection.mutable import scala.jdk.CollectionConverters._ object WorkflowExecutionsResource { - private def context: DSLContext = SqlServer.getInstance().createDSLContext() - private def executionsDao = new WorkflowExecutionsDao(context.configuration) + final private lazy val context = SqlServer + .getInstance() + .createDSLContext() + final private lazy val executionsDao = new WorkflowExecutionsDao(context.configuration) private def getExecutionById(eId: Integer): WorkflowExecutions = { executionsDao.fetchOneByEid(eId) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 7ddbf3f6e7c..8193dfe7884 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} @@ -38,7 +39,7 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ UserDao, @@ -110,10 +111,11 @@ class DataProcessingSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -121,10 +123,11 @@ class DataProcessingSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -136,11 +139,14 @@ class DataProcessingSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + SqlServer.initConnection( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index 18eb92b9f34..c08ac933985 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -52,10 +52,8 @@ object SqlServer { private var instance: Option[SqlServer] = None def initConnection(url: String, user: String, password: String): Unit = { - if (instance.isEmpty) { - val server = new SqlServer(url, user, password) - instance = Some(server) - } + val server = new SqlServer(url, user, password) + instance = Some(server) } def getInstance(): SqlServer = { From b48fd427f9a79faac0aa49882e3a4edd375c195c Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 19:51:17 -0700 Subject: [PATCH 22/28] test using external database --- .../uci/ics/amber/engine/e2e/PauseSpec.scala | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index f07b8d24591..9f0a136de70 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -25,6 +25,7 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest @@ -32,20 +33,10 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike @@ -102,10 +93,11 @@ class PauseSpec } override protected def beforeEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) @@ -113,10 +105,11 @@ class PauseSpec } override protected def afterEach(): Unit = { - val userDao = new UserDao(getDSLContext.configuration()) - val workflowDao = new WorkflowDao(getDSLContext.configuration()) - val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) - val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) workflowExecutionsDao.deleteById(1) workflowVersionDao.deleteById(1) workflowDao.deleteById(1) @@ -128,11 +121,14 @@ class PauseSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - initializeDBAndReplaceDSLContext() + SqlServer.initConnection( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) } override def afterAll(): Unit = { - shutdownDB() TestKit.shutdownActorSystem(system) } From f897e9562b4de96277254ca8534dcadbf538e76c Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Wed, 1 Oct 2025 20:18:32 -0700 Subject: [PATCH 23/28] test using external database --- .../edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 9f0a136de70..383fe0b3100 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -35,8 +35,18 @@ import edu.uci.ics.amber.engine.common.client.AmberClient import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowVersionDao} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, WorkflowExecutions, WorkflowVersion, Workflow => WorkflowPojo} +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike From 996f436b097c9a700fa29b440748d7f56e80ff50 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Thu, 2 Oct 2025 20:44:30 -0700 Subject: [PATCH 24/28] wip. --- .github/workflows/github-action-build.yml | 4 ++++ .../ics/amber/engine/e2e/DataProcessingSpec.scala | 2 +- .../edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 2 +- core/config/src/main/resources/storage.conf | 3 +++ .../edu/uci/ics/amber/config/StorageConfig.scala | 1 + core/scripts/sql/texera_ddl.sql | 12 +++++++++--- 6 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index 14df23456a4..1d64b21a60a 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -112,6 +112,10 @@ jobs: run: psql -h localhost -U postgres -f deployment/k8s/texera-helmchart/files/texera_ddl.sql env: PGPASSWORD: postgres + - name: Create test_texera_db + run: psql -h localhost -U postgres -v DB_NAME=test_texera_db -f core/scripts/sql/texera_ddl.sql + env: + PGPASSWORD: postgres - name: Compile with sbt run: cd core && sbt clean package - name: Run backend tests diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 8193dfe7884..57ac38a32f9 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -140,7 +140,7 @@ class DataProcessingSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") SqlServer.initConnection( - StorageConfig.jdbcUrl, + StorageConfig.jdbcTestUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword ) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 383fe0b3100..3c05b360abb 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -132,7 +132,7 @@ class PauseSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") SqlServer.initConnection( - StorageConfig.jdbcUrl, + StorageConfig.jdbcTestUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword ) diff --git a/core/config/src/main/resources/storage.conf b/core/config/src/main/resources/storage.conf index 7aba655b461..043f6f01d82 100644 --- a/core/config/src/main/resources/storage.conf +++ b/core/config/src/main/resources/storage.conf @@ -125,6 +125,9 @@ storage { url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public" url = ${?STORAGE_JDBC_URL} + test-url = "jdbc:postgresql://localhost:5432/test_texera_db?currentSchema=texera_db,public" + test-url = ${?STORAGE_JDBC_TEST_URL} + username = "postgres" username = ${?STORAGE_JDBC_USERNAME} diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala index 90d2c985e99..02cf4165d60 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala @@ -34,6 +34,7 @@ object StorageConfig { // JDBC specifics val jdbcUrl: String = conf.getString("storage.jdbc.url") + val jdbcTestUrl: String = conf.getString("storage.jdbc.test-url") val jdbcUsername: String = conf.getString("storage.jdbc.username") val jdbcPassword: String = conf.getString("storage.jdbc.password") diff --git a/core/scripts/sql/texera_ddl.sql b/core/scripts/sql/texera_ddl.sql index e98f8e21729..e5d34e7abe5 100644 --- a/core/scripts/sql/texera_ddl.sql +++ b/core/scripts/sql/texera_ddl.sql @@ -15,18 +15,24 @@ -- specific language governing permissions and limitations -- under the License. +-- Defaults; can be overridden with: psql -v DB_NAME=... +\if :{?DB_NAME} +\else + \set DB_NAME 'texera_db' +\endif + -- ============================================ -- 1. Drop and recreate the database (psql only) -- Remove if you already created texera_db -- ============================================ \c postgres -DROP DATABASE IF EXISTS texera_db; -CREATE DATABASE texera_db; +DROP DATABASE IF EXISTS :"DB_NAME"; +CREATE DATABASE :"DB_NAME"; -- ============================================ -- 2. Connect to the new database (psql only) -- ============================================ -\c texera_db +\c :"DB_NAME" CREATE SCHEMA IF NOT EXISTS texera_db; SET search_path TO texera_db, public; From 7187d8c7c616219f8b24d74390675c80fb77b05a Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Thu, 2 Oct 2025 21:25:23 -0700 Subject: [PATCH 25/28] Fix MockTexeraDB. --- .../test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala index 1bb49518163..5893bc545c2 100644 --- a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala +++ b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala @@ -92,12 +92,15 @@ trait MockTexeraDB { } finally { source.close() } - val parts: Array[String] = content.split("(?m)^\\\\c texera_db") + val parts: Array[String] = content.split("(?m)^CREATE DATABASE :\"DB_NAME\";") def removeCCommands(sql: String): String = sql.linesIterator .filterNot(_.trim.startsWith("\\c")) .mkString("\n") - executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, removeCCommands(parts(0))) + val createDBStatement = + """DROP DATABASE IF EXISTS texera_db; + |CREATE DATABASE texera_db;""".stripMargin + executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, createDBStatement) val texeraDB = embedded.getDatabase(username, database) var tablesAndIndexCreation = removeCCommands(parts(1)) From 054c97232d5381a7bd08b1bd5e901cf28bdb8acc Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Fri, 3 Oct 2025 14:41:22 -0700 Subject: [PATCH 26/28] refactoring. --- .github/workflows/github-action-build.yml | 4 ++-- .../uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 2 +- .../scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 2 +- core/config/src/main/resources/storage.conf | 7 +++++-- .../scala/edu/uci/ics/amber/config/StorageConfig.scala | 2 +- .../main/scala/edu/uci/ics/texera/dao/SqlServer.scala | 10 ++++++++-- .../scala/edu/uci/ics/texera/dao/MockTexeraDB.scala | 1 + 7 files changed, 19 insertions(+), 9 deletions(-) diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index 1d64b21a60a..e32c5510fed 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -112,8 +112,8 @@ jobs: run: psql -h localhost -U postgres -f deployment/k8s/texera-helmchart/files/texera_ddl.sql env: PGPASSWORD: postgres - - name: Create test_texera_db - run: psql -h localhost -U postgres -v DB_NAME=test_texera_db -f core/scripts/sql/texera_ddl.sql + - name: Create texera_db_for_test_cases + run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f core/scripts/sql/texera_ddl.sql env: PGPASSWORD: postgres - name: Compile with sbt diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 57ac38a32f9..108629a183b 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -140,7 +140,7 @@ class DataProcessingSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") SqlServer.initConnection( - StorageConfig.jdbcTestUrl, + StorageConfig.jdbcUrlForTestCases, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword ) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 3c05b360abb..eafa7fa53ea 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -132,7 +132,7 @@ class PauseSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") SqlServer.initConnection( - StorageConfig.jdbcTestUrl, + StorageConfig.jdbcUrlForTestCases, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword ) diff --git a/core/config/src/main/resources/storage.conf b/core/config/src/main/resources/storage.conf index 043f6f01d82..85a62b77a3b 100644 --- a/core/config/src/main/resources/storage.conf +++ b/core/config/src/main/resources/storage.conf @@ -125,8 +125,11 @@ storage { url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public" url = ${?STORAGE_JDBC_URL} - test-url = "jdbc:postgresql://localhost:5432/test_texera_db?currentSchema=texera_db,public" - test-url = ${?STORAGE_JDBC_TEST_URL} + # Some e2e test cases require the user system. To make sure running those test cases can pass the CI, and that + # running them locally do not contaminate developers' own texera_db, we use another database with a different + # name for running these test cases. + url-for-test-cases = "jdbc:postgresql://localhost:5432/texera_db_for_test_cases?currentSchema=texera_db,public" + url-for-test-cases = ${?STORAGE_JDBC_URL_FOR_TEST_CASES} username = "postgres" username = ${?STORAGE_JDBC_USERNAME} diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala index 2523e1e120a..92a8e15d3f4 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala @@ -34,7 +34,7 @@ object StorageConfig { // JDBC specifics val jdbcUrl: String = conf.getString("storage.jdbc.url") - val jdbcTestUrl: String = conf.getString("storage.jdbc.test-url") + val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases") val jdbcUsername: String = conf.getString("storage.jdbc.username") val jdbcPassword: String = conf.getString("storage.jdbc.password") diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala index c08ac933985..dd30bc0bda1 100644 --- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala @@ -52,14 +52,20 @@ object SqlServer { private var instance: Option[SqlServer] = None def initConnection(url: String, user: String, password: String): Unit = { - val server = new SqlServer(url, user, password) - instance = Some(server) + if (instance.isEmpty) { + val server = new SqlServer(url, user, password) + instance = Some(server) + } } def getInstance(): SqlServer = { instance.get } + def clearInstance(): Unit = { + instance = None + } + /** * A utility function for create a transaction block using given sql context * @param dsl the sql context diff --git a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala index 5893bc545c2..cc291f94927 100644 --- a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala +++ b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala @@ -67,6 +67,7 @@ trait MockTexeraDB { value.close() dbInstance = None dslContext = None + SqlServer.clearInstance() case None => // do nothing } From 8b1b692d02128932c1123eb2019a4e6bf488d638 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Fri, 3 Oct 2025 15:24:55 -0700 Subject: [PATCH 27/28] refactoring. --- .../amber/engine/e2e/DataProcessingSpec.scala | 86 +++--------------- .../uci/ics/amber/engine/e2e/PauseSpec.scala | 86 +++--------------- .../uci/ics/amber/engine/e2e/TestUtils.scala | 89 +++++++++++++++++++ 3 files changed, 109 insertions(+), 152 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 108629a183b..0b3bacfb64e 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -25,7 +25,6 @@ import akka.util.Timeout import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.storage.DocumentFactory import edu.uci.ics.amber.core.storage.model.VirtualDocument import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple} @@ -36,23 +35,15 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient -import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow +import edu.uci.ics.amber.engine.e2e.TestUtils.{ + buildWorkflow, + cleanupWorkflowExecutionData, + initiateTexeraDBForTestCases, + setUpWorkflowExecutionData +} import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} -import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo -} +import edu.uci.ics.texera.dao.MockTexeraDB import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike @@ -73,65 +64,12 @@ class DataProcessingSpec var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() - private val testUser: User = { - val user = new User - user.setUid(Integer.valueOf(1)) - user.setName("test_user") - user.setRole(UserRoleEnum.ADMIN) - user.setPassword("123") - user.setEmail("test_user@test.com") - user - } - - private val testWorkflowEntry: WorkflowPojo = { - val workflow = new WorkflowPojo - workflow.setName("test workflow") - workflow.setWid(Integer.valueOf(1)) - workflow.setContent("test workflow content") - workflow.setDescription("test description") - workflow - } - - private val testWorkflowVersionEntry: WorkflowVersion = { - val workflowVersion = new WorkflowVersion - workflowVersion.setWid(Integer.valueOf(1)) - workflowVersion.setVid(Integer.valueOf(1)) - workflowVersion.setContent("test version content") - workflowVersion - } - - private val testWorkflowExecutionEntry: WorkflowExecutions = { - val workflowExecution = new WorkflowExecutions - workflowExecution.setEid(Integer.valueOf(1)) - workflowExecution.setVid(Integer.valueOf(1)) - workflowExecution.setUid(Integer.valueOf(1)) - workflowExecution.setStatus(3.toByte) - workflowExecution.setEnvironmentVersion("test engine") - workflowExecution - } - override protected def beforeEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - userDao.insert(testUser) - workflowDao.insert(testWorkflowEntry) - workflowVersionDao.insert(testWorkflowVersionEntry) - workflowExecutionsDao.insert(testWorkflowExecutionEntry) + setUpWorkflowExecutionData() } override protected def afterEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - workflowExecutionsDao.deleteById(1) - workflowVersionDao.deleteById(1) - workflowDao.deleteById(1) - userDao.deleteById(1) + cleanupWorkflowExecutionData() } override def beforeAll(): Unit = { @@ -139,11 +77,7 @@ class DataProcessingSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - SqlServer.initConnection( - StorageConfig.jdbcUrlForTestCases, - StorageConfig.jdbcUsername, - StorageConfig.jdbcPassword - ) + initiateTexeraDBForTestCases() } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index eafa7fa53ea..85908e2e835 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -25,31 +25,22 @@ import akka.util.Timeout import com.twitter.util.{Await, Promise} import com.typesafe.scalalogging.Logger import edu.uci.ics.amber.clustering.SingleNodeListener -import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext} import edu.uci.ics.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import edu.uci.ics.amber.engine.common.AmberRuntime import edu.uci.ics.amber.engine.common.client.AmberClient -import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.{MockTexeraDB, SqlServer} -import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum -import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ - UserDao, - WorkflowDao, - WorkflowExecutionsDao, - WorkflowVersionDao -} -import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ - User, - WorkflowExecutions, - WorkflowVersion, - Workflow => WorkflowPojo +import edu.uci.ics.amber.engine.e2e.TestUtils.{ + cleanupWorkflowExecutionData, + initiateTexeraDBForTestCases, + setUpWorkflowExecutionData } +import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} +import edu.uci.ics.texera.dao.MockTexeraDB import edu.uci.ics.texera.workflow.LogicalLink -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import scala.concurrent.duration._ @@ -65,65 +56,12 @@ class PauseSpec val logger = Logger("PauseSpecLogger") - private val testUser: User = { - val user = new User - user.setUid(Integer.valueOf(1)) - user.setName("test_user") - user.setRole(UserRoleEnum.ADMIN) - user.setPassword("123") - user.setEmail("test_user@test.com") - user - } - - private val testWorkflowEntry: WorkflowPojo = { - val workflow = new WorkflowPojo - workflow.setName("test workflow") - workflow.setWid(Integer.valueOf(1)) - workflow.setContent("test workflow content") - workflow.setDescription("test description") - workflow - } - - private val testWorkflowVersionEntry: WorkflowVersion = { - val workflowVersion = new WorkflowVersion - workflowVersion.setWid(Integer.valueOf(1)) - workflowVersion.setVid(Integer.valueOf(1)) - workflowVersion.setContent("test version content") - workflowVersion - } - - private val testWorkflowExecutionEntry: WorkflowExecutions = { - val workflowExecution = new WorkflowExecutions - workflowExecution.setEid(Integer.valueOf(1)) - workflowExecution.setVid(Integer.valueOf(1)) - workflowExecution.setUid(Integer.valueOf(1)) - workflowExecution.setStatus(3.toByte) - workflowExecution.setEnvironmentVersion("test engine") - workflowExecution - } - override protected def beforeEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - userDao.insert(testUser) - workflowDao.insert(testWorkflowEntry) - workflowVersionDao.insert(testWorkflowVersionEntry) - workflowExecutionsDao.insert(testWorkflowExecutionEntry) + setUpWorkflowExecutionData() } override protected def afterEach(): Unit = { - val dslConfig = SqlServer.getInstance().context.configuration() - val userDao = new UserDao(dslConfig) - val workflowDao = new WorkflowDao(dslConfig) - val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) - val workflowVersionDao = new WorkflowVersionDao(dslConfig) - workflowExecutionsDao.deleteById(1) - workflowVersionDao.deleteById(1) - workflowDao.deleteById(1) - userDao.deleteById(1) + cleanupWorkflowExecutionData() } override def beforeAll(): Unit = { @@ -131,11 +69,7 @@ class PauseSpec // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") - SqlServer.initConnection( - StorageConfig.jdbcUrlForTestCases, - StorageConfig.jdbcUsername, - StorageConfig.jdbcPassword - ) + initiateTexeraDBForTestCases() } override def afterAll(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala index b20c74cb282..c80ece82e2c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala @@ -19,9 +19,24 @@ package edu.uci.ics.amber.engine.e2e +import edu.uci.ics.amber.config.StorageConfig import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.architecture.controller.Workflow import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.texera.dao.SqlServer +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, + WorkflowExecutions, + WorkflowVersion, + Workflow => WorkflowPojo +} import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo import edu.uci.ics.texera.workflow.{LogicalLink, WorkflowCompiler} @@ -40,4 +55,78 @@ object TestUtils { ) } + /** + * If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a + * workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases. + * Note such test cases need to clean up the database at the end of running each test case. + */ + def initiateTexeraDBForTestCases(): Unit = { + SqlServer.initConnection( + StorageConfig.jdbcUrlForTestCases, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) + } + + val testUser: User = { + val user = new User + user.setUid(Integer.valueOf(1)) + user.setName("test_user") + user.setRole(UserRoleEnum.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + + val testWorkflowEntry: WorkflowPojo = { + val workflow = new WorkflowPojo + workflow.setName("test workflow") + workflow.setWid(Integer.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") + workflow + } + + val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(Integer.valueOf(1)) + workflowVersion.setVid(Integer.valueOf(1)) + workflowVersion.setContent("test version content") + workflowVersion + } + + val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(Integer.valueOf(1)) + workflowExecution.setVid(Integer.valueOf(1)) + workflowExecution.setUid(Integer.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + def setUpWorkflowExecutionData(): Unit = { + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) + userDao.insert(testUser) + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + } + + def cleanupWorkflowExecutionData(): Unit = { + val dslConfig = SqlServer.getInstance().context.configuration() + val userDao = new UserDao(dslConfig) + val workflowDao = new WorkflowDao(dslConfig) + val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig) + val workflowVersionDao = new WorkflowVersionDao(dslConfig) + workflowExecutionsDao.deleteById(1) + workflowVersionDao.deleteById(1) + workflowDao.deleteById(1) + userDao.deleteById(1) + } + } From 2c811d9802cc4849722f883c95a69c230e0430fa Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Fri, 3 Oct 2025 15:38:36 -0700 Subject: [PATCH 28/28] refactoring. --- .../edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala | 6 +----- .../scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 4 +--- core/scripts/sql/texera_ddl.sql | 7 ++++++- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala index 0b3bacfb64e..4d3a6ad9d7f 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala @@ -22,7 +22,6 @@ package edu.uci.ics.amber.engine.e2e import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import ch.vorburger.mariadb4j.DB import com.twitter.util.{Await, Duration, Promise} import edu.uci.ics.amber.clustering.SingleNodeListener import edu.uci.ics.amber.core.storage.DocumentFactory @@ -43,7 +42,6 @@ import edu.uci.ics.amber.engine.e2e.TestUtils.{ } import edu.uci.ics.amber.operator.TestOperators import edu.uci.ics.amber.operator.aggregate.AggregationFunction -import edu.uci.ics.texera.dao.MockTexeraDB import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike @@ -56,12 +54,10 @@ class DataProcessingSpec with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll - with BeforeAndAfterEach - with MockTexeraDB { + with BeforeAndAfterEach { implicit val timeout: Timeout = Timeout(5.seconds) - var inMemoryMySQLInstance: Option[DB] = None val workflowContext: WorkflowContext = new WorkflowContext() override protected def beforeEach(): Unit = { diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala index 85908e2e835..db28c9f4597 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala @@ -37,7 +37,6 @@ import edu.uci.ics.amber.engine.e2e.TestUtils.{ setUpWorkflowExecutionData } import edu.uci.ics.amber.operator.{LogicalOp, TestOperators} -import edu.uci.ics.texera.dao.MockTexeraDB import edu.uci.ics.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -49,8 +48,7 @@ class PauseSpec with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll - with BeforeAndAfterEach - with MockTexeraDB { + with BeforeAndAfterEach { implicit val timeout: Timeout = Timeout(5.seconds) diff --git a/core/scripts/sql/texera_ddl.sql b/core/scripts/sql/texera_ddl.sql index 00bc27dc619..7b0f9b9063d 100644 --- a/core/scripts/sql/texera_ddl.sql +++ b/core/scripts/sql/texera_ddl.sql @@ -15,7 +15,12 @@ -- specific language governing permissions and limitations -- under the License. --- Defaults; can be overridden with: psql -v DB_NAME=... +-- ============================================ +-- 0. Specify the database name +-- (defaults to texera_db) +-- Override the name with: +-- psql -v DB_NAME= ... +-- ============================================ \if :{?DB_NAME} \else \set DB_NAME 'texera_db'