-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathLocalDynamoDbSpec.scala
108 lines (92 loc) · 3.72 KB
/
LocalDynamoDbSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package dynamodb
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer
import dynamodb.DbHelper.Row
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{PutItemRequest, QueryResponse}
import zio._
import zio.console.Console
import zio.interop.javaz._
import zio.stream._
import scala.collection.JavaConverters._
class LocalDynamoDbSpec extends WordSpec with Matchers with BeforeAndAfterAll with DefaultRuntime {
private lazy val server: DynamoDBProxyServer = DbHelper.createServer
override def beforeAll(): Unit =
server.start()
override def afterAll(): Unit = server.stop()
"zio" should {
"stream dynamoDb table" in {
val dynamodb: DynamoDbAsyncClient = DbHelper.createClient
val completableFuture = dynamodb.createTable(DbHelper.createTableRequest)
val zioCreateTable = ZIO.fromCompletionStage(UIO.effectTotal(completableFuture))
val zioPutItems = ZIO.foreach(1 to 5) { i =>
val item: PutItemRequest = ScalaPutItemRequest(tableName = "Entitlement")
.cols(
"id" -> "id",
"entitlement" -> s"entitlement$i",
"orderDate" -> s"orderDate$i",
"accountId" -> s"1234abcd"
)
.item
ZIO.fromCompletionStage(UIO.effectTotal(dynamodb.putItem(item)))
}
val program: ZIO[Console, Throwable, Int] = for {
_ <- zioCreateTable
_ <- zioPutItems
processedCount <- streamWithJavaInterop(dynamodb)
.run(ZSink.foldLeftM(0) { (s: Int, a: Row) =>
// constant memory space processing here
console.putStrLn("Sink >>> " + a.get("orderDate")) *> UIO.effectTotal(s + 1)
})
} yield processedCount
val processedCount = unsafeRun(program)
processedCount shouldBe 5
}
}
def streamWithJavaInterop(client: DynamoDbAsyncClient): ZStream[Console, Throwable, Row] = {
val lekStart = QueryResponse.builder().build().lastEvaluatedKey()
val stream: ZStream[Console, Throwable, QueryResponse] = Stream.unfoldM(lekStart) { lek =>
val task: Task[QueryResponse] = // constant memory space processing here
ZIO.fromCompletionStage(UIO.effectTotal(DbHelper.findAllByIdInTheLastYear(client, limit = 3, "id", lek)))
task
.map { qr =>
Some((qr, qr.lastEvaluatedKey()))
}
}
stream
.tap(_ => console.putStrLn("Stream >>> fetched a batch ot items"))
.takeUntil { qr =>
qr.items.size == 0 || qr.lastEvaluatedKey.size == 0
}
.flatMap(qr => Stream.fromIterable(qr.items.asScala))
}
// we can manually lift CompletionStage to a Task as well
def streamWithManualJavaInterop(client: DynamoDbAsyncClient): ZStream[Console, Throwable, Row] = {
val lekStart = QueryResponse.builder().build().lastEvaluatedKey()
val stream: Stream[Throwable, QueryResponse] = Stream.unfoldM(lekStart) { lek =>
Task
.effectAsync[QueryResponse] { cb => // constant memory space processing here
DbHelper
.findAllByIdInTheLastYear(client, limit = 3, "id", lek)
.handle[Unit]((result, err) => {
err match {
case null =>
cb(IO.succeed(result))
case ex =>
cb(IO.fail(ex))
}
})
()
}
.map { qr =>
Some((qr, qr.lastEvaluatedKey()))
}
}
stream
.tap(_ => console.putStrLn("Stream >>> fetched a batch ot items"))
.takeUntil { qr =>
qr.items.size == 0 || qr.lastEvaluatedKey.size == 0
}
.flatMap(qr => Stream.fromIterable(qr.items.asScala))
}
}