Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ public enum BuiltinFunctionName {
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
// .put("earliest", BuiltinFunctionName.EARLIEST)
// .put("latest", BuiltinFunctionName.LATEST)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.build();

Expand Down
24 changes: 24 additions & 0 deletions docs/user/ppl/cmd/eventstats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ Example::
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+


DISTINCT_COUNT, DC(Since 3.3)
-----------------------------

Description
>>>>>>>>>>>

Usage: DISTINCT_COUNT(expr), DC(expr). Returns the approximate number of distinct values of expr using the HyperLogLog++ algorithm. ``DISTINCT_COUNT`` and ``DC`` are equivalent and provide the same functionality.

Example::

PPL> source=accounts | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender;
fetched rows / total rows = 4/4
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | distinct_states | dc_states_alt |
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------|
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | 1 |
| 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3 | 3 |
| 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3 | 3 |
| 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3 | 3 |
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+


Configuration
=============
This command requires Calcite enabled.
Expand Down Expand Up @@ -312,6 +334,8 @@ Eventstats::
source = table | where a < 50 | eventstats count(c)
source = table | eventstats min(c), max(c) by b
source = table | eventstats count(c) as count_by by b | where count_by > 1000
source = table | eventstats dc(field) as distinct_count
source = table | eventstats distinct_count(category) by region


Example 1: Calculate the average, sum and count of a field by group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ public void supportPushDownScriptOnTextField() throws IOException {
assertJsonEqualsIgnoreId(expected, result);
}

/**
 * Explains an {@code eventstats dc(state)} query without a {@code by} clause and compares the
 * plan against {@code expectedOutput/calcite/explain_eventstats_dc.json} using
 * {@code assertJsonEqualsIgnoreId}. Skipped unless {@code isPushdownEnabled()} is true.
 */
@Test
public void testEventstatsDistinctCountExplain() throws IOException {
  Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());

  final String ppl =
      "source=opensearch-sql_test_index_account | eventstats dc(state) as distinct_states";
  var actualPlan = explainQueryToString(ppl);

  final String expectedPlan = loadFromFile("expectedOutput/calcite/explain_eventstats_dc.json");
  assertJsonEqualsIgnoreId(expectedPlan, actualPlan);
}

/**
 * Explains an {@code eventstats distinct_count(state) ... by gender} query and compares the plan
 * against {@code expectedOutput/calcite/explain_eventstats_distinct_count.json} using
 * {@code assertJsonEqualsIgnoreId}. Skipped unless {@code isPushdownEnabled()} is true.
 */
@Test
public void testEventstatsDistinctCountFunctionExplain() throws IOException {
  Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());

  final String ppl =
      "source=opensearch-sql_test_index_account | eventstats distinct_count(state) as"
          + " distinct_states by gender";
  var actualPlan = explainQueryToString(ppl);

  final String expectedPlan =
      loadFromFile("expectedOutput/calcite/explain_eventstats_distinct_count.json");
  assertJsonEqualsIgnoreId(expectedPlan, actualPlan);
}

/**
* Executes the PPL query and returns the result as a string with windows-style line breaks
* replaced with Unix-style ones.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.sql.legacy.TestsConstants;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalcitePPLEventstatsIT extends PPLIntegTestCase {
Expand Down Expand Up @@ -299,28 +297,6 @@ public void testUnsupportedWindowFunctions() {
}
}

/**
 * Disabled test (see the {@code @Ignore} reason) that indexes one extra document and then runs
 * {@code eventstats distinct_count(state) by country} against the state/country test index.
 *
 * <p>NOTE(review): the expected last column (3 for the Canada rows, 2 for the USA rows) equals
 * the number of rows listed per country below, not the number of distinct states (the Canada
 * rows only contain Ontario and Quebec) -- confirm the intended values before re-enabling.
 */
@Ignore("DC should fail in window function")
public void testDistinctCountShouldFail() throws IOException {
// Add a third Canadian document (id 5); the request URL carries refresh=true.
Request request1 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
request1.setJsonEntity(
"{\"name\":\"Jim\",\"age\":27,\"state\":\"Ontario\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request1);
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats distinct_count(state) by country",
TEST_INDEX_STATE_COUNTRY));

// Each expected row ends with the eventstats result for that row's country.
verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 3),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3),
rows("Jim", "Canada", "Ontario", 4, 2023, 27, 3),
rows("Jake", "USA", "California", 4, 2023, 70, 2),
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

@Test
public void testMultipleEventstat() throws IOException {
JSONObject actual =
Expand Down Expand Up @@ -617,6 +593,111 @@ public void testEventstatVarianceWithNullBy() throws IOException {
rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800));
}

/**
 * Runs {@code eventstats dc(state) as dc_state} with no grouping and checks that the original
 * columns are followed by a {@code bigint} {@code dc_state} column whose value is 4 on each of
 * the four expected rows.
 */
@Test
public void testEventstatDistinctCount() throws IOException {
  final String ppl =
      String.format("source=%s | eventstats dc(state) as dc_state", TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // Original index columns first, the window result appended last.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
      rows("Jake", "USA", "California", 4, 2023, 70, 4),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
      rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

/**
 * Runs {@code eventstats dc(state) as dc_state by country} and checks that every expected row
 * carries a {@code bigint} {@code dc_state} of 2 (two states listed for each country below).
 */
@Test
public void testEventstatDistinctCountByCountry() throws IOException {
  final String ppl =
      String.format(
          "source=%s | eventstats dc(state) as dc_state by country", TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // Original index columns first, the window result appended last.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
      rows("Jake", "USA", "California", 4, 2023, 70, 2),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
      rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

/**
 * Runs {@code eventstats distinct_count(country) as dc_country} (the long-form function name)
 * with no grouping and checks that every expected row carries a {@code bigint}
 * {@code dc_country} of 2.
 */
@Test
public void testEventstatDistinctCountFunction() throws IOException {
  final String ppl =
      String.format(
          "source=%s | eventstats distinct_count(country) as dc_country",
          TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // Original index columns first, the window result appended last.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_country", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
      rows("Jake", "USA", "California", 4, 2023, 70, 2),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
      rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

/**
 * Runs {@code eventstats dc(state) as dc_state} against the index variant that contains nulls
 * and checks that {@code dc_state} is 4 on all six expected rows, including the two rows whose
 * {@code state} is null.
 */
@Test
public void testEventstatDistinctCountWithNull() throws IOException {
  final String ppl =
      String.format(
          "source=%s | eventstats dc(state) as dc_state", TEST_INDEX_STATE_COUNTRY_WITH_NULL);
  JSONObject response = executeQuery(ppl);

  // Original index columns first, the window result appended last.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows(null, "Canada", null, 4, 2023, 10, 4),
      rows("Kevin", null, null, 4, 2023, null, 4),
      rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
      rows("Jake", "USA", "California", 4, 2023, 70, 4),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
      rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

@Ignore
@Test
public void testEventstatEarliestAndLatest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
2 changes: 2 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ scalarWindowFunctionName
| LAST
| NTH
| NTILE
| DISTINCT_COUNT
| DC
;

// aggregation terms
Expand Down
Loading