Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,16 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu

public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService, ShardPath shardPath) throws IOException {
this.dataFormat = dataFormat;

this.datafusionReaderManager = new DatafusionReaderManager(shardPath.getDataPath().toString(), formatCatalogSnapshot, dataFormat.getName());

// Create shard ID from index name and shard ID
String shardId = shardPath.getShardId().getIndexName() + "[" + shardPath.getShardId().getId() + "]";

this.datafusionReaderManager = new DatafusionReaderManager(
shardPath.getDataPath().toString(),
formatCatalogSnapshot,
dataFormat.getName(),
shardId
);
this.datafusionService = dataFusionService;
this.cacheManager = datafusionService.getCacheManager();
this.rootAllocator = new RootAllocator(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,30 @@

package org.opensearch.datafusion.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.datafusion.search.DatafusionReaderRegistry;

import java.io.IOException;
import java.util.Map;

/**
* Information about DataFusion on a specific node
*/
public class NodeDataFusionInfo extends BaseNodeResponse implements ToXContentFragment {

private static final Logger logger = LogManager.getLogger(NodeDataFusionInfo.class);

private final String dataFusionVersion;
private final int activeReaderCount;
private final int totalRefCount;
private final Map<Long, DatafusionReaderRegistry.ReaderInfo> readerDetails;

/**
* Constructor for NodeDataFusionInfo.
Expand All @@ -32,6 +41,24 @@ public class NodeDataFusionInfo extends BaseNodeResponse implements ToXContentFr
public NodeDataFusionInfo(DiscoveryNode node, String dataFusionVersion) {
super(node);
this.dataFusionVersion = dataFusionVersion;
// Collect reader information from the registry
DatafusionReaderRegistry registry = DatafusionReaderRegistry.getInstance();

// Clean up any stale readers first
int cleanedUp = registry.cleanupStaleReaders();
if (cleanedUp > 0) {
logger.info("Cleaned up {} stale readers before collecting info", cleanedUp);
}

this.activeReaderCount = registry.getActiveReaderCount();
this.totalRefCount = registry.getTotalRefCount();
this.readerDetails = registry.getAllReaderInfo();

// Log reader reference counts when DataFusionInfo is called
logger.info("DataFusion Info Request - Node: {}, Version: {}", node.getId(), dataFusionVersion);
registry.logAllReaderRefCounts("[DataFusion Info Request]");
logger.info("DataFusion Reader Statistics Summary - Active Readers: {}, Total RefCount: {}",
activeReaderCount, totalRefCount);
}

/**
Expand All @@ -42,6 +69,10 @@ public NodeDataFusionInfo(DiscoveryNode node, String dataFusionVersion) {
public NodeDataFusionInfo(StreamInput in) throws IOException {
super(in);
this.dataFusionVersion = in.readString();
this.activeReaderCount = in.readInt();
this.totalRefCount = in.readInt();
// Reader details are not serialized for transport, only collected locally
this.readerDetails = null;
}

/**
Expand All @@ -53,6 +84,9 @@ public NodeDataFusionInfo(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(dataFusionVersion);
out.writeInt(activeReaderCount);
out.writeInt(totalRefCount);
// Reader details are not serialized for transport
}

/**
Expand All @@ -67,7 +101,30 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.startObject("data_fusion_info");
builder.field("datafusion_version", dataFusionVersion);
builder.endObject();

// Add reader statistics
builder.startObject("reader_statistics");
builder.field("active_reader_count", activeReaderCount);
builder.field("total_ref_count", totalRefCount);

// Add detailed reader information if available
if (readerDetails != null && !readerDetails.isEmpty()) {
builder.startArray("readers");
for (Map.Entry<Long, DatafusionReaderRegistry.ReaderInfo> entry : readerDetails.entrySet()) {
DatafusionReaderRegistry.ReaderInfo info = entry.getValue();
builder.startObject();
builder.field("reader_id", info.readerId);
builder.field("shard_id", info.shardId);
builder.field("ref_count", info.getRefCount());
builder.field("directory_path", info.directoryPath);
builder.field("age_ms", System.currentTimeMillis() - info.registrationTime);
builder.endObject();
}
builder.endArray();
}

builder.endObject(); // reader_statistics
builder.endObject(); // data_fusion_info
builder.endObject();
return builder;
}
Expand All @@ -79,4 +136,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public String getDataFusionVersion() {
return dataFusionVersion;
}

/**
* Gets the active reader count.
* @return The active reader count.
*/
public int getActiveReaderCount() {
return activeReaderCount;
}

/**
* Gets the total reference count.
* @return The total reference count.
*/
public int getTotalRefCount() {
return totalRefCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public class DatafusionReader implements Closeable {
* The catalog snapshot reference.
*/
private CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshotRef;
/**
* The unique reader ID in the global registry
*/
private Long registryId;
/**
* The shard ID this reader belongs to
*/
private String shardId;



/**
* Constructor
Expand Down Expand Up @@ -78,6 +88,11 @@ public void incRef() {
*/
public void decRef() {
readerHandle.close();
// Check if this was the last reference and unregister if so
if (readerHandle.getRefCount() == 0 && registryId != null) {
DatafusionReaderRegistry.getInstance().unregisterReader(registryId);
registryId = null; // Prevent double unregistration
}
}

/**
Expand All @@ -88,8 +103,45 @@ public int getRefCount() {
return readerHandle.getRefCount();
}

/**
* Set the registry ID for this reader
* @param registryId The registry ID
*/
public void setRegistryId(Long registryId) {
this.registryId = registryId;
}

/**
* Get the registry ID for this reader
* @return The registry ID
*/
public Long getRegistryId() {
return registryId;
}

/**
* Set the shard ID for this reader
* @param shardId The shard ID
*/
public void setShardId(String shardId) {
this.shardId = shardId;
}

/**
* Get the shard ID for this reader
* @return The shard ID
*/
public String getShardId() {
return shardId;
}

@Override
public void close() {
// Unregister from the global registry if registered
if (registryId != null) {
DatafusionReaderRegistry.getInstance().unregisterReader(registryId);
registryId = null; // Prevent double unregistration
}
readerHandle.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
import org.opensearch.index.engine.EngineReaderManager;
Expand All @@ -28,19 +30,30 @@
import java.util.List;

public class DatafusionReaderManager implements EngineReaderManager<DatafusionReader>, CatalogSnapshotAwareRefreshListener, FileDeletionListener {
private static final Logger logger = LogManager.getLogger(DatafusionReaderManager.class);

private DatafusionReader current;
private String path;
private String dataFormat;
private Consumer<List<String>> onFilesAdded;
private String shardId;
// private final Lock refreshLock = new ReentrantLock();
// private final List<ReferenceManager.RefreshListener> refreshListeners = new CopyOnWriteArrayList();

public DatafusionReaderManager(String path, Collection<FileMetadata> files, String dataFormat) throws IOException {
this(path, files, dataFormat, null);
}

public DatafusionReaderManager(String path, Collection<FileMetadata> files, String dataFormat, String shardId) throws IOException {
WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1);
files.forEach(fileMetadata -> writerFileSet.add(fileMetadata.file()));
this.current = new DatafusionReader(path, null, List.of(writerFileSet));
this.path = path;
this.dataFormat = dataFormat;
this.shardId = shardId != null ? shardId : path; // Use path as fallback if shardId not provided

// Register the initial reader with the global registry
registerReader(this.current);
}

/**
Expand Down Expand Up @@ -83,6 +96,19 @@ public void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef<Catal
processFileChanges(List.of(), newFiles);
}
this.current = new DatafusionReader(this.path, catalogSnapshot, catalogSnapshot.getRef().getSearchableFiles(dataFormat));
// Register the new reader with the global registry
registerReader(this.current);
}
}

/**
* Register a reader with the global registry
*/
private void registerReader(DatafusionReader reader) {
if (reader != null) {
reader.setShardId(this.shardId);
long registryId = DatafusionReaderRegistry.getInstance().registerReader(this.shardId, reader);
reader.setRegistryId(registryId);
}
}

Expand Down Expand Up @@ -117,4 +143,8 @@ public void onFileDeleted(Collection<String> files) throws IOException {
// TODO - Hook cache eviction with deletion here
System.out.println("onFileDeleted call from DatafusionReader Manager: " + files);
}

public void getRefcount(DatafusionReader reader){
logger.info("READER id {}, shardID, reader fecount",reader.getRefCount());
}
}
Loading
Loading