
Add support for using Oxia as the metadata store for Pulsar and BookKeeper #544

Merged: 68 commits, Nov 22, 2024

Conversation

yuweisung (Contributor)

Feature: add oxia template

Motivation

Adding Oxia templates to the Helm chart.

Modifications

Just add the templates with the correct variables.
The Oxia component is disabled in the default values.yaml.
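
A minimal sketch of the intended default (key names follow the chart's existing components toggle pattern and are assumptions, not the final API):

```yaml
# values.yaml sketch: Oxia stays disabled by default and ZooKeeper remains the
# default metadata store; Oxia is enabled explicitly per deployment.
components:
  oxia: false
```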

Verifying this change

  • Make sure that the change passes the CI checks.

@yuweisung (Contributor, Author)

Next step: disable ZooKeeper and modify the init script to integrate Pulsar with Oxia.

@yuweisung (Contributor, Author) commented Nov 6, 2024

What's done:

  1. Add Oxia templates from StreamNative.
  2. Add Oxia components to values.yaml.
  3. Modify the Oxia templates to fit values.yaml.
  4. Verify the templates locally with "helm template test1 ." (see the sketch below).
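
For example, the rendered manifests could be checked along these lines (the --set flag assumes the components.oxia toggle introduced in values.yaml):

```shell
# Render the chart locally and confirm the Oxia resources appear when the component is enabled.
helm template test1 . --set components.oxia=true | grep -i oxia | head
# Optionally validate the rendered manifests without applying them.
helm template test1 . --set components.oxia=true | kubectl apply --dry-run=client -f -
```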

@yuweisung yuweisung changed the title Add-oxia-template feature: add-oxia-template Nov 6, 2024
@yuweisung yuweisung changed the title feature: add-oxia-template feat: add-oxia-template Nov 6, 2024
@yuweisung yuweisung changed the title feat: add-oxia-template feat: add oxia template Nov 6, 2024
@lhotari (Member) commented Nov 21, 2024

@merlimat what are the recommended default Oxia namespaces for brokers and bookies? Is it simply broker and bookkeeper?

Yes, it should be a good start, with 3 shards each.

What command is needed for creating Oxia namespaces?

The namespaces are automatically created by updating coordinator config-map.

e.g. https://github.com/streamnative/oxia/blob/main/deploy/charts/oxia-cluster/templates/coordinator-configmap.yaml#L23-L26
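
For reference, a minimal sketch of a coordinator config with the two namespaces discussed above (field names follow the linked config-map template; the shard count follows the answer above, and the replication factor is an assumption matching the single-server CI setup shown later):

```yaml
namespaces:
  - name: broker
    initialShardCount: 3
    replicationFactor: 1
  - name: bookkeeper
    initialShardCount: 3
    replicationFactor: 1
```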

@merlimat I've made changes accordingly.

I can see that Oxia is working; however, there are problems:

Name: pulsar-ci-oxia-coordinator-status
Namespace: pulsar
Labels:
Annotations:

Data
====
status:
----
namespaces:
  bookkeeper:
    replicationFactor: 1
    shards:
      1:
        status: 1
        term: 13
        leader:
          public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
          internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        ensemble:
          - public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
            internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        removedNodes: []
        int32HashRange:
          min: 0
          max: 4294967295
  broker:
    replicationFactor: 1
    shards:
      0:
        status: 1
        term: 13
        leader:
          public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
          internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        ensemble:
          - public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
            internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        removedNodes: []
        int32HashRange:
          min: 0
          max: 4294967295
shardIdGenerator: 2
serverIdx: 0

The problem is that the BookKeeper init job fails. The last error message in the logs is:

2024-11-21T12:58:53,552+0000 [oxia-client-1-1] INFO  io.streamnative.oxia.client.shard.ShardManager - Retry creating stream for shard assignments namespace=bookkeeper
2024-11-21T12:58:53,555+0000 [grpc-default-worker-ELG-2-1] WARN  io.streamnative.oxia.client.shard.ShardManager - Failed receiving shard assignments.
io.grpc.StatusRuntimeException: UNIMPLEMENTED: unknown service io.streamnative.oxia.proto.OxiaClient
    at io.grpc.Status.asRuntimeException(Status.java:539) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) ~[io.grpc-grpc-stub-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:683) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:830) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:825) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1018) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:691) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:458) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:441) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.MessageDeframer.closeWhenComplete(MessageDeframer.java:192) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:201) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:444) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:400) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.inboundTrailersReceived(AbstractClientStream.java:383) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.Http2ClientStreamTransportState.transportTrailersReceived(Http2ClientStreamTransportState.java:183) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$TransportState.transportHeadersReceived(NettyClientStream.java:334) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:379) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.access$1200(NettyClientHandler.java:93) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$FrameListener.onHeadersRead(NettyClientHandler.java:936) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at java.base/java.lang.Thread.run(Unknown Source) [?:?]

In conf/bookkeeper.conf there's:

metadataServiceUri=metadata-store:oxia://pulsar-ci-oxia-svc:6649/bookkeeper

Connection works from the pod:

pulsar-ci-bookie-init-gz88q:/pulsar$ nc -v -z pulsar-ci-oxia-svc 6649
pulsar-ci-oxia-svc (10.244.1.20:6649) open
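
One way to narrow this down might be to check which gRPC services each Oxia port actually exposes, for example with grpcurl if it is available in the pod or a debug container (a debugging sketch only; it requires the server to support gRPC reflection, and the ports are taken from the coordinator status above):

```shell
# List the gRPC services registered on the public (6648) and internal (6649) ports.
grpcurl -plaintext pulsar-ci-oxia-svc:6648 list
grpcurl -plaintext pulsar-ci-oxia-svc:6649 list
```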

I wonder how the problem can be fixed? /cc @mattisonchao

@lhotari (Member) commented Nov 21, 2024

Last logs on the coordinator:

│ {"level":"info","time":"2024-11-21T12:38:36.001675891Z","component":"shard-controller","namespace":"bookkeeper","shard":1,"term":13,"time":"2024-11-21T12:38:36.001711227Z","message":"Starting leader election"}                                                                                                                                                              │
│ {"level":"info","time":"2024-11-21T12:38:36.033715641Z","component":"shard-controller","entry-id":{"term":"-1", "offset":"-1"},"namespace":"bookkeeper","server-address":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":1,"time":"2024-11-21T12:38:36.033813915Z","message":"Processed newTerm response │
│ {"level":"info","time":"2024-11-21T12:38:36.033840474Z","component":"shard-controller","followers":[],"namespace":"bookkeeper","new-leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":1,"term":13,"time":"2024-11-21T12:38:36.033903832Z","message":"Successfully moved ensemble to a new term"}   │
│ {"level":"info","time":"2024-11-21T12:38:36.04050441Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"bookkeeper","shard":1,"term":13,"time":"2024-11-21T12:38:36.040516162Z","message":"Elected new leader"}                                              │
│ {"level":"info","time":"2024-11-21T12:38:36.040532943Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"bookkeeper","shard":1,"time":"2024-11-21T12:38:36.040539566Z","message":"Shard is ready"}                                                           │
│ {"level":"info","time":"2024-11-21T12:38:37.967300662Z","component":"shard-controller","namespace":"broker","shard":0,"term":13,"time":"2024-11-21T12:38:37.967319758Z","message":"Starting leader election"}                                                                                                                                                                  │
│ {"level":"info","time":"2024-11-21T12:38:37.99129862Z","component":"shard-controller","entry-id":{"term":"-1", "offset":"-1"},"namespace":"broker","server-address":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":0,"time":"2024-11-21T12:38:37.991327354Z","message":"Processed newTerm response"}    │
│ {"level":"info","time":"2024-11-21T12:38:37.991407553Z","component":"shard-controller","followers":[],"namespace":"broker","new-leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":0,"term":13,"time":"2024-11-21T12:38:37.991421479Z","message":"Successfully moved ensemble to a new term"}       │
│ {"level":"info","time":"2024-11-21T12:38:38.002184498Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"broker","shard":0,"term":13,"time":"2024-11-21T12:38:38.002203163Z","message":"Elected new leader"}                                                 │
│ {"level":"info","time":"2024-11-21T12:38:38.002277151Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"broker","shard":0,"time":"2024-11-21T12:38:38.002287801Z","message":"Shard is ready"}

@lhotari (Member) commented Nov 21, 2024

@yuweisung If you'd like to debug issues in GitHub Actions, you can ssh into the running build job VM when you open a PR to your own fork. SSH authentication happens with your public keys at https://github.com/yuweisung.keys.
This is how I've been working on this: lhotari#12.
Each job will have ssh connection details.
You can run "k9s" after the k8s cluster is running.

Commit messages pushed in between (PR timeline):

  • "Attempt to fix io.grpc.StatusRuntimeException: UNIMPLEMENTED: unknown service io.streamnative.oxia.proto.OxiaClient"
  • "Since found this error in the logs:
      rpc error: code = Code(110) desc = oxia: namespace not found
      Failed to add client for shards assignments notifications
    - org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer is the only implementation"
@lhotari (Member) commented Nov 22, 2024

One gap in Pulsar is the lack of an org.apache.pulsar.packages.management.core.PackagesStorage implementation that works with Oxia. The org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorage implementation uses the DistributedLog library, which requires ZooKeeper.

Error message:

pulsar-ci-toolset-0:/pulsar$ bin/pulsar-admin functions create --tenant pulsar-ci --namespace test --name test-function --inputs "pulsar-ci/test/test_input" --output "pulsar-ci/test/test_output" --parallelism 1 --classname org.apache.pulsar.functions.api.examples.ExclamationFunction --jar /pulsar/examples/api-examples.jar
Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

I have disabled function tests for Oxia in CI to make the tests pass. We can merge this PR without functions support for Oxia.
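
As a rough illustration only (the actual CI change may instead skip the functions test step rather than the component), the Oxia test run could disable the functions component via the chart's existing toggle:

```yaml
# Hypothetical CI values override for the Oxia run; components.functions is assumed to be
# the chart's toggle for deploying the functions worker.
components:
  functions: false
```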

lhotari merged commit c6ce11a into apache:master on Nov 22, 2024
32 checks passed