Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to return automatic compaction config history #13699

Merged
merged 6 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/operations/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,18 @@ Returns all automatic compaction configs.

Returns an automatic compaction config of a dataSource.

`GET /druid/coordinator/v1/config/compaction/{dataSource}/history?interval={interval}&count={count}`

Returns the history of the automatic compaction config for a dataSource. Optionally accepts `interval` and `count`
query string parameters to filter by interval and limit the number of results respectively. If the dataSource does not
exist or there is no compaction history for the dataSource, a 404 response is returned.

The response contains a list of objects with the following keys:
* `globalConfig`: A json object containing automatic compaction config that applies to the entire cluster.
* `compactionConfig`: A json object containing the automatic compaction config for the datasource.
* `auditInfo`: A json object that contains information about the change made - like `author`, `comment` and `ip`.
* `auditTime`: The date and time when the change was made.

`POST /druid/coordinator/v1/config/compaction/taskslots?ratio={someRatio}&max={someMaxSlots}`

Update the capacity for compaction tasks. `ratio` and `max` are used to limit the max number of compaction tasks.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.audit.AuditInfo;
import org.joda.time.DateTime;

/**
* A DTO containing audit information for compaction config for a datasource.
*/
public class DataSourceCompactionConfigAuditEntry
{
private final GlobalCompactionConfig globalConfig;
private final DataSourceCompactionConfig compactionConfig;
private final AuditInfo auditInfo;
private final DateTime auditTime;

@JsonCreator
public DataSourceCompactionConfigAuditEntry(
@JsonProperty("globalConfig") GlobalCompactionConfig globalConfig,
@JsonProperty("compactionConfig") DataSourceCompactionConfig compactionConfig,
@JsonProperty("auditInfo") AuditInfo auditInfo,
@JsonProperty("auditTime") DateTime auditTime
)
{
this.globalConfig = globalConfig;
this.compactionConfig = compactionConfig;
this.auditInfo = auditInfo;
this.auditTime = auditTime;
}

@JsonProperty
public GlobalCompactionConfig getGlobalConfig()
{
return globalConfig;
}

@JsonProperty
public DataSourceCompactionConfig getCompactionConfig()
{
return compactionConfig;
}

@JsonProperty
public AuditInfo getAuditInfo()
{
return auditInfo;
}

@JsonProperty
public DateTime getAuditTime()
{
return auditTime;
}

/**
* A DTO containing compaction config for that affects the entire cluster.
*/
public static class GlobalCompactionConfig
{
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;

@JsonCreator
public GlobalCompactionConfig(
@JsonProperty("compactionTaskSlotRatio")
double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") int maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") boolean useAutoScaleSlots
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
}

@JsonProperty
public double getCompactionTaskSlotRatio()
{
return compactionTaskSlotRatio;
}

@JsonProperty
public int getMaxCompactionTaskSlots()
{
return maxCompactionTaskSlots;
}

@JsonProperty
public boolean isUseAutoScaleSlots()
{
return useAutoScaleSlots;
}

@JsonIgnore
public boolean hasSameConfig(CoordinatorCompactionConfig coordinatorCompactionConfig)
{
return useAutoScaleSlots == coordinatorCompactionConfig.isUseAutoScaleSlots() &&
compactionTaskSlotRatio == coordinatorCompactionConfig.getCompactionTaskSlotRatio() &&
coordinatorCompactionConfig.getMaxCompactionTaskSlots() == maxCompactionTaskSlots;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import org.apache.druid.audit.AuditInfo;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Stack;

/**
* A utility class to build the config history for a datasource from audit entries for
* {@link CoordinatorCompactionConfig}. The {@link CoordinatorCompactionConfig} contains the entire config for the
* cluster, so this class creates adds audit entires to the history only when a setting for this datasource or a global
* setting has changed.
*/
public class DataSourceCompactionConfigHistory
{
private final Stack<DataSourceCompactionConfigAuditEntry> auditEntries = new Stack<>();
private final String dataSource;

public DataSourceCompactionConfigHistory(String dataSource)
{
this.dataSource = dataSource;
}

public void add(CoordinatorCompactionConfig coordinatorCompactionConfig, AuditInfo auditInfo, DateTime auditTime)
{
DataSourceCompactionConfigAuditEntry current = auditEntries.isEmpty() ? null : auditEntries.peek();
DataSourceCompactionConfigAuditEntry newEntry = null;
boolean hasDataSourceCompactionConfig = false;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSource.equals(dataSourceCompactionConfig.getDataSource())) {
suneet-s marked this conversation as resolved.
Show resolved Hide resolved
hasDataSourceCompactionConfig = true;
if (
current == null ||
(
!dataSourceCompactionConfig.equals(current.getCompactionConfig()) ||
!current.getGlobalConfig().hasSameConfig(coordinatorCompactionConfig)
)
) {
current = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
dataSourceCompactionConfig,
auditInfo,
auditTime
);
newEntry = current;
}
break;
}
}
if (newEntry != null) {
auditEntries.push(newEntry);
} else if (current != null && !hasDataSourceCompactionConfig) {
newEntry = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
null,
auditInfo,
auditTime
);
auditEntries.push(newEntry);
}
}

