从MySQL到ES -- 大宽表解决方案 (一)

大家是否时常在开发中遇到要查询的数据不在一个表中,需要多个表联合起来才能进行筛选,而且有时候还需要分页,而且像一些数量级较大的表,再加上一些模糊查询条件,往往容易索引失效,导致慢查。

这时候我们往往会使用ES作为大宽表,然后再ES进行各种查询操作,那我们怎么将我们的业务数据同步到ES,这就是我们今天要聊的,MySQL 到 ES 的同步方案。

同步方案

同步双写

图片

同步双写, 写入数据库的同时,再往es中也同时写入数据。这样会存在ES和业务耦合较紧密的缺点,但实现比较容易。

异步双写

图片

借助于MQ, 将ES于业务解耦开来,使得MySQL和ES的数据可以分开维护,但缺点也有,就是业务中还是多了一层MQ的中间件,需要维护MQ的发送和接受,同时也提高了系统的复杂度。

binlog 订阅

图片

使用 binlog 订阅的方式,使得 ES完全从业务中解耦出来,只需要接受 binlog 的数据,也不会给线上的 mysql 带来性能压力,也不用维护 MQ。

所以我们这次就用 binlog 订阅的方式来同步 ES。

Canal

图片

Canal 是阿里巴巴开源的 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

实现原理是 Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议,然后接收 master 发送过来的数据。

使用

MySQL 表结构

我们这次模拟一个简单的业务场景

图片

我们有类目,spu,sku三张表, 然后我们需要根据类目名称和spu名称,查询可能的sku数据。

表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- auto-generated definition
create table category
(
id int auto_increment
primary key,
name varchar(255) not null
);

-- auto-generated definition
create table spu
(
id int auto_increment
primary key,
category_id int not null,
name varchar(255) not null
);

-- auto-generated definition
create table sku
(
id int auto_increment
primary key,
spu_id int not null,
name varchar(255) not null
);



环境配置

mysql

使用 docker 安装,这样环境更统一一些,宿主机不同造成的影响也更小些。

先在本地创好 data,conf,log文件夹,然后再执行 doker run, 这次我们采用的是5.7版本

1
2
3
4
5
docker run -d -p 3306:3306 --name mysql5.7 
-v /home/robinson/code/mysql-data/data:/var/lib/mysql
-v /home/robinson/code/mysql-data/conf:/etc/mysql/conf.d
-v /home/robinson/code/mysql-data/log:/var/log/mysql
-e MYSQL_ROOT_PASSWORD=654321 mysql:5.7

执行完毕后,我们需要修改conf中的配置,改为binlog模式, 并且注册一个canal用户,授权slave相关权限。

  1. 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,在映射到宿主机的conf目录下面新建my.cnf文件 然后加入配置如下
    1
    2
    3
    4
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  2. 授权canal用户
    1
    2
    3
    4
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
  3. 插入一些测试数据
    到此为止,我们mysql的准备工作就做好了。

ES

es我们也在docker中进行安装,然后再安装一个kibana,方便我们查看数据

  1. docker run es 并开放9200,9300端口
    1
    2
    3
    docker network create elastic
    docker pull docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    docker run --name es01-test --net elastic -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.17.9
  2. docker run kibana
    1
    2
    docker pull docker.elastic.co/kibana/kibana:7.17.9
    docker run --name kib01-test --net elastic -p 127.0.0.1:5601:5601 -e "ELASTICSEARCH_HOSTS=http://es01-test:9200" docker.elastic.co/kibana/kibana:7.17.9
    然后打开 http://localhost:5601 就能看到kibana界面了

建ES 索引 (约等于把表结构建好了)

es的索引中 我们把 类目和spu的数据整合了进来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
PUT /product
{
    "mappings": {
        "properties": {
            "id": {
                "type": "integer"
            },
                        "category_id": {
                "type": "integer"
            },
                        "spu_id": {
                "type": "integer"
            },
            "name": {
                "type": "keyword"
            },
                        "category_name": {
                "type": "keyword"
            },
                        "spu_name": {
                "type": "keyword"
            }
        }
    }
}

自此,ES 的准备工作也差不多了,接下来就要开始部署canal了。

canal-server

本来canal也打算使用docker部署的, 但部署完毕后每次都直接exit了,而且连错误日志也没有生成, 所以最后还是打算使用原生命令执行。这次使用的是1.1.6版本,是目前最新的文档版,1.1.7还在测试当中。

  1. 下载压缩包
    1
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
  2. 解压缩
    1
    2
    mkdir /tmp/canal
    tar zxvf canal.deployer-1.1.6.tar.gz -C /tmp/canal
  3. 修改配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    vi conf/example/instance.properties

    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*
  4. 启动
    1
    sh bin/startup.sh

canal-adapter

因为官方自带了 es 的插件,所以只需配置一些参数即可同步到es。

我这次先是下了1.1.6的 adapter

修改了application.yml里面的配置

