Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate texera database from mysql to postgres #3254

Merged
merged 25 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig}
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION}
import org.jooq.types.UInteger

import scala.util.{Failure, Success, Try}
import java.net.URI
import scala.util.{Failure, Success, Try}

/**
* A cost estimator should estimate a cost of running a region under the given resource constraints as units.
Expand Down Expand Up @@ -85,11 +84,7 @@ class DefaultCostEstimator(

val uriString: String = withTransaction(
SqlServer
.getInstance(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
.getInstance()
.createDSLContext()
) { context =>
context
Expand All @@ -99,7 +94,7 @@ class DefaultCostEstimator(
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.where(
WORKFLOW_VERSION.WID
.eq(UInteger.valueOf(wid))
.eq(wid.toInt)
.and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
)
.orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package edu.uci.ics.texera.web

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig}
import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
Expand All @@ -16,6 +16,7 @@ import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime, Utils}
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import edu.uci.ics.texera.web.resource.WorkflowWebsocketResource
Expand Down Expand Up @@ -98,6 +99,12 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with

override def run(configuration: Configuration, environment: Environment): Unit = {

SqlServer.initConnection(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)

val webSocketUpgradeFilter =
WebSocketUpgradeFilter.configureContext(environment.getApplicationContext)
webSocketUpgradeFilter.getFactory.getPolicy.setIdleTimeout(Duration.ofHours(1).toMillis)
Expand Down Expand Up @@ -149,7 +156,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
*/
private def cleanExecutions(
executions: List[WorkflowExecutions],
statusChangeFunc: Byte => Byte
statusChangeFunc: Short => Short
): Unit = {
// drop the collection and update the status to ABORTED
executions.foreach(execEntry => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object ComputingUnitWorker {

def main(args: Array[String]): Unit = {
val argMap = parseArgs(args)

// start actor system worker node
AmberRuntime.startActorWorker(argMap.get(Symbol("serverAddr")).asInstanceOf[Option[String]])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.texera.web.auth.JwtAuth.jwtConsumer
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import org.apache.http.client.utils.URLEncodedUtils
import org.jooq.types.UInteger

import java.net.URI
import java.nio.charset.Charset
Expand Down Expand Up @@ -39,7 +38,7 @@ class ServletAwareConfigurator extends ServerEndpointConfig.Configurator with La
config.getUserProperties.put(
classOf[User].getName,
new User(
UInteger.valueOf(claims.getClaimValue("userId").asInstanceOf[Long]),
claims.getClaimValue("userId").asInstanceOf[Long].toInt,
claims.getSubject,
String.valueOf(claims.getClaimValue("email").asInstanceOf[String]),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.dirkraft.dropwizard.fileassets.FileAssetsBundle
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.util.dataset.GitVersionControlLocalFileStorage
import edu.uci.ics.amber.engine.common.Utils
import edu.uci.ics.amber.util.PathUtils
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.web.resource._
Expand Down Expand Up @@ -98,6 +100,12 @@ class TexeraWebApplication
// serve backend at /api
environment.jersey.setUrlPattern("/api/*")

SqlServer.initConnection(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)

// redirect all 404 to index page, according to Angular routing requirements
val eph = new ErrorPageErrorHandler
eph.addErrorPage(404, "/")
Expand Down Expand Up @@ -148,5 +156,6 @@ class TexeraWebApplication
environment.jersey.register(classOf[AdminExecutionResource])
environment.jersey.register(classOf[UserQuotaResource])
environment.jersey.register(classOf[AIAssistantResource])

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package edu.uci.ics.texera.web
import edu.uci.ics.texera.web.model.websocket.request.TexeraWebSocketRequest
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.subjects.PublishSubject
import org.jooq.types.UInteger

import scala.reflect.{ClassTag, classTag}

class WebsocketInput(errorHandler: Throwable => Unit) {
private val wsInput = PublishSubject.create[(TexeraWebSocketRequest, Option[UInteger])]()
private val wsInput = PublishSubject.create[(TexeraWebSocketRequest, Option[Integer])]()

def subscribe[T <: TexeraWebSocketRequest: ClassTag](
callback: (T, Option[UInteger]) => Unit
callback: (T, Option[Integer]) => Unit
): Disposable = {
wsInput.subscribe((evt: (TexeraWebSocketRequest, Option[UInteger])) => {
wsInput.subscribe((evt: (TexeraWebSocketRequest, Option[Integer])) => {
evt._1 match {
case req: T if classTag[T].runtimeClass.isInstance(req) =>
try {
Expand All @@ -28,7 +27,7 @@ class WebsocketInput(errorHandler: Throwable => Unit) {
})
}

def onNext(req: TexeraWebSocketRequest, uidOpt: Option[UInteger]): Unit = {
def onNext(req: TexeraWebSocketRequest, uidOpt: Option[Integer]): Unit = {
wsInput.onNext((req, uidOpt))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package edu.uci.ics.texera.web.auth

import edu.uci.ics.texera.web.auth.GuestAuthFilter.GUEST
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import io.dropwizard.auth.AuthFilter

Expand All @@ -18,7 +18,7 @@ import javax.ws.rs.core.SecurityContext
override protected def newInstance = new GuestAuthFilter
}

val GUEST: User = new User(null, "guest", null, null, null, UserRole.REGULAR, null)
val GUEST: User = new User(null, "guest", null, null, null, UserRoleEnum.REGULAR, null)
}

@PreMatching
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package edu.uci.ics.texera.web.auth

import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import org.jooq.types.UInteger

import java.security.Principal

Expand All @@ -11,11 +10,11 @@ class SessionUser(val user: User) extends Principal {

override def getName: String = user.getName

def getUid: UInteger = user.getUid
def getUid: Integer = user.getUid

def getEmail: String = user.getEmail

def getGoogleId: String = user.getGoogleId

def isRoleOf(role: UserRole): Boolean = user.getRole == role
def isRoleOf(role: UserRoleEnum): Boolean = user.getRole == role
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package edu.uci.ics.texera.web.auth

import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import io.dropwizard.auth.Authenticator
import org.jooq.types.UInteger

import org.jose4j.jwt.consumer.JwtContext

import java.util.Optional
Expand All @@ -16,8 +16,9 @@ object UserAuthenticator extends Authenticator[JwtContext, SessionUser] with Laz
try {
val userName = context.getJwtClaims.getSubject
val email = context.getJwtClaims.getClaimValue("email").asInstanceOf[String]
val userId = UInteger.valueOf(context.getJwtClaims.getClaimValue("userId").asInstanceOf[Long])
val role = UserRole.valueOf(context.getJwtClaims.getClaimValue("role").asInstanceOf[String])
val userId = context.getJwtClaims.getClaimValue("userId").asInstanceOf[Long].toInt
val role =
UserRoleEnum.valueOf(context.getJwtClaims.getClaimValue("role").asInstanceOf[String])
val googleId = context.getJwtClaims.getClaimValue("googleId").asInstanceOf[String]
val user = new User(userId, userName, email, null, googleId, role, null)
Optional.of(new SessionUser(user))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package edu.uci.ics.texera.web.auth

import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import io.dropwizard.auth.Authorizer

object UserRoleAuthorizer extends Authorizer[SessionUser] {
override def authorize(user: SessionUser, role: String): Boolean = {
user.isRoleOf(UserRole.valueOf(role))
user.isRoleOf(UserRoleEnum.valueOf(role))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import edu.uci.ics.texera.web.model.collab.response.HeartBeatResponse
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.resource.CollaborationResource._
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource
import org.jooq.types.UInteger

import javax.websocket.server.ServerEndpoint
import javax.websocket.{OnClose, OnMessage, OnOpen, Session}
Expand All @@ -25,7 +24,7 @@ object CollaborationResource {
final val DUMMY_WID = -1

private def checkIsReadOnly(wId: Int, uId: Int): Boolean = {
!WorkflowAccessResource.hasWriteAccess(UInteger.valueOf(wId), UInteger.valueOf(uId))
!WorkflowAccessResource.hasWriteAccess(Integer.valueOf(wId), Integer.valueOf(uId))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package edu.uci.ics.texera.web.resource

import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.dao.jooq.generated.Tables.USER_CONFIG
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.UserConfigDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{User, UserConfig}
import edu.uci.ics.texera.web.auth.SessionUser
import io.dropwizard.auth.Auth

import javax.annotation.security.RolesAllowed
Expand All @@ -25,7 +24,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
class UserConfigResource {
final private lazy val userDictionaryDao = new UserConfigDao(
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.configuration
)
Expand All @@ -43,7 +42,7 @@ class UserConfigResource {
*/
private def getDict(user: User): Map[String, String] = {
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.select()
.from(USER_CONFIG)
Expand Down Expand Up @@ -79,7 +78,7 @@ class UserConfigResource {
*/
private def getValueByKey(user: User, key: String): String = {
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.fetchOne(
USER_CONFIG,
Expand Down Expand Up @@ -118,7 +117,7 @@ class UserConfigResource {
private def dictEntryExists(user: User, key: String): Boolean = {
userDictionaryDao.existsById(
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.newRecord(USER_CONFIG.UID, USER_CONFIG.KEY)
.values(user.getUid, key)
Expand Down Expand Up @@ -154,7 +153,7 @@ class UserConfigResource {
private def deleteDictEntry(user: User, key: String): Unit = {
userDictionaryDao.deleteById(
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.newRecord(USER_CONFIG.UID, USER_CONFIG.KEY)
.values(user.getUid, key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package edu.uci.ics.texera.web.resource.auth

import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.JwtAuth._
Expand All @@ -11,7 +10,7 @@ import edu.uci.ics.texera.web.model.http.request.auth.{
}
import edu.uci.ics.texera.web.model.http.response.TokenIssueResponse
import edu.uci.ics.texera.dao.jooq.generated.Tables.USER
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.UserDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.resource.auth.AuthResource._
Expand All @@ -24,7 +23,7 @@ object AuthResource {

final private lazy val userDao = new UserDao(
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.configuration
)
Expand All @@ -40,7 +39,7 @@ object AuthResource {
def retrieveUserByUsernameAndPassword(name: String, password: String): Option[User] = {
Option(
SqlServer
.getInstance(StorageConfig.jdbcUrl, StorageConfig.jdbcUsername, StorageConfig.jdbcPassword)
.getInstance()
.createDSLContext()
.select()
.from(USER)
Expand Down Expand Up @@ -88,7 +87,7 @@ class AuthResource {
val user = new User
user.setName(username)
user.setEmail(username)
user.setRole(UserRole.ADMIN)
user.setRole(UserRoleEnum.ADMIN)
// hash the plain text password
user.setPassword(new StrongPasswordEncryptor().encryptPassword(request.password))
userDao.insert(user)
Expand Down
Loading
Loading