forked from apache/fluo
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Generate summaries of Fluo data apache#1054
This is a partial step for apache#1054. I would like to make the summary data available before making any decisions about how to use it for compaction decsions.
- Loading branch information
1 parent
282b94e
commit c53d3b3
Showing
7 changed files
with
271 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
96 changes: 96 additions & 0 deletions
96
modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoCollector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package org.apache.fluo.accumulo.summarizer; | ||
|
||
import org.apache.accumulo.core.client.summary.Summarizer.Collector; | ||
import org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer; | ||
import org.apache.accumulo.core.data.Key; | ||
import org.apache.accumulo.core.data.Value; | ||
import org.apache.fluo.accumulo.util.ColumnType; | ||
import org.apache.fluo.accumulo.util.NotificationUtil; | ||
import org.apache.fluo.accumulo.util.ReadLockUtil; | ||
|
||
public class FluoCollector implements Collector { | ||
|
||
private long ntfy = 0; | ||
private long ntfyDel = 0; | ||
private long txDone = 0; | ||
private long delLock = 0; | ||
private long lock = 0; | ||
private long data = 0; | ||
private long write = 0; | ||
private long ack = 0; | ||
private long delrlock = 0; | ||
private long rlock = 0; | ||
|
||
@Override | ||
public void accept(Key k, Value v) { | ||
|
||
if (NotificationUtil.isNtfy(k)) { | ||
if (NotificationUtil.isDelete(k)) { | ||
ntfyDel++; | ||
} else { | ||
ntfy++; | ||
} | ||
|
||
} else { | ||
ColumnType colType = ColumnType.from(k); | ||
switch (colType) { | ||
case TX_DONE: | ||
txDone++; | ||
break; | ||
case DEL_LOCK: | ||
delLock++; | ||
break; | ||
case LOCK: | ||
lock++; | ||
break; | ||
case DATA: | ||
data++; | ||
break; | ||
case WRITE: | ||
write++; | ||
break; | ||
case ACK: | ||
ack++; | ||
break; | ||
case RLOCK: | ||
if (ReadLockUtil.isDelete(k.getTimestamp())) { | ||
delrlock++; | ||
} else { | ||
rlock++; | ||
} | ||
break; | ||
default: | ||
throw new IllegalArgumentException("Unknown column type : " + colType); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void summarize(StatisticConsumer sc) { | ||
sc.accept("ntfy", ntfy); | ||
sc.accept("ntfyDel", ntfyDel); | ||
sc.accept("txDone", txDone); | ||
sc.accept("delLock", delLock); | ||
sc.accept("lock", lock); | ||
sc.accept("data", data); | ||
sc.accept("write", write); | ||
sc.accept("ack", ack); | ||
sc.accept("delrlock", delrlock); | ||
sc.accept("rlock", rlock); | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoSummarizer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package org.apache.fluo.accumulo.summarizer; | ||
|
||
import java.util.Map; | ||
|
||
import com.google.common.base.Preconditions; | ||
import org.apache.accumulo.core.client.summary.Summarizer; | ||
import org.apache.accumulo.core.client.summary.SummarizerConfiguration; | ||
import org.apache.accumulo.core.client.summary.Summary; | ||
|
||
public class FluoSummarizer implements Summarizer { | ||
|
||
public static final SummarizerConfiguration CONFIG = | ||
SummarizerConfiguration.builder(FluoSummarizer.class).setPropertyId("fluo").build(); | ||
|
||
@Override | ||
public Collector collector(SummarizerConfiguration sc) { | ||
return new FluoCollector(); | ||
} | ||
|
||
@Override | ||
public Combiner combiner(SummarizerConfiguration sc) { | ||
return (m1, m2) -> m2.forEach((k, v) -> m1.merge(k, v, Long::sum)); | ||
} | ||
|
||
public static class Counts { | ||
|
||
public final long ntfy; | ||
public final long ntfyDel; | ||
public final long txDone; | ||
public final long delLock; | ||
public final long lock; | ||
public final long data; | ||
public final long write; | ||
public final long ack; | ||
public final long delrlock; | ||
public final long rlock; | ||
|
||
public Counts(long ntfy, long ntfyDel, long txDone, long delLock, long lock, long data, | ||
long write, long ack, long delrlock, long rlock) { | ||
this.ntfy = ntfy; | ||
this.ntfyDel = ntfyDel; | ||
this.txDone = txDone; | ||
this.delLock = delLock; | ||
this.lock = lock; | ||
this.data = data; | ||
this.write = write; | ||
this.ack = ack; | ||
this.delrlock = delrlock; | ||
this.rlock = rlock; | ||
} | ||
} | ||
|
||
public static Counts getCounts(Summary summary) { | ||
Preconditions.checkArgument( | ||
summary.getSummarizerConfiguration().getClassName().equals(FluoSummarizer.class.getName())); | ||
Map<String, Long> m = summary.getStatistics(); | ||
return new Counts(m.get("ntfy"), m.get("ntfyDel"), m.get("txDone"), m.get("delLock"), | ||
m.get("lock"), m.get("data"), m.get("write"), m.get("ack"), m.get("delrlock"), | ||
m.get("rlock")); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
77 changes: 77 additions & 0 deletions
77
modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/SummaryIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package org.apache.fluo.integration.impl; | ||
|
||
import java.util.List; | ||
|
||
import org.apache.accumulo.core.client.summary.Summary; | ||
import org.apache.fluo.accumulo.summarizer.FluoSummarizer; | ||
import org.apache.fluo.accumulo.summarizer.FluoSummarizer.Counts; | ||
import org.apache.fluo.api.client.Transaction; | ||
import org.apache.fluo.api.data.Column; | ||
import org.apache.fluo.integration.ITBaseImpl; | ||
import org.junit.Test; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
|
||
public class SummaryIT extends ITBaseImpl { | ||
|
||
@Test | ||
public void testSummaries() throws Exception { | ||
try (Transaction tx = client.newTransaction()) { | ||
String seen = tx.withReadLock().gets("u:http://wikipedia.com/abc", new Column("doc", "seen")); | ||
if (seen == null) { | ||
tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abc"); | ||
} | ||
tx.commit(); | ||
} | ||
|
||
List<Summary> summaries = aClient.tableOperations().summaries(table).flush(true).retrieve(); | ||
|
||
Counts counts = FluoSummarizer.getCounts(summaries.get(0)); | ||
|
||
assertEquals(0, counts.ack); | ||
assertEquals(1, counts.data); | ||
assertEquals(0, counts.delLock); | ||
assertEquals(1, counts.delrlock); | ||
assertEquals(0, counts.lock); | ||
assertEquals(0, counts.ntfy); | ||
assertEquals(0, counts.ntfyDel); | ||
assertEquals(0, counts.rlock); | ||
assertEquals(1, counts.txDone); | ||
assertEquals(1, counts.write); | ||
|
||
try (Transaction tx = client.newTransaction()) { | ||
tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abcd"); | ||
tx.commit(); | ||
} | ||
|
||
summaries = aClient.tableOperations().summaries(table).flush(true).retrieve(); | ||
|
||
counts = FluoSummarizer.getCounts(summaries.get(0)); | ||
|
||
assertEquals(0, counts.ack); | ||
assertEquals(2, counts.data); | ||
assertEquals(0, counts.delLock); | ||
assertEquals(1, counts.delrlock); | ||
assertEquals(0, counts.lock); | ||
assertEquals(0, counts.ntfy); | ||
assertEquals(0, counts.ntfyDel); | ||
assertEquals(0, counts.rlock); | ||
assertEquals(2, counts.txDone); | ||
assertEquals(2, counts.write); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters