Of course! Here's a comprehensive guide to using Apache Storm with Python, covering the core concepts, setup, and a complete example.

Why Use Python with Apache Storm?
While Apache Storm was originally designed for the JVM (Java, Scala, Clojure), it has excellent support for other languages through a mechanism called the Multi-Language Protocol (MLP). This allows you to write your processing components (spouts and bolts) in any language, including Python.
Key Advantages:
- Leverage Python Ecosystem: Use powerful Python libraries like NumPy, Pandas, Scikit-learn, NLTK, etc., for data analysis and machine learning directly inside your Storm topology.
- Rapid Prototyping: Python is often faster for writing and iterating on complex data manipulation logic.
- Existing Skills: Leverage your team's Python expertise without requiring them to learn the JVM ecosystem.
Core Concepts in a Nutshell
Before diving into the code, let's quickly recap the main components of a Storm topology.
- Topology: A graph of computation nodes (spouts and bolts) that are connected by streams of data. A topology runs forever until you explicitly kill it.
- Spout: A source of data in a topology. It reads data from an external source (like a message queue, database, or file) and emits tuples (data packets) into the topology. Spouts are the entry points.
- Bolt: A processing unit in a topology. It receives tuples from one or more spouts or other bolts, processes them, and can optionally emit new tuples. Bolts can perform filtering, aggregations, joins, database lookups, or any other computation.
- Stream: An unbounded sequence of tuples flowing from one component (a spout or bolt) to another.
- Tuple: The basic unit of data in Storm. It's an ordered list of values, like a list or a tuple in Python.
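To make the tuple concept concrete, the sketch below shows roughly what an incoming tuple looks like from Python. The attribute names mirror the Tuple class in Storm's bundled storm.py multi-lang module; treat them as illustrative if your library of choice differs:
# tuple_sketch.py -- illustrative only: the shape of a tuple as seen from Python.
class Tuple:
    def __init__(self, id, component, stream, task, values):
        self.id = id                # unique id, used for acking or failing the tuple
        self.component = component  # name of the spout/bolt that emitted it
        self.stream = stream        # stream it arrived on (usually "default")
        self.task = task            # task id of the emitting instance
        self.values = values        # the ordered list of values, i.e. the actual data

incoming = Tuple("4519", "sentence-spout", "default", 3, ["the quick brown fox"])
print(incoming.values[0])  # -> "the quick brown fox"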
How the Multi-Language Protocol (MLP) Works
The magic behind Python support is the MLP. Here's the flow:

- Java Master: The Storm cluster itself is managed by Java processes (Nimbus, Supervisor).
- Python Subprocesses: On each worker node, the JVM worker launches your Python scripts as child processes.
- Stdin/Stdout Communication: The JVM worker and each Python process communicate by writing to each other's standard input (stdin) and reading from standard output (stdout). Messages are serialized with a simple, line-based JSON protocol.
- Thrift for Definition: The structure of the topology (which spout is connected to which bolt, etc.) is defined using Thrift, a language-agnostic serialization framework. This definition is sent from the client (your machine) to the Java master (Nimbus).
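To make the exchange concrete, here is a rough sketch of the messages that cross the stdin/stdout boundary when a bolt processes one tuple. The JSON field names follow Storm's multi-lang protocol; the helper function and the example values are illustrative, not part of any library:
# protocol_sketch.py -- illustrative only: what the line-based exchange looks like.
import json
import sys

def send_to_storm(message):
    # Each message is one JSON object on its own line, followed by a line containing "end".
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.write("end\n")
    sys.stdout.flush()

# The Java side hands a bolt a tuple that looks roughly like this:
#   {"id": "4519", "comp": "sentence-spout", "stream": "default", "task": 3,
#    "tuple": ["the quick brown fox"]}
#   end
# The Python bolt replies by emitting new tuples and acking the one it received:
send_to_storm({"command": "emit", "anchors": ["4519"], "tuple": ["the"]})
send_to_storm({"command": "ack", "id": "4519"})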
Prerequisites
- Python 3: Ensure you have Python 3 installed.
- Apache Storm: You need a Storm cluster running (local or distributed). For local development, you can use the storm command-line tool.
- storm-python library: This library simplifies creating spouts and bolts and handles the MLP communication: pip install storm-python
Step-by-Step Example: Word Count Topology in Python
Let's build the classic "Word Count" topology. This topology will read sentences, split them into words, and count the frequency of each word.
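Before wiring anything into Storm, here is the same computation as a few lines of plain Python. Nothing below is Storm API; it just shows the data flow the topology will reproduce (spout emits sentences, splitter emits words, counter keeps a running tally):
# wordcount_plain.py -- the pipeline's logic without Storm, for orientation only.
from collections import Counter

sentences = [
    "the quick brown fox",
    "jumps over the lazy dog",
]

counts = Counter()
for sentence in sentences:              # role of the spout
    for word in sentence.split(" "):    # role of the splitter bolt
        counts[word] += 1               # role of the counter bolt

print(counts)  # Counter({'the': 2, 'quick': 1, 'brown': 1, ...})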
Step 1: Create the Spout (sentence_spout.py)
The spout's job is to emit sentences. For this example, we'll just emit a predefined list of sentences.
# sentence_spout.py
import time

import storm
from storm import Spout

class SentenceSpout(Spout):
    # Called once when the spout starts up
    def initialize(self, conf, context):
        self.sentences = [
            "the quick brown fox",
            "jumps over the lazy dog",
            "apache storm is a great framework",
            "python is a powerful language"
        ]
        self.sentence_index = 0
        # Use storm.log rather than print: stdout carries the multi-lang protocol
        storm.log("Sentence Spout initialized")

    # Called repeatedly by Storm to request the next tuple
    def nextTuple(self):
        if self.sentence_index < len(self.sentences):
            # Emit the sentence as a single-field tuple
            sentence = self.sentences[self.sentence_index]
            self.emit([sentence])
            self.sentence_index += 1
        else:
            # Sleep briefly so we don't busy-wait once all sentences are emitted
            time.sleep(1)

# Hand control over to Storm's multi-lang loop
SentenceSpout().run()
Step 2: Create the Splitter Bolt (splitter_bolt.py)
This bolt receives a sentence, splits it into words, and emits each word.
# splitter_bolt.py
import storm
from storm import Bolt

class SplitterBolt(Bolt):
    def initialize(self, conf, context):
        storm.log("Splitter Bolt initialized")

    def process(self, tup):
        sentence = tup.values[0]
        # Emit each word of the sentence as its own tuple
        for word in sentence.split(" "):
            self.emit([word])

# Hand control over to Storm's multi-lang loop
SplitterBolt().run()
Step 3: Create the Counter Bolt (counter_bolt.py)
This is the most complex bolt. It receives words and maintains a count for each word. This state must be preserved between tuples.

# counter_bolt.py
import storm
from storm import Bolt

class CounterBolt(Bolt):
    def initialize(self, conf, context):
        # In-memory dictionary holding the running word counts
        self.word_counts = {}
        storm.log("Counter Bolt initialized")

    def process(self, tup):
        word = tup.values[0]
        # Update the count for the word
        self.word_counts[word] = self.word_counts.get(word, 0) + 1
        # Emit the word together with its updated count
        self.emit([word, self.word_counts[word]])
        # For debugging you can log the counts:
        # storm.log(f"Counts: {self.word_counts}")

# Hand control over to Storm's multi-lang loop
CounterBolt().run()
Step 4: Create the Topology File (word_count_topology.py)
This file defines the structure of your topology—how the spouts and bolts are connected. You run this file on your local machine to submit the topology to the Storm cluster.
# word_count_topology.py
import storm
from storm import ShellBolt, ShellSpout

# Define the spout and bolts by the command lines that run them.
# Storm starts each command in its own process and pipes tuples over stdin/stdout.
SentenceSpout = ShellSpout.exec(["python", "sentence_spout.py"])
SplitterBolt = ShellBolt.exec(["python", "splitter_bolt.py"])
CounterBolt = ShellBolt.exec(["python", "counter_bolt.py"])

def run_topology():
    # Create a new topology
    topology = storm.Topology("word-count-python")

    # Add the spout: name, component, parallelism (1 instance)
    topology.add_spout("sentence-spout", SentenceSpout, 1)

    # The splitter receives tuples from 'sentence-spout'.
    # Shuffle grouping (the default) spreads sentences evenly across the 2 instances.
    topology.add_bolt("splitter-bolt", SplitterBolt, 2,
                      groupings=[{"stream": "default", "bolt": "sentence-spout"}])

    # The counter receives tuples from 'splitter-bolt'.
    # Fields grouping on "word" sends every occurrence of a word to the same instance,
    # which is what keeps the in-memory counts correct.
    topology.add_bolt("counter-bolt", CounterBolt, 2,
                      fields=["word"],
                      groupings=[{"stream": "default", "bolt": "splitter-bolt"}])

    # Submit the topology to the Storm cluster
    topology.submit()

if __name__ == '__main__':
    run_topology()
Note: The storm-python library provides a higher-level API, but ShellBolt (and its counterpart ShellSpout) is a core part of the MLP: it is a Java component that executes a command (in our case, python splitter_bolt.py) and pipes tuples to and from it over stdin/stdout.
Step 5: Run the Topology Locally
- Make sure all Python files (sentence_spout.py, splitter_bolt.py, counter_bolt.py, word_count_topology.py) are in the same directory.
- Open your terminal in that directory.
- Run the topology.
Submitting with the storm CLI (for example, storm jar storm-starter.jar storm.starter.WordCountTopology word-count-topology) is geared towards packaged JVM topologies, and submitting a multi-lang topology that way is more involved. A simpler way for local testing is to use the LocalCluster from the storm Python package.
Let's adjust the word_count_topology.py for local execution:
# word_count_topology.py (revised for local run)
import time

import storm
from storm.local import LocalCluster

# For simplicity, the spout and bolts are defined in this same file.
# In a larger project you would import them from their own modules instead.

class SentenceSpout(storm.Spout):
    def initialize(self, conf, context):
        self.sentences = [
            "the quick brown fox", "jumps over the lazy dog",
            "apache storm is a great framework", "python is a powerful language"
        ]
        self.sentence_index = 0

    def nextTuple(self):
        if self.sentence_index < len(self.sentences):
            self.emit([self.sentences[self.sentence_index]])
            self.sentence_index += 1
        else:
            time.sleep(1)  # avoid busy-waiting once all sentences are emitted

class SplitterBolt(storm.Bolt):
    def process(self, tup):
        sentence = tup.values[0]
        for word in sentence.split(" "):
            self.emit([word])

class CounterBolt(storm.Bolt):
    def initialize(self, conf, context):
        self.word_counts = {}

    def process(self, tup):
        word = tup.values[0]
        self.word_counts[word] = self.word_counts.get(word, 0) + 1
        self.emit([word, self.word_counts[word]])

def run_local_topology():
    # Create an in-process cluster for testing
    cluster = LocalCluster()

    # Define the topology
    topology = storm.Topology("word-count-python-local")
    topology.add_spout("sentence-spout", SentenceSpout, 1)
    topology.add_bolt("splitter-bolt", SplitterBolt, 2,
                      groupings=[{"stream": "default", "bolt": "sentence-spout"}])
    # Fields grouping on "word" keeps each word's count on a single instance
    topology.add_bolt("counter-bolt", CounterBolt, 2,
                      fields=["word"],
                      groupings=[{"stream": "default", "bolt": "splitter-bolt"}])

    # Submit the topology to the local cluster
    topology.submit()

    # Let it run for a while
    time.sleep(10)

    # Kill the topology and shut the local cluster down
    cluster.kill_topology("word-count-python-local")
    cluster.shutdown()

if __name__ == '__main__':
    run_local_topology()
Now you can simply run:
python word_count_topology.py
You should see output from the local cluster in your console as tuples flow through the spout and bolts.
Best Practices and Considerations
- State Management: Bolts in a distributed cluster can be moved or restarted. The self.word_counts dictionary in our example will be lost if the bolt is reassigned. For real applications, keep state in a distributed, fault-tolerant store such as Redis, HBase, or Cassandra (see the sketch after this list). The storm Python library provides BasicBolt and other base classes that help structure your bolts, but the underlying storage is up to you.
- Error Handling: In the process method of a bolt, always handle potential exceptions. If an exception goes unhandled, the tuple is "failed" and Storm will replay it according to your topology's configuration (after a delay, up to a maximum number of retries, etc.).
- Tuple Acking: For critical data streams, enable acking. When a tuple is fully processed by all downstream bolts, the originating spout receives an "ack" message; if processing fails, the spout receives a "fail" message and can replay the tuple. This gives you at-least-once processing semantics (exactly-once requires extra machinery, such as Trident or idempotent updates).
- Parallelism: The number of instances you set for a spout or bolt (e.g., topology.add_spout(..., 1)) determines how many parallel executors of that component run across the cluster. This is the main knob for scaling a topology.
- Grouping Strategy: How you route tuples between components is crucial for correctness and performance.
- Shuffle Grouping (default): Tuples are randomly distributed among bolt instances. Good for general-purpose parallelism.
- Fields Grouping: Tuples with the same values for the specified fields always go to the same bolt instance. This is essential for stateful operations like our counter bolt, ensuring all counts for "the" land on a single instance.
- All Grouping: The tuple is sent to every instance of the bolt. Use with caution, since it duplicates every tuple.
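As a concrete illustration of externalized state (referenced in the State Management point above), here is a minimal sketch of the counter bolt backed by Redis instead of an in-process dictionary. It assumes a Redis server on localhost and the redis-py client; the key prefix and connection settings are placeholders to adapt:
# redis_counter_bolt.py -- sketch only: counter state lives in Redis, not in the process.
import redis
import storm
from storm import Bolt

class RedisCounterBolt(Bolt):
    def initialize(self, conf, context):
        # Connection details are assumptions for this sketch; read them from conf in practice.
        self.client = redis.Redis(host="localhost", port=6379)

    def process(self, tup):
        word = tup.values[0]
        # INCR is atomic, so counts stay correct even if this bolt is restarted
        # or several instances happen to see the same word.
        count = self.client.incr(f"wordcount:{word}")
        self.emit([word, int(count)])

RedisCounterBolt().run()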
Conclusion
Using Python with Apache Storm via the Multi-Language Protocol is a powerful way to leverage the Python data science ecosystem within a robust, distributed, real-time processing framework. While it introduces a layer of communication overhead compared to native JVM bolts, it offers unparalleled flexibility and integration with the vast Python library ecosystem. For data science teams and Python-centric organizations, it's an excellent choice for building real-time analytics and machine learning pipelines.
