-- Flink SQL
-- Source table: streams row changes from MySQL table `es-test`.`category`
-- via the mysql-cdc connector. PRIMARY KEY ... NOT ENFORCED is required by
-- Flink for upsert semantics (Flink does not validate the key itself).
-- Fix: the pasted console prompt ("Flink SQL>") was fused into the statement
-- and "CREATETABLE" was missing its space — both are syntax errors.
CREATE TABLE category (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'category'
);
-- Source table: streams row changes from MySQL table `es-test`.`spu`
-- (spu → category via category_id) through the mysql-cdc connector.
-- Fix: console prompt fused into the statement and "CREATETABLE" missing
-- its space — both are syntax errors in Flink SQL.
CREATE TABLE spu (
    id INT,
    category_id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'spu'
);
-- Source table: streams row changes from MySQL table `es-test`.`sku`
-- (sku → spu via spu_id) through the mysql-cdc connector.
-- Fix: console prompt fused into the statement and "CREATETABLE" missing
-- its space — both are syntax errors in Flink SQL.
CREATE TABLE sku (
    id INT,
    spu_id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '654321',
    'database-name' = 'es-test',
    'table-name' = 'sku'
);
就这样我们定义好了我们的source。
创建CDC Sink
然后就要创建Sink了,Sink就是我们要输出的地方,也像创表那样简单,这次我们输出到es中
1 2 3 4 5 6 7 8 9 10 11 12 13
-- Sink table: writes the denormalized product rows into the Elasticsearch
-- index `product` (elasticsearch-7 connector, upsert mode keyed on `id`).
-- Fix: console prompt fused into the statement and "CREATETABLE" missing
-- its space — both are syntax errors in Flink SQL.
CREATE TABLE product (
    id INT,
    category_id INT,
    spu_id INT,
    name STRING,
    category_name STRING,
    spu_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'product'
);
Source => Sink
最后就是建立对应关系
1 2 3 4 5
-- Continuous job: join sku → spu → category and upsert one denormalized row
-- per sku into the `product` ES sink. LEFT JOINs keep skus whose spu/category
-- is missing (those name columns come through as NULL).
-- Fix: "INSERTINTO" and "leftjoin" were missing spaces — syntax errors.
INSERT INTO product
SELECT
    k.id   AS id,
    c.id   AS category_id,
    p.id   AS spu_id,
    k.name AS name,
    c.name AS category_name,
    p.name AS spu_name
FROM sku AS k
LEFT JOIN spu AS p
    ON p.id = k.spu_id
LEFT JOIN category AS c
    ON c.id = p.category_id;
-- Sink table for the enriched product document (v2): the v1 columns plus a
-- nested `material` array and a `stock_num` count, written to the
-- Elasticsearch index `product_v2` (upsert mode keyed on `id`).
CREATE TABLE product_v2 (
    id INT,
    category_id INT,
    spu_id INT,
    name STRING,
    category_name STRING,
    spu_name STRING,
    material ARRAY<ROW<material_type STRING, material_explain STRING, content STRING>>,
    stock_num INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'product_v2'
);
然后就是写最后的insert语句,将表聚合导到es中
1 2 3 4 5 6 7
-- Continuous job: aggregate sku + spu + category + material + stock into one
-- document per sku and upsert it into the `product_v2` ES sink. All joins are
-- LEFT so a sku without material/stock still produces a row (NULL columns).
INSERT INTO product_v2
SELECT
    k.id       AS id,
    c.id       AS category_id,
    p.id       AS spu_id,
    k.name     AS name,
    c.name     AS category_name,
    p.name     AS spu_name,
    m.detail   AS material,
    s.stock_num AS stock_num
FROM sku AS k
LEFT JOIN spu AS p
    ON p.id = k.spu_id
LEFT JOIN category AS c
    ON c.id = p.category_id
LEFT JOIN material AS m
    ON m.sku_id = k.id
LEFT JOIN stock AS s
    ON s.sku_id = k.id;
# Smoke-test the Kafka broker inside the `kafka` container: open a console
# producer and a console consumer on topic `test`.
# Fix: flag values were fused to the flag names in the pasted text
# (e.g. "--broker-list172.23.80.104:9092"), which the Kafka CLI rejects.
docker exec -it kafka kafka-console-producer.sh \
    --broker-list 172.23.80.104:9092 \
    --topic test
docker exec -it kafka kafka-console-consumer.sh \
    --bootstrap-server 172.23.80.104:9092 \
    --topic test \
    --from-beginning
是可以通的。
然后就是像之前那样在Flink中创表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
-- Sink table: publishes the denormalized product rows to Kafka topic `test`
-- through the upsert-kafka connector. The PRIMARY KEY column(s) become the
-- Kafka record key; both key and value are serialized as JSON.
CREATE TABLE KafkaTable8 (
    id INT,
    category_id INT,
    spu_id INT,
    name STRING,
    category_name STRING,
    spu_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = '172.23.80.104:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
然后写insert语句创建连接
1 2 3 4 5
-- Continuous job: same sku → spu → category join as the ES pipeline, but the
-- denormalized rows are upserted into the Kafka topic behind `KafkaTable8`.
INSERT INTO KafkaTable8
SELECT
    k.id   AS id,
    c.id   AS category_id,
    p.id   AS spu_id,
    k.name AS name,
    c.name AS category_name,
    p.name AS spu_name
FROM sku AS k
LEFT JOIN spu AS p
    ON p.id = k.spu_id
LEFT JOIN category AS c
    ON c.id = p.category_id;