Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Endless block streaming #357

Merged
merged 49 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
6fabd61
wip
arndey Jul 14, 2023
159bcba
simple version of endless block streaming
arndey Jul 14, 2023
0ab77bd
test improvement
arndey Jul 14, 2023
ee34e8d
Merge branch 'iroha2-dev' into iroha-java/330
arndey Jul 14, 2023
834cbea
minor fix
arndey Jul 14, 2023
264117a
Merge branch 'iroha2-dev' into iroha-java/330
arndey Jul 18, 2023
33723cc
Merge branch 'iroha2-dev' into iroha-java/330
arndey Jul 19, 2023
18c8208
Merge remote-tracking branch 'origin/iroha-java/330' into iroha-java/330
arndey Jul 19, 2023
747e337
Merge branch 'iroha2-dev' into iroha-java/330-feature
arndey Jul 19, 2023
de5a1a0
wip
arndey Jul 19, 2023
5ea700f
wip
arndey Jul 20, 2023
c4cacd1
non-working
arndey Jul 20, 2023
877b4f8
working version
arndey Jul 25, 2023
7543a4f
async && UUID
arndey Jul 25, 2023
908f284
refactoring
arndey Jul 25, 2023
f0fcd9a
Merge branch 'iroha-java/330-feature-2' into iroha-java/330
arndey Jul 25, 2023
7da7726
java block stream test
arndey Jul 26, 2023
e68341e
fix
arndey Jul 26, 2023
1467310
trying
arndey Jul 26, 2023
4d05f2f
trying fix
arndey Jul 26, 2023
f4473df
FIX TESTS
arndey Jul 27, 2023
aaab17e
comments fixes
arndey Jul 31, 2023
15cbc76
wip
arndey Jul 31, 2023
d860eb5
minor changes
arndey Aug 1, 2023
6b687b7
more improvements
arndey Aug 1, 2023
3b2a4cd
Lazy channel
Mingela Aug 1, 2023
b24fadf
Slight redesign
Mingela Aug 1, 2023
e4b57f5
ktlint
Mingela Aug 1, 2023
4f96d0d
Fixed JavaTest
Mingela Aug 1, 2023
e584901
Small fixes
Mingela Aug 1, 2023
a148bb4
ktlint
Mingela Aug 1, 2023
bcdd253
Merge branch 'iroha2-dev' into iroha-java/330
Mingela Aug 2, 2023
be70f1b
fix
arndey Aug 4, 2023
3048770
TEMPORARILY
arndey Aug 4, 2023
12c6894
Careful closing
Mingela Aug 8, 2023
a7adeff
Slight refactor
Mingela Aug 8, 2023
72123a5
DEBUG
Mingela Aug 8, 2023
f7086fd
ktlint
Mingela Aug 8, 2023
c94f472
fixup! DEBUG
Mingela Aug 8, 2023
75c347a
Even more accurate closing
Mingela Aug 8, 2023
e315ccb
ktlint
Mingela Aug 8, 2023
8a04c7c
Some more redesign
Mingela Aug 9, 2023
c2498ee
ktlint
Mingela Aug 9, 2023
083f7b7
Additional warn log
Mingela Aug 9, 2023
008f3d0
Additional warn log
Mingela Aug 9, 2023
adfef1c
Removed debugging log
Mingela Aug 9, 2023
c7cdc60
Merge remote-tracking branch 'origin/iroha-java/330' into iroha-java/330
arndey Aug 10, 2023
df081a4
minor changes
arndey Aug 10, 2023
e50a5d2
remove @Disabled
arndey Aug 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import jp.co.soramitsu.iroha2.generated.Asset
import jp.co.soramitsu.iroha2.generated.AssetDefinitionId
import jp.co.soramitsu.iroha2.generated.AssetId
import jp.co.soramitsu.iroha2.generated.AssetValue
import jp.co.soramitsu.iroha2.generated.CommittedBlock
import jp.co.soramitsu.iroha2.generated.DomainId
import jp.co.soramitsu.iroha2.generated.EvaluatesTo
import jp.co.soramitsu.iroha2.generated.Executable
Expand Down Expand Up @@ -41,6 +42,8 @@ import jp.co.soramitsu.iroha2.generated.TriggerOfFilterBoxAndExecutable
import jp.co.soramitsu.iroha2.generated.TriggerOfFilterBoxAndOptimizedExecutable
import jp.co.soramitsu.iroha2.generated.Value
import jp.co.soramitsu.iroha2.generated.ValueKind
import jp.co.soramitsu.iroha2.generated.VersionedBlockMessage
import jp.co.soramitsu.iroha2.generated.VersionedCommittedBlock
import jp.co.soramitsu.iroha2.generated.VersionedSignedTransaction
import jp.co.soramitsu.iroha2.transaction.TransactionBuilder
import net.i2p.crypto.eddsa.EdDSAEngine
Expand Down Expand Up @@ -378,6 +381,13 @@ fun IdBox.extractId(): Any = when (this) {
is IdBox.ParameterId -> this.parameterId
}

