Skip to content

Commit

Permalink
changes for v0.8.0-ALPHA
Browse files Browse the repository at this point in the history
SSL support for ES
better PORT handling for ES
correct basicAuth for ES
  • Loading branch information
apatrida committed Apr 12, 2017
1 parent 7a31f1c commit 6b0618b
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 18 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,47 @@ The Data Import Handler also defines some UDF functions for use within SQL:
|unescapeHtmlEntites(string)|Unescapes HTML entities found in the text|`0.6.0-ALPHA`|
|fluffly(string)|A silly function that prepends the word "fluffly" to the text, used as a *test* function to mark values as being changed by processing|`0.6.0-ALPHA`|

### Auth and HTTPS for Elasticsearch

(_Since version `0.8.0-ALPHA`_)

For HTTP basic authentication with Elasticsearch, add the following to the source or target Elasticsearch definitions:

```hocon
"basicAuth": {
"username": "elastic",
"password": "changeme"
}
```

And for SSL, enable it within the Elasticsearch definition as well:

```hocon
"enableSsl": true
```

Here is a full example for connecting to an Elastic Cloud instance:

```hocon
"targetElasticsearch": {
"nodes": [
"123myinstance456.us-east-1.aws.found.io"
],
"basicAuth": {
"username": "elastic",
"password": "changeme"
},
"port": 9243,
"enableSsl": true,
"settings": {
"es.index.auto.create": true,
"es.nodes.wan.only": true
}
}
```

Note the use of the `es.nodes.wan.only` setting to use the external host names for the cluster members, and not internal AWS addresses.

### State Management and History:

State for the `lastRun` value is per-statement and stored in the target Elasticsearch cluster for that statement. An index
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group = uy.kohesive.elasticsearch
version = 0.7.0-ALPHA
version = 0.8.0-ALPHA

version_gradle=3.4.1
version_kotlin=1.1.0
Expand Down
25 changes: 21 additions & 4 deletions src/main/kotlin/uy/kohesive/elasticsearch/dataimport/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class App {
}

