Skip to content

Commit 3f5d7d3

Browse files
Merge branch 'main' into navarone/use-api-keys-native-connectors
2 parents f95b3f5 + 208fa0f commit 3f5d7d3

17 files changed

+116
-206
lines changed

config.yml

+13
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@
110110
#elasticsearch.bulk.queue_max_mem_size: 25
111111
#
112112
#
113+
## Minimal interval of time between MemQueue checks for being full
114+
#elasticsearch.bulk.queue_refresh_interval: 1
115+
#
116+
#
117+
## Maximal interval of time during which MemQueue does not dequeue a single document
118+
## For example, if no documents were sent to Elasticsearch within 60 seconds because of
119+
## Elasticsearch being overloaded, then an error will be raised.
120+
## This mechanism exists to be a circuit-breaker for stuck jobs and stuck Elasticsearch.
121+
#elasticsearch.bulk.queue_refresh_timeout: 60
122+
#
123+
#
113124
## The max size in MB of a bulk request.
114125
## When the next request being prepared reaches that size, the query is
115126
## emitted even if `chunk_size` is not yet reached.
@@ -134,6 +145,8 @@
134145
#
135146
## Retry interval between failed bulk attempts
136147
#elasticsearch.bulk.retry_interval: 10
148+
#
149+
#
137150
## ------------------------------- Service ----------------------------------
138151
#
139152
## Connector service/framework related configurations

connectors/access_control.py

+24-16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@
55
#
66

77
ACCESS_CONTROL = "_allow_access_control"
8+
DLS_QUERY = """{
9+
"bool": {
10+
"should": [
11+
{
12+
"bool": {
13+
"must_not": {
14+
"exists": {
15+
"field": "_allow_access_control"
16+
}
17+
}
18+
}
19+
},
20+
{
21+
"terms": {
22+
"_allow_access_control.enum": {{#toJson}}access_control{{/toJson}}
23+
}
24+
}
25+
]
26+
}
27+
}"""
828

929

1030
def prefix_identity(prefix, identity):
@@ -25,21 +45,9 @@ def es_access_control_query(access_control):
2545

2646
return {
2747
"query": {
28-
"template": {"params": {"access_control": filtered_access_control}},
29-
"source": {
30-
"bool": {
31-
"filter": {
32-
"bool": {
33-
"should": [
34-
{
35-
"terms": {
36-
f"{ACCESS_CONTROL}.enum": filtered_access_control
37-
}
38-
},
39-
]
40-
}
41-
}
42-
}
43-
},
48+
"template": {
49+
"params": {"access_control": filtered_access_control},
50+
"source": DLS_QUERY,
51+
}
4452
}
4553
}

