从MySQL到ES -- 大宽表解决方案 (二)

上一篇文章里我使用了canal将mysql的数据同步到了es当中,但也留下了一些遗憾,像表间关联不能超过两级,只能从mysql中同步数据,稍复杂的逻辑就要自己实现client,还有实践过程中不停的踩坑等等问题。所以我后来又寻找了很多其他方案, 然后发现了2020年才刚起步的Flink CDC项目,它是Flink的一个衍生项目,CDC 全称 change data capture,是用来进行变更数据的捕获的。看了官方文档和一些社区会议视频便爱上了这个项目,配置方便,功能又那么强大。接下来我按上次的sku再来演示一下。

大家可能都听过 Flink, Flink 是一款分布式的计算引擎,它可以用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时地处理一些实时数据流,实时地产生数据的结果;也可以用来做一些基于事件的应用。

那Flink CDC 又是什么呢,如果说Flink是用来处理流的, 那Flink CDC就是那个来提供流的, 它是Flink 的 source。它会进行各种数据源的change 的捕获, 然后来提供给 Flink, 之后Flink再进行计算, 然后 sink到输出端。

引用一张官方的图来解释下。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F110af5e8-70e0-4ee2-ae64-4720d8b1eb70%2Fimage.png?id=f2553e87-cead-46cc-8f02-5cca86246f9a&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

左边是数据源, 右边是要将处理或者计算好的数据输出的端, 中间就是Flink CDC 和 Flink相结合的过程。

而且Flink还有一个极大的优势是, 它描述数据源和描述输出的方式, 就和SQL语法一样,写起来极为丝滑和舒适。

接下来先让我们像上期那样,将 Mysql 的三张表聚合打到ES中,记住,这只是Flink CDC小试身手。

复刻上期

数据准备

首先依然是docker内启动mysql5.7,修改ini为binlog row模式,然后创好三张表,初始化一些数据(可以见上一篇文章)。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fa578c20e-5885-44d7-b4b0-22c6c298a9fc%2FUntitled.png?id=18c9b583-b254-48cf-9d30-49a9aeac2eaa&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

然后是es的mappings

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    "mappings": {
        "properties": {
            "id": {
                "type": "integer"
            },
            "category_id": {
                "type": "integer"
            },
            "spu_id": {
                "type": "integer"
            },
            "name": {
                "type": "keyword"
            },
            "category_name": {
                "type": "keyword"
            },
            "spu_name": {
                "type": "keyword"
            }
        }
    }

下载Flink https://archive.apache.org/dist/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz

解压到 flink-1.16.0文件夹中

然后下载mysql和es的依赖jar包

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar

https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar

直接放到放到flink-1.16.0/lib 目录下即可。

然后到flink-1.16.0/bin 目录下执行 ./start-cluster.sh 就运行了起来。

然后我们可以在localhost:8081上看到界面, 对,没错,它竟然还有界面,因为依托于强大的Flink。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fb9f9c0a6-1728-4204-90fc-7bd4ef851caf%2FUntitled.png?id=3a6c8899-fbe1-431f-b9bc-0a8ff9a002ee&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

创建CDC Source

然后就开始最神奇的一步,

我们在 flink-1.16.0/bin 执行 ./sql-client.sh

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fbd43d969-3942-4fc7-87b0-c0132817d80e%2FUntitled.png?id=fda9f58a-6378-4b24-b09f-cac1803f1482&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

会出现一个flink sql的输入界面。

然后我们将我们的source像创表一样输入进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
-- Flink SQL
Flink SQL> CREATE TABLE category (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'category'
  );

Flink SQL> CREATE TABLE spu (
   id INT,
   category_id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'spu'
 );

Flink SQL> CREATE TABLE sku (
   id INT,
   spu_id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'sku'
 );

就这样我们定义好了我们的source。

创建CDC Sink

然后就要创建Sink了,Sink就是我们要输出的地方,也像创表那样简单,这次我们输出到es中

1
2
3
4
5
6
7
8
9
10
11
12
13
Flink SQL> CREATE TABLE product (
   id INT,
   category_id INT,
   spu_id INT,
   name STRING,
   category_name STRING,
   spu_name STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'product'
 );

