Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3238: target delete accumulo #3249

Merged
merged 27 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.ClientConfiguration;
Expand All @@ -42,6 +43,7 @@
import uk.gov.gchq.gaffer.accumulostore.key.exception.AccumuloElementConversionException;
import uk.gov.gchq.gaffer.accumulostore.key.exception.IteratorSettingException;
import uk.gov.gchq.gaffer.accumulostore.operation.handler.AddElementsHandler;
import uk.gov.gchq.gaffer.accumulostore.operation.handler.DeleteElementsHandler;
import uk.gov.gchq.gaffer.accumulostore.operation.handler.GenerateSplitPointsFromSampleHandler;
import uk.gov.gchq.gaffer.accumulostore.operation.handler.GetAdjacentIdsHandler;
import uk.gov.gchq.gaffer.accumulostore.operation.handler.GetAllElementsHandler;
Expand Down Expand Up @@ -105,6 +107,7 @@
import uk.gov.gchq.koryphe.iterable.ChainedIterable;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -415,8 +418,7 @@ protected OutputOperationHandler<GetTraits, Set<StoreTrait>> getGetTraitsHandler

@Override
protected OperationHandler<? extends DeleteElements> getDeleteElementsHandler() {
// TODO: implement accumulo delete handler logic
return null;
return new DeleteElementsHandler();
}

/**
Expand Down Expand Up @@ -492,6 +494,58 @@ protected void insertGraphElements(final Iterable<? extends Element> elements) t
}
}

/**
* Method to delete {@link Element}s from Accumulo.
*
* @param elements The elements to be deleted.
* @throws StoreException If there is a failure to delete elements.
*/
public void deleteElements(final Iterable<? extends Element> elements) throws StoreException {
deleteGraphElements(elements);
}

protected void deleteGraphElements(final Iterable<? extends Element> elements) throws StoreException {
// Create BatchWriter
final BatchWriter writer = TableUtils.createBatchWriter(this);
p29876 marked this conversation as resolved.
Show resolved Hide resolved
// Loop through elements, convert to mutations, and add to
// BatchWriter.as
tb06904 marked this conversation as resolved.
Show resolved Hide resolved
// The BatchWriter takes care of batching them up, sending them without
// too high a latency, etc.
if (nonNull(elements)) {
tb06904 marked this conversation as resolved.
Show resolved Hide resolved
for (final Element element : elements) {

final Pair<Key, Key> keys;
try {
keys = keyPackage.getKeyConverter().getKeysFromElement(element);
} catch (final AccumuloElementConversionException e) {
LOGGER.error(FAILED_TO_CREATE_AN_ACCUMULO_FROM_ELEMENT_OF_TYPE_WHEN_TRYING_TO_INSERT_ELEMENTS, "key", element.getGroup());
continue;
}

for (final Key key : Arrays.asList(keys.getFirst(), keys.getSecond())) {
if (nonNull(key)) {
final Mutation m = new Mutation(key.getRow());
m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), key.getTimestamp());

try {
writer.addMutation(m);
} catch (final MutationsRejectedException e) {
LOGGER.error("Failed to create an accumulo key mutation");
continue;
}
}
}
}
} else {
throw new GafferRuntimeException("Could not find any elements to delete from graph.", Status.BAD_REQUEST);
}
try {
writer.close();
tb06904 marked this conversation as resolved.
Show resolved Hide resolved
} catch (final MutationsRejectedException e) {
LOGGER.warn("Accumulo batch writer failed to close", e);
}
}