connectors/config.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#
21
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
32
# or more contributor license agreements. Licensed under the Elastic License 2.0;
43
# you may not use this file except in compliance with the Elastic License 2.0.
@@ -60,6 +59,8 @@ def _default_config():
6059
"bulk": {
6160
"queue_max_size": 1024,
6261
"queue_max_mem_size": 25,
62+
"queue_refresh_interval": 1,
63+
"queue_refresh_timeout": 600,
6364
"display_every": 100,
6465
"chunk_size": 1000,
6566
"max_concurrency": 5,

connectors/es/sink.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -809,8 +809,15 @@ async def async_bulk(
809809
retry_interval = options.get(
810810
"retry_interval", DEFAULT_ELASTICSEARCH_RETRY_INTERVAL
811811
)
812-
813-
stream = MemQueue(maxsize=queue_size, maxmemsize=queue_mem_size * 1024 * 1024)
812+
mem_queue_refresh_timeout = options.get("queue_refresh_timeout", 60)
813+
mem_queue_refresh_interval = options.get("queue_refresh_interval", 1)
814+
815+
stream = MemQueue(
816+
maxsize=queue_size,
817+
maxmemsize=queue_mem_size * 1024 * 1024,
818+
refresh_timeout=mem_queue_refresh_timeout,
819+
refresh_interval=mem_queue_refresh_interval,
820+
)
814821

815822
# start the fetcher
816823
self._extractor = Extractor(

connectors/services/job_scheduling.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ async def _should_schedule(job_type):
210210

211211
try:
212212
next_sync = connector.next_sync(job_type, last_wake_up_time)
213-
connector.log_debug(f"Next sync is at {next_sync}")
213+
connector.log_debug(f"Next '{job_type_value}' sync is at {next_sync}")
214214
except Exception as e:
215215
connector.log_critical(e, exc_info=True)
216216
await connector.error(str(e))

connectors/sources/network_drive.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,7 @@ def create_connection(self):
343343

344344
@cached_property
345345
def get_directory_details(self):
346-
return list(
347-
smbclient.walk(top=rf"\\{self.server_ip}/{self.drive_path}", port=self.port)
348-
)
346+
return list(smbclient.walk(top=rf"\\{self.server_ip}/{self.drive_path}"))
349347

350348
def find_matching_paths(self, advanced_rules):
351349
"""
@@ -409,9 +407,7 @@ async def get_files(self, path):
409407
files = []
410408
loop = asyncio.get_running_loop()
411409
try:
412-
files = await loop.run_in_executor(
413-
executor=None, func=partial(smbclient.scandir, path, port=self.port)
414-
)
410+
files = await loop.run_in_executor(None, smbclient.scandir, path)
415411
except SMBConnectionClosed as exception:
416412
self._logger.exception(
417413
f"Connection got closed. Error {exception}. Registering new session"
@@ -443,7 +439,7 @@ def fetch_file_content(self, path):
443439
"""
444440
try:
445441
with smbclient.open_file(
446-
path=path, encoding="utf-8", errors="ignore", mode="rb", port=self.port
442+
path=path, encoding="utf-8", errors="ignore", mode="rb"
447443
) as file:
448444
file_content, chunk = BytesIO(), True
449445
while chunk:
@@ -504,7 +500,6 @@ def list_file_permission(self, file_path, file_type, mode, access):
504500
buffering=0,
505501
file_type=file_type,
506502
desired_access=access,
507-
port=self.port,
508503
) as file:
509504
descriptor = self.security_info.get_descriptor(
510505
file_descriptor=file.fd, info=SECURITY_INFO_DACL

connectors/sources/sharepoint_online.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ def _validate_sharepoint_rest_url(self, url):
10211021
actual_tenant_name = self._tenant_name_pattern.findall(url)[0]
10221022

10231023
if self._tenant_name != actual_tenant_name:
1024-
msg = f"Unable to call Sharepoint REST API - tenant name is invalid. Authenticated for tenant name: {self._tenant_name}, actual tenant name for the service: {actual_tenant_name}."
1024+
msg = f"Unable to call Sharepoint REST API - tenant name is invalid. Authenticated for tenant name: {self._tenant_name}, actual tenant name for the service: {actual_tenant_name}. For url: {url}"
10251025
raise InvalidSharepointTenant(msg)
10261026

10271027
async def close(self):

tests/sources/fixtures/oracle/connector.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
"label": "Port",
2828
"order": 2,
2929
"type": "int",
30-
"value": 9090
30+
"value": 1521
3131
},
3232
"username": {
3333
"label": "Username",
3434
"order": 3,
3535
"type": "str",
36-
"value": "admin"
36+
"value": "c##admin"
3737
},
3838
"password": {
3939
"label": "Password",
@@ -46,7 +46,7 @@
4646
"label": "Database",
4747
"order": 5,
4848
"type": "str",
49-
"value": "xe"
49+
"value": "FREE"
5050
},
5151
"tables": {
5252
"display": "textarea",

tests/sources/fixtures/oracle/docker-compose.yml

+5-5
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ services:
7474
- esnet
7575

7676
oracle:
77-
image: gvenzl/oracle-xe:latest
77+
image: container-registry.oracle.com/database/free:latest
7878
ports:
79-
- 9090:1521
80-
environment:
81-
- ORACLE_PASSWORD=Password_123
79+
- 1521:1521
80+
environment:
81+
- ORACLE_PWD=Password_123
8282
restart: always
8383

8484
networks:
@@ -87,4 +87,4 @@ networks:
8787

8888
volumes:
8989
esdata:
90-
driver: local
90+
driver: local

tests/sources/fixtures/oracle/fixture.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
RETRIES = 1
2222
BATCH_SIZE = 100
2323

24-
USER = "admin"
24+
USER = "c##admin"
2525
PASSWORD = "Password_123"
2626
ENCODING = "UTF-8"
27-
DSN = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT=9090))(CONNECT_DATA=(SID=xe)))"
27+
DSN = "localhost:1521/FREE"
2828