Source => Sink

最后就是建立对应关系

1
2
3
4
5
Flink SQL> INSERT INTO product
select k.id as id, c.id as category_id, p.id as spu_id, k.name as name, c.name as category_name, p.name as spu_name
from sku k
left join spu p on p.id = k.spu_id
left join category c on c.id = p.category_id;

对没错,它连建立对应关系都像sql一样,只要select出来,然后insert到要输出的表中就行。
当我们按下回车

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fb7a9cf37-2ee7-448a-834f-02262aeb502f%2FUntitled.png?id=ecaaa0cc-a7ee-4dfc-8801-1a8e3053f193&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

它会告诉我们job已经成功的提交了。

这时候我们我们打开kibana其实已经可以成功的看到数据过来了

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F9ea524f4-c1c7-4362-ad6f-9057cf660361%2FUntitled.png?id=07c33ca0-4578-4148-9c8e-4b3f0b14d34f&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

然后我们改下category的名称,测试一些多次关联后的表的变动是否会同步

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F6408f2a1-7a2b-4885-9b68-314ae9d9374b%2Fimage_(1).png?id=9e0e5675-524a-4b4b-9ca6-1337aeb26757&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

将category id为1的name加了-test之后,刷新我们可以看到es内的数据也同步进行了变更,说明是会进行同步的。

打卡Flink的web界面,其实我们是可以看到job的处理流的

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F4a7b3235-839e-4027-be36-02daaa38ce26%2FUntitled.png?id=e72b519c-eef8-4feb-8419-423ee61a055d&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

可以看到它先进行了sku和spu的计算,再和category进行计算,最后流打入到我们的es中。

踩的小坑

这次使用Flink CDC相比上次的Canal还是顺利了许多,虽然也有踩一点小坑。

insert的表的字段顺序要和select的顺序一致

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fd54de23a-652b-4f2b-b6e3-20f142be2b8e%2FUntitled.png?id=1b272204-d280-40e1-a78b-84bf94e4af7c&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

我一开始select 的顺序没跟es一样,导致报错了,我以为会根据名称自动匹配,结果还要顺序一致。

Mysql时区要和Flink配置的时区一致

insert 命令执行后我发现界面没有报错,但是es里一直没数据。

一番排查查找日志后我发现是Mysql里面的时区跟Flink不一致导致的

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F3ef6b83f-8ac8-43ae-949f-81a05a6c5d13%2FUntitled.png?id=93efa2b9-8a49-44fb-a979-9655621af3fa&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

说我的timezone不是 Asia/Shanhai,然后我一看还真不是,我是UTC,然后timezone用的System,System又是因为docker启动的,所以也是UTC。

1
set global time_zone='Asia/Shanghai';

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F0495e975-640a-4cea-81f8-64cfdb9646b7%2Fimage_(2).png?id=59eaa953-6015-410d-8182-1d033fa19557&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1660&userId=&cache=v2
设置好之后就可以正常同步了。

支持异源

支持异源,表示Flink CDC可以不仅从一个库拿数据,它还可以同时从多个库拿数据,并且这些库还可以是不同的数据库架构。这也是Canal无法比拟的地方,Canal只能同步Mysql的数据。

现在我通过一个例子来演示一下,Flink CDC的这个能力。

我们现在已经有了category,spu,sku表了。

此时我们又使用mongoDB 存储了sku的素材数据,因为mongo比较灵活,所以我们使用mongo来存这个数据。

然后我们又利用postgress的性能高效,用它来存储了sku的库存数据。

这时候如果我们再需要把这些数据同步到es里,对以前来说会比较困难,但对Flink来说就像之前创表再insert一样容易。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F09f0a8bb-5b02-4ab8-b4c8-dfced57e0d64%2Fimage_(3).png?id=70fa08e1-8236-463d-bc02-61e241ba22be&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

然后我们配置一下mongo和postgress的环境和数据

mongoDB

像之前一样,我们先在docker内把mongoDB跑起来

1
2
3
4
5
6
docker run --name mongo \
-v /home/robinson/code/mongo_data:/data/db \
-p 0.0.0.0:27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=root \
docker.io/library/mongo:5.0 \
--replSet rs0

