Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 984 ignite cache initial sketch #997

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions example/data-ignite/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.finos.vuu</groupId>
<artifactId>example</artifactId>
<version>0.9.36-SNAPSHOT</version>
</parent>

<artifactId>data-ignite</artifactId>

<dependencies>
<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>data-order</artifactId>
<version>0.9.36-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-calcite</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-slf4j</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>${maven.scala.plugin}</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<sourceDir>src/main/scala</sourceDir>
<testSourceDir>src/test/scala</testSourceDir>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 changes: 2 additions & 0 deletions example/data-ignite/src/main/java/Dummy4JavaDoc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
public class Dummy4JavaDoc {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.Ignition

object CacheNodeApplication extends App {
IgniteLocalConfig.setPersistenceEnabled(false)
val configuration = IgniteLocalConfig.create(false)

val ignite = Ignition.getOrStart(configuration)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.{IgniteCache, Ignition}
import org.finos.vuu.data.order.{ChildOrder, ParentOrder}

import java.time.{Duration, Instant}

/**
* An App that belongs to the suite of the following : <br>
* 1. [[CacheNodeApplication]] - an app that starts Ignite cluster storage node <br>
* 2. [[IgniteOrderLoader]] - an app that loads random orders and child orders into ignite cache <br>
* 3. [[IgniteCacheQueryApplication]] - a showcase class that queries Ignite for slices of ChildOrders <br>
*/
object IgniteCacheQueryApplication extends App {
private val clientConfig = IgniteLocalConfig.create(true)
val ignite = Ignition.getOrStart(clientConfig)

val childOrderCache: IgniteCache[Int, ChildOrder] = ignite.getOrCreateCache(IgniteLocalConfig.childOrderCacheName)
val parentOrderCache: IgniteCache[Int, ParentOrder] = ignite.getOrCreateCache(IgniteLocalConfig.parentOrderCacheName)
val orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache)

private val windowSize = 100
private var offset = 0
private var remaining = orderStore.childOrderCount()

while (remaining > 0) {
val nextWindow = Math.min(windowSize, remaining)

val startTime = Instant.now()
val orders = orderStore.findWindow(offset, nextWindow.toInt)
println(s"Size : ${orders.size} in ${Duration.between(startTime, Instant.now())}")
offset += nextWindow.toInt
remaining -= nextWindow
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration}
import org.finos.vuu.data.order.ChildOrder

import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters.IterableHasAsJava


object IgniteLocalConfig {
val parentOrderCacheName = "ParentOrders"
val childOrderCacheName = "ChildOrders"
private val persistenceEnabled = new AtomicBoolean()

def create(clientMode: Boolean): IgniteConfiguration = {
val cfg = new IgniteConfiguration()

cfg.setClientMode(clientMode)
cfg.setPeerClassLoadingEnabled(true)

cfg.setCacheConfiguration(
createParentOrderCacheConfig(),
createChildOrderCacheConfig()
)

cfg.setDataStorageConfiguration(
createDataStorageConfig()
)

cfg
}

def setPersistenceEnabled(enabled: Boolean): Unit = {
persistenceEnabled.set(enabled)
}

private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("parentId", classOf[Int].getName)
fields.put("id", classOf[Int].getName)
fields.put("ric", classOf[String].getName)
fields.put("price", classOf[Double].getName)
fields.put("quantity", classOf[Int].getName)
fields.put("side", classOf[String].getName)
fields.put("account", classOf[String].getName)
fields.put("strategy", classOf[String].getName)
fields.put("exchange", classOf[String].getName)
fields.put("ccy", classOf[String].getName)
fields.put("volLimit", classOf[Double].getName)
fields.put("filledQty", classOf[Int].getName)
fields.put("openQty", classOf[Int].getName)
fields.put("averagePrice", classOf[Double].getName)
fields.put("status", classOf[String].getName)

val indexes = new java.util.ArrayList[QueryIndex]()
indexes.add(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX"))
indexes.add(new QueryIndex(List("id").asJavaCollection, QueryIndexType.SORTED).setName("CHILDID_IDX"))

val queryEntity: QueryEntity = new QueryEntity(classOf[Int], classOf[ChildOrder])
.setFields(fields)
.setIndexes(indexes)
cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection)
cacheConfiguration.setName(childOrderCacheName)
}

private def createParentOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()
cacheConfiguration.setName(parentOrderCacheName)
}

private def createDataStorageConfig(): DataStorageConfiguration = {
val storageConfiguration = new DataStorageConfiguration()

storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get())

storageConfiguration
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.cache.CachePeekMode
import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder, SqlFieldsQuery}
import org.apache.ignite.cluster.ClusterState
import org.apache.ignite.{IgniteCache, Ignition}
import org.finos.vuu.data.order.{ChildOrder, OrderStore, ParentOrder}

import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.javaapi.CollectionConverters.asJava

object IgniteOrderStore {

/**
* Creates an instance of IgniteOrderStore
*
* @param clientMode defines whether the node is a client or a server that is, if cluster node keeps cache data in current jvm or not
* @return an instance of IgniteOrderStore
*/
def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false): IgniteOrderStore = {
IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled)
val config = IgniteLocalConfig.create(clientMode = clientMode)
val ignite = Ignition.getOrStart(config)

ignite.cluster().state(ClusterState.ACTIVE)

val parentOrderCache = ignite.getOrCreateCache[Int, ParentOrder](IgniteLocalConfig.parentOrderCacheName)
val childOrderCache = ignite.getOrCreateCache[Int, ChildOrder](IgniteLocalConfig.childOrderCacheName)

new IgniteOrderStore(parentOrderCache, childOrderCache)
}
}

class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrder],
private val childOrderCache: IgniteCache[Int, ChildOrder]) extends OrderStore {


def storeParentOrder(parentOrder: ParentOrder): Unit = {
parentOrderCache.put(parentOrder.id, parentOrder)
}

def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit = {
storeParentOrder(parentOrder)
childOrderCache.put(childOrder.id, childOrder)
}

def storeParentOrderWithChildren(parentOrder: ParentOrder, childOrders: Iterable[ChildOrder]): Unit = {
storeParentOrder(parentOrder)

val localCache = mutable.Map.empty[Int, ChildOrder]
childOrders.foreach(order => localCache(order.id) = order)

childOrderCache.putAll(asJava(localCache))
}

def findParentOrderById(id: Int): ParentOrder = {
parentOrderCache.get(id)
}

def findChildOrderByParentId(parentId: Int): Iterable[ChildOrder] = {
val query: IndexQuery[Int, ChildOrder] = new IndexQuery[Int, ChildOrder](classOf[ChildOrder])

val criterion = IndexQueryCriteriaBuilder.eq("parentId", parentId)
query.setCriteria(criterion)
childOrderCache.query(query)
.getAll.asScala
.map(x => x.getValue)
}

// todo - make it metadata aware and extract to another class.
private def toChildOrder(cols: java.util.List[_]): ChildOrder = {
ChildOrder(
parentId = cols.get(0).asInstanceOf[Int],
id = cols.get(1).asInstanceOf[Int],
ric = cols.get(2).asInstanceOf[String],
price = cols.get(3).asInstanceOf[Double],
quantity = cols.get(4).asInstanceOf[Int],
side = cols.get(5).asInstanceOf[String],
account = cols.get(6).asInstanceOf[String],
strategy = cols.get(7).asInstanceOf[String],
exchange = cols.get(8).asInstanceOf[String],
ccy = cols.get(9).asInstanceOf[String],
volLimit = cols.get(10).asInstanceOf[Double],
filledQty = cols.get(11).asInstanceOf[Int],
openQty = cols.get(12).asInstanceOf[Int],
averagePrice = cols.get(13).asInstanceOf[Double],
status = cols.get(14).asInstanceOf[String]
)
}

def parentOrderCount(): Long = {
parentOrderCache.sizeLong(CachePeekMode.ALL)
}

def childOrderCount(): Long = {
childOrderCache.sizeLong(CachePeekMode.ALL)
}

def findWindow(startIndex: Long, rowCount: Int): Iterable[ChildOrder] = {
val query = childOrderCache.query(new SqlFieldsQuery(s"select * from ChildOrder limit $rowCount offset $startIndex"))

val buffer = mutable.ListBuffer[ChildOrder]()
query.forEach(item => buffer.addOne(toChildOrder(item)))

buffer
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.finos.vuu.data.order.ignite

import org.apache.ignite.{Ignite, IgniteCache}
import org.finos.vuu.data.order.{ChildOrder, ParentOrder}
import org.scalatest.BeforeAndAfter
import org.scalatest.funsuite.AnyFunSuiteLike

class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfter {
private var ignite: Ignite = _
private var parentOrderCache: IgniteCache[Int, ParentOrder] = _
private var childOrderCache: IgniteCache[Int, ChildOrder] = _
private var orderStore: IgniteOrderStore = _

before {
ignite = TestUtils.setupIgnite()
parentOrderCache = ignite.getOrCreateCache("parentOrderCache")
childOrderCache = ignite.getOrCreateCache("childOrderCache")
orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache)
}

test("Ignite Store And Find Order") {
orderStore.storeParentOrder(TestUtils.createParentOrder(1))

val parentOrder = orderStore.findParentOrderById(1)

assert(parentOrder != null)
assert(parentOrder.id == 1)
}

test("Ignite Store And Find Child Order") {
val parentOrder = TestUtils.createParentOrder(1)
orderStore.storeParentOrder(parentOrder)
orderStore.storeChildOrder(parentOrder, TestUtils.createChildOrder(1, 1))

val childOrder = orderStore.findChildOrderByParentId(1)
val persistedParentOrder = orderStore.findParentOrderById(1)
assert(childOrder != null)
assert(persistedParentOrder != null)
assert(persistedParentOrder.activeChildren == 1)
}

after {
ignite.close()
}
}
Loading
Loading