Skip to content

Commit 83076ac

Browse files
feat(amber): enable user system by default (#3782)
## Purpose This PR sets user system to be enabled by default in the configuration. Currently, this flag is by default set to be disabled (a.k.a. the non-user mode). As no one is using the non-user mode and we are requiring all the developers to enable the user system, we have decided to abandon the non-user mode. ## Challenge & Design The major blocker of setting the flag to be enabled by default is two e2e test suites that rely on the non-user mode. These two test suites execute a workflow in the Amber engine in each of their test cases. Enabling the user mode would require texera_db in the test environment, as in the user-system mode, the execution of a workflow requires an `eid` (and subsequently a `vid`, `wid`, and `uid`) in `texera_db`. We could use `MockTexeraDB`, which is currently used by many unit tests. `MockTexeraDB` creates an embedded postgres instance per test suite, and the embedded db is destroyed at the end of each such test suite. However, a complexity of the two e2e test cases is they both access a singleton resource `WorkflowExecutionsResource`, which caches the DSL context from `SqlServer` (i.e., it only gets evaluated once per JVM): ``` final private lazy val context = SqlServer .getInstance() .createDSLContext() ``` In fact, most of the singleton resources in our current codebase cache the `DSLContext` / Dao, as the `DSLContext` never gets updated during the real Texera environment (i.e., the real`texera_db`'s address never changes). In the test environment, however, when working with `MockTexeraDB`, that assumption does not hold, as each instance of `MockTexeraDB` has a different address, and gets destroyed before other test suite runs. Since all the test suites are executed in the same JVM during CI run, using `MockTexeraDB` would cause the 2nd of the two e2e test cases to fail because it still uses the DSL context from the 1st test suite's `MockTexeraDB`. The diagrams below show what happens when using the embedded `MockTexeraDB` to run two e2e test suites that both need to access the same singleton resource during their execution. The 1st test suite creates an embedded DB (`DB1`) and lets the singleton `SqlServer` object set its `DSLContext` to point to `DB1`. When the test cases first access `WorkflowExecutionsResource` (`WER`), WER grabs the `DSLContext` from `SqlServer` and caches it. `WER` then queries `DB1` for all the test cases of test suite 1. When test suite 1 finishes, `DB1` gets destroyed. ![DB and CI - 1](https://github.com/user-attachments/assets/0e405744-d2e4-4543-8c51-13abd88a6845) Later, In the same JVM, when test suite 2 starts, it also creates its own embedded DB (`DB2`) and lets `SqlServer` point to `DB2`. However, as the `DSLContext` in `WER` is cached, it does not get updated when the test cases access `WER`, so `WER` still points to `DB1`, which is already destroyed, and causes failures. ![DB and CI - 2](https://github.com/user-attachments/assets/af364b16-93c5-463e-8a24-952347584b2e) To solve this problem, we could either: 1. Avoid caching DSLContext/Dao in the codebase, or 2. Let the two e2e test cases use the same real, external database (same as production environment) instead of `MockTexeraDB`. **We choose the 2nd design, as these two are e2e tests which should emulate production behavior with a real database.** To avoid polluting the developer's local `texera_db`, we use a separate test database with the same schema. ## Changes - Sets `user-sys` to be enabled by default. - Introduces a `texera_db_for_test_cases` specifically for test cases and CIs. `texera_ddl.sql` is updated to allow creating the database with a name other than `texera_db` (and still defaults to `texera_db`), and CIs will automatically create `texera_db_for_test_cases` with the same schema as `texera_db`. - Updates `DataProcessingSpec` and `PauseSpec` to use `texera_db_for_test_cases`. The two test suites now populate and cleanup this database during their run. - `MockTexeraDB` is updated to incorporate the changes to the DDL script. - `SqlServer` is also updated with a `clearInstance` logic so that other unit tests that use `MockTexeraDB` can clear their instance in `SqlServer` properly so that they do not interfere with the two e2e tests. ## Next Step Remove the `user-sys`'s`enabled` flag and its `if-else` handling logic completely. --------- Co-authored-by: Xinyuan Lin <xinyual3@uci.edu>
1 parent 257fbe0 commit 83076ac

File tree

10 files changed

+169
-32
lines changed

10 files changed

+169
-32
lines changed

.github/workflows/github-action-build.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ jobs:
115115
psql -h localhost -U postgres -f core/scripts/sql/texera_lakefs.sql
116116
env:
117117
PGPASSWORD: postgres
118+
- name: Create texera_db_for_test_cases
119+
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f core/scripts/sql/texera_ddl.sql
120+
env:
121+
PGPASSWORD: postgres
118122
- name: Compile with sbt
119123
run: cd core && sbt clean package
120124
- name: Run backend tests

core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,27 @@ package edu.uci.ics.amber.engine.e2e
2222
import akka.actor.{ActorSystem, Props}
2323
import akka.testkit.{ImplicitSender, TestKit}
2424
import akka.util.Timeout
25-
import ch.vorburger.mariadb4j.DB
2625
import com.twitter.util.{Await, Duration, Promise}
2726
import edu.uci.ics.amber.clustering.SingleNodeListener
27+
import edu.uci.ics.amber.core.storage.DocumentFactory
2828
import edu.uci.ics.amber.core.storage.model.VirtualDocument
29-
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
30-
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
3129
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
32-
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
33-
import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext}
30+
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
31+
import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext}
3432
import edu.uci.ics.amber.engine.architecture.controller._
3533
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
3634
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
3735
import edu.uci.ics.amber.engine.common.AmberRuntime
3836
import edu.uci.ics.amber.engine.common.client.AmberClient
39-
import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
37+
import edu.uci.ics.amber.engine.e2e.TestUtils.{
38+
buildWorkflow,
39+
cleanupWorkflowExecutionData,
40+
initiateTexeraDBForTestCases,
41+
setUpWorkflowExecutionData
42+
}
4043
import edu.uci.ics.amber.operator.TestOperators
4144
import edu.uci.ics.amber.operator.aggregate.AggregationFunction
45+
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
4246
import edu.uci.ics.texera.workflow.LogicalLink
4347
import org.scalatest.flatspec.AnyFlatSpecLike
4448
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -54,14 +58,22 @@ class DataProcessingSpec
5458

5559
implicit val timeout: Timeout = Timeout(5.seconds)
5660

57-
var inMemoryMySQLInstance: Option[DB] = None
5861
val workflowContext: WorkflowContext = new WorkflowContext()
5962

63+
override protected def beforeEach(): Unit = {
64+
setUpWorkflowExecutionData()
65+
}
66+
67+
override protected def afterEach(): Unit = {
68+
cleanupWorkflowExecutionData()
69+
}
70+
6071
override def beforeAll(): Unit = {
6172
system.actorOf(Props[SingleNodeListener](), "cluster-info")
6273
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
6374
// Explicitly load the JDBC driver to avoid flaky CI failures.
6475
Class.forName("org.postgresql.Driver")
76+
initiateTexeraDBForTestCases()
6577
}
6678

6779
override def afterAll(): Unit = {
@@ -88,30 +100,21 @@ class DataProcessingSpec
88100
if (evt.state == COMPLETED) {
89101
results = workflow.logicalPlan.getTerminalOperatorIds
90102
.filter(terminalOpId => {
91-
val uri = VFSURIFactory.createResultURI(
92-
workflowContext.workflowId,
103+
val uri = getResultUriByLogicalPortId(
93104
workflowContext.executionId,
94-
GlobalPortIdentity(
95-
PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
96-
PortIdentity()
97-
)
105+
terminalOpId,
106+
PortIdentity()
98107
)
99-
// expecting the first output port only.
100-
ExecutionResourcesMapping
101-
.getResourceURIs(workflowContext.executionId)
102-
.contains(uri)
108+
uri.nonEmpty
103109
})
104110
.map(terminalOpId => {
105111
//TODO: remove the delay after fixing the issue of reporting "completed" status too early.
106112
Thread.sleep(1000)
107-
val uri = VFSURIFactory.createResultURI(
108-
workflowContext.workflowId,
113+
val uri = getResultUriByLogicalPortId(
109114
workflowContext.executionId,
110-
GlobalPortIdentity(
111-
PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
112-
PortIdentity()
113-
)
114-
)
115+
terminalOpId,
116+
PortIdentity()
117+
).get
115118
terminalOpId -> DocumentFactory
116119
.openDocument(uri)
117120
._1

core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,28 +31,43 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
3131
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
3232
import edu.uci.ics.amber.engine.common.AmberRuntime
3333
import edu.uci.ics.amber.engine.common.client.AmberClient
34+
import edu.uci.ics.amber.engine.e2e.TestUtils.{
35+
cleanupWorkflowExecutionData,
36+
initiateTexeraDBForTestCases,
37+
setUpWorkflowExecutionData
38+
}
3439
import edu.uci.ics.amber.operator.{LogicalOp, TestOperators}
3540
import edu.uci.ics.texera.workflow.LogicalLink
36-
import org.scalatest.BeforeAndAfterAll
3741
import org.scalatest.flatspec.AnyFlatSpecLike
42+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
3843

3944
import scala.concurrent.duration._
4045

4146
class PauseSpec
4247
extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
4348
with ImplicitSender
4449
with AnyFlatSpecLike
45-
with BeforeAndAfterAll {
50+
with BeforeAndAfterAll
51+
with BeforeAndAfterEach {
4652

4753
implicit val timeout: Timeout = Timeout(5.seconds)
4854

4955
val logger = Logger("PauseSpecLogger")
5056

57+
override protected def beforeEach(): Unit = {
58+
setUpWorkflowExecutionData()
59+
}
60+
61+
override protected def afterEach(): Unit = {
62+
cleanupWorkflowExecutionData()
63+
}
64+
5165
override def beforeAll(): Unit = {
5266
system.actorOf(Props[SingleNodeListener](), "cluster-info")
5367
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
5468
// Explicitly load the JDBC driver to avoid flaky CI failures.
5569
Class.forName("org.postgresql.Driver")
70+
initiateTexeraDBForTestCases()
5671
}
5772

5873
override def afterAll(): Unit = {

core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,24 @@
1919

2020
package edu.uci.ics.amber.engine.e2e
2121

22+
import edu.uci.ics.amber.config.StorageConfig
2223
import edu.uci.ics.amber.core.workflow.WorkflowContext
2324
import edu.uci.ics.amber.engine.architecture.controller.Workflow
2425
import edu.uci.ics.amber.operator.LogicalOp
26+
import edu.uci.ics.texera.dao.SqlServer
27+
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
28+
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
29+
UserDao,
30+
WorkflowDao,
31+
WorkflowExecutionsDao,
32+
WorkflowVersionDao
33+
}
34+
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
35+
User,
36+
WorkflowExecutions,
37+
WorkflowVersion,
38+
Workflow => WorkflowPojo
39+
}
2540
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
2641
import edu.uci.ics.texera.workflow.{LogicalLink, WorkflowCompiler}
2742

@@ -40,4 +55,78 @@ object TestUtils {
4055
)
4156
}
4257

58+
/**
59+
* If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a
60+
* workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases.
61+
* Note such test cases need to clean up the database at the end of running each test case.
62+
*/
63+
def initiateTexeraDBForTestCases(): Unit = {
64+
SqlServer.initConnection(
65+
StorageConfig.jdbcUrlForTestCases,
66+
StorageConfig.jdbcUsername,
67+
StorageConfig.jdbcPassword
68+
)
69+
}
70+
71+
val testUser: User = {
72+
val user = new User
73+
user.setUid(Integer.valueOf(1))
74+
user.setName("test_user")
75+
user.setRole(UserRoleEnum.ADMIN)
76+
user.setPassword("123")
77+
user.setEmail("test_user@test.com")
78+
user
79+
}
80+
81+
val testWorkflowEntry: WorkflowPojo = {
82+
val workflow = new WorkflowPojo
83+
workflow.setName("test workflow")
84+
workflow.setWid(Integer.valueOf(1))
85+
workflow.setContent("test workflow content")
86+
workflow.setDescription("test description")
87+
workflow
88+
}
89+
90+
val testWorkflowVersionEntry: WorkflowVersion = {
91+
val workflowVersion = new WorkflowVersion
92+
workflowVersion.setWid(Integer.valueOf(1))
93+
workflowVersion.setVid(Integer.valueOf(1))
94+
workflowVersion.setContent("test version content")
95+
workflowVersion
96+
}
97+
98+
val testWorkflowExecutionEntry: WorkflowExecutions = {
99+
val workflowExecution = new WorkflowExecutions
100+
workflowExecution.setEid(Integer.valueOf(1))
101+
workflowExecution.setVid(Integer.valueOf(1))
102+
workflowExecution.setUid(Integer.valueOf(1))
103+
workflowExecution.setStatus(3.toByte)
104+
workflowExecution.setEnvironmentVersion("test engine")
105+
workflowExecution
106+
}
107+
108+
def setUpWorkflowExecutionData(): Unit = {
109+
val dslConfig = SqlServer.getInstance().context.configuration()
110+
val userDao = new UserDao(dslConfig)
111+
val workflowDao = new WorkflowDao(dslConfig)
112+
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
113+
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
114+
userDao.insert(testUser)
115+
workflowDao.insert(testWorkflowEntry)
116+
workflowVersionDao.insert(testWorkflowVersionEntry)
117+
workflowExecutionsDao.insert(testWorkflowExecutionEntry)
118+
}
119+
120+
def cleanupWorkflowExecutionData(): Unit = {
121+
val dslConfig = SqlServer.getInstance().context.configuration()
122+
val userDao = new UserDao(dslConfig)
123+
val workflowDao = new WorkflowDao(dslConfig)
124+
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
125+
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
126+
workflowExecutionsDao.deleteById(1)
127+
workflowVersionDao.deleteById(1)
128+
workflowDao.deleteById(1)
129+
userDao.deleteById(1)
130+
}
131+
43132
}

core/config/src/main/resources/storage.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ storage {
125125
url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public"
126126
url = ${?STORAGE_JDBC_URL}
127127

128+
# Some e2e test cases require the user system. To make sure running those test cases can pass the CI, and that
129+
# running them locally do not contaminate developers' own texera_db, we use another database with a different
130+
# name for running these test cases.
131+
url-for-test-cases = "jdbc:postgresql://localhost:5432/texera_db_for_test_cases?currentSchema=texera_db,public"
132+
url-for-test-cases = ${?STORAGE_JDBC_URL_FOR_TEST_CASES}
133+
128134
username = "postgres"
129135
username = ${?STORAGE_JDBC_USERNAME}
130136

core/config/src/main/resources/user-system.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
1919
user-sys {
20-
enabled = false
20+
enabled = true
2121
enabled = ${?USER_SYS_ENABLED}
2222

2323
admin-username = "texera"

core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ object StorageConfig {
3434

3535
// JDBC specifics
3636
val jdbcUrl: String = conf.getString("storage.jdbc.url")
37+
val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases")
3738
val jdbcUsername: String = conf.getString("storage.jdbc.username")
3839
val jdbcPassword: String = conf.getString("storage.jdbc.password")
3940

core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ object SqlServer {
6262
instance.get
6363
}
6464

65+
def clearInstance(): Unit = {
66+
instance = None
67+
}
68+
6569
/**
6670
* A utility function for create a transaction block using given sql context
6771
* @param dsl the sql context

core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ trait MockTexeraDB {
6767
value.close()
6868
dbInstance = None
6969
dslContext = None
70+
SqlServer.clearInstance()
7071
case None =>
7172
// do nothing
7273
}
@@ -92,12 +93,15 @@ trait MockTexeraDB {
9293
} finally {
9394
source.close()
9495
}
95-
val parts: Array[String] = content.split("(?m)^\\\\c texera_db")
96+
val parts: Array[String] = content.split("(?m)^CREATE DATABASE :\"DB_NAME\";")
9697
def removeCCommands(sql: String): String =
9798
sql.linesIterator
9899
.filterNot(_.trim.startsWith("\\c"))
99100
.mkString("\n")
100-
executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, removeCCommands(parts(0)))
101+
val createDBStatement =
102+
"""DROP DATABASE IF EXISTS texera_db;
103+
|CREATE DATABASE texera_db;""".stripMargin
104+
executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, createDBStatement)
101105
val texeraDB = embedded.getDatabase(username, database)
102106
var tablesAndIndexCreation = removeCCommands(parts(1))
103107

core/scripts/sql/texera_ddl.sql

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,29 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- ============================================
19+
-- 0. Specify the database name
20+
-- (defaults to texera_db)
21+
-- Override the name with:
22+
-- psql -v DB_NAME=<alternative_name> ...
23+
-- ============================================
24+
\if :{?DB_NAME}
25+
\else
26+
\set DB_NAME 'texera_db'
27+
\endif
28+
1829
-- ============================================
1930
-- 1. Drop and recreate the database (psql only)
2031
-- Remove if you already created texera_db
2132
-- ============================================
2233
\c postgres
23-
DROP DATABASE IF EXISTS texera_db;
24-
CREATE DATABASE texera_db;
34+
DROP DATABASE IF EXISTS :"DB_NAME";
35+
CREATE DATABASE :"DB_NAME";
2536

2637
-- ============================================
2738
-- 2. Connect to the new database (psql only)
2839
-- ============================================
29-
\c texera_db
40+
\c :"DB_NAME"
3041

3142
CREATE SCHEMA IF NOT EXISTS texera_db;
3243
SET search_path TO texera_db, public;

0 commit comments

Comments
 (0)