CODE大全
您的位置 : 首页 > 前端在线资源 >

Kafka KSQL入门

发布时间:『 2018-01-06

基于前面两篇关于KSQL的介绍和实现原理,本文将继续进入到KSQL的开发实战阶段,给大家讲解一个简单的demo程序。

本实例将演示一个使用KSQL的简单工作流程来针对Kafka中的数据编写流式查询。

由于KSQL查询Kafka群集中的数据,因此您需要启动Kafka群集,其中包括ZooKeeper和Kafka代理。

第一步:启动Kafka集群并启动KSQL。

Docker安装KSQL 非Docker环境下安装KSQL

第二步:成功启动Kafka集群并启动KSQL后,您将看到KSQL提示符:

                   ======================================
                   =      _  __ _____  ____  _          =
                   =     | |/ // ____|/ __ \| |         =
                   =     | ' /| (___ | |  | | |         =
                   =     |  <  \___ \| |  | | |         =
                   =     | . \ ____) | |__| | |____     =
                   =     |_|\_\_____/ \___\_\______|    =
                   =                                    =
                   =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.                         

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

第三步:KSQL提供了结构化查询语言来查询Kafka数据,因此您需要一些数据进行查询。

如果您使用Docker Compose文件,Docker容器已经在运行,该数据生成器正在向Kafka集群不断生成Kafka消息。不需要采取进一步行动。 如果您不使用Docker环境,请按照非Docker环境中说明的方法向Kafka群集生成数据。

创建Stream 和 Table

本示例显示了从Kafka主题中查询数据的示例,pageviews并users使用以下模式:

创建Stream 和 Table

在继续之前,请确认:

在您启动KSQL的终端窗口中,您会看到ksql>提示。 如果你不想使用Docker,你必须手http://www.bdx-leb.com/a0f4/eafe79ff297a.html动运行数据发生器来产生主题叫pageviews 和users。参考非Docker环境中向Kafka群集生成数据的方法。

从Kafka的pageviews主题(topic)创建一http://www.bdx-leb.com/6106/1ff47d223366.html个pageviews_original流(Stream)。指定value_format的DELIMITED。然后DESCRIBE是新的STREAM。请注意,KSQL创建了额外的列ROWTIME,它们对应于Kafka消息时间戳,并且ROWKEY对应于Kafka消息密钥。

ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
ksql> DESCRIBE pageviews_original;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 

创建一个表,users_original从卡夫卡的话题users,指定value_format的JSON。然后DESCRIBE新的TABLE。

ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
ksql> DESCRIBE users_original;
 Field        | Type            
--------------------------------
 ROWTIME      | BIGINT          
 ROWKEY       | VARCHAR(STRING) 
 REGISTERTIME | BIGINT          
 GENDER       | VARCHAR(STRING) 
 REGIONID     | VARCHAR(STRING) 
 USERID       | VARCHAR(STRING)

显示所有STREAMS和TABLES。

ksql> SHOW STREAMS;

 Stream Name              | Kafka Topic              | Format    
-----------------------------------------------------------------
 PAGEVIEWS_ORIGINAL       | pageviews                | DELIMITED 

ksql> SHOW TABLES;

 Table Name        | Kafka Topic   http://www.bdx-leb.com/18d8/e3b79fdf1f0d.html    | Format    | Windowed 
--------------------------------------------------------------
 USERS_ORIGINAL    | users             | JSON      | false  

使用查询

默认情况下,KSQL从最新的偏移量读取流和表的主题。

使用SELECT创建从流返回数据的查询。要停止查看数据,请按<ctrl-c>。您可以选择包含LIMIT关键字来限制http://www.bdx-leb.com/7535/ffc29d1b1150.html查询结果中返回的行数。请注意,由于数据生成的随机性,精确的数据输出可能会有所不同。

ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
ksql> DESCRIBE pageviews_original;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 

通过CREATE STREAM在SELECT语句之前使用关键字来创建持久性查询。与上述非持久查询不同,此查询的结果将写入Kafka主题PAGEVIEWS_FEMALE。下面的查询将pageviews通过LEFT JOIN使用users_original用户ID 的TABLE来满足STREAM,满足条件。

ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
ksql> DESCRIBE pageviews_female;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
 REGIONID | VARCHAR(STRING) 
 GENDER   | VARCHAR(STRING) 

使用SELECT查看查询结果,因为他们进来。停止观看查询结果,按<ctrl-c>。这将停止打印到控制台,但不会终止实际的查询。该查询继续运行在Kernel应用程序中。

ksql> SELECT * FROM pageviews_female;
1502477856762 | User_2 | User_2 | Page_55 | Region_9 | FEMALE
1502477857946 | User_5 | User_5 | Page_14 | Region_2 | FEMALE
1502477858436 | User_3 | User_3 | Page_60 | Region_3 | FEMALE
^CQuery terminated
ksql> 

创建一个新的持久性查询,其中使用另一个条件LIKE。此查http://www.bdx-leb.com/59b5/4a6e3c4c4e74.html询的结果将写入名为“Kafka”的主题pageviews_enriched_r8_r9。

ksql> CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_femhttp://www.bdx-leb.com/de7c/43db9618e761.htmlale WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';

创建一个新的持久性查询,当计数大于1时,会在30秒的滚动窗口中计算每个区域和性别组合的网页浏览量。此查询的结果将写入名为的Kafka主题PAGEVIEWS_REGIONS。

ksql> CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
ksql> DESCRIBE pageviews_regions;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 GENDER   | VARCHAR(STRING) 
 REGIONID | VARCHAR(STRING) 
 NUMUSERS | BIGINT 

使用SELECT查看从上面的查询结果。

ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_3 | 4
Region_3 | 5
Region_6 | 5
Region_6 | 6
Region_3 | 8
LIMIT reached for the partition.
Query terminated
ksql> 

显示所有持久查询。

ksql> SHOW QUERIES;
 Query http://www.bdx-leb.com/b77b/7d0eaf90a44a.htmlID | Kafka Topic              | Query String                                                                                                 http://www.bdx-leb.com/228b/b8b2b742b82a.html                                                                                                                     
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE         | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; 
 2        | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';                                
 3        | PAGEVIEWS_REGIONS        | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;   

退出终端

查询将作为KSQL应用程序持续运行,直到它们被手动终止。退出KSQL不会终止持久查询。

从输出中SHOW QUERIES;标识您要终止的查询ID。例如,如果你想终止查询ID 2:

ksql> TERMINATE 2;

要从KSQL退出,键入'exit�http://www.bdx-leb.com/04f2/ed872613fd7f.html39;。

ksql> exit

关闭Docker Compose

如果你运行Docker Compose,你必须明确地关闭Docker Compose。

下面的命令将删除所有KSQL查询和主题数据。

$ docker-compose down

更多详细命令,请参阅docker-compose down文档。

停止Confluent平台

使用下面的命令,即可停止confluent平台。

$ conhttp://www.bdx-leb.com/2a71/05356d1d114e.htmlfluent stop

参考资料

https://github.com/confluentinc/ksql/tree/0.1.x/docs/quickstart

——— 全文完 ———
Powered by 张鑫旭 | 鄂ICP备14009759号-2 | 网站留言 Copyright © 2014-2016 张鑫旭 版权所有