Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement/2024 05 add large update #219

Merged
merged 8 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,6 @@ import ldbc.sql.{ PreparedStatement, ResultSet }
private[jdbc] open class PreparedStatementImpl[F[_]: Sync](statement: java.sql.PreparedStatement)
extends PreparedStatement[F]:

@deprecated("This method cannot be called on a PreparedStatement.", "0.3.0")
override def executeQuery(sql: String): F[ResultSet[F]] = throw new UnsupportedOperationException(
"This method cannot be called on a PreparedStatement."
)

@deprecated("This method cannot be called on a PreparedStatement.", "0.3.0")
override def executeUpdate(sql: String): F[Int] = throw new UnsupportedOperationException(
"This method cannot be called on a PreparedStatement."
)

@deprecated("This method cannot be called on a PreparedStatement.", "0.3.0")
override def execute(sql: String): F[Boolean] = throw new UnsupportedOperationException(
"This method cannot be called on a PreparedStatement."
)

override def addBatch(sql: String): F[Unit] = throw new UnsupportedOperationException(
"This method cannot be called on a PreparedStatement."
)

override def executeQuery(): F[ResultSet[F]] = Sync[F].blocking(statement.executeQuery()).map(ResultSetImpl.apply)

override def executeUpdate(): F[Int] = Sync[F].blocking(statement.executeUpdate())
Expand Down Expand Up @@ -115,3 +96,11 @@ private[jdbc] open class PreparedStatementImpl[F[_]: Sync](statement: java.sql.P
Sync[F].blocking(statement.execute(sql, autoGeneratedKeys))

override def isClosed(): F[Boolean] = Sync[F].blocking(statement.isClosed)

override def getLargeUpdateCount(): F[Long] = Sync[F].blocking(statement.getLargeUpdateCount)

override def executeLargeUpdate(): F[Long] =
Sync[F].blocking(statement.executeLargeUpdate())

override def executeLargeBatch(): F[Array[Long]] =
Sync[F].blocking(statement.executeLargeBatch())
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,14 @@ private[jdbc] case class StatementImpl[F[_]: Sync](statement: java.sql.Statement
Sync[F].blocking(statement.execute(sql, autoGeneratedKeys))

override def isClosed(): F[Boolean] = Sync[F].blocking(statement.isClosed)

override def getLargeUpdateCount(): F[Long] = Sync[F].blocking(statement.getLargeUpdateCount)

override def executeLargeUpdate(sql: String): F[Long] =
Sync[F].blocking(statement.executeLargeUpdate(sql))

override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] =
Sync[F].blocking(statement.executeLargeUpdate(sql, autoGeneratedKeys))

override def executeLargeBatch(): F[Array[Long]] =
Sync[F].blocking(statement.executeLargeBatch())
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.typelevel.otel4s.Attribute
*/
class BatchUpdateException(
message: String,
updateCounts: List[Int],
updateCounts: List[Long],
sqlState: Option[String] = None,
vendorCode: Option[Int] = None,
sql: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ case class ERRPacket(

def toException(message: String, sql: String): SQLException = toException(message, Some(sql))

def toException(message: String, updateCounts: Vector[Int]): SQLException = BatchUpdateException(
def toException(message: String, updateCounts: Vector[Long]): SQLException = BatchUpdateException(
message = message,
updateCounts = updateCounts.toList,
sqlState = sqlState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer](
}
} <* params.set(ListMap.empty)

override def executeUpdate(): F[Int] =
override def executeLargeUpdate(): F[Long] =
checkClosed() *>
checkNullOrEmptyQuery(sql) *>
exchange[F, Int]("statement") { (span: Span[F]) =>
exchange[F, Long]("statement") { (span: Span[F]) =>
if sql.toUpperCase.startsWith("CALL") then
executeCallStatement(span).flatMap { resultSets =>
resultSets.headOption match
Expand Down Expand Up @@ -131,7 +131,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer](
))*
) *>
sendQuery(buildQuery(sql, params)).flatMap {
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt)
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows)
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand Down Expand Up @@ -193,10 +193,10 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer](

override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty)

override def executeBatch(): F[Array[Int]] =
override def executeLargeBatch(): F[Array[Long]] =
checkClosed() *>
checkNullOrEmptyQuery(sql) *>
exchange[F, Array[Int]]("statement") { (span: Span[F]) =>
exchange[F, Array[Long]]("statement") { (span: Span[F]) =>
batchedArgs.get.flatMap { args =>
span.addAttributes(
(attributes ++ List(
Expand All @@ -211,7 +211,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer](
case q if q.startsWith("INSERT") =>
sendQuery(sql.split("VALUES").head + " VALUES" + args.mkString(","))
.flatMap {
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO))
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong))
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand All @@ -227,15 +227,15 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer](
)
) *>
args
.foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) =>
.foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) =>
for
acc <- $acc
result <-
protocol
.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags))
.flatMap {
case result: OKPacket =>
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt)
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows)
case error: ERRPacket =>
ev.raiseError(error.toException("Failed to execute batch", acc))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](
} <* params.set(ListMap.empty)
}

override def executeUpdate(): F[Int] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Int]("statement") { (span: Span[F]) =>
override def executeLargeUpdate(): F[Long] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Long]("statement") { (span: Span[F]) =>
params.get.flatMap { params =>
span.addAttributes(
(attributes ++ List(
Expand All @@ -139,7 +139,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](
ComQueryPacket(buildQuery(sql, params), protocol.initialPacket.capabilityFlags, ListMap.empty)
) *>
protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap {
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt)
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows)
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand All @@ -161,11 +161,11 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](

override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty)

override def executeBatch(): F[Array[Int]] =
override def executeLargeBatch(): F[Array[Long]] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> (
sql.trim.toLowerCase match
case q if q.startsWith("insert") =>
exchange[F, Array[Int]]("statement") { (span: Span[F]) =>
exchange[F, Array[Long]]("statement") { (span: Span[F]) =>
protocol.resetSequenceId *>
batchedArgs.get.flatMap { args =>
span.addAttributes(
Expand All @@ -188,7 +188,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](
protocol
.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags))
.flatMap {
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO))
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong))
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand All @@ -198,7 +198,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](
case q if q.startsWith("update") || q.startsWith("delete") =>
protocol.resetSequenceId *>
protocol.comSetOption(EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_ON) *>
exchange[F, Array[Int]]("statement") { (span: Span[F]) =>
exchange[F, Array[Long]]("statement") { (span: Span[F]) =>
protocol.resetSequenceId *>
batchedArgs.get.flatMap { args =>
span.addAttributes(
Expand All @@ -219,15 +219,15 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer](
)
) *>
args
.foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) =>
.foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) =>
for
acc <- $acc
result <-
protocol
.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags))
.flatMap {
case result: OKPacket =>
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt)
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows)
case error: ERRPacket =>
ev.raiseError(error.toException("Failed to execute batch", acc))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](
yield resultSet
}

override def executeUpdate(): F[Int] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Int]("statement") { (span: Span[F]) =>
override def executeLargeUpdate(): F[Long] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Long]("statement") { (span: Span[F]) =>
params.get.flatMap { params =>
span.addAttributes(
(attributes ++ List(
Expand All @@ -124,7 +124,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](
protocol.resetSequenceId *>
protocol.send(ComStmtExecutePacket(statementId, params)) *>
protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap {
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt)
case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows)
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand All @@ -146,11 +146,11 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](

override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty)

override def executeBatch(): F[Array[Int]] =
override def executeLargeBatch(): F[Array[Long]] =
checkClosed() *> checkNullOrEmptyQuery(sql) *> (
sql.trim.toLowerCase match
case q if q.startsWith("insert") =>
exchange[F, Array[Int]]("statement") { (span: Span[F]) =>
exchange[F, Array[Long]]("statement") { (span: Span[F]) =>
protocol.resetSequenceId *>
batchedArgs.get.flatMap { args =>
span.addAttributes(
Expand All @@ -173,7 +173,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](
protocol
.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags))
.flatMap {
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO))
case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong))
case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
}
Expand All @@ -183,7 +183,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](
case q if q.startsWith("update") || q.startsWith("delete") =>
protocol.resetSequenceId *>
protocol.comSetOption(EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_ON) *>
exchange[F, Array[Int]]("statement") { (span: Span[F]) =>
exchange[F, Array[Long]]("statement") { (span: Span[F]) =>
protocol.resetSequenceId *>
batchedArgs.get.flatMap { args =>
span.addAttributes(
Expand All @@ -204,15 +204,15 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer](
)
) *>
args
.foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) =>
.foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) =>
for
acc <- $acc
result <-
protocol
.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags))
.flatMap {
case result: OKPacket =>
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt)
lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows)
case error: ERRPacket =>
ev.raiseError(error.toException("Failed to execute batch", acc))
case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ private[ldbc] trait SharedPreparedStatement[F[_]: Temporal]
setTimestamp(parameterIndex, value.asInstanceOf[LocalDateTime])
case unknown => throw new SQLException(s"Unsupported object type ${ unknown.getClass.getName } for setObject")

override def executeUpdate(): F[Int] = executeLargeUpdate().map(_.toInt)

override def executeBatch(): F[Array[Int]] =
executeLargeBatch().map(_.map(_.toInt))

protected def buildQuery(original: String, params: ListMap[Int, Parameter]): String =
val query = original.toCharArray
params
Expand Down
Loading