Elastic4s is mostly a wrapper around the standard Elasticsearch Java client with the intention of creating a concise, idiomatic, reactive, type safe DSL for applications in Scala that use Elasticsearch. The Java client, which can of course be used directly in Scala, is more verbose due to Java's nature. Scala lets us do better.
Elastic4s's DSL allows you to to construct your requests programatically, with syntatic and semantic errors manifested at compile time, and uses standard Scala futures to enable you to easily integrate into your existing asynchronous workflow. The aim of the DSL is that requests are written in an SQL-like way, while staying true to the Java API or Rest API.
Elastic4s supports Scala collections so you don't have to do tedious conversions from your Scala domain classes into Java collections. It also allows you to index documents directly without having to extract and set fields manually - eg from a case class, a JSON document, or a Map (or a custom source). Due to its type safe nature, it is easy to see what operations are available for any request type, because your IDE can use type information to show what methods are available.
- Type safe concise DSL
- Integrates with standard Scala futures
- Uses Scala collections library
- Leverages the built-in Java client
- Provides reactive-streams implementation
The latest release is 2.2.0 which is compatible with Elasticsearch 2.2.x. There are releases for both Scala 2.10 and Scala 2.11. For releases that are compatible with earlier versions of Elasticsearch, search maven central. For more information read Using Elastic4s in your project.
Elastic4s Release | Target Elasticsearch version |
---|---|
2.2.0 | 2.2.X |
2.1.2 | 2.1.X |
2.0.1 | 2.0.X |
1.7.5 | 1.7.X |
1.6.6 | 1.6.X |
1.5.17 | 1.5.X |
1.4.14 | 1.4.x |
1.3.3 | 1.3.x |
1.2.3.0 | 1.2.x |
1.1.2.0 | 1.1.x |
1.0.3.0 | 1.0.x |
0.90.13.2 | 0.90.x |
- #484 Fixed bug in es-streams throwing ClassCastException
- #483 Added overloaded doc method to update to accept indexables
- Optimize was renamed to ForceMerge. The existing optimize method are deprecated and
forceMerge(indexes*)
has been added in its place. - #395 Added pipeline aggregation definitions
- #458 Added parameters to clear cache operation
- #475 Fixed breaking change in terms query
rewrite
was removed from Elasticsearch's matchXXX queries so has been removed in the dsl- Added GeoCentroid aggregation
- Added
terminateAfter
to search definition - SearchType.SCAN is now deprecated in Elasticsearch
count
is deprecated in Elasticsearch and should be replaced with a search with size 0
- #473 Added missing "filter" clause from bool query
- #475 Fixed breaking change in terms query
- #474 Added "item" likes to more like this query
Major upgrade to Elasticsearch 2.0.0 including breaking changes. Please raise a PR if I've missed any breaking changes.
- In elasticsearch 2.0.0 one of the major changes has been filters have become queries. So in elastic4s this means all methods
xxxFilter
are nowxxxQuery
, eghasChildrenFilter
is nowhasChildrenQuery
. - Some options that existed only on filters like cache and cache key are now removed.
- Fuzzy like this query has been removed (this was removed in elasticsearch itself)
- Script dsl has changed. To create a script to pass into a method, you use
script(script)
orscript(name, script)
with further parameters set using the builder pattern. - DynamicTemplate dsl
template name <name>
has been removed. Now you supply the full field definition in the dsl method, as suchtemplate(field("price_*", DoubleType))
- Index_analyzer has been removed in elasticsearch. Use analyzer and then override the analyzer for search with search_analyzer
- MoreLikeThis was removed from elasticsearch in favour of a
moreLikeThisQuery
on a search request. moreLikeThisQuery
has changed camel case (capital L), also now requires the 'like' text as the 2nd method, egmoreLikeThisQuery("field").text("a")
(both can take varargs as well).- Search requests now return a richer response type. Previously it returned the java type. The richer type has java style methods so your code will continue to compile, but with deprecation warnings.
- The sorting DSL has changed in that the previous infix style methods are deprecated. So
field sort x
becomesfieldSort(x)
etc. - Or and And filters have been removed completely (not changed into queries like other filters). Use a bool query with
must
clauses for and's andshould
clauses for or's. - Highlight dsl has changed slightly,
highlight field x
is now deprecated in favour ofhighlight(x)
- Delete mapping has been removed (this is removed in elasticsearch itself)
- IndexStatus api has been removed (this was removed in elasticsearch itself)
- Template has been renamed dynamic template (to better match the terminology in elasticsesarch)
- Field and mapping syntax has changed slightly. The implicit
"fieldname" as StringType ...
has been deprecated in favour offield("fieldname", StringType)
orstringField()
,longField
, etc - In es-streams the ResponseListener has changed to accept a
BulkItemResult
instead of aBulkItemResponse
- Multiget now returns a rich scala wrapper in the form of
MultiGetResult
. The richer type has java style methods so your code will continue to compile, but with deprecation warnings. - GetSegments returns a scala wrapper in the form of
GetSegmentsResult
- IndexStats returns a scala wrapper in the form
IndexStatsResult
- Works with Elasticsearch 1.7.x
- Removed sync client (deprecated since 1.3.0)
- Fix for race condition in elastic-streams subscriber
- Added sourceAsUpsert to allow
Indexable
as upsert in update queries - Added geohash aggregation
- Added geohash cell filter
- Added cluster state api
- Added support for unmapped_type property
- Added block until index/type exists testkit helpers
- Added raw query to count dsl
- Added
show
typeclass for count - Added
InFilter
andIndicesFilter
- Added shorter syntax for field types, eg
stringField(name)
vsfield name <name> typed StringType
- Added reactive streams implementation for elastic4s.
- Support explicit field types in the update dsl
- Added missing options to restore snapshot dsl
- Added
show
typeclass for percolate register
- Added clear scroll api
- Added missing options to restore snapshot dsl
- Added clear scroll api
- Added
show
typeclass for multisearch - Allow update dsl to use explicit field values
- Added
HitAs
as a replacement for theReader
typeclass - Added indices option to mapping, count and search dsl
- Added docValuesFormat to timestamp mapping
- Added new methods to testkit
- Introduced simplier syntax for sorts
- Added
HitAs
as a replacement for theReader
typeclass - Fixed validate query for block queries
- Added
show
typeclasses for search, create index, into into, validate, count, and percolate to allow easy debugging of the json of requests.
- Added
matched_fields
and highlight filter to highlighter
- Added IterableSearch for iterating over a scroll
- Enhanced multiget dsl to include
routing
,version
andfield
options - Added rich result for GetAliasResponse
- Added context queries to suggestions
- Breaking change: Changed syntax of suggestions to be clearer and allow for type safe results
- Allow setting analyzer by name on matchphraseprefix
- Added singleMethodSyntax variant, eg
indexInto(index)
rather thanindex into index
- Added re-write to validate
- Added filter support to alias (previously only the java client filters were supported)
- Added cluster settings api
- Added field stats api
- Addd
docValuesFormat
to timestamp mapping - Added
matched_fields
and highlight filter to highlighter - Supported
stopwords_list
in filter - Reworked testkit to allow more configuration over the creating of the test clients
Starting from version 1.5.13 the main artifact has been renamed to elastic4s-core_2.x. Please update your build scripts. There is now an elastic4s-testkit_2.x which brings in a couple of useful methods for waiting until the node/cluster is in some expected state. Very useful when trying unit tests.
If you previously used the Jackson support for DocumentSource or Indexables then you need to add a new dependency elastic4s-jackson_2.x. This will allow you to do import ElasticJackson.Implicits._
which puts a Jackson based Indexable
into scope, which allows any class to be indexed automagically without the need to manually create maps or json objects. Similarly, if you are using response.hitsAs[T]
, then the same import brings in a Reader
that will convert any type to a case class.
The basic usage of the Scala driver is that you create an instance of ElasticClient
and then invoke the various execute
methods with the requests you want to perform. The execute methods are asynchronous and will return a standard Scala Future[T]
where T is the response type appropriate for your request type. For example a search request will return a response of type SearchResponse
which contains the results of the search.
Requests, such as inserting a document, searching, creating an index, etc, are created using the DSL syntax that is similar in style to SQL queries. For example to create a search request, you would do: search in "index/type" query "findthistext"
The response objects are, for the most part, the exact same type the Java API returns. This is because there is mostly no reason to wrap these as they are fairly easy to use in Scala.
All the DSL keywords are located in the ElasticDsl
trait which needs to be imported or extended.
An example is worth 1000 characters so here is a quick example of how to create a local node with a client and index a one field document. Then we will search for that document using a simple text query.
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
object Test extends App {
val client = ElasticClient.local
// await is a helper method to make this operation synchronous instead of async
// You would normally avoid doing this in a real program as it will block your thread
client.execute { index into "bands" / "artists" fields "name"->"coldplay" }.await
// we need to wait until the index operation has been flushed by the server.
// this is an important point - when the index future completes, that doesn't mean that the doc
// is necessarily searchable. It simply means the server has processed your request and the doc is
// queued to be flushed to the indexes. Elasticsearch is eventually consistent.
// For this demo, we'll simply wait for 2 seconds (default refresh interval is 1 second).
Thread.sleep(2000)
// now we can search for the document we indexed earlier
val resp = client.execute { search in "bands" / "artists" query "coldplay" }.await
println(resp)
}
For more in depth examples keep reading.
Here is a list of the common requests and the syntax used to create them. For more details on each request click through to the readme page. For options that are not yet documented, refer to the Elasticsearch documentation as the DSL closely mirrors the standard Java API / REST API.
Operation | Syntax |
---|---|
Add Alias | add alias "<alias>" on "<index>" |
Clear index cache | clear cache <name> |
Close index | closeIndex(<name>) |
Count | count from <indexes> types <types> <queryblock> |
Cluster health | get cluster health |
Cluster stats | get cluster stats |
Create Index | createIndex(<name>) mappings { mappings block> } [settings] |
Create Repository | createRepository(<repo>) type(<type>) settings <settings> |
Create Snapshot | createSnapshot(<name>) in <repo> ... |
Create Template | createTemplate(<name>) pattern <pattern> mappings {...} [settings] |
Delete by id | delete id <id> from <index/type> [settings] |
Delete by query | delete from <index/type> query { <queryblock> } [settings] |
Delete index | deleteIndex(<index>) [settings] |
Delete Snapshot | deleteSnapshot(<name>).in(<repo>) ... |
Delete Template | deleteTemplate(<name>) |
Explain | explain id <id> in <index/type> query { <queryblock> } |
Field stats | field stats <indexes> |
Flush Index | flush index <name> |
Get | get id <id> from <index/type> [settings] |
Get Alias | getAlias(<name>).on(<index>) |
Get Mapping | getMapping(<index> / <type>) |
Get Segments | getSegments(<indexes>) |
Get Snapshot | getSnapshot <name> from <repo> |
Get Template | getTemplate(<name>) |
Index | index into <index/type> fields { <fieldblock> } [settings] |
Index exists | indexExists(<name>) |
Index Status | indexStatus(<index>) |
More like this | morelike id <id> in <index/type> { fields <fieldsblock> } [settings] |
Multiget | multiget ( get id 1 from index, get id 2 from index, ... ) |
Multisearch | multi ( search ..., search ..., ...) |
Open index | openIndex(<name>) |
Force Merge | forceMerge(<indexes>*) [settings] |
Percolate Doc | percolateIn(<index>) doc <fieldsblock> |
Put mapping | putMapping(<index> / <type>) add { mappings block } |
Recover Index | recoverIndex(<name>) |
Refresh index | refreshIndex(<name>) |
Register Query | register id <id> into <index> query { <queryblock> } |
Remove Alias | removeAlias(<alias>).on(<index>) |
Restore Snapshot | restore snapshot <name> from <repo> ... |
Search | search in <index/type> query ... postFilter ... sort ... |
Search scroll | searchScroll(<scrollId>) |
Type Exists | typesExists(<types>) in <index> |
Update | update id <id> in <index/type> script <script> [settings] |
Validate | validateIn(<index/type>) query <queryblock> |
Please also note some java interoperability notes.
A locally configured node and client can be created simply by invoking local
on the ElasticClient
object:
import com.sksamuel.elastic4s.ElasticClient
val client = ElasticClient.local
To specify settings for the local node you can pass in a settings object like this:
val settings = ImmutableSettings.settingsBuilder()
.put("http.enabled", false)
.put("path.home", "/var/elastic/")
val client = ElasticClient.local(settings.build)
To connect to a remote elastic cluster then you need to use the remote() call specifying the hostnames and ports. Please note that this is the port for the TCP interface (normally 9300) and NOT the port you connect with when using HTTP (normally 9200).
// single node
val client = ElasticClient.remote("host1", 9300)
For multiple nodes it's better to use the elasticsearch client uri connection string. This is in the format "elasticsearch://host:port,host:port,..."
(Note, no parameters can be added). For example:
val uri = ElasticsearchClientUri("elasticsearch://foo:1234,boo:9876")
val client = ElasticClient.remote(uri)
If you need to pass settings to the client, then you need to invoke remote() with a settings object. For example to specify the cluster name (if you changed the default then you must specify the cluster name).
import org.elasticsearch.common.settings.ImmutableSettings
val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "myClusterName").build()
val client = ElasticClient.remote(settings, ElasticsearchClientUri("elasticsearch://somehost:9300"))
If you already have a handle to a Node in the Java API then you can create a client from it easily:
val node = ... // node from the java API somewhere
val client = ElasticClient.fromNode(node)
All documents in Elasticsearch are stored in an index. We do not need to tell Elasticsearch in advance what an index will look like (eg what fields it will contain) as Elasticsearch will adapt the index dynamically as more documents are added, but we must at least create the index first.
To create an index called "places" that is fully dynamic we can simply use:
client.execute { create index "places" }
We can optionally set the number of shards and / or replicas
client.execute { create index "places" shards 3 replicas 2 }
Sometimes we want to specify the properties of the fields in the index in advance. This allows us to manually set the type of the field (where Elasticsearch might infer something else) or set the analyzer used, or multiple other options
To do this we add mappings:
import com.sksamuel.elastic4s.mappings.FieldType._
import com.sksamuel.elastic4s.StopAnalyzer
client.execute {
create index "places" mappings (
"cities" as (
"id" typed IntegerType,
"name" typed StringType boost 4,
"content" typed StringType analyzer StopAnalyzer
)
)
}
Then Elasticsearch is configured with those mappings for those fields only. It is still fully dynamic and other fields will be created as needed with default options. Only the fields specified will be "fixed".
More examples on the create index syntax can be found here.
Analyzers control how Elasticsearch parses the fields for indexing. For example, you might decide that you want whitespace to be important, so that "band of brothers" is indexed as a single "word" rather than the default which is to split on whitespace. There are many advanced options available in analayzers. Elasticsearch also allows us to create custom analyzers. For more details read about the DSL support for analyzers.
To index a document we need to specify the index and type and optionally we can set an id. If we don't include an id then elasticsearch will generate one for us. We must also include at least one field. Fields are specified as standard tuples.
client.execute {
index into "places" / "cities" id "uk" fields (
"name" -> "London",
"country" -> "United Kingdom",
"continent" -> "Europe",
"status" -> "Awesome"
)
}
There are many additional options we can set such as routing, version, parent, timestamp and op type. See official documentation for additional options, all of which exist in the DSL as keywords that reflect their name in the official API.
Sometimes it is useful to index directly from your domain model, and not have to create maps of fields inline. For this
elastic4s provides the Indexable
typeclass. Simply provide an implicit instance of Indexable[T]
in scope for any
class T that you wish to index, and then you can use source t
on the index request. For example:
// a simple example of a domain model
case class Character(name: String, location: String)
// how you turn the type into json is up to you
implicit object CharacterIndexable extends Indexable[Character] {
override def json(t: Character): String = s""" { "name" : "${t.name}", "location" : "${t.location}" } """
}
// now the index request reads much cleaner
val jonsnow = Character("jon snow", "the wall")
client.execute {
index into "gameofthrones" / "characters" source jonsnow
}
Some people prefer to write typeclasses manually for the types they need to support. Other people like to just have
it done automagically. For those people, elastic4s provides a Jackson
based implementation of Indexable[Any]
that will convert anything to Json.
To use this, you need to add the jackson extension to the
build.
The next step is to import the implicit into scope with import ElasticJackson.Implicits._
where ever you
want to use the source
method. With that implicit in scope, you can now pass any type you like to source
and Jackson will marshall it to json for you.
Another way that existed prior to the Indexable
typeclass was the DocumentSource
or DocumentMap
abstractions.
For these, you provide an instance of DocumentSource
that returns a Json String, or an instance of DocumentMap
that provides a Map[String, Any]
.
case class Character(name: String, location: String)
case class CharacterSource(c: Character) extends DocumentSource {
def json : String = s""" { "name" : "${c.name}", "location" : "${c.location}" } """
}
val jonsnow = Character("jon snow", "the wall")
client.execute {
index into "music" / "bands" doc CharacterSource(jonsnow)
}
There isn't much difference, but the typeclass approach (the former) is considered more idomatic scala. More details on the document traits page.
Beautiful!
Searching is naturally the most involved operation. There are many ways to do searching in elastic search and that is reflected in the higher complexity of the query DSL.
To do a simple text search, where the query is parsed from a single string
search in "places" / "cities" query "London"
That is actually an example of a SimpleStringQueryDefinition. The string is implicitly converted to that type of query. It is the same as specifying the query type directly:
search in "places" / "cities" query simpleStringQuery("London")
The simple string example is the only time we don't need to specify the query type. We can search for everything by not specifying a query at all.
search in "places" / "cities"
We might want to limit the number of results and / or set the offset.
search in "places" / "cities" query "paris" start 5 limit 10
We can search against certain fields only:
search in "places" / "cities" query termQuery("country", "France")
Or by a prefix:
search in "places" / "cities" query prefixQuery("country", "France")
Or by a regular expression (slow, but handy sometimes!):
search in "places" / "cities" query regexQuery("country", "France")
There are many other types, such as range for numeric fields, wildcards, distance, geo shapes, matching.
Read more about search syntax here. Read about multisearch here. Read about suggestions here.
By default Elasticsearch search responses contain an array of SearchHit
instances which contain things like the id,
index, type, version, etc as well as the document source as a string or map. Elastic4s provides a means to convert these
back to meaningful domain types quite easily using the HitAs[T]
typeclass. Provide an implementation of this typeclass, as
an in scope implicit, for whatever type you wish to marshall search responses into, and then you can call as[T]
on the response.
A full example:
case class Character(name: String, location: String)
implicit object CharacterHitAs extends HitAs[Character] {
override def as(hit: RichSearchHit): Character = {
Character(hit.sourceAsMap("name").toString, hit.sourceAsMap("location").toString)
}
}
val resp = client.execute {
search in "gameofthrones" / "characters" query "kings landing"
}.await // don't block in real code
// .as[Character] will look for an implicit HitAs[Character] in scope
// and then convert all the hits into Characters for us.
val characters :Seq[Character] = resp.as[Character]
This is basically the inverse of the Indexable
typeclass. And just like Indexable, there is a general purpose
Jackson HitAs[Any]
implementation for those who wish to have some sugar.
To use this, you need to add the jackson extension to the build.
The next step is to import the implicit into scope with import ElasticJackson.Implicits._
where ever you
want to use the as[T]
methods.
As a bonus feature of the Jackson implementation, if your domain object has fields called _timestamp
, _id
, _type
, _index
, or
_version
then those special fields will be automatically populated as well.
Elasticsearch can annotate results to show which part of the results matched the queries by using highlighting. Just think when you're in google and you see the snippets underneath your results - that's what highlighting does.
We can use this very easily, just add a highlighting definition to your search request, where you set the field or fields to be highlighted. Viz:
search in "music" / "bios" query "kate bush" highlighting (
highlight field "body" fragmentSize 20
)
All very straightforward. There are many options you can use to tweak the results. In the example above I have simply set the snippets to be taken from the field called "body" and to have max length 20. You can set the number of fragments to return, seperate queries to generate them and other things. See the elasticsearch page on highlighting for more info.
Sometimes we don't want to search and want to retrieve a document directly from the index by id. In this example we are retrieving the document with id 'coldplay' from the bands/rock index and type.
client.execute {
get id "coldplay" from "bands" / "rock"
}
We can get multiple documents at once too. Notice the following multiget wrapping block.
client.execute {
multiget(
get id "coldplay" from "bands/rock",
get id "keane" from "bands/rock"
)
}
See more get examples and usage of multiget here
In the rare case that we become tired of a band we might want to remove them. Naturally we wouldn't want to remove Chris Martin and boys so we're going to remove U2 instead. We think they're a little past their best (controversial). This operation assumes the id of the document is "u2".
client.execute {
delete id "u2" from "bands/rock"
}
We can take this a step further by deleting by a query rather than id. In this sense the delete is very similar to an SQL delete statement. In this example we're deleting all bands where their type is rap.
client.execute {
delete from index "bands" types "rock" where termQuery("type", "rap")
}
See more about delete on the delete page
We can update existing documents without having to do a full index, by updating a partial set of fields.
client.execute {
update 25 in "scifi/starwars" docAsUpsert (
"character" -> "chewie",
"race" -> "wookie"
)
}
Read more about updates and see more examples.
If you want to return documents that are "similar" to a current document we can do that very easily with the more like this query.
client.execute {
morelike id 4 from "beers/lager" percentTermsToMatch 0.5
}
For all the options see here.
Elasticsearch is fast. Roundtrips are not. Sometimes we want to wrestle every last inch of performance and a useful way
to do this is to batch up requests. Elastic has guessed our wishes and created the bulk API. To do this we simply
wrap index, delete and update requests using the bulk
keyword and pass to the execute
method in the client.
client.execute {
bulk (
index into "bands/rock" fields "name"->"coldplay",
index into "bands/rock" fields "name"->"kings of leon",
index into "bands/pop" fields (
"name"->"elton john",
"best_album"->"tumbleweed connection"
)
)
}
A single HTTP or TCP request is now needed for 4 operations. In addition Elasticsearch can now optimize the requests, by combinging inserts or using aggressive caching.
The example above uses simple documents just for clarity of reading; the usual optional settings can still be used. See more information on the bulk page.
It can be useful to see the json output of requests in case you wish to tinker with the request in a REST client or your browser. It can be much easier to tweak a complicated query when you have the instant feedback of the HTTP interface.
Elastic4s makes it easy to get this json where possible. Simply call .show
on a request to get back a json string. Eg:
val req = search in "music" / "bands" query "coldplay" ...
println(req.show) // would output json
client.execute { req } // now executes that request
Not all requests have a json body. For example get-by-id is modelled purely by http query parameters, there is no json body to output.
And some requests don't convert to json in the Java client so aren't yet supported by the show
typeclass.
Also, for clarity, it should be pointed out that the client doesn't send JSON to the server, it uses a binary protocol. So the provided json
format should be treated as a debugging tool only.
The requests that support .show
are search
, multi
, create index
, index into
, validate
, percolate
, count
.
All operations are normally asynchronous. Sometimes though you might want to block - for example when doing snapshots
or when creating the initial index. You can call .await
on any operation to block until the result is ready.
This is especially useful when testing.
val resp = client.execute { index into "bands/rock" fields ("name"->"coldplay", "debut"->"parachutes") }.await
resp.isInstanceOf[IndexResponse] // true
Helpers provide higher level APIs to work with Elasticsearch.
Use the reindex
helper to reindex data from source index to target index.
client.reindex(
sourceIndex = "sourceIndex",
targetIndex = "targetIndex",
chunkSize = 500,
scroll = "5m")
As it stands the Scala DSL covers all of the common operations - index, create, delete, delete by query, search, validate, percolate, update, explain, get, and bulk operations. There is good support for the various settings for each of these - more so than the Java client provides in the sense that more settings are provided in a type safe manner.
However there are settings and operations (mostly admin / cluster related) that the DSL does not yet cover (pull requests welcome!). In these cases it is necessary to drop back to the Java API. This can be done by calling .java on the client object to get the underlying java elastic client, or .admin to get the admin based client, eg, the following request is a Java API request.
client.admin.cluster.prepareHealth.setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet
This way you can still access everything the normal Java client covers in the cases where the Scala DSL is missing a construct, or where there is no need to provide a DSL.
Elastic4s has an implementation of the reactive streams api for both publishing and subscribing that is built using Akka. To use this, you need to add a dependency on the elastic4s-streams module.
There are two things you can do with the reactive streams implementation. You can create an elastic subscriber, and have that stream data from some publisher into elasticsearch. Or you can create an elastic publisher and have documents streamed out to subscribers.
First you have to add an additional dependeny to your build.sbt
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-streams" % "1.7.4"
Import the new API with
import com.sksamuel.elastic4s.streams.ReactiveElastic._
An elastic publisher can be created for any arbitrary query you wish, and then using the efficient search scroll API, the entire dataset that matches your query is streamed out to subscribers.
And make sure you have an Akka Actor System in implicit scope
implicit val system = ActorSystem()
Then create a publisher from the client using any query you want. You must specify the scroll parameter, as the publisher uses the scroll API.
val publisher = client.publisher(search in "myindex" query "sometext" scroll "1m")
Now you can add subscribers to this publisher. They can of course be any type that adheres to the reactive-streams api, so you could stream out to a mongo database, or a filesystem, or whatever custom type you want.
publisher.subscribe(someSubscriber)
If you just want to stream out an entire index then you can use the overloaded form:
val publisher = client.publisher("index1", keepAlive = "1m")
An elastic subcriber can be created that will stream a request to elasticsearch for each item produced by a publisher. The subscriber can create index, update, or delete requests, so is a good way to synchronize datasets.
import ReactiveElastic._
And make sure you have an Akka Actor System in implicit scope.
implicit val system = ActorSystem()
Then create a subscriber, specifying the following parameters:
- A type parameter that is the type of object that the publisher will provide
- How many documents should be included per index batch (10-100 is usually good)
- How many concurrent batches should be in flight (usually around the number of cores)
- An optional
ResponseListener
that will be notified for each item that was successfully acknowledged by the es cluster - An optional function that will be called once the subscriber has received all data. Defaults to a no-op
- An optional function to call if the subscriber encouters an error. Defaults to a no-op.
In addition there should be a further implicit in scope of type RequestBuilder[T]
that will accept objects of T (the type produced by your publisher) and build an index, update, or delete request suitable for dispatchin to elasticsearch.
implicit val builder = new RequestBuilder[SomeType] {
import ElasticDsl._
// the request returned doesn't have to be an index - it can be anything supported by the bulk api
def request(t: T): BulkCompatibleDefinition = index into "index" / "type" fields ....
}
Then the subscriber can be created, and attached to a publisher:
val subscriber = client.subscriber[SomeType](batchSize, concurrentBatches, () => println "all done")
publisher.subscribe(subscriber)
For gradle users, add:
compile 'com.sksamuel.elastic4s:elastic4s-core_2.11:2.1.0'
For SBT users simply add:
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % "2.1.0"
For Maven users simply add (replace 2.11 with 2.10 for Scala 2.10):
<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
The above is just an example and is not always up to date. Check the latest released version on maven central
This project is built with SBT. So to build
sbt compile
And to test
sbt test
Integration tests run on a local elastic that is created and torn down as part of the tests inside your standard temp folder. There is no need to configure anything externally.
- Barclays Bank
- HSBC
- Shazaam
- Graphflow
- Hotel Urbano
- Immobilien Scout
- HMRC
Raise a PR to add your company here
Contributions to elastic4s are always welcome. Good ways to contribute include:
- Raising bugs and feature requests
- Fixing bugs and enhancing the DSL
- Improving the performance of elastic4s
- Adding to the documentation
This software is licensed under the Apache 2 license, quoted below.
Copyright 2013-2014 Stephen Samuel
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.