Skip to content
This repository has been archived by the owner on Jan 4, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3190 from cjellick/branch-v0.183.37
Browse files Browse the repository at this point in the history
Backport cleanup logic fixes
  • Loading branch information
ibuildthecloud authored Apr 3, 2018
2 parents ac6eb5c + 75b82cd commit e7a5635
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ public class CleanableTable {
public final Table<?> table;
public final Field<Long> idField;
public final Field<Date> removeField;
public final boolean referenceCheckOnly;

private Integer rowsDeleted = 0;
private Integer rowsSkipped = 0;

private CleanableTable(Table<?> table) {
this(table, null);
// Convenience constructor: no explicit "removed" column is supplied, so the
// delegated-to constructor will look it up on the table itself.
private CleanableTable(Table<?> table, boolean refCheckOnly) {
this(table, null, refCheckOnly);

private CleanableTable(Table<?> table, Field<Date> removedField) {
// Primary constructor.
// removedField: the Date column marking row removal; when null it is derived
//   from the table via getRemoveField(table).
// refCheckOnly: when true, this table participates in cleanup only as a
//   foreign-key reference source and its own rows are never deleted.
private CleanableTable(Table<?> table, Field<Date> removedField, boolean refCheckOnly) {
this.table = table;
this.idField = getIdField(table);
this.removeField = removedField == null ? getRemoveField(table) : removedField;
this.referenceCheckOnly = refCheckOnly;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -82,10 +84,14 @@ public String toString() {
}

public static CleanableTable from(Table<?> table) {
return new CleanableTable(table);
return new CleanableTable(table, false);
}

public static CleanableTable from(Table<?> table, Field<Date> removedField) {
return new CleanableTable(table, removedField);
return new CleanableTable(table, removedField, false);
}

// Factory for tables that are consulted only for foreign-key reference checks
// during cleanup (referenceCheckOnly = true); rows are never purged from them.
public static CleanableTable forReference(Table<?> table) {
return new CleanableTable(table, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.cattle.platform.core.model.tables.UserPreferenceTable;
import io.cattle.platform.core.model.tables.VolumeStoragePoolMapTable;
import io.cattle.platform.core.model.tables.VolumeTable;
import io.cattle.platform.core.model.tables.VolumeTemplateTable;
import io.cattle.platform.core.model.tables.ZoneTable;
import io.cattle.platform.db.jooq.dao.impl.AbstractJooqDao;
import io.cattle.platform.object.jooq.utils.JooqUtils;
Expand All @@ -81,15 +82,19 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.HashSet;
import java.util.List;

import org.jooq.Field;
import org.jooq.ForeignKey;
import org.jooq.Param;
import org.jooq.Query;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.ResultQuery;
import org.jooq.Table;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -101,6 +106,8 @@
*/
public class TableCleanup extends AbstractJooqDao implements Task {

private static final String ID_GREATER_THAN_PARAM = "idGreaterThan";

public static final Long SECOND_MILLIS = 1000L;

private static final Logger log = LoggerFactory.getLogger(TableCleanup.class);
Expand All @@ -111,6 +118,18 @@ public class TableCleanup extends AbstractJooqDao implements Task {
public static final DynamicLongProperty EVENT_AGE_LIMIT_SECONDS = ArchaiusUtil.getLong("events.purge.after.seconds");
public static final DynamicLongProperty AUDIT_LOG_AGE_LIMIT_SECONDS = ArchaiusUtil.getLong("audit_log.purge.after.seconds");
public static final DynamicLongProperty SERVICE_LOG_AGE_LIMIT_SECONDS = ArchaiusUtil.getLong("service_log.purge.after.seconds");
public static final String MOUNT_DELETE_QUERY = "delete m " +
" from mount as m " +
" join ( " +
" select mm.id " +
" from mount as mm " +
" join instance as i on mm.instance_id = i.id " +
" join volume as v on mm.volume_id = v.id " +
" where i.state = 'purged' " +
" and i.removed < ? " +
" and (v.data like '%\"isHostPath\":true%' or v.data like '%\"driver\":\"local\"%')" +
" limit ? " +
" ) mx on m.id = mx.id";

private List<CleanableTable> processInstanceTables;
private List<CleanableTable> eventTables;
Expand All @@ -133,6 +152,7 @@ public void run() {
Date otherCutoff = new Date(current - MAIN_TABLES_AGE_LIMIT_SECONDS.getValue() * SECOND_MILLIS);
cleanupLabelTables(otherCutoff);
cleanupExternalHandlerExternalHandlerProcessMapTables(otherCutoff);
cleanupMountTable(otherCutoff);

Date processInstanceCutoff = new Date(current - PROCESS_INSTANCE_AGE_LIMIT_SECONDS.get() * SECOND_MILLIS);
cleanup("process_instance", processInstanceTables, processInstanceCutoff);
Expand All @@ -147,7 +167,7 @@ public void run() {
Date serviceLogCutoff = new Date(current - SERVICE_LOG_AGE_LIMIT_SECONDS.get() * SECOND_MILLIS);
cleanup("service_log", serviceLogTables, serviceLogCutoff);

cleanup("other", otherTables, otherCutoff);
cleanup("main", otherTables, otherCutoff);
}

private void cleanupServiceEventTable(Date cutoff) {
Expand Down Expand Up @@ -249,81 +269,115 @@ private void cleanupLabelTables(Date cutoff) {
}
}

/**
 * Purges stale rows from the mount table in batches until none remain.
 * Executes the raw {@code MOUNT_DELETE_QUERY} (bound with the cutoff date and
 * the configured batch size) repeatedly; stops once a batch deletes zero rows.
 *
 * @param cutoff rows removed before this instant are eligible for deletion
 */
private void cleanupMountTable(Date cutoff) {
    log.debug("Cleaning up mount table [cutoff={}]", cutoff);
    Query deleteBatch = create().query(MOUNT_DELETE_QUERY, cutoff, QUERY_LIMIT_ROWS.getValue());
    int totalDeleted = 0;

    // Delete in bounded batches so a huge backlog can't produce one giant transaction.
    while (true) {
        int deleted = deleteBatch.execute();
        if (deleted <= 0) {
            break;
        }
        log.debug("Deleted {} unneeded rows from mount table", deleted);
        totalDeleted += deleted;
    }

    if (totalDeleted > 0) {
        log.debug("[Rows Deleted] mount={}", totalDeleted);
    }
}

@SuppressWarnings("unchecked")
private void cleanup(String name, List<CleanableTable> tables, Date cutoffTime) {
log.info("Cleaning up {} tables [cutoff={}]", name, cutoffTime);
for (CleanableTable table : tables) {
Field<Long> id = table.idField;
Field<Date> remove = table.removeField;
if (table.referenceCheckOnly) {
// This table is just for reference checking when cleaning other tables. Don't attempt to delete any rows from this table
continue;
}

ResultQuery<Record1<Long>> ids = create()
.select(id)
Field<Long> idField = table.idField;
Field<Date> removeField = table.removeField;
ResultQuery<Record1<Long>> idsToDeleteQuery = create()
.select(idField)
.from(table.table)
.where(remove.lt(cutoffTime))
.where(removeField.lt(cutoffTime)
.and(idField.gt(DSL.param(ID_GREATER_THAN_PARAM, 0L))))
.orderBy(idField)
.limit(QUERY_LIMIT_ROWS.getValue());
Param<?> idGreaterThanParam = idsToDeleteQuery.getParam(ID_GREATER_THAN_PARAM);

table.clearRowCounts();
Result<Record1<Long>> toDelete;
HashSet<Long> idsToFix = new HashSet<Long>();
HashSet<Long> idsToFilter = new HashSet<Long>();
ResultQuery<Record1<Long>> filterIds;
Result<Record1<Long>> toFilter;

if ((toDelete = ids.fetch()).size() > 0) {
List<ForeignKey<?, ?>> keys = getReferencesFrom(table, tables);
for (ForeignKey<?, ?> key : keys) {
if (key.getFields().size() > 1) {
log.error("Composite foreign key filtering unsupported");
}
}
for (ForeignKey<?, ?> key : keys) {
Table<?> referencingTable = key.getTable();
Field<Long> foreignKeyField = (Field<Long>) key.getFields().get(0);

filterIds = create().selectDistinct(foreignKeyField).from(referencingTable);
Result<Record1<Long>> idsToDeleteResult;
HashSet<Long> idsThatCantBeDeleted = new HashSet<Long>();
HashSet<Long> idsReferencedInOtherTables = new HashSet<Long>();
ResultQuery<Record1<Long>> idsReferencedQuery;

idsToDeleteResult = idsToDeleteQuery.fetch();
if (idsToDeleteResult.size() == 0) {
// No rows to remove, move on to next table
continue;
}

if ((toFilter = filterIds.fetch()).size() > 0) {
for (Record1<Long> record : toFilter) {
idsToFilter.add(record.value1());
}
}
// Build a set of ids referenced by other tables
List<ForeignKey<?, ?>> keys = getReferencesFrom(table, tables);
for (ForeignKey<?, ?> key : keys) {
if (key.getFields().size() > 1) {
log.error("Composite foreign key filtering unsupported");
}
}
for (ForeignKey<?, ?> key : keys) {
Table<?> referencingTable = key.getTable();
Field<Long> foreignKeyField = (Field<Long>) key.getFields().get(0);
idsReferencedQuery = create().selectDistinct(foreignKeyField).from(referencingTable);
for(Record1<Long> record : idsReferencedQuery.fetch()) {
idsReferencedInOtherTables.add(record.value1());
}
}

while (toDelete.size() > 0) {
while (idsToDeleteResult.size() > 0) {
// First organize rows into sets that can and cannot be deleted
HashSet<Long> idsToDelete = new HashSet<Long>();
for (Record1<Long> record : toDelete) {
if (idsToFilter.contains(record.value1())) {
idsToFix.add(record.value1());
idsToFilter.remove(record.value1());
Long id = null;
for (Record1<Long> record : idsToDeleteResult) {
id = record.value1();
if (idsReferencedInOtherTables.contains(id)) {
idsThatCantBeDeleted.add(id);
idsReferencedInOtherTables.remove(id);
}
if ((!idsToFix.contains(record.value1()))) {
idsToDelete.add(record.value1());
if (!idsThatCantBeDeleted.contains(id)) {
idsToDelete.add(id);
}
}
if (idsToDelete.size() == 0) {
break;
}
try {
table.addRowsDeleted(create()
.delete(table.table)
.where(id.in(idsToDelete))
.execute());

} catch (org.jooq.exception.DataAccessException e) {
log.info(e.getMessage());
break;
}
if (idsToFix.size() > 0) {
ids = create().select(id).from(table.table).where(remove.lt(cutoffTime)).orderBy(id)
.limit(QUERY_LIMIT_ROWS.getValue()).offset(idsToFix.size());
Long lastId = id;

// Delete rows that aren't referenced by other tables
if (idsToDelete.size() > 0) {
try {
int rowsDeleted = create()
.delete(table.table)
.where(idField.in(idsToDelete))
.execute();
table.addRowsDeleted(rowsDeleted);
log.debug("Deleted {} unneeded rows from {} table", idsToDelete.size(), table.table);
} catch (DataAccessException e) {
log.info("Problem executing cleanup query: " + e.getMessage());
break;
}
}
toDelete = ids.fetch();

// Get the next batch of rows to consider for deletion
idGreaterThanParam.setConverted(lastId);
idsToDeleteResult = idsToDeleteQuery.fetch();
}
if (idsToFix.size() > 0) {
table.addRowsSkipped(idsToFix.size());
log.info("Skipped {} where id in {}", table.table, idsToFix);

if (idsThatCantBeDeleted.size() > 0) {
table.addRowsSkipped(idsThatCantBeDeleted.size());
log.trace("Skipped {} where id in {}", table.table, idsThatCantBeDeleted);
}
}

StringBuffer buffDeleted = new StringBuffer("[Rows Deleted] ");
StringBuffer buffSkipped = new StringBuffer("[Rows Skipped] ");
boolean deletedActivity = false;
Expand All @@ -345,7 +399,6 @@ private void cleanup(String name, List<CleanableTable> tables, Date cutoffTime)
}
}

log.info("Cleanup {} tables [cutoff={}]", name, cutoffTime);
if (deletedActivity) {
log.info(buffDeleted.toString());
}
Expand Down Expand Up @@ -491,6 +544,7 @@ private static List<CleanableTable> getOtherTables() {
CleanableTable.from(TaskInstanceTable.TASK_INSTANCE),
CleanableTable.from(UserPreferenceTable.USER_PREFERENCE),
CleanableTable.from(VolumeTable.VOLUME),
CleanableTable.from(VolumeTemplateTable.VOLUME_TEMPLATE),
CleanableTable.from(VolumeStoragePoolMapTable.VOLUME_STORAGE_POOL_MAP),
// These tables are cleaned through specialized logic but we need to keep them in the "other" list so that they
// are picked up for foreign key references.
Expand All @@ -500,7 +554,10 @@ private static List<CleanableTable> getOtherTables() {
CleanableTable.from(ServiceEventTable.SERVICE_EVENT),
CleanableTable.from(ScheduledUpgradeTable.SCHEDULED_UPGRADE),
CleanableTable.from(SecretTable.SECRET),
CleanableTable.from(ZoneTable.ZONE));
CleanableTable.from(ZoneTable.ZONE),
CleanableTable.forReference(AuditLogTable.AUDIT_LOG),
CleanableTable.forReference(ServiceLogTable.SERVICE_LOG),
CleanableTable.forReference(ProcessInstanceTable.PROCESS_INSTANCE));
/* The most offending tables never set remove_time
service_event
external_handler_external_handler_process_map
Expand Down

0 comments on commit e7a5635

Please sign in to comment.