-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement kafka topic create/delete api #315
feat: implement kafka topic create/delete api #315
Conversation
493b990
to
2dca2ea
Compare
044853d
to
5660915
Compare
…st; add partial unit test
…CreateDeleteTopics
…d Client(); add tests for duplicate topic creation and deleting non-existent topic
…ontroller unavailability
…ve unused ReplicationFactor field
…; extract error code for setting kafka error codes; set TopicInfosByName lowercase
5660915
to
bec6a1d
Compare
var resp kafkaprotocol.CreateTopicsResponse | ||
r, err := conn.SendRequest(&req, kafkaprotocol.APIKeyCreateTopics, 5, &resp) | ||
require.NoError(t, err) | ||
createResp, ok := r.(*kafkaprotocol.CreateTopicsResponse) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be coordinator not available?
Responses: make([]kafkaprotocol.DeleteTopicsResponseDeletableTopicResult, len(req.TopicNames)), | ||
} | ||
for tidx, topicName := range req.TopicNames { | ||
var errMsg string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be *string otherwise we are setting err message to pointer to empty string even when there is no error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment is outdated as the code has changed a bit. Please let me know if this is still needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same issue is there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it. Done.
agent/kafka_handler.go
Outdated
return common.UnknownError | ||
} | ||
|
||
func detectInvalidTopic(name string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking... it would be better to do this check in topicmeta.CreateTopic(), not here, can you move it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is outdated.
…ion; modularise CreateTopics request handler
agent/conf.go
Outdated
GroupCoordinatorConf: group.NewConf(), | ||
TxCoordinatorConf: tx.NewConf(), | ||
MaxControllerClients: DefaultMaxControllerClients, | ||
DefaultDefaultTopicRetentionTime: DefaultDefaultTopicRetentionTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the member called "DefaultDefaultTopicRetentionTime"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahhh...the member should be DefaultTopicRetentionTime
and the constant should be DefaultDefaultTopicRetentionTime
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the constant is the default of the default
{"Invalid special characters", "topic@", int16(kafkaprotocol.ErrorCodeInvalidTopicException)}, | ||
} | ||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's nicer if you put this in a separate method
return kafkaprotocol.ErrorCodeUnknownServerError | ||
} | ||
|
||
func isInvalidTopicName(name string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in a previous comment, this check should be done in topicmeta.Manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but Agent
doesn't have topicmeta.Manager
type Agent struct {
lock sync.RWMutex
cfg Conf
started bool
transportServer transport.Server
kafkaServer *kafkaserver2.KafkaServer
tablePusher *pusher.TablePusher
batchFetcher *fetcher.BatchFetcher
controller *control.Controller
controlClientCache *control.ClientCache
membership ClusterMembership
compactionWorkersService *lsm.CompactionWorkerService
partitionHashes *parthash.PartitionHashes
fetchCache *fetchcache.Cache
groupCoordinator *group.Coordinator
txCoordinator *tx.Coordinator
topicMetaCache *topicmeta.LocalCache
manifold *membershipChangedManifold
partitionLeaders map[string]map[int]map[int]int32
clusterMembershipFactory ClusterMembershipFactory
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what you mean.
I am suggesting the check that the topic name is valid should be in topicmeta.manager.CreateTopic - do you not think that is a better place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But don't worry about this - I will do it in a later PR
Responses: make([]kafkaprotocol.DeleteTopicsResponseDeletableTopicResult, len(req.TopicNames)), | ||
} | ||
for tidx, topicName := range req.TopicNames { | ||
var errMsg string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same issue is there
Closing this. The PR is currently broken - there are merge conflicts, unaddressed issues and tests don't run. I will fix it, then submit as a new PR. |
Fixes #285