Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: rename query as queryAsFlow to avoid confusion when using the API. #89

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ class DatasourceWithConnectionDbTest {
@Test
fun `slow query on IO dispatcher`() {
// this is a bit of a naughty test as it includes a sleep,
// but wanted to validate this against a real JDBC driver and a real slow query.
// but wanted to validate this against a real JDBC driver and a real slow queryAsFlow.
runBlocking {
val dataSource = createDataSource(postgresql)
var slept = false
val queryJob =
launch {
val int =
dataSource.withConnection { connection ->
println("Executing slow query - $connection")
println("Executing slow queryAsFlow - $connection")
connection.querySingle<Int>(
"SELECT 1, pg_sleep(1)",
{ rs, _ -> rs.getInt(1) },
Expand All @@ -51,7 +51,7 @@ class DatasourceWithConnectionDbTest {
@Test
fun `execute tx on IO dispatcher`() {
// this is a bit of a naughty test as it includes a sleep,
// but wanted to validate this against a real JDBC driver and a real slow query.
// but wanted to validate this against a real JDBC driver and a real slow queryAsFlow.
runBlocking {
val dataSource = createDataSource(postgresql)
val insertJob =
Expand Down Expand Up @@ -82,7 +82,7 @@ class DatasourceWithConnectionDbTest {
@Test
fun `execute failed tx on IO dispatcher`() {
// this is a bit of a naughty test as it includes a sleep,
// but wanted to validate this against a real JDBC driver and a real slow query.
// but wanted to validate this against a real JDBC driver and a real slow queryAsFlow.
runBlocking {
val dataSource = createDataSource(postgresql)
val queryJob =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class FlowTest {
createDataSource(postgresql).withConnection { connection ->
val one =
async {
printGreen("Executing query - $connection, filtering even ages")
printGreen("Executing queryAsFlow - $connection, filtering even ages")
connection
.query<SuperHero>(
.queryAsFlow<SuperHero>(
"SELECT * FROM super_heroes",
)
.filter {
Expand All @@ -86,9 +86,9 @@ class FlowTest {
}
val two =
async {
printRed("Executing query - $connection, filtering odd ages")
printRed("Executing queryAsFlow - $connection, filtering odd ages")
connection
.query<SuperHero>(
.queryAsFlow<SuperHero>(
"SELECT * FROM super_heroes",
)
.filter {
Expand Down Expand Up @@ -131,9 +131,9 @@ class FlowTest {
createDataSource(postgresql).withConnection { connection ->
val one =
async {
printGreen("Executing query - $connection, filtering even ages")
printGreen("Executing queryAsFlow - $connection, filtering even ages")
connection
.query<SuperHero>(
.queryAsFlow<SuperHero>(
"SELECT * FROM super_heroes",
)
.filter {
Expand All @@ -153,9 +153,9 @@ class FlowTest {
}
val two =
async {
printRed("Executing query - $connection, filtering odd ages")
printRed("Executing queryAsFlow - $connection, filtering odd ages")
connection
.query<SuperHero>(
.queryAsFlow<SuperHero>(
"SELECT * FROM super_heroes",
)
.filter {
Expand Down Expand Up @@ -196,9 +196,9 @@ class FlowTest {
createDataSource(postgresql).withConnection { connection ->
val job =
async {
println("Executing query - $connection, exit when age 20 is exceeded")
println("Executing queryAsFlow - $connection, exit when age 20 is exceeded")
connection
.query<SuperHero>(
.queryAsFlow<SuperHero>(
"SELECT * FROM super_heroes",
)
.map {
Expand All @@ -209,7 +209,7 @@ class FlowTest {
printGreen("[${Thread.currentThread().name}] acc=$a")
a.also {
if (a > 20) {
println("Cancelling query as cumulative age is $a")
println("Cancelling queryAsFlow as cumulative age is $a")
cancel("Cumulative age exceeded 20")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import javax.sql.DataSource
import kotlin.coroutines.CoroutineContext

/**
* create a DB connection and execute query/aueries using the `Dispatchers.IO` CoroutineScope.
* create a DB connection and execute queryAsFlow/aueries using the `Dispatchers.IO` CoroutineScope.
* The connection is closed after the block is executed.
* @param block The block of code to execute.
*/
suspend inline fun <T> DataSource.withConnection(crossinline block: suspend (Connection) -> T): T = withConnection(Dispatchers.IO, block)

/**
* create a DB connection and execute query/aueries using the given CoroutineScope.
* create a DB connection and execute queryAsFlow/aueries using the given CoroutineScope.
* The connection is closed after the block is executed.
* @param context The CoroutineContext to use.
* @param block The block of code to execute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.sql.ResultSet
import java.sql.SQLException

/**
* Execute a SQL query and map the results to a Flow of instances of the specified class.
* Execute a SQL queryAsFlow and map the results to a Flow of instances of the specified class.
*
* This function uses reflection to automatically map the result set columns to the properties of the specified class.
* For advanced mappings, use the overloaded version with a custom `mapper` function.
Expand All @@ -26,7 +26,7 @@ import java.sql.SQLException
* data class User(val id: Int, val name: String)
*
* // Fetch all users where "active" is true
* val users: Flow<User> = connection.query(
* val users: Flow<User> = connection.queryAsFlow(
* sql = "SELECT id, name FROM users WHERE active = :active",
* "active" to true
* )
Expand All @@ -35,27 +35,27 @@ import java.sql.SQLException
* users.collect { println(it) }
* ```
*
* @param sql The SQL query to execute.
* @param args Optional key-value pairs representing named parameters to substitute into the query.
* @return The query result as a [Flow] of [T] instances.
* @param sql The SQL queryAsFlow to execute.
* @param args Optional key-value pairs representing named parameters to substitute into the queryAsFlow.
* @return The queryAsFlow result as a [Flow] of [T] instances.
* @throws java.sql.SQLException If there's a database error.
*/
inline fun <reified T : Any> Connection.query(
inline fun <reified T : Any> Connection.queryAsFlow(
sql: String,
vararg args: Pair<String, Any?>,
): Flow<T> =
query(
queryAsFlow(
sql,
createMapper(T::class.java)::createInstance,
*args,
)

/**
* Execute a SQL query and map the results to a Flow of instances of the specified class with a custom mapper.
* Execute a SQL queryAsFlow and map the results to a Flow of instances of the specified class with a custom mapper.
*
* **Example**:
* ```kotlin
* val users: Flow<User> = connection.query(
* val users: Flow<User> = connection.queryAsFlow(
* sql = "SELECT id, name FROM users",
* mapper = { resultSet, _ ->
* User(
Expand All @@ -67,18 +67,18 @@ inline fun <reified T : Any> Connection.query(
* users.collect { println(it) }
* ```
*
* @param sql The SQL query to execute.
* @param sql The SQL queryAsFlow to execute.
* @param mapper Custom mapping function to transform the [ResultSet] into the target class.
* @param args Optional parameters to be substituted in the SQL query during execution.
* @return The query result as a [Flow] of [T] instances.
* @param args Optional parameters to be substituted in the SQL queryAsFlow during execution.
* @return The queryAsFlow result as a [Flow] of [T] instances.
* @throws KapperQueryException If there's a database error.
*/
inline fun <reified T : Any> Connection.query(
inline fun <reified T : Any> Connection.queryAsFlow(
sql: String,
noinline mapper: (ResultSet, Map<String, Field>) -> T,
vararg args: Pair<String, Any?>,
): Flow<T> {
require(sql.isNotBlank()) { "SQL query cannot be empty or blank" }
require(sql.isNotBlank()) { "SQL queryAsFlow cannot be empty or blank" }
this.executeQuery(Query(sql), args.toMap()).let { rs ->
return queryFlow(rs, mapper, sql)
}
Expand All @@ -102,7 +102,7 @@ fun <T : Any> queryFlow(
logger.info("Query results processing cancelled: ${e.message}")
throw e
} catch (e: SQLException) {
"Error executing query: $sql".also {
"Error executing queryAsFlow: $sql".also {
logger.warn(it, e)
throw KapperQueryException(it, e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class FlowQueryTest {
@Test
fun `when query emit each row as a flow item`() {
runBlocking {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
mapper,
"id" to 1,
Expand All @@ -64,7 +64,7 @@ class FlowQueryTest {
runBlocking {
mockkStatic("net.samyn.kapper.MapperFactoryKt") {
every { createMapper(Hero::class.java).createInstance(any(), any()) } returns result
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
"id" to 1,
).toList() shouldBe listOf(result)
Expand All @@ -75,7 +75,7 @@ class FlowQueryTest {
@Test
fun `when query emit close after collection`() {
runBlocking {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
mapper,
"id" to 1,
Expand All @@ -89,7 +89,7 @@ class FlowQueryTest {
val ex = Exception("test")
runBlocking {
try {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
mapper,
"id" to 1,
Expand All @@ -105,13 +105,13 @@ class FlowQueryTest {

@Test
fun `when cancel close`() {
// never finish the query
// never finish the queryAsFlow
every { resultSet.next() } returns true
runBlocking {
var count = 0
val job =
async {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
mapper,
"id" to 1,
Expand All @@ -134,7 +134,7 @@ class FlowQueryTest {
every { resultSet.next() } throws ex
runBlocking {
shouldThrow<KapperQueryException> {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
queryTemplate,
mapper,
"id" to 1,
Expand All @@ -147,7 +147,7 @@ class FlowQueryTest {
fun `when sql blank throw`() {
runBlocking {
shouldThrow<IllegalArgumentException> {
connection.query<Hero>(
connection.queryAsFlow<Hero>(
"",
mapper,
"id" to 1,
Expand Down