diff --git a/README.md b/README.md index a997e7e..2558d66 100644 --- a/README.md +++ b/README.md @@ -379,6 +379,47 @@ The Data Import Handler also defines some UDF functions for use within SQL: |unescapeHtmlEntites(string)|Unescapes HTML entities found in the text|`0.6.0-ALPHA`| |fluffly(string)|A silly function that prepends the word "fluffly" to the text, used as a *test* function to mark values as being changed by processing|`0.6.0-ALPHA`| +### Auth and HTTPS for Elasticsearch + +(_Since version `0.8.0-ALPHA`_) + +For basic authentication with Elasticsearch you can add the following to the source or target Elasticsearch definitions: + +```hocon + "basicAuth": { + "username": "elastic", + "password": "changeme" + } +``` + +And for SSL, enable it within the Elasticsearch definition as well: + +```hocon + "enableSsl": true +``` + +Here is a full example, when connecting to an Elastic Cloud instance: + +```hocon +"targetElasticsearch": { + "nodes": [ + "123myinstance456.us-east-1.aws.found.io" + ], + "basicAuth": { + "username": "elastic", + "password": "changeme" + }, + "port": 9243, + "enableSsl": true, + "settings": { + "es.index.auto.create": true, + "es.nodes.wan.only": true + } +} +``` + +Note the use of the `es.nodes.wan.only` setting to use the external host names for the cluster members, and not internal AWS addresses. + ### State Management and History: State for the `lastRun` value is per-statement and stored in the target Elasticsearch cluster for that statement. 
An index diff --git a/gradle.properties b/gradle.properties index 683f92a..4ea4d0d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = uy.kohesive.elasticsearch -version = 0.7.0-ALPHA +version = 0.8.0-ALPHA version_gradle=3.4.1 version_kotlin=1.1.0 diff --git a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/App.kt b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/App.kt index c64caf7..5a4b9cd 100644 --- a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/App.kt +++ b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/App.kt @@ -37,7 +37,7 @@ class App { } try { - App().run(configFile!!.inputStream(), configFile) + App().run(configFile!!.inputStream(), configFile.parentFile) } catch (ex: Throwable) { System.err.println("Data import failed due to:") System.err.println(ex.message) @@ -64,7 +64,8 @@ class App { println("Connecting to target ES clusters to check state...") val stateMap: Map = cfg.importSteps.map { importStep -> - val mgr = ElasticSearchStateManager(importStep.targetElasticsearch.nodes, importStep.targetElasticsearch.basicAuth) + val mgr = ElasticSearchStateManager(importStep.targetElasticsearch.nodes, importStep.targetElasticsearch.port ?: 9200, + importStep.targetElasticsearch.enableSsl ?: false, importStep.targetElasticsearch.basicAuth) mgr.init() importStep.statements.map { statement -> statement.id to mgr @@ -176,6 +177,13 @@ class App { options.put("es.net.http.auth.user", it.username) options.put("es.net.http.auth.pass", it.password) } + es.port?.let { + options.put("es.port", it.toString()) + } + es.enableSsl?.let { + options.put("es.net.ssl", it.toString()) + } + es.settings?.let { options.putAll(it) } table.settings?.let { options.putAll(it) } @@ -238,7 +246,7 @@ class App { println("\n Execute statement: ($dateMsg)\n${statement.description.replaceIndent(" ")}") if (!stateMgr.lockStatement(uniqueId, statement)) { - System.err.println(" Cannot aquire lock for statement ${statement.id}") + 
System.err.println(" Cannot acquire lock for statement ${statement.id}") } else { try { // look for table create setting @@ -248,11 +256,20 @@ class App { options.put("es.net.http.auth.user", it.username) options.put("es.net.http.auth.pass", it.password) } + import.targetElasticsearch.port?.let { port -> + options.put("es.port", port.toString()) + } + import.targetElasticsearch.enableSsl?.let { enableSsl -> + options.put("es.net.ssl", enableSsl.toString()) + } import.targetElasticsearch.settings?.let { options.putAll(it) } statement.settings?.let { options.putAll(it) } val autocreate: Boolean = options.getOrDefault("es.index.auto.create", "true").toBoolean() - val esClient = MicroEsClient(import.targetElasticsearch.nodes, import.targetElasticsearch.basicAuth) + val esClient = MicroEsClient(import.targetElasticsearch.nodes, + import.targetElasticsearch.port ?: 9200, + import.targetElasticsearch.enableSsl ?: false, + import.targetElasticsearch.basicAuth) val indexExists = esClient.checkIndexExists(statement.indexName) if (!autocreate) { if (!indexExists) { diff --git a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/Config.kt b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/Config.kt index 1b02a0a..218a1eb 100644 --- a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/Config.kt +++ b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/Config.kt @@ -14,6 +14,8 @@ data class Connections(val elasticsearch: List?, data class EsConnection(val nodes: List, val basicAuth: AuthInfo? = null, + val port: Int? = 9200, + val enableSsl: Boolean? = false, val tables: List, val settings: Map? = null) { } @@ -45,6 +47,8 @@ data class PrepStatement(val description: String, val sqlQuery: String) data class Importer(val description: String, val targetElasticsearch: EsTargetConnection, val statements: List) data class EsTargetConnection(val nodes: List, val basicAuth: AuthInfo? = null, + val port: Int? = 9200, + val enableSsl: Boolean? = false, val settings: Map? 
= null) { } diff --git a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/MicroEsClient.kt b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/MicroEsClient.kt index effc4b7..d0999d8 100644 --- a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/MicroEsClient.kt +++ b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/MicroEsClient.kt @@ -1,17 +1,21 @@ package uy.kohesive.elasticsearch.dataimport import com.fasterxml.jackson.module.kotlin.readValue -import okhttp3.MediaType -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody +import okhttp3.* +import okhttp3.internal.http.HttpHeaders /** * TODO: Change to use RestClient from ES / Spark integration */ -class MicroEsClient(nodes: List, auth: AuthInfo?) { - val http = OkHttpClient() - val url = if (auth != null) "http://${auth.username}:${auth.password}@${nodes.first()}" else "http://${nodes.first()}" + +class MicroEsClient(nodes: List, port: Int = 9200, enableSsl: Boolean = false, auth: AuthInfo? = null) { + val http = OkHttpClient().newBuilder().apply { + auth?.let { addInterceptor(BasicAuthInterceptor(it)) } + }.build() + val protocol = if (enableSsl) "https" else "http" + val host = nodes.first() + val hostWithPort = if (':' in host.substringAfter('@', host)) host else "${host}:${port}" + val url = "${protocol}://${hostWithPort}" fun String.fixRestAppendage(): String { if (this.startsWith("?")) return this @@ -112,4 +116,12 @@ class MicroEsClient(nodes: List, auth: AuthInfo?) 
{ data class CallResponse(val code: Int, val responseJson: String) { val isSuccess: Boolean get() = code in 200..299 } +} + +class BasicAuthInterceptor(val authInfo: AuthInfo) : Interceptor { + override fun intercept(chain: Interceptor.Chain): Response { + val request = chain.request() + val requestWithAuth = request.newBuilder().header("Authorization", Credentials.basic(authInfo.username, authInfo.password)).build() + return chain.proceed(requestWithAuth) + } } \ No newline at end of file diff --git a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/State.kt b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/State.kt index d991200..d25b503 100644 --- a/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/State.kt +++ b/src/main/kotlin/uy/kohesive/elasticsearch/dataimport/State.kt @@ -26,9 +26,9 @@ private fun EsImportStatement.stateKey(): String = this.indexName + "-" + this.i // TODO: better state management // This is NOT using the ES client because we do not want conflicts with Spark dependencies -class ElasticSearchStateManager(val nodes: List, val auth: AuthInfo?) : StateManager { +class ElasticSearchStateManager(val nodes: List, val port: Int = 9200, val enableSsl: Boolean = false, val auth: AuthInfo?) 
: StateManager { val STATE_INDEX = ".kohesive-dih-state-v2" - val esClient = MicroEsClient(nodes, auth) + val esClient = MicroEsClient(nodes, port, enableSsl, auth) override fun init() { if (esClient.checkIndexExists(STATE_INDEX)) { diff --git a/src/test/resources/manual-mappings.json b/src/test/resources/manual-mappings.json index 4a541f4..3186611 100644 --- a/src/test/resources/manual-mappings.json +++ b/src/test/resources/manual-mappings.json @@ -50,6 +50,11 @@ "type": "nGram", "min_gram": "3", "max_gram": "15" + }, + "haystack_edge_ngram_filter": { + "type": "edge_ngram", + "min_gram": "3", + "max_gram": "15" } }, "analyzer": { @@ -163,7 +168,15 @@ "icu_normalizer", "haystack_ngram_filter" ], - "tokenizer": "standard" + "tokenizer": "icu_tokenizer" + }, + "edge_ngram_analysis": { + "type": "custom", + "filter": [ + "icu_normalizer", + "haystack_edge_ngram_filter" + ], + "tokenizer": "icu_tokenizer" } } }, @@ -223,9 +236,6 @@ "id": { "type": "keyword" }, - "type": { - "type": "keyword" - }, "offer": { "type": "boolean" }, @@ -268,6 +278,10 @@ "ngram": { "type": "text", "analyzer": "ngram_analysis" + }, + "engram": { + "type": "text", + "analyzer": "edge_ngram_analysis" } } }, @@ -315,6 +329,10 @@ "ngram": { "type": "text", "analyzer": "ngram_analysis" + }, + "engram": { + "type": "text", + "analyzer": "edge_ngram_analysis" } }, "term_vector": "with_positions_offsets" @@ -363,6 +381,10 @@ "ngram": { "type": "text", "analyzer": "ngram_analysis" + }, + "engram": { + "type": "text", + "analyzer": "edge_ngram_analysis" } }, "term_vector": "with_positions_offsets" @@ -410,6 +432,10 @@ "ngram": { "type": "text", "analyzer": "ngram_analysis" + }, + "engram": { + "type": "text", + "analyzer": "edge_ngram_analysis" } }, "term_vector": "with_positions_offsets"