杰瑞科技汇

hive python 编写 udf

核心概念

Hive 与 Python 的交互是基于 流式处理 的:

  1. 数据传递:Hive 将查询数据(通常是 SELECT 语句的结果)以文本行的形式,通过标准输入 发送给 Python 脚本。
  2. 脚本处理:Python 脚本逐行读取输入,进行处理,并将结果通过标准输出 写回。
  3. 数据接收:Hive 从标准输出读取 Python 脚本的返回结果,并将其作为查询结果的一部分。

这种方式的优点是灵活,可以使用任何 Python 库;缺点是进程间通信的开销较大,性能通常不如原生 Java UDF。


环境准备

在开始之前,请确保你的环境已经配置好:

  1. Hive 环境:一个可用的 Hive 服务。
  2. Python 环境:Hive 节点上需要安装 Python(通常推荐 Python 2.7 或 Python 3.x,但要注意兼容性)。
  3. Hive Streaming 开关:确保 Hive 配置中允许流式处理,这通常是默认开启的。

完整步骤:编写一个简单的 Python UDF

我们将创建一个简单的 UDF,其功能是将输入字符串转换为大写。

第 1 步:编写 Python 脚本

创建一个名为 to_upper.py 的文件,这个脚本需要从标准输入读取数据,处理后再写入标准输出。

to_upper.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
def main():
    # sys.stdin 是一个文件对象,可以逐行读取
    for line in sys.stdin:
        # 去除行尾的换行符,然后转换为大写
        # Hive 默认使用制表符 \t 作为列分隔符
        # 我们假设输入是一列数据
        upper_line = line.strip().upper()
        # 将处理后的结果写入标准输出
        # print 函数会自动添加换行符,符合 Hive 的要求
        print(upper_line)
if __name__ == "__main__":
    main()

脚本说明

  • #!/usr/bin/env python:这是一个 shebang,告诉系统用哪个解释器来执行这个脚本。
  • import sys:导入 sys 模块,用于访问标准输入和输出。
  • for line in sys.stdin::循环从标准输入读取每一行数据。
  • line.strip().upper():对每一行进行简单处理——去除首尾空白字符,并转换为大写。
  • print(upper_line):将处理后的结果打印到标准输出,Hive 会捕获这些输出。

第 2 步:将 Python 脚本上传到 HDFS

Hive 需要能够访问到这个 Python 脚本,最佳实践是将其上传到 HDFS。

# 假设你的脚本在本地 /path/to/to_upper.py
# 上传到 HDFS 的 /user/hive/scripts/ 目录下
hdfs dfs -put /path/to/to_upper.py /user/hive/scripts/to_upper.py

第 3 步:在 Hive 中使用 TRANSFORM 调用脚本

我们可以在 HiveQL 中使用 TRANSFORM 语句来调用这个脚本。

HiveQL 语句