1
2
3
4
5
6
7
8
9
      - name: es7
        key: exampleKey
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
#      - name: kudu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dataSourceKey: defaultDS
destination: example
outerAdapterKey: exampleKey
groupId: g1
esVersion: es7
esMapping:
  _index: product
  _id: _id
#  upsert: true
#  pk: id
  sql: "select k.id as _id, k.id as id, k.name as name, p.id as spu_id, p.name as spu_name, c.id as category_id, c.name as category_name  from sku k left join spu p on p.id = k.spu_id left join category c on c.id = p.category_id"
#  objFields:
#    _labels: array:;
  # etlCondition: "where id>='{0}'"
  commitBatch: 3000

1
sql: "select k.id as _id, k.id as id, k.name as name, p.id as spu_id, p.name as spu_name, c.id as category_id, c.name as category_name  from sku k left join spu p on p.id = k.spu_id left join category c on c.id = p.category_id"

这句sql很重要, canal 就会根据这句sql去拉取 聚合数据, 并监听binlog里面的数据
当然也遇到了一些问题。

  1. 提示 cluster 不正确
    图片

图片

集群的名字要填写正确

  1. 启动后log里不停的报异常

    1
    java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;

    https://github.com/alibaba/canal/issues/3396
    发现很多人和我一样,看了下解决方案就是把jdk从8升到11,因为8里面没有这个方法,可官方说是支持jdk8的,升级到8之后不报这个异常了。

  2. 虽然异常不报了,但启动后无论怎样也无法同步到es,并且log里啥异常也没显示
    https://github.com/alibaba/canal/issues/3144

https://github.com/alibaba/canal/issues/2284

查询后发现也有很多人也有相同问题,有人说重新编译,说是escore的包和adapter的包有冲突

说是需要源包pom加这个重新编译, 然后再提取jar包放到plugin中执行。

1
2
3
4
5
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>//加上这行配置,让es的xxxx-with-dependency.jar不包含druid相关包
</dependency>

但我不信邪,我觉得最新版里面肯定修复了这个问题,看官方的提交记录也是修复了的,在后续的hotfix版本和1.1.7版本中。
于是,,我安装了 canal-1.1.7-alpha-2 版本

  1. 新版本依然不行
    https://github.com/alibaba/canal/issues/4354

https://github.com/alibaba/canal/issues/4533

1
Failed to bind properties under 'es-mapping' to com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig$ESMapping

看了大家的评论之后,发现是官方的spring版本对解析 _index 这些保护下划线的 yml有问题。
无奈之下

我回退到了 1.1.5 版本, 自己拉源码,改pom,然后编译打包。

终于!!!

使用1.1.5版本后可以使用了

图片

当我修改了 数据库内的数据, canal 会监听到 ,然后会同步到 es 中。

图片

当然我在调试过程中又发现了一些问题,

就是我修改主表数据,比如说name, 然后es同步中会把category_name, 和 spu_name 都修改了, 因为他们源表里面都叫name, 改同名数据会全改掉

https://github.com/alibaba/canal/issues/2023

这个bug在后续的版本里官方已经修复, 然后可惜1.1.5还存在这问题,我后续版本又使用不了。

图片

还有一个问题,就是官方不支持一对多,再一对多,比方说,我sku关联spu可以,但再加个spu关联category的关系就不行, 除非是sku关联category。这点还是有点致命的,关联关系复杂的时候可能需要自己实现canal-client来接收数据处理,或者使用其他方案。

Spring + ES

那我们数据已经同步到es中了,然后怎么使用呢

现在我们结合 SpringBoot 开始简单的查询一下我们宽表中的数据

图片

项目结构

图片

es的连接配置, 放在config中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* ElasticSearch 客户端配置
*/
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build();
return RestClients.create(clientConfiguration).rest();
}
}

Repository

1
2
3
4
5
6
7
8
9
10
/**
* ES存储 Product repository,定义Product与ES索引映射关系
*/
@Repository("esProductRepository")
public interface ESProductRepository extends ElasticsearchRepository<ESProduct, String> {
List<ESProduct> findESProductByName(String name);
@Query("{\"match\":{\"name\":\"?0\"}}")
SearchHits<ESProduct> find(String keyword);
}

Service

1
2
3
4
5
6
7
8
9
10
11
@Service
public class ESProductServiceImpl implements ESProductService {
@Autowired
private ESProductRepository esProductRepository;
public List<ESProduct> search(String keyword) {
return esProductRepository.findESProductByName(keyword);
}
public SearchHits<ESProduct> searchName(String keyword) {
return esProductRepository.find(keyword);
}
}

Controller

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class EsController {
@Autowired
private ESProductService esProductService;

@GetMapping("/es-search")
public List<ESProduct> search(String keyword) {
return esProductService.search(keyword);
}
}

然后我们启动项目, 在浏览器中进行调用, 可以发现成功查询到了es里的数据。

图片

自此一个 从 MySQL binlog 同步到 ES, 再到业务项目中通过ES查询数据的链路已经可以串起来了!