Skip to content

Commit

Permalink
added query for slice of childorders
Browse files Browse the repository at this point in the history
  • Loading branch information
rumakt committed Nov 24, 2023
1 parent 9226e33 commit 61c7090
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.{IgniteCache, Ignition}
import org.finos.vuu.data.order.{ChildOrder, ParentOrder}

import java.time.{Duration, Instant}

/**
* An App that belongs to the suite of the following : <br>
* 1. [[CacheNodeApplication]] - an app that starts Ignite cluster storage node <br>
* 2. [[IgniteOrderLoader]] - an app that loads random orders and child orders into ignite cache <br>
* 3. [[IgniteCacheQueryApplication]] - a showcase class that queries Ignite for slices of ChildOrders <br>
*/
object IgniteCacheQueryApplication extends App {
private val clientConfig = IgniteLocalConfig.create(true)
val ignite = Ignition.getOrStart(clientConfig)

val childOrderCache: IgniteCache[Int, ChildOrder] = ignite.getOrCreateCache(IgniteLocalConfig.childOrderCacheName)
val parentOrderCache: IgniteCache[Int, ParentOrder] = ignite.getOrCreateCache(IgniteLocalConfig.parentOrderCacheName)
val orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache)

private val windowSize = 100
private var offset = 0
private var remaining = orderStore.childOrderCount()

while (remaining > 0) {
val nextWindow = Math.min(windowSize, remaining)

val startTime = Instant.now()
val orders = orderStore.findWindow(offset, nextWindow.toInt)
println(s"Size : ${orders.size} in ${Duration.between(startTime, Instant.now())}")
offset += nextWindow.toInt
remaining -= nextWindow
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration}
import org.finos.vuu.data.order.ChildOrder

import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters.IterableHasAsJava


object IgniteLocalConfig {
val parentOrderCacheName = "ParentOrders"
val childOrderCacheName = "ChildOrders"
private val persistenceEnabled = new AtomicBoolean()

def create(clientMode: Boolean): IgniteConfiguration = {
val cfg = new IgniteConfiguration()

cfg.setClientMode(clientMode)
cfg.setPeerClassLoadingEnabled(true)

cfg.setCacheConfiguration(
createCacheConfig(parentOrderCacheName),
createCacheConfig(childOrderCacheName)
createParentOrderCacheConfig(),
createChildOrderCacheConfig()
)

cfg.setDataStorageConfiguration(
Expand All @@ -30,12 +34,44 @@ object IgniteLocalConfig {
def setPersistenceEnabled(enabled: Boolean): Unit = {
persistenceEnabled.set(enabled)
}
private def createCacheConfig(name: String): CacheConfiguration[?, ?] = {

private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("parentId", classOf[Int].getName)
fields.put("id", classOf[Int].getName)
fields.put("ric", classOf[String].getName)
fields.put("price", classOf[Double].getName)
fields.put("quantity", classOf[Int].getName)
fields.put("side", classOf[String].getName)
fields.put("account", classOf[String].getName)
fields.put("strategy", classOf[String].getName)
fields.put("exchange", classOf[String].getName)
fields.put("ccy", classOf[String].getName)
fields.put("volLimit", classOf[Double].getName)
fields.put("filledQty", classOf[Int].getName)
fields.put("openQty", classOf[Int].getName)
fields.put("averagePrice", classOf[Double].getName)
fields.put("status", classOf[String].getName)

val indexes = new java.util.ArrayList[QueryIndex]()
indexes.add(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX"))
indexes.add(new QueryIndex(List("id").asJavaCollection, QueryIndexType.SORTED).setName("CHILDID_IDX"))

val queryEntity: QueryEntity = new QueryEntity(classOf[Int], classOf[ChildOrder])
.setFields(fields)
.setIndexes(indexes)
cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection)
cacheConfiguration.setName(childOrderCacheName)
}

private def createParentOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()
cacheConfiguration.setName(name)
cacheConfiguration.setName(parentOrderCacheName)
}

private def createDataStorageConfig(): DataStorageConfiguration= {
private def createDataStorageConfig(): DataStorageConfiguration = {
val storageConfiguration = new DataStorageConfiguration()

storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.{IgniteCache, Ignition}
import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder}
import org.apache.ignite.cache.CachePeekMode
import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder, SqlFieldsQuery}
import org.apache.ignite.cluster.ClusterState
import org.apache.ignite.{IgniteCache, Ignition}
import org.finos.vuu.data.order.{ChildOrder, OrderStore, ParentOrder}

import scala.collection.mutable
Expand All @@ -17,7 +18,7 @@ object IgniteOrderStore {
* @param clientMode defines whether the node is a client or a server that is, if cluster node keeps cache data in current jvm or not
* @return an instance of IgniteOrderStore
*/
def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false):IgniteOrderStore = {
def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false): IgniteOrderStore = {
IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled)
val config = IgniteLocalConfig.create(clientMode = clientMode)
val ignite = Ignition.getOrStart(config)
Expand All @@ -35,11 +36,11 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
private val childOrderCache: IgniteCache[Int, ChildOrder]) extends OrderStore {


def storeParentOrder(parentOrder: ParentOrder): Unit= {
def storeParentOrder(parentOrder: ParentOrder): Unit = {
parentOrderCache.put(parentOrder.id, parentOrder)
}

def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit= {
def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit = {
storeParentOrder(parentOrder)
childOrderCache.put(childOrder.id, childOrder)
}
Expand All @@ -66,4 +67,42 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
.getAll.asScala
.map(x => x.getValue)
}

// todo - make it metadata aware and extract to another class.
private def toChildOrder(cols: java.util.List[_]): ChildOrder = {
ChildOrder(
parentId = cols.get(0).asInstanceOf[Int],
id = cols.get(1).asInstanceOf[Int],
ric = cols.get(2).asInstanceOf[String],
price = cols.get(3).asInstanceOf[Double],
quantity = cols.get(4).asInstanceOf[Int],
side = cols.get(5).asInstanceOf[String],
account = cols.get(6).asInstanceOf[String],
strategy = cols.get(7).asInstanceOf[String],
exchange = cols.get(8).asInstanceOf[String],
ccy = cols.get(9).asInstanceOf[String],
volLimit = cols.get(10).asInstanceOf[Double],
filledQty = cols.get(11).asInstanceOf[Int],
openQty = cols.get(12).asInstanceOf[Int],
averagePrice = cols.get(13).asInstanceOf[Double],
status = cols.get(14).asInstanceOf[String]
)
}

def parentOrderCount(): Long = {
parentOrderCache.sizeLong(CachePeekMode.ALL)
}

def childOrderCount(): Long = {
childOrderCache.sizeLong(CachePeekMode.ALL)
}

def findWindow(startIndex: Long, rowCount: Int): Iterable[ChildOrder] = {
val query = childOrderCache.query(new SqlFieldsQuery(s"select * from ChildOrder order by id limit $rowCount offset $startIndex"))

val buffer = mutable.ListBuffer[ChildOrder]()
query.forEach(item => buffer.addOne(toChildOrder(item)))

buffer
}
}

0 comments on commit 61c7090

Please sign in to comment.