Is it possible to write to Amazon Elasticsearch Service using elasticsearch-hadoop? #565

Closed
fblundun opened this Issue Oct 7, 2015 · 16 comments

6 participants

fblundun commented Oct 7, 2015

I have been trying to write to AWS's new Amazon Elasticsearch Service from a Scalding job using elasticsearch-hadoop (via scalding-taps).

This job previously worked against Elasticsearch installed manually on an EC2 instance, which suggests that the problem is specific to Amazon Elasticsearch Service.

The first time I tried I got this error:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:370)
        at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:425)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:393)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:844)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:596)
        at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
        at org.elasticsearch.hadoop.cascading.EsHadoopScheme.sink(EsHadoopScheme.java:212)
        at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
        ... 21 more

I then tried setting es.nodes.client.only to true and instead got this error:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Client-only routing specified but no client nodes with HTTP-enabled were found in the cluster...
        at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonClientNodesIfNeeded(InitializationUtils.java:82)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:373)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:844)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:596)
        at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
        at org.elasticsearch.hadoop.cascading.EsHadoopScheme.sink(EsHadoopScheme.java:212)
        at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
        ... 21 more

I haven't made any changes to Amazon's default Elasticsearch configuration. Does anybody have any idea how to make this work?

Member
costin commented Oct 7, 2015

Yes, you just need to configure Elasticsearch properly (I need to add a section about this to the docs).
The issue lies in how Amazon configures its network: Elasticsearch runs on private IPs but is reachable through public ones.
When the connector reaches ES, by default it only learns about the private IPs, and since it cannot reach them, it throws the exception above.
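To make the private vs public IP problem concrete, here is a small Java sketch. It is a hypothetical diagnostic, not part of elasticsearch-hadoop or Elasticsearch: given the IPs that nodes advertise, it reports which ones fall in the private (site-local) IPv4 ranges, i.e. addresses a client outside Amazon's network cannot connect to. The class name, method name and sample addresses are made up; only java.net.InetAddress.isSiteLocalAddress() is a real API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic, not part of elasticsearch-hadoop: given the IPs that
// cluster nodes advertise, report which ones are private (site-local) and thus
// unreachable from a client running outside the cloud provider's network.
public class PrivateAddressCheck {

    static List<String> privateAddresses(List<String> advertisedIps) {
        List<String> privateIps = new ArrayList<>();
        for (String ip : advertisedIps) {
            try {
                // For an IP literal, getByName only parses the string (no DNS lookup).
                InetAddress addr = InetAddress.getByName(ip);
                // True for 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16.
                if (addr.isSiteLocalAddress()) {
                    privateIps.add(ip);
                }
            } catch (UnknownHostException e) {
                throw new IllegalArgumentException("not a valid IP: " + ip, e);
            }
        }
        return privateIps;
    }

    public static void main(String[] args) {
        // Made-up sample: two private addresses and one public one.
        List<String> advertised = List.of("10.0.1.17", "172.31.5.9", "54.12.34.56");
        System.out.println(privateAddresses(advertised)); // [10.0.1.17, 172.31.5.9]
    }
}
```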

The solution is simple: use the AWS plugin, which enables proper discovery in EC2/Amazon and also allows Elasticsearch to use the Amazon public IP for broadcast (namely the EC2 settings, as explained here).

Hope this helps,

fblundun commented Oct 7, 2015

Thanks very much @costin ! I will try using the plugin.

fblundun commented Oct 7, 2015

The Amazon Elasticsearch Service Limits page says that only a few specific plugins are supported. Further investigation is needed, but I'm not sure it's actually possible to enable the AWS Cloud Plugin...

Member
costin commented Oct 7, 2015

Now I see that you are referring to the Amazon Elasticsearch Service. I'm not familiar with it, but I expect this to be the same issue (private vs public IPs). Is there any way to configure the published/advertised IP?
If not, that's quite an oversight, since it means the nodes in Amazon cannot be accessed directly.
A workaround would be to install a proxy that is accessible from outside and has visibility into the private IPs.
The connector can then route its calls through it. This kills parallelism (all connections go through a single node), but it should work.

Have you tried raising this with Amazon?

Cheers,

malpani commented Oct 19, 2015

@costin Is discovery via cloud-aws mandatory for the connection to work?
Amazon Elasticsearch service currently does not expose private IPs or allow other nodes to join the cluster for security purposes. There is an ELB fronting the cluster nodes whose endpoint (listening on port 80 and not 9200) is exposed for access via REST APIs. TCP is not supported currently.

Taking these into consideration, would users of elasticsearch-hadoop be able to talk to the service using something like the following, or do you foresee problems with this approach?

 es.nodes = foo.us-east-1.es.amazonaws.com 
 es.port = 80
 es.nodes.discovery = false

Signing requests with SigV4 will probably not work, so would users restrict access to their Amazon ES cluster via IP-based access control instead?

Update: I tried this using port 80 with "es.nodes.discovery" set to "false" but got the same error as from my first attempt:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:370)
        at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:425)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:393)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:844)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:596)
        at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
        at org.elasticsearch.hadoop.cascading.EsHadoopScheme.sink(EsHadoopScheme.java:212)
        at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
        ... 21 more

malpani commented Oct 19, 2015

Access to some of the internal cluster management APIs (like _cluster/state) is also currently restricted. Is the connector trying to sniff shard placements for optimization? Is that what is causing this?

Current list of supported operations: http://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-gsg-supported-operations.html

Member
costin commented Oct 20, 2015

@malpani the connector doesn't rely on _cluster/state, but it does use the search shards API (_search_shards), which Amazon does not expose. This alone defeats the whole premise of the connector, which is to use parallelism to connect Hadoop/Spark to ES efficiently.
Sorry...
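Why the shard information matters can be shown with a toy Java model. This is a simplification under stated assumptions, not the connector's actual code: each primary shard is paired with an address for the node hosting it, so writes can go to that node in parallel. When a shard's node has no usable address (for instance because only private IPs are known, or because client-only filtering removed it), the lookup fails, loosely mirroring the "cannot find node backing shards" error earlier in this thread. All class names, method names and sample data are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model, NOT elasticsearch-hadoop's implementation: shard-aware writing pairs each
// primary shard with the node that hosts it, so each task can write to "its" node.
public class ShardRoutingModel {

    /**
     * @param primaryShardToNode shard number -> id of the node holding the primary
     *                           (the kind of information a search-shards call returns)
     * @param usableNodes        node id -> HTTP address the client is able to use
     * @return shard number -> address to send that shard's writes to
     */
    static Map<Integer, String> writeTargets(Map<Integer, String> primaryShardToNode,
                                             Map<String, String> usableNodes) {
        Map<Integer, String> targets = new TreeMap<>();
        for (Map.Entry<Integer, String> shard : primaryShardToNode.entrySet()) {
            String address = usableNodes.get(shard.getValue());
            if (address == null) {
                // The shard's node was filtered out or never discovered: nothing to write to.
                throw new IllegalStateException("cannot find node backing shard " + shard.getKey());
            }
            targets.put(shard.getKey(), address);
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Integer, String> shards = Map.of(0, "node-a", 1, "node-b");
        // Both nodes usable: one write target per shard, i.e. per-shard parallelism.
        System.out.println(writeTargets(shards,
                Map.of("node-a", "10.0.0.1:9200", "node-b", "10.0.0.2:9200")));
        // {0=10.0.0.1:9200, 1=10.0.0.2:9200}

        // Only node-a usable: shard 1 has no backing node.
        try {
            writeTargets(shards, Map.of("node-a", "10.0.0.1:9200"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // cannot find node backing shard 1
        }
    }
}
```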

Member
costin commented Oct 21, 2015

@fblundun @malpani I've added a new feature in master (and published new builds, namely 2.2.0.BUILD-SNAPSHOT) that allows the connector to work with restricted, cloud-like ES environments.
By setting es.nodes.wan.only to true, the connector disables discovery and its usual peer-to-peer connections and uses only the nodes indicated in es.nodes. For Amazon or Found, for example, this would be the publicly accessible gateway.
All connections to and from ES are made through this node. Clearly it won't be as efficient as connecting to each shard/node directly, but it's also the only possible way.
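A minimal Java sketch of the WAN-only setup, assuming the endpoint and port discussed earlier in the thread (foo.us-east-1.es.amazonaws.com on port 80). The es.nodes, es.port and es.nodes.wan.only keys are elasticsearch-hadoop settings; the class and method names are made up, and nodesToContact is a toy model of the routing described above, not the connector's implementation.

```java
import java.util.List;
import java.util.Properties;

// Sketch of connector settings for a restricted, cloud-hosted cluster. The es.* keys
// are elasticsearch-hadoop settings; the class and method names are hypothetical.
public class WanOnlySettings {

    static Properties restrictedClusterSettings(String gatewayHost, int port) {
        Properties props = new Properties();
        props.setProperty("es.nodes", gatewayHost);            // the public gateway/ELB only
        props.setProperty("es.port", Integer.toString(port));  // e.g. 80 for Amazon ES, per this thread
        props.setProperty("es.nodes.wan.only", "true");        // no discovery, no direct node connections
        return props;
    }

    // Toy model (hypothetical, not the connector's code) of which nodes get contacted.
    static List<String> nodesToContact(Properties props, List<String> discoveredNodes) {
        boolean wanOnly = Boolean.parseBoolean(props.getProperty("es.nodes.wan.only", "false"));
        if (wanOnly) {
            // Every request goes through the configured gateway.
            return List.of(props.getProperty("es.nodes") + ":" + props.getProperty("es.port"));
        }
        // Otherwise the client would fan out to the nodes it discovered.
        return discoveredNodes;
    }

    public static void main(String[] args) {
        Properties props = restrictedClusterSettings("foo.us-east-1.es.amazonaws.com", 80);
        List<String> discovered = List.of("10.0.1.17:9200", "10.0.1.18:9200");
        System.out.println(nodesToContact(props, discovered));
        // [foo.us-east-1.es.amazonaws.com:80]
    }
}
```

In a real job these same es.* keys would go into the Hadoop/Cascading job configuration or the Spark configuration rather than a standalone Properties object.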

Can you please try it out and report back how it works for you?

Cheers,

Thanks very much @costin! I will try it out tomorrow and report the results.

Great news! The test run with the latest build succeeded. Thanks @malpani and @costin for all your help!

Thanks so much everybody!

malpani commented Oct 23, 2015

That's great to hear! Thank you @costin, @fblundun and @alexanderdean

Member
costin commented Oct 25, 2015

Thanks for the feedback. This feature is 2.2-only; please use the dev build until M2 is released (shortly).

costin closed this Oct 25, 2015
costin removed the v2.1.2 label Oct 25, 2015

stettix commented Dec 9, 2015

Thanks for this fix, it was essential for my current project as well. Hoping it gets into a full release soon!

Paul424 commented May 9, 2016

Thank you for this fix; just got a first spark job running with es.nodes.wan.only set to true.

To help others out, here are the dependencies we are using in the pom:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.6</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark_2.10</artifactId>
  <version>2.3.1</version>
</dependency>
