Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integrations queries for Flint #1150

Merged
merged 25 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
283d663
Switch heading types on setup page
Swiddis Oct 17, 2023
d230445
Merge remote-tracking branch 'upstream/main' into new-headers
Swiddis Oct 19, 2023
55357df
Add alb and nginx create table queries
Swiddis Oct 23, 2023
900c02d
Merge remote-tracking branch 'upstream/main' into integ-queries
Swiddis Oct 23, 2023
725a4b4
Switch from toast to callout for set up failures
Swiddis Oct 23, 2023
75d4510
Fix label selection for truncated labels
Swiddis Oct 23, 2023
f89bc70
Fix button color
Swiddis Oct 23, 2023
a83df4c
Fix tests
Swiddis Oct 23, 2023
3c30aba
Remove loading progress bar
Swiddis Oct 23, 2023
78fa19c
Remove unused imports
Swiddis Oct 23, 2023
9f13cf0
Refactor labels to make distinctions more semantically useful
Swiddis Oct 23, 2023
08dfb97
Merge remote-tracking branch 'upstream/main' into integ-loading
Swiddis Oct 23, 2023
f9a397d
Merge branch 'integ-loading' into integ-tags
Swiddis Oct 23, 2023
2c048e8
Merge branch 'integ-tags' into integ-queries
Swiddis Oct 23, 2023
1a01617
Add running queries for ELB integration
Swiddis Oct 26, 2023
47cc88b
Update tests
Swiddis Oct 26, 2023
e7ffde6
Improve error handling for integration creation
Swiddis Oct 26, 2023
385db93
Merge remote-tracking branch 'upstream/main' into integ-queries
Swiddis Oct 26, 2023
23cd257
Resolve missed merge
Swiddis Oct 26, 2023
dba790b
Fix another missed merge marker
Swiddis Oct 26, 2023
744e7b3
Remove buggy table from nginx
Swiddis Oct 26, 2023
08867b5
Improve validation for integration setup
Swiddis Oct 26, 2023
c337aee
Make create flow actually use new field
Swiddis Oct 26, 2023
1f28a3c
Add onblur check to validation for s3 link
Swiddis Oct 26, 2023
0c266a6
Update queries for parsing raw logs
Swiddis Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,14 @@ exports[`Integration Setup Page Renders integration setup page as expected 1`] =
aria-describedby="random_html_id-help-0"
async={false}
compressed={false}
customOptionText="Select {searchValue} as your index"
fullWidth={false}
id="random_html_id"
isClearable={true}
isLoading={true}
onBlur={[Function]}
onChange={[Function]}
onCreateOption={[Function]}
onFocus={[Function]}
options={Array []}
selectedOptions={
Expand Down Expand Up @@ -817,6 +819,7 @@ exports[`Integration Setup Page Renders integration setup page as expected 1`] =
>
<EuiButtonEmpty
color="text"
disabled={false}
iconType="cross"
onClick={[Function]}
>
Expand Down Expand Up @@ -1354,12 +1357,14 @@ exports[`Integration Setup Page Renders the form as expected 1`] = `
aria-describedby="random_html_id-help-0"
async={false}
compressed={false}
customOptionText="Select {searchValue} as your index"
fullWidth={false}
id="random_html_id"
isClearable={true}
isLoading={true}
onBlur={[Function]}
onChange={[Function]}
onCreateOption={[Function]}
onFocus={[Function]}
options={Array []}
selectedOptions={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export async function addIntegrationRequest(
setToast: (title: string, color?: Color, text?: string | undefined) => void,
name?: string,
dataSource?: string
) {
): Promise<boolean> {
const http = coreRefs.http!;
if (addSample) {
createDataSourceMappings(
Expand All @@ -309,7 +309,7 @@ export async function addIntegrationRequest(
dataSource = `ss4o_${integration.type}-${integrationTemplateId}-sample-sample`;
}

const response: boolean = await http
let response: boolean = await http
.post(`${INTEGRATIONS_BASE}/store/${templateName}`, {
body: JSON.stringify({ name, dataSource }),
})
Expand All @@ -323,7 +323,7 @@ export async function addIntegrationRequest(
return false;
});
if (!addSample || !response) {
return;
return response;
}
const data: { sampleData: unknown[] } = await http
.get(`${INTEGRATIONS_BASE}/repository/${templateName}/data`)
Expand All @@ -337,16 +337,21 @@ export async function addIntegrationRequest(
data.sampleData
.map((record) => `{"create": { "_index": "${dataSource}" } }\n${JSON.stringify(record)}`)
.join('\n') + '\n';
http
response = await http
.post(CONSOLE_PROXY, {
body: requestBody,
query: {
path: `${dataSource}/_bulk?refresh=wait_for`,
method: 'POST',
},
})
.then((_) => {
return true;
})
.catch((err) => {
console.error(err);
setToast('Failed to load sample data', 'danger');
return false;
});
return response;
}
78 changes: 49 additions & 29 deletions public/components/integrations/components/setup_integration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ const INTEGRATION_CONNECTION_DATA_SOURCE_TYPES: Map<
[
's3',
{
title: 'Table',
lower: 'table',
help: 'Select a table to pull the data from.',
title: 'Catalog',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why catalog here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the form isn't referring to a table, I asked around for a better name.

lower: 'catalog',
help: 'Select a catalog to pull the data from.',
},
],
[
Expand Down Expand Up @@ -126,40 +126,42 @@ const suggestDataSources = async (type: string): Promise<Array<{ label: string }

const runQuery = async (
query: string,
trackProgress: (step: number) => void
): Promise<Result<object>> => {
datasource: string,
sessionId: string | null
): Promise<Result<{ poll: object; sessionId: string }>> => {
// Used for polling
const sleep = (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};

try {
const http = coreRefs.http!;
const queryId = (
await http.post(CONSOLE_PROXY, {
body: JSON.stringify({ query, lang: 'sql' }),
query: {
path: '_plugins/_async_query',
method: 'POST',
},
})
).queryId;
const queryResponse: { queryId: string; sessionId: string } = await http.post(CONSOLE_PROXY, {
body: JSON.stringify({ query, datasource, lang: 'sql', sessionId }),
query: {
path: '_plugins/_async_query',
method: 'POST',
},
});
const [queryId, newSessionId] = [queryResponse.queryId, queryResponse.sessionId];
while (true) {
const poll = await http.post(CONSOLE_PROXY, {
const poll: { status: string; error?: string } = await http.post(CONSOLE_PROXY, {
body: '{}',
query: {
path: '_plugins/_async_query/' + queryId,
method: 'GET',
},
});
if (poll.status === 'PENDING') {
trackProgress(1);
} else if (poll.status === 'RUNNING') {
trackProgress(2);
} else if (poll.status === 'SUCCESS') {
trackProgress(3);
return { ok: true, value: poll };
} else if (poll.status === 'FAILURE') {
if (poll.status.toLowerCase() === 'success') {
return {
ok: true,
value: {
poll,
sessionId: newSessionId,
},
};
// Fail status can inconsistently be "failed" or "failure"
} else if (poll.status.toLowerCase().startsWith('fail')) {
return {
ok: false,
error: new Error(poll.error ?? 'No error information provided', { cause: poll }),
Expand Down Expand Up @@ -254,6 +256,16 @@ export function SetupIntegrationForm({
}}
selectedOptions={[{ label: config.connectionDataSource }]}
singleSelection={{ asPlainText: true }}
onCreateOption={(searchValue) => {
const normalizedSearchValue = searchValue.trim();
if (!normalizedSearchValue) {
return;
}
const newOption = { label: normalizedSearchValue };
setDataSourceSuggestions((ds) => ds.concat([newOption]));
updateConfig({ connectionDataSource: newOption.label });
}}
customOptionText={`Select {searchValue} as your ${connectionType.lower}`}
/>
</EuiFormRow>
{config.connectionType === 's3' ? (
Expand Down Expand Up @@ -305,6 +317,7 @@ export function SetupBottomBar({
hash = hash.substring(0, hash.lastIndexOf('/setup'));
window.location.hash = hash;
}}
disabled={loading}
>
Discard
</EuiButtonEmpty>
Expand All @@ -318,9 +331,10 @@ export function SetupBottomBar({
disabled={config.displayName.length < 1 || config.connectionDataSource.length < 1}
onClick={async () => {
setLoading(true);
let sessionId: string | null = null;

if (config.connectionType === 'index') {
await addIntegrationRequest(
const res = await addIntegrationRequest(
false,
integration.name,
config.displayName,
Expand All @@ -329,6 +343,9 @@ export function SetupBottomBar({
config.displayName,
config.connectionDataSource
);
if (!res) {
setLoading(false);
}
} else if (config.connectionType === 's3') {
const http = coreRefs.http!;

Expand All @@ -344,7 +361,8 @@ export function SetupBottomBar({
);
queryStr = queryStr.replaceAll('{s3_bucket_location}', config.connectionLocation);
queryStr = queryStr.replaceAll('{object_name}', integration.name);
const result = await runQuery(queryStr, (_) => {});
queryStr = queryStr.replaceAll(/\s+/g, ' ');
const result = await runQuery(queryStr, config.connectionDataSource, sessionId);
if (!result.ok) {
setLoading(false);
setCalloutLikeToast(
Expand All @@ -354,22 +372,25 @@ export function SetupBottomBar({
);
return;
}
sessionId = result.value.sessionId ?? sessionId;
}
// Once everything is ready, add the integration to the new datasource as usual
// TODO determine actual values here after more about queries is known
await addIntegrationRequest(
const res = await addIntegrationRequest(
false,
integration.name,
config.displayName,
integration,
setCalloutLikeToast,
config.displayName,
config.connectionDataSource
`flint_${config.connectionDataSource}_default_${integration.name}_mview`
);
if (!res) {
setLoading(false);
}
} else {
console.error('Invalid data source type');
}
setLoading(false);
}}
>
Add Integration
Expand Down Expand Up @@ -407,7 +428,6 @@ export function SetupIntegrationPage({ integration }: { integration: string }) {
} as IntegrationTemplate);

const [setupCallout, setSetupCallout] = useState({ show: false } as SetupCallout);

const [showLoading, setShowLoading] = useState(false);

useEffect(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "AWS CloudTrail log collector",
"license": "Apache-2.0",
"type": "logs-aws_cloudtrail",
"labels": ["Observability", "Logs", "AWS", "Flint S3", "Cloud"],
"labels": ["Observability", "Logs", "AWS", "Cloud"],
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/aws_cloudtrail/info",
"statics": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE MATERIALIZED VIEW
{table_name}_mview AS
SELECT
type as `aws.elb.elb_type`,
time as `@timestamp`,
elb as `aws.elb.elb_name`,
client_ip as `aws.elb.client.ip`,
client_port as `aws.elb.client.port`,
target_ip as `aws.elb.target_ip`,
target_port as `aws.elb.target_port`,
request_processing_time as `aws.elb.request_processing_time`,
target_processing_time as `aws.elb.target_processing_time`,
response_processing_time as `aws.elb.response_processing_time`,
elb_status_code as `aws.elb.elb_status_code`,
target_status_code as `aws.elb.target_status_code`,
received_bytes as `aws.elb.received_bytes`,
sent_bytes as `aws.elb.sent_bytes`,
request_verb as `http.request.method`,
request_url as `url.full`,
request_proto as `url.schema`,
user_agent as `http.user_agent.name`,
ssl_cipher as `aws.elb.ssl_cipher`,
ssl_protocol as `aws.elb.ssl_protocol`,
target_group_arn as `aws.elb.target_group_arn`,
trace_id as `traceId`,
domain_name as `url.domain`,
chosen_cert_arn as `aws.elb.chosen_cert_arn`,
matched_rule_priority as `aws.elb.matched_rule_priority`,
request_creation_time as `aws.elb.request_creation_time`,
actions_executed as `aws.elb.actions_executed`,
redirect_url as `aws.elb.redirect_url`,
lambda_error_reason as `aws.elb.lambda_error_reason`,
target_port_list as `aws.elb.target_port_list`,
target_status_code_list as `aws.elb.target_status_code_list`,
classification as `aws.elb.classification`,
classification_reason as `aws.elb.classification_reason`
FROM
{table_name};
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
type string,
time timestamp,
elb string,
client_ip string,
client_port bigint,
target_ip string,
target_port bigint,
request_processing_time double,
target_processing_time double,
response_processing_time double,
elb_status_code bigint,
target_status_code string,
received_bytes bigint,
sent_bytes bigint,
request_verb string,
request_url string,
request_proto string,
user_agent string,
ssl_cipher string,
ssl_protocol string,
target_group_arn string,
trace_id string,
domain_name string,
chosen_cert_arn string,
matched_rule_priority string,
request_creation_time string,
actions_executed string,
redirect_url string,
lambda_error_reason string,
target_port_list string,
target_status_code_list string,
classification string,
classification_reason string
)
USING parquet
LOCATION '{s3_bucket_location}';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}_mview;
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,24 @@
"savedObjects": {
"name": "aws_elb",
"version": "1.0.0"
}
},
"queries": [
{
"name": "create_table",
"version": "1.0.0",
"language": "sql"
},
{
"name": "create_mv",
"version": "1.0.0",
"language": "sql"
},
{
"name": "refresh_mv",
"version": "1.0.0",
"language": "sql"
}
]
},
"sampleData": {
"path": "sample.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "AWS VPC Flow log collector",
"license": "Apache-2.0",
"type": "logs_vpc",
"labels": ["Observability", "Logs", "AWS", "Flint S3", "Cloud"],
"labels": ["Observability", "Logs", "AWS", "Cloud"],
"author": "Haidong Wang",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/aws_vpc_flow/info",
"statics": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "Nginx HTTP server collector",
"license": "Apache-2.0",
"type": "logs",
"labels": ["Observability", "Logs", "Flint S3"],
"labels": ["Observability", "Logs"],
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/nginx/info",
"statics": {
Expand Down
Loading