Skip to content

Commit

Permalink
feat(topicdata): adding end timestamp filter (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexisSouquiere authored Dec 17, 2023
1 parent 2780a78 commit 02fb136
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 33 deletions.
13 changes: 11 additions & 2 deletions client/src/components/DatePicker/DatePicker.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,19 @@ class DatePicker extends Component {

render = () => {
const { value } = this.state;
const { showDateTimeInput, showTimeInput, showTimeSelect, onClear } = this.props;
const { showDateTimeInput, showTimeInput, showTimeSelect, onClear, label } = this.props;
return (
<div style={{ display: 'block', padding: 10 }}>
{showDateTimeInput && (
<div style={{ marginBottom: 10, display: 'flex', flexDirection: 'row' }}>
<div
style={{
marginBottom: 10,
display: 'flex',
flexDirection: 'row',
alignItems: 'center'
}}
>
{label && <div style={{ marginRight: 10 }}>{label}</div>}
<input
value={this.getDisplayValue(value)}
className="form-control"
Expand Down Expand Up @@ -91,6 +99,7 @@ class DatePicker extends Component {
}

DatePicker.propTypes = {
label: PropTypes.string,
value: PropTypes.string,
onChange: PropTypes.func,
showDateTimeInput: PropTypes.bool,
Expand Down
113 changes: 82 additions & 31 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TopicData extends Root {
partitionOptions: [],
offsetsOptions: [],
timestamp: '',
endTimestamp: '',
search: {
key: { text: '', type: 'C' },
value: { text: '', type: 'C' },
Expand All @@ -68,6 +69,7 @@ class TopicData extends Root {
selectedTopic: this.props.topicId,
cleanupPolicy: '',
datetime: '',
endDatetime: '',
roles: JSON.parse(sessionStorage.getItem('roles')),
canDeleteRecords: false,
percent: 0,
Expand Down Expand Up @@ -123,6 +125,9 @@ class TopicData extends Root {
: this.state.sortBy,
partition: query.get('partition') ? query.get('partition') : this.state.partition,
datetime: query.get('timestamp') ? new Date(query.get('timestamp')) : this.state.datetime,
endDatetime: query.get('endTimestamp')
? new Date(query.get('endTimestamp'))
: this.state.endDatetime,
offsetsSearch: query.get('after') ? query.get('after') : this.state.offsetsSearch,
search: this._buildSearchFromQueryString(query),
offsets: query.get('offset')
Expand Down Expand Up @@ -259,7 +264,7 @@ class TopicData extends Root {
}

_buildFilters() {
const { sortBy, partition, datetime, offsetsSearch, search } = this.state;
const { sortBy, partition, datetime, endDatetime, offsetsSearch, search } = this.state;

const filters = [];

Expand All @@ -268,21 +273,11 @@ class TopicData extends Root {
if (partition) filters.push(`partition=${partition}`);

if (datetime) {
let timestamp = datetime.toString().length > 0 ? moment(datetime) : '';
timestamp =
formatDateTime(
{
year: timestamp.year(),
monthValue: timestamp.month(),
dayOfMonth: timestamp.date(),
hour: timestamp.hour(),
minute: timestamp.minute(),
second: timestamp.second(),
milli: timestamp.millisecond()
},
'YYYY-MM-DDTHH:mm:ss.SSS'
) + 'Z';
filters.push(`timestamp=${timestamp}`);
filters.push(`timestamp=${this._buildTimestampFilter(datetime)}`);
}

if (endDatetime) {
filters.push(`endTimestamp=${this._buildTimestampFilter(endDatetime)}`);
}

Object.keys(search)
Expand All @@ -297,6 +292,24 @@ class TopicData extends Root {
return filters.join('&');
}

_buildTimestampFilter(datetime) {
let timestamp = datetime.toString().length > 0 ? moment(datetime) : '';
return (
formatDateTime(
{
year: timestamp.year(),
monthValue: timestamp.month(),
dayOfMonth: timestamp.date(),
hour: timestamp.hour(),
minute: timestamp.minute(),
second: timestamp.second(),
milli: timestamp.millisecond()
},
'YYYY-MM-DDTHH:mm:ss.SSS'
) + 'Z'
);
}

_searchMessages(changePage = false) {
this._stopEventSource();
if (this._hasAnyFilterFilled()) {
Expand Down Expand Up @@ -769,6 +782,7 @@ class TopicData extends Root {
recordCount,
showFilters,
datetime,
endDatetime,
isSearching,
canDeleteRecords,
canDownload,
Expand All @@ -783,6 +797,7 @@ class TopicData extends Root {
if (canDownload) actions.push(constants.TABLE_DOWNLOAD);

let date = moment(datetime);
let endDate = moment(endDatetime);
const { history } = this.props;
const firstColumns = [
{ colName: 'Key', colSpan: 1 },
Expand Down Expand Up @@ -868,7 +883,7 @@ class TopicData extends Root {
<Dropdown.Toggle className="nav-link dropdown-toggle">
<strong>Timestamp UTC:</strong>
{datetime !== '' &&
' ' +
' From ' +
formatDateTime(
{
year: date.year(),
Expand All @@ -880,23 +895,59 @@ class TopicData extends Root {
},
'DD-MM-YYYY HH:mm'
)}
{endDatetime !== '' &&
' To ' +
formatDateTime(
{
year: endDate.year(),
monthValue: endDate.month(),
dayOfMonth: endDate.date(),
hour: endDate.hour(),
minute: endDate.minute(),
second: endDate.second()
},
'DD-MM-YYYY HH:mm'
)}
</Dropdown.Toggle>
{!loading && (
<Dropdown.Menu>
<div className="input-group">
<DatePicker
onClear={() => {
this.setState({ datetime: '' });
}}
showDateTimeInput
showTimeSelect
value={datetime}
onChange={value => {
this.setState({ datetime: value }, () => {
this._searchMessages();
});
}}
/>
<div style={{ display: 'flex' }}>
<div>
<DatePicker
label="Start"
onClear={() => {
this.setState({ datetime: '' }, () => {
this._searchMessages();
});
}}
showDateTimeInput
showTimeSelect
value={datetime}
onChange={value => {
this.setState({ datetime: value }, () => {
this._searchMessages();
});
}}
/>
</div>
<div>
<DatePicker
label="End"
onClear={() => {
this.setState({ endDatetime: '' }, () => {
this._searchMessages();
});
}}
showDateTimeInput
showTimeSelect
value={endDatetime}
onChange={value => {
this.setState({ endDatetime: value }, () => {
this._searchMessages();
});
}}
/>
</div>
</div>
</Dropdown.Menu>
)}
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public ResultNextList<Record> data(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> endTimestamp,
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Expand All @@ -220,6 +221,7 @@ public ResultNextList<Record> data(
partition,
sort,
timestamp,
endTimestamp,
searchByKey,
searchByValue,
searchByHeaderKey,
Expand Down Expand Up @@ -389,6 +391,7 @@ public Publisher<Event<SearchRecord>> sse(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> endTimestamp,
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Expand All @@ -405,6 +408,7 @@ public Publisher<Event<SearchRecord>> sse(
partition,
sort,
timestamp,
endTimestamp,
searchByKey,
searchByValue,
searchByHeaderKey,
Expand Down Expand Up @@ -460,6 +464,7 @@ public ResultNextList<Record> record(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

Expand Down Expand Up @@ -538,6 +543,7 @@ public RecordRepository.CopyResult copy(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

Expand All @@ -558,6 +564,7 @@ private RecordRepository.Options dataSearchOptions(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> endTimestamp,
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Expand All @@ -571,6 +578,7 @@ private RecordRepository.Options dataSearchOptions(
partition.ifPresent(options::setPartition);
sort.ifPresent(options::setSort);
timestamp.map(r -> Instant.parse(r).toEpochMilli()).ifPresent(options::setTimestamp);
endTimestamp.map(r -> Instant.parse(r).toEpochMilli()).ifPresent(options::setEndTimestamp);
after.ifPresent(options::setAfter);
searchByKey.ifPresent(options::setSearchByKey);
searchByValue.ifPresent(options::setSearchByValue);
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,19 @@ private boolean matchFilters(BaseOptions options, Record record) {
return true;
}

private boolean matchFilters(Options options, Record record) {
if (!matchFilters((BaseOptions) options, record)) {
return false;
}

if (options.getEndTimestamp() != null) {
return record.getTimestamp().toInstant().toEpochMilli() <= options.getEndTimestamp();
}

return true;
}


private boolean matchFilter(Search searchFilter, Collection<String> stringsToSearch) {
switch (searchFilter.searchMatchType) {
case EQUALS:
Expand Down Expand Up @@ -1251,6 +1264,7 @@ public enum Sort {
private Sort sort;
private Integer partition;
private Long timestamp;
private Long endTimestamp;

public Options(Environment environment, String clusterId, String topic) {
this.sort = Sort.OLDEST;
Expand Down

0 comments on commit 02fb136

Please sign in to comment.