Merge remote-tracking branch 'upstream/main' into proposal/issue-331/logs-metrics-collecton-script
Djcarrillo6 committed Nov 19, 2023
2 parents 18373d1 + 103bc89 commit 06d12d6
Showing 10 changed files with 207 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated
### Removed
### Fixed
- Fix `TypeError` on `parallel_bulk` ([#601](https://github.com/opensearch-project/opensearch-py/pull/601))
- Fix Amazon OpenSearch Serverless integration with LangChain ([#603](https://github.com/opensearch-project/opensearch-py/pull/603))
- Fix type of `Field.__setattr__` ([#604](https://github.com/opensearch-project/opensearch-py/pull/604))
### Security

## [2.4.1]
58 changes: 58 additions & 0 deletions guides/bulk.md
@@ -1,6 +1,8 @@
- [Bulk Indexing](#bulk-indexing)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)
- [Parallel Bulk](#parallel-bulk)
- [Data Generator](#data-generator)

# Bulk Indexing

@@ -46,6 +48,8 @@ data = [
response = client.bulk(data)
if response["errors"]:
    print("There were errors!")
    for item in response["items"]:
        print(f"{item['index']['status']}: {item['index']['error']['type']}")
else:
    print(f"Bulk-inserted {len(response['items'])} items.")
```
@@ -69,3 +73,57 @@ response = helpers.bulk(client, docs, max_retries=3)
print(response)
```
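
The `max_retries` option re-sends actions rejected with `429 Too Many Requests` before giving up, a behavior `helpers.bulk` inherits from the underlying `streaming_bulk` helper.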

## Parallel Bulk

The bulk helpers also provide `parallel_bulk`, which distributes requests across a thread pool and accepts options such as `chunk_size`, `raise_on_error`, and `raise_on_exception`.

```python
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    chunk_size=10,
    raise_on_error=False,
    raise_on_exception=False,
    max_chunk_bytes=20 * 1024 * 1024,
    request_timeout=60,
):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
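
Note that `parallel_bulk` returns a lazy generator: nothing is sent to the cluster until the results are iterated, as the loop above does.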

## Data Generator

Use a data generator function with the bulk helpers instead of building the entire list of actions in memory.

```python
def _generate_data():
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}

succeeded = []
failed = []
for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items.")
```
11 changes: 10 additions & 1 deletion opensearchpy/helpers/actions.py
@@ -442,6 +442,8 @@ def parallel_bulk(
    max_chunk_bytes: int = 100 * 1024 * 1024,
    queue_size: int = 4,
    expand_action_callback: Any = expand_action,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status: Any = (),
    *args: Any,
    **kwargs: Any
@@ -485,7 +487,14 @@ def _setup_queues(self) -> None:
        for result in pool.imap(
            lambda bulk_chunk: list(
                _process_bulk_chunk(
                    client, bulk_chunk[1], bulk_chunk[0], ignore_status, *args, **kwargs
                    client,
                    bulk_chunk[1],
                    bulk_chunk[0],
                    raise_on_exception,
                    raise_on_error,
                    ignore_status,
                    *args,
                    **kwargs
                )
            ),
            _chunk_actions(
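
The hunk above threads the new `raise_on_exception` and `raise_on_error` keywords through to `_process_bulk_chunk` positionally, ahead of `ignore_status`. A minimal sketch of a call that exercises them (the host and index name are illustrative assumptions, and a reachable cluster is required):

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=["http://localhost:9200"])  # assumed local cluster
actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(10))

# Both keywords below are newly accepted by parallel_bulk in this commit;
# the changelog ties the earlier behavior to a TypeError (#601).
for ok, item in helpers.parallel_bulk(
    client, actions=actions, raise_on_error=False, raise_on_exception=False
):
    if not ok:
        print(item)
```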
2 changes: 2 additions & 0 deletions opensearchpy/helpers/signer.py
@@ -78,6 +78,7 @@ class RequestsAWSV4SignerAuth(requests.auth.AuthBase):

    def __init__(self, credentials, region, service: str = "es") -> None:  # type: ignore
        self.signer = AWSV4Signer(credentials, region, service)
        self.service = service  # tools like LangChain rely on this, see https://github.com/opensearch-project/opensearch-py/issues/600

    def __call__(self, request):  # type: ignore
        return self._sign_request(request)  # type: ignore
@@ -133,6 +134,7 @@ class AWSV4SignerAuth(RequestsAWSV4SignerAuth):
class Urllib3AWSV4SignerAuth(Callable):  # type: ignore
    def __init__(self, credentials, region, service: str = "es") -> None:  # type: ignore
        self.signer = AWSV4Signer(credentials, region, service)
        self.service = service  # tools like LangChain rely on this, see https://github.com/opensearch-project/opensearch-py/issues/600

    def __call__(self, method: str, url: str, body: Any) -> Dict[str, str]:
        return self.signer.sign(method, url, body)
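
Both signer classes now expose the service they sign for as a public attribute. A hedged sketch of how downstream code can read it (the boto3 credentials lookup and the `aoss` service code are assumptions for illustration):

```python
import boto3

from opensearchpy.helpers.signer import Urllib3AWSV4SignerAuth

credentials = boto3.Session().get_credentials()
auth = Urllib3AWSV4SignerAuth(credentials, "us-east-1", "aoss")

# Integrations such as LangChain can branch on the signed service instead
# of re-deriving it from the endpoint URL (see issue #600 above).
if auth.service == "aoss":
    print("Amazon OpenSearch Serverless endpoint")
```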
2 changes: 1 addition & 1 deletion opensearchpy/helpers/utils.py
@@ -309,7 +309,7 @@ def __eq__(self, other: Any) -> bool:
    def __ne__(self, other: Any) -> bool:
        return not self == other

    def __setattr__(self, name: str, value: Optional[bool]) -> None:
    def __setattr__(self, name: str, value: Any) -> None:
        if name.startswith("_"):
            return super(DslBase, self).__setattr__(name, value)
        return self._setattr(name, value)
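
Widening `value` from `Optional[bool]` to `Any` matches how the method is actually used, since DSL objects accept arbitrary attribute values. A minimal sketch of an assignment the old annotation rejected under a type checker (the query and value are illustrative):

```python
from opensearchpy import Q

q = Q("match", title="python")
# The assignment routes through DslBase.__setattr__; under the old
# Optional[bool] annotation, a float value failed type checking.
q.boost = 2.0
```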
52 changes: 51 additions & 1 deletion samples/bulk/bulk-helpers.py
@@ -12,6 +12,7 @@


import os
from typing import Any

from opensearchpy import OpenSearch, helpers

@@ -49,8 +50,57 @@
for i in range(100):
    data.append({"_index": index_name, "_id": i, "value": i})

# serialized bulk raising an exception on error
rc = helpers.bulk(client, data)
print(f"Bulk-inserted {rc[0]} items.")
print(f"Bulk-inserted {rc[0]} items (bulk).")

# parallel bulk with explicit error checking
succeeded = []
failed = []
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    chunk_size=10,
    raise_on_error=False,
    raise_on_exception=False,
    max_chunk_bytes=20 * 1024 * 1024,
    request_timeout=60,
):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")


# streaming bulk with a data generator
def _generate_data() -> Any:
    for i in range(100):
        yield {"_index": index_name, "_id": i, "value": i}


succeeded = []
failed = []
for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
    if success:
        succeeded.append(item)
    else:
        failed.append(item)

if len(failed) > 0:
    print(f"There were {len(failed)} errors:")
    for item in failed:
        print(item["index"]["error"])

if len(succeeded) > 0:
    print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")

# delete index
client.indices.delete(index=index_name)
@@ -460,6 +460,7 @@ def test_aws_signer_as_http_auth(self) -> None:
        from opensearchpy.helpers.signer import RequestsAWSV4SignerAuth

        auth = RequestsAWSV4SignerAuth(self.mock_session(), region)
        self.assertEqual(auth.service, "es")
        con = RequestsHttpConnection(http_auth=auth)
        prepared_request = requests.Request("GET", "http://localhost").prepare()
        auth(prepared_request)
@@ -478,6 +479,7 @@ def test_aws_signer_when_service_is_specified(self) -> None:
        from opensearchpy.helpers.signer import RequestsAWSV4SignerAuth

        auth = RequestsAWSV4SignerAuth(self.mock_session(), region, service)
        self.assertEqual(auth.service, service)
        con = RequestsHttpConnection(http_auth=auth)
        prepared_request = requests.Request("GET", "http://localhost").prepare()
        auth(prepared_request)
@@ -192,6 +192,7 @@ def test_aws_signer_as_http_auth_adds_headers(self, mock_open: Any) -> None:
        from opensearchpy.helpers.signer import Urllib3AWSV4SignerAuth

        auth = Urllib3AWSV4SignerAuth(self.mock_session(), "us-west-2")
        self.assertEqual(auth.service, "es")
        con = Urllib3HttpConnection(http_auth=auth, headers={"x": "y"})
        con.perform_request("GET", "/")
        self.assertEqual(mock_open.call_count, 1)
@@ -249,6 +250,7 @@ def test_aws_signer_when_service_is_specified(self) -> None:
        from opensearchpy.helpers.signer import Urllib3AWSV4SignerAuth

        auth = Urllib3AWSV4SignerAuth(self.mock_session(), region, service)
        self.assertEqual(auth.service, service)
        headers = auth("GET", "http://localhost", None)
        self.assertIn("Authorization", headers)
        self.assertIn("X-Amz-Date", headers)
53 changes: 52 additions & 1 deletion test_opensearchpy/test_helpers/test_actions.py
@@ -67,11 +67,62 @@ def test_all_chunks_sent(self, _process_bulk_chunk: Any) -> None:

        self.assertEqual(50, mock_process_bulk_chunk.call_count)  # type: ignore

@mock.patch("opensearchpy.OpenSearch.bulk")
def test_with_all_options(self, _bulk: Any) -> None:
actions = ({"x": i} for i in range(100))
list(
helpers.parallel_bulk(
OpenSearch(),
actions=actions,
chunk_size=2,
raise_on_error=False,
raise_on_exception=False,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=160,
ignore_status=(123),
)
)

self.assertEqual(50, _bulk.call_count)
_bulk.assert_called_with(
'{"index":{}}\n{"x":98}\n{"index":{}}\n{"x":99}\n', request_timeout=160
)

@mock.patch("opensearchpy.helpers.actions._process_bulk_chunk")
def test_process_bulk_chunk_with_all_options(
self, _process_bulk_chunk: Any
) -> None:
actions = ({"x": i} for i in range(100))
client = OpenSearch()
list(
helpers.parallel_bulk(
client,
actions=actions,
chunk_size=2,
raise_on_error=True,
raise_on_exception=True,
max_chunk_bytes=20 * 1024 * 1024,
request_timeout=160,
ignore_status=(123),
)
)

self.assertEqual(50, _process_bulk_chunk.call_count)
_process_bulk_chunk.assert_called_with(
client,
['{"index":{}}', '{"x":98}', '{"index":{}}', '{"x":99}'],
[({"index": {}}, {"x": 98}), ({"index": {}}, {"x": 99})],
True,
True,
123,
request_timeout=160,
)

    @pytest.mark.skip  # type: ignore
    @mock.patch(
        "opensearchpy.helpers.actions._process_bulk_chunk",
        # make sure we spend some time in the thread
        side_effect=lambda *a: [
        side_effect=lambda *args, **kwargs: [
            (True, time.sleep(0.001) or threading.current_thread().ident)  # type: ignore
        ],
    )
27 changes: 26 additions & 1 deletion test_opensearchpy/test_helpers/test_document.py
@@ -32,7 +32,7 @@
import pickle
from datetime import datetime
from hashlib import sha256
from typing import Any
from typing import Any, Union

from pytest import raises

@@ -648,3 +648,28 @@ class MySubDocWithNested(MyDoc):
        },
        "title": {"type": "keyword"},
    }


def test_save_double(mock_client: Any) -> None:
    class MyDocumentWithDouble(MyDoc):
        a_double: Union[float, field.Double] = field.Double()

        def save(
            self,
            using: Any = None,
            index: Any = None,
            validate: bool = True,
            skip_empty: bool = True,
            return_doc_meta: bool = False,
            **kwargs: Any,
        ) -> Any:
            if not self.a_double:
                self.a_double = 3.14159265359
            return super().save(
                using, index, validate, skip_empty, return_doc_meta, **kwargs
            )

    md: Any = MyDocumentWithDouble()
    with raises(ValidationException):
        md.save(using="mock")
    assert md.a_double == 3.14159265359
