-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathLocalDefinitionsProvider.java
146 lines (124 loc) · 6.9 KB
/
LocalDefinitionsProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.init;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingCustomField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingPublicField;
import static io.airbyte.config.init.JsonDefinitionsHelper.addMissingTombstoneField;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.Resources;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * This provider contains all definitions according to the local yaml files.
 * <p>
 * For each connector kind (source and destination) two resources are read: one holding the
 * connector definitions and one holding the connector specs. Each definition is joined with the
 * spec registered for its docker image and then converted into the corresponding model class. The
 * loading happens in {@link #initialize()}, which the constructor invokes; the getters only read
 * the resulting in-memory maps.
 */
public final class LocalDefinitionsProvider implements DefinitionsProvider {

  public static final Class<?> DEFAULT_SEED_DEFINITION_RESOURCE_CLASS = SeedType.class;

  private static final String PROTOCOL_VERSION = "protocol_version";
  private static final String SPEC = "spec";

  // Assigned by initialize(); keyed by definition id.
  private Map<UUID, StandardSourceDefinition> sourceDefinitions;
  private Map<UUID, StandardDestinationDefinition> destinationDefinitions;

  // TODO inject via dependency injection framework
  private final Class<?> seedResourceClass;

  /**
   * Creates the provider and immediately loads all definitions.
   *
   * @param seedResourceClass class used as the context for resolving the seed resource paths
   * @throws IOException if a seed resource cannot be read
   */
  public LocalDefinitionsProvider(final Class<?> seedResourceClass) throws IOException {
    this.seedResourceClass = seedResourceClass;
    // TODO remove this call once dependency injection framework manages object creation
    initialize();
  }

  /**
   * Reads the source and destination seed resources and replaces the in-memory definition maps with
   * their parsed contents.
   *
   * @throws IOException if a seed resource cannot be read
   */
  // TODO will be called automatically by the dependency injection framework on object creation
  public void initialize() throws IOException {
    this.sourceDefinitions = parseDefinitions(
        this.seedResourceClass,
        SeedType.STANDARD_SOURCE_DEFINITION.getResourcePath(),
        SeedType.SOURCE_SPEC.getResourcePath(),
        SeedType.STANDARD_SOURCE_DEFINITION.getIdName(),
        SeedType.SOURCE_SPEC.getIdName(),
        StandardSourceDefinition.class);
    this.destinationDefinitions = parseDefinitions(
        this.seedResourceClass,
        SeedType.STANDARD_DESTINATION_DEFINITION.getResourcePath(),
        SeedType.DESTINATION_SPEC.getResourcePath(),
        SeedType.STANDARD_DESTINATION_DEFINITION.getIdName(),
        SeedType.DESTINATION_SPEC.getIdName(),
        StandardDestinationDefinition.class);
  }

  /**
   * Looks up a single source definition by id.
   *
   * @param definitionId id of the source definition
   * @return the matching source definition
   * @throws ConfigNotFoundException if no source definition is registered under that id
   */
  @Override
  public StandardSourceDefinition getSourceDefinition(final UUID definitionId) throws ConfigNotFoundException {
    final StandardSourceDefinition definition = this.sourceDefinitions.get(definitionId);
    if (definition == null) {
      throw new ConfigNotFoundException(SeedType.STANDARD_SOURCE_DEFINITION.name(), definitionId.toString());
    }
    return definition;
  }

  /**
   * @return a new list holding every loaded source definition; modifying the list does not affect
   *         this provider
   */
  @Override
  public List<StandardSourceDefinition> getSourceDefinitions() {
    return new ArrayList<>(this.sourceDefinitions.values());
  }

  /**
   * Looks up a single destination definition by id.
   *
   * @param definitionId id of the destination definition
   * @return the matching destination definition
   * @throws ConfigNotFoundException if no destination definition is registered under that id
   */
  @Override
  public StandardDestinationDefinition getDestinationDefinition(final UUID definitionId) throws ConfigNotFoundException {
    final StandardDestinationDefinition definition = this.destinationDefinitions.get(definitionId);
    if (definition == null) {
      throw new ConfigNotFoundException(SeedType.STANDARD_DESTINATION_DEFINITION.name(), definitionId.toString());
    }
    return definition;
  }

  /**
   * @return a new list holding every loaded destination definition; modifying the list does not
   *         affect this provider
   */
  @Override
  public List<StandardDestinationDefinition> getDestinationDefinitions() {
    return new ArrayList<>(this.destinationDefinitions.values());
  }

  /**
   * Loads one definitions resource and one specs resource and joins them into a map of typed
   * definitions.
   *
   * @param seedDefinitionsResourceClass context class for resolving both resource paths
   * @param definitionsYamlPath resource path of the definitions yaml
   * @param specYamlPath resource path of the specs yaml
   * @param definitionIdField name of the field that identifies an entry of the definitions yaml; its
   *        value must be a UUID string
   * @param specIdField name of the field that identifies an entry of the specs yaml
   * @param definitionModel class each merged definition is converted into
   * @return typed definitions keyed by definition id
   * @throws IOException if either resource cannot be read
   */
  @SuppressWarnings("UnstableApiUsage")
  private static <T> Map<UUID, T> parseDefinitions(final Class<?> seedDefinitionsResourceClass,
                                                   final String definitionsYamlPath,
                                                   final String specYamlPath,
                                                   final String definitionIdField,
                                                   final String specIdField,
                                                   final Class<T> definitionModel)
      throws IOException {
    final Map<String, JsonNode> rawDefinitions = getJsonElements(seedDefinitionsResourceClass, definitionsYamlPath, definitionIdField);
    final Map<String, JsonNode> rawSpecs = getJsonElements(seedDefinitionsResourceClass, specYamlPath, specIdField);

    return rawDefinitions.entrySet().stream()
        .collect(Collectors.toMap(
            e -> UUID.fromString(e.getKey()),
            e -> toDefinition(e.getValue(), rawSpecs, definitionModel)));
  }

  /**
   * Turns one raw definition into its typed model: fills in the missing fields, attaches the spec,
   * records the protocol version and finally converts the JSON into {@code definitionModel}.
   *
   * @param rawDefinition definition JSON as read from the definitions resource; it is modified in
   *        place when the spec and protocol version are attached
   * @param rawSpecs spec entries keyed by their id field
   * @param definitionModel class the merged JSON is converted into
   * @return the typed definition
   */
  private static <T> T toDefinition(final JsonNode rawDefinition, final Map<String, JsonNode> rawSpecs, final Class<T> definitionModel) {
    final JsonNode withMissingFields = addMissingFields(rawDefinition);
    final ObjectNode withSpec = (ObjectNode) mergeSpecIntoDefinition(withMissingFields, rawSpecs);
    // The protocol version lives inside the spec; null is handed to getWithDefault when it is absent.
    final String protocolVersion =
        withSpec.has(SPEC) && withSpec.get(SPEC).has(PROTOCOL_VERSION) ? withSpec.get(SPEC).get(PROTOCOL_VERSION).asText() : null;
    withSpec.put("protocolVersion", AirbyteProtocolVersion.getWithDefault(protocolVersion).serialize());
    return Jsons.object(withSpec, definitionModel);
  }

  /**
   * Reads a yaml resource and indexes its top-level elements by the text value of an id field.
   *
   * @param seedDefinitionsResourceClass context class for resolving {@code resourcePath}
   * @param resourcePath path of the yaml resource
   * @param idName name of the field whose text value becomes the map key
   * @return the elements of the resource keyed by their id
   * @throws IOException if the resource cannot be read
   * @throws NullPointerException if an element has no {@code idName} field
   */
  private static Map<String, JsonNode> getJsonElements(final Class<?> seedDefinitionsResourceClass, final String resourcePath, final String idName)
      throws IOException {
    final URL url = Resources.getResource(seedDefinitionsResourceClass, resourcePath);
    final String yamlString = Resources.toString(url, StandardCharsets.UTF_8);
    final JsonNode configList = Yamls.deserialize(yamlString);
    return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap(
        json -> requireText(json, idName, "entry of seed resource " + resourcePath),
        json -> json));
  }

  /**
   * Merges the corresponding spec JSON into the definition JSON. This is necessary because specs are
   * stored in a separate resource file from definitions.
   *
   * @param definitionJson JSON of connector definition that is missing a spec
   * @param specConfigs map of docker image to JSON of docker image/connector spec pair
   * @return JSON of connector definition including the connector spec
   * @throws NullPointerException if the definition lacks {@code dockerRepository} or
   *         {@code dockerImageTag}
   * @throws UnsupportedOperationException if there is no spec for the definition's docker image
   */
  private static JsonNode mergeSpecIntoDefinition(final JsonNode definitionJson, final Map<String, JsonNode> specConfigs) {
    final String dockerImage = requireText(definitionJson, "dockerRepository", "connector definition")
        + ":" + requireText(definitionJson, "dockerImageTag", "connector definition");
    final JsonNode specConfigJson = specConfigs.get(dockerImage);
    if (specConfigJson == null || specConfigJson.get(SPEC) == null) {
      throw new UnsupportedOperationException(String.format("There is no seed spec for docker image %s", dockerImage));
    }
    ((ObjectNode) definitionJson).set(SPEC, specConfigJson.get(SPEC));
    return definitionJson;
  }

  /**
   * Returns the text value of a field that must be present on {@code json}.
   * <p>
   * A missing field still surfaces as a {@link NullPointerException}, exactly as an unguarded
   * {@code json.get(fieldName).asText()} would, but with a message that names the field and where it
   * was expected.
   *
   * @param json node to read from
   * @param fieldName name of the required field
   * @param context short description of {@code json}, used only in the failure message
   * @return the field's value as text
   */
  private static String requireText(final JsonNode json, final String fieldName, final String context) {
    final JsonNode field = Objects.requireNonNull(json.get(fieldName),
        () -> String.format("Missing required field '%s' in %s", fieldName, context));
    return field.asText();
  }

  /**
   * Applies the tombstone, custom and public missing-field helpers to the element, in that order.
   */
  private static JsonNode addMissingFields(final JsonNode element) {
    return addMissingPublicField(addMissingCustomField(addMissingTombstoneField(element)));
  }

}