ElacticSearch自动同步Hbase数据做二级索引

最近在做blog的迁移,把原先的文章准备迁移到新的blog平台上。
先简单的介绍下项目背景吧,[Es的版本是2.4 hbase是1.2] ,最近公司业务调整 需要把原有的数据从关系型数据库调整到Hbase中,奈何Hbase做查询性能并不是那么优秀,所以准备集成Es做二级索引。在网上找了一大堆的资料 全部都是ElasticSearch1.X的 到2.X的版本基本上没有资料。自己折腾了好几天 现在终于整合成功。

好了 不多说 上代码 代码是基于Hbase的BaseRegionObserver修改写的,我这里重写了BaseRegionObserver
的postPut的方法和start方法 由于只是一个简单的Demo 还有很多方便没有考虑 所以自己供参考用,整个项目也只有这一个类,需要删除和更新也同步的同学请自己完善类 的方法。

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HabseElasticSearchObServer extends BaseRegionObserver {

private static final Logger logger = LoggerFactory.getLogger(HabseElasticSearchObServer.class);

private Client client;
private static final String CLUSTER_NAME = "estest"; //实例名称
private static final String IP = "192.168.11.27";//Host地址
private static final int PORT = 9300; //端口

@Override
public void start(CoprocessorEnvironment e) throws IOException {
Settings settings = Settings.settingsBuilder().put("cluster.name",CLUSTER_NAME).put("client.transport.sniff", true).build();
client = TransportClient.builder().settings(settings).addPlugin(DeleteByQueryPlugin.class).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(IP), PORT));
}



@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
super.postPut(e, put, edit, durability);
try {
String tableName=Bytes.toString(e.getEnvironment().getRegionInfo().getTable().getName());
ArrayList<Cell> editCells=edit.getCells();
String rowKey=null,family=null,colum=null,value=null;
Map<String,Object> familyMap=new HashMap<String, Object>();
for(Cell cell:editCells){
rowKey=Bytes.toString(cell.getRow());
family=Bytes.toString(cell.getFamily());
value=Bytes.toString(cell.getValue());
colum=Bytes.toString(cell.getQualifier());
familyMap.put(colum,value);
}
client.prepareIndex("auth", tableName, rowKey).setSource(XContentFactory.jsonBuilder()
.startObject()
.field(family, familyMap)
.endObject()).get();
} catch (Exception ex){
logger.error("------------sync hbase data to es error " + ex );
}
}
}

这里说明一下,如果没有安装delete-by-query的插件则不需要添加addPlugin(DeleteByQueryPlugin.class)这句话请自行删除.

POM文件部分内容如下,delete-by-query的引用看具体情况 是用来做Es的查询删除用的,但是前二个是必须的。

 <dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>delete-by-query</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
`

好了,整体的修改就可以。用修改的jar替换hbase的jar。然后hbase集群启动的时候当有数据进来的时候Es也会同步新增数据。这样就可以用Es做hbase的二级索引查询了。
但是目前这中方式对hbase的侵入性较大,后续如果有什么更好的方法,我这边也会同步跟进更新。