Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Handle deletion of Kafka destinations #161

Merged
merged 15 commits into from
Apr 21, 2017
Merged

Handle deletion of Kafka destinations #161

merged 15 commits into from
Apr 21, 2017

Conversation

kirg
Copy link
Contributor

@kirg kirg commented Apr 19, 2017

No description provided.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 09eaa44 on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 87421fa on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 96cc077 on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling f46ed74 on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling f46ed74 on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling f47914d on retention-kafka into ** on master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling de3ee03 on retention-kafka into ** on master**.

@kirg kirg requested a review from venkat1109 April 21, 2017 01:34
@kirg kirg self-assigned this Apr 21, 2017
// input/store operations and jump directly to sealing extent in metadata
if common.IsKafkaPhantomInput(event.inputID) &&
common.AreKafkaPhantomStores(event.storeIDs) {
event.state = updateMetadataState
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log this lifecyle event for kafka extent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a comment explaining when is an extentDownEvent expected for a kafka extent.


for _, replicaStat := range extStats.GetReplicaStats() {
extInfo.storehosts = append(extInfo.storehosts, storehostID(replicaStat.GetStoreUUID()))
storehosts = append(storehosts, storehostID(replicaStat.GetStoreUUID()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between the storehosts/storeUUIDs ? They both store the same set (except for the typecast)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storeUUIDs is '[]string', while storehosts is '[]storeUUID'. i have defined separate types for store/extent/dest UUIDs -- and those are used within the main code (retention.go); it helps avoiding inadvertent mistakes like passing in one UUID for another, etc (basically, get in the advantages of being strongly typed).


extInfo := &extentInfo{
id: extentID(extent.GetExtentUUID()),
status: extStats.GetStatus(),
statusUpdatedTime: time.Unix(0, extStats.GetStatusUpdatedTimeMillis()*int64(time.Millisecond)),
storehosts: make([]storehostID, 0, len(storeUUIDs)),
storehosts: storehosts,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you just typecast like []string(storeUUIDs) instead of copying the whole set ?
this is one of the reasons I hate typedefs :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot typecast slices (container types, in general) with differing containing types -- afaik, the only way to convert is to do the copy. Yes, I know it sucks. :-)

log.WithFields(bark.Fields{
common.TagCnsmID: cgInfo.id,
common.TagErr: err,
}).Error(`computeRetention: minAckAddr GetAckLevel failed`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also print isKafkaPhantomExtent

((allHaveConsumed && softRetentionConsumed) || hardRetentionConsumed) {
((allHaveConsumed && softRetentionConsumed) ||
hardRetentionConsumed ||
ext.kafkaPhantomExtent) {

log.WithFields(bark.Fields{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print isKafkaPhantomExtent

@venkat1109
Copy link
Contributor

Ship it after addressing comments

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 9bd1e87 on retention-kafka into ** on master**.

@kirg kirg merged commit 16b4998 into master Apr 21, 2017
@kirg kirg deleted the retention-kafka branch April 21, 2017 18:24
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants