博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ElasticSearch(八):springboot集成ElasticSearch集群并使用
阅读量:6571 次
发布时间:2019-06-24

本文共 11213 字,大约阅读时间需要 37 分钟。

1. 集群的搭建

见:ElasticSearch(七)

2. springboot配置集群

2.1 创建springboot项目,使用idea创建,不过多介绍(创建项目时候建议不要勾选elasticsearch,springboot目前自带的elasticsearch版本为5.6.10,如果你版本高于这个版本,还是自己手动导入。)

2.2 导入依赖

UTF-8
UTF-8
1.8
6.3.2
org.elasticsearch
elasticsearch
 
${elasticSearch.version}
org.elasticsearch.client
transport
${elasticSearch.version}
org.elasticsearch
elasticsearch
org.elasticsearch.client
elasticsearch-rest-high-level-client
${elasticSearch.version}
org.elasticsearch.plugin
transport-netty4-client
${elasticSearch.version}

对于依赖需要说明的几点:

2.2.1. org.elasticsearch.client--transport 依赖添加之后,会依赖一系列的插件,客户端等,虽然在springboot2.0中依旧依赖  org.elasticsearch-elasticsearch-6.3.2,但是在依赖列表中,其添加的依赖依然是elasticSearch5.6.10的依赖,所以必须排除这个依赖,手动添加org.elasticsearch-elasticsearch6.3.2的依赖,目前只有这种解决方法,否则导致版本不一致冲突。如下:

当我排除 org.elasticsearch.client.transport的elasticsearch的依赖之后,重新添加elasticsearch 6.3.2的依赖之后,就显示的是同样的elasticsearch6.3.2。显示如下:

2.2.2. 这时候如果你再springboot中配置了TransportClient的方法Bean,则启动项目,会报错:

这是因为:transport-netty4-client的版本是5.6.0,而我们使用的所有的elasticsearch版本都是6.3.2,导致jar包冲突,所以,我们必须将transport-netty4-client的版本更新到6.3.2。

这就需要导入jar:org.elasticsearch.plugin----transport-netty4-client 的jar,(具体依赖将上面),这时候transport-netty4-client的版本也是6.3.2了。

2.2.3. 到这里已经可以使用elasticsearch的集群了,不过我们又导入了一个 elasticsearch-rest-high-level-client的jar,目的是:为了使用某些特殊的api。参见:

3. 启动项目,连接elasticSearch集群

3.1 配置集群信息

