Sqoop
Importing MySQL Data into HDFS
Sample data
Sample database: http://dev.mysql.com/doc/employee/en/
wget https://launchpadlibrarian.net/24493586/employees_db-full-1.0.6.tar.bz2
tar -xjf employees_db-full-1.0.6.tar.bz2
cd employees_db
mysql -u root -p -t < employees.sql
Install the MySQL driver
See the CDH documentation, "Installing the JDBC Drivers".
A quick test: list the databases and tables.
$ sqoop list-databases --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
Examples
1. Create an options file
Typing the full set of parameters on every invocation is tedious, so we can put the common connection settings into an options file, like so:
$ vi SqoopImportOptions.txt
#
#Options file for sqoop import
#
import
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPwd
#
#All other commands should be specified in the command line
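How Sqoop expands an options file can be sketched with a toy parser (illustrative only, under the simplifying assumption of one token per line with # comments; Sqoop's real handling has more rules):

```python
def parse_options_file(text):
    """Toy model of --options-file expansion: each non-blank,
    non-comment line becomes one command-line token, and the
    remaining arguments are appended on the command line."""
    return [line.strip() for line in text.splitlines()
            if line.strip() and not line.strip().startswith("#")]

opts = parse_options_file("""\
import
--connect
jdbc:mysql://host/employees
# a comment
--username
myUID
""")
print(opts)
# -> ['import', '--connect', 'jdbc:mysql://host/employees', '--username', 'myUID']
```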
2. Import an entire table
-m sets the number of map tasks:
$ sqoop --options-file SqoopImportOptions.txt \
--table departments \
-m 1 \
--target-dir departments
3. Import a table, selecting specific columns
$ sqoop --options-file SqoopImportOptions.txt \
--table dept_emp \
--columns EMP_NO,DEPT_NO,FROM_DATE \
--as-textfile \
-m 1 \
--target-dir dept_emp
4. Import all columns, filtering rows with a WHERE clause
$ sqoop --options-file SqoopImportOptions.txt \
--table employees \
--where "emp_no > 499948" \
--as-textfile \
-m 1 \
--target-dir employees
5. Import the results of a free-form query
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-m 1 \
--target-dir employeeFrfrmQrySmpl2
Note
A free-form query must include the $CONDITIONS token, and running it in parallel additionally requires the --split-by argument so Sqoop knows how to partition the data. From the official documentation:
$CONDITIONS
If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS, which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.
Controlling parallelism
Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism greater than that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallelism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.
When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks. If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id. Note: Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.
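The splitting arithmetic described above can be sketched in a few lines. This is a toy model of the even-range partitioning for an integer split column, not Sqoop's actual splitter implementation:

```python
def split_ranges(min_val, max_val, num_tasks):
    """Partition [min_val, max_val] into num_tasks half-open ranges
    [lo, hi), mirroring the example in the Sqoop docs. The last
    range's upper bound is max_val + 1 so the final WHERE id < hi
    clause still includes the maximum value."""
    step = (max_val - min_val) // num_tasks
    ranges = []
    lo = min_val
    for i in range(num_tasks):
        hi = max_val + 1 if i == num_tasks - 1 else lo + step
        ranges.append((lo, hi))
        lo = hi
    return ranges

print(split_ranges(0, 1000, 4))
# -> [(0, 250), (250, 500), (500, 750), (750, 1001)]
```

Each tuple maps to one map task's `WHERE id >= lo AND id < hi` predicate; a skewed key distribution makes these evenly sized ranges carry very different row counts, which is exactly why --split-by lets you pick a better column.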
6. Free-form query with a WHERE clause
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--target-dir employeeFrfrmQry1
7. Direct mode
By default, imports go over JDBC, but a database's own native tools are generally faster at bulk transfer. When you pass --direct, Sqoop attempts to use the database-specific direct channel instead (for MySQL, mysqldump). Note: direct mode currently cannot import large-object columns.
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--direct \
--target-dir employeeUsingDirect
8. Split by
Partition the records for parallel execution:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir SplitByExampleImport
9. Specifying a boundary query
This is again about parallelism. By default, Sqoop takes the minimum and maximum of the --split-by column and partitions on that range. When that default query is not usable or performs poorly, you can supply your own with --boundary-query:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--boundary-query "SELECT MIN(EMP_NO), MAX(EMP_NO) from employees" \
--split-by EMP_NO \
--direct \
--target-dir BoundaryQuerySample
10. Fetch size
--fetch-size sets the number of entries Sqoop reads from the database at a time:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fetch-size=50000 \
--split-by EMP_NO \
--direct \
--target-dir FetchSize
11. Compression
-z (or --compress) compresses the imported data, using gzip by default:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-z \
--split-by EMP_NO \
--direct \
--target-dir CompressedSample
12. Incremental imports
Setup
First, seed the target directory with an initial import:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 15000 AND $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports
The number of records imported is 4999, which we can verify by counting the lines in the output files:
$ hadoop fs -ls -R sqoop-mysql/IncrementalImports | grep 'part' | awk '{print $8}' | xargs hadoop fs -cat | wc -l
Run the incremental import
Three parameters drive it:
- --check-column: the column to examine when deciding which rows to import
- --incremental: the mode, either append or lastmodified
- --last-value: the maximum value of the check column from the previous import
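The append-mode bookkeeping can be sketched as a toy filter (illustrative only; the row representation and function name are made up, not Sqoop's internals):

```python
def incremental_append(rows, check_column, last_value):
    """Toy model of Sqoop's append-mode incremental import: on the
    next run, only rows whose check column exceeds the previous
    run's --last-value are pulled."""
    return [r for r in rows if r[check_column] > last_value]

rows = [{"EMP_NO": 14998}, {"EMP_NO": 14999}, {"EMP_NO": 15000}, {"EMP_NO": 15321}]
print(incremental_append(rows, "EMP_NO", 14999))
# -> [{'EMP_NO': 15000}, {'EMP_NO': 15321}]
```

After a run, Sqoop reports the new maximum of the check column, which you pass as --last-value the next time.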
The command:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--check-column EMP_NO \
--incremental append \
--last-value 14999 \
--split-by EMP_NO \
--direct \
--target-dir IncrementalImports
13. Output line-formatting options
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fields-terminated-by , \
--escaped-by \\ \
--enclosed-by '\"' \
--split-by EMP_NO \
--direct \
--target-dir LineFormattingOptions
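A rough model of what these formatting flags produce for one record (illustrative only, not Sqoop's actual writer):

```python
def format_record(fields, sep=",", enclose='"', escape="\\"):
    """Toy model of Sqoop's line formatting: each field is wrapped
    in the enclosing character, with any embedded escape or
    enclosing characters escaped first, then joined by the
    field terminator."""
    out = []
    for f in fields:
        f = str(f).replace(escape, escape + escape).replace(enclose, escape + enclose)
        out.append(enclose + f + enclose)
    return sep.join(out)

print(format_record([10001, "Georgi", "Facello"]))
# -> "10001","Georgi","Facello"
```

Enclosing and escaping matter when field values can contain the delimiter itself, e.g. free-text columns with embedded commas or quotes.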
14. Import all tables
import-all-tables imports every table in the database; note that it takes --warehouse-dir (a parent directory under which each table gets its own subdirectory) rather than --target-dir:
$ sqoop import-all-tables --options-file SqoopImportAllTablesOptions.txt \
--direct \
--warehouse-dir sqoop-mysql/EmployeeDatabase