-
Notifications
You must be signed in to change notification settings - Fork 799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/blocks #1695
Feat/blocks #1695
Conversation
2c5ef1b
to
11deb9d
Compare
Thanks for the PR! Could you comment on the relationship to #1427 which seems similar from the description? |
I had not seen that draft before. Will delve into that and see what the relationship is. |
@bboreham it looks like #1427 is similar in goals to this. It's hard to know for certain since there's a lot undone on that draft, but it looks like the original idea was to attempt to make tsdb multi-tenant, whereas this PR leaves it single tenant, but creates them for each user (which was later suggested in the comments in that PR) It also appears to deal with metrics on a per-chunk level as well, where this PR handles them in block format. There is plenty of future work for this, such as user limits, shipping blocks to non-s3 chunk stores etc. but I think it's a working starting point that can be built on that seems to satisfy the high-level goal of #1427 |
2c9b399
to
4eda36b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments:
- Looks like only S3 is supported, how hard would it be to add other storages (GCS in particular).
- I see a huge scalability bottleneck in the querier as every querier will need to have all the block metadata for all the users. It is memory intensive and we should be ideally able to shard on it. I don't have a good solution here.
Other than this, I'll need to test this all out :) Would be super helpful if you could share the manifests for running a small cluster on DOK8S.
return nil | ||
} | ||
|
||
func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll hit this quite quickly in a multi-tenant context: thanos-io/thanos#1471
Further, I don't see a good way to horizontally scale this as there is no sharding and all queriers should have all the blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea make sense. I wonder if we could do something like sharding users to a stateful set of queriers, so only some querier have a given user. This problem is definitely food for thought.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me add some math. Scenario:
- 1K tenants (reasonably low)
- 14 months retention (approx. 31 days / month)
- 2 hours TSDB block range
Total number of blocks per tenant: 14 * 31 * (24 / 2) = 5.2M blocks
Total number of API calls required to list all the blocks (assuming 1000 max items in the response): 5.2M / 1000 = 5200
The initial sync of a querier needs to download the meta.json
file for each block for each tenant, which means 5.2M calls to the object store, which looks not feasible to me.
A possible alternative would be index blocks by time range and implement it in a fully lazy way. We may prepend each block name stored on S3 with the min timestamp of that block (using a timestamp format which guarantees alphabetical ordering).
Given a query for the time range [A,B], we need to sync only the blocks whose min timestamp is [A-delta,B] where delta
is the max block range allowed by the system. The querier may have a config options to initially sync only the last X hours of blocks (assuming most queries are against recent data), while older blocks metadata will be lazily fetched.
Moreover, the periodic sync to find new blocks wouldn't require to iterate over all blocks of a tenant, but we could set the "StartAfter" param (of S3 ListBucket API) to the last min timestamp synched - a delta.
Does it make any sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S3-based indexing is just a trick to avoid introducing another dependency. Indexes on Bigtable (like it work for Cortex chunks today) would serve the same purpose in a probably cleaner way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally makes sense. I do personally think this is still an optimization for after merging this PR, as this change is already large. Blocks are already sorted by ULID so we would have time ordering already so I think we could already lazy sync on that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you view this as a blocker for the initial feature merge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a blocker for sure. I was sharing thoughts on possible evolutions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi 👋
Yup, in Thanos key thing is that blocks have to be compacted (up to 2w) somewhere so you need to avoid that many blocks. Plus right now, at last on Thanos Store and Compactor you can shard the reads using relabelling, so it can be horizontally scaled.
The only problem might be a single bucket and our flat block structure - even with sharding we still iterate over all blocks when syncing. StartAfter
logic sounds good indeed, however, this is not sufficient as blocks (at least for Thanos case) can be compacted/removed so we need something that scans if block is still there. We could do that in a lazy way as well in theory. Something that would make sense is maybe some prefix support, so we can upload/read blocks from different subdirs of object store - and the prefix could be external labels (I hope this is how you recognize tenants as well?). (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. which is discussed here: thanos-io/thanos#1318 Please join the discussion if you have ideas that will work for you as well! (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where we load the WAL after the transfer is done. I could see the WAL loading taking sometime too and I don't have a good idea on what to do in ~10mins it takes to load the WAL.
But beyond these, I think the PR is super close. Thanks for all the work on this!
return nil | ||
} | ||
|
||
// Close all user databases |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So while the ingester is in LEAVING
it'll continue to receive queries. We shouldn't close the dbs afaics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the first comment. The WAL is replayed when the tsdb is opened for a given user, which may not work for large WALs. I've been using 10m blocks in my testing and that particular issue didn't become a problem, though I could see how it might.
Good call on not closing the dbs. Do you know if it's safe to copy out a tsdb before it's been closed? Maybe the better idea here is to snapshot it and leave it open for queries. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know if it's safe to copy out a tsdb before it's been closed?
I'm not sure. I checked a bit the code, and the Close()
also ensures that the WAL is fsync-ed. Without closing it, we may have no guarantee that the last samples have been synched to disk.
Would be possible to reverse the problem, and not allow a LEAVING
ingester to serve queries when the storage is TSDB?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of this PR is its own branch of logic, so it is non-impactful to any existing use of s3. I'm leaning towards approve pending responses to a select few comments here.
I am really interested in the perf at scale, but much of that can only be found with load testing and observation.
@bboreham since you are on the list for 0.4 release manager, would this be something you want to be merged for the release, waited until post 0.4... or do we want to mark this as a "beta/experimental" feature?
pkg/ingester/ingester.go
Outdated
@@ -138,6 +156,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { | |||
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge") | |||
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.") | |||
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") | |||
|
|||
// Prometheus 2.x block storage | |||
f.BoolVar(&cfg.V2, "ingester.v2", false, "If true, use Prometheus block storage for metrics") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC it was covered in the community call that we need a migration path for existing deployments via the storage schema config, but not block on this just for that. This is a reminder comment to open an issue for this once blocks are merged.
pkg/ingester/ingester.go
Outdated
|
||
var bkt *s3.Bucket | ||
s3Cfg := s3.Config{ | ||
Bucket: cfg.S3Bucket, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is referencing the thanos package, so is there a potential to run into the service limits at scale, that prompted the work in #1625?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can definitely hit the same API limitation of S3 providers (and has in my testing). I'm currently working on a follow-on PR to shard across buckets as in #1625. It's mostly straight forward except for the SyncBlocks
func which iterates over a bucket.
pkg/ingester/transfer.go
Outdated
} | ||
filesXfer++ | ||
|
||
// TODO(thor) To avoid corruption from errors, it's probably best to write to a temp dir, and then move that to the final location |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thorfour was this something you intended to implement before merging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't plan to no, was trying to keep changes in a single PR smaller. I was going to do that as a follow-on PR. But if it's seen as a blocker to merge I'm happy to complete it in this.
34cd5c1
to
09ed472
Compare
I have cut the RC for 0.3 today, so no I don't want any last-minute merges except fixing serious bugs. |
Do we really want to call this |
72ddad1
to
9096870
Compare
For the GCS support I believe we'd create a different objstore.Bucket client that implements that interface. As well as objstore.BucketReader for the querier. Just wasn't on my priority list. I've uploaded a k8s gist: That should create everything needed in the cluster, including grafana preconfigured. Let me know if you run into problems with it. |
bc1c78c
to
878bcf0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You did a great job @thorfour. I've approached Cortex only recently, and my comments may miss something, but I would be glad if you could take a look.
return nil | ||
} | ||
|
||
func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me add some math. Scenario:
- 1K tenants (reasonably low)
- 14 months retention (approx. 31 days / month)
- 2 hours TSDB block range
Total number of blocks per tenant: 14 * 31 * (24 / 2) = 5.2M blocks
Total number of API calls required to list all the blocks (assuming 1000 max items in the response): 5.2M / 1000 = 5200
The initial sync of a querier needs to download the meta.json
file for each block for each tenant, which means 5.2M calls to the object store, which looks not feasible to me.
A possible alternative would be index blocks by time range and implement it in a fully lazy way. We may prepend each block name stored on S3 with the min timestamp of that block (using a timestamp format which guarantees alphabetical ordering).
Given a query for the time range [A,B], we need to sync only the blocks whose min timestamp is [A-delta,B] where delta
is the max block range allowed by the system. The querier may have a config options to initially sync only the last X hours of blocks (assuming most queries are against recent data), while older blocks metadata will be lazily fetched.
Moreover, the periodic sync to find new blocks wouldn't require to iterate over all blocks of a tenant, but we could set the "StartAfter" param (of S3 ListBucket API) to the last min timestamp synched - a delta.
Does it make any sense?
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
Signed-off-by: Thor <thansen@digitalocean.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I think we have a good starting point and can iterate on this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Goutham, this LGTM for merging and further iterations.
The motivation behind this PR was to be able to store larger sized objects into the S3 storage backend. It leverages a lot of code from the thanos project to accomplish that.
This implements ingester storage and s3 storage using the tsdb blocks and thanos shipping to s3.
Each user has a tsdb opened up under their userID, and a shipper that periodically scans that directory and uploads to s3.
Querier creates a block querier that syncs against s3 to perform the long-retention queries.