From 514a5310ea47df026416b1d2d89dad099c6c1a8d Mon Sep 17 00:00:00 2001 From: Tomasz Rumak Date: Wed, 22 Nov 2023 17:50:14 +0100 Subject: [PATCH 1/6] issue 984 - initial ignite support. --- example/data-ignite/pom.xml | 74 +++++++++++++++++ .../src/main/java/Dummy4JavaDoc.java | 2 + .../order/ignite/CacheNodeApplication.scala | 10 +++ .../data/order/ignite/IgniteLocalConfig.scala | 45 ++++++++++ .../data/order/ignite/IgniteOrderStore.scala | 69 ++++++++++++++++ .../order/ignite/IgniteOrderStoreTest.scala | 45 ++++++++++ .../vuu/data/order/ignite/TestUtils.scala | 82 +++++++++++++++++++ example/data-order/pom.xml | 54 ++++++++++++ .../org/finos/vuu/data/order/ChildOrder.scala | 19 +++++ .../finos/vuu/data/order/MapOrderStore.scala | 35 ++++++++ .../org/finos/vuu/data/order/OrderStore.scala | 10 +++ .../finos/vuu/data/order/ParentOrder.scala | 22 +++++ example/order/pom.xml | 5 ++ .../core/module/simul/SimulationModule.scala | 4 + .../simul/ignite/IgniteOrderLoader.scala | 37 +++++++++ .../simul/provider/ChildOrdersProvider.scala | 1 + .../provider/ParentChildOrdersModel.scala | 34 ++++---- .../simul/provider/ParentOrdersProvider.scala | 1 + .../provider/PermissionedOrdersProvider.scala | 1 + example/pom.xml | 2 + .../runconfigurations/SimulMain.run.xml | 7 +- 21 files changed, 544 insertions(+), 15 deletions(-) create mode 100644 example/data-ignite/pom.xml create mode 100644 example/data-ignite/src/main/java/Dummy4JavaDoc.java create mode 100644 example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala create mode 100644 example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala create mode 100644 example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala create mode 100644 example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/IgniteOrderStoreTest.scala create mode 100644 example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/TestUtils.scala create mode 100644 example/data-order/pom.xml create mode 100644 example/data-order/src/main/scala/org/finos/vuu/data/order/ChildOrder.scala create mode 100644 example/data-order/src/main/scala/org/finos/vuu/data/order/MapOrderStore.scala create mode 100644 example/data-order/src/main/scala/org/finos/vuu/data/order/OrderStore.scala create mode 100644 example/data-order/src/main/scala/org/finos/vuu/data/order/ParentOrder.scala create mode 100644 example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala diff --git a/example/data-ignite/pom.xml b/example/data-ignite/pom.xml new file mode 100644 index 000000000..f26ed9835 --- /dev/null +++ b/example/data-ignite/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + + org.finos.vuu + example + 0.9.36-SNAPSHOT + + + data-ignite + + + + org.finos.vuu + data-order + 0.9.36-SNAPSHOT + + + org.apache.ignite + ignite-core + 2.15.0 + + + org.apache.ignite + ignite-calcite + 2.15.0 + + + org.apache.ignite + ignite-slf4j + 2.15.0 + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + + + + org.scala-tools + maven-scala-plugin + ${maven.scala.plugin} + + + + compile + testCompile + + + + + src/main/scala + src/test/scala + + -Xms64m + -Xmx1024m + + + + + + \ No newline at end of file diff --git a/example/data-ignite/src/main/java/Dummy4JavaDoc.java b/example/data-ignite/src/main/java/Dummy4JavaDoc.java new file mode 100644 index 000000000..00b8d1897 --- /dev/null +++ b/example/data-ignite/src/main/java/Dummy4JavaDoc.java @@ -0,0 +1,2 @@ +public class Dummy4JavaDoc { +} diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala new file mode 100644 index 000000000..ec994f6f5 --- /dev/null +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala @@ -0,0 +1,10 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.Ignition + +object CacheNodeApplication extends App { + IgniteLocalConfig.setPersistenceEnabled(true) + val configuration = IgniteLocalConfig.create(false) + + val ignite = Ignition.getOrStart(configuration) +} diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala new file mode 100644 index 000000000..7091da974 --- /dev/null +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala @@ -0,0 +1,45 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration} + +import java.util.concurrent.atomic.AtomicBoolean + + +object IgniteLocalConfig { + val parentOrderCacheName = "ParentOrders" + val childOrderCacheName = "ChildOrders" + private val persistenceEnabled = new AtomicBoolean() + def create(clientMode: Boolean): IgniteConfiguration = { + val cfg = new IgniteConfiguration() + + cfg.setClientMode(clientMode) + cfg.setPeerClassLoadingEnabled(true) + + cfg.setCacheConfiguration( + createCacheConfig(parentOrderCacheName), + createCacheConfig(childOrderCacheName) + ) + + cfg.setDataStorageConfiguration( + createDataStorageConfig() + ) + + cfg + } + + def setPersistenceEnabled(enabled: Boolean): Unit = { + persistenceEnabled.set(enabled) + } + private def createCacheConfig(name: String): CacheConfiguration[?, ?] = { + val cacheConfiguration = new CacheConfiguration() + cacheConfiguration.setName(name) + } + + private def createDataStorageConfig(): DataStorageConfiguration= { + val storageConfiguration = new DataStorageConfiguration() + + storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get()) + + storageConfiguration + } +} diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala new file mode 100644 index 000000000..ac3d19b52 --- /dev/null +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala @@ -0,0 +1,69 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.{IgniteCache, Ignition} +import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder} +import org.apache.ignite.cluster.ClusterState +import org.finos.vuu.data.order.{ChildOrder, OrderStore, ParentOrder} + +import scala.collection.mutable +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.jdk.javaapi.CollectionConverters.asJava + +object IgniteOrderStore { + + /** + * Creates an instance of IgniteOrderStore + * + * @param clientMode defines whether the node is a client or a server that is, if cluster node keeps cache data in current jvm or not + * @return an instance of IgniteOrderStore + */ + def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false):IgniteOrderStore = { + IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled) + val config = IgniteLocalConfig.create(clientMode = clientMode) + val ignite = Ignition.getOrStart(config) + + ignite.cluster().state(ClusterState.ACTIVE) + + val parentOrderCache = ignite.getOrCreateCache[Int, ParentOrder](IgniteLocalConfig.parentOrderCacheName) + val childOrderCache = ignite.getOrCreateCache[Int, ChildOrder](IgniteLocalConfig.childOrderCacheName) + + new IgniteOrderStore(parentOrderCache, childOrderCache) + } +} + +class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrder], + private val childOrderCache: IgniteCache[Int, ChildOrder]) extends OrderStore { + + + def storeParentOrder(parentOrder: ParentOrder): Unit= { + parentOrderCache.put(parentOrder.id, parentOrder) + } + + def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit= { + storeParentOrder(parentOrder) + childOrderCache.put(childOrder.id, childOrder) + } + + def storeParentOrderWithChildren(parentOrder: ParentOrder, childOrders: Iterable[ChildOrder]): Unit = { + storeParentOrder(parentOrder) + + val localCache = mutable.Map.empty[Int, ChildOrder] + childOrders.foreach(order => localCache(order.id) = order) + + childOrderCache.putAll(asJava(localCache)) + } + + def findParentOrderById(id: Int): ParentOrder = { + parentOrderCache.get(id) + } + + def findChildOrderByParentId(parentId: Int): Iterable[ChildOrder] = { + val query: IndexQuery[Int, ChildOrder] = new IndexQuery[Int, ChildOrder](classOf[ChildOrder]) + + val criterion = IndexQueryCriteriaBuilder.eq("parentId", parentId) + query.setCriteria(criterion) + childOrderCache.query(query) + .getAll.asScala + .map(x => x.getValue) + } +} diff --git a/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/IgniteOrderStoreTest.scala b/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/IgniteOrderStoreTest.scala new file mode 100644 index 000000000..6ceb7bbae --- /dev/null +++ b/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/IgniteOrderStoreTest.scala @@ -0,0 +1,45 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.{Ignite, IgniteCache} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} +import org.scalatest.BeforeAndAfter +import org.scalatest.funsuite.AnyFunSuiteLike + +class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfter { + private var ignite: Ignite = _ + private var parentOrderCache: IgniteCache[Int, ParentOrder] = _ + private var childOrderCache: IgniteCache[Int, ChildOrder] = _ + private var orderStore: IgniteOrderStore = _ + + before { + ignite = TestUtils.setupIgnite() + parentOrderCache = ignite.getOrCreateCache("parentOrderCache") + childOrderCache = ignite.getOrCreateCache("childOrderCache") + orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache) + } + + test("Ignite Store And Find Order") { + orderStore.storeParentOrder(TestUtils.createParentOrder(1)) + + val parentOrder = orderStore.findParentOrderById(1) + + assert(parentOrder != null) + assert(parentOrder.id == 1) + } + + test("Ignite Store And Find Child Order") { + val parentOrder = TestUtils.createParentOrder(1) + orderStore.storeParentOrder(parentOrder) + orderStore.storeChildOrder(parentOrder, TestUtils.createChildOrder(1, 1)) + + val childOrder = orderStore.findChildOrderByParentId(1) + val persistedParentOrder = orderStore.findParentOrderById(1) + assert(childOrder != null) + assert(persistedParentOrder != null) + assert(persistedParentOrder.activeChildren == 1) + } + + after { + ignite.close() + } +} diff --git a/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/TestUtils.scala b/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/TestUtils.scala new file mode 100644 index 000000000..d78769368 --- /dev/null +++ b/example/data-ignite/src/test/scala/org/finos/vuu/data/order/ignite/TestUtils.scala @@ -0,0 +1,82 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType} +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration +import org.apache.ignite.configuration.{IgniteConfiguration, SqlConfiguration} +import org.apache.ignite.{Ignite, Ignition} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} + +import java.util +import scala.jdk.CollectionConverters.IterableHasAsJava + +object TestUtils { + def createChildOrder(parentId: Int, id: Int): ChildOrder = { + ChildOrder(parentId = parentId, + id = id, + ric = "ric", + price = 1.22, + quantity = 100, + side = "Buy", + account = "account", + strategy = "", + exchange = "", + ccy = "EUR", + volLimit = 100, + filledQty = 0, + openQty = 100, + averagePrice = 0, + status = "New" + ) + } + + def createParentOrder(id: Int): ParentOrder = { + ParentOrder(id = id, + ric = "ric", + price = 1.0, + quantity = 1, + side = "Buy", + account = "account", + exchange = "exchange", + ccy = "EUR", + algo = "Sniper", + volLimit = 1.0, + filledQty = 1, + openQty = 1, + averagePrice = 0, + status = "New", + remainingQty = 100, + activeChildren = 0) + } + + def setupIgnite(): Ignite = { + val igniteConfiguration = new IgniteConfiguration() + + val parentOrderCacheConfiguration = new org.apache.ignite.configuration.CacheConfiguration[Int, ParentOrder] + val childOrderCacheConfiguration = new org.apache.ignite.configuration.CacheConfiguration[Int, ChildOrder] + + parentOrderCacheConfiguration.setIndexedTypes(classOf[Int], classOf[ParentOrder]) + parentOrderCacheConfiguration.setName("parentOrderCache") + + //childOrderCacheConfiguration.setIndexedTypes(classOf[Int], classOf[ChildOrder]) + childOrderCacheConfiguration.setName("childOrderCache") + + val fields = new util.LinkedHashMap[String, String]() + fields.put("parentId", classOf[Int].getName) + + val indexes = new util.ArrayList[QueryIndex]() + indexes.add(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX")) + + val queryEntity: QueryEntity = new QueryEntity(classOf[Int], classOf[ChildOrder]) + .setFields(fields) + .setIndexes(indexes) + + childOrderCacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection) + igniteConfiguration.setCacheConfiguration(parentOrderCacheConfiguration, childOrderCacheConfiguration) + + val sqlConfiguration = new SqlConfiguration + sqlConfiguration.setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration().setDefault(true)) + igniteConfiguration.setSqlConfiguration(sqlConfiguration) + + Ignition.getOrStart(igniteConfiguration) + } +} diff --git a/example/data-order/pom.xml b/example/data-order/pom.xml new file mode 100644 index 000000000..e591d97f2 --- /dev/null +++ b/example/data-order/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + + + org.finos.vuu + example + 0.9.36-SNAPSHOT + + + data-order + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + + + + org.scala-tools + maven-scala-plugin + ${maven.scala.plugin} + + + + compile + testCompile + + + + + src/main/scala + src/test/scala + + -Xms64m + -Xmx1024m + + + + + + \ No newline at end of file diff --git a/example/data-order/src/main/scala/org/finos/vuu/data/order/ChildOrder.scala b/example/data-order/src/main/scala/org/finos/vuu/data/order/ChildOrder.scala new file mode 100644 index 000000000..6c7035ef2 --- /dev/null +++ b/example/data-order/src/main/scala/org/finos/vuu/data/order/ChildOrder.scala @@ -0,0 +1,19 @@ +package org.finos.vuu.data.order + +case class ChildOrder( + parentId: Int, + id: Int, + ric: String, + price: Double, + quantity: Int, + side: String, + account: String, + strategy: String, + exchange: String, + ccy: String, + volLimit: Double, + filledQty: Int, + openQty: Int, + averagePrice: Double, + status: String + ) diff --git a/example/data-order/src/main/scala/org/finos/vuu/data/order/MapOrderStore.scala b/example/data-order/src/main/scala/org/finos/vuu/data/order/MapOrderStore.scala new file mode 100644 index 000000000..73d50be46 --- /dev/null +++ b/example/data-order/src/main/scala/org/finos/vuu/data/order/MapOrderStore.scala @@ -0,0 +1,35 @@ +package org.finos.vuu.data.order + +import java.util.concurrent.ConcurrentHashMap + +class MapOrderStore extends OrderStore { + private val parentOrders = new ConcurrentHashMap[Int, ParentOrder]() + private val childOrders = new ConcurrentHashMap[Int, List[ChildOrder]]() + + override def storeParentOrder(order: ParentOrder): Unit = { + parentOrders.put(order.id, order) + } + + override def storeChildOrder(parentOrder: ParentOrder, + childOrder: ChildOrder): Unit = { + parentOrders.put(parentOrder.id, parentOrder) + childOrders.get(parentOrder.id) match { + case null => childOrders.put(parentOrder.id, List(childOrder)) + case children: List[ChildOrder] => childOrders.put(parentOrder.id, childOrder :: children) + } + } + + override def findParentOrderById(id: Int): ParentOrder = { + parentOrders.get(id) + } + + + override def findChildOrderByParentId(parentId: Int): Iterable[ChildOrder] = { + childOrders.get(parentId) + } + + override def storeParentOrderWithChildren(parentOrder: ParentOrder, childOrders: Iterable[ChildOrder]): Unit = { + parentOrders.put(parentOrder.id, parentOrder) + childOrders.foreach(childOrder => this.storeChildOrder(parentOrder, childOrder)) + } +} diff --git a/example/data-order/src/main/scala/org/finos/vuu/data/order/OrderStore.scala b/example/data-order/src/main/scala/org/finos/vuu/data/order/OrderStore.scala new file mode 100644 index 000000000..3d6282cf0 --- /dev/null +++ b/example/data-order/src/main/scala/org/finos/vuu/data/order/OrderStore.scala @@ -0,0 +1,10 @@ +package org.finos.vuu.data.order + +trait OrderStore { + def storeParentOrder(order: ParentOrder): Unit + def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit + def storeParentOrderWithChildren(parentOrder: ParentOrder, childOrders: Iterable[ChildOrder]): Unit + + def findParentOrderById(id: Int): ParentOrder + def findChildOrderByParentId(parentId: Int): Iterable[ChildOrder] +} diff --git a/example/data-order/src/main/scala/org/finos/vuu/data/order/ParentOrder.scala b/example/data-order/src/main/scala/org/finos/vuu/data/order/ParentOrder.scala new file mode 100644 index 000000000..c069ca748 --- /dev/null +++ b/example/data-order/src/main/scala/org/finos/vuu/data/order/ParentOrder.scala @@ -0,0 +1,22 @@ +package org.finos.vuu.data.order + +case class ParentOrder( + id: Int, + ric: String, + price: Double, + quantity: Int, + side: String, + account: String, + exchange: String, + ccy: String, + algo: String, + volLimit: Double, + filledQty: Int, + openQty: Int, + averagePrice: Double, + status: String, + remainingQty: Int, + activeChildren: Int, + owner: String = "", + permissionMask: Int = 0 + ) diff --git a/example/order/pom.xml b/example/order/pom.xml index ca144f61e..1ceb6f2ec 100644 --- a/example/order/pom.xml +++ b/example/order/pom.xml @@ -29,6 +29,11 @@ price 0.9.36-SNAPSHOT + + org.finos.vuu + data-ignite + 0.9.36-SNAPSHOT + org.scala-lang diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/SimulationModule.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/SimulationModule.scala index 2c4f66d38..ad47344f8 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/SimulationModule.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/SimulationModule.scala @@ -11,6 +11,8 @@ import org.finos.vuu.core.module.simul.provider._ import org.finos.vuu.core.module.simul.service.ParentOrdersService import org.finos.vuu.core.module.{DefaultModule, ModuleFactory, TableDefContainer, ViewServerModule} import org.finos.vuu.core.table.{Columns, DataTable, TableContainer} +import org.finos.vuu.data.order.ignite.IgniteOrderStore +import org.finos.vuu.data.order.{MapOrderStore, OrderStore} import org.finos.vuu.net.rpc.RpcHandler import org.finos.vuu.net.{ClientSessionId, RequestContext} import org.finos.vuu.provider.simulation.SimulatedBigInstrumentsProvider @@ -129,6 +131,8 @@ object SimulationModule extends DefaultModule { def apply()(implicit clock: Clock, lifecycle: LifecycleContainer, tableDefContainer: TableDefContainer): ViewServerModule = { implicit val randomNumbers: SeededRandomNumbers = new SeededRandomNumbers(clock.now()) + implicit val orderStore: OrderStore = IgniteOrderStore.apply() + //implicit val orderStore: OrderStore = new MapOrderStore() val ordersModel = new ParentChildOrdersModel() diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala new file mode 100644 index 000000000..b9a64fa1c --- /dev/null +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala @@ -0,0 +1,37 @@ +package org.finos.vuu.core.module.simul.ignite + +import org.finos.toolbox.lifecycle.LifecycleContainer +import org.finos.toolbox.time.{Clock, DefaultClock} +import org.finos.vuu.core.module.simul.provider.{ParentChildOrdersModel, SeededRandomNumbers} +import org.finos.vuu.data.order.{ChildOrder, OrderStore} +import org.finos.vuu.data.order.ignite.IgniteOrderStore + +import java.util.concurrent.Executors +import java.util.concurrent.atomic.LongAdder + +object IgniteOrderLoader extends App { + implicit val clock: Clock = new DefaultClock() + implicit val lifecycleContainer = new LifecycleContainer() + implicit val randomNumbers: SeededRandomNumbers = new SeededRandomNumbers(clock.now()) + implicit val orderStore: OrderStore = IgniteOrderStore.apply() + + private val ordersModel = new ParentChildOrdersModel() + private val childOrderCounter = new LongAdder() + private val executor = Executors.newWorkStealingPool() + + (0 until (100_000)).foreach(i => + executor.execute { () => + val parent = ordersModel.createParent() + val childrenToCreate = randomNumbers.seededRand(100, 250) + + val children = (0 until childrenToCreate) + .map(_ => ordersModel.createChild(parent)) + .foldLeft(List[ChildOrder]())((acc, child) => acc :+ child) + + orderStore.storeParentOrderWithChildren(parent, children) + childOrderCounter.add(children.length) + if(i % 1000 == 0) { + println(s"[${Thread.currentThread().getName}] Loaded : $i parent orders and ${childOrderCounter.sum()} child orders") + } + }) +} diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ChildOrdersProvider.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ChildOrdersProvider.scala index 475ca1fdf..b6f788ba1 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ChildOrdersProvider.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ChildOrdersProvider.scala @@ -3,6 +3,7 @@ package org.finos.vuu.core.module.simul.provider import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.Clock import org.finos.vuu.core.table.{DataTable, RowWithData} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} import org.finos.vuu.provider.Provider class ChildOrdersProvider(val table: DataTable, model: ParentChildOrdersModel)(implicit clock: Clock, lifecycleContainer: LifecycleContainer) extends Provider { diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentChildOrdersModel.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentChildOrdersModel.scala index 5d08aee37..ac35bde85 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentChildOrdersModel.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentChildOrdersModel.scala @@ -3,12 +3,13 @@ package org.finos.vuu.core.module.simul.provider import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.Clock import org.finos.vuu.core.module.auths.PermissionSet +import org.finos.vuu.data.order.{ChildOrder, OrderStore, ParentOrder} import java.util.concurrent.{ConcurrentHashMap, DelayQueue, Delayed, TimeUnit} -case class ParentOrder(id: Int, ric: String, price: Double, quantity: Int, side: String, account: String, exchange: String, ccy: String, algo: String, volLimit: Double, filledQty: Int, openQty: Int, averagePrice: Double, status: String, remainingQty: Int, activeChildren: Int, owner: String = "", permissionMask: Int = 0) - -case class ChildOrder(parentId: Int, id: Int, ric: String, price: Double, quantity: Int, side: String, account: String, strategy: String, exchange: String, ccy: String, volLimit: Double, filledQty: Int, openQty: Int, averagePrice: Double, status: String) +//case class ParentOrder(id: Int, ric: String, price: Double, quantity: Int, side: String, account: String, exchange: String, ccy: String, algo: String, volLimit: Double, filledQty: Int, openQty: Int, averagePrice: Double, status: String, remainingQty: Int, activeChildren: Int, owner: String = "", permissionMask: Int = 0) +// +//case class ChildOrder(parentId: Int, id: Int, ric: String, price: Double, quantity: Int, side: String, account: String, strategy: String, exchange: String, ccy: String, volLimit: Double, filledQty: Int, openQty: Int, averagePrice: Double, status: String) trait OrderListener { def onNewParentOrder(parentOrder: ParentOrder) @@ -73,7 +74,10 @@ case class Strategy(name: String) case class OrderPermission(name: String, mask: Int) -class ParentChildOrdersModel(implicit clock: Clock, lifecycleContainer: LifecycleContainer, randomNumbers: RandomNumbers) { +class ParentChildOrdersModel(implicit clock: Clock, + lifecycleContainer: LifecycleContainer, + randomNumbers: RandomNumbers, + orderStore: OrderStore) { private final val queue = new DelayQueue[DelayQueueAction]() private final var cycleNumber = 0l @@ -85,8 +89,8 @@ class ParentChildOrdersModel(implicit clock: Clock, lifecycleContainer: Lifecycl private final val MAX_EVENTS_PER_CYCLE = 100 - private val activeOrders = new ConcurrentHashMap[Int, ParentOrder]() - private val activeChildrenByParentId = new ConcurrentHashMap[Int, List[ChildOrder]]() + //private val activeOrders = new ConcurrentHashMap[Int, ParentOrder]() + //private val activeChildrenByParentId = new ConcurrentHashMap[Int, List[ChildOrder]]() private var listeners: List[OrderListener] = List() @@ -193,7 +197,8 @@ class ParentChildOrdersModel(implicit clock: Clock, lifecycleContainer: Lifecycl action match { case InsertParent(parent, _, _, childCount) => notifyOnParentInsert(parent) - activeOrders.put(parent.id, parent) + //activeOrders.put(parent.id, parent) + orderStore.storeParentOrder(parent) val timeToAmend = randomNumbers.seededRand(1000, 10000) val timeToCancel = randomNumbers.seededRand(10000, 120000) var timeToCreateChild = randomNumbers.seededRand(1000, 3000) @@ -246,13 +251,14 @@ class ParentChildOrdersModel(implicit clock: Clock, lifecycleContainer: Lifecycl case InsertChild(child, parent, _, _) => notifyOnChildInsert(child) val updatedParent = parent.copy(activeChildren = parent.activeChildren + 1, remainingQty = parent.remainingQty - child.quantity) - activeOrders.put(updatedParent.id, updatedParent) - activeChildrenByParentId.get(parent.id) match { - case null => - activeChildrenByParentId.put(parent.id, List(child)) - case children: List[ChildOrder] => - activeChildrenByParentId.put(parent.id, List(child) ++ children) - } + orderStore.storeChildOrder(updatedParent, child) +// activeOrders.put(updatedParent.id, updatedParent) +// activeChildrenByParentId.get(parent.id) match { +// case null => +// activeChildrenByParentId.put(parent.id, List(child)) +// case children: List[ChildOrder] => +// activeChildrenByParentId.put(parent.id, List(child) ++ children) +// } case AmendChild(child, parent, _, _) => //notifyOnChildAmend(child) diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentOrdersProvider.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentOrdersProvider.scala index 9f3af7707..dbc3df289 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentOrdersProvider.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/ParentOrdersProvider.scala @@ -4,6 +4,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.thread.LifeCycleRunner import org.finos.toolbox.time.Clock import org.finos.vuu.core.table.{DataTable, RowWithData} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} import org.finos.vuu.provider.Provider class ParentOrdersProvider(val table: DataTable, val model: ParentChildOrdersModel)(implicit clock: Clock, lifecycleContainer: LifecycleContainer) extends Provider { diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/PermissionedOrdersProvider.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/PermissionedOrdersProvider.scala index 9d6c767fd..d041f72c2 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/PermissionedOrdersProvider.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/provider/PermissionedOrdersProvider.scala @@ -4,6 +4,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.thread.LifeCycleRunner import org.finos.toolbox.time.Clock import org.finos.vuu.core.table.{DataTable, RowWithData} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} import org.finos.vuu.provider.Provider class PermissionedOrdersProvider(val table: DataTable, val model: ParentChildOrdersModel)(implicit clock: Clock, lifecycleContainer: LifecycleContainer) extends Provider { diff --git a/example/pom.xml b/example/pom.xml index fd808d4d1..3bcc61282 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -20,6 +20,8 @@ price basket main-java + data-order + data-ignite diff --git a/vuu/src/main/resources/runconfigurations/SimulMain.run.xml b/vuu/src/main/resources/runconfigurations/SimulMain.run.xml index ea5a58eae..38059b657 100644 --- a/vuu/src/main/resources/runconfigurations/SimulMain.run.xml +++ b/vuu/src/main/resources/runconfigurations/SimulMain.run.xml @@ -1,6 +1,6 @@ - From 9226e3304e3a51dee5c0dac73e16b10c6d7f2343 Mon Sep 17 00:00:00 2001 From: Tomasz Rumak Date: Wed, 22 Nov 2023 18:01:04 +0100 Subject: [PATCH 3/6] Update SimulMain.run.xml cleanup From 61c709089153a37d192faff1125e17e8366073d2 Mon Sep 17 00:00:00 2001 From: Tomasz Rumak Date: Fri, 24 Nov 2023 22:43:22 +0100 Subject: [PATCH 4/6] added query for slice of childorders --- .../ignite/IgniteCacheQueryApplication.scala | 35 +++++++++++++ .../data/order/ignite/IgniteLocalConfig.scala | 46 +++++++++++++++-- .../data/order/ignite/IgniteOrderStore.scala | 49 +++++++++++++++++-- 3 files changed, 120 insertions(+), 10 deletions(-) create mode 100644 example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteCacheQueryApplication.scala diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteCacheQueryApplication.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteCacheQueryApplication.scala new file mode 100644 index 000000000..d78e4df30 --- /dev/null +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteCacheQueryApplication.scala @@ -0,0 +1,35 @@ +package org.finos.vuu.data.order.ignite + +import org.apache.ignite.{IgniteCache, Ignition} +import org.finos.vuu.data.order.{ChildOrder, ParentOrder} + +import java.time.{Duration, Instant} + +/** + * An App that belongs to the suite of the following :
+ * 1. [[CacheNodeApplication]] - an app that starts Ignite cluster storage node
+ * 2. [[IgniteOrderLoader]] - an app that loads random orders and child orders into ignite cache
+ * 3. [[IgniteCacheQueryApplication]] - a showcase class that queries Ignite for slices of ChildOrders
+ */ +object IgniteCacheQueryApplication extends App { + private val clientConfig = IgniteLocalConfig.create(true) + val ignite = Ignition.getOrStart(clientConfig) + + val childOrderCache: IgniteCache[Int, ChildOrder] = ignite.getOrCreateCache(IgniteLocalConfig.childOrderCacheName) + val parentOrderCache: IgniteCache[Int, ParentOrder] = ignite.getOrCreateCache(IgniteLocalConfig.parentOrderCacheName) + val orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache) + + private val windowSize = 100 + private var offset = 0 + private var remaining = orderStore.childOrderCount() + + while (remaining > 0) { + val nextWindow = Math.min(windowSize, remaining) + + val startTime = Instant.now() + val orders = orderStore.findWindow(offset, nextWindow.toInt) + println(s"Size : ${orders.size} in ${Duration.between(startTime, Instant.now())}") + offset += nextWindow.toInt + remaining -= nextWindow + } +} diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala index 7091da974..a87317e9f 100644 --- a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteLocalConfig.scala @@ -1,14 +1,18 @@ package org.finos.vuu.data.order.ignite +import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType} import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration} +import org.finos.vuu.data.order.ChildOrder import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters.IterableHasAsJava object IgniteLocalConfig { val parentOrderCacheName = "ParentOrders" val childOrderCacheName = "ChildOrders" private val persistenceEnabled = new AtomicBoolean() + def create(clientMode: Boolean): IgniteConfiguration = { val cfg = new IgniteConfiguration() @@ -16,8 +20,8 @@ object IgniteLocalConfig { cfg.setPeerClassLoadingEnabled(true) cfg.setCacheConfiguration( - createCacheConfig(parentOrderCacheName), - createCacheConfig(childOrderCacheName) + createParentOrderCacheConfig(), + createChildOrderCacheConfig() ) cfg.setDataStorageConfiguration( @@ -30,12 +34,44 @@ object IgniteLocalConfig { def setPersistenceEnabled(enabled: Boolean): Unit = { persistenceEnabled.set(enabled) } - private def createCacheConfig(name: String): CacheConfiguration[?, ?] = { + + private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = { + val cacheConfiguration = new CacheConfiguration() + + val fields = new java.util.LinkedHashMap[String, String]() + fields.put("parentId", classOf[Int].getName) + fields.put("id", classOf[Int].getName) + fields.put("ric", classOf[String].getName) + fields.put("price", classOf[Double].getName) + fields.put("quantity", classOf[Int].getName) + fields.put("side", classOf[String].getName) + fields.put("account", classOf[String].getName) + fields.put("strategy", classOf[String].getName) + fields.put("exchange", classOf[String].getName) + fields.put("ccy", classOf[String].getName) + fields.put("volLimit", classOf[Double].getName) + fields.put("filledQty", classOf[Int].getName) + fields.put("openQty", classOf[Int].getName) + fields.put("averagePrice", classOf[Double].getName) + fields.put("status", classOf[String].getName) + + val indexes = new java.util.ArrayList[QueryIndex]() + indexes.add(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX")) + indexes.add(new QueryIndex(List("id").asJavaCollection, QueryIndexType.SORTED).setName("CHILDID_IDX")) + + val queryEntity: QueryEntity = new QueryEntity(classOf[Int], classOf[ChildOrder]) + .setFields(fields) + .setIndexes(indexes) + cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection) + cacheConfiguration.setName(childOrderCacheName) + } + + private def createParentOrderCacheConfig(): CacheConfiguration[?, ?] = { val cacheConfiguration = new CacheConfiguration() - cacheConfiguration.setName(name) + cacheConfiguration.setName(parentOrderCacheName) } - private def createDataStorageConfig(): DataStorageConfiguration= { + private def createDataStorageConfig(): DataStorageConfiguration = { val storageConfiguration = new DataStorageConfiguration() storageConfiguration.getDefaultDataRegionConfiguration.setPersistenceEnabled(persistenceEnabled.get()) diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala index ac3d19b52..4bc9a019a 100644 --- a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala @@ -1,8 +1,9 @@ package org.finos.vuu.data.order.ignite -import org.apache.ignite.{IgniteCache, Ignition} -import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder} +import org.apache.ignite.cache.CachePeekMode +import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder, SqlFieldsQuery} import org.apache.ignite.cluster.ClusterState +import org.apache.ignite.{IgniteCache, Ignition} import org.finos.vuu.data.order.{ChildOrder, OrderStore, ParentOrder} import scala.collection.mutable @@ -17,7 +18,7 @@ object IgniteOrderStore { * @param clientMode defines whether the node is a client or a server that is, if cluster node keeps cache data in current jvm or not * @return an instance of IgniteOrderStore */ - def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false):IgniteOrderStore = { + def apply(clientMode: Boolean = true, persistenceEnabled: Boolean = false): IgniteOrderStore = { IgniteLocalConfig.setPersistenceEnabled(persistenceEnabled) val config = IgniteLocalConfig.create(clientMode = clientMode) val ignite = Ignition.getOrStart(config) @@ -35,11 +36,11 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde private val childOrderCache: IgniteCache[Int, ChildOrder]) extends OrderStore { - def storeParentOrder(parentOrder: ParentOrder): Unit= { + def storeParentOrder(parentOrder: ParentOrder): Unit = { parentOrderCache.put(parentOrder.id, parentOrder) } - def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit= { + def storeChildOrder(parentOrder: ParentOrder, childOrder: ChildOrder): Unit = { storeParentOrder(parentOrder) childOrderCache.put(childOrder.id, childOrder) } @@ -66,4 +67,42 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde .getAll.asScala .map(x => x.getValue) } + + // todo - make it metadata aware and extract to another class. + private def toChildOrder(cols: java.util.List[_]): ChildOrder = { + ChildOrder( + parentId = cols.get(0).asInstanceOf[Int], + id = cols.get(1).asInstanceOf[Int], + ric = cols.get(2).asInstanceOf[String], + price = cols.get(3).asInstanceOf[Double], + quantity = cols.get(4).asInstanceOf[Int], + side = cols.get(5).asInstanceOf[String], + account = cols.get(6).asInstanceOf[String], + strategy = cols.get(7).asInstanceOf[String], + exchange = cols.get(8).asInstanceOf[String], + ccy = cols.get(9).asInstanceOf[String], + volLimit = cols.get(10).asInstanceOf[Double], + filledQty = cols.get(11).asInstanceOf[Int], + openQty = cols.get(12).asInstanceOf[Int], + averagePrice = cols.get(13).asInstanceOf[Double], + status = cols.get(14).asInstanceOf[String] + ) + } + + def parentOrderCount(): Long = { + parentOrderCache.sizeLong(CachePeekMode.ALL) + } + + def childOrderCount(): Long = { + childOrderCache.sizeLong(CachePeekMode.ALL) + } + + def findWindow(startIndex: Long, rowCount: Int): Iterable[ChildOrder] = { + val query = childOrderCache.query(new SqlFieldsQuery(s"select * from ChildOrder order by id limit $rowCount offset $startIndex")) + + val buffer = mutable.ListBuffer[ChildOrder]() + query.forEach(item => buffer.addOne(toChildOrder(item))) + + buffer + } } From f9e8f9500350a7575e02152d4caceb5df4a78840 Mon Sep 17 00:00:00 2001 From: Tomasz Rumak Date: Fri, 24 Nov 2023 23:02:39 +0100 Subject: [PATCH 5/6] added query for slice of childorders --- .../org/finos/vuu/data/order/ignite/CacheNodeApplication.scala | 2 +- .../org/finos/vuu/data/order/ignite/IgniteOrderStore.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala index ec994f6f5..95ca68939 100644 --- a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/CacheNodeApplication.scala @@ -3,7 +3,7 @@ package org.finos.vuu.data.order.ignite import org.apache.ignite.Ignition object CacheNodeApplication extends App { - IgniteLocalConfig.setPersistenceEnabled(true) + IgniteLocalConfig.setPersistenceEnabled(false) val configuration = IgniteLocalConfig.create(false) val ignite = Ignition.getOrStart(configuration) diff --git a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala index 4bc9a019a..a3f58a77c 100644 --- a/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala +++ b/example/data-ignite/src/main/scala/org/finos/vuu/data/order/ignite/IgniteOrderStore.scala @@ -98,7 +98,7 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde } def findWindow(startIndex: Long, rowCount: Int): Iterable[ChildOrder] = { - val query = childOrderCache.query(new SqlFieldsQuery(s"select * from ChildOrder order by id limit $rowCount offset $startIndex")) + val query = childOrderCache.query(new SqlFieldsQuery(s"select * from ChildOrder limit $rowCount offset $startIndex")) val buffer = mutable.ListBuffer[ChildOrder]() query.forEach(item => buffer.addOne(toChildOrder(item))) From fed77e47e96a497f9d26fdf0c0c70a124319c011 Mon Sep 17 00:00:00 2001 From: Tomasz Rumak Date: Fri, 24 Nov 2023 23:02:49 +0100 Subject: [PATCH 6/6] added query for slice of childorders --- .../finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala b/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala index b9a64fa1c..c5e291f9d 100644 --- a/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala +++ b/example/order/src/main/scala/org/finos/vuu/core/module/simul/ignite/IgniteOrderLoader.scala @@ -19,7 +19,7 @@ object IgniteOrderLoader extends App { private val childOrderCounter = new LongAdder() private val executor = Executors.newWorkStealingPool() - (0 until (100_000)).foreach(i => + (0 until (10_000)).foreach(i => executor.execute { () => val parent = ordersModel.createParent() val childrenToCreate = randomNumbers.seededRand(100, 250)