
MaxCompute Python

This is a comprehensive guide to using Python with MaxCompute, covering the core concepts, methods, and best practices.


What is MaxCompute?

MaxCompute is a fully managed, highly scalable big data service on Alibaba Cloud (comparable to AWS EMR, Google BigQuery, or Azure Synapse Analytics). It is designed to process petabytes of data using a distributed, MapReduce-style computing engine and a SQL-like query language (MaxCompute SQL, historically called ODPS SQL).

The key takeaway is that MaxCompute is not your local machine. It's a separate, distributed system. Therefore, you can't just run pandas or sklearn directly on your data. You need a way to move your code and data to the MaxCompute environment.

This is where the MaxCompute Python SDK comes in.


The MaxCompute Python SDK

The MaxCompute Python SDK is the official library that allows you to interact with MaxCompute services from a Python environment. It serves several primary purposes:

  1. Running SQL Queries: Execute ODPS SQL scripts from your Python code and fetch the results.
  2. Running Python Scripts (UDF/UDAF): Upload and execute Python code directly within the MaxCompute distributed environment. This is essential for custom data processing, machine learning, and complex logic that SQL can't handle.
  3. Data Ingestion (ETL): Upload local data files (e.g., from your local machine or OSS) into MaxCompute tables.
  4. Resource Management: Upload Python files, JAR files, or other dependencies that your MaxCompute Python scripts might need.

Core Concepts: PyODPS

The Python SDK (PyODPS) is built around a central class called ODPS. When you create an instance of this class, you establish a connection to your MaxCompute project. This entry object (conventionally named o or odps) is your gateway to everything.

  • ODPS: The main entry point for all operations. An instance of it represents your connection to a MaxCompute project.
  • Table: Represents a table in your MaxCompute project.
  • Instance: Represents a submitted job (either a SQL query or a Python script). You can use it to check the job's status and retrieve logs.
  • Resource: Represents an uploaded file (such as a .py file or a .zip archive) that your MaxCompute jobs can reference.

How to Use the MaxCompute Python SDK

Step 1: Installation

First, install the SDK using pip. It's recommended to use a virtual environment.

pip install pyodps

Step 2: Authentication and Connection

You need to authenticate to connect to your MaxCompute project. There are two common methods:

Method 1: Using an AccessKey ID and Secret (Recommended for scripts and CI/CD)

You can pass your credentials directly, but for better security, use environment variables.

# Set these in your terminal or in your environment
export ODPS_ACCESS_ID=<your_access_id>
export ODPS_ACCESS_KEY=<your_access_key>
export ODPS_ENDPOINT=<your_endpoint, e.g., http://service.cn.maxcompute.aliyun.com/api>
export ODPS_PROJECT_NAME=<your_project_name>
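Before constructing the connection, a small helper can fail fast when one of these variables is missing, instead of failing later with a confusing authentication error. This is a minimal sketch (the helper name and error message are illustrative, not part of the SDK):

```python
import os

def load_odps_env():
    """Collect MaxCompute connection settings from the environment.

    Raises a clear error when a required variable is missing.
    (Hypothetical helper, not part of PyODPS.)
    """
    required = ('ODPS_ACCESS_ID', 'ODPS_ACCESS_KEY',
                'ODPS_ENDPOINT', 'ODPS_PROJECT_NAME')
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError('Missing environment variables: ' + ', '.join(missing))
    return {name: os.environ[name] for name in required}
```

The returned dict can then be unpacked into the ODPS constructor shown below.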

Then, in your Python code:

import os
from odps import ODPS

# Read the credentials explicitly from the environment
odps = ODPS(
    os.environ['ODPS_ACCESS_ID'],
    os.environ['ODPS_ACCESS_KEY'],
    project=os.environ['ODPS_PROJECT_NAME'],
    endpoint=os.environ['ODPS_ENDPOINT'],
)

Method 2: Using a Configuration File (Easier for local development)

Create a file named odps_config.ini (for example, in your home directory):

[odps]
access_id = <your_access_id>
access_key = <your_access_key>
project = <your_project_name>
endpoint = <your_endpoint>

Then load it explicitly in your Python code:

import configparser
import os
from odps import ODPS

config = configparser.ConfigParser()
config.read(os.path.expanduser('~/odps_config.ini'))
section = config['odps']
odps = ODPS(section['access_id'], section['access_key'],
            project=section['project'], endpoint=section['endpoint'])

Step 3: Running SQL Queries

This is the most straightforward use case. You can run any ODPS SQL statement.

from odps import ODPS

# Initialize the connection as shown in Step 2
odps = ODPS('<access_id>', '<access_key>',
            project='<project>', endpoint='<endpoint>')
try:
    # execute_sql blocks until the query finishes, then returns an Instance
    instance = odps.execute_sql('SELECT * FROM my_table LIMIT 10')
    with instance.open_reader() as reader:
        for record in reader:
            print(record)
    # run_sql is asynchronous: it returns an Instance you control yourself
    instance = odps.run_sql('SELECT COUNT(*) AS total FROM my_table')
    instance.wait_for_success()  # wait for the job to finish
    if instance.is_successful():
        # Get the results from the successful instance
        with instance.open_reader() as reader:
            for record in reader:
                print(f"Total rows: {record[0]}")        # access by index
                print(f"Total rows: {record['total']}")  # access by name
except Exception as e:
    print(f"An error occurred: {e}")

Step 4: Running Python Scripts (UDF - User-Defined Function)

This is where the real power lies. You can write complex Python logic and run it on the entire dataset in a distributed fashion.

Scenario: You want to clean a string column by removing special characters.

Step 4a: Write your Python code

Create a file named my_cleaner.py:

# my_cleaner.py
import re
from odps.udf import annotate

# 'string->string' tells MaxCompute the UDF takes a string and returns a string.
@annotate('string->string')
class CleanString(object):
    def evaluate(self, s):
        """Remove non-alphanumeric characters from a string."""
        if s is None:
            return None
        return re.sub(r'[^a-zA-Z0-9]', '', s)

Step 4b: Upload the code as a resource

You need to make this Python file available to MaxCompute.

from odps import ODPS
odps = ODPS('<access_id>', '<access_key>',
            project='<project>', endpoint='<endpoint>')
# Upload the file as a 'py' resource. The resource name (minus .py)
# is the module name used when registering the function.
with open('my_cleaner.py') as f:
    odps.create_resource('my_cleaner.py', 'py', file_obj=f)

Step 4c: Register the function and run the PyODPS script

Now register the uploaded resource as a SQL function and call it. You can do this in two ways:

Option A: Using a PyODPS Script (.py file)

Create a file named run_cleaner.py:

# run_cleaner.py
from odps import ODPS

if __name__ == '__main__':
    odps = ODPS('<access_id>', '<access_key>',
                project='<project>', endpoint='<endpoint>')
    # Register the UDF once: '<module>.<class>' from the uploaded resource
    odps.run_sql(
        "CREATE FUNCTION clean_string AS 'my_cleaner.CleanString' "
        "USING 'my_cleaner.py'"
    ).wait_for_success()
    # The function can now be used like any built-in SQL function
    instance = odps.execute_sql('''
        SELECT
            user_id,
            clean_string(raw_text) AS cleaned_text
        FROM my_text_table
        LIMIT 100
    ''')
    with instance.open_reader() as reader:
        for record in reader:
            print(record)

You then run this script like any other Python program:

python run_cleaner.py

Option B: Using a DataWorks PyODPS Node or Notebook (Recommended for Exploration)

DataWorks, the web-based development console for MaxCompute, provides notebook-style PyODPS nodes where you can write and run PyODPS code interactively, with the connection object pre-configured. The process is otherwise the same: upload the resource, register the function, and query.


Best Practices and Advanced Topics

  1. Use with Statements: Always read results through a context manager, such as with instance.open_reader() as reader:, to ensure network resources are properly closed.

  2. Data Type Mapping: Understand how MaxCompute data types map to Python types.

    • MaxCompute bigint -> Python int
    • MaxCompute double -> Python float
    • MaxCompute string -> Python str
    • MaxCompute boolean -> Python bool
    • MaxCompute datetime -> Python datetime.datetime
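    As a quick local illustration of this mapping (the sample row is made up; real values come from a result reader):

```python
import datetime

# One value per MaxCompute type, in the order listed above:
# bigint, double, string, boolean, datetime
row = (42, 3.14, 'abc', True, datetime.datetime(2024, 1, 1, 12, 0))

expected = [int, float, str, bool, datetime.datetime]
assert [type(v) for v in row] == expected
print('All values arrived as the expected Python types')
```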
  3. Passing Data Between Python and MaxCompute: Be mindful of data serialization overhead. When you use instance.open_reader(), data is being serialized and transferred from the MaxCompute cluster to your local Python environment. For very large results, this can be slow. It's often better to let the processing happen inside MaxCompute and only bring back a summary or a small sample.

  4. Using Pandas and Scikit-learn (PyODPS DataFrame): The SDK provides a higher-level abstraction called PyODPS DataFrame, which is very similar to pandas. It allows you to perform data manipulations using a pandas-like API, but the operations are translated into and executed on the MaxCompute cluster.

    from odps import ODPS
    from odps.df import DataFrame

    odps = ODPS('<access_id>', '<access_key>',
                project='<project>', endpoint='<endpoint>')
    # Create a DataFrame backed by a MaxCompute table
    df = DataFrame(odps.get_table('my_table'))
    # pandas-like API; the whole pipeline is compiled and executed on the cluster
    result = df[df.age > 30].groupby('city').agg(avg_salary=df.salary.mean())
    # to_pandas() triggers execution and pulls the (small) result locally
    print(result.to_pandas())

    By uploading third-party packages as resources, you can even run scikit-learn models inside PyODPS scripts for in-cluster machine learning.

  5. Debugging: Debugging code running on a distributed cluster is hard.

    • Use Logview: instance.get_logview_address() returns a URL to the Logview page, which shows each task's progress along with the standard output and error streams from your distributed Python code.
    • Test Locally: Test your Python functions locally with small sample data before deploying them as MaxCompute resources.
    • Print Statements: print() statements in your UDF code will appear in the logs, which is very helpful for debugging.
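Following the "test locally" advice, the cleaning logic from Step 4 can be exercised with plain Python before it ever touches the cluster. A standalone sketch of the same regex rule, written as a free function so it runs without PyODPS installed:

```python
import re

def clean_string(s):
    """Same rule as the UDF: keep only ASCII letters and digits."""
    if s is None:
        return None
    return re.sub(r'[^a-zA-Z0-9]', '', s)

# Quick local checks before uploading anything
assert clean_string('Hello, World! 123') == 'HelloWorld123'
assert clean_string(None) is None
print('local checks passed')
```

Once these pass, the same logic can be pasted into the UDF's evaluate method with confidence.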

Summary: When to Use What

Task                                       Recommended Method
Simple data exploration and analysis       MaxCompute SQL via odps.execute_sql()
Interactive data analysis & prototyping    DataWorks notebook or PyODPS DataFrame
Custom data cleaning/ETL logic             Python UDF uploaded as a resource
In-cluster machine learning                PyODPS script with scikit-learn resources
Ad-hoc data pull to local machine          SQL query + instance.open_reader()