一 、研发背景
DataX官方开源的版本支持HDFS文件的读写 ,并没有支持基于JDBC的Hive数据读写 ,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql 、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL ,将SQL执行结果写入下游等各种场景 ,实际上还是需要Hive插件来支持 。而在实际工作中 ,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景 。本插件已在生产环境稳定运行一年有余 ,现分享给大家 ,如有问题也可联系我(qq:1821088755)。
二 、HiveReader插件介绍
hivereader插件比较简单 ,共有三个类 ,两个配置文件 。其中:
HiveReader:实现DataX框架核心方法 ,是具体逻辑 。
HiveReaderErrorCode:继承了DataX框架的ErrorCode类 ,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值 。
HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时 ,进行认证的工具类 。
plugin.json:DataX插件固定的配置文件 ,用于指定插件的入口类 。
plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式 ,此json文件即为HiveReader插件的配置方式说明 。
2.1 HiveReader类
首先是HiveReader类 ,需要注意的是一些常量或枚举值,需要自行添加 ,其中DataBaseType枚举类中 ,需要新增Hive枚举项并添加Hive的驱动类全路径 ,具体见注释 ,另外就是Kerberos认证相关的几个配置 ,一个是keytab的路径 ,一个是krb5.conf的路径 ,另外一个是principle的值 。
2.2 HiveConnByKerberos类
HiveConnByKerberos类比较简单 ,是一个通用的Kerberos认证的接口 。
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.exception.PlumberException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
@Slf4j
public class HiveConnByKerberos {
public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {
System.setProperty("java.security.krb5.conf",krb5conf);
if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
}
catch (Exception e) {
log.error("kerberos认证失败");
String message = String.format("kerberos认证失败 ,请检查 " +
"kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",
kerberosKeytabFilePath, kerberosPrincipal);
e.printStackTrace();
throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
}
}
}
}
2.3 HiveReaderErrorCode类
HiveReaderErrorCode类,主要就是集成ErrorCode类 ,并添加一个枚举项 ,这块可直接在ErrorCode类添加,也可使用此类 ,为固定写法 。
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HiveReaderErrorCode
implements ErrorCode
{
KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS认证失败");
private final String code;
private final String description;
HiveReaderErrorCode(String code, String description)
{
this.code = code;
this.description = description;
}
@Override
public String getCode()
{
return this.code;
}
@Override
public String getDescription()
{
return this.description;
}
@Override
public String toString()
{
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}
2.4 plugin.json文件
{
"name": "hivereader",
"class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
"description": "Retrieve data from Hive via jdbc",
"developer": "wxm"
}
2.5 plugin_job_template.json文件
这块需要注意的一个问题是 ,如果Kerberos认证的Hive连接URL有两种方式,如果是基于zookeeper的方式 ,则需保证运行DataX服务的节点与zookeeper节点网络是打通的 ,并且一定不要忘记写上具体的Hive库名。
{
"name": "hivereader",
"parameter": {
"column": [
"*"
],
"username": "hive",
"password": "", "preSql":"show databases;",
"connection": [
{
"jdbcUrl": [
"jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM"
],
"table": [
"hive_reader"
]
}
],
"where": "logdate=20211013" ,
"haveKerberos": true,
"kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab",
"kerberosPrincipal": "hive@EXAMPLE.COM"
}
}
声明:本站所有文章 ,如无特殊说明或标注 ,均为本站原创发布 。任何个人或组织 ,在未征得本站同意时 ,禁止复制 、盗用 、采集 、发布本站内容到任何网站 、书籍等各类媒体平台 。如若本站内容侵犯了原著者的合法权益 ,可联系我们进行处理。