亿华智慧云亿华智慧云

Flink SQL 知其所以然:Group 聚合操作

Group 聚合Group 聚合定义(支持 Batch\Streaming 任务):Flink 也支持 Group 聚合。知其作Group 聚合和上面介绍到的合操窗口聚合的不同之处,就在于 Group 聚合是知其作按照数据的类别进行分组,比如年龄、合操性别,知其作是合操横向的;而窗口聚合是在时间粒度上对数据进行分组,是知其作纵向的。如下图所示,合操就展示出了其区别。知其作其中按颜色分 key(横向)​ 就是合操 Group 聚合,按窗口划分(纵向) 就是知其作窗口聚合。

tumble window + key

应用场景:一般用于对数据进行分组,合操然后后续使用聚合函数进行 count、知其作sum 等聚合操作。合操

那么这时候,知其作小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是WordPress模板按照 1 分钟的粒度进行聚合,如下 SQL:

复制-- 数据源表CREATE TABLE source_table ( -- 维度数据 dim STRING, -- 用户 id user_id BIGINT, -- 用户 price BIGINT, -- 事件时间戳 row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- watermark 设置 WATERMARK FOR row_time AS row_time - INTERVAL 5

SECOND

) WITH ( connector = datagen, rows-per-second = 10, fields.dim.length = 1, fields.user_id.min = 1, fields.user_id.max = 100000, fields.price.min = 1, fields.price.max = 100000)-- 数据汇表CREATE TABLE sink_table ( dim STRING, pv BIGINT, sum_price BIGINT, max_price BIGINT, min_price BIGINT, uv BIGINT, window_start bigint) WITH ( connector = print)-- 数据处理逻辑insert into

sink_table

select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval 1 minute) AS STRING)) * 1000 as

window_start

from

source_table

group by dim, -- 按照 Flink SQL tumble 窗口写法划分窗口 tumble(row_time, interval 1 minute)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.

转换为 Group 聚合的写法如下:

Group 聚合

复制-- 数据源表CREATE TABLE source_table ( -- 维度数据 dim STRING, -- 用户 id user_id BIGINT, -- 用户 price BIGINT, -- 事件时间戳 row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- watermark 设置 WATERMARK FOR row_time AS row_time - INTERVAL 5

SECOND

) WITH ( connector = datagen, rows-per-second = 10, fields.dim.length = 1, fields.user_id.min = 1, fields.user_id.max = 100000, fields.price.min = 1, fields.price.max = 100000);-- 数据汇表CREATE TABLE sink_table ( dim STRING, pv BIGINT, sum_price BIGINT, max_price BIGINT, min_price BIGINT, uv BIGINT, window_start bigint) WITH ( connector = print);-- 数据处理逻辑insert into

sink_table

select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as

window_start

from

source_table

group by dim, -- 将秒级别时间戳 / 60 转化为 1min cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。

运行层面:窗口聚合是和 时间 绑定的,源码库窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

SQL 语义

也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:

数据源算子​(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。

Group 聚合算子​(group by key + sum\count\max\min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum\count\max\min 结果。如果有结果oldResult​,拿出来和当前的亿华云数据进行sum\count\max\min​ 计算出这个 key 的新结果newResult​,并将新结果[key, newResult]​ 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult]​,然后再将新结果发往下游+[key, newResult]​;如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum\max\min 结果newResult​,并将新结果[key, newResult]​ 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送+[key, newResult]。

数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

特别注意:

Group by 聚合涉及到了回撤流(也叫 retract 流),会产生回撤流是因为从整个 SQL 的语义来看,上游的 Kafk数据是源源不断的,无穷无尽的,那么每次这个 SQL 任务产出的结果都是一个中间结果,所以每次结果发生更新时,都需要将上一次发出的中间结果给撤回,然后将最新的结果发下去。Group by 聚合涉及到了状态:状态大小也取决于不同 key 的数量。为了防止状态无限变大,我们可以设置状态的 TTL。以上面的 SQL 为例,上面 SQL 是按照分钟进行聚合的,理论上到了今天,通常我们就可以不用关心昨天的数据了,那么我们可以设置状态过期时间为一天。关于状态过期时间的设置参数可以参考下文运行时参数 小节。

如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:

数据源算子​(From Order):数据源算子从 Order Hive 中读取到所有的数据,然后所有数据发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个算子中,然后这个算子就运行结束了,释放资源了。Group 聚合算子​(group by + sum\count\max\min):接收到上游算子发的所有数据,然后遍历计算 sum\count\max\min 结果,批量发给下游数据汇算子,这个算子也就运行结束了,释放资源了。数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Hive 中,整个任务也就运行结束了,整个任务的资源也就都释放了。Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping sets、Rollup、Cube。

举一个 Grouping sets 的案例:

复制SELECT

supplier_id

,

rating

,

product_id

, COUNT(*)FROM (VALUES (supplier1, product1, 4), (supplier1, product2, 3), (supplier2, product3, 3), (supplier2, product4, 4))AS Products(supplier_id, product_id, rating)GROUP BY GROUPING SET ( ( supplier_id, product_id, rating ), ( supplier_id, product_id ), ( supplier_id, rating ), ( supplier_id ), ( product_id, rating ), ( product_id ), ( rating ), ( ))‍1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.
赞(832)
未经允许不得转载:>亿华智慧云 » Flink SQL 知其所以然:Group 聚合操作