[Design proposal] Point in time search #3960 (Closed)

bharath-techie opened this issue Jul 20, 2022 · 3 comments

bharath-techie (Contributor) commented Jul 20, 2022

Overview

Today in OpenSearch, if you run the same search query at different points of time, you will likely get different results because the data is constantly changing. However, in real-world scenarios, when analysing data or trying to provide a consistent experience to end users, you may want the results of a query to remain stable while the context stays the same, and to control when changes appear in the result set. You want to be able to run multiple queries on the same data set, and to paginate through it, with consistent results. This is not possible with the options currently available in OpenSearch.

OpenSearch currently supports the following options to achieve pagination, each having a certain limitation:

  1. Scroll API : The Scroll API cannot share a point-in-time context with other queries. Moreover, a scroll only allows moving forward (to the next page) in the search; if the client sends a request for a page but fails to get a response, a subsequent retry skips the page that was retried and returns the next page in the scroll.
  2. Search After : The search_after mechanism doesn't preserve the state of the data at the time the search was issued, so you can paginate using the search_after key and fetch subsequent pages, but as pagination progresses you may get results that are more recent than when the search was first issued.
  3. From To : This mechanism does not support deep pagination, since every page request requires the shard to process all previous results and then filter out the requested page, which becomes more taxing the deeper the pagination goes.

Point in time search can be used to address some of the above shortcomings.

What is Point in Time Search

A user can create a Point In Time for a set of indices. A Point In Time is a set of segments frozen at the moment of creation. We lock the segments of those indices' shards and create contexts to access and query only those shards.
The Create PIT API returns a PIT Id. This PIT Id can be used in multiple search queries to get results against that view of the data. The indices may ingest more documents or process modifications and deletions, but the Point In Time is immune to that and provides a view of the data that remains unchanged from the time of creation. This allows users to run multiple search queries against a given Point In Time and get the same result every time.
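To make the flow concrete, here is a minimal end-to-end sketch using the APIs proposed later in this document (the index name and PIT id are placeholders; exact paths and parameters are detailed in the API sections below):

# 1. Create a PIT over an index, kept alive for one hour (returns a PIT id)
POST /my-index/point_in_time?keep_alive=1h

# 2. Run any number of searches against that frozen view using the returned id
GET /_search
{
  "query": { "match_all": {} },
  "pit": { "id": "<pit id from step 1>", "keep_alive": "1m" }
}

# 3. Delete the PIT once it is no longer needed
DELETE /_search/point_in_time
{ "pit_id": [ "<pit id from step 1>" ] }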

For the feature proposal, please refer to #1147


A PIT only takes into account the data present at the moment the PIT is created. At a lower level, this implies that none of the resources required to return the data from the initial request are modified or deleted. Segments are kept around even though they might already have been merged away and are no longer needed for the live data set. This results in keeping more data around than just the live data set; more data means more segments, more file handles, and more heap to hold segment metadata.

Requirements

Functional Requirements

  1. Ability to submit a request to create a point in time for indices and get a PIT Id.
  2. Ability to pass PIT Id in search request to query the segments kept alive as a part of the Point In Time. (Already implemented)
  3. Ability to set a keep-alive to restrict the amount of time we retain the PIT shards’ segments (reader contexts)
  4. Ability to set indices, preferences, routing, indices_options in a PIT in the same fashion as a _search.
  5. Ability to submit a delete PIT.
  6. Replace scroll API as the primary pagination solution (PIT + search_after, search slicing)

Non functional Requirements

  1. Idempotency — Submitting the same search request with a given PIT Id should return the same result every time. If pagination is used, the n-th page should have the same result every time.
  2. PIT should work unaffected by shard relocations.
  3. Integration with OpenSearch Dashboards
  4. Integration with the OpenSearch High Level REST Client
  5. Expired PITs should be eventually cleaned up.
  6. User should be able to control PIT resource consumption through OpenSearch settings.

Scope

The document scope is limited to core OpenSearch changes needed to support PIT. For V1, we are considering the following functionalities:

  1. Support for create and delete PIT.
  2. PIT expiry after a keep-alive.
  3. Using PIT Id in search request.
  4. Support for delete all PITs
  5. Monitoring PIT disk utilization.
  6. Support for search slicing
  7. Security integration
  8. OpenSearch Dashboards support

Point In Time Constructs

Point In Time Id

The identifier for a Point In Time, base64 URL encoded. It chiefly encapsulates the following information: for each shard that is part of the PIT, it tells which node has the PIT segments, i.e. it holds a map of shard Id to node routing. Note that not every node containing a copy of a given shard will have segments marked for the PIT; only one copy per shard does.

SearchContextId:
    Map<ShardId, SearchContextIdForNode> shards;
SearchContextIdForNode:
    String node;
    ShardSearchContextId searchContextId; //unique id generated for each shard reader context.
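Purely as an illustration of what the id encapsulates, the following self-contained Java sketch serializes a shard-to-(node, reader context) map into a URL-safe base64 token. This is not the actual wire format used by OpenSearch; it only mirrors the information content described above, and all names are hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: encodes a shardId -> (nodeId, readerContextId) map as a
// URL-safe base64 token, mirroring the information a PIT Id carries.
public class PitIdSketch {
    public static String encode(Map<String, String[]> shardToNodeContext) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String[]> e : shardToNodeContext.entrySet()) {
            // shardId|nodeId|readerContextId
            sb.append(e.getKey()).append('|')
              .append(e.getValue()[0]).append('|')
              .append(e.getValue()[1]).append(';');
        }
        return Base64.getUrlEncoder().encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String[]> shards = new LinkedHashMap<>();
        shards.put("[my-index][0]", new String[] { "node-1", "ctx-42" });
        shards.put("[my-index][1]", new String[] { "node-2", "ctx-43" });
        System.out.println(encode(shards)); // prints a PIT-Id-like base64 token
    }
}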

Point In Time Reader Context

For one copy of each shard in the PIT, a PIT Reader Context is created. A Reader Context holds a reference to a Lucene IndexSearcher. An Index Searcher implements search over a single IndexReader. IndexReader is a construct that provides an interface for accessing a point-in-time view of an index. Any changes made to the index via IndexWriter will not be visible until a new IndexReader is opened. Hence by virtue of holding onto the searcher we are able to exclusively query the shards at that Point In Time.

The PIT Reader Context also holds information about the segments it retains for that shard. This helps us in monitoring the disk utilization of PITs.

PITReaderContext:
    ShardSearchContextId id
    List<Segments>
    TimeValue keepAlive
    TimeValue lastAccessedTime
    SearcherSupplier
    AbstractRefCounted refCounted;
    String pitId
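As a rough sketch of how the keepAlive and lastAccessedTime fields drive expiry (the reaper that actually does this is discussed in the Protection and Resiliency section; the class and field names below are hypothetical):

import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a reader context becomes eligible for reaping once it has
// not been accessed for longer than its keep-alive. Fields mirror the POJO above.
class PitExpirySketch {
    long keepAliveMillis;        // TimeValue keepAlive
    long lastAccessedTimeMillis; // TimeValue lastAccessedTime

    boolean isExpired(long nowMillis) {
        return nowMillis - lastAccessedTimeMillis > keepAliveMillis;
    }

    public static void main(String[] args) {
        PitExpirySketch ctx = new PitExpirySketch();
        ctx.keepAliveMillis = TimeUnit.HOURS.toMillis(1);
        ctx.lastAccessedTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(90);
        System.out.println(ctx.isExpired(System.currentTimeMillis())); // true: 90m idle > 1h keep-alive
    }
}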

What happens when we delete the PIT while a Point In Time Search is running:

When a PIT is marked for deletion, its reference count (refCounted in the above POJO) is decremented by 1, but every search query that is using this PIT Reader Context increments the reference by 1. The PIT Reader Context is only deleted when refCounted hits 0. Hence, even if a PIT is requested to be deleted, it is reaped from memory and its resources are released only once all searches using it have completed and no references are held.

High Level API Interactions

Create Point In Time API

Unlike a scroll, by creating a dedicated PIT, we decouple the context from a single query and make it re-usable across arbitrary search requests by passing the PIT Id. We can achieve this by using the Create Point In Time API.

POST "/<target indices>/point_in_time?*keep_alive*=1h&routing=&expand_wildcards=&preference=" 
{
    "id": "o463QQEPbXktaW5kZXgtMDAwMDAxFnNOWU43ckt3U3IyaFVpbGE1UWEtMncAFjFyeXBsRGJmVFM2RTB6eVg1aVVqQncAAAAAAAAAAAIWcDVrM3ZIX0pRNS1XejE5YXRPRFhzUQEWc05ZTjdyS3dTcjJoVWlsYTVRYS0ydwAA",
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "creation_time": 1658146050064
}
  • Request
    • The path parameter is the target index/indices (all values eligible for /target/_search are accepted).
    • The keep_alive parameter tells OpenSearch how long it should keep the point in time alive. It is a mandatory query parameter.
    • We limit the maximum keep-alive of a PIT using a cluster-level setting "point_in_time.max_keep_alive" (defaults to 24h, the same as scrolls; users can change it as required). See the settings sketch after this list.
    • We limit the number of open PIT contexts on each node using a node-level setting "search.max_open_pit_context" (defaults to 300).
    • The optional parameters are preference, routing and expand_wildcards, which behave the same as when passed in a _search request.
    • The optional allow-partial-results parameter, false by default, allows PIT creation to succeed with partial failures.
  • The result from the above request includes an id, which should be passed as the id of the pit parameter of a search request. The result also contains _shards results as part of the response. The REST status is based on the shard results.
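A minimal sketch of adjusting the two limits named above, assuming they end up exposed as dynamic cluster settings (if they are static, they would instead go in opensearch.yml); the values shown are illustrative:

PUT /_cluster/settings
{
  "persistent": {
    "point_in_time.max_keep_alive": "48h",
    "search.max_open_pit_context": 500
  }
}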

Components

  1. RestCreatePITHandler — The REST endpoint for handling the create point in time request from the client. The REST handler is responsible for parsing the request, converting it, delegating it to the local transport handler, and registering a response listener to be invoked once the response is ready.
  2. TransportCreatePITAction
    1. The local transport handler for the request. To open readers for the shards that are part of the requested PIT indices, we make a morphed search request to assemble a list of Shard Targets (which gives the shard id, index and node id) and only create and store contexts for those shards, i.e. for a single copy of each shard. Why do we call this morphed? Because we don't follow the usual query/fetch phases.
    2. In a normal search request, once we have a list of shards, we execute a phase on each shard: the Shard Query Phase or the Shard Fetch Phase.
    3. Phase 1 ( create PIT reader context ) - Instead, in the case of PIT, we execute a single PIT-specific phase for each shard, which calls SearchService.createPitReaderContext() on each node and sets a temporary keep-alive ( 30 seconds ).
    4. Phase 2 ( save PIT ID in PIT reader context ) - Once we get the phase 1 response from all the shards, we form the PIT ID and make another call to the same shards to store the PIT ID and update the keep-alive from the request. The creation time is also broadcast to all the shards along with the PIT id and keep-alive, to keep it consistent.
    5. We will finalize which threadpool (generic or search) to use after running load tests.
  3. TransportSearchAction — The plumbing to support the above-mentioned single-phase (create contexts phase) search request is done here. Once all the PIT shard contexts are opened, a search response is built from the shard phase results (a list of per-shard PIT context id + node). The PIT Id is an encoding of these very results and is generated and returned as part of this search response.

Why we need to store the PIT ID:

  • When a user creates a Point In Time, we generate a PIT Id, which is a hash of information about which nodes the PIT shards lie on. This PIT Id is only returned as a response to the create PIT API call.
  • If the user does not make note of the PIT Id, or loses it, they lose access to the Point In Time for the entirety of its duration and have no way to even delete the PIT without invoking a delete-all PITs command.
  • Without storing the PIT Id, users are also not able to tag a set of PITs with labels to enable better classification (PITs can be created over different sets of indices at different times) and management of point in times.

[Diagram: Create PIT flow]

Failure Scenarios:

Phase 1

  • Coordinator node fails:
    • Request returns 5xx.
    • Any PIT contexts created will be cleared as soon as possible.
  • All nodes or all shards fail:
    • Request returns 5xx; no PIT contexts are created.
  • Some nodes or some shards fail:
    • Request moves forward.
    • Failed shards/nodes won't have PIT contexts.
    • The PIT Id will be created from the list of successful PIT contexts.

Phase 2

  • If any of the shard update operations fails, we fail the entire operation.
  • Coordinator node fails after broadcasting requests to some or all PIT contexts to update the keep-alive and PIT Id:
    • Request returns 5xx.
    • Updated PIT contexts become zombies and won't be reaped until the keep-alive elapses.
  • Coordinator node fails before broadcasting the update PIT Id / keep-alive request to any context:
    • Request returns 5xx.
    • PIT contexts will be cleared as soon as possible.
  • Coordinator node fails after updating all contexts and before returning the response to the client (it is unclear whether this is a plausible scenario):
    • Request returns 5xx.
    • Updated PIT contexts become zombies and won't be reaped until the keep-alive elapses.
  • At least one shard request fails to update the PIT with the PIT id and keep-alive:
    • The coordinator node calls delete PIT.

Delete Point In Time API

PITs are automatically closed when the keep_alive elapses. However, keeping PITs open has a cost; hence, they should be deleted as soon as they are no longer needed in search requests. We can also delete a PIT and free its resources before its keep-alive elapses using the Delete Point In Time APIs.

DELETE /_search/point_in_time

{
    "pit_id": [
        "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAEWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA",
        "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAIWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA"
    ]
}

(or) Delete All:

DELETE /_search/point_in_time/_all

Sample response:

{
    "pits": [
        {
            "successful": true,
            "pit_id": "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAEWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA"
        },
        {
            "successful": false,
            "pit_id": "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAIWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA"
        }
    ]
}

Request body: (Required, string or array of strings) PIT IDs to be cleared. To clear all PIT IDs, use _all.
Response body:
For each PIT Id requested to be deleted, we return a nested object with the following fields:

  • pitId : pit id

  • successful : whether the PIT was successfully deleted. Both partial and complete failures are reported as failures.

[Diagram: Delete PIT flow]

Cross cluster behavior
Delete by ID fully supports deleting cross cluster PITs.
Delete All deletes only PITs that are local or mixed (PITs created from both local and remote clusters).
Fully remote PITs won't be deleted by the Delete All API.

List all PIT API

This API returns all PIT IDs present in the OpenSearch cluster, which can be used by clients / the UI.

GET "/_point_in_time/_all
{
    "pits": [
        {
            "pit_id": "o463QQEPbXktaW5kZXgtMDAwMDAxFnNOWU43ckt3U3IyaFVpbGE1UWEtMncAFjFyeXBsRGJmVFM2RTB6eVg1aVVqQncAAAAAAAAAAAEWcDVrM3ZIX0pRNS1XejE5YXRPRFhzUQEWc05ZTjdyS3dTcjJoVWlsYTVRYS0ydwAA",
            "creation_time": 1658146048666,
            "keep_alive": 6000000
        },
        {
            "pit_id": "o463QQEPbXktaW5kZXgtMDAwMDAxFnNOWU43ckt3U3IyaFVpbGE1UWEtMncAFjFyeXBsRGJmVFM2RTB6eVg1aVVqQncAAAAAAAAAAAIWcDVrM3ZIX0pRNS1XejE5YXRPRFhzUQEWc05ZTjdyS3dTcjJoVWlsYTVRYS0ydwAA",
            "creation_time": 1658146050064,
            "keep_alive": 6000000
        }
    ]
}

Response body:
_nodes - Contains success and failure stats of all the nodes.
Id - PIT id
Creation time - PIT creation time ( same creation time is propagated to all reader contexts during create pit )
Keep Alive - Keep alive of PIT id

[Diagram: List all PITs flow]

Cross cluster behavior
List All retrieves PITs that are local or mixed (PITs created from both local and remote clusters).
Fully remote PITs won't be retrieved by the List All PITs API.

Failure scenarios
If requests to all nodes fail, the request returns 5xx.
If requests to some nodes fail, success/failure info will be part of the response.

Point In Time Segments API

This API provides information about the disk utilization of a PIT. It returns low-level information about the Lucene segments a PIT is comprised of, similar to the cat segments API.

How do we get the information about the exact segments that are utilised by a PIT? When we open reader contexts for each of the PIT’s shards, we also store the segment info in the PIT reader Context.

% curl "localhost:9200/_point_in_time/_segments
`
{
    "pit_id": [
        "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAEWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA",
        "o463QQEPbXktaW5kZXgtMDAwMDAxFkhGN09fMVlPUkVPLXh6MUExZ1hpaEEAFjBGbmVEZHdGU1EtaFhhUFc4ZkR5cWcAAAAAAAAAAAIWaXBPNVJtZEhTZDZXTWFFR05waXdWZwEWSEY3T18xWU9SRU8teHoxQTFnWGloQQAA"
    ]
}

or

 curl "localhost:9200/_point_in_time/_segments/_all"

index  shard prirep ip            segment generation docs.count docs.deleted  size size.memory committed searchable version compound
index1 0     r      10.212.36.190 _0               0          4            0 3.8kb        1364 false     true       8.8.2   true
index1 1     p      10.212.36.190 _0               0          3            0 3.7kb        1364 false     true       8.8.2   true
index1 2     r      10.212.74.139 _0               0          2            0 3.6kb        1364 false     true       8.8.2   true

Request body: PIT ID(s).
If no PIT ID is specified, segment details for all PITs are returned.

  • Components:
  1. RestPITSegmentsHandler — The REST endpoint for handling the PIT segments request from the client. The REST handler is responsible for parsing the request, converting it, delegating it to the local transport handler, and registering a response listener to be invoked once the response is ready. From the PIT segments info received from the transport layer, it builds a tabular response.
  2. TransportCreatePITAction — The local transport handler for the request. It implements the TransportBroadcastByNodeAction abstraction for transporting aggregated shard-level operations - in this case fetching segment info from the reader context - in a single request per node, executing the shard-level operations serially on the receiving node. Each shard-level operation can produce a result; these per-node shard-level results are aggregated into a single result on the coordinating node, which is then returned to the client.

[Diagram: PIT segments API flow]

ENHANCEMENT to the existing node stats API to capture PIT stats

You can check how many PIT contexts are active with the nodes stats API in the search section. The API already shows similar information for Scrolls. We are simply replicating the logic to publish similar stats for PITs.


%  curl "localhost:9200/_nodes/stats/indices/search?pretty"
{
  "_nodes" : {
    "total" : 9,
    "successful" : 9,
    "failed" : 0
  },
  "cluster_name" : "pit_test_cluster",
  "nodes" : {
    "XnAGGeNARgKjsI4eG1rbwQ" : {
      "name" : "ff4f05aa900bd9d5bc4c3fb1f37440ff",
      ...
      ]
      "indices" : {
        "search" : {
           <--- new fields --->
          "pit_total" : 9,
          "pit_time_in_millis" : 11451670,
          "open_pit_contexts" : 4,
          <------------------->
          "open_contexts" : 4,
          "query_total" : 489,
          "query_time_in_millis" : 164,
          "query_current" : 0,
          "fetch_total" : 372,
          "fetch_time_in_millis" : 27,
          "fetch_current" : 0,
          "scroll_total" : 109,
          "scroll_time_in_millis" : 340,
          "scroll_current" : 0,
          "suggest_total" : 0,
          "suggest_time_in_millis" : 0,
          "suggest_current" : 0
        }
      }
    },
    ...
    ...
  }
}
  • open_pit_contexts indicates the number of pit contexts currently open
  • pit_total indicates the number of pit contexts that have been created (completed+active) since the node last restarted
  • pit_time_in_millis indicates the time that pit contexts have been held open since the node last restarted

Using a PIT Id in a search request

[Support for submitting a search query with a Point In Time Id already exists.]
The create PIT request returns an id, which should be passed as the id of the pit parameter of a search request. A search request with the pit parameter must not specify index, routing, or preference, as these parameters are copied from the point in time. The id parameter tells OpenSearch to execute the request using contexts from this point in time. Optionally, the keep_alive parameter can be passed in the pit body; it tells OpenSearch how long to extend the retention of the point in time.

GET /_search
{
  "size": 10000,
  "query": {
    "match_all": {}
  },
  "pit": {
    "id": "o463QQEPbXktaW5kZXgtMDAwMDAxFmZSQjFEb3o4U0VtbU9Mb0RMQnY4VEEAFnZJRF9KclhaUWVtSmQ4VjFBSzlMSWcAAAAAAAAAAAIWVVJkTEJpdkhUN21VMWNmRllOSDdEZwEWZlJCMURvejhTRW1tT0xvRExCdjhUQQAA",
    "keep_alive": "1m"  <--- optional, extends the keep-alive
  },
  "sort": [
    {
      "name.keyword": {
        "order": "desc"
      }
    }
  ]
}

Running a Point In Time Search After PIT has expired
This causes the search query to fail with a SearchPhaseExecutionException, as all shards in the query fail because PIT contexts can't be found for any of them.

{
"error": "SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures {[MHxs9PqFTq6ctF_g15Tneg][guba][2]: SearchContextMissingException",
}


Pagination using Point In Time Search

Using search_after

When running a Point In Time Search (a search query with a PIT Id), you can use the search_after parameter to retrieve the next page of hits using a set of sort values from the previous page. Using search_after requires multiple search requests with the same query and sort values.
The user first runs a search query with the PIT:

GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "elkbee"
    }
  },
  "pit": {
    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", 
    "keep_alive": "100m"
  },
  "sort": [ 
    {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}},
    {"_shard_doc": "desc"}
  ]
}

The response would have 10000 results. To get the next page of 10000 results, the user can rerun the previous search using the last document's sort values as the search_after argument. The search's query, sort, and pit.id arguments must remain unchanged.

GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "elkbee"
    }
  },
  "pit": {
    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", 
    "keep_alive": "100m"
  },
  "sort": [ 
    {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}},
    {"_shard_doc": "desc"}
  ],
  "search_after": [  
    "2021-05-20T05:30:04.832Z"
    ],
}

Fetching multiple pages in parallel
Using PIT with search_after gives you control over the ordering of results across pages. It requires the user to fetch one page after another, because the user needs to know the last result of the current page to fetch the next one. This caters to many use cases, but some users may not care about ordering and prefer leaving pagination to OpenSearch. For example, they may simply want 100k results paginated into 10 pages, may need all 10 pages at once rather than sequentially, and may need the ability to jump from page 3 to page 10 without knowing which document to search_after. That's where the second pagination option, slicing, is the solution.

Search slicing

PIT searches can be further improved by slicing them, i.e. splitting the PIT search into multiple slices that can be consumed independently by your client application.
So, say you have a query that is supposed to return 1,000,000 hits and you want to page over that result set in batches of 50,000 hits. Using a normal PIT query (i.e. without slicing), your client application has to make the first PIT call and then 20 more synchronous calls (one after another) to retrieve each batch of 50K hits.
By using slicing, you can parallelize those 20 PIT calls. If your client application is multi-threaded, you can make each PIT call use, for example, 5 slices; you then end up with 5 slices of ~10K hits that can be consumed by 5 different threads in your application, instead of having a single thread consume 50K hits. You can thus leverage the full computing power of your client application to consume those hits. (A client-side sketch of this parallel consumption follows the two example requests below.)


GET /_search
{
  "slice": {
    "id": 0,  // id is the slice(page) number being requested. In every request we can only query for one slice                    
    "max": 2  // max is the total number of slices (pages) the search response will be broken down into                  
  },
  "query": {
    "match": {
      "message": "foo"
    }
  },
  "pit": {
    "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
  }
}
GET /_search
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "pit": {
    "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
  },
  "query": {
    "match": {
      "message": "foo"
    }
  }
}
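To make the parallel consumption concrete, here is a hedged client-side sketch in plain Java (java.net.http, Java 11+). The host, index, query and PIT id are placeholders; any HTTP client or language would follow the same pattern of issuing one sliced PIT search per thread.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hedged sketch: fan out one sliced PIT search per slice id and consume the
// responses in parallel. Endpoint, PIT id and query are placeholders.
public class SlicedPitSearch {
    static final String ENDPOINT = "http://localhost:9200/_search";
    static final String PIT_ID = "<pit id from the create PIT API>";
    static final int MAX_SLICES = 5;

    static String sliceQuery(int sliceId) {
        return "{"
            + "\"slice\": {\"id\": " + sliceId + ", \"max\": " + MAX_SLICES + "},"
            + "\"query\": {\"match\": {\"message\": \"foo\"}},"
            + "\"pit\": {\"id\": \"" + PIT_ID + "\"}"
            + "}";
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        ExecutorService pool = Executors.newFixedThreadPool(MAX_SLICES);
        List<Future<String>> results = new ArrayList<>();
        for (int slice = 0; slice < MAX_SLICES; slice++) {
            final int sliceId = slice;
            results.add(pool.submit(() -> {
                // _search also accepts POST, which is easier than GET-with-body in most clients
                HttpRequest request = HttpRequest.newBuilder(URI.create(ENDPOINT))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(sliceQuery(sliceId)))
                    .build();
                return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            }));
        }
        for (Future<String> result : results) {
            String body = result.get();
            System.out.println(body.substring(0, Math.min(200, body.length())));
        }
        pool.shutdown();
    }
}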

Replacing Scroll API

Point In Time Search feature supports deep pagination when used with search_after.

We get the following benefits with PIT, compared to a scroll, as a pagination solution :

|   | Scrolls | PIT |
| --- | --- | --- |
| 1 | Scrolls create new contexts for every query. | Contexts created by a PIT can be re-used by multiple queries. |
| 2 | Scrolls only move forwards, i.e. we can only retrieve the next page. If the current page fails, even those results can't be re-queried. | Search with PIT and the search_after param allows jumping back and forth, hence also supporting retrieval of previous pages. |

Security model

For OpenSearch clusters where the security plugin is enabled, the following section applies.

Users will be able to access PIT APIs using the role point_in_time_full_access.

Role:

# Allows user to use point in time functionality
point_in_time_full_access:
  reserved: true
  index_permissions:
    - index_patterns:
        - '*'
      allowed_actions:
        - 'manage_point_in_time'
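A hedged sketch of granting this role to users through the security plugin's roles_mapping.yml; the backend role and user names here are purely illustrative:

# roles_mapping.yml (illustrative mapping for the PIT role)
point_in_time_full_access:
  reserved: false
  backend_roles:
    - "pit_users"
  users:
    - "analyst_user"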

For alias and data stream behavior:

  • PIT IDs always contain the resolved indices (the underlying indices) when saved.
    Based on this,
  • For an alias, the user must have either 'index' or 'alias' permission for any PIT operation.
  • For a data stream, the user must have both 'data stream' AND 'backing indices of data stream' permissions (e.g. data-stream-11 + .ds-my-data-stream11-000001) for any PIT operation.
  • With just the data stream permission, the user will be able to create a PIT but will not be able to use the PIT ID for other operations, such as search, without the backing indices permission.

Protection and Resiliency

  • Index deletion will delete all PIT contexts for shards belonging to that index.
  • The context reaper service currently present to remove expired scroll contexts will also remove the expired PIT contexts. There is no additional change needed here.
  • During PIT creation, if all copies of a shard are unassigned, a PIT context is not created for that shard (we can print shard failures in the create PIT response). Search requests using that PIT Id will not show shard failures for those shards, and the total shards field in the search response will not reflect the shards that don't have PIT contexts.

Limitations

  • OS Process Restart : Since PIT context info is stored as an in-memory map on the nodes, if the OpenSearch process dies or restarts we lose the info about the PIT contexts present on that node. This surfaces as partial shard failures (with Search_Context_Missing_Exception) in the search response for queries with the PIT Id. The segment files retained by the PIT that are not part of the live data are deleted when the OpenSearch process is killed or restarted.
  • Shard Relocation : When a shard relocates (or during hot-to-warm migration of an index), PIT segments belonging to the shard remain on the current node and do not get relocated (this behaviour is inherited from scrolls). Currently the PIT Id is a hash of a unique id and a map with ShardId as key and Node + ReaderContextId as value. In the current design, since PIT segments do not move from Node A to Node B, the PIT Id will keep pointing at Node A, where the segments will be found in spite of the shard relocating. Hence, even if a shard relocates to a new node, search queries with the PIT Id will continue to work normally. A Store provides plain access to files written by an OpenSearch index shard. Each shard has a dedicated Store that it uses to access Lucene's Directory, which represents the lowest level of file abstraction in Lucene used to read and write Lucene indices. This class also provides access to metadata such as checksums for committed files. A committed file is a file that belongs to a segment written by a Lucene commit. When a shard relocates, it decrements the reference counter for the shard's Store. If there are PITs active for the shard on the current node, the reference count for the shard doesn't go to 0 during shard relocation; hence the PIT segments are retained on the node and remain queryable.

[Diagram: PIT behaviour during shard relocation]

Monitoring

  • Point in time segments API
  • Node stats API

Feature Compliance

PIT supports

  • Cross Cluster Search
  • Async search
  • Search Slicing similar to scroll.

Enhancements for V2

We can consider storing PIT segments information on disk, i.e. which node contains which shard's PIT segments. That way we would be able to relocate PIT segments to new nodes when shards relocate, make PITs resilient to OpenSearch process restarts, and not depend on decoding the PIT Id for routing requests to the various PIT segments.

APPENDIX

How a Point In Time retains segments

This section tries to give insight into why segments are retained by PITs and how they are prevented from being merged away or deleted as part of the live data set.

Keeping around segments that are not needed for the live data also means more disk space is required, as those segments cannot be deleted until the PIT id is deleted. Internally this works through reference counting. As long as there is a component (like a PIT search) holding a reference to the data (for example via an open file handle against an inode), there is no final deletion of that data, even though it is no longer part of the live data set. This is also the reason why the PIT id exists: by specifying it as part of a query, you are specifying which state of the data you want to query.

This behaviour is already present with Scrolls implementation and has simply been re-used for Point In Time.

How Objects with RefCounted are closed

public final boolean decRef() {
    int i = refCount.decrementAndGet();
    assert i >= 0;
    if (i == 0) {
        closeInternal();
        return true;
    }
    return false;
}

Callers should invoke incRef() only when they are about to use the resource, and decRef() when they release it. closeInternal() is called only when refCount drops to 0.
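To illustrate the lifecycle, here is a small self-contained Java sketch of the same pattern (not OpenSearch's actual AbstractRefCounted, just an analogous class): a "delete PIT" only drops the initial reference, and the resource is actually closed when the last in-flight search releases its reference.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal analogue of the ref-counting pattern described above (illustrative only).
class RefCountedPitContext {
    private final AtomicInteger refCount = new AtomicInteger(1); // created with one reference

    void incRef() {                // a search starts using the PIT context
        refCount.incrementAndGet();
    }

    boolean decRef() {             // a search finishes, or the PIT is deleted
        int i = refCount.decrementAndGet();
        assert i >= 0;
        if (i == 0) {
            closeInternal();
            return true;
        }
        return false;
    }

    private void closeInternal() {
        System.out.println("PIT reader context released, retained segments can now be reaped");
    }

    public static void main(String[] args) {
        RefCountedPitContext ctx = new RefCountedPitContext();
        ctx.incRef();              // an in-flight search grabs a reference
        ctx.decRef();              // delete PIT: drops the creation reference, context survives
        ctx.decRef();              // in-flight search completes: refCount hits 0, context is closed
    }
}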

What happens when we free/close a PIT Reader Context


Update PIT reader context API

Because of the implementation changes we are making for the 'List all PITs' API, we can now easily update the PIT context as well.
Note: the Search API can already be used to update the keep-alive of a single PIT id.
Update use cases :

  1. Update id ( which is equivalent to the above )
  2. Update list
  3. Update all

The "update list" and "update all" use cases have some ambiguity about how users would use them. For example, for "update list", it is unclear whether we would extend the same keep-alive for the entire list of PITs or extend different keep-alives for different PITs.

Also, we are currently not sure whether "update all" solves any real user use case.
So, we will see whether the need for update APIs arises once PIT goes to production, and implement this API if needed.

@dreamer-89 (Member) commented:

@bharath-techie: As today, Sep 07, is the code freeze date for OpenSearch, is there anything pending on this issue?

@anasalkouz (Member) commented:

@bharath-techie is this still on track for the 2.4 release? Code freeze is on 11/3.
Is there anything pending? Feel free to close it or change the label to 2.5.

@dhruv16dhr commented:

@anasalkouz Yes, this is on track for the 2.4 release and will be closed by next week (before 11/3).
@bharath-techie Please close this before 11/3.
