
How do you integrate Python with FusionInsight?

"Python with FusionInsight" is a common request, and it typically refers to using Python to interact with and manage data on FusionInsight, a big data platform developed by Huawei.


FusionInsight is an enterprise-grade platform that bundles many big data components, most notably Apache Hadoop, Spark, Flink, Hive, HBase, and more. The goal is to provide a unified, manageable, and secure environment for big data processing.

Here’s a comprehensive guide on how to use Python with FusionInsight, covering the most common use cases.


Core Idea: The Bridge

Python itself doesn't run on FusionInsight's core components (like HDFS or YARN). Instead, Python acts as a client or a driver that sends commands and code to the FusionInsight cluster. The heavy computation happens on the cluster's distributed resources.

There are two primary ways to bridge Python with FusionInsight:

  1. Client Libraries (Most Common): Use Python libraries that communicate with the services running on the FusionInsight cluster (e.g., PySpark for Spark, the hdfs client library for HDFS).
  2. Server-Side Execution: Use Python environments that are installed on the FusionInsight cluster nodes and execute code via tools like PySpark on YARN or Zeppelin notebooks.

Method 1: Using Client Libraries (Your Laptop/Workstation)

This is the most flexible approach. You write Python scripts on your local machine that connect to the remote FusionInsight cluster.

Prerequisites

  1. Python Environment: A standard Python installation (e.g., 3.7+).
  2. FusionInsight Connection Information: You need the following from your FusionInsight administrator:
    • Cluster IP Address & Port: The address of the services (e.g., NameNode, ResourceManager).
    • Authentication Method: FusionInsight uses Kerberos for security. You will need:
      • A Kerberos principal (e.g., your_user@YOUR_REALM.COM).
      • A keytab file (your_user.keytab). This is the most crucial part.
  3. Python Libraries: You'll need to install specific client libraries (see Step 1 below).

Step-by-Step Guide with Kerberos Authentication

Step 1: Install Necessary Python Libraries

# For HDFS file operations over WebHDFS (the [kerberos] extra adds Kerberos/SPNEGO support)
pip install "hdfs[kerberos]"
# For Spark (PySpark)
pip install pyspark
# For Hive: PySpark already includes Hive support via the SparkSession,
# but for direct connections to HiveServer2 you can use PyHive
# (the [hive] extra pulls in the Thrift/SASL dependencies)
pip install "pyhive[hive]"
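
If you want to confirm the installations before going further, a quick (purely illustrative) sanity check is to import each package:

import hdfs
import pyspark
from pyhive import hive

print("pyspark", pyspark.__version__)
print("hdfs and PyHive clients imported successfully")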

Step 2: Configure Kerberos Authentication

Your Python script needs to authenticate with the FusionInsight cluster using Kerberos. The keytab file acts as your password, letting the script obtain a ticket without typing one interactively.

import subprocess
# --- IMPORTANT ---
# Set the KRB5_CONFIG environment variable if your krb5.conf is not in the default location:
# import os; os.environ['KRB5_CONFIG'] = '/path/to/your/krb5.conf'
# Set your principal and keytab file path
PRINCIPAL = 'your_user@YOUR_REALM.COM'
KEYTAB = '/path/to/your/your_user.keytab'
# Authenticate: obtain a ticket-granting ticket from the keytab
subprocess.run(['kinit', '-kt', KEYTAB, PRINCIPAL], check=True)
# Optional: verify the ticket
# print(subprocess.run(['klist'], capture_output=True, text=True).stdout)

Note: The kinit command is a standard Kerberos utility. Make sure you have it installed on your system (it comes with Kerberos clients).

Step 3: Interact with FusionInsight Services

Now that you're authenticated, you can use the Python libraries.

Example A: Reading/Writing Files on HDFS with the hdfs Library

from hdfs.ext.kerberos import KerberosClient

# Use Kerberos (SPNEGO) authentication against WebHDFS via the hdfs library's
# kerberos extension. Note: this is the NameNode's WebHDFS HTTP address, not the
# RPC port (8020); check the exact host and port in your FusionInsight configuration.
hdfs_webhdfs_address = 'http://<your-hdfs-node>:9870'
try:
    # Requires a valid Kerberos ticket -- run the kinit step above first
    client = KerberosClient(hdfs_webhdfs_address)
    # Create a directory
    client.makedirs('/tmp/python_test', permission='755')
    # Write a file from a local string
    with client.write('/tmp/python_test/hello.txt', encoding='utf-8', overwrite=True) as writer:
        writer.write("Hello from Python via FusionInsight HDFS!")
    # Read the file back
    with client.read('/tmp/python_test/hello.txt', encoding='utf-8') as reader:
        content = reader.read()
        print(f"Content read from HDFS: {content}")
except Exception as e:
    print(f"An error occurred: {e}")

Example B: Running a PySpark Job

This is the most common use case for data processing. Your Python script (the driver) will submit a Spark application to the FusionInsight cluster.

from pyspark.sql import SparkSession
# Make sure you have run kinit before this script, or rely on the
# principal/keytab settings below so Spark can log in on its own.
# --- Spark Configuration ---
# These properties tell PySpark how to connect to the secure cluster.
# Find the correct service addresses in your FusionInsight documentation.
spark_conf = {
    "spark.hadoop.fs.defaultFS": "hdfs://<your-hdfs-cluster-name>",  # Your HDFS nameservice
    "spark.hadoop.yarn.resourcemanager.address": "<your-rm-host>:8032",
    "spark.kerberos.principal": PRINCIPAL,  # as defined in Step 2; spark.yarn.principal on Spark 2.x
    "spark.kerberos.keytab": KEYTAB,        # as defined in Step 2; spark.yarn.keytab on Spark 2.x
    # Add other necessary configurations for your cluster
    "spark.sql.shuffle.partitions": "4",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "1g",
}
# --- Create SparkSession ---
# Launching from a local script runs the driver in client deploy mode;
# to use cluster deploy mode, submit the script with spark-submit instead.
builder = SparkSession.builder \
    .appName("PythonFusionInsightExample") \
    .master("yarn")
for key, value in spark_conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
# --- Your Spark Code ---
print("Spark Session created successfully!")
# Create a sample DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["name", "id"])
# Show the DataFrame
print("Sample DataFrame:")
df.show()
# Perform a transformation
df_filtered = df.filter(df.id > 1)
print("Filtered DataFrame:")
df_filtered.show()
# Write the result to HDFS
# The path must be in a directory your user has write access to
output_path = "/tmp/python_spark_output"
df_filtered.write.mode("overwrite").parquet(output_path)
print(f"Wrote filtered data to HDFS: {output_path}")
# Stop the SparkSession
spark.stop()
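
Example C: Querying Hive Directly with PyHive

Besides going through Spark, the pyhive package installed in Step 1 can talk to HiveServer2 directly. The sketch below is illustrative only: the HiveServer2 host, the port 10000, and the Kerberos service name 'hive' are assumptions, so confirm the actual values in your FusionInsight configuration, and run the kinit step from Step 2 first.

from pyhive import hive

# Minimal sketch: connect to HiveServer2 using the Kerberos ticket obtained with kinit.
conn = hive.connect(
    host='<your-hiveserver2-host>',  # placeholder host
    port=10000,                      # common HiveServer2 port; may differ on FusionInsight
    auth='KERBEROS',
    kerberos_service_name='hive',    # service part of HiveServer2's Kerberos principal
    database='default',
)
cursor = conn.cursor()
cursor.execute('SHOW TABLES')
for table in cursor.fetchall():
    print(table)
cursor.close()
conn.close()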

Method 2: Using Python on the FusionInsight Cluster

In this scenario, you don't use your local Python. Instead, you submit a job or run an interactive session that uses a Python interpreter installed on the FusionInsight cluster nodes.

Common Tools:

  1. Spark on YARN: This is the most powerful method. You submit a Python script (.py) or an interactive pyspark shell session to the YARN ResourceManager. YARN allocates containers on the cluster nodes, and the Python code inside those containers uses the cluster's Python installation to process the data. The example above with spark.master set to yarn works this way; submitting the same script with spark-submit in cluster deploy mode additionally runs the driver on the cluster.

  2. FusionInsight Zeppelin/Spark Notebook: FusionInsight often comes with Zeppelin, a web-based notebook environment. You can create a PySpark interpreter within Zeppelin.

    • Advantage: Interactive, visual, and easy to share.
    • Setup: An administrator needs to configure the Zeppelin interpreter to point to the cluster's Spark installation and handle Kerberos authentication.
  3. MapReduce Streaming: An older, but still viable, method. You write a Python script that reads from stdin and writes to stdout, then submit it as a streaming job to MapReduce, which pipes data between your Python mapper and reducer scripts (see the sketch below).
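
To make the streaming model concrete, here is a minimal word-count mapper and reducer pair. This is only a sketch of the stdin/stdout contract; the file names mapper.py and reducer.py, and the exact hadoop-streaming submission command, are placeholders to adapt to your cluster.

# mapper.py -- reads raw text lines from stdin and emits "word<TAB>1" pairs
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")

# reducer.py -- reads the sorted mapper output from stdin and sums counts per word
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.strip().rsplit("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word, current_count = word, int(count)
if current_word is not None:
    print(f"{current_word}\t{current_count}")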


Summary and Best Practices

Client Libraries (local machine)
  • Use case: Ad-hoc analysis, ETL scripts, and data exploration from a local machine.
  • Pros: Flexible, uses your preferred dev environment; easy to debug.
  • Cons: Requires Kerberos setup on your local machine; network latency for data transfer.

PySpark on YARN (cluster-side)
  • Use case: Production-scale data processing where the job runs on the cluster itself.
  • Pros: The heavy computation runs on the cluster's distributed resources; the most powerful option for large datasets.
  • Cons: Depends on the Python environment and configuration on the cluster nodes, which usually requires an administrator.