2929
DATA_SIZE = os.environ.get("DATA_SIZE", "medium").lower()
3030

@@ -70,8 +70,8 @@ async def load():
7070
user="system", password=PASSWORD, dsn=DSN, encoding=ENCODING
7171
)
7272
cursor = connection.cursor()
73-
cursor.execute("CREATE USER admin IDENTIFIED by Password_123")
74-
cursor.execute("GRANT CONNECT, RESOURCE, DBA TO admin")
73+
cursor.execute(f"CREATE USER {USER} IDENTIFIED by {PASSWORD} CONTAINER=ALL")
74+
cursor.execute(f"GRANT CONNECT, RESOURCE, DBA TO {USER}")
7575
connection.commit()
7676

7777
connection = oracledb.connect(

tests/sources/test_confluence.py

+3-20
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from aiohttp import StreamReader
1515
from freezegun import freeze_time
1616

17+
from connectors.access_control import DLS_QUERY
1718
from connectors.protocol import Filter
1819
from connectors.source import ConfigurableFieldValueError
1920
from connectors.sources.confluence import (
@@ -982,26 +983,8 @@ async def test_get_access_control_dls_enabled():
982983
"group_id:607194d6bc3c3f006f4c35d8",
983984
"role_key:607194d6bc3c3f006f4c35d9",
984985
]
985-
}
986-
},
987-
"source": {
988-
"bool": {
989-
"filter": {
990-
"bool": {
991-
"should": [
992-
{
993-
"terms": {
994-
"_allow_access_control.enum": [
995-
"account_id:607194d6bc3c3f006f4c35d6",
996-
"group_id:607194d6bc3c3f006f4c35d8",
997-
"role_key:607194d6bc3c3f006f4c35d9",
998-
]
999-
}
1000-
},
1001-
]
1002-
}
1003-
}
1004-
}
986+
},
987+
"source": DLS_QUERY,
1005988
},
1006989
},
1007990
}

tests/sources/test_github.py

+4-21
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import pytest
1212
from aiohttp.client_exceptions import ClientResponseError
1313

14+
from connectors.access_control import DLS_QUERY
1415
from connectors.filtering.validation import SyncRuleValidationResult
1516
from connectors.protocol import Filter
1617
from connectors.source import ConfigurableFieldValueError
@@ -446,27 +447,9 @@
446447
"username:demo-user",
447448
"email:demo@example.com",
448449
]
449-
}
450-
},
451-
"source": {
452-
"bool": {
453-
"filter": {
454-
"bool": {
455-
"should": [
456-
{
457-
"terms": {
458-
"_allow_access_control.enum": [
459-
"user_id:#123",
460-
"username:demo-user",
461-
"email:demo@example.com",
462-
]
463-
}
464-
}
465-
]
466-
}
467-
}
468-
}
469-
},
450+
},
451+
"source": DLS_QUERY,
452+
}
470453
},
471454
}
472455
]

0 commit comments

Comments
 (0)