ElasticsearchでJava APIを使ってインデックスに登録する
ElasticsearchではJavaのAPIを利用することができます。APIを使うと、普段のプログラミングと同じような感覚で全文検索エンジンのElasticsearchを利用することができ、プログラムの見通しが良くなります。
ここでは、JavaのAPIを利用して、Elasticsearchにデータを登録してみます。
ここでは話を簡単にするために、社員データを登録してみます。
登録の流れ
社員データをElasticsearchに登録するときの、全体の流れは以下のようになります。
(1)社員データをJavaのオブジェクトを使って作成
(2)(1)の社員データをJSON化する。
(3)Elasticsearchに登録する。
簡単に説明すると以上のようになります。以下では、実際のプログラムの詳細をご紹介します。
データクラスと登録用のクラス
社員データは以下のようになっています。
所属
・所属部ID
・名称
・所属部ID
・名称
社員
・所属部ID
・社員ID
・名前
・所属部ID
・社員ID
・名前
これをJavaのクラスで表すと次のようになります。
所属クラスです。
public class Department {
private int department_id;
private String name;
public int getDepartment_id() {
return department_id;
}
public void setDepartment_id(int department_id) {
this.department_id = department_id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
社員クラスです。
public class Employee {
private Department department;
private int employee_id;
private String name;
public int getEmployee_id() {
return employee_id;
}
public void setEmployee_id(int employee_id) {
this.employee_id = employee_id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Department getDepartment() {
return department;
}
public void setDepartment(Department department) {
this.department = department;
}
}
private Department department;
private int employee_id;
private String name;
public int getEmployee_id() {
return employee_id;
}
public void setEmployee_id(int employee_id) {
this.employee_id = employee_id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Department getDepartment() {
return department;
}
public void setDepartment(Department department) {
this.department = department;
}
}
テストデータを作成します。その後、Elasticsearchに接続して、複数のデータを一括して登録するようになっています。
ここではElasticsearch 5.4のJavaライブラリとJacksonというJSONを扱うライブラリを使用しています。
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectM
apper;
import java.util.ArrayList;
import java.util.List;
public class Elasticsearch{
public static void main(String[] args){
Elasticsearch es = new Elasticsearch();
List<String> data = es.createTestData();
es.index2ES(data);
}
/**
* テストデータの作成。
* @return List<String>
*/
public List<String> createTestData(){
Department department = new Department();
department.setDepartment_id(1);
department.setName("営業部");
Employee employee = new Employee();
employee.setDepartment(department);
employee.setEmployee_id(1);
employee.setName("山田康夫");
Department department2 = new Department();
department2.setDepartment_id(2);
department2.setName("人事部");
Employee employee2 = new Employee();
employee2.setDepartment(department2);
employee2.setEmployee_id(2);
employee2.setName("大木戸博士");
List<String> employees = new ArrayList<String>();
ObjectMapper objectMapper = new ObjectMapper();
try {
String json = objectMapper.writeValueAsString(employee);
String json2 = objectMapper.writeValueAsString(employee2);
employees.add(json);
employees.add(json2);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return employees;
}
/**
* Elasticsearchへ接続する。
*
* @return
*/
public TransportClient getESConnection(){
//elasticsearch.ymlでクラスターの名前を変更した場合には適宜変更してください。
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings);
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
/**
* Elasticsearchにデータを一括登録する。
* @param jsons
*/
public void index2ES(List<String> jsons){
TransportClient client = getESConnection();
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
}
@Override
public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2)
{
}
@Override
public void beforeBulk(long arg0, BulkRequest arg1) {
}
})
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
for(String json : jsons){
IndexRequest indexRequest = new IndexRequest("test","employees");
indexRequest.source(json,XContentType.JSON);
bulkProcessor.add(indexRequest);
}
bulkProcessor.flush();
bulkProcessor.close();
client.admin().indices().prepareRefresh().get();
client.prepareSearch().get();
client.close();
}
}
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectM
apper;
import java.util.ArrayList;
import java.util.List;
public class Elasticsearch{
public static void main(String[] args){
Elasticsearch es = new Elasticsearch();
List<String> data = es.createTestData();
es.index2ES(data);
}
/**
* テストデータの作成。
* @return List<String>
*/
public List<String> createTestData(){
Department department = new Department();
department.setDepartment_id(1);
department.setName("営業部");
Employee employee = new Employee();
employee.setDepartment(department);
employee.setEmployee_id(1);
employee.setName("山田康夫");
Department department2 = new Department();
department2.setDepartment_id(2);
department2.setName("人事部");
Employee employee2 = new Employee();
employee2.setDepartment(department2);
employee2.setEmployee_id(2);
employee2.setName("大木戸博士");
List<String> employees = new ArrayList<String>();
ObjectMapper objectMapper = new ObjectMapper();
try {
String json = objectMapper.writeValueAsString(employee);
String json2 = objectMapper.writeValueAsString(employee2);
employees.add(json);
employees.add(json2);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return employees;
}
/**
* Elasticsearchへ接続する。
*
* @return
*/
public TransportClient getESConnection(){
//elasticsearch.ymlでクラスターの名前を変更した場合には適宜変更してください。
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings);
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
/**
* Elasticsearchにデータを一括登録する。
* @param jsons
*/
public void index2ES(List<String> jsons){
TransportClient client = getESConnection();
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
}
@Override
public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2)
{
}
@Override
public void beforeBulk(long arg0, BulkRequest arg1) {
}
})
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
for(String json : jsons){
IndexRequest indexRequest = new IndexRequest("test","employees");
indexRequest.source(json,XContentType.JSON);
bulkProcessor.add(indexRequest);
}
bulkProcessor.flush();
bulkProcessor.close();
client.admin().indices().prepareRefresh().get();
client.prepareSearch().get();
client.close();
}
}
データを登録するときのポイントとメリット
Elasticsearchではよくスキーマ(マッピング)を設定して登録しているサンプルを見かけますが、実際はこのサンプルのように、何も考えずにJSON形式のデータを放り込んであげれば大丈夫です。
細かい検索条件を指定する場合などにはスキーマの設定が必要かもしれませんが、通常の検索であれば問題ありません。Elasticsearch Ver5.0以降からはデフォルトで完全一致用のフィールドも作成されますので、完全一致を実現するだけのためにマッピングファイルを作成するということも必要ありません。
データを作成するときは、JacksonというJSONライブラリのObjectMapperクラスを利用して、Javaのクラスを丸ごとJSON形式に変換しています。
このようにすることで、Javaのオブジェクトを利用できますし、登録データのJSON仕様を検討する手間も省けます。さらに、検索を実行する時にも得られた検索結果を取り出しやすくなります。リレーショナルデータベースのデータをElasticsearchに移行するときなど、既存のクラスファイルを再利用できるというのもメリットの1つです。
関連記事
1.ElasticsearchでJava APIを使った完全一致検索