Skip to content

Commit

Permalink
bigtable/bttest: RowKeyFilter support in emulator.
Browse files Browse the repository at this point in the history
Change-Id: Icf731374ae65551877f5b311e002607f8f5198d9
Reviewed-on: https://code-review.googlesource.com/9565
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
garye committed Nov 28, 2016
1 parent 648bc87 commit e86221f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 13 deletions.
31 changes: 30 additions & 1 deletion bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ func TestClientIntegration(t *testing.T) {
readTests := []struct {
desc string
rr RowSet
filter Filter // may be nil
filter Filter // may be nil
limit ReadOption // may be nil

// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
// sort that list, and join with a comma.
Expand Down Expand Up @@ -248,12 +249,40 @@ func TestClientIntegration(t *testing.T) {
filter: ColumnRangeFilter("follows", "h", ""),
want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
},
{
desc: "read with RowKeyFilter",
rr: RowRange{},
filter: RowKeyFilter(".*wash.*"),
want: "gwashington-jadams-1",
},
{
desc: "read with RowKeyFilter, no matches",
rr: RowRange{},
filter: RowKeyFilter(".*xxx.*"),
want: "",
},
{
desc: "read with FamilyFilter, no matches",
rr: RowRange{},
filter: FamilyFilter(".*xxx.*"),
want: "",
},
{
desc: "read with ColumnFilter + row limit",
rr: RowRange{},
filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
limit: LimitRows(2),
want: "gwashington-jadams-1,jadams-tjefferson-1",
},
}
for _, tc := range readTests {
var opts []ReadOption
if tc.filter != nil {
opts = append(opts, RowFilter(tc.filter))
}
if tc.limit != nil {
opts = append(opts, tc.limit)
}
var elt []string
err := tbl.ReadRows(context.Background(), tc.rr, func(r Row) bool {
for _, ris := range r {
Expand Down
53 changes: 41 additions & 12 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,18 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
sort.Sort(byRowKey(rows))

limit := int(req.RowsLimit)
for i, r := range rows {
if limit > 0 && i >= limit {
count := 0
for _, r := range rows {
if limit > 0 && count >= limit {
return nil
}
if err := streamRow(stream, r, req.Filter); err != nil {
streamed, err := streamRow(stream, r, req.Filter)
if err != nil {
return err
}
if streamed {
count++
}
}
return nil
}
Expand All @@ -334,13 +339,17 @@ func addRows(start, end string, tbl *table, rowSet map[string]*row) {
}
}

func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) error {
// streamRow filters the given row and sends it via the given stream.
// Returns true if at least one cell matched the filter and was streamed, false otherwise.
func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) {
r.mu.Lock()
nr := r.copy()
r.mu.Unlock()
r = nr

filterRow(f, r)
if !filterRow(f, r) {
return false, nil
}

rrr := &btpb.ReadRowsResponse{}
for col, cells := range r.cells {
Expand All @@ -366,21 +375,22 @@ func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) e
rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{true}
}

return stream.Send(rrr)
return true, stream.Send(rrr)
}

// filterRow modifies a row with the given filter.
func filterRow(f *btpb.RowFilter, r *row) {
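// filterRow modifies a row in place with the given filter. It returns false when a row-key regex filter
// rejects the row (or its pattern fails to compile) or when per-cell filtering leaves no cells in the row.
// A nil filter and the chain, interleave, and cells-per-column-limit cases return true.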
func filterRow(f *btpb.RowFilter, r *row) bool {
if f == nil {
return
return true
}
// Handle filters that apply beyond just including/excluding cells.
switch f := f.Filter.(type) {
case *btpb.RowFilter_Chain_:
for _, sub := range f.Chain.Filters {
filterRow(sub, r)
}
return
return true
case *btpb.RowFilter_Interleave_:
srs := make([]*row, 0, len(f.Interleave.Filters))
for _, sub := range f.Interleave.Filters {
Expand All @@ -399,23 +409,36 @@ func filterRow(f *btpb.RowFilter, r *row) {
for _, cs := range r.cells {
sort.Sort(byDescTS(cs))
}
return
return true
case *btpb.RowFilter_CellsPerColumnLimitFilter:
lim := int(f.CellsPerColumnLimitFilter)
for col, cs := range r.cells {
if len(cs) > lim {
r.cells[col] = cs[:lim]
}
}
return
return true
case *btpb.RowFilter_RowKeyRegexFilter:
pat := string(f.RowKeyRegexFilter)
rx, err := regexp.Compile(pat)
if err != nil {
log.Printf("Bad rowkey_regex_filter pattern %q: %v", pat, err)
return false
}
if !rx.MatchString(r.key) {
return false
}
}

// Any other case, operate on a per-cell basis.
cellCount := 0
for key, cs := range r.cells {
i := strings.Index(key, ":") // guaranteed to exist
fam, col := key[:i], key[i+1:]
r.cells[key] = filterCells(f, fam, col, cs)
cellCount += len(r.cells[key])
}
return cellCount > 0
}

func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) []cell {
Expand All @@ -434,6 +457,12 @@ func includeCell(f *btpb.RowFilter, fam, col string, cell cell) bool {
}
// TODO(dsymonds): Implement many more filters.
switch f := f.Filter.(type) {
case *btpb.RowFilter_CellsPerColumnLimitFilter:
// Don't log, row-level filter
return true
case *btpb.RowFilter_RowKeyRegexFilter:
// Row-level filter, already applied to the row key in filterRow: keep the cell and don't log a warning.
return true
default:
log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f)
return true
Expand Down

0 comments on commit e86221f

Please sign in to comment.