Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
31fe50b
chore(config): enable user system by default
Xiao-zhen-Liu Sep 29, 2025
f596578
Merge branch 'main' into xiaozhen-default-user-sys
Xiao-zhen-Liu Sep 30, 2025
e4ea83a
wip.
Xiao-zhen-Liu Sep 30, 2025
6934701
add mockTexeraDB to tests.
Xiao-zhen-Liu Oct 1, 2025
1eaf002
wip.
Xiao-zhen-Liu Oct 1, 2025
f53ad2e
wip.
Xiao-zhen-Liu Oct 1, 2025
1addd2d
wip.
Xiao-zhen-Liu Oct 1, 2025
bd92fd8
wip.
Xiao-zhen-Liu Oct 1, 2025
c4e4501
wip.
Xiao-zhen-Liu Oct 1, 2025
0a4c262
wip.
Xiao-zhen-Liu Oct 1, 2025
cf07a32
wip.
Xiao-zhen-Liu Oct 1, 2025
2438732
wip.
Xiao-zhen-Liu Oct 1, 2025
d482a6a
wip.
Xiao-zhen-Liu Oct 1, 2025
62f36a1
wip.
Xiao-zhen-Liu Oct 1, 2025
25da914
wip.
Xiao-zhen-Liu Oct 1, 2025
d62c9c3
wip.
Xiao-zhen-Liu Oct 1, 2025
ca66a16
revert.
Xiao-zhen-Liu Oct 1, 2025
8ce30f4
add await and remove lazy val.
Xiao-zhen-Liu Oct 1, 2025
7d57518
Merge remote-tracking branch 'origin/xiaozhen-default-user-sys' into …
Xiao-zhen-Liu Oct 1, 2025
c5ea3d3
revert.
Xiao-zhen-Liu Oct 1, 2025
633afe2
add await.
Xiao-zhen-Liu Oct 1, 2025
440e00d
remove await.
Xiao-zhen-Liu Oct 2, 2025
fb1e9d8
test using external database
Xiao-zhen-Liu Oct 2, 2025
b48fd42
test using external database
Xiao-zhen-Liu Oct 2, 2025
f897e95
test using external database
Xiao-zhen-Liu Oct 2, 2025
a1544af
Merge branch 'main' into xiaozhen-default-user-sys
aglinxinyuan Oct 2, 2025
996f436
wip.
Xiao-zhen-Liu Oct 3, 2025
7187d8c
Fix MockTexeraDB.
Xiao-zhen-Liu Oct 3, 2025
eea0277
Merge branch 'main' into xiaozhen-default-user-sys
Xiao-zhen-Liu Oct 3, 2025
054c972
refactoring.
Xiao-zhen-Liu Oct 3, 2025
8b1b692
refactoring.
Xiao-zhen-Liu Oct 3, 2025
2c811d9
refactoring.
Xiao-zhen-Liu Oct 3, 2025
2145a11
Merge branch 'main' into xiaozhen-default-user-sys
Xiao-zhen-Liu Oct 3, 2025
614fe70
Merge branch 'main' into xiaozhen-default-user-sys
Xiao-zhen-Liu Oct 6, 2025
0fac3f9
Merge branch 'main' into xiaozhen-default-user-sys
aglinxinyuan Oct 7, 2025
6ed02d2
Merge branch 'main' into xiaozhen-default-user-sys
aglinxinyuan Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/github-action-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ jobs:
psql -h localhost -U postgres -f core/scripts/sql/texera_lakefs.sql
env:
PGPASSWORD: postgres
- name: Create texera_db_for_test_cases
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f core/scripts/sql/texera_ddl.sql
env:
PGPASSWORD: postgres
- name: Compile with sbt
run: cd core && sbt clean package
- name: Run backend tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@ package edu.uci.ics.amber.engine.e2e
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import ch.vorburger.mariadb4j.DB
import com.twitter.util.{Await, Duration, Promise}
import edu.uci.ics.amber.clustering.SingleNodeListener
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext}
import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller._
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
import edu.uci.ics.amber.engine.e2e.TestUtils.{
buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
setUpWorkflowExecutionData
}
import edu.uci.ics.amber.operator.TestOperators
import edu.uci.ics.amber.operator.aggregate.AggregationFunction
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import edu.uci.ics.texera.workflow.LogicalLink
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
Expand All @@ -54,14 +58,22 @@ class DataProcessingSpec

