
How does Python Spark Streaming process data in real time?

Of course! Let's dive into Python Spark Streaming, a powerful and popular technology for processing real-time data.


I'll structure this answer to be a comprehensive guide, covering:

  1. What is Spark Streaming? (The "Why")
  2. The Core Concept: Discretized Streams (DStreams) (The "How it Works")
  3. A Simple, End-to-End Python Example (The "Let's Code")
  4. Key Concepts & Transformations (The "Building Blocks")
  5. Stateful Processing (A More Advanced Topic)
  6. Structured Streaming (The Modern Approach)

What is Spark Streaming?

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It divides the live input into small micro-batches that are processed by the Spark engine, so you can reuse the same programming model and much of the same code you would write for batch jobs. (The related idea of treating a stream as an unbounded table belongs to Structured Streaming, covered at the end of this guide.)

Key Use Cases:

  • Real-time Analytics: Monitoring user activity on a website or app.
  • Live Alerting: Detecting fraudulent credit card transactions or security threats.
  • ETL (Extract, Transform, Load): Continuously ingesting and transforming data from sources like Kafka or Kinesis into a data warehouse.
  • IoT Data Processing: Analyzing sensor data from smart devices.

How is it different from Batch Processing?

  • Batch: Processes a fixed, finite set of data. (e.g., "Process all sales from yesterday.")
  • Streaming: Processes a continuous, infinite flow of data. (e.g., "Process every new sales order as it comes in.")
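
In code, the difference shows up mainly in the entry point you use. Below is a minimal sketch using the DataFrame API (the same API the Structured Streaming section at the end of this guide builds on); the file path, column names, and socket address are illustrative assumptions, not part of the original example:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BatchVsStreaming") \
    .getOrCreate()

# Batch: read a fixed, finite dataset once and process it to completion.
# "sales/2025-10-26" and the "product"/"amount" columns are hypothetical.
daily_sales = spark.read.json("sales/2025-10-26")
daily_sales.groupBy("product").sum("amount").show()

# Streaming: declare a query over an unbounded source; Spark keeps running it
# and picks up every new record as it arrives.
orders = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
query = orders.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()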

The Core Concept: Discretized Streams (DStreams)

Spark Streaming's fundamental abstraction is the Discretized Stream (DStream). A DStream represents a continuous stream of data, which can be thought of as a sequence of Resilient Distributed Datasets (RDDs), the core data structure in Spark.

  • Input DStream: A DStream representing the live data stream being received from a source (e.g., a Kafka topic).
  • Batches: Spark Streaming divides the continuous stream into small, discrete batches of data (e.g., every 1 second). Each batch is processed as an RDD.
  • Processing: You can apply transformations (like map, filter, reduceByKey) to a DStream. These transformations are applied to the underlying RDDs of each batch.
  • Output DStream: The result of a transformation is another DStream, which can be written to an external system (like a database or the console).

Analogy: Imagine a river (the live data stream). Spark Streaming builds a series of small, manageable buckets (the batches) to scoop up water from the river at regular intervals. You can then perform an action on the water in each bucket (the transformation) before pouring it into a reservoir (the output).
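
To make the "sequence of RDDs" idea concrete, here is a minimal sketch (not part of the original walkthrough) that uses foreachRDD to inspect each micro-batch as an ordinary RDD; the socket source and the simple record count are illustrative choices:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder \
    .appName("DStreamAsRDDs") \
    .getOrCreate()
# 1-second micro-batches
ssc = StreamingContext(spark.sparkContext, 1)

lines = ssc.socketTextStream("localhost", 9999)

# foreachRDD hands you each micro-batch as a plain RDD, so the full RDD API
# is available inside the callback.
def inspect_batch(time, rdd):
    print(f"Batch at {time}: {rdd.count()} record(s)")

lines.foreachRDD(inspect_batch)

ssc.start()
ssc.awaitTermination()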


A Simple, End-to-End Python Example

Let's build a classic word count application that reads text from a network socket. This is the "Hello, World!" of streaming.

Prerequisites

You need to have PySpark installed.

pip install pyspark

The Code (streaming_wordcount.py)

This program will:

  1. Create a SparkSession.
  2. Define a "streaming context" with a batch interval of 1 second.
  3. Connect to a socket (we'll use netcat to send data).
  4. Split the lines into words.
  5. Count the words in each batch.
  6. Print the results.
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# 1. Create a SparkSession
# A SparkSession is the entry point to any Spark functionality.
# We also need to create a StreamingContext from it.
spark = SparkSession.builder \
    .appName("PythonStreamingWordCount") \
    .getOrCreate()
# Set the log level to ERROR to reduce verbosity in the console
spark.sparkContext.setLogLevel("ERROR")
# 2. Create a StreamingContext with a batch interval of 1 second
# The batch interval is the time at which Spark will process the data.
ssc = StreamingContext(spark.sparkContext, 1)
# 3. Create a DStream that will connect to a source
# We'll connect to a TCP socket. The hostname is 'localhost' and the port is 9999.
# Data can be sent to this socket using tools like 'netcat' or 'telnet'.
lines = ssc.socketTextStream("localhost", 9999)
# 4. Perform transformations on the DStream
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
# The 'reduceByKey' operation groups the data by key (the word) and
# applies a function to combine the values for each key.
pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# 5. Print the first 10 elements of each RDD in the DStream to the console
# Note: In a real application, you would save this to a database or file.
# .pprint() is a useful action for debugging.
word_counts.pprint()
# 6. Start the computation
# This is a crucial step. The streaming context will now start processing
# data as it arrives. The program will run until you manually stop it.
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()

How to Run It

Step 1: Start a Netcat (nc) Server. Open a new terminal and run the following command. Netcat will listen on port 9999 and send anything you type to whichever client connects (in this case, the Spark application).

# On macOS or Linux
nc -lk 9999
# On Windows (using netcat for Windows, often named 'nc.exe')
nc -l -p 9999

Step 2: Run the Python Script Open a second terminal and run your Spark script.

python streaming_wordcount.py

Apart from empty batch headers printed every second, you will see no word counts in the Spark terminal yet, because the application is waiting for data.

Step 3: Send Data to the Socket Go back to your first terminal (the one running nc). Type a sentence and press Enter.

hello world spark streaming
hello spark

Step 4: Observe the Output In your Spark terminal, you will see the output printed every second (the batch interval). It will look something like this:

-------------------------------------------
Time: 2025-10-27 10:30:00
-------------------------------------------
('spark', 2)
('hello', 2)
('world', 1)
('streaming', 1)
-------------------------------------------
Time: 2025-10-27 10:30:01
-------------------------------------------
('spark', 1)
('hello', 1)

Notice how it counts the words from all the data received within that 1-second batch.

Step 5: Stop the Application In the Spark terminal, press Ctrl+C to stop the streaming context.


Key Concepts & Transformations

DStreams support many of the same transformations as Spark's RDDs.

  • map(func): Returns a new DStream by applying a function to each element.
  • flatMap(func): Similar to map, but each input item can be mapped to zero or more output items.
  • filter(func): Returns a new DStream containing only the records for which func returns True.
  • count(): Returns a new DStream containing the number of elements in each RDD.
  • reduceByKey(func): Returns a new DStream of (key, value) pairs in which the values for each key are aggregated using the given reduce function.
  • window(windowLength, slideInterval): Applies transformations over a sliding window of data.

Stateless vs. Stateful Operations:

  • Stateless: Operations like map, filter, reduceByKey are stateless. They process each batch independently. The word count example above is stateless.
  • Stateful: Operations like window require the system to remember information across multiple batches. This is essential for calculations like moving averages.
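
The window operation mentioned above is easiest to see in a windowed variant of the word count. Here is a minimal sketch assuming the same socket source as before; the 30-second window and 10-second slide are arbitrary illustrative values (both must be multiples of the batch interval):

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder \
    .appName("WindowedWordCount") \
    .getOrCreate()
ssc = StreamingContext(spark.sparkContext, 1)

lines = ssc.socketTextStream("localhost", 9999)
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))

# Count each word over the last 30 seconds of data, recomputed every 10 seconds.
windowed_counts = pairs.reduceByKeyAndWindow(
    lambda x, y: x + y,  # reduce function applied to values within the window
    None,                # no inverse function, so each window is recomputed in full
    30,                  # window length, in seconds
    10)                  # slide interval, in seconds

windowed_counts.pprint()

ssc.start()
ssc.awaitTermination()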

Stateful Processing: The updateStateByKey Example

Sometimes you need to maintain state across batches. For example, you might want a running total of word counts, not just the count for the last second.

The updateStateByKey transformation allows you to do this. It lets you maintain arbitrary state for each key and update it incrementally using the previous state and the new values from the incoming batch.

Let's modify our word count to keep a running total.

# (Assuming SparkSession and StreamingContext are already created)
# ... (lines, words, pairs are the same as before)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# updateStateByKey requires checkpointing so that Spark can recover the
# accumulated state after a failure
ssc.checkpoint("checkpoint")
# Define the function to update the state
# The function takes two arguments:
# 1. The values for a key in the new batch (e.g., [1, 1] for the word 'spark')
# 2. The previous state for that key (e.g., 10)
# It should return the new state (e.g., 12)
def update_function(new_values, previous_count):
    # previous_count is an Optional integer, it will be None for the first time
    return sum(new_values) + (previous_count or 0)
# Use updateStateByKey to maintain a running total
running_counts = pairs.updateStateByKey(update_function)
# Print the running totals
running_counts.pprint()
ssc.start()
ssc.awaitTermination()

Now, if you run this and send the same data, the output will accumulate:

-------------------------------------------
Time: 2025-10-27 10:35:00
-------------------------------------------
('spark', 2)
('hello', 2)
('world', 1)
('streaming', 1)
-------------------------------------------
Time: 2025-10-27 10:35:01
-------------------------------------------
('spark', 3)  <-- 2 (from before) + 1 (new)
('hello', 3)  <-- 2 (from before) + 1 (new)
('world', 1)
('streaming', 1)

Structured Streaming (The Modern & Recommended Approach)

While DStreams are powerful, Structured Streaming is now the primary and recommended streaming API in Spark. It's built on the Spark SQL engine and offers several advantages:

  • Unified API: Uses the same DataFrame/Dataset API for both batch and streaming.
  • Fault Tolerance: Achieves end-to-end exactly-once guarantees by combining checkpointing and write-ahead logs with replayable sources and idempotent sinks.
  • Optimization: The Spark SQL optimizer can automatically optimize the execution plan.
  • Simplified Aggregations: Built-in support for event-time processing and watermarks for handling late data.

Concept: You treat a stream as a table that is being continuously appended to. You write what looks like a standard batch query (in DataFrame or SQL form) against this unbounded table, and Spark runs it incrementally, updating the result as new data arrives.

Example: Structured Streaming Word Count

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
spark = SparkSession.builder \
    .appName("StructuredStreamingWordCount") \
    .getOrCreate()
# Create a streaming DataFrame that represents an unbounded table
# The 'socket' source produces a single string column named 'value'
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
# Generate running word counts
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
# 'outputMode("complete")' means the entire result of the aggregation will be printed.
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()

This example achieves the same result as the DStream version but uses a more declarative, DataFrame-based approach, which is generally easier to maintain and optimize.
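
Event-time processing and watermarks (mentioned in the advantages above) follow the same pattern. The sketch below is not from the original article: it uses the built-in rate source, which generates rows with a timestamp column, and arbitrary 10-minute windows with a 5-minute watermark for late data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder \
    .appName("EventTimeWindowedCounts") \
    .getOrCreate()

# The 'rate' source emits (timestamp, value) rows and is handy for testing.
events = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Count events per 10-minute event-time window, discarding data that arrives
# more than 5 minutes late; the watermark bounds the state Spark must keep.
windowed_counts = events \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(events.timestamp, "10 minutes")) \
    .count()

query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()

In update mode, only the windows whose counts changed in the latest micro-batch are written out; once the watermark passes the end of a window, Spark can drop that window's state.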

Summary

How the two APIs compare:

  • API: Spark Streaming (DStreams) exposes a low-level, RDD-based API; Structured Streaming uses the high-level DataFrame/Dataset API.
  • Processing model: DStreams process a stream as micro-batches of RDDs; Structured Streaming runs incremental queries over an unbounded table (micro-batch by default, with an optional continuous mode).
  • Fault tolerance: DStreams offer good guarantees via checkpointing; Structured Streaming provides end-to-end exactly-once guarantees.
  • Ease of use: DStreams require more manual state management; Structured Streaming is declarative and SQL-like.
  • Recommendation: DStreams are a legacy API, not recommended for new projects; Structured Streaming is recommended for all new work.

For any new development, you should strongly prefer using Structured Streaming. However, understanding DStreams is still valuable as they are foundational to how Spark Streaming works internally.