import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.transport.TransportAddress;import org.elasticsearch.transport.client.PreBuiltTransportClient;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.net.InetAddress;/** * @Auther: cc * @Date:  * @Description: */@Configurationpublic class ESConfig {    private Logger logger  = LoggerFactory.getLogger(this.getClass());    @Value("${elasticsearch.firstIp}")    private String firstIp;    @Value("${elasticsearch.secondIp}")    private String secondIp;    @Value("${elasticsearch.thirdIp}")    private String thirdIp;    @Value("${elasticsearch.firstPort}")    private String firstPort;    @Value("${elasticsearch.secondPort}")    private String secondPort;    @Value("${elasticsearch.thirdPort}")    private String thirdPort;    @Value("${elasticsearch.clusterName}")    private String clusterName;    @Bean    public TransportClient getTransportClient() {        logger.info("ElasticSearch初始化开始。。");        logger.info("要连接的节点1的ip是{},端口是{},集群名为{}" , firstIp , firstPort , clusterName);        logger.info("要连接的节点2的ip是{},端口是{},集群名为{}" , secondIp , secondPort , clusterName);        logger.info("要连接的节点3的ip是{},端口是{},集群名为{}" , thirdIp , thirdPort , clusterName);        TransportClient transportClient = null;        try {            Settings settings = Settings.builder()                    .put("cluster.name",clusterName)    //集群名称                    .put("client.transport.sniff",true)  //目的是为了可以找到集群,嗅探机制开启                    .build();            transportClient = new PreBuiltTransportClient(settings);            TransportAddress firstAddress = new TransportAddress(InetAddress.getByName(firstIp),Integer.parseInt(firstPort));            TransportAddress secondAddress = new TransportAddress(InetAddress.getByName(secondIp),Integer.parseInt(secondPort));            TransportAddress thirdAddress = new TransportAddress(InetAddress.getByName(thirdIp),Integer.parseInt(thirdPort));            transportClient.addTransportAddress(firstAddress);            transportClient.addTransportAddress(secondAddress);            transportClient.addTransportAddress(thirdAddress);            logger.info("ElasticSearch初始化完成。。");        }catch (Exception e){            e.printStackTrace();            logger.error("ElasticSearch初始化失败:" +  e.getMessage(),e);        }        return transportClient;    }}

对于上面代码解释:

3.1.1 首先需要再配置文件中配置服务器集群的所有ip,端口,然后通过@value导入到config类中。

3.2.2 类上必须加@Configuration注解,方法上必须加@Bean注解。

3.2 启动项目,连接集群

启动项目,如果不报错就可行了。

4. 使用springboot操作索引

4.1 创建索引

主要使用方法:

CreateIndexRequest createIndexRequest = Requests.createIndexRequest(index).settings(settings).mapping(type,mapping);  //指定setting,mapping创建索引,如果非结构化索引的话,不指定mappingCreateIndexResponse response = transportClient.admin().indices().create(createIndexRequest).get();logger.info("建立索引映射成功:" + response.isAcknowledged());

4.2 删除索引

DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(index);                  //创建删除索引的请求DeleteIndexResponse response = transportClient.admin().indices().delete(deleteIndexRequest).get();    //删除索引的响应logger.info("删除索引结果:{}",response.isAcknowledged());

完整代码如下

import com.cc.es.domain.base.ResultBean;import io.swagger.annotations.Api;import io.swagger.annotations.ApiImplicitParam;import io.swagger.annotations.ApiImplicitParams;import io.swagger.annotations.ApiOperation;import org.apache.commons.lang3.StringUtils;import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;import org.elasticsearch.client.Requests;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.xcontent.XContentBuilder;import org.elasticsearch.common.xcontent.XContentFactory;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.util.*;/** * @Auther: Administrator * @Date: 2018/8/21 07 * @Description: */@Api(value = "Index", tags = "索引")@RestController@RequestMapping("index")public class IndexController {    private final String INDEX = "index";    private final String TYPE = "type";    private Logger logger = LoggerFactory.getLogger(this.getClass());    @Resource    private TransportClient transportClient;    @ApiOperation(value = "结构化创建索引")    @ApiImplicitParams({            @ApiImplicitParam(name = "index", value = "索引名", required = true, dataType = "String", paramType = "query"),            @ApiImplicitParam(name = "type", value = "类型", required = true, dataType = "Integer", paramType = "query"),            @ApiImplicitParam(name = "fields", value = "结构化索引字段名,不定参数,传入的时候参数名为索引字段名,值为对应的数据类型")    })    @RequestMapping(value = "/create" , method = RequestMethod.POST)    public ResultBean createIndex(@RequestParam Map
param){ ResultBean resultBean = new ResultBean(); String index = null; String type = null; List
fieldList = new ArrayList<>(); logger.info("接收的创建索引的参数:" + param); Set
> set = param.entrySet(); for (Map.Entry
entry: set) { String key = entry.getKey(); if(key.trim().equals(INDEX)){ index = entry.getValue(); }else if(key.trim().equals(TYPE)){ type = entry.getValue(); }else{ fieldList.add(key); } } if(StringUtils.isBlank(index) || StringUtils.isBlank(type)){ resultBean.setSuccess(false); resultBean.setMsg("参数错误!"); return resultBean; } try { XContentBuilder settings = XContentFactory.jsonBuilder() .startObject() .field("number_of_shards",6) .field("number_of_replicas",1) .startObject("analysis").startObject("analyzer").startObject("ik") .field("tokenizer","ik_max_word") .endObject().endObject().endObject() .endObject(); XContentBuilder mapping = XContentFactory.jsonBuilder(); mapping.startObject().field("dynamic","strict").startObject("properties"); for (int i = 0,j = fieldList.size(); i < j; i++) { String field = fieldList.get(i); String fieldType = param.get(field); mapping.startObject(field).field("type",fieldType); if(fieldType.trim().equals("date")){ mapping.field("format","yyyy-MM-dd HH:mm:ss || yyyy-MM-dd "); } mapping.endObject(); } mapping.endObject().endObject(); CreateIndexRequest createIndexRequest = Requests.createIndexRequest(index).settings(settings).mapping(type,mapping); CreateIndexResponse response = transportClient.admin().indices().create(createIndexRequest).get(); logger.info("建立索引映射成功:" + response.isAcknowledged()); resultBean.setSuccess(true); resultBean.setMsg("创建索引成功!"); } catch (Exception e) { resultBean.setSuccess(false); resultBean.setMsg("创建索引失败!"); logger.error("创建索引失败!要创建的索引为{},文档类型为{},异常为:",index,type,e.getMessage(),e); } return resultBean; } @ApiOperation(value = "删除索引") @ApiImplicitParams({ @ApiImplicitParam(name = "index", value = "索引名", required = true, dataType = "String", paramType = "query"), }) @RequestMapping(value = "/delete" , method = RequestMethod.POST) public ResultBean deleteIndex(String index){ ResultBean resultBean = new ResultBean(); if (StringUtils.isBlank(index)) { resultBean.setMsg("参数错误,索引为空"); resultBean.setSuccess(false); return resultBean; } try { DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(index); DeleteIndexResponse response = transportClient.admin().indices().delete(deleteIndexRequest).get(); logger.info("删除索引结果:{}",response.isAcknowledged()); resultBean.setSuccess(response.isAcknowledged()); resultBean.setMsg(response.isAcknowledged() ? "删除索引成功!" : "删除索引失败!"); } catch (Exception e) { resultBean.setSuccess(false); resultBean.setMsg("创建索引失败!"); logger.error("删除索引失败!要删除的索引为{},异常为:",index,e.getMessage(),e); } return resultBean; } }
View Code

 

到目前为止,springboot的索引已经完成。这里都是使用的原生的一些api,以后可能还会使用一些别的方法完成。

 

转载于:https://www.cnblogs.com/chenmc/p/9548847.html

你可能感兴趣的文章
django迁移数据库报错解决
查看>>
51nod——1640 天气晴朗的魔法 有边权限制的最大生成树
查看>>
OpenFlow技术白皮书-V1.0
查看>>
Ubuntu 安装Eclipse CDT 开发C、C++
查看>>
SIFT 特征点提取算法
查看>>
问题-XE8客户端访问Webservice时报“no selected dom vendor”
查看>>
day40-mysql数据备份、pymysql模块
查看>>
importerror:cannot import name 'smart_unicode'
查看>>
熟悉常用的Linux操作
查看>>
5.2 5.3 实验五 四则运算
查看>>
app的推广
查看>>
poi 导出 excel
查看>>
servlet - 乱码、重定向、cookie
查看>>
Directx教程(30) 如何保证渲染物体不会变形
查看>>
uva-10887-枚举
查看>>
uva-10718-贪心
查看>>
DDD
查看>>
用python写算法1[用两个栈实现一个队列]
查看>>
mysql-锁表机制分析(转)
查看>>
Zabbix3.0安装与部署(centos7)
查看>>