Commit: No need for describe cluster

akrambek committed Sep 19, 2024
1 parent f556baa · commit a1275ca
Showing 4 changed files with 3 additions and 161 deletions.
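Read together, the four diffs drop the Kafka DescribeCluster pre-flight from the pgsql-kafka binding's CREATE TOPIC path. The binding no longer queries the cluster for broker ids or computes explicit per-partition leader assignments; it now sends a plain CreateTopics request and lets Kafka place partition leaders. The first two diffs are the client- and server-side spec scripts, the third is the module's pom.xml, and the fourth is the binding's Java stream code; the coverage floor is retuned to the smaller codebase.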
client-side spec script (file name not shown)
@@ -13,40 +13,6 @@
 # specific language governing permissions and limitations under the License.
 #
 
-connect "zilla://streams/app1"
-  option zilla:window 8192
-  option zilla:transmission "half-duplex"
-
-write zilla:begin.ext ${kafka:beginEx()
-    .typeId(zilla:id("kafka"))
-    .request()
-    .describeCluster()
-        .includeAuthorizedOperations("false")
-        .build()
-    .build()}
-
-connected
-
-read zilla:begin.ext ${kafka:matchBeginEx()
-    .typeId(zilla:id("kafka"))
-    .response()
-    .describeCluster()
-        .throttle(0)
-        .error(0)
-        .clusterId("cluster-0")
-        .controllerId(0)
-        .broker()
-            .brokerId(1)
-            .host("broker1.example.com")
-            .port(9092)
-            .build()
-        .authorizedOperations(0)
-        .build()
-    .build()}
-
-write close
-read closed
-
 connect "zilla://streams/app1"
   option zilla:window 8192
   option zilla:transmission "half-duplex"
@@ -59,7 +25,6 @@ write zilla:begin.ext ${kafka:beginEx()
         .name("dev.cities")
         .partitionCount(1)
         .replicas(1)
-        .assignment(0, 1)
         .config("cleanup.policy", "compact")
         .build()
     .timeout(30000)
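With the describeCluster exchange gone, the client script goes straight to the create-topics request, and .assignment(0, 1) (which pinned partition 0's leader to broker 1) is no longer written. Below is a sketch of the surviving exchange, assembled from the hunk's context lines; the .createTopics() and .topic() wrappers sit in lines the page keeps collapsed, so they are inferred rather than quoted:

connect "zilla://streams/app1"
  option zilla:window 8192
  option zilla:transmission "half-duplex"

write zilla:begin.ext ${kafka:beginEx()
    .typeId(zilla:id("kafka"))
    .request()
    .createTopics()                  # inferred, collapsed on the page
        .topic()                     # inferred, collapsed on the page
            .name("dev.cities")
            .partitionCount(1)
            .replicas(1)
            .config("cleanup.policy", "compact")
            .build()
        .timeout(30000)
        ...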
server-side spec script (file name not shown)
@@ -21,39 +21,6 @@ accept ${serverAddress}
 
 accepted
 
-read zilla:begin.ext ${kafka:matchBeginEx()
-    .typeId(zilla:id("kafka"))
-    .request()
-    .describeCluster()
-        .includeAuthorizedOperations("false")
-        .build()
-    .build()}
-
-connected
-
-write zilla:begin.ext ${kafka:beginEx()
-    .typeId(zilla:id("kafka"))
-    .response()
-    .describeCluster()
-        .throttle(0)
-        .error(0)
-        .clusterId("cluster-0")
-        .controllerId(0)
-        .broker()
-            .brokerId(1)
-            .host("broker1.example.com")
-            .port(9092)
-            .build()
-        .authorizedOperations(0)
-        .build()
-    .build()}
-write flush
-
-read closed
-write close
-
-accepted
-
 read zilla:begin.ext ${kafka:matchBeginEx()
     .typeId(zilla:id("kafka"))
     .request()
@@ -62,7 +29,6 @@ read zilla:begin.ext ${kafka:matchBeginEx()
         .name("dev.cities")
         .partitionCount(1)
         .replicas(1)
-        .assignment(0, 1)
         .config("cleanup.policy", "compact")
         .build()
     .timeout(30000)
incubator/binding-pgsql-kafka/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -24,7 +24,7 @@
 </licenses>
 
 <properties>
-    <jacoco.coverage.ratio>0.83</jacoco.coverage.ratio>
+    <jacoco.coverage.ratio>0.82</jacoco.coverage.ratio>
     <jacoco.missed.count>0</jacoco.missed.count>
 </properties>
 
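Loosening the JaCoCo line-coverage floor from 0.83 to 0.82 reads as a side effect of the deletion: removing the well-covered DescribeCluster path shifts the module's covered-to-total line ratio, and the gate is adjusted just enough for the remaining code to keep the build passing.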
Java stream code for binding-pgsql-kafka (file name not shown)
@@ -48,7 +48,6 @@
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.ExtensionFW;
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.FlushFW;
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaBeginExFW;
-import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaDescribeClusterResponseBeginExFW;
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlBeginExFW;
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlDataExFW;
 import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlFlushExFW;
@@ -226,9 +225,7 @@ private final class PgsqlProxy
         private final String database;
         private final PgsqlKafkaBindingConfig binding;
         private final KafkaCreateTopicsProxy createTopicsProxy;
-        private final KafkaDescribeClusterProxy describeClusterProxy;
 
-        private final List<Integer> brokers;
         private final IntArrayQueue queries;
 
         private final long initialId;
@@ -275,10 +272,8 @@ private PgsqlProxy(
             String dbValue = parameters.get("database\u0000");
             this.database = dbValue.substring(0, dbValue.length() - 1);
             this.queries = new IntArrayQueue();
-            this.brokers = new ArrayList<>();
 
             this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
-            this.describeClusterProxy = new KafkaDescribeClusterProxy(routedId, resolvedId, this);
         }
 
         private void onAppMessage(
@@ -686,7 +681,6 @@ private void cleanup(
             long traceId,
             long authorization)
         {
-            describeClusterProxy.doKafkaAbortAndReset(traceId, authorization);
             createTopicsProxy.doKafkaAbortAndReset(traceId, authorization);
 
             doAppAbortAndReset(traceId, authorization);
@@ -974,7 +968,6 @@ private void doKafkaBegin(
             initialMax = delegate.initialMax;
             state = PgsqlKafkaState.openingInitial(state);
 
-            final int numBrokers = delegate.brokers.size();
             final int partitionCount = config.kafkaCreateTopicsPartitionCount();
 
             final KafkaBeginExFW kafkaBeginEx =
@@ -985,18 +978,8 @@
                 .topics(ct ->
                     topics.forEach(t -> ct.item(i -> i
                         .name(t)
-                        .partitionCount(1)
+                        .partitionCount(partitionCount)
                         .replicas(config.kafkaCreateTopicsReplicas())
-                        .assignments(a -> a
-                            .item(ai ->
-                            {
-                                for (int p = 0; p < partitionCount; p++)
-                                {
-                                    int brokerIndex = p % numBrokers;
-                                    int brokerId = delegate.brokers.get(brokerIndex);
-                                    ai.partitionId(p).leaderId(brokerId);
-                                }
-                            }))
                         .configs(cf -> cf
                             .item(ci -> ci.name("cleanup.policy").value(deletionPolicy))))))
                 .timeout(config.kafkaTopicRequestTimeoutMs())
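After this hunk the create-topics request carries no explicit leader assignments, and the configured partition count is finally applied to the topic itself (the old code advertised .partitionCount(1) while assigning leaders for partitionCount partitions, which looks inconsistent). A sketch of the resulting builder call, assembled from the surviving lines; the wrapper around .topics(...) is collapsed on the page, so .createTopics(c -> c ...) is inferred from the KafkaCreateTopicsProxy name rather than quoted:

// Post-change request assembly (sketch): each topic is created with the
// configured partition and replica counts, and with no assignments block
// the Kafka controller places partition leaders itself.
final int partitionCount = config.kafkaCreateTopicsPartitionCount();

final KafkaBeginExFW kafkaBeginEx =
    kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
        .typeId(kafkaTypeId)
        .request(r -> r
            .createTopics(c -> c                 // wrapper name inferred, collapsed in the diff
                .topics(ct ->
                    topics.forEach(t -> ct.item(i -> i
                        .name(t)
                        .partitionCount(partitionCount)
                        .replicas(config.kafkaCreateTopicsReplicas())
                        .configs(cf -> cf
                            .item(ci -> ci.name("cleanup.policy").value(deletionPolicy))))))
                .timeout(config.kafkaTopicRequestTimeoutMs())))
        .build();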
@@ -1048,74 +1031,6 @@ protected void onKafkaBegin(
         }
     }
 
-    private final class KafkaDescribeClusterProxy extends KafkaProxy
-    {
-        private KafkaDescribeClusterProxy(
-            long originId,
-            long routedId,
-            PgsqlProxy delegate)
-        {
-            super(originId, routedId, delegate);
-        }
-
-        protected void doKafkaBegin(
-            long traceId,
-            long authorization)
-        {
-            state = PgsqlKafkaState.openingInitial(state);
-
-            final KafkaBeginExFW kafkaBeginEx =
-                kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
-                    .typeId(kafkaTypeId)
-                    .request(r -> r
-                        .describeCluster(d -> d.includeAuthorizedOperations(0)))
-                    .build();
-
-            kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
-                traceId, authorization, 0, kafkaBeginEx);
-        }
-
-        @Override
-        protected void onKafkaBegin(
-            BeginFW begin)
-        {
-            final long sequence = begin.sequence();
-            final long acknowledge = begin.acknowledge();
-            final long traceId = begin.traceId();
-            final long authorization = begin.authorization();
-            final OctetsFW extension = begin.extension();
-
-            assert acknowledge <= sequence;
-            assert sequence >= replySeq;
-            assert acknowledge >= replyAck;
-
-            replySeq = sequence;
-            replyAck = acknowledge;
-            state = PgsqlKafkaState.openingReply(state);
-
-            assert replyAck <= replySeq;
-
-            final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
-            final KafkaBeginExFW kafkaBeginEx =
-                beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;
-
-            final KafkaDescribeClusterResponseBeginExFW describeCluster = kafkaBeginEx.response().describeCluster();
-
-            if (describeCluster.error() == NO_ERROR)
-            {
-                describeCluster.brokers().forEach(b -> delegate.brokers.add(b.brokerId()));
-                delegate.onKafkaDescribeClusterBegin(traceId, authorization);
-
-                doKafkaWindow(traceId, authorization);
-                doKafkaEnd(traceId, authorization);
-            }
-            else
-            {
-                delegate.cleanup(traceId, authorization);
-            }
-        }
-    }
-
     private void doBegin(
         final MessageConsumer receiver,
         final long originId,
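The deleted proxy was the sole consumer of KafkaDescribeClusterResponseBeginExFW and of PgsqlProxy's brokers list, which is why the import, field, constructor, and cleanup hunks above all fall out of this same removal.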
@@ -1328,15 +1243,11 @@ private void decodeCreateTopicCommand(
         int offset,
         int length)
     {
-        if (server.commandsProcessed == 2)
+        if (server.commandsProcessed == 1)
         {
             server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.CREATE_TOPIC_COMMAND);
         }
         else if (server.commandsProcessed == 0)
         {
-            server.describeClusterProxy.doKafkaBegin(traceId, authorization);
-        }
-        else if (server.commandsProcessed == 1)
-        {
             final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length);
             final String topic = statement.getTable().getName();
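With the pre-flight step gone, the command state machine shrinks from three steps to two: step 0 now parses the CREATE TABLE statement and issues the CreateTopics request, and the command completes after a single Kafka round-trip. A sketch of the resulting dispatch, assembled from the hunk (the lines after getTable().getName() stay collapsed on the page, so the trailing call is an assumption):

// Post-change flow: one Kafka round-trip per CREATE TOPIC command.
if (server.commandsProcessed == 1)
{
    server.onCommandCompleted(traceId, authorization, length,
        PgsqlKafkaCompletionCommand.CREATE_TOPIC_COMMAND);
}
else if (server.commandsProcessed == 0)
{
    final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length);
    final String topic = statement.getTable().getName();
    // ... collapsed in the diff; presumably ends by starting the Kafka request:
    // server.createTopicsProxy.doKafkaBegin(traceId, authorization);
}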