public List<DataSourceCompactionConfigAuditEntry> getHistory()
{
return auditEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@

package org.apache.druid.server.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.utils.ServletResourceUtils;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.joda.time.Interval;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand All @@ -49,6 +56,9 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
Expand All @@ -67,17 +77,23 @@ public class CoordinatorCompactionConfigsResource
private final JacksonConfigManager manager;
private final MetadataStorageConnector connector;
private final MetadataStorageTablesConfig connectorConfig;
private final AuditManager auditManager;
private final ObjectMapper jsonMapperOnlyNonNullValue;

@Inject
public CoordinatorCompactionConfigsResource(
JacksonConfigManager manager,
MetadataStorageConnector connector,
MetadataStorageTablesConfig connectorConfig
MetadataStorageTablesConfig connectorConfig,
AuditManager auditManager,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue
)
{
this.manager = manager;
this.connector = connector;
this.connectorConfig = connectorConfig;
this.auditManager = auditManager;
this.jsonMapperOnlyNonNullValue = jsonMapperOnlyNonNullValue;
}

@GET
Expand All @@ -101,7 +117,10 @@ public Response setCompactionTaskLimit(
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
Expand Down Expand Up @@ -130,7 +149,10 @@ public Response addOrUpdateCompactionConfig(
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final CoordinatorCompactionConfig newCompactionConfig;
final Map<String, DataSourceCompactionConfig> newConfigs = current
.getCompactionConfigs()
Expand Down Expand Up @@ -168,6 +190,51 @@ public Response getCompactionConfig(@PathParam("dataSource") String dataSource)
return Response.ok().entity(config).build();
}

@GET
@Path("/{dataSource}/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompactionConfigHistory(
@PathParam("dataSource") String dataSource,
@QueryParam("interval") String interval,
@QueryParam("count") Integer count
)
{
Interval theInterval = interval == null ? null : Intervals.of(interval);
try {
List<AuditEntry> auditEntries;
if (theInterval == null && count != null) {
auditEntries = auditManager.fetchAuditHistory(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
count
);
} else {
auditEntries = auditManager.fetchAuditHistory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a user pass in both interval and count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the pattern in CoordinatorDynamicConfigsResource#getDatasourceRuleHistory where it looks like we ignore the count silently if the interval is specified.

CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
theInterval
);
}
DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory(dataSource);
for (AuditEntry audit : auditEntries) {
CoordinatorCompactionConfig coordinatorCompactionConfig = CoordinatorCompactionConfig.convertByteToConfig(
manager,
audit.getPayload().getBytes(StandardCharsets.UTF_8)
);
history.add(coordinatorCompactionConfig, audit.getAuditInfo(), audit.getAuditTime());
}
if (history.getHistory().isEmpty()) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok(history.getHistory()).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(e))
.build();
}
}

@DELETE
@Path("/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -180,7 +247,10 @@ public Response deleteCompactionConfig(
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()
Expand Down
Loading