/**
* Gets the {@link uk.gov.gchq.gaffer.accumulostore.key.AccumuloKeyPackage} in use by
* this AccumuloStore.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.accumulostore.operation.handler;

import uk.gov.gchq.gaffer.accumulostore.AccumuloStore;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.delete.DeleteElements;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.ValidatedElements;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

public class DeleteElementsHandler implements OperationHandler<DeleteElements> {

@Override
public Void doOperation(final DeleteElements operation,
p29876 marked this conversation as resolved.
Show resolved Hide resolved
final Context context, final Store store)
throws OperationException {
deleteElements(operation, (AccumuloStore) store);
return null;
}

private void deleteElements(final DeleteElements operation, final AccumuloStore store)
throws OperationException {
try {
final Iterable<? extends Element> validatedElements;
if (operation.isValidate()) {
validatedElements = new ValidatedElements(operation.getInput(), store.getSchema(),
operation.isSkipInvalidElements());
} else {
validatedElements = operation.getInput();
}
store.deleteElements(validatedElements);
} catch (final StoreException e) {
throw new OperationException("Failed to delete elements", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ public class AccumuloStoreTest {
private static final String BYTE_ENTITY_GRAPH = "byteEntityGraph";
private static final String GAFFER_1_GRAPH = "gaffer1Graph";
private static final Schema SCHEMA = Schema.fromJson(StreamUtil.schemas(AccumuloStoreTest.class));
private static final AccumuloProperties PROPERTIES = AccumuloProperties.loadStoreProperties(StreamUtil.storeProps(AccumuloStoreTest.class));
private static final AccumuloProperties CLASSIC_PROPERTIES = AccumuloProperties.loadStoreProperties(StreamUtil.openStream(AccumuloStoreTest.class, "/accumuloStoreClassicKeys.properties"));
private static final AccumuloProperties PROPERTIES = AccumuloProperties
.loadStoreProperties(StreamUtil.storeProps(AccumuloStoreTest.class));
private static final AccumuloProperties CLASSIC_PROPERTIES = AccumuloProperties.loadStoreProperties(
StreamUtil.openStream(AccumuloStoreTest.class, "/accumuloStoreClassicKeys.properties"));
private static final AccumuloStore BYTE_ENTITY_STORE = new SingleUseMiniAccumuloStore();
private static final AccumuloStore GAFFER_1_KEY_STORE = new SingleUseMiniAccumuloStore();

Expand Down Expand Up @@ -161,8 +163,12 @@ public void shouldCreateAStoreUsingGraphIdWithNamespace() throws Exception {

@Test
public void shouldBeAnOrderedStore() throws OperationException {
assertThat(BYTE_ENTITY_STORE.execute(new HasTrait.Builder().trait(StoreTrait.ORDERED).currentTraits(false).build(), new Context())).isTrue();
assertThat(GAFFER_1_KEY_STORE.execute(new HasTrait.Builder().trait(StoreTrait.ORDERED).currentTraits(false).build(), new Context())).isTrue();
assertThat(BYTE_ENTITY_STORE
.execute(new HasTrait.Builder().trait(StoreTrait.ORDERED).currentTraits(false).build(), new Context()))
.isTrue();
assertThat(GAFFER_1_KEY_STORE
.execute(new HasTrait.Builder().trait(StoreTrait.ORDERED).currentTraits(false).build(), new Context()))
.isTrue();
}

@Test
Expand Down Expand Up @@ -333,12 +339,16 @@ public void shouldHaveSupportedStoreTraits() {
assertThat(traits).isNotNull();
assertThat(traits).withFailMessage("Collection size should be 10").hasSize(10);

assertThat(traits).withFailMessage("Collection should contain INGEST_AGGREGATION trait").contains(INGEST_AGGREGATION)
assertThat(traits).withFailMessage("Collection should contain INGEST_AGGREGATION trait")
.contains(INGEST_AGGREGATION)
.withFailMessage("Collection should contain QUERY_AGGREGATION trait").contains(QUERY_AGGREGATION)
.withFailMessage("Collection should contain PRE_AGGREGATION_FILTERING trait").contains(PRE_AGGREGATION_FILTERING)
.withFailMessage("Collection should contain POST_AGGREGATION_FILTERING trait").contains(POST_AGGREGATION_FILTERING)
.withFailMessage("Collection should contain PRE_AGGREGATION_FILTERING trait")
.contains(PRE_AGGREGATION_FILTERING)
.withFailMessage("Collection should contain POST_AGGREGATION_FILTERING trait")
.contains(POST_AGGREGATION_FILTERING)
.withFailMessage("Collection should contain TRANSFORMATION trait").contains(TRANSFORMATION)
.withFailMessage("Collection should contain POST_TRANSFORMATION_FILTERING trait").contains(POST_TRANSFORMATION_FILTERING)
.withFailMessage("Collection should contain POST_TRANSFORMATION_FILTERING trait")
.contains(POST_TRANSFORMATION_FILTERING)
.withFailMessage("Collection should contain STORE_VALIDATION trait").contains(STORE_VALIDATION)
.withFailMessage("Collection should contain ORDERED trait").contains(ORDERED)
.withFailMessage("Collection should contain VISIBILITY trait").contains(VISIBILITY);
Expand Down Expand Up @@ -373,12 +383,14 @@ public void shouldFindInconsistentVertexSerialiser() throws StoreException {
// When & Then
assertThatExceptionOfType(SchemaException.class)
.isThrownBy(() -> store.preInitialise("graphId", inconsistentSchema, PROPERTIES))
.withMessage("Vertex serialiser is inconsistent. This store requires vertices to be serialised in a consistent way.");
.withMessage(
"Vertex serialiser is inconsistent. This store requires vertices to be serialised in a consistent way.");

// When & Then
assertThatExceptionOfType(SchemaException.class)
.isThrownBy(() -> store.validateSchemas())
.withMessage("Vertex serialiser is inconsistent. This store requires vertices to be serialised in a consistent way.");
.withMessage(
"Vertex serialiser is inconsistent. This store requires vertices to be serialised in a consistent way.");
}

@Test
Expand Down Expand Up @@ -486,4 +498,95 @@ public void shouldFailSchemaValidationWhenTimestampPropertyDoesNotHaveMaxAggrega
.isThrownBy(() -> store.initialise("graphId", schema, PROPERTIES))
.withMessageContaining(expectedMessage);
}

// @Test
// public void shouldDelete() throws Exception {
// // Given
// final AccumuloStore accStore = (AccumuloStore) AccumuloStore.createStore(
// "graph1",
// new Schema.Builder()
// .entity(TestGroups.ENTITY, new SchemaEntityDefinition.Builder()
// .vertex(TestTypes.ID_STRING)
// .build())
// .edge(TestGroups.EDGE, new SchemaEdgeDefinition.Builder()
// .source(TestTypes.ID_STRING)
// .destination(TestTypes.ID_STRING)
// .directed(TestTypes.DIRECTED_EITHER)
// .build())
// .type(TestTypes.ID_STRING, String.class)
// .type(TestTypes.DIRECTED_EITHER, Boolean.class)
// .build(),
// PROPERTIES);

// final Graph graph = new Graph.Builder()
// .store(accStore)
// .build();

// final Entity entityToDelete = new Builder()
// .group(TestGroups.ENTITY)
// .vertex("1")
// .build();
// final Edge edgeToDelete = new Edge.Builder()
// .group(TestGroups.EDGE)
// .source("1")
// .dest("2")
// .directed(true)
// .build();
// final Entity entityToKeep = new Builder()
// .group(TestGroups.ENTITY)
// .vertex("2")
// .build();
// final Edge edgeToKeep = new Edge.Builder()
// .group(TestGroups.EDGE)
// .source("2")
// .dest("3")
// .directed(true)
// .build();
// final List<Element> elements = Arrays.asList(
// entityToDelete,
// entityToKeep,
// edgeToDelete,
// edgeToKeep);

// graph.execute(new AddElements.Builder()
// .input(elements)
// .build(), new User());

// final Iterable<? extends Element> resultBefore = graph.execute(new GetAllElements.Builder().build(),
// new User());
// assertThat(resultBefore).hasSize(4);

// // When

// // Delete Vertex A
// final OperationChain<Void> chain = new OperationChain.Builder()
// .first(new GetElements.Builder()
// .input(new EntitySeed("1"))
// .build())
// .then(new DeleteElements())
// .build();

// graph.execute(chain, new User());

// graph.execute(new DeleteElements.Builder().input(new Builder()
// .group(TestGroups.ENTITY)
// .vertex("2")
// .build())
// .build(), new User());

// // Then
// final Iterable<? extends Element> resultsAfter = graph.execute(new GetAllElements.Builder().build(),
// new User());
// assertThat(resultsAfter)
// .asInstanceOf(InstanceOfAssertFactories.iterable(Element.class))
// .hasSize(2)
// .containsExactlyInAnyOrder(entityToKeep, edgeToKeep);

// final GetElements getElements = new GetElements.Builder()
// .input(new EntitySeed("1"))
// .build();
// final Iterable<? extends Element> getElementResults = graph.execute(getElements, new User());

// assertThat(getElementResults).isEmpty();
// }
}
Loading
Loading