Skip to content

Commit

Permalink
move disco
Browse files Browse the repository at this point in the history
  • Loading branch information
kgyrtkirk committed May 16, 2024
1 parent cab3d94 commit 27735f2
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.quidem;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.DruidNodeDiscovery.Listener;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.util.CalciteTests;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BooleanSupplier;

public class DiscovertModule extends AbstractModule {

DiscovertModule() {
}

@Override
protected void configure()
{
// builder.addModule(propOverrideModuel());
}

@Provides
@LazySingleton
public BrokerSegmentMetadataCache provideCache() {
return null;
}

@Provides
@LazySingleton
public Properties getProps() {
Properties localProps = new Properties();
localProps.put("druid.enableTlsPort", "false");
localProps.put("druid.zk.service.enabled", "false");
localProps.put("druid.plaintextPort", "12345");
localProps.put("druid.host", "localhost");
return localProps;
}

@Provides
@LazySingleton
public SqlEngine createMockSqlEngine(
final QuerySegmentWalker walker,
final QueryRunnerFactoryConglomerate conglomerate,
@Json ObjectMapper jsonMapper )
{
return new NativeSqlEngine(CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), jsonMapper);
}


@Provides
@LazySingleton
DruidNodeDiscoveryProvider getProvider() {
final DruidNode coordinatorNode = new DruidNode("test-coordinator", "dummy", false, 8081, null, true, false);
DiscovertModule.FakeDruidNodeDiscoveryProvider provider = new FakeDruidNodeDiscoveryProvider(
ImmutableMap.of(
NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDINATOR, coordinatorNode))
)
);
return provider;
}

/**
* A fake {@link DruidNodeDiscoveryProvider} for {@link #createMockSystemSchema}.
*/
private static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
private final Map<NodeRole, DiscovertModule.FakeDruidNodeDiscovery> nodeDiscoveries;

public FakeDruidNodeDiscoveryProvider(Map<NodeRole, DiscovertModule.FakeDruidNodeDiscovery> nodeDiscoveries)
{
this.nodeDiscoveries = nodeDiscoveries;
}

@Override
public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
boolean get = nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery())
.getAllNodes()
.stream()
.anyMatch(x -> x.getDruidNode().equals(node));
return () -> get;
}

@Override
public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
{
return nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery());
}
}

private static class FakeDruidNodeDiscovery implements DruidNodeDiscovery
{
private final Set<DiscoveryDruidNode> nodes;

FakeDruidNodeDiscovery()
{
this.nodes = new HashSet<>();
}

FakeDruidNodeDiscovery(Map<NodeRole, DruidNode> nodes)
{
this.nodes = Sets.newHashSetWithExpectedSize(nodes.size());
nodes.forEach((k, v) -> {
addNode(v, k);
});
}

@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return nodes;
}

void addNode(DruidNode node, NodeRole role)
{
final DiscoveryDruidNode discoveryNode = new DiscoveryDruidNode(node, role, ImmutableMap.of());
this.nodes.add(discoveryNode);
}

@Override
public void registerListener(Listener listener)
{

}
}



}
135 changes: 1 addition & 134 deletions integration-tests/src/main/java/org/apache/druid/quidem/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
Expand All @@ -38,10 +36,6 @@
import org.apache.druid.cli.CliBroker2;
import org.apache.druid.curator.CuratorModule;
import org.apache.druid.curator.discovery.DiscoveryModule;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.BrokerProcessingModule;
import org.apache.druid.guice.BrokerServiceModule;
Expand All @@ -67,7 +61,6 @@
import org.apache.druid.guice.StorageNodeModule;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.http.HttpClientModule;
import org.apache.druid.guice.security.AuthenticatorModule;
import org.apache.druid.guice.security.AuthorizerModule;
Expand All @@ -83,7 +76,6 @@
import org.apache.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.rpc.guice.ServiceClientModule;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
Expand Down Expand Up @@ -112,9 +104,7 @@
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule;
Expand All @@ -141,14 +131,10 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BooleanSupplier;

public class Launcher
{
Expand Down Expand Up @@ -457,126 +443,6 @@ private static Module propOverrideModuel1()
return m;
}

public static class DiscovertModule extends AbstractModule {

DiscovertModule() {
}

@Override
protected void configure()
{
// builder.addModule(propOverrideModuel());
}

@Provides
@LazySingleton
public BrokerSegmentMetadataCache provideCache() {
return null;
}

@Provides
@LazySingleton
public Properties getProps() {
Properties localProps = new Properties();
localProps.put("druid.enableTlsPort", "false");
localProps.put("druid.zk.service.enabled", "false");
localProps.put("druid.plaintextPort", "12345");
localProps.put("druid.host", "localhost");
return localProps;
}

@Provides
@LazySingleton
public SqlEngine createMockSqlEngine(
final QuerySegmentWalker walker,
final QueryRunnerFactoryConglomerate conglomerate,
@Json ObjectMapper jsonMapper )
{
return new NativeSqlEngine(CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), jsonMapper);
}


@Provides
@LazySingleton
DruidNodeDiscoveryProvider getProvider() {
final DruidNode coordinatorNode = new DruidNode("test-coordinator", "dummy", false, 8081, null, true, false);
FakeDruidNodeDiscoveryProvider provider = new FakeDruidNodeDiscoveryProvider(
ImmutableMap.of(
NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDINATOR, coordinatorNode))
)
);
return provider;
}

/**
* A fake {@link DruidNodeDiscoveryProvider} for {@link #createMockSystemSchema}.
*/
private static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
private final Map<NodeRole, FakeDruidNodeDiscovery> nodeDiscoveries;

public FakeDruidNodeDiscoveryProvider(Map<NodeRole, FakeDruidNodeDiscovery> nodeDiscoveries)
{
this.nodeDiscoveries = nodeDiscoveries;
}

@Override
public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
boolean get = nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery())
.getAllNodes()
.stream()
.anyMatch(x -> x.getDruidNode().equals(node));
return () -> get;
}

@Override
public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
{
return nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery());
}
}

private static class FakeDruidNodeDiscovery implements DruidNodeDiscovery
{
private final Set<DiscoveryDruidNode> nodes;

FakeDruidNodeDiscovery()
{
this.nodes = new HashSet<>();
}

FakeDruidNodeDiscovery(Map<NodeRole, DruidNode> nodes)
{
this.nodes = Sets.newHashSetWithExpectedSize(nodes.size());
nodes.forEach((k, v) -> {
addNode(v, k);
});
}

@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return nodes;
}

void addNode(DruidNode node, NodeRole role)
{
final DiscoveryDruidNode discoveryNode = new DiscoveryDruidNode(node, role, ImmutableMap.of());
this.nodes.add(discoveryNode);
}

@Override
public void registerListener(Listener listener)
{

}
}



}

static class CustomStartupInjectorBuilder extends StartupInjectorBuilder {

private List<com.google.inject.Module> overrideModules =new ArrayList<>();
Expand Down Expand Up @@ -853,6 +719,7 @@ protected List<? extends Module> getModules() {
// ret.add(new AvaticaBasedConnectionModule());
ret.add(binder -> binder.bind(RequestLogger.class).toInstance(new TestRequestLogger()));
ret.add(CacheTestHelperModule.ResultCacheMode.DISABLED.makeModule());
// ret.add(CacheTestHelperModule2.ResultCacheMode.DISABLED.makeModule());
ret.add(new QuidemCaptureModule());
ret.addAll(super.getModules());
return ret;
Expand Down
Loading

0 comments on commit 27735f2

Please sign in to comment.