Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

Move stats to a separate job #367

Merged
merged 3 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.eth.crawler.rest
package org.apache.tuweni.eth.crawler

data class ClientIdInfo(val clientId: String) : Comparable<String> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class CrawlerApplication(
val thread = Thread("crawler")
thread.isDaemon = true
thread
}.asCoroutineDispatcher()
}.asCoroutineDispatcher(),
) : CoroutineScope {

private val metricsService = MetricsService(
Expand All @@ -99,7 +99,7 @@ class CrawlerApplication(
enablePrometheus = config.metricsPrometheusEnabled()
)

fun createCoroutineContext() = Executors.newFixedThreadPool(
private fun createCoroutineContext() = Executors.newFixedThreadPool(
config.numberOfThreads()
) {
val thread = Thread("crawler")
Expand All @@ -118,7 +118,9 @@ class CrawlerApplication(
.load()
flyway.migrate()
val crawlerMeter = metricsService.meterSdkProvider["crawler"]
val repo = RelationalPeerRepository(ds, config.peerCacheExpiration(), config.clientIdsInterval(), config.clientsStatsDelay(), crawlerMeter, config.upgradesVersions(), createCoroutineContext())
val repo = RelationalPeerRepository(ds, config.peerCacheExpiration(), config.clientIdsInterval())
val statsJob =
StatsJob(repo, config.upgradesVersions(), crawlerMeter, config.clientsStatsDelay(), coroutineContext)

logger.info("Initial bootnodes: ${config.bootNodes()}")
val scraper = Scraper(
Expand Down Expand Up @@ -195,8 +197,15 @@ class CrawlerApplication(
connect(rlpxService, it.nodeId, InetSocketAddress(it.endpoint.address, it.endpoint.tcpPort ?: 30303))
}
}
val restService =
CrawlerRESTService(port = config.restPort(), networkInterface = config.restNetworkInterface(), repository = repo, maxRequestsPerSec = config.maxRequestsPerSec(), meter = meter, allowedOrigins = config.corsAllowedOrigins())
val restService = CrawlerRESTService(
port = config.restPort(),
networkInterface = config.restNetworkInterface(),
repository = repo,
stats = statsJob,
maxRequestsPerSec = config.maxRequestsPerSec(),
meter = meter,
allowedOrigins = config.corsAllowedOrigins()
)
val refreshLoop = AtomicBoolean(true)
val ethstatsDataRepository = EthstatsDataRepository(ds)
val ethstatsServer = EthStatsServer(
Expand All @@ -211,7 +220,7 @@ class CrawlerApplication(
Runtime.getRuntime().addShutdownHook(
Thread {
runBlocking {
repo.stop()
statsJob.stop()
refreshLoop.set(false)
scraper.stop().await()
dnsDaemon.close()
Expand All @@ -225,7 +234,7 @@ class CrawlerApplication(
}
)
runBlocking {
repo.start()
statsJob.start()
restService.start().await()
launch {
while (refreshLoop.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CrawlerRESTService(
val allowedMethods: String = "*",
val allowedHeaders: String = "*",
val repository: RelationalPeerRepository,
val stats: StatsJob,
val meter: Meter,
override val coroutineContext: CoroutineContext = Dispatchers.Default,
) : CoroutineScope {
Expand Down Expand Up @@ -103,6 +104,7 @@ class CrawlerRESTService(
newServer.stopAtShutdown = true
newServer.start()
serHol.servlet.servletConfig.servletContext.setAttribute("repo", repository)
serHol.servlet.servletConfig.servletContext.setAttribute("stats", stats)
val restMetrics = RESTMetrics(
meter.longCounterBuilder("peers").setDescription("Number of times peers have been requested").build(),
meter.longCounterBuilder("clients").setDescription("Number of times client stats have been requested").build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
*/
package org.apache.tuweni.eth.crawler

import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncResult
import org.apache.tuweni.concurrent.ExpiringMap
Expand All @@ -32,23 +29,18 @@ import org.apache.tuweni.devp2p.Peer
import org.apache.tuweni.devp2p.PeerRepository
import org.apache.tuweni.devp2p.eth.Status
import org.apache.tuweni.devp2p.parseEnodeUri
import org.apache.tuweni.eth.crawler.rest.ClientIdInfo
import org.apache.tuweni.rlpx.wire.WireConnection
import org.slf4j.LoggerFactory
import java.net.URI
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import javax.sql.DataSource
import kotlin.coroutines.CoroutineContext

open class RelationalPeerRepository(
private val dataSource: DataSource,
private val expiration: Long = 5 * 60 * 1000L,
private val clientIdsInterval: Long = 24 * 60 * 60 * 1000 * 2L,
private val clientsStatsDelay: Long = 30 * 1000L,
private val meter: Meter,
private val upgradeConfigs: List<UpgradeConfig> = listOf(),
override val coroutineContext: CoroutineContext = Dispatchers.Default,
) : CoroutineScope, PeerRepository {

Expand All @@ -58,11 +50,6 @@ open class RelationalPeerRepository(

private val listeners = mutableListOf<(Peer) -> Unit>()
private val peerCache = ExpiringMap<SECP256K1.PublicKey, String>()
private val totalClientsGauge =
meter.longValueRecorderBuilder("totalClients").setDescription("Number of nodes used to compute client stats")
.build()
private val clientCalculationsCounter =
meter.longCounterBuilder("clients").setDescription("Number of times clients were computed").build()

override fun addListener(listener: (Peer) -> Unit) {
listeners.add(listener)
Expand Down Expand Up @@ -238,60 +225,6 @@ open class RelationalPeerRepository(
}
}

private var clientIds: List<ClientInfo>? = null
private var clientsStats: Map<String, Map<String, Long>>? = null
private var upgradeStats: MutableMap<String, ClientReadyStats> = mutableMapOf()
private val started = AtomicBoolean(false)

fun start() {
logger.info("Starting repo")
launch {
started.set(true)
while (started.get()) {
logger.info("Finding client ids")
val newClientIds = getClientIdsInternal()
logger.info("Found client ids ${newClientIds.size}")
clientIds = newClientIds
val newClientsStats = mutableMapOf<String, MutableMap<String, Long>>()
val total = newClientIds.stream().mapToInt { it.count }.sum()

newClientIds.forEach { newClientCount ->
val clientIdInfo = ClientIdInfo(newClientCount.clientId)
val versionStats = newClientsStats.computeIfAbsent(clientIdInfo.name) { mutableMapOf() }
val statsCount = versionStats[clientIdInfo.version] ?: 0
versionStats[clientIdInfo.version] = statsCount + newClientCount.count
}
for (upgradeConfig in upgradeConfigs) {
var upgradeReady = 0
newClientIds.forEach { newClientCount ->
val clientIdInfo = ClientIdInfo(newClientCount.clientId)
upgradeConfig.versions.get(clientIdInfo.name().lowercase())?.let { upgradeVersion ->
if (clientIdInfo >= upgradeVersion) {
upgradeReady += newClientCount.count
}
}
}
upgradeStats.put(upgradeConfig.name, ClientReadyStats(total, upgradeReady))
}
clientsStats = newClientsStats
totalClientsGauge.record(total.toLong())
clientCalculationsCounter.add(1)

delay(clientsStatsDelay)
}
}
}

suspend fun stop() {
started.set(false)
}

internal fun getUpgradeStats() = upgradeStats

internal fun getClientIds(): List<ClientInfo> = clientIds ?: listOf()

internal fun getClientStats(): Map<String, Map<String, Long>> = clientsStats ?: mapOf()

internal fun getClientIdsInternal(): List<ClientInfo> {
dataSource.connection.use { conn ->
val sql =
Expand Down
113 changes: 113 additions & 0 deletions eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/StatsJob.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.eth.crawler

import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext

/**
 * Periodically recomputes client statistics from the peer repository and keeps the latest
 * results in memory for readers.
 *
 * Each run reads the client id counts from [repo], groups them by client name and version, and
 * counts, for every configured upgrade, how many clients compare as at least the configured
 * version.
 *
 * Results are written by the job coroutine and may be read from other threads (for example REST
 * request handlers), so the snapshots are published through volatile fields and the upgrade stats
 * are kept in a concurrent map.
 *
 * @param repo repository queried for the current client ids and their counts
 * @param upgradeConfigs upgrades for which readiness is computed on every run
 * @param meter meter used to create the `totalClients` recorder and the `clients` counter
 * @param clientsStatsDelay pause between two runs, in milliseconds
 * @param coroutineContext context in which the job coroutine is launched
 */
class StatsJob(
  private val repo: RelationalPeerRepository,
  private val upgradeConfigs: List<UpgradeConfig> = listOf(),
  private val meter: Meter,
  private val clientsStatsDelay: Long = 30 * 1000L,
  override val coroutineContext: CoroutineContext = Dispatchers.Default
) : CoroutineScope {

  companion object {
    internal val logger = LoggerFactory.getLogger(StatsJob::class.java)
  }

  // Latest snapshots. Each run replaces the reference with a freshly built collection, so a
  // volatile write is enough to publish it safely to reader threads.
  @Volatile
  private var clientIds: List<ClientInfo>? = null

  @Volatile
  private var clientsStats: Map<String, Map<String, Long>>? = null

  // Updated in place on every run while readers may be looking entries up, hence a concurrent map.
  private val upgradeStats: MutableMap<String, ClientReadyStats> = java.util.concurrent.ConcurrentHashMap()
  private val started = AtomicBoolean(false)

  @Volatile
  private var job: Job? = null

  private val totalClientsGauge =
    meter.longValueRecorderBuilder("totalClients").setDescription("Number of nodes used to compute client stats")
      .build()
  private val clientCalculationsCounter =
    meter.longCounterBuilder("clients").setDescription("Number of times clients were computed").build()

  /**
   * Starts the periodic computation.
   *
   * The coroutine is started undispatched, so the first run executes on the calling thread before
   * this function returns. Calling [start] while the job is already running is a no-op; without
   * that guard a second loop would be launched and the first one could no longer be stopped.
   */
  fun start() {
    if (!started.compareAndSet(false, true)) {
      logger.warn("Stats job already started")
      return
    }
    logger.info("Starting stats job")
    job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
      logger.info("Computing client ids")
      while (started.get()) {
        try {
          runStats()
        } catch (e: Exception) {
          // A failed run must not end the loop; the next run happens after the usual delay.
          logger.error("Error while computing stats", e)
        }

        delay(clientsStatsDelay)
      }
    }
  }

  /**
   * Performs one computation run and publishes the results.
   */
  private fun runStats() {
    logger.info("runStats")
    val newClientIds = repo.getClientIdsInternal()
    logger.info("Found client ids ${newClientIds.size}")
    clientIds = newClientIds
    val total = newClientIds.sumOf { it.count }

    // Parse every client id once and reuse the result for the version stats and for each upgrade.
    val parsedClients = newClientIds.map { ClientIdInfo(it.clientId) to it.count }

    val newClientsStats = mutableMapOf<String, MutableMap<String, Long>>()
    for ((clientIdInfo, count) in parsedClients) {
      val versionStats = newClientsStats.getOrPut(clientIdInfo.name) { mutableMapOf() }
      versionStats[clientIdInfo.version] = (versionStats[clientIdInfo.version] ?: 0L) + count
    }

    for (upgradeConfig in upgradeConfigs) {
      var upgradeReady = 0
      for ((clientIdInfo, count) in parsedClients) {
        // Clients whose lowercased name has no configured version are not counted as ready.
        upgradeConfig.versions.get(clientIdInfo.name().lowercase())?.let { upgradeVersion ->
          if (clientIdInfo >= upgradeVersion) {
            upgradeReady += count
          }
        }
      }
      upgradeStats[upgradeConfig.name] = ClientReadyStats(total, upgradeReady)
    }

    clientsStats = newClientsStats
    totalClientsGauge.record(total.toLong())
    clientCalculationsCounter.add(1)
  }

  /**
   * Stops the periodic computation: clears the running flag, cancels the job coroutine and waits
   * for it to complete. Does nothing beyond clearing the flag if no job was launched.
   */
  suspend fun stop() {
    started.set(false)
    job?.let { running ->
      running.cancel()
      running.join()
    }
  }

  /** Returns the per-upgrade readiness stats, keyed by upgrade name. */
  internal fun getUpgradeStats() = upgradeStats

  /** Returns the client ids from the latest run, or an empty list if no run has completed yet. */
  internal fun getClientIds(): List<ClientInfo> = clientIds ?: listOf()

  /** Returns the counts by client name and version from the latest run, or an empty map. */
  internal fun getClientStats(): Map<String, Map<String, Long>> = clientsStats ?: mapOf()
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.tuweni.eth.crawler.rest
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.tuweni.eth.EthJsonModule
import org.apache.tuweni.eth.crawler.RESTMetrics
import org.apache.tuweni.eth.crawler.RelationalPeerRepository
import org.apache.tuweni.eth.crawler.StatsJob
import javax.servlet.ServletContext
import javax.ws.rs.GET
import javax.ws.rs.Path
Expand All @@ -44,18 +44,18 @@ class ClientsService {
@Produces(MediaType.APPLICATION_JSON)
@Path("all")
fun getClientIds(): String {
val repo = context!!.getAttribute("repo") as RelationalPeerRepository
val stats = context!!.getAttribute("stats") as StatsJob
val metrics = context!!.getAttribute("metrics") as RESTMetrics
metrics.clientsCounter.add(1)
val peers = repo.getClientIds()
val peers = stats.getClientIds()
val result = mapper.writeValueAsString(peers)
return result
}

@Path("{upgrade}/stats")
fun getClientStats(@PathParam("upgrade") upgrade: String): String {
val repo = context!!.getAttribute("repo") as RelationalPeerRepository
val peers = repo.getUpgradeStats()[upgrade]
val stats = context!!.getAttribute("stats") as StatsJob
val peers = stats.getUpgradeStats()[upgrade]
val result = mapper.writeValueAsString(peers)
return result
}
Expand All @@ -64,8 +64,8 @@ class ClientsService {
@Produces(MediaType.APPLICATION_JSON)
@Path("stats")
fun getClientStats(): String {
val repo = context!!.getAttribute("repo") as RelationalPeerRepository
val stats = repo.getClientStats()
val statsjob = context!!.getAttribute("stats") as StatsJob
val stats = statsjob.getClientStats()
val result = mapper.writeValueAsString(stats)
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ class CrawlerRESTServiceTest {
@Test
fun startAndStop() = runBlocking {
val repo = mock<RelationalPeerRepository>()
val statsMeter = SdkMeterProvider.builder().build().get("stats")
val meter = SdkMeterProvider.builder().build().get("crawler")
val stats = StatsJob(repo, listOf(), statsMeter)

val service = CrawlerRESTService(repository = repo, meter = meter)
val service = CrawlerRESTService(repository = repo, meter = meter, stats = stats)
service.start().await()
service.stop().await()
Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ class RelationalPeerRepositoryTest {

@Test
fun testGetNewPeer() = runBlocking {
val repository = RelationalPeerRepository(dataSource!!, meter = meter)
val repository = RelationalPeerRepository(dataSource!!)
val peer = repository.get("localhost", 30303, SECP256K1.KeyPair.random().publicKey())
assertNotNull(peer)
}

@Test
fun testUpdateEndpoint() = runBlocking {
val repository = RelationalPeerRepository(dataSource!!, meter = meter)
val repository2 = RelationalPeerRepository(dataSource!!, meter = meter)
val repository = RelationalPeerRepository(dataSource!!)
val repository2 = RelationalPeerRepository(dataSource!!)
val peer = repository.get("localhost", 30303, SECP256K1.KeyPair.random().publicKey())
assertNotNull(peer)
val lastSeen = 32L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.tuweni.eth.crawler.rest

import org.apache.tuweni.eth.crawler.ClientIdInfo
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

Expand Down