创库创集合填充数据

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F607c31db-7e3a-4071-9324-8d5fb8ec331e%2Fimage_(4).png?id=b8e3b1b6-cc6f-415d-a8be-9442721c9640&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=850&userId=&cache=v2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 创表
use data
// 创一个collection
db.createCollection("material")
// 然后往collection内预置数据
db.material.insertMany([
{
"sku_id" : 1,
"detail" : [
{
"material_type" : "1",
"material_explain" : "主图",
"content" : "https://testtest.com/img.jpg"
},
{
"material_type" : "2",
"material_explain" : "详情页图",
"content" : "https://testtest.com/img2.jpg"
}
]
},
{
"sku_id" : 2,
"detail" : [
{
"material_type" : "1",
"material_explain" : "主图",
"content" : "https://testtest.com/img3.jpg"
},
{
"material_type" : "2",
"material_explain" : "详情页图",
"content" : "https://testtest.com/img4.jpg"
}
]
},
{
"sku_id" : 3,
"detail" : [
{
"material_type" : "1",
"material_explain" : "主图",
"content" : "https://testtest.com/img5.jpg"
},
{
"material_type" : "2",
"material_explain" : "详情页图",
"content" : "https://testtest.com/img6.jpg"
}
]
}
])

创建用来给flink读流的角色

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use admin;
db.createRole(
{
role: "flinkrole",
privileges: [{
// Grant privileges on all non-system collections in all databases
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
// Read config.collections and config.chunks
// for sharded cluster snapshot splitting.
{ role: 'read', db: 'config' }
]
}
);

postgress

同样先是用docker跑一个postgress出来

1
2
3
4
5
 docker run --name postgres \
-e POSTGRES_PASSWORD=654321 \
-v /home/robinson/code/postgres_data:/var/lib/postgresql/data \
-p 0.0.0.0:5432:5432 \
docker.io/library/postgres:12

然后创表填充数据
https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fe382a79d-e7da-4ebe-8685-421fafab1eeb%2Fimage_(5).png?id=9bf5df8c-6aba-4159-952f-a74a46b3b6b9&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=810&userId=&cache=v2

1
2
3
4
5
6
7
8
9
create table stock
(
id serial,
sku_id integer,
stock_num integer
);
INSERT INTO supply.stock (id, sku_id, stock_num) VALUES (1, 1, 123123);
INSERT INTO supply.stock (id, sku_id, stock_num) VALUES (2, 2, 342);

在Flink中create

准备工作做完后就是像mysql一样 在 Flink Sql内创表

material表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE material (
   _id STRING,
   sku_id INT,
   detail ARRAY<ROW<material_type STRING, material_explain STRING, content STRING>>,
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
   'hosts' = 'localhost:27017',
   'username' = 'flink',
   'password' = '654321',
   'database' = 'data',
   'collection' = 'material'
 );

stock表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE stock (
  id INT,
  sku_id INT,
  stock_num INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = '654321',
  'database-name' = 'postgres',
  'schema-name' = 'supply',
  'table-name' = 'stock'
);

创建新的es的mapping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
PUT /product_v2
{
    "mappings": {
        "properties": {
            "id": {
                "type": "integer"
            },
             "category_id": {
                "type": "integer"
            },
          "spu_id": {
                "type": "integer"
            },
            "name": {
                "type": "keyword"
            },
            "category_name": {
                "type": "keyword"
            },
            "spu_name": {
                "type": "keyword"
            },
            "material": {
                "type": "object"
            },
            "stock_num": {
                "type": "integer"
            }
        }
    }
}

然后在Flink里再创这个新的product_v2的表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE product_v2 (
   id INT,
   category_id INT,
   spu_id INT,
   name STRING,
   category_name STRING,
   spu_name STRING,
   material ARRAY<ROW<material_type STRING, material_explain STRING, content STRING>>,
   stock_num INT,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'product_v2'
 );

然后就是写最后的insert预计,将表聚合导到es中

1
2
3
4
5
6
7
INSERT INTO product_v2
select k.id as id, c.id as category_id, p.id as spu_id, k.name as name, c.name as category_name, p.name as spu_name, m.detail as material, s.stock_num as stock_num
from sku k
left join spu p on p.id = k.spu_id
left join category c on c.id = p.category_id
left join material m on m.sku_id = k.id
left join stock s on s.sku_id = k.id;

回车

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Faf5c81ac-9507-4847-910a-ad5c018f8100%2Fimage_(6).png?id=48ace6c5-7ef9-4d70-b68c-0b99b63165dd&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

然后我们可以看到历史数据已经同步到了es中了,素材和库存也同步过来了。

然后我们再改下mongoDB里面的素材图片地址

图片

可以看到es内的图片地址也随之发生了变更

然后我们到Flink的web界面查看,可以看到它整个的计算过程。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fcae532bc-21b8-48f7-8736-b8b314c1af00%2Fimage_(8).png?id=811d7f8c-b5f3-45b7-a704-191a63c8fdab&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

多端输出

我之前设置的sink只有ES,所以数据只输出到了es中,那如果有一天其他组的同学说要接收sku变更的事件怎么办。

我们不用在代码里加写入事件的逻辑,可以直接写一个Sink, 将输出写入到Kafka中就行。

接下来我演示一下。

kafka配置

因为启动kafka的同时还要启动zookeeper,所以这次放到了docker-compose-file里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
version: '3'

services:
  zookeeper:
    image: docker.io/wurstmeister/zookeeper
    restart: unless-stopped
    ports:
      - "0.0.0.0:2181:2181"
    # volumes:
    #   - /usr/local/zookeeper/data:/data
    #   - /usr/local/zookeeper/log:/datalog
    container_name: zookeeper

  kafka:
    image: docker.io/wurstmeister/kafka
    ports:
      - "0.0.0.0:9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "172.23.80.104"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # KAFKA_LOG_DIRS: "/kafka/kafka-logs-1"
    # volumes:
    #   - /usr/local/kafka/logs:/kafka/kafka-logs-1
    depends_on:
      - zookeeper
    container_name: kafka

创建topic测试

1
2
docker exec -it kafka kafka-console-producer.sh --broker-list 172.23.80.104:9092 --topic test
docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 172.23.80.104:9092 --topic test --from-beginning

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fc8e27666-3d0d-4598-adcb-850cce2a51ae%2FUntitled.png?id=3e69653f-114f-42e7-975a-56e529faab30&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2
https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2F3474f7b7-56d8-4e46-9b6a-b703c9faded3%2FUntitled.png?id=1dd28e20-8e26-4838-a4d8-ca2d08c43ef2&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2

是可以通的。

然后就是像之前那样在Flink中创表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE KafkaTable8 (
   id INT,
   category_id INT,
   spu_id INT,
   name STRING,
   category_name STRING,
   spu_name STRING,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.23.80.104:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

然后写insert语句创建连接

1
2
3
4
5
INSERT INTO KafkaTable8
select k.id as id, c.id as category_id, p.id as spu_id, k.name as name, c.name as category_name, p.name as spu_name
from sku k
left join spu p on p.id = k.spu_id
left join category c on c.id = p.category_id;

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fd8139e1b-b120-4f3d-9ccf-510a590b3cc2%2Fimage_(9).png?id=93f4d924-8c21-4189-be33-f0a9eca7d5cc&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2
然后我们就可以看到kafka中会有事件同步过来了。

当然我们也可以像事件一样将数据Sink到Redis中,这样连缓存的建立都可以和业务代码解耦,不用再在业务中写一套写入和更新缓存的操作,所以未来Flink有无尽的可能等待着去探索。

对了,这次研究这个Flink CDC项目还给官方仓库提了一个PR被Merge了,虽然只是教程的一些错误,但还是很开心,下次争取源码的PR。

https://robchxx.notion.site/image/https%3A%2F%2Fs3-us-west-2.amazonaws.com%2Fsecure.notion-static.com%2Fd2276f9e-acd2-4edb-808a-d7a96dda1b6f%2Fimage_(10).png?id=360dee9a-4780-4883-8551-726330729db5&table=block&spaceId=c913953c-7dcd-467e-9494-b176142e9ba0&width=1830&userId=&cache=v2