SQL
Github 关注数排行榜
-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
CREATE TEMPORARY TABLE gh_event(
id STRING, -- 每个事件的唯一ID。
created_at BIGINT, -- 事件时间,单位秒。
created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
type STRING, -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id STRING, -- Github用户ID。
actor_login STRING, -- Github用户名。
repo_id STRING, -- Github仓库ID。
repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
org STRING, -- Github组织ID。
org_login STRING -- Github组织名,如: apache,google,alibaba等。
) WITH (
'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。
'project' = 'github-events-shanghai', -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。
'accessId' = 'LTAI5tNF1rP8PKVyYjr9TKgh', -- 只读账号的AK。
'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP', -- 只读账号的SK。
'batchGetSize' = '500', -- 批量读取数据,每批最多拉取500条。
'startTime' = '2023-07-27 00:00:00' -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值
);
-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';
-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '1';
-- 查看Github新增star数Top 5仓库。
SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num
FROM gh_event WHERE type = 'WatchEvent'
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name
ORDER BY num DESC
统计组织活跃度变化
REATE TEMPORARY TABLE gh_event(
id STRING, -- 每个事件的唯一ID。
created_at BIGINT, -- 事件时间,单位秒。
created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
type STRING, -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id STRING, -- Github用户ID。
actor_login STRING, -- Github用户名。
repo_id STRING, -- Github仓库ID。
repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
org STRING, -- Github组织ID。
org_login STRING -- Github组织名,如: apache,google,alibaba等。
) WITH (
'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。
'project' = 'github-events-shanghai', -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。
'accessId' = ' ', -- 只读账号的AK。
'accessKey' = ' ', -- 只读账号的SK。
'batchGetSize' = '500', -- 批量读取数据,每批最多拉取500条。
'startTime' = '2023-06-07 14:00:00' -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);
-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';
-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';
-- 从一天前开始统计事件总量
SELECT NOW(), max(created_at_ts) as created_ts, COUNT(*) as event_count
FROM gh_event
WHERE org_login ='apache' and
created_at_ts >= NOW() - INTERVAL '1' DAY;
统计仓库贡献时间分布情况
CREATE TEMPORARY TABLE gh_event(
id STRING, -- 每个事件的唯一ID。
created_at BIGINT, -- 事件时间,单位秒。
created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
type STRING, -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id STRING, -- Github用户ID。
actor_login STRING, -- Github用户名。
repo_id STRING, -- Github仓库ID。
repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
org STRING, -- Github组织ID。
org_login STRING -- Github组织名,如: apache,google,alibaba等。
) WITH (
'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。
'project' = 'github-events-shanghai', -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。
'accessId' = ' ', -- 只读账号的AK。
'accessKey' = ' ', -- 只读账号的SK。
'batchGetSize' = '500', -- 批量读取数据,每批最多拉取500条。
'startTime' = '2023-06-01 14:00:00' -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);
-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';
-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';
-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';
-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';
-- 统计从上周起的贡献量
SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count
FROM gh_event
WHERE created_at_ts >= NOW() - INTERVAL '7' DAY
AND repo_name = 'apache/flink'
AND (type ='CommitCommentEvent' OR
type='IssueCommentEvent' or
type = 'PullRequestReviewCommentEvent'or
type = 'PushEvent' or
type = 'PullRequestEvent' or
type = 'PullRequestReviewEvent')
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;
cdc链接
CREATE TEMPORARY TABLE source_table (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
record_time TIMESTAMP_LTZ(3),
good_id INT,
amount INT,
WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '******************.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = '***********',
'password' = '***********',
'database-name' = '***********',
'table-name' = 'source_table'
);
按照时间窗口进行分组计算
我们使用 TUMBLE相关窗口函数结合 GROUP BY,将长度 15 秒内的订单数据按照商品 ID 进行归类,并使用 SUM计算其销售总额。
SELECT
good_id,
tumble_start(
record_time, interval '15' seconds
) AS record_timestamp,
sum(amount) AS total_amount
FROM
source_table
GROUP BY
tumble (
record_time, interval '15' seconds
),
good_id;
连接纬度表
这里,我们希望根据上一步中统计出的「每 15 秒商品销售量」信息,计算出每件商品的销售额。由于商品名称及商品价格数据存储在另一张维度表 dimension_table中,我们需要将结果视图和 dimension_table进行 JOIN 操作,并将「商品销售量」、「商品价格」相乘计算出「商品销售额」,并提取结果中的商品可读名称信息作为结果表。
CREATE TEMPORARY TABLE dimension_table (
good_id INT NOT NULL PRIMARY KEY NOT ENFORCED,
good_name VARCHAR(256),
good_price INT
) WITH (
'connector' = 'mysql',
'hostname' = '******************.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = '***********',
'password' = '***********',
'database-name' = '***********',
'table-name' = 'dimension_table'
);
SELECT
record_timestamp,
good_name,
total_amount * good_price AS revenue
FROM
(
SELECT
good_id,
tumble_start(
record_time, interval '15' seconds
) AS record_timestamp,
sum(amount) AS total_amount
FROM
source_table
GROUP BY
tumble (
record_time, interval '15' seconds
),
good_id
) AS tumbled_table
LEFT JOIN dimension_table ON tumbled_table.good_id = dimension_table.good_id;
将表写入mysql
我们将这些实时的统计数据写回数据库,Flink SQL 也可以简单地实现这一点。首先我们需要创建一张用于连接汇表的 Flink 临时表,如下所示:
CREATE TEMPORARY TABLE sink_table (
record_timestamp TIMESTAMP(3) NOT NULL PRIMARY KEY NOT ENFORCED,
good_name VARCHAR(128),
sell_amount INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://******************.mysql.rds.aliyuncs.com:3306/***********',
'table-name' = 'sink_table',
'username' = '***********',
'password' = '***********',
'scan.auto-commit' = 'true'
);
然后,只需要将上面的 SELECT 语句的输出结果 INSERT 到该表就可以了:
INSERT INTO sink_table
SELECT
record_timestamp,
-- ... 和上面的语句一样
数据写入ES
创建临时表
CREATE TEMPORARY TABLE es_sink(
order_id BIGINT,
`user_id` bigint,
auction_id bigint,
cat_id bigint,
cat1 bigint,
property varchar,
buy_mount int,
`day` varchar ,
birthday varchar,
gender int,
PRIMARY KEY(order_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://**********:9200',
'index' = 'enriched_orders',
'username' ='elastic',
'password' ='*******'--创建ES实例时自定义的密码
);
插入数据
INSERT INTO es_sink
SELECT o.*,
b.birthday,
b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b
ON o.user_id = b.user_id;