fun InstructionBox.extractAccount() = this
.cast<InstructionBox.Register>()
.registerBox.`object`.expression
.cast<Expression.Raw>().value
.cast<Value.Identifiable>().identifiableBox
.cast<IdentifiableBox.NewAccount>().newAccount

fun InstructionBox.Register.extractAccount() = this
.registerBox.`object`.expression
.cast<Expression.Raw>().value
Expand Down Expand Up @@ -494,11 +504,21 @@ fun EvaluatesTo<Value>.extractValueU32() = this
.getValue<Long>()

fun TriggerOfFilterBoxAndOptimizedExecutable.extractIsi() = this.action.executable.cast<Executable.Instructions>().vec

fun TriggerOfFilterBoxAndExecutable.extractIsi() = this.action.executable.cast<Executable.Instructions>().vec

fun TriggerOfFilterBoxAndOptimizedExecutable.extractSchedule() = this.action.filter.extractSchedule()

fun TriggerOfFilterBoxAndExecutable.extractSchedule() = this.action.filter.extractSchedule()

fun VersionedBlockMessage.extractBlock() = this
.cast<VersionedBlockMessage.V1>().blockMessage.versionedCommittedBlock
.extractBlock()

fun VersionedCommittedBlock.extractBlock() = this.cast<VersionedCommittedBlock.V1>().committedBlock

fun CommittedBlock.height() = this.header.height

fun FilterBox.extractSchedule() = this
.cast<FilterBox.Time>()
.timeEventFilter.executionTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.serialization.jackson.jackson
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import jp.co.soramitsu.iroha2.IrohaClientException
import jp.co.soramitsu.iroha2.TransactionRejectedException
Expand Down Expand Up @@ -65,7 +64,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import org.slf4j.Logger
Expand Down Expand Up @@ -243,32 +242,50 @@ open class Iroha2Client(
}
}

fun subscribeToBlockStream(from: Long = 1, count: Int) = subscribeToBlockStream(
from,
count,
action = { block -> block },
)