try {
App().run(configFile!!.inputStream(), configFile)
App().run(configFile!!.inputStream(), configFile.parentFile)
} catch (ex: Throwable) {
System.err.println("Data import failed due to:")
System.err.println(ex.message)
Expand All @@ -64,7 +64,8 @@ class App {

println("Connecting to target ES clusters to check state...")
val stateMap: Map<String, StateManager> = cfg.importSteps.map { importStep ->
val mgr = ElasticSearchStateManager(importStep.targetElasticsearch.nodes, importStep.targetElasticsearch.basicAuth)
val mgr = ElasticSearchStateManager(importStep.targetElasticsearch.nodes, importStep.targetElasticsearch.port ?: 9200,
importStep.targetElasticsearch.enableSsl ?: false, importStep.targetElasticsearch.basicAuth)
mgr.init()
importStep.statements.map { statement ->
statement.id to mgr
Expand Down Expand Up @@ -176,6 +177,13 @@ class App {
options.put("es.net.http.auth.user", it.username)
options.put("es.net.http.auth.pass", it.password)
}
es.port?.let {
options.put("es.port", it.toString())
}
es.enableSsl?.let {
options.put("es.net.ssl", it.toString())
}

es.settings?.let { options.putAll(it) }
table.settings?.let { options.putAll(it) }

Expand Down Expand Up @@ -238,7 +246,7 @@ class App {
println("\n Execute statement: ($dateMsg)\n${statement.description.replaceIndent(" ")}")

if (!stateMgr.lockStatement(uniqueId, statement)) {
System.err.println(" Cannot aquire lock for statement ${statement.id}")
System.err.println(" Cannot acquire lock for statement ${statement.id}")
} else {
try {
// look for table create setting
Expand All @@ -248,11 +256,20 @@ class App {
options.put("es.net.http.auth.user", it.username)
options.put("es.net.http.auth.pass", it.password)
}
import.targetElasticsearch.port?.let { port ->
options.put("es.port", port.toString())
}
import.targetElasticsearch.enableSsl?.let { enableSsl ->
options.put("es.net.ssl", enableSsl.toString())
}
import.targetElasticsearch.settings?.let { options.putAll(it) }
statement.settings?.let { options.putAll(it) }

val autocreate: Boolean = options.getOrDefault("es.index.auto.create", "true").toBoolean()
val esClient = MicroEsClient(import.targetElasticsearch.nodes, import.targetElasticsearch.basicAuth)
val esClient = MicroEsClient(import.targetElasticsearch.nodes,
import.targetElasticsearch.port ?: 9200,
import.targetElasticsearch.enableSsl ?: false,
import.targetElasticsearch.basicAuth)
val indexExists = esClient.checkIndexExists(statement.indexName)
if (!autocreate) {
if (!indexExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ data class Connections(val elasticsearch: List<EsConnection>?,

/**
 * Configuration for an Elasticsearch cluster connection together with the tables it exposes.
 *
 * NOTE(review): presumably this is the *source* side definition (it carries [tables]); confirm
 * against the configuration loader, which is not visible here.
 *
 * @property nodes host names (or addresses) of the cluster nodes.
 * @property basicAuth optional credentials for HTTP basic authentication; `null` means no auth.
 * @property port port of the Elasticsearch HTTP endpoint; defaults to 9200. Nullable, so an
 * explicit `null` is representable in addition to the default.
 * @property enableSsl whether to connect over SSL; defaults to `false`. Nullable for the same
 * reason as [port].
 * @property tables the table definitions served by this connection.
 * @property settings optional extra string key/value settings for this connection.
 */
data class EsConnection(val nodes: List<String>,
val basicAuth: AuthInfo? = null,
val port: Int? = 9200,
val enableSsl: Boolean? = false,
val tables: List<EsSource>,
val settings: Map<String, String>? = null) {
}
Expand Down Expand Up @@ -45,6 +47,8 @@ data class PrepStatement(val description: String, val sqlQuery: String)
data class Importer(val description: String, val targetElasticsearch: EsTargetConnection, val statements: List<EsImportStatement>)
/**
 * Configuration for the Elasticsearch cluster that an [Importer] writes to.
 *
 * @property nodes host names (or addresses) of the target cluster nodes.
 * @property basicAuth optional credentials for HTTP basic authentication; `null` means no auth.
 * @property port port of the Elasticsearch HTTP endpoint; defaults to 9200. Nullable, so an
 * explicit `null` is representable in addition to the default.
 * @property enableSsl whether to connect over SSL; defaults to `false`. Nullable for the same
 * reason as [port].
 * @property settings optional extra string key/value settings for the target connection.
 */
data class EsTargetConnection(val nodes: List<String>,
val basicAuth: AuthInfo? = null,
val port: Int? = 9200,
val enableSsl: Boolean? = false,
val settings: Map<String, String>? = null) {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package uy.kohesive.elasticsearch.dataimport

import com.fasterxml.jackson.module.kotlin.readValue
import okhttp3.MediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.*
import okhttp3.internal.http.HttpHeaders

/**
* TODO: Change to use RestClient from ES / Spark integration
*/
class MicroEsClient(nodes: List<String>, auth: AuthInfo?) {
val http = OkHttpClient()
val url = if (auth != null) "http://${auth.username}:${auth.password}@${nodes.first()}" else "http://${nodes.first()}"

class MicroEsClient(nodes: List<String>, port: Int = 9200, enableSsl: Boolean = false, auth: AuthInfo? = null) {
val http = OkHttpClient().newBuilder().apply {
auth?.let { addInterceptor(BasicAuthInterceptor(it)) }
}.build()
val protocol = if (enableSsl) "https" else "http"
val host = nodes.first()
val hostWithPort = if (':' in host.substringAfter('@', host)) host else "${host}:${port}"
val url = "${protocol}://${hostWithPort}"

fun String.fixRestAppendage(): String {
if (this.startsWith("?")) return this
Expand Down Expand Up @@ -112,4 +116,12 @@ class MicroEsClient(nodes: List<String>, auth: AuthInfo?) {
data class CallResponse(val code: Int, val responseJson: String) {
val isSuccess: Boolean get() = code in 200..299
}
}

/**
 * OkHttp [Interceptor] that sets an HTTP basic `Authorization` header on every request
 * passing through the chain, using the username and password from [authInfo].
 *
 * Any `Authorization` header already present on the request is replaced.
 */
class BasicAuthInterceptor(val authInfo: AuthInfo) : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        // Encode the credentials for this call from the configured username/password.
        val basicCredentials = Credentials.basic(authInfo.username, authInfo.password)

        // Requests are immutable, so derive an authenticated copy of the outgoing one.
        val authenticated = chain.request()
            .newBuilder()
            .header("Authorization", basicCredentials)
            .build()

        return chain.proceed(authenticated)
    }
}
4 changes: 2 additions & 2 deletions src/main/kotlin/uy/kohesive/elasticsearch/dataimport/State.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ private fun EsImportStatement.stateKey(): String = this.indexName + "-" + this.i

// TODO: better state management
// This is NOT using the ES client because we do not want conflicts with Spark dependencies
class ElasticSearchStateManager(val nodes: List<String>, val auth: AuthInfo?) : StateManager {
class ElasticSearchStateManager(val nodes: List<String>, val port: Int = 9200, val enableSsl: Boolean = false, val auth: AuthInfo?) : StateManager {
val STATE_INDEX = ".kohesive-dih-state-v2"
val esClient = MicroEsClient(nodes, auth)
val esClient = MicroEsClient(nodes, port, enableSsl, auth)

override fun init() {
if (esClient.checkIndexExists(STATE_INDEX)) {
Expand Down
34 changes: 30 additions & 4 deletions src/test/resources/manual-mappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
"type": "nGram",
"min_gram": "3",
"max_gram": "15"
},
"haystack_edge_ngram_filter": {
"type": "edge_ngram",
"min_gram": "3",
"max_gram": "15"
}
},
"analyzer": {
Expand Down Expand Up @@ -163,7 +168,15 @@
"icu_normalizer",
"haystack_ngram_filter"
],
"tokenizer": "standard"
"tokenizer": "icu_tokenizer"
},
"edge_ngram_analysis": {
"type": "custom",
"filter": [
"icu_normalizer",
"haystack_edge_ngram_filter"
],
"tokenizer": "icu_tokenizer"
}
}
},
Expand Down Expand Up @@ -223,9 +236,6 @@
"id": {
"type": "keyword"
},
"type": {
"type": "keyword"
},
"offer": {
"type": "boolean"
},
Expand Down Expand Up @@ -268,6 +278,10 @@
"ngram": {
"type": "text",
"analyzer": "ngram_analysis"
},
"engram": {
"type": "text",
"analyzer": "edge_ngram_analysis"
}
}
},
Expand Down Expand Up @@ -315,6 +329,10 @@
"ngram": {
"type": "text",
"analyzer": "ngram_analysis"
},
"engram": {
"type": "text",
"analyzer": "edge_ngram_analysis"
}
},
"term_vector": "with_positions_offsets"
Expand Down Expand Up @@ -363,6 +381,10 @@
"ngram": {
"type": "text",
"analyzer": "ngram_analysis"
},
"engram": {
"type": "text",
"analyzer": "edge_ngram_analysis"
}
},
"term_vector": "with_positions_offsets"
Expand Down Expand Up @@ -410,6 +432,10 @@
"ngram": {
"type": "text",
"analyzer": "ngram_analysis"
},
"engram": {
"type": "text",
"analyzer": "edge_ngram_analysis"
}
},
"term_vector": "with_positions_offsets"
Expand Down

0 comments on commit 6b0618b

Please sign in to comment.