Skip to content

Commit

Permalink
Add broker interceptor for Intercepting all Pulsar command and REST A…
Browse files Browse the repository at this point in the history
…PI requests. (#7143)

Add broker interceptor for Intercepting all Pulsar command and REST API requests
  • Loading branch information
codelipenghui authored Jun 9, 2020
1 parent f8a697d commit dbc0649
Show file tree
Hide file tree
Showing 21 changed files with 1,109 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int maxNumPartitionsPerPartitionedTopic = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The directory to locate broker interceptors"
)
private String brokerInterceptorsDirectory = "./interceptors";

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
)
private Set<String> brokerInterceptors = Sets.newTreeSet();

@FieldContext(
doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n"
+ " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -202,6 +204,7 @@ public class PulsarService implements AutoCloseable {

private MetricsGenerator metricsGenerator;
private TransactionMetadataStoreService transactionMetadataStoreService;
private BrokerInterceptor brokerInterceptor;

public enum State {
Init, Started, Closed
Expand Down Expand Up @@ -454,7 +457,9 @@ public void start() throws PulsarServerException {

this.defaultOffloader = createManagedLedgerOffloader(
OffloadPolicies.create(this.getConfiguration().getProperties()));

this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(config);
brokerService.start();

this.webService = new WebService(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import com.google.common.annotations.Beta;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.IOException;

/**
* A plugin interface that allows you to intercept the
* client requests to the Pulsar brokers.
*
* <p>BrokerInterceptor callbacks may be called from multiple threads. Interceptor
* implementation must ensure thread-safety, if needed.
*/
@Beta
public interface BrokerInterceptor extends AutoCloseable {

/**
* Called by the broker while new command incoming.
*/
void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception;

/**
* Called by the web service while new request incoming.
*/
void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException;

/**
* Initialize the broker interceptor.
*
* @throws Exception when fail to initialize the broker interceptor.
*/
void initialize(ServiceConfiguration conf) throws Exception;

BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();

/**
* Broker interceptor disabled implementation.
*/
class BrokerInterceptorDisabled implements BrokerInterceptor {

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception {
//No-op
}

@Override
public void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
chain.doFilter(request, response);
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
//No-op
}

@Override
public void close() {
//No-op
}
}

/**
* Close this broker interceptor.
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a broker interceptor.
*/
@Data
@NoArgsConstructor
public class BrokerInterceptorDefinition {

/**
* The name of the broker interceptor.
*/
private String name;

/**
* The description of the broker interceptor to be used for user help.
*/
private String description;

/**
* The class name for the broker interceptor.
*/
private String interceptorClass;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Map;
import java.util.TreeMap;

/**
* The collection of broker interceptor.
*/
@Data
@Accessors(fluent = true)
public class BrokerInterceptorDefinitions {

private final Map<String, BrokerInterceptorMetadata> interceptors = new TreeMap<>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.file.Path;

/**
* The metadata of broker interceptor
*/
@Data
@NoArgsConstructor
public class BrokerInterceptorMetadata {

/**
* The definition of the broker interceptor.
*/
private BrokerInterceptorDefinition definition;

/**
* The path to the handler package.
*/
private Path archivePath;
}
Loading

0 comments on commit dbc0649

Please sign in to comment.