/**
* Subscribe to block streaming
* @param from - block number to start from
* @param count - how many blocks to get before closing web socket
* @param count - how many blocks to get before closing the channel
* @param action - code which will be invoked after a new block received
* @param closeOn - if the condition returns true then the channel will be closed
*/
fun subscribeToBlockStream(from: Long, count: Int): Flow<VersionedBlockMessage> = flow {
fun <T> subscribeToBlockStream(
from: Long = 1,
count: Int? = null,
action: suspend (block: VersionedBlockMessage) -> T,
closeOn: suspend (block: VersionedBlockMessage) -> Boolean = { false },
): Flow<T> = channelFlow {
logger.info("Block stream channel opened")

val channel = this
var counter = 0
val peerUrl = getApiUrl()
val apiUrl = getApiUrl()
val request = VersionedBlockSubscriptionRequest.V1(
BlockSubscriptionRequest(BigInteger.valueOf(from)),
)
val payload = VersionedBlockSubscriptionRequest.encode(request)

client.webSocket(
host = peerUrl.host,
port = peerUrl.port,
host = apiUrl.host,
port = apiUrl.port,
path = WS_ENDPOINT_BLOCK_STREAM,
) {
logger.debug("WebSocket opened")
val request = VersionedBlockSubscriptionRequest.V1(
BlockSubscriptionRequest(BigInteger.valueOf(from)),
)
val payload = VersionedBlockSubscriptionRequest.encode(request)
send(payload.toFrame())

for (frame in incoming) {
logger.debug("Received frame: {}", frame)
val block = VersionedBlockMessage.decode(frame.readBytes())
emit(block)
counter++
if (counter == count) {
close()
channel.send(action(block))
if (++counter == count || closeOn(block)) {
logger.info("Block stream channel is closing")
channel.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,19 @@ class TransactionBuilder(builder: TransactionBuilder.() -> Unit = {}) {
)
}

@JvmOverloads
fun registerAssetDefinition(
name: Name,
domainId: DomainId,
assetValueType: AssetValueType,
metadata: Metadata = Metadata(mapOf()),
mintable: Mintable = Mintable.Infinitely(),
) = this.apply {
instructions.value.add(
Instructions.registerAssetDefinition(AssetDefinitionId(name, domainId), assetValueType, metadata, mintable),
)
}

fun registerAsset(
id: AssetId,
assetValue: AssetValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,29 @@ import io.qameta.allure.Story
import jp.co.soramitsu.iroha2.annotations.Sdk
import jp.co.soramitsu.iroha2.annotations.SdkTestId
import jp.co.soramitsu.iroha2.client.Iroha2Client
import jp.co.soramitsu.iroha2.generated.AssetDefinitionId
import jp.co.soramitsu.iroha2.generated.AssetValueType
import jp.co.soramitsu.iroha2.generated.CommittedBlock
import jp.co.soramitsu.iroha2.generated.Executable
import jp.co.soramitsu.iroha2.generated.InstructionBox
import jp.co.soramitsu.iroha2.generated.TransactionPayload
import jp.co.soramitsu.iroha2.generated.VersionedBlockMessage
import jp.co.soramitsu.iroha2.generated.VersionedCommittedBlock
import jp.co.soramitsu.iroha2.generated.VersionedValidTransaction
import jp.co.soramitsu.iroha2.testengine.ALICE_ACCOUNT_ID
import jp.co.soramitsu.iroha2.testengine.BOB_ACCOUNT
import jp.co.soramitsu.iroha2.testengine.BOB_ACCOUNT_ID
import jp.co.soramitsu.iroha2.testengine.BOB_KEYPAIR
import jp.co.soramitsu.iroha2.testengine.DEFAULT_DOMAIN
import jp.co.soramitsu.iroha2.testengine.DEFAULT_DOMAIN_ID
import jp.co.soramitsu.iroha2.testengine.DefaultGenesis
import jp.co.soramitsu.iroha2.testengine.GENESIS
import jp.co.soramitsu.iroha2.testengine.IrohaTest
import jp.co.soramitsu.iroha2.testengine.NewAccountWithMetadata
import jp.co.soramitsu.iroha2.testengine.WithIroha
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils.random
import java.math.BigInteger
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

Expand All @@ -38,89 +40,87 @@ class BlockStreamTest : IrohaTest<Iroha2Client>() {
@Test
@WithIroha([NewAccountWithMetadata::class])
@Story("Successful subscription to block stream")
@SdkTestId("when_subscribe_to_block_stream_then_success")
fun `when subscribe to block stream then success`(): Unit = runBlocking {
var blocksResult = client.subscribeToBlockStream(1, 2)
@SdkTestId("subscription_to_block_stream")
fun `subscription to block stream`(): Unit = runBlocking {
var blocksResult = client.subscribeToBlockStream(count = 2)
val newAssetName = "rox"

client.tx(BOB_ACCOUNT_ID, BOB_KEYPAIR) {
registerAssetDefinition(AssetDefinitionId(newAssetName.asName(), DEFAULT_DOMAIN_ID), AssetValueType.Store())
registerAssetDefinition(newAssetName.asName(), DEFAULT_DOMAIN_ID, AssetValueType.Store())
}
var blocks = mutableListOf<VersionedBlockMessage>()
blocksResult.collect { block -> blocks.add(block) }

val expectedSize = NewAccountWithMetadata().block.transactions.sumOf { it.size }
var instructions = checkBlockStructure(blocks[0], 1, GENESIS, GENESIS, expectedSize)
val registerDomain = instructions[0].cast<InstructionBox.Register>().extractDomain().id.name.string
var isi = checkBlockStructure(blocks[0], 1, GENESIS, GENESIS, expectedSize)
val registerDomain = isi[0].cast<InstructionBox.Register>().extractDomain().id.name.string

assertEquals(DEFAULT_DOMAIN_ID.asString(), registerDomain)
assertEquals(ALICE_ACCOUNT_ID.asString(), isi[1].extractAccount().id.asString())
assertEquals(BOB_ACCOUNT_ID.asString(), isi[2].extractAccount().id.asString())
assertEquals(
ALICE_ACCOUNT_ID.asString(),
instructions[1].cast<InstructionBox.Register>().extractAccount().id.asString(),
)
assertEquals(
BOB_ACCOUNT_ID.asString(),
instructions[2].cast<InstructionBox.Register>().extractAccount().id.asString(),
)
assertEquals(
"foo$ACCOUNT_ID_DELIMITER$DEFAULT_DOMAIN",
instructions[3].cast<InstructionBox.Register>().extractAccount().id.asString(),
"${NewAccountWithMetadata.ACCOUNT_NAME.string}$ACCOUNT_ID_DELIMITER$DEFAULT_DOMAIN",
isi[3].extractAccount().id.asString(),
)

instructions = checkBlockStructure(
blocks[1],
2,
DEFAULT_DOMAIN,
BOB_ACCOUNT,
1,
)
var newAssetDefinition = instructions[0].cast<InstructionBox.Register>().extractAssetDefinition()
isi = checkBlockStructure(blocks[1], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1)
var newAssetDefinition = isi[0].cast<InstructionBox.Register>().extractAssetDefinition()
assertNotNull(newAssetDefinition)
assertEquals(newAssetName, newAssetDefinition.id.name.string)
assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())

// get the last block second time
blocksResult = client.subscribeToBlockStream(2, 1)
blocks = mutableListOf<VersionedBlockMessage>()
blocks = mutableListOf()
blocksResult.collect { block -> blocks.add(block) }
instructions = checkBlockStructure(
blocks[0],
2,
DEFAULT_DOMAIN,
BOB_ACCOUNT,
1,
)
newAssetDefinition = instructions[0].cast<InstructionBox.Register>().extractAssetDefinition()
isi = checkBlockStructure(blocks[0], 2, DEFAULT_DOMAIN, BOB_ACCOUNT, 1)

newAssetDefinition = isi[0].cast<InstructionBox.Register>().extractAssetDefinition()
assertNotNull(newAssetDefinition)
assertEquals(newAssetName, newAssetDefinition.id.name.string)
assertEquals(DEFAULT_DOMAIN, newAssetDefinition.id.domainId.asString())
}

private fun getCommittedBlock(versionedBlockMessage: VersionedBlockMessage): CommittedBlock {
return versionedBlockMessage.cast<VersionedBlockMessage.V1>()
.blockMessage.versionedCommittedBlock.cast<VersionedCommittedBlock.V1>().committedBlock
}
@Test
@WithIroha([DefaultGenesis::class])
@Story("Successful subscription to endless block stream")
@SdkTestId("subscription_to_endless_block_stream")
fun `subscription to endless block stream`(): Unit = runBlocking {
val expectedLastHeight = BigInteger.TEN
val channel = client.subscribeToBlockStream(
action = { block -> block.extractBlock().height() },
closeOn = { block -> block.extractBlock().height() == expectedLastHeight },
)

var lastHeight = BigInteger.ZERO
launch { channel.collect { lastHeight = it } }

private fun getInstructionPayload(committedBlock: CommittedBlock): TransactionPayload {
return committedBlock.transactions[0]
.cast<VersionedValidTransaction.V1>()
.validTransaction
.payload
repeat(expectedLastHeight.intValueExact() + 5) {
delay(1000)
client.tx { setKeyValue(ALICE_ACCOUNT_ID, random(16).asName(), random(16).asValue()) }
}
assertEquals(expectedLastHeight, lastHeight)
}

private fun CommittedBlock.extractInstructionPayload() = this.transactions[0]
.cast<VersionedValidTransaction.V1>().validTransaction.payload

private fun checkBlockStructure(
blockMessage: VersionedBlockMessage,
height: Long,
instructionAccountDomain: String,
instructionAccount: String,
instructionSize: Int,
): List<InstructionBox> {
val committedBlock = getCommittedBlock(blockMessage)
val committedBlock = blockMessage.extractBlock()
val payload = committedBlock.extractInstructionPayload()
val instructions = payload.instructions.cast<Executable.Instructions>().vec

assertEquals(height, committedBlock.header.height.toLong())
val payload = getInstructionPayload(committedBlock)
assertEquals(instructionAccountDomain, payload.accountId.domainId.name.string)
assertEquals(instructionAccount, payload.accountId.name.string)
val instructions = payload.instructions.cast<Executable.Instructions>().vec
assertEquals(instructionSize, instructions.size)

return instructions
}
}