-- 创建一个测试表
CREATE TABLE test_input (
    id INT,
    name STRING
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- 插入一些测试数据
INSERT INTO TABLE test_input VALUES (1, 'alice'), (2, 'bob'), (3, 'charlie');
-- 使用 TRANSFORM 调用 Python UDF
SELECT
    TRANSFORM(id, name)
    USING 'python /user/hive/scripts/to_upper.py' -- 指定要执行的脚本和解释器
    AS (id, upper_name) -- 定义输出列的名称和类型
FROM
    test_input;

查询结果

1   ALICE
2   BOB
3   CHARLIE

TRANSFORM 语句详解

  • TRANSFORM(id, name):指定要传递给 Python 脚本的输入列,Hive 会将 idname 的值用制表符 \t 连接成一行,作为标准输入的一行。
  • USING 'python /user/hive/scripts/to_upper.py':这是核心部分,告诉 Hive 如何执行脚本。
    • python:是 Python 的可执行文件。python 命令在环境变量中找不到,可能需要使用绝对路径,如 /usr/bin/python3
    • /user/hive/scripts/to_upper.py:是 HDFS 上脚本的位置。
  • AS (id, upper_name):定义 Python 脚本输出的列,Python 脚本输出的每一行也会被用制表符 \t 分割,id 对应第一个字段,upper_name 对应第二个字段。

进阶示例:处理多列和复杂逻辑

让我们创建一个更复杂的 UDF,它接收两个数字,并返回它们的和与积。

第 1 步:编写 Python 脚本

math_ops.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
def main():
    for line in sys.stdin:
        # 输入格式是 "num1\tnum2"
        parts = line.strip().split('\t')
        # 确保有且只有两列
        if len(parts) == 2:
            try:
                num1 = float(parts[0])
                num2 = float(parts[1])
                # 计算和与积
                sum_val = num1 + num2
                product = num1 * num2
                # 输出格式 "sum\tproduct"
                print(f"{sum_val}\t{product}")
            except ValueError:
                # 如果转换失败,输出 NULL 或特定标记
                print("NULL\tNULL")
        else:
            # 如果列数不对,输出 NULL
            print("NULL\tNULL")
if __name__ == "__main__":
    main()

第 2 步:上传到 HDFS

hdfs dfs -put /path/to/math_ops.py /user/hive/scripts/math_ops.py

第 3 步:在 Hive 中使用

HiveQL 语句

-- 创建测试表
CREATE TABLE numbers (
    a INT,
    b INT
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
-- 插入数据
INSERT INTO TABLE numbers VALUES (10, 5), (7, 3), (100, 2);
-- 调用 Python UDF
SELECT
    a,
    b,
    TRANSFORM(a, b)
    USING 'python /user/hive/scripts/math_ops.py'
    AS (sum_result, product_result)
FROM
    numbers;

查询结果

10  15  50
7   10  21
100 102 200

性能优化与最佳实践

  1. 减少数据量TRANSFORM 会将数据从 Hive JVM 拷贝到 Python 进程,再拷贝回来,这个过程很慢,尽量在 TRANSFORM 之前使用 WHEREGROUP BYJOIN 等 Hive 原生操作减少数据量。

    -- 差:处理整个大表
    SELECT TRANSFORM(*) FROM huge_table;
    -- 好:先过滤
    SELECT TRANSFORM(*) FROM huge_table WHERE id > 1000;
  2. 使用 MAPREDUCETRANSFORM 通常与 MAPREDUCE 子句一起使用,以明确指定脚本是用于 Map 阶段还是 Reduce 阶段,这有助于 Hive 优化执行计划。

    -- 使用 MAP
    FROM numbers
    MAP a, b
    USING 'python /user/hive/scripts/math_ops.py'
    AS (sum_result, product_result);
    -- 使用 REDUCE (配合 GROUP BY)
    SELECT
        TRANSFORM(key, value_list) -- value_list 是一个逗号分隔的字符串
        USING 'python /user/hive/scripts/reduce_script.py'
        AS (final_key, final_value)
    FROM numbers
    GROUP BY key;
  3. Python 环境管理

    • 依赖库:如果你的脚本需要 pandas, numpy 等第三方库,你需要确保所有 Hive 节点上都安装了这些库,并且版本一致,管理起来很麻烦。

    • 虚拟环境:可以将 Python 脚本和其依赖打包成一个虚拟环境(例如使用 venvconda),然后将整个虚拟环境目录打包成 .tar.gz 文件,上传到 HDFS,并在 USING 命令中指定解压后的路径。

      # 打包虚拟环境
      tar -czf py_env.tar.gz my_python_env/
      # 上传到 HDFS
      hdfs dfs -put py_env.tar.gz /user/hive/scripts/
      # 在 Hive 中使用 (需要先解压,或者使用 PYTHONPATH)
      -- 假设解压到 /user/hive/scripts/py_env
      USING '/usr/bin/python /user/hive/scripts/py_env/bin/python /user/hive/scripts/my_script.py'
  4. 错误处理:在 Python 脚本中加入 try-except 块,处理可能的异常,避免脚本崩溃导致整个 Hive 任务失败。

替代方案:PyHive 和 hive-exec

如果你追求更好的性能和更紧密的集成,可以考虑使用 PyHive 库,但这需要更复杂的部署,通常用于 Python 应用程序直接连接 Hive Metastore 或执行查询,而不是在 HiveQL 内部编写 UDF。

对于在 Hive 内部创建函数,TRANSFORM 是最标准、最通用的方法。

步骤 操作 说明
编写脚本 创建 .py 文件,使用 sys.stdin 读取,print 写出。 脚本逻辑必须能处理流式输入/输出。
上传脚本 将脚本放到 HDFS 或集群共享的文件系统。 确保 Hive 任务能访问到脚本。
编写 HiveQL 使用 TRANSFORM(...) USING 'python <script_path>' AS (...) 明确输入列、脚本命令和输出列。
优化执行 TRANSFORM 前用 Hive 原生操作减少数据量。 性能关键点。

通过以上步骤,你就可以在 Hive 中灵活地使用 Python 来处理复杂的逻辑了。

分享:
扫描分享到社交APP
上一篇
下一篇