implicit val timeout: Timeout = Timeout(5.seconds)

var inMemoryMySQLInstance: Option[DB] = None
val workflowContext: WorkflowContext = new WorkflowContext()

override protected def beforeEach(): Unit = {
setUpWorkflowExecutionData()
}

override protected def afterEach(): Unit = {
cleanupWorkflowExecutionData()
}

override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
initiateTexeraDBForTestCases()
}

override def afterAll(): Unit = {
Expand All @@ -88,30 +100,21 @@ class DataProcessingSpec
if (evt.state == COMPLETED) {
results = workflow.logicalPlan.getTerminalOperatorIds
.filter(terminalOpId => {
val uri = VFSURIFactory.createResultURI(
workflowContext.workflowId,
val uri = getResultUriByLogicalPortId(
workflowContext.executionId,
GlobalPortIdentity(
PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
PortIdentity()
)
terminalOpId,
PortIdentity()
)
// expecting the first output port only.
ExecutionResourcesMapping
.getResourceURIs(workflowContext.executionId)
.contains(uri)
uri.nonEmpty
})
.map(terminalOpId => {
//TODO: remove the delay after fixing the issue of reporting "completed" status too early.
Thread.sleep(1000)
val uri = VFSURIFactory.createResultURI(
workflowContext.workflowId,
val uri = getResultUriByLogicalPortId(
workflowContext.executionId,
GlobalPortIdentity(
PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
PortIdentity()
)
)
terminalOpId,
PortIdentity()
).get
terminalOpId -> DocumentFactory
.openDocument(uri)
._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,43 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.e2e.TestUtils.{
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
setUpWorkflowExecutionData
}
import edu.uci.ics.amber.operator.{LogicalOp, TestOperators}
import edu.uci.ics.texera.workflow.LogicalLink
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import scala.concurrent.duration._

class PauseSpec
extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll {
with BeforeAndAfterAll
with BeforeAndAfterEach {

implicit val timeout: Timeout = Timeout(5.seconds)

val logger = Logger("PauseSpecLogger")

override protected def beforeEach(): Unit = {
setUpWorkflowExecutionData()
}

override protected def afterEach(): Unit = {
cleanupWorkflowExecutionData()
}

override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
initiateTexeraDBForTestCases()
}

override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,24 @@

package edu.uci.ics.amber.engine.e2e

import edu.uci.ics.amber.config.StorageConfig
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller.Workflow
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
UserDao,
WorkflowDao,
WorkflowExecutionsDao,
WorkflowVersionDao
}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
User,
WorkflowExecutions,
WorkflowVersion,
Workflow => WorkflowPojo
}
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.workflow.{LogicalLink, WorkflowCompiler}

Expand All @@ -40,4 +55,78 @@ object TestUtils {
)
}

/**
* If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a
* workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases.
* Note such test cases need to clean up the database at the end of running each test case.
*/
def initiateTexeraDBForTestCases(): Unit = {
SqlServer.initConnection(
StorageConfig.jdbcUrlForTestCases,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
}

val testUser: User = {
val user = new User
user.setUid(Integer.valueOf(1))
user.setName("test_user")
user.setRole(UserRoleEnum.ADMIN)
user.setPassword("123")
user.setEmail("test_user@test.com")
user
}

val testWorkflowEntry: WorkflowPojo = {
val workflow = new WorkflowPojo
workflow.setName("test workflow")
workflow.setWid(Integer.valueOf(1))
workflow.setContent("test workflow content")
workflow.setDescription("test description")
workflow
}

val testWorkflowVersionEntry: WorkflowVersion = {
val workflowVersion = new WorkflowVersion
workflowVersion.setWid(Integer.valueOf(1))
workflowVersion.setVid(Integer.valueOf(1))
workflowVersion.setContent("test version content")
workflowVersion
}

val testWorkflowExecutionEntry: WorkflowExecutions = {
val workflowExecution = new WorkflowExecutions
workflowExecution.setEid(Integer.valueOf(1))
workflowExecution.setVid(Integer.valueOf(1))
workflowExecution.setUid(Integer.valueOf(1))
workflowExecution.setStatus(3.toByte)
workflowExecution.setEnvironmentVersion("test engine")
workflowExecution
}

def setUpWorkflowExecutionData(): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
userDao.insert(testUser)
workflowDao.insert(testWorkflowEntry)
workflowVersionDao.insert(testWorkflowVersionEntry)
workflowExecutionsDao.insert(testWorkflowExecutionEntry)
}

def cleanupWorkflowExecutionData(): Unit = {
val dslConfig = SqlServer.getInstance().context.configuration()
val userDao = new UserDao(dslConfig)
val workflowDao = new WorkflowDao(dslConfig)
val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
val workflowVersionDao = new WorkflowVersionDao(dslConfig)
workflowExecutionsDao.deleteById(1)
workflowVersionDao.deleteById(1)
workflowDao.deleteById(1)
userDao.deleteById(1)
}

}
6 changes: 6 additions & 0 deletions core/config/src/main/resources/storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ storage {
url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public"
url = ${?STORAGE_JDBC_URL}

# Some e2e test cases require the user system. To make sure running those test cases can pass the CI, and that
# running them locally do not contaminate developers' own texera_db, we use another database with a different
# name for running these test cases.
url-for-test-cases = "jdbc:postgresql://localhost:5432/texera_db_for_test_cases?currentSchema=texera_db,public"
url-for-test-cases = ${?STORAGE_JDBC_URL_FOR_TEST_CASES}

username = "postgres"
username = ${?STORAGE_JDBC_USERNAME}

Expand Down
2 changes: 1 addition & 1 deletion core/config/src/main/resources/user-system.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
user-sys {
enabled = false
enabled = true
enabled = ${?USER_SYS_ENABLED}

admin-username = "texera"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ object StorageConfig {

// JDBC specifics
val jdbcUrl: String = conf.getString("storage.jdbc.url")
val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases")
val jdbcUsername: String = conf.getString("storage.jdbc.username")
val jdbcPassword: String = conf.getString("storage.jdbc.password")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ object SqlServer {
instance.get
}

def clearInstance(): Unit = {
instance = None
}

/**
* A utility function for create a transaction block using given sql context
* @param dsl the sql context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ trait MockTexeraDB {
value.close()
dbInstance = None
dslContext = None
SqlServer.clearInstance()
case None =>
// do nothing
}
Expand All @@ -92,12 +93,15 @@ trait MockTexeraDB {
} finally {
source.close()
}
val parts: Array[String] = content.split("(?m)^\\\\c texera_db")
val parts: Array[String] = content.split("(?m)^CREATE DATABASE :\"DB_NAME\";")
def removeCCommands(sql: String): String =
sql.linesIterator
.filterNot(_.trim.startsWith("\\c"))
.mkString("\n")
executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, removeCCommands(parts(0)))
val createDBStatement =
"""DROP DATABASE IF EXISTS texera_db;
|CREATE DATABASE texera_db;""".stripMargin
executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, createDBStatement)
val texeraDB = embedded.getDatabase(username, database)
var tablesAndIndexCreation = removeCCommands(parts(1))

Expand Down
17 changes: 14 additions & 3 deletions core/scripts/sql/texera_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,29 @@
-- specific language governing permissions and limitations
-- under the License.

-- ============================================
-- 0. Specify the database name
-- (defaults to texera_db)
-- Override the name with:
-- psql -v DB_NAME=<alternative_name> ...
-- ============================================
\if :{?DB_NAME}
\else
\set DB_NAME 'texera_db'
\endif

-- ============================================
-- 1. Drop and recreate the database (psql only)
-- Remove if you already created texera_db
-- ============================================
\c postgres
DROP DATABASE IF EXISTS texera_db;
CREATE DATABASE texera_db;
DROP DATABASE IF EXISTS :"DB_NAME";
CREATE DATABASE :"DB_NAME";

-- ============================================
-- 2. Connect to the new database (psql only)
-- ============================================
\c texera_db
\c :"DB_NAME"

CREATE SCHEMA IF NOT EXISTS texera_db;
SET search_path TO texera_db, public;
Expand Down
Loading