Skip to content

Commit

Permalink
Merge pull request #598 from alephium/optimize-finalizing
Browse files Browse the repository at this point in the history
Optimize Finalizer Service
  • Loading branch information
tdroxler authored Jan 22, 2025
2 parents fac8aac + 031888d commit c228492
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 144 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ jobs:
password: ${{ secrets.DOCKER_PASSWORD }}

- name: Build and push Docker image
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 11
cache: sbt
- name: Setup sbt launcher
uses: sbt/setup-sbt@v1
- run: sbt app/docker
shell: bash
4 changes: 3 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ jobs:
with:
fetch-depth: 0
submodules: true
- uses: actions/setup-java@v3
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 11
cache: sbt
- name: Setup sbt launcher
uses: sbt/setup-sbt@v1
- name: Get the version
id: get_version
run: |
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ jobs:

steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v3
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 11
cache: sbt
- name: Setup sbt launcher
uses: sbt/setup-sbt@v1
- run: sbt ";scalafmtCheck;test:scalafmtCheck;scalafmtSbtCheck;scalastyle;test:scalastyle;test;unidoc;app/assembly"
4 changes: 3 additions & 1 deletion app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ explorer {
"rub",
"try",
"cad",
"aud"
"aud",
"hkd",
"thb"
]
liquidity-minimum = 100 # in USD
liquidity-minimum = ${?EXPLORER_MARKET_LIQUIDITY_MINIMUM}
Expand Down
4 changes: 3 additions & 1 deletion app/src/main/resources/explorer-backend-openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -10032,7 +10032,9 @@
"rub",
"try",
"cad",
"aud"
"aud",
"hkd",
"thb"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,14 @@ object TokenQueries extends StrictLogging {
def insertNFTMetadata(
metadata: NFTMetadata
): DBActionW[Int] = {
val cleanedTokenUri = metadata.tokenUri.replaceAll("\u0000", "")
sqlu"""
INSERT INTO nft_metadata (
"token",
"token_uri",
"collection_id",
"nft_index"
)
VALUES (${metadata.id},${cleanedTokenUri},${metadata.collectionId},${metadata.nftIndex})
VALUES (${metadata.id},${metadata.tokenUri},${metadata.collectionId},${metadata.nftIndex})
ON CONFLICT
DO NOTHING
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ object BlockFlowClient extends StrictLogging {
def valToString(value: api.model.Val): Option[String] =
value match {
case api.model.ValByteVec(bytes) =>
Some(bytes.utf8String)
Some(bytes.utf8String.replaceAll("\u0000", ""))
case _ => None
}

Expand Down
130 changes: 105 additions & 25 deletions app/src/main/scala/org/alephium/explorer/service/FinalizerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.alephium.explorer.service

import scala.collection.immutable.ArraySeq
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration => ScalaDuration, FiniteDuration}

Expand All @@ -34,6 +35,8 @@ import org.alephium.explorer.persistence.schema.CustomGetResult._
import org.alephium.explorer.persistence.schema.CustomSetParameter._
import org.alephium.explorer.util.{Scheduler, TimeUtil}
import org.alephium.explorer.util.SlickUtil._
import org.alephium.protocol.Hash
import org.alephium.protocol.model.{BlockHash, TransactionId}
import org.alephium.util.{Duration, TimeStamp}

/*
Expand Down Expand Up @@ -92,7 +95,6 @@ case object FinalizerService extends StrictLogging {
(
for {
nb <- updateOutputs(from, to)
_ <- updateTokenOutputs(from, to)
_ <- updateLastFinalizedInputTime(to)
} yield nb
).transactionally
Expand All @@ -103,29 +105,108 @@ case object FinalizerService extends StrictLogging {
}.map(_ => logger.debug(s"Outputs updated"))
}

private def updateOutputs(from: TimeStamp, to: TimeStamp): DBActionR[Int] =
/*
* Update the `outputs` and `token_outputs` tables based on data from `inputs` within the specified time range.
*/
private def updateOutputs(from: TimeStamp, to: TimeStamp)(implicit
ec: ExecutionContext
): DBActionR[Int] =
for {
txs <- findTransactions(from, to)
inputs <- findInputs(txs)
nb <- updateOutputsWithInputs(inputs)
} yield nb

/*
* Search for transactions within the specified time range.
*/
private def findTransactions(
from: TimeStamp,
to: TimeStamp
): StreamAction[(TransactionId, BlockHash, TimeStamp)] =
sql"""
SELECT hash, block_hash, block_timestamp
FROM transactions
WHERE main_chain = true
AND block_timestamp >= $from
AND block_timestamp <= $to;
""".asAS[(TransactionId, BlockHash, TimeStamp)]

/*
* Search for inputs for the specified transactions.
*/
private def findInputs(
txs: ArraySeq[(TransactionId, BlockHash, TimeStamp)]
)(implicit ec: ExecutionContext): DBActionR[ArraySeq[(TransactionId, TimeStamp, Hash)]] =
DBIO
.sequence(txs.map { case (txHash, blockHash, blockTimestamp) =>
findInputsForTx(txHash, blockHash).map { outputRefKeys =>
outputRefKeys.map { case outputRefKey =>
(txHash, blockTimestamp, outputRefKey)
}
}
})
.map(_.flatten)

/*
* Search inputs for a given `txHash` and `blockHash` using the `inputs_tx_hash_block_hash_idx` index (txHash, blockHash).
*/
private def findInputsForTx(txId: TransactionId, blockHash: BlockHash): StreamAction[Hash] =
sql"""
SELECT output_ref_key
FROM inputs
WHERE main_chain = true
AND tx_hash = $txId
AND block_hash = $blockHash;
""".asAS[Hash]

/*
* Update `outputs` and `token_outputs` tables with `txHash` and `blockTimestamp`
* based on their corresponding `outputRefKeys`.
*/
private def updateOutputsWithInputs(
inputs: ArraySeq[(TransactionId, TimeStamp, Hash)]
)(implicit ec: ExecutionContext): DBActionR[Int] =
DBIO
.sequence(inputs.map { case (txHash, blockTimestamp, outputRefKey) =>
for {
nb <- updateOutput(txHash, blockTimestamp, outputRefKey)
_ <- updateTokenOutput(txHash, blockTimestamp, outputRefKey)
} yield nb
})
.map(_.sum)

/*
 * Update the `outputs` table row with `txHash` and `blockTimestamp`
 * for the specified `outputRefKey` using the primary key index.
 */
*/
private def updateOutput(
txHash: TransactionId,
blockTimestamp: TimeStamp,
outputRefKey: Hash
): DBActionR[Int] =
sqlu"""
UPDATE outputs o
SET spent_finalized = i.tx_hash, spent_timestamp = i.block_timestamp
FROM inputs i
WHERE i.output_ref_key = o.key
AND o.main_chain=true
AND i.main_chain=true
AND i.block_timestamp >= $from
AND i.block_timestamp <= $to;
"""

private def updateTokenOutputs(from: TimeStamp, to: TimeStamp): DBActionR[Int] =
UPDATE outputs
SET spent_finalized = $txHash, spent_timestamp = $blockTimestamp
WHERE key = $outputRefKey
AND main_chain = true
"""

/*
 * Update the `token_outputs` table row with `txHash` and `blockTimestamp`
 * for the specified `outputRefKey` using the primary key index.
 */
*/
private def updateTokenOutput(
txHash: TransactionId,
blockTimestamp: TimeStamp,
outputRefKey: Hash
): DBActionR[Int] =
sqlu"""
UPDATE token_outputs o
SET spent_finalized = i.tx_hash, spent_timestamp = i.block_timestamp
FROM inputs i
WHERE i.output_ref_key = o.key
AND o.main_chain=true
AND i.main_chain=true
AND i.block_timestamp >= $from
AND i.block_timestamp <= $to;
"""
UPDATE token_outputs
SET spent_finalized = $txHash, spent_timestamp = $blockTimestamp
WHERE key = $outputRefKey
AND main_chain = true
"""

def getStartEndTime()(implicit
executionContext: ExecutionContext
Expand Down Expand Up @@ -160,9 +241,8 @@ case object FinalizerService extends StrictLogging {

private def getMaxInputsTs(implicit ec: ExecutionContext): DBActionR[Option[TimeStamp]] =
sql"""
SELECT MAX(block_timestamp)
FROM block_headers
GROUP BY chain_from, chain_to order by max LIMIT 1;
SELECT MIN(block_timestamp)
FROM latest_blocks
""".asAS[TimeStamp].headOrNone

private def getLastFinalizedInputTime()(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ case object TokenSupplyService extends TokenSupplyService with StrictLogging {
SELECT *
FROM token_supply
ORDER BY block_timestamp DESC
LIMIT 1
""".asASE[TokenSupplyEntity](tokenSupplyGetResult).headOrNone
).map(_.map { entity =>
TokenSupply(
Expand Down Expand Up @@ -225,12 +226,9 @@ case object TokenSupplyService extends TokenSupplyService with StrictLogging {
groupSetting.chainIndexes.map { chainIndex =>
sql"""
SELECT block_timestamp
FROM block_headers
WHERE main_chain = true
AND chain_from = ${chainIndex.from}
FROM latest_blocks
WHERE chain_from = ${chainIndex.from}
AND chain_to = ${chainIndex.to}
ORDER BY block_timestamp DESC
LIMIT 1
""".asAS[TimeStamp].headOrNone
}
)
Expand Down
Loading

0 comments on commit c228492

Please sign in to comment.