Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web console: use new sampler features #14017

Merged
merged 14 commits into from
Apr 7, 2023
9 changes: 6 additions & 3 deletions docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This topic covers how to submit a supervisor spec to ingest event data, also kno
- For a walk-through, see the [Loading from Apache Kafka](../../tutorials/tutorial-kafka.md) tutorial.

## Kafka support

The Kafka indexing service supports transactional topics introduced in Kafka 0.11.x by default. The consumer for Kafka indexing service is incompatible with older Kafka brokers. If you are using an older version, refer to the [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade).

Additionally, you can set `isolation.level` to `read_uncommitted` in `consumerProperties` if either:
Expand All @@ -51,6 +52,7 @@ If your Kafka cluster enables consumer-group based ACLs, you can set `group.id`
To use the Kafka indexing service, load the `druid-kafka-indexing-service` extension on both the Overlord and the MiddleManagers. See [Loading extensions](../extensions.md#loading-extensions) for instructions on how to configure extensions.

## Define a supervisor spec

Similar to the ingestion spec for batch ingestion, the supervisor spec configures the data ingestion for Kafka streaming ingestion. A supervisor spec has the following sections:
- `dataSchema` to specify the Druid datasource name, primary timestamp, dimensions, metrics, transforms, and any necessary filters.
- `ioConfig` to configure Kafka connection settings and configure how Druid parses the data. Kafka-specific connection details go in the `consumerProperties`. The `ioConfig` is also where you define the input format (`inputFormat`) of your Kafka data. For supported formats for Kafka and information on how to configure the input format, see [Data formats](../../ingestion/data-formats.md).
Expand Down Expand Up @@ -128,6 +130,7 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS
```

### Kafka input format supervisor spec example

If you want to ingest data from other fields in addition to the Kafka message contents, you can use the `kafka` input format. The `kafka` input format lets you ingest:
- the event key field
- event headers
Expand All @@ -141,7 +144,7 @@ For example, consider the following structure for a message that represents a fi
- **Event timestamp**: "Nov. 10, 2021 at 14:06"

When you use the `kafka` input format, you configure the way that Druid names the dimensions created from the Kafka message:
- `headerLabelPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with named dimensions. The default is `kafka.header`. Considering the header from the example, Druid maps the header to the following column: `kafka.header.environment`.
- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with named dimensions. The default is `kafka.header`. Considering the header from the example, Druid maps the header to the following column: `kafka.header.environment`.
- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with other time columns. The default is `kafka.timestamp`.
- `keyColumnName`: Supply the name for the Kafka key column in Druid. The default is `kafka.key`.
Additionally, you must provide information about how Druid should parse the data in the Kafka message:
Expand All @@ -159,7 +162,7 @@ Additionally, you must provide information about how Druid should parse the data

For more information on data formats, see [Data formats](../../ingestion/data-formats.md).

Finally, add the Kafka message columns to the `dimensionsSpec`. For the key and timestamp, you can use the dimension names you defined for `keyColumnName` and `timestampColumnName`. For header dimensions, append the header key to the `headerLabelPrefix`. For example `kafka.header.environment`.
Finally, add the Kafka message columns to the `dimensionsSpec`. For the key and timestamp, you can use the dimension names you defined for `keyColumnName` and `timestampColumnName`. For header dimensions, append the header key to the `headerColumnPrefix`. For example `kafka.header.environment`.

The following supervisor spec demonstrates how to ingest the Kafka header, key, and timestamp into Druid dimensions:
```
Expand All @@ -174,7 +177,7 @@ The following supervisor spec demonstrates how to ingest the Kafka header, key,
"topic": "wiki-edits",
"inputFormat": {
"type": "kafka",
"headerLabelPrefix": "kafka.header.",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"headerFormat": {
Expand Down
4 changes: 2 additions & 2 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Configure the Kafka `inputFormat` to load complete kafka records including heade
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `type` | String | Set value to `kafka`. | yes |
| `headerLabelPrefix` | String | Custom label prefix for all the header columns. | no (default = "kafka.header.") |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `timestampColumnName` | String | Name of the column for the kafka record's timestamp.| no (default = "kafka.timestamp") |
| `keyColumnName` | String | Name of the column for the kafka record's key.| no (default = "kafka.key") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
Expand All @@ -183,7 +183,7 @@ For example:
"ioConfig": {
"inputFormat": {
"type": "kafka",
"headerLabelPrefix": "kafka.header.",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"headerFormat":
Expand Down
2 changes: 1 addition & 1 deletion licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5733,7 +5733,7 @@ license_category: binary
module: web-console
license_name: Apache License version 2.0
copyright: Imply Data
version: 0.18.2
version: 0.18.3

---

Expand Down
8 changes: 4 additions & 4 deletions web-console/e2e-tests/reindexing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function validateConnectLocalData(preview: string) {
expect(lines.length).toBe(500);
const firstLine = lines[0];
expect(firstLine).toBe(
'Druid row: {' +
'[Druid row: {' +
'"__time":1442018818771' +
',"channel":"#en.wikipedia"' +
',"comment":"added project"' +
Expand All @@ -131,11 +131,11 @@ function validateConnectLocalData(preview: string) {
',"added":36' +
',"deleted":0' +
',"delta":36' +
'}',
'}]',
);
const lastLine = lines[lines.length - 1];
expect(lastLine).toBe(
'Druid row: {' +
'[Druid row: {' +
'"__time":1442020314823' +
',"channel":"#en.wikipedia"' +
',"comment":"/* History */[[WP:AWB/T|Typo fixing]], [[WP:AWB/T|typo(s) fixed]]: nothern → northern using [[Project:AWB|AWB]]"' +
Expand All @@ -150,7 +150,7 @@ function validateConnectLocalData(preview: string) {
',"added":1' +
',"deleted":0' +
',"delta":1' +
'}',
'}]',
);
}

Expand Down
14 changes: 7 additions & 7 deletions web-console/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion web-console/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"d3-axis": "^2.1.0",
"d3-scale": "^3.3.0",
"d3-selection": "^2.0.0",
"druid-query-toolkit": "^0.18.2",
"druid-query-toolkit": "^0.18.3",
"file-saver": "^2.0.2",
"follow-redirects": "^1.14.7",
"fontsource-open-sans": "^3.0.9",
Expand Down
2 changes: 1 addition & 1 deletion web-console/script/create-sql-docs.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const snarkdown = require('snarkdown');

const writefile = 'lib/sql-docs.js';

const MINIMUM_EXPECTED_NUMBER_OF_FUNCTIONS = 164;
const MINIMUM_EXPECTED_NUMBER_OF_FUNCTIONS = 167;
const MINIMUM_EXPECTED_NUMBER_OF_DATA_TYPES = 14;

const initialFunctionDocs = {
Expand Down
4 changes: 2 additions & 2 deletions web-console/src/components/auto-form/auto-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export interface Field<M> {
hide?: Functor<M, boolean>;
hideInMore?: Functor<M, boolean>;
valueAdjustment?: (value: any) => any;
adjustment?: (model: Partial<M>) => Partial<M>;
adjustment?: (model: Partial<M>, oldModel: Partial<M>) => Partial<M>;
issueWithValue?: (value: any) => string | undefined;

customSummary?: (v: any) => string;
Expand Down Expand Up @@ -217,7 +217,7 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
}

if (field.adjustment) {
newModel = field.adjustment(newModel);
newModel = field.adjustment(newModel, model);
}

this.modelChange(newModel);
Expand Down
7 changes: 0 additions & 7 deletions web-console/src/console-application.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import {
import './console-application.scss';

export interface ConsoleApplicationProps {
exampleManifestsUrl?: string;
defaultQueryContext?: Record<string, any>;
mandatoryQueryContext?: Record<string, any>;
}
Expand Down Expand Up @@ -213,15 +212,12 @@ export class ConsoleApplication extends React.PureComponent<
};

private readonly wrappedDataLoaderView = () => {
const { exampleManifestsUrl } = this.props;

return this.wrapInViewContainer(
'data-loader',
<LoadDataView
mode="all"
initTaskId={this.taskId}
initSupervisorId={this.supervisorId}
exampleManifestsUrl={exampleManifestsUrl}
goToIngestion={this.goToIngestionWithTaskGroupId}
/>,
'narrow-pad',
Expand All @@ -241,14 +237,11 @@ export class ConsoleApplication extends React.PureComponent<
};

private readonly wrappedClassicBatchDataLoaderView = () => {
const { exampleManifestsUrl } = this.props;

return this.wrapInViewContainer(
'classic-batch-data-loader',
<LoadDataView
mode="batch"
initTaskId={this.taskId}
exampleManifestsUrl={exampleManifestsUrl}
goToIngestion={this.goToIngestionWithTaskGroupId}
/>,
'narrow-pad',
Expand Down
85 changes: 0 additions & 85 deletions web-console/src/druid-models/dimension-spec/dimension-spec.spec.ts

This file was deleted.

20 changes: 11 additions & 9 deletions web-console/src/druid-models/dimension-spec/dimension-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import type { Field } from '../../components';
import { filterMap, typeIs } from '../../utils';
import type { SampleHeaderAndRows } from '../../utils/sampler';
import { guessColumnTypeFromHeaderAndRows } from '../ingestion-spec/ingestion-spec';
import type { SampleResponse } from '../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../utils/sampler';
import { guessColumnTypeFromSampleResponse } from '../ingestion-spec/ingestion-spec';

export interface DimensionsSpec {
readonly dimensions?: (string | DimensionSpec)[];
readonly dimensionExclusions?: string[];
readonly spatialDimensions?: any[];
readonly includeAllDimensions?: boolean;
readonly useSchemaDiscovery?: boolean;
}

export interface DimensionSpec {
Expand Down Expand Up @@ -77,20 +80,19 @@ export function inflateDimensionSpec(dimensionSpec: string | DimensionSpec): Dim
}

export function getDimensionSpecs(
headerAndRows: SampleHeaderAndRows,
sampleResponse: SampleResponse,
typeHints: Record<string, string>,
guessNumericStringsAsNumbers: boolean,
hasRollup: boolean,
): (string | DimensionSpec)[] {
return filterMap(headerAndRows.header, h => {
if (h === '__time') return;
const type =
return filterMap(getHeaderNamesFromSampleResponse(sampleResponse, true), h => {
const dimensionType =
typeHints[h] ||
guessColumnTypeFromHeaderAndRows(headerAndRows, h, guessNumericStringsAsNumbers);
if (type === 'string') return h;
guessColumnTypeFromSampleResponse(sampleResponse, h, guessNumericStringsAsNumbers);
if (dimensionType === 'string') return h;
if (hasRollup) return;
return {
type,
type: dimensionType === 'COMPLEX<json>' ? 'json' : dimensionType,
name: h,
};
});
Expand Down
Loading