Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.socketio4j.socketio.store.StoreFactory;
import com.socketio4j.socketio.store.hazelcast.HazelcastStoreFactory;
import com.socketio4j.socketio.store.memory.MemoryStoreFactory;
import com.socketio4j.socketio.store.redis_pubsub.RedissonStoreFactory;
import com.socketio4j.socketio.store.redis_pubsub.RedisStoreFactory;


import io.netty.handler.codec.http.HttpDecoderConfig;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void setJsonSupport(JsonSupport jsonSupport) {
* @param clientStoreFactory - implements StoreFactory
*
* @see MemoryStoreFactory
* @see RedissonStoreFactory
* @see RedisStoreFactory
* @see HazelcastStoreFactory
*/
public void setStoreFactory(StoreFactory clientStoreFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.socketio4j.socketio.store.event.EventType;


public class HazelcastEventStore implements EventStore {
public class HazelcastPubSubEventStore implements EventStore {

private final HazelcastInstance hazelcastPub;
private final HazelcastInstance hazelcastSub;
Expand All @@ -51,9 +51,9 @@ public class HazelcastEventStore implements EventStore {
private final ConcurrentMap<EventType, ITopic<EventMessage>> activePubTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, ITopic<?>> activeSubTopics = new ConcurrentHashMap<>();

private static final Logger log = LoggerFactory.getLogger(HazelcastEventStore.class);
private static final Logger log = LoggerFactory.getLogger(HazelcastPubSubEventStore.class);

public HazelcastEventStore(
public HazelcastPubSubEventStore(
@NotNull HazelcastInstance hazelcastPub,
@NotNull HazelcastInstance hazelcastSub,
@Nullable Long nodeId,
Expand Down Expand Up @@ -178,17 +178,17 @@ public Builder(@NotNull HazelcastInstance hazelcastPub,
// Optional setters (fluent)
// --------------------------------------------------

public HazelcastEventStore.Builder nodeId(long nodeId) {
public HazelcastPubSubEventStore.Builder nodeId(long nodeId) {
this.nodeId = nodeId;
return this;
}

public HazelcastEventStore.Builder eventStoreMode(@NotNull EventStoreMode mode) {
public HazelcastPubSubEventStore.Builder eventStoreMode(@NotNull EventStoreMode mode) {
this.eventStoreMode = Objects.requireNonNull(mode, "eventStoreMode");
return this;
}

public HazelcastEventStore.Builder topicNamePrefix(@NotNull String prefix) {
public HazelcastPubSubEventStore.Builder topicNamePrefix(@NotNull String prefix) {
if (prefix.isEmpty()) {
throw new IllegalArgumentException("ringBufferNamePrefix cannot be empty");
}
Expand All @@ -200,8 +200,8 @@ public HazelcastEventStore.Builder topicNamePrefix(@NotNull String prefix) {
// Build
// --------------------------------------------------

public HazelcastEventStore build() {
return new HazelcastEventStore(
public HazelcastPubSubEventStore build() {
return new HazelcastPubSubEventStore(
hazelcastPub,
hazelcastSub,
nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,30 @@
import java.util.Objects;
import java.util.UUID;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.core.HazelcastInstance;
import com.socketio4j.socketio.store.Store;
import com.socketio4j.socketio.store.event.BaseStoreFactory;
import com.socketio4j.socketio.store.event.EventStore;

import com.socketio4j.socketio.store.event.EventStoreMode;

/**
* WARN: It's necessary to add netty-socketio.jar in hazelcast server classpath.
*
* A {@code StoreFactory} implementation that provides session-scoped storage backed
* by Hazelcast and allows users to supply an {@link EventStore} of their choice.
* <p>
* Session data is stored in Hazelcast via {@link HazelcastStore}, while event
* propagation is determined entirely by the provided {@link EventStore}. This
* design allows hybrid configurations such as:
* <ul>
* <li>Hazelcast session storage + Kafka event distribution</li>
* <li>Hazelcast session storage + Redis Streams event distribution</li>
* <li>Hazelcast session storage + in-memory event propagation (local only)</li>
* </ul>
* <p>
* If no {@link EventStore} is supplied, {@link HazelcastPubSubEventStore} is used by default.
*/
public class HazelcastStoreFactory extends BaseStoreFactory {

Expand All @@ -41,17 +53,34 @@ public class HazelcastStoreFactory extends BaseStoreFactory {
private final EventStore eventStore;

/**
* API 4.y.z
* @param hazelcastClient
* @param eventStore
* Creates a {@code HazelcastStoreFactory} using the provided Hazelcast instance and
* a caller-supplied {@link EventStore}.
*
* @apiNote Added in API version {@code 4.0.0}
*
* @param hazelcastClient non-null Hazelcast instance
* @param eventStore non-null event store implementation
* @throws NullPointerException if either argument is {@code null}
*/
public HazelcastStoreFactory(HazelcastInstance hazelcastClient, EventStore eventStore) {

Objects.requireNonNull(hazelcastClient, "hazelcastClient cannot be null");
Objects.requireNonNull(eventStore, "eventStore cannot be null");
public HazelcastStoreFactory(@NotNull HazelcastInstance hazelcastClient,
@NotNull EventStore eventStore) {
this.hazelcastClient = Objects.requireNonNull(hazelcastClient, "hazelcastClient cannot be null");
this.eventStore = Objects.requireNonNull(eventStore, "eventStore cannot be null");
}

this.hazelcastClient = hazelcastClient;
this.eventStore = eventStore;
/**
* Creates a {@code HazelcastStoreFactory} using default Hazelcast-backed event distribution.
* <p>
* Session data remains in Hazelcast, while events are propagated via {@link HazelcastPubSubEventStore}
* in {@link EventStoreMode#MULTI_CHANNEL} mode.
*
* @apiNote Added in API version {@code 4.0.0}
*
* @param hazelcastClient non-null Hazelcast instance
*/
public HazelcastStoreFactory(@NotNull HazelcastInstance hazelcastClient) {
this(hazelcastClient,
new HazelcastPubSubEventStore(hazelcastClient, hazelcastClient, null, null, null));
}

@Override
Expand All @@ -60,19 +89,26 @@ public Store createStore(UUID sessionId) {
}

@Override
public void shutdown() {
eventStore.shutdown();
public EventStore eventStore() {
return eventStore;
}


@Override
public EventStore eventStore() {
return eventStore;
public void shutdown() {
try {
eventStore.shutdown();
} catch (Exception e) {
log.error("Failed to shut down event store", e);
}
}

@Override
public <K, V> Map<K, V> createMap(String name) {
return hazelcastClient.getMap(name);
}

}
@Override
public String toString() {
return getClass().getSimpleName() + " (Hazelcast session store)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import com.socketio4j.socketio.store.event.EventType;


public class HazelcastRingBufferEventStore implements EventStore {
public class HazelcastPubSubRingBufferEventStore implements EventStore {

private final HazelcastInstance hazelcastPub;
private final HazelcastInstance hazelcastSub;
Expand All @@ -50,12 +50,12 @@ public class HazelcastRingBufferEventStore implements EventStore {
private final ConcurrentMap<EventType, ITopic<EventMessage>> activePubTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, ITopic<?>> activeSubTopics = new ConcurrentHashMap<>();

private static final Logger log = LoggerFactory.getLogger(HazelcastRingBufferEventStore.class);
private static final Logger log = LoggerFactory.getLogger(HazelcastPubSubRingBufferEventStore.class);
private final String ringBufferNamePrefix;

private static final String DEFAULT_RING_BUFFER_NAME_PREFIX = "SOCKETIO4J:";

public HazelcastRingBufferEventStore(
public HazelcastPubSubRingBufferEventStore(
@NotNull HazelcastInstance hazelcastPub,
@NotNull HazelcastInstance hazelcastSub,
@Nullable Long nodeId,
Expand Down Expand Up @@ -210,8 +210,8 @@ public Builder ringBufferNamePrefix(@NotNull String prefix) {
// Build
// --------------------------------------------------

public HazelcastRingBufferEventStore build() {
return new HazelcastRingBufferEventStore(
public HazelcastPubSubRingBufferEventStore build() {
return new HazelcastPubSubRingBufferEventStore(
hazelcastPub,
hazelcastSub,
nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,51 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.jetbrains.annotations.NotNull;

import com.socketio4j.socketio.store.Store;
import com.socketio4j.socketio.store.event.BaseStoreFactory;
import com.socketio4j.socketio.store.event.EventStore;

/**
* A {@code StoreFactory} implementation that provides per-session in-memory storage.
* <p>
* Session data is stored locally in JVM memory via {@link MemoryStore}. Event propagation
* is determined entirely by the provided {@link EventStore}. This allows combinations like:
* <ul>
* <li>Memory session storage + Kafka event distribution</li>
* <li>Memory session storage + Redis Streams event distribution</li>
* <li>Memory session storage + in-memory event propagation (local only)</li>
* </ul>
* <p>
* If no {@link EventStore} is supplied, {@link MemoryEventStore} is used by default.
*/
public class MemoryStoreFactory extends BaseStoreFactory {

private final EventStore eventStore;

/**
* Creates a new {@code MemoryStoreFactory} using {@link MemoryEventStore}.
* Both session data and events remain local to the JVM.
* @apiNote Added in API version {@code 4.0.0}
*/
public MemoryStoreFactory() {
this.eventStore = new MemoryEventStore();
}

public MemoryStoreFactory(EventStore eventStore) {
Objects.requireNonNull(eventStore, "eventStore can not be null");
this.eventStore = eventStore;
/**
* Creates a new {@code MemoryStoreFactory} using the provided {@link EventStore}.
* Session data remains local, but event propagation depends on the given implementation.
*
* @apiNote Added in API version {@code 4.0.0}
*
* @param eventStore non-null event store
* @throws NullPointerException if {@code eventStore} is {@code null}
*/
public MemoryStoreFactory(@NotNull EventStore eventStore) {
this.eventStore = Objects.requireNonNull(eventStore, "eventStore can not be null");
}

@Override
public Store createStore(UUID sessionId) {
return new MemoryStore();
Expand All @@ -47,18 +76,19 @@ public EventStore eventStore() {
return eventStore;
}

@Override
public void shutdown() {
}

@Override
public String toString() {
return getClass().getSimpleName() + " (local session store only)";
public void shutdown() {
// no-op
}

@Override
public <K, V> Map<K, V> createMap(String name) {
return new ConcurrentHashMap<>();
}

@Override
public String toString() {
return getClass().getSimpleName() + " (memory session store)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* Unreliable Redis Pub/Sub based EventStore.
* Events are ephemeral and not replayed.
*/
public class RedissonEventStore implements EventStore {
public class RedisPubSubEventStore implements EventStore {

private final RedissonClient redissonPub;
private final RedissonClient redissonSub;
Expand All @@ -51,7 +51,7 @@ public class RedissonEventStore implements EventStore {
private final ConcurrentMap<Integer, RTopic> activeSubTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<EventType, RTopic> activePubTopics = new ConcurrentHashMap<>();

private static final Logger log = LoggerFactory.getLogger(RedissonEventStore.class);
private static final Logger log = LoggerFactory.getLogger(RedisPubSubEventStore.class);

// ----------------------------------------------------------------------
// Constructors
Expand All @@ -63,7 +63,7 @@ public class RedissonEventStore implements EventStore {
* @param eventStoreMode
* @param nodeId
*/
public RedissonEventStore(@NotNull RedissonClient redissonPub,
public RedisPubSubEventStore(@NotNull RedissonClient redissonPub,
@NotNull RedissonClient redissonSub,
@Nullable EventStoreMode eventStoreMode,
@Nullable Long nodeId) {
Expand Down Expand Up @@ -187,8 +187,8 @@ public Builder eventStoreMode(@NotNull EventStoreMode mode) {
// Build
// -------------------------

public RedissonEventStore build() {
return new RedissonEventStore(
public RedisPubSubEventStore build() {
return new RedisPubSubEventStore(
redissonPub,
redissonSub,
eventStoreMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.socketio4j.socketio.store.Store;


public class RedissonStore implements Store {
public class RedisStore implements Store {

private final RMap<String, Object> map;

public RedissonStore(UUID sessionId, RedissonClient redisson) {
public RedisStore(UUID sessionId, RedissonClient redisson) {
this.map = redisson.getMap(sessionId.toString());
}

Expand Down
Loading
Loading