Hudi
Hudi
一、编译安装
组件 | 版本 |
---|---|
Hadoop | 3.2.4 |
Hive | 3.1.3 |
Flink | 1.15.3 scala-2.12 |
Spark | 3.2.3,scala-2.12 |
1.1 安装Maven
# 解压放置
[tpxcer@hadoop1 ~]$ wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz
[root@hadoop1 ~]# mkdir /opt/module
[root@hadoop1 ~]# chmod 777 /opt/module/
[tpxcer@hadoop1 ~]$ tar -zxvf apache-maven-3.8.6-bin.tar.gz -C /opt/module/
[tpxcer@hadoop1 module]$ ln -s apache-maven-3.8.6/ maven
# 添加环境变量
sudo vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/maven
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
mvn -v
# maven修改为阿里镜像
[tpxcer@hadoop1 ~]$ vim /opt/module/maven/conf/settings.xml
<!-- 添加阿里云镜像-->
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
1.2 编译Hudi
2.2.1 上传源码包
[tpxcer@hadoop1 ~]$ wget https://dlcdn.apache.org/hudi/0.12.1/hudi-0.12.1.src.tgz
[tpxcer@hadoop1 ~]$ tar -zxvf hudi-0.12.1.src.tgz -C /opt/soft
2.2.2 修改pom文件
vim hudi/pom.xml
新增repository加速依赖下载
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
修改依赖的组件版本
<hadoop.version>3.2.4</hadoop.version>
<hive.version>3.1.3</hive.version>
2.2.3 修改源码兼容hadoop3
Hudi默认依赖的hadoop2,要兼容hadoop3,除了修改版本,还需要修改如下代码:
[tpxcer@hadoop1 soft]$ vim /opt/soft/hudi/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
// 第一百行
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
改为
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
2.2.4 解决spark模块依赖冲突
修改了Hive版本为3.1.2,其携带的jetty是0.9.3,hudi本身用的0.9.4,存在依赖冲突。
java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
- 修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty:
vim /opt/soft/hudi/packaging/hudi-spark-bundle/pom.xml
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 增加hudi配置版本的jetty -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>
- 修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty:
vim /opt/soft/hudi-0.12.1/packaging/hudi-utilities-bundle/pom.xml
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 增加hudi配置版本的jetty -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>
2.2.6 执行编译命令
mvn clean package -DskipTests -Dspark3.2 -Dflink1.15 -Dscala-2.12 -Dhadoop.version=3.2.4 -Pflink-bundle-shade-hive3
2.2.7 编译成功
[tpxcer@hadoop1 hudi]$ hudi-cli/hudi-cli.sh
===================================================================
* ___ ___ *
* /\__\ ___ /\ \ ___ *
* / / / /\__\ / \ \ /\ \ *
* / /__/ / / / / /\ \ \ \ \ \ *
* / \ \ ___ / / / / / \ \__\ / \__\ *
* / /\ \ /\__\ / /__/ ___ / /__/ \ |__| / /\/__/ *
* \/ \ \/ / / \ \ \ /\__\ \ \ \ / / / /\/ / / *
* \ / / \ \ / / / \ \ / / / \ /__/ *
* / / / \ \/ / / \ \/ / / \ \__\ *
* / / / \ / / \ / / \/__/ *
* \/__/ \/__/ \/__/ Apache Hudi CLI *
* *
===================================================================
1491 [main] INFO org.apache.hudi.cli.Main [] - Starting Main v0.12.1 using Java 1.8.0_311 on hadoop1 with PID 9566 (/opt/soft/hudi-0.12.1/hudi-cli/target/hudi-cli-0.12.1.jar started by tpxcer in /opt/soft/hudi-0.12.1)
1503 [main] INFO org.apache.hudi.cli.Main [] - No active profile set, falling back to 1 default profile: "default"
Table command getting loaded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/soft/hudi-0.12.1/hudi-cli/target/lib/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/soft/hudi-0.12.1/hudi-cli/target/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2842 [main] WARN org.jline [] - The Parser of class org.springframework.shell.jline.ExtendedDefaultParser does not support the CompletingParsedLine interface. Completion with escaped or quoted words won't work correctly.
2916 [main] INFO org.apache.hudi.cli.Main [] - Started Main in 1.873 seconds (JVM running for 2.969)
编译完成后,相关的包在packaging目录的各个模块中
[tpxcer@hadoop1 hudi]$ ls packaging/
hudi-aws-bundle hudi-flink-bundle hudi-hadoop-mr-bundle hudi-integ-test-bundle hudi-presto-bundle hudi-timeline-server-bundle hudi-utilities-bundle README.md
hudi-datahub-sync-bundle hudi-gcp-bundle hudi-hive-sync-bundle hudi-kafka-connect-bundle hudi-spark-bundle hudi-trino-bundle hudi-utilities-slim-bundle
二、集成 Spark
Hudi官方Spark文档Spark Quick Start Guide
2.1 安装Spark
2.1.1 安装准备
[tpxcer@node01 ~]$ wget https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
[tpxcer@node01 ~]$ tar -zxvf spark-3.2.3-bin-hadoop3.2.tgz -C /opt/module/
[tpxcer@node01 ~]$ cd /opt/module/
[tpxcer@node01 module]$ ln -s spark-3.2.3-bin-hadoop3.2/ spark
2.1.2 配置环境变量
sudo vim /etc/profile.d/my_env.sh
#SPARK_HOME
export SPARK_HOME=/opt/module/spark
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh
2.1.3 拷贝编译好的包到spark的jars目录
[tpxcer@node01 target]$ cp /opt/soft/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.1.jar /opt/module/spark/jars/
2.2 Spark Shell
2.2.1 启动 spark-shell
启动命令
#针对Spark 3.2
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
设置表名,基本路径和数据生成器
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
2.2.2 插入数据
新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
查看生成的数据
cd /tmp/hudi_trips_cow/
ls
2.2.3 查询数据
- 转换成DF
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
注意:该表有三级分区(区域/国家/城市),在0.9.0版本以前的hudi,在load中的路径需要按照分区目录拼接"",如:load(basePath + "////*"),当前版本不需要。
- 查询
scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+-------------+
| fare| begin_lon| begin_lat| ts|
+------------------+-------------------+-------------------+-------------+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1673128628106|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1673106003304|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1673191167365|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1672667908175|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1673050451069|
| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|1673184204562|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1672972246078|
| 41.06290929046368| 0.8192868687714224| 0.651058505660742|1672713875654|
+------------------+-------------------+-------------------+-------------+
- 时间旅行查询
spark.read.
format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)
// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)
2.2.4 更新数据
类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
注意:保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys在该表的_hoodie_commit_time、rider、driver字段中的变化。
查询更新后的数据,要重新加载该hudi表:
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
2.2.5 增量查询
Hudi还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的beginTime,选择性指定endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定endTime(这是常见的情况)
- 重新加载数据
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
- 获取指定beginTime
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
- 创建增量查询的表
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
- 查询增量表
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
这将过滤出beginTime之后提交且fare>20的数据。 利用增量查询,我们能在批处理数据上创建streaming pipelines。
2.2.6 指定时间点查询
查询特定时间点的数据,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)
- 指定beginTime和endTime
val beginTime = "000"
val endTime = commits(commits.length - 2)
- 根据指定时间创建表
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
- 查询
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
2.2.7 删除数据
根据传入的HoodieKeys来删除(uuid + partitionpath),只有append模式,才支持删除功能。
- 获取总行数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
- 取其中2条用来删除
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
- 将待删除的2条数据构建DF
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
- 执行删除
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
- 统计删除数据后的行数,验证删除是否成功
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
2.2.8 覆盖数据
- 查看当前表的key
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
- 生成一些新的行程数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
- 覆盖指定分区
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
- 查询覆盖后的key,发生了变化
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
2.3 Spark SQL方式
2.3.1 创建表
- 启动Hive的Metastore
nohup hive --service metastore &
- 启动spark-sql
如果没有配置hive环境变量,手动拷贝hive-site.xml到spark的conf下 把mysql-connector-java-8.0.30.jar放到
/opt/module/spark/jars
#针对Spark 3.2
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- 建表参数
参数名 | 默认值 | 说明 |
---|---|---|
primaryKey | uuid | 表的主键名,多个字段用逗号分隔。 同 hoodie.datasource.write.recordkey.field |
preCombineField | 表的预合并字段。 同 hoodie.datasource.write.precombine.field | |
type | cow | 创建的表类型: type = 'cow' type = 'mor' 同hoodie.datasource.write.table.type |
- 创建非分区表
-- 创建一个cow表,默认primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;
-- 创建一个mor非分区表
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
- 创建分区表
-- 创建一个cow分区外部表,指定primaryKey和preCombineField
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
- 在已有的hudi表上创建新表
不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi可以自动识别模式和配置。
-- 非分区表
create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
-- 分区表
create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';
- 通过CTAS (Create Table As Select)建表
为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作。
-- 通过CTAS创建cow非分区表,不指定preCombineField
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;
-- 通过CTAS创建cow分区表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
通过CTAS从其他表加载数据
-- 创建内部表
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';
-- 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;
2.3.2 插入数据
默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。
- 向非分区表插入数据
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
- 向分区表动态分区插入数据
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
- 向分区表静态分区插入数据
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11')
select 2, 'a2', 1000;
- 使用bulk_insert插入数据
hudi支持使用bulk_insert作为写操作的类型,只需要设置两个配置:hoodie.sql.bulk.insert.enable
和hoodie.sql.insert.mode
。
-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
1 a1_2 20.0 1002
2.3.3 查询数据
- 查询
select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0
- 时间旅行查询
Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。
-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;
create table hudi_cow_pt_tbl1 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';
-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;
-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;
2.3.4 更新数据
- update
更新操作需要指定preCombineField。
-- 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
-- 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;
-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';
- MergeInto
-- 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> =A equal bool condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
-- 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');
merge into hudi_cow_pt_tbl1 as target
using (
select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
2.3.5 删除数据
- 语法
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
- 案例
delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';
...........
三、集成 Flink
3.1 环境准备
- 拷贝编译好的jar包到Flink的lib目录下
cp /opt/soft/hudi/packaging/hudi-flink-bundle/target/hudi-flink1.15-bundle-0.12.1.jar /opt/module/flink/lib/
- 拷贝guava包,解决依赖冲突
cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink/lib/
- 配置Hadoop环境变量
sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source /etc/profile.d/my_env.sh
3.2 sql-client方式
- 修改
flink-conf.yaml
配置
vim /opt/module/flink/conf/flink-conf.yaml
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://node01:8020/ckps
state.backend.incremental: true
rest.bind-address: 0.0.0.0
- local模式
修改workers
vim /opt/module/flink/conf/workers
#表示:会在本地启动3个TaskManager的 local集群
localhost
localhost
localhost
启动Flink
[tpxcer@node01 module]$ /opt/module/flink/bin/start-cluster.sh
[tpxcer@node01 module]$ jps
3680 TaskManagerRunner
3284 TaskManagerRunner
2518 StandaloneSessionClusterEntrypoint
2894 TaskManagerRunner
查看webui http://node01:8081
启动Flink的sql-client
[tpxcer@node01 module]$ /opt/module/flink/bin/sql-client.sh embedded
- yarn-session模式
解决依赖问题
[tpxcer@node01 module]$ cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.2.4.jar /opt/module/flink/lib/
启动yarn-session
/opt/module/flink/bin/yarn-session.sh -d
通过http://node01:8088/cluster
http://node01:8088/proxy/application_1671429969753_0004/#/overview
查看部署情况
启动sql-client
/opt/module/flink/bin/sql-client.sh embedded -s yarn-session
3.3 插入数据
set sql-client.execution.result-mode=tableau;
-- 创建hudi表
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node01:8020/tmp/hudi_flink/t1',
'table.type' = 'MERGE_ON_READ' –- 默认是COW
);
或如下写法
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20),
PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node01:8020/tmp/hudi_flink/t1',
'table.type' = 'MERGE_ON_READ'
);
-- 插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
3.4 查询数据
select * from t1;
3.5 更新数据
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
3.6 流式插入
- 创建测试表
CREATE TABLE sourceT (
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create table t2(
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
)
with (
'connector' = 'hudi',
'path' = '/tmp/hudi_flink/t2',
'table.type' = 'MERGE_ON_READ'
);
- 执行插入
insert into t2 select * from sourceT;
[INFO] Submitting SQL update statement to the cluster...
2023-01-31 11:39:14,715 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.50.223:8032
2023-01-31 11:39:14,715 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-01-31 11:39:14,722 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:33271 of application 'application_1675069792410_0001'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: dbf339339c2741082172d6a1df61e3d1
- 通过上文中的
node03:33271
查看运行状态
3.7 IDEA编码方式
除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。flink-hudi-demo
- 提交运行
将代码打成jar包,上传到目录myjars,执行提交命令:
bin/flink run -t yarn-per-job \
-c com.xxxx.hudi.flink.HudiDemo \
./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar
3.8 类型映射
Flink SQL Type | Hudi Type | Avro logical type |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | boolean | |
BINARY / VARBINARY | bytes | |
DECIMAL | fixed | decimal |
TINYINT | int | |
SMALLINT | int | |
INT | int | |
BIGINT | long | |
FLOAT | float | |
DOUBLE | double | |
DATE | int | date |
TIME | int | time-millis |
TIMESTAMP | long | timestamp-millis |
ARRAY | array | |
MAP (key must be string/char/varchar type) | map | |
MULTISET (element must be string/char/varchar type) | map | |
ROW | record |