
ISSUE-10293: [Functions] Unable to create/update functions and Admin operations often fail #2422

Open
sijie opened this issue Apr 21, 2021 · 0 comments

sijie commented Apr 21, 2021

Original Issue: apache#10293


I created a build based on master (2.8.0-SNAPSHOT), and I'm getting a number of issues.

When I try to update, start, or stop a function, or get function status or stats, I get this exception (or something similar) about 90% of the time:

2021-04-21T01:43:37,293 [pulsar-web-46-14] ERROR org.apache.pulsar.functions.worker.rest.api.ComponentImpl - Update Failed
org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:229) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.client.admin.internal.FunctionsImpl.lambda$updateOnWorkerLeaderAsync$13(FunctionsImpl.java:1078) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.request.NettyRequestSender.handleUnexpectedClosedChannel(NettyRequestSender.java:484) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelInactive(AsyncHttpClientHandler.java:145) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.stream.ChunkedWriteHandler.channelInactive(ChunkedWriteHandler.java:138) ~[io.netty-netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235) ~[io.netty-netty-codec-http-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:311) ~[io.netty-netty-codec-http-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.util.concurrent.CompletionException: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_282]
... 38 more
Caused by: org.asynchttpclient.exception.RemotelyClosedException: Remotely closed
at org.asynchttpclient.exception.RemotelyClosedException.INSTANCE(Unknown Source) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
500 Internal Server Error HTTP/1.1

I suspect this is related to apache#10291
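
For anyone triaging, the failing path can also be exercised directly against the functions admin REST endpoint to watch the connection get dropped. A minimal sketch, assuming the broker config below; the host and token are placeholders for my environment:

# Hit the functions admin REST endpoint directly over HTTPS with token auth.
# Host and token are placeholders.
curl -v -k \
  -H "Authorization: Bearer eyJhbGc...xyx" \
  "https://fab08.xyz.com:8443/admin/v3/functions/public/default"
# The request intermittently returns 500 Internal Server Error, or the
# connection is closed mid-response, matching the RemotelyClosedException above.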

Steps to reproduce:

Set up a build server to build Pulsar from apache/master via these commands:

# Install docker and git
sudo apt-get update
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg \
    lsb-release -y
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo \
  "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io -y
# download my Pulsar fork
git clone https://github.com/devinbost/pulsar.git

# download and install act
curl https://raw.githubusercontent.com/nektos/act/master/install.sh | sudo bash
# checkout my Pulsar branch
cd pulsar
git checkout disable-function-batching

# Fix docker permissions
sudo groupadd docker
sudo usermod -aG docker ${USER}
sudo chmod 666 /var/run/docker.sock
sudo chmod g+rwx "$HOME/.docker" -R
sudo chown "$USER":"$USER" /home/"$USER"/.docker -R
# May need to run: sudo systemctl restart docker

# Verify these commands can be run:
docker pull gcr.io/google-samples/hello-app:1.0
docker run hello-world

# Install Java JDK 8
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

# Setup maven
wget https://archive.apache.org/dist/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz
sudo tar xf ./apache-maven-*.tar.gz -C /opt
sudo ln -s /opt/apache-maven-3.6.1/bin/mvn /usr/local/bin/mvn

# install more dependencies
sudo apt-get install unzip zip

# Build
mvn install -DskipTests
cd docker
mvn package -Pdocker -DskipTests

# Tag and push image to GCP Container Registry (or an equivalent container registry):
docker tag apachepulsar/pulsar-all gcr.io/$PROJECT_ID/$NAME

docker push gcr.io/$PROJECT_ID/$NAME
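
As a quick sanity check before tagging and pushing (the image name assumes the standard output of the docker module):

# Confirm the pulsar-all image was built locally
docker images | grep pulsar-all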

Spin up a Pulsar cluster using this broker config:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Original content brought from version 2.3.0 release

### --- General broker settings --- ###

# Zookeeper quorum connection string
zookeeperServers=fab08.xyz.com:2181,fab09.xyz.com:2181,fab10.xyz.com:2181

# Configuration Store connection string
configurationStoreServers=fab08.xyz.com:2181,fab09.xyz.com:2181,fab10.xyz.com:2181
# Broker data port
brokerServicePort=6650

# Port to use to serve HTTP requests
webServicePort=8080

# Broker data port for TLS - By default TLS is disabled
brokerServicePortTls=6651

# Port to use to serve HTTPS requests - By default TLS is disabled
webServicePortTls=8443
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

# Number of threads to use for HTTP requests processing. Default is set to Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

# Name of the cluster to which this broker belongs
clusterName=pulsar-pcdc1-green-test

# Enable cluster's failure-domain which can distribute brokers into logical region
failureDomainsEnabled=false

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

# How often to check for topics that have reached the quota
backlogQuotaCheckIntervalInSeconds=60

# Default per-topic backlog quota limit
backlogQuotaDefaultLimitGB=10

# Default backlog quota retention policy. Default is producer_request_hold
# 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
# 'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
backlogQuotaDefaultRetentionPolicy=producer_request_hold
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# How long after the last consumption before an inactive subscription is deleted
# When set to 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
subscriptionRedeliveryTrackerEnabled=true

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A larger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time when the entries published after the
# snapshot need to be replayed.
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=64

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=

# If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
# use only brokers running the latest software version (to minimize impact to bundles)
preferLaterVersions=false

# Max number of unacknowledged messages allowed for a consumer on a shared subscription. The broker will stop sending
# messages to a consumer once this limit is reached, until the consumer starts acknowledging messages back.
# A value of 0 disables the unacked-message limit check, and the consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=0

# Max number of unacknowledged messages allowed per shared subscription. The broker will stop dispatching messages to
# all consumers of the subscription once this limit is reached, until consumers start acknowledging messages back and
# the unacked count drops to limit/2. A value of 0 disables the unacked-message limit
# check and the dispatcher can dispatch messages without any restriction
maxUnackedMessagesPerSubscription=0

# Max number of unacknowledged messages allowed per broker. Once this limit is reached, the broker will stop dispatching
# messages to all shared subscriptions with a higher number of unacked messages until subscriptions start
# acknowledging messages back and the unacked count drops to limit/2. A value of 0 disables the
# unacked-message limit check and the broker doesn't block dispatchers
maxUnackedMessagesPerBroker=0

# Once the broker reaches the maxUnackedMessagesPerBroker limit, it blocks subscriptions whose unacked messages
# exceed this percentage limit; a blocked subscription will not receive any new messages until it acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
# Too many subscribe requests from a consumer can cause the broker to rewind consumer cursors and load data from bookies,
# causing high network bandwidth usage
# When a positive value is set, the broker will throttle subscribe requests per consumer.
# Otherwise, throttling is disabled. The default value of this setting is 0 - throttling is disabled.
subscribeThrottlingRatePerConsumer=0

# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
subscribeRatePeriodPerConsumerInSecond=30

# Default messages-per-second dispatch throttling limit for every topic. A value of 0 disables default
# message dispatch throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes-per-second dispatch throttling limit for every topic. A value of 0 disables
# default message-byte dispatch throttling
dispatchThrottlingRatePerTopicInByte=0

# Default message dispatch throttling limit for a subscription.
# A value of 0 disables default message dispatch throttling.
dispatchThrottlingRatePerSubscriptionInMsg=0

# Default message-byte dispatch throttling limit for a subscription.
# A value of 0 disables default message-byte dispatch throttling.
# Devin Bost: I think the setting name below was a typo in a config... But, I'm leaving it in case it's required.
dispatchThrottlingRatePerSubscribeInByte=0

# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have
# backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true
# Max number of concurrent lookup requests the broker allows, to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

# Max number of concurrent topic loading requests the broker allows, to control the number of zk operations
maxConcurrentTopicLoadRequest=5000

# Max number of concurrent non-persistent messages that can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable running a bookie along with the broker
enableRunBookieTogether=false

# Enable running bookie autorecovery along with the broker
enableRunBookieAutoRecoveryTogether=false

# Max number of producers allowed to connect to a topic. Once this limit is reached, the broker will reject new producers
# until the number of connected producers decreases.
# A value of 0 disables the maxProducersPerTopic limit check.
maxProducersPerTopic=0

# Max number of consumers allowed to connect to a topic. Once this limit is reached, the broker will reject new consumers
# until the number of connected consumers decreases.
# A value of 0 disables the maxConsumersPerTopic limit check.
maxConsumersPerTopic=0

# Max number of consumers allowed to connect to a subscription. Once this limit is reached, the broker will reject new consumers
# until the number of connected consumers decreases.
# A value of 0 disables the maxConsumersPerSubscription limit check.
maxConsumersPerSubscription=0
# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
# a role in proxyRoles, it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Deprecated - Use webServicePortTls and brokerServicePortTls instead
#
#   * We have the same logic above for the way of enabling TLS through port assignments
#
tlsEnabled=true
# Path for the TLS certificate file
tlsCertificateFilePath=/pulsar/conf/auth/broker.cert.pem

# Path for the TLS private key file
tlsKeyFilePath=/pulsar/conf/auth/broker.key-pk8.pem

# Path for the trusted TLS certificate file.
# This cert is used to verify that any certs presented by connecting clients
# are signed by a certificate authority. If this verification
# fails, then the certs are untrusted and the connections are dropped.
tlsTrustCertsFilePath=/pulsar/conf/auth/ca.cert.pem

# Accept untrusted TLS certificates from clients.
# If true, a client with a cert which cannot be verified with the
# 'tlsTrustCertsFilePath' cert will be allowed to connect to the server,
# though the cert will not be used for client authentication.
tlsAllowInsecureConnection=false

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
tlsProtocols=TLSv1.2,TLSv1.1,TLSv1

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
tlsCiphers=ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,ECDHE-PSK-CHACHA20-POLY1305,AES256-GCM-SHA384,AES128-GCM-SHA256,ECDHE-PSK-AES256-CBC-SHA,AES256-SHA,PSK-AES256-CBC-SHA,ECDHE-PSK-AES128-CBC-SHA,AES128-SHA,PSK-AES128-CBC-SHA

# Trusted client certificates are required to connect over TLS.
# Reject the connection if the client certificate is not trusted.
# In effect, this requires that all connecting clients perform TLS client
# authentication.
tlsRequireTrustedClientCertOnConnect=false



### --- Authentication --- ###

# Enable authentication
authenticationEnabled=true

# Authentication provider name list, which is a comma-separated list of class names
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# Enforce authorization
authorizationEnabled=true

# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider

# Allow wildcard matching in authorization
# (wildcard matching only applicable if the wildcard char
# * appears at the first or last position, eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=admin,superuser

# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientTlsEnabled=false
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGc...xyx
brokerClientTrustCertsFilePath=/pulsar/conf/auth/ca.cert.pem


# brokerClientTlsEnabled=false
# brokerClientAuthenticationPlugin=
# brokerClientAuthenticationParameters=
# brokerClientTrustCertsFilePath=


# Supported Athenz provider domain names (comma separated) for authentication
athenzDomainNames=

# When this parameter is not empty, unauthenticated users act as anonymousUserRole
anonymousUserRole=anonymous

### --- Token Authentication Provider --- ###

## Symmetric key
# Configure the secret key to be used to validate auth tokens
# The key can be specified like:
# tokenSecretKey=data:base64,xxxxxxxxx
# tokenSecretKey=file:///my/secret.key
# tokenSecretKey=

## Asymmetric public/private key pair
# Configure the public key to be used to validate auth tokens
# The key can be specified like:
# tokenPublicKey=data:base64,xxxxxxxxx
# tokenPublicKey=file:///my/public.key
tokenPublicKey=file:///pulsar/conf/auth/public.key
### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
bookkeeperClientAuthenticationPlugin=

# BookKeeper auth plugin implementation-specific parameter names and values
bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters=

# Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30

# Speculative reads are initiated if a read request doesn't complete within a certain time
# A value of 0 disables speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0

# Use older Bookkeeper wire protocol with bookie
bookkeeperUseV2WireProtocol=true

# Enable bookies health check. Bookies that have more than the configured number of failures within
# the interval will be quarantined for some time. During this period, new ledgers won't be created
# on these bookies
bookkeeperClientHealthCheckEnabled=true
bookkeeperClientHealthCheckIntervalSeconds=60
bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800

# Enable rack-aware bookie selection policy. BK will choose bookies from different racks when
# forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true

# Enable region-aware bookie selection policy. BK will choose bookies from
# different regions and racks when forming a new bookie ensemble
# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored
bookkeeperClientRegionawarePolicyEnabled=false

# Enable/disable reordering read sequence on reading entries.
bookkeeperClientReorderReadSequenceEnabled=false

# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=
# Enable/disable having read operations for a ledger to be sticky to a single bookie.
# If this flag is enabled, the client will use one single bookie (by preference) to read
# all entries for a ledger.
bookkeeperEnableStickyReads=true


### --- Managed Ledger --- ###

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2

# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2

# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2

# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C

# Number of threads to be used for managed ledger tasks dispatching
managedLedgerNumWorkerThreads=8

# Number of threads to be used for managed ledger scheduled tasks
managedLedgerNumSchedulerThreads=8

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running  in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=
# Threshold to which the cache level is brought down when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9
# Rate limit the amount of writes per second generated by consumer acking the messages
managedLedgerDefaultMarkDeleteRateLimit=1.0

# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
#  * Either the max rollover time has been reached
#  * or max entries have been written to the ledger and at least min-time
#    has passed   (50000)
managedLedgerMaxEntriesPerLedger=10

managedLedgerMinLedgerRolloverTimeMinutes=0

# Maximum time before forcing a ledger rollover for a topic
managedLedgerMaxLedgerRolloverTimeMinutes=240

# Delay between a ledger being successfully offloaded to long term storage
# and the ledger being deleted from bookkeeper (default is 4 hours)
managedLedgerOffloadDeletionLagMs=14400000
# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000

# Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400

# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in ZooKeeper. If the number of unacked message ranges is higher
# than this limit, the broker will persist unacked ranges into BookKeeper to avoid additional data overhead in
# ZooKeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data ledgers in the managed ledger's list. This helps when data ledgers get
# corrupted in BookKeeper and the managed cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60
# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120

# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=120

### --- Load balancer --- ###

# Enable load balancer
loadBalancerEnabled=true

# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1

# Enable/disable automatic bundle unloading for load-shedding
loadBalancerSheddingEnabled=true

# Load shedding interval. The broker periodically checks whether some traffic should be offloaded from
# an overloaded broker to other underloaded brokers
loadBalancerSheddingIntervalMinutes=1

# Prevent the same topics from being shed and moved to another broker more than once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

# Usage threshold to allocate max number of topics to broker
loadBalancerBrokerMaxTopics=50000

# Usage threshold to determine a broker as over-loaded
loadBalancerBrokerOverloadedThresholdPercentage=85

# Interval to flush dynamic resource quota to ZooKeeper
loadBalancerResourceQuotaUpdateIntervalMinutes=15

# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=true

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=true

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxSessions=1000

# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxMsgRate=30000

# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxBandwidthMbytes=100

# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128

# Override the auto-detection of the network interfaces max speed.
# This option is useful in some environments (eg: EC2 VMs) where the max speed
# reported by Linux is not reflecting the real bandwidth available to the broker.
# Since the network usage is employed by the load manager to decide when a broker
# is overloaded, it is important to make sure the info is correct or override it
# with the right value here. The configured value can be a double (eg: 0.8) and that
# can be used to trigger load-shedding even before hitting on NIC limits.
loadBalancerOverrideBrokerNicSpeedGbps=

# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
### --- Replication --- ###

# Enable replication metrics
replicationMetricsEnabled=true

# Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency
# links.
replicationConnectionsPerBroker=16

# Replicator producer queue size
replicationProducerQueueSize=1000

# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl

# Default message retention time
defaultRetentionTimeInMinutes=0

# Default retention size
defaultRetentionSizeInMB=0

# How often to check whether the connections are still alive
keepAliveIntervalSeconds=30

# bootstrap namespaces
bootstrapNamespaces=

### --- WebSocket --- ###

# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8

# Time in milliseconds after which an idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000

### --- Metrics --- ###

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

# Enable consumer level metrics. default is false
exposeConsumerLevelMetricsInPrometheus=false

# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
# jvmGCMetricsLoggerClassName=

### --- Functions --- ###

# Enable Functions Worker Service in Broker
functionsWorkerEnabled=true


### --- Broker Web Stats --- ###

# Enable publisher-level stats
exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60
### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

# Enforce schema validation in the following case:
#
# - if a producer without a schema attempts to produce to a topic with a schema, the producer will
#   fail to connect. PLEASE be careful when using this, since non-Java clients don't support schemas;
#   if you enable this setting, it will cause non-Java clients to fail to produce.
isSchemaValidationEnforced=false

### --- Ledger Offloading --- ###

# The directory for all the offloader implementations
offloadersDirectory=./offloaders

# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
# When using google-cloud-storage, make sure both Google Cloud Storage and Google Cloud Storage JSON API are enabled for
# the project (check from Developers Console -> Api&auth -> APIs).
### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
globalZookeeperServers=

# Deprecated - Enable TLS when talking with other clusters to replicate messages
replicationTlsEnabled=false

# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds for Pulsar 2.5.2+
brokerServicePurgeInactiveFrequencyInSeconds=60
advertisedAddress=

# Added as per request for debugging freezing topics
acknowledgmentAtBatchIndexLevelEnabled=true
unblockStuckSubscriptionEnabled=true
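
Note that this config enforces token authentication with an asymmetric key pair (tokenPublicKey above). The key pair and tokens were generated roughly like this; the paths and subject are placeholders for my setup:

# Generate a key pair and an admin token (paths and subject are placeholders)
bin/pulsar tokens create-key-pair \
    --output-private-key /pulsar/conf/auth/private.key \
    --output-public-key /pulsar/conf/auth/public.key
bin/pulsar tokens create \
    --private-key file:///pulsar/conf/auth/private.key \
    --subject admin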

Start the brokers using this script:

#!/bin/bash

. /usr/local/bin/docker-functions
export SERVICE_USER=pulsar

pre_start () {
  echo "Broker pre-start"
  app="pulsar-broker"
  docker-standalone ps -aq --filter "name=pulsar-broker" | grep -q . && docker-standalone stop $app && docker-standalone rm -fv $app
  echo "stoped and removed"
}

ETC=/etc/pulsar
LOG=/var/log/pulsar/
DATA=/data/broker
APP_HOME=/

export DOCKER_RUN=" -d \
--memory-swappiness=0 \
--stop-timeout=300 \
--restart=unless-stopped \
--net=host \
-v $LOG:/pulsar/logs \
-v $DATA:/pulsar/data \
-v $ETC:/pulsar/conf:ro \
--user 0 \
gcr.io/$PROJECT_ID/$NAME:$TAG /pulsar/bin/pulsar broker "

run "$@"

The broker can then be started via: pulsar-broker start
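
To confirm the failures server-side, the broker container logs can be tailed for the function worker errors while issuing admin calls (the container name comes from the script above):

# Watch broker logs for the failing function worker calls
docker logs -f pulsar-broker 2>&1 | grep -iE 'RemotelyClosed|ComponentImpl'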

Then, connect to one of the brokers and issue:
bin/pulsar-admin functions [anything...]

That should cause the issue.
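
Concretely, any of the following reproduce it for me roughly 90% of the time. This is a sketch of my setup: the tenant, namespace, function name, and token are placeholders, and pulsar-admin reads conf/client.conf:

# Point pulsar-admin at the cluster with token auth (values are placeholders)
cat > conf/client.conf <<'EOF'
webServiceUrl=https://fab08.xyz.com:8443/
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGc...xyx
tlsTrustCertsFilePath=/pulsar/conf/auth/ca.cert.pem
EOF

# Each of these fails intermittently with the RemotelyClosedException above
bin/pulsar-admin functions list --tenant public --namespace default
bin/pulsar-admin functions status --tenant public --namespace default --name my-function
bin/pulsar-admin functions stats --tenant public --namespace default --name my-function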

@sijie sijie added the type/bug label Apr 21, 2021
@sijie sijie changed the title ISSUE-10293: [Functions] Unable to create/update functions, Admin operations often fail, and performance is unusably slow ISSUE-10293: [Functions] Unable to create/update functions and Admin operations often fail Mar 4, 2022
@sijie sijie added the Stale label Mar 4, 2022