杰瑞科技汇

How Do MongoDB and Python Interact Efficiently?

This guide covers using MongoDB with Python, from setup to advanced operations.

Table of Contents

  1. What is MongoDB? (A quick refresher)
  2. Why Use MongoDB with Python?
  3. Step 1: Installation
    • Install the MongoDB Server
    • Install the Python Driver (pymongo)
  4. Step 2: Connecting to MongoDB
    • Connecting to a Local Server
    • Connecting to MongoDB Atlas (Cloud)
  5. Step 3: Core CRUD Operations
    • Create: Inserting Documents
    • Read: Querying Documents
    • Update: Modifying Documents
    • Delete: Removing Documents
  6. Step 4: Advanced Concepts
    • Working with ObjectId
    • Handling Nested Documents and Arrays
    • Aggregation Framework
  7. Step 5: Best Practices
    • Error Handling
    • Using Environment Variables for Credentials
    • Connection Management (Closing Connections)
  8. Full Example: A Simple To-Do List App

What is MongoDB?

MongoDB is a popular, source-available NoSQL document database. Instead of storing data in tables with rows and columns like a traditional SQL database (e.g., MySQL, PostgreSQL), MongoDB stores data as flexible, JSON-like documents, encoded in a binary format called BSON (Binary JSON).

  • Collections: The equivalent of a table.
  • Documents: The equivalent of a row. A document is a set of key-value pairs, which PyMongo represents as a Python dictionary.
  • Fields: The equivalent of a column.

Example Document:

{
  "name": "Alice",
  "age": 30,
  "email": "alice@example.com",
  "courses": ["History", "Math"],
  "address": {
    "street": "123 Main St",
    "city": "Wonderland"
  }
}
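
In Python, the same document is simply a nested dictionary. The sketch below uses only the standard library's json module to show the mapping; the variable name student is chosen for illustration, and real BSON supports extra types (such as dates and ObjectId) that plain JSON does not.

```python
import json

# The example document written as a plain Python dictionary.
student = {
    "name": "Alice",
    "age": 30,
    "email": "alice@example.com",
    "courses": ["History", "Math"],
    "address": {"street": "123 Main St", "city": "Wonderland"},
}

# Nested documents are nested dicts; arrays are lists.
city = student["address"]["city"]
first_course = student["courses"][0]

# Round-trip through JSON text to show that the structure is preserved.
as_json = json.dumps(student, indent=2)
restored = json.loads(as_json)
print(city, first_course, restored == student)
```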

Why Use MongoDB with Python?

The combination is incredibly powerful and popular for several reasons:

  • Native Data Format: Python dictionaries and lists map almost directly to MongoDB documents. This makes data manipulation intuitive and requires minimal "translation" code.
  • Flexibility: You can store documents with different structures (schemas) in the same collection, which is great for rapidly evolving applications.
  • Scalability: MongoDB is designed for horizontal scaling (sharding across multiple servers), making it suitable for large-scale applications.
  • Rich Python Ecosystem: The official pymongo driver is mature, well-documented, and has excellent community support. Libraries like MongoEngine build on it to add ODM (object-document mapper) features, the document-database counterpart of an ORM.

Step 1: Installation

You need two things: the MongoDB server itself and the Python driver to connect to it.

A. Install the MongoDB Server

You have two main options:

  1. Local Installation (Recommended for development):

    • macOS (using Homebrew): brew tap mongodb/brew, then brew install mongodb-community
    • Windows (using Chocolatey): choco install mongodb
    • Linux (Debian/Ubuntu): Follow the official MongoDB documentation.

    After installation, start the MongoDB service:

    • On macOS: brew services start mongodb-community
    • On Linux: sudo systemctl start mongod
  2. MongoDB Atlas (Cloud - Recommended for production):

    • This is MongoDB's managed cloud service, and it includes a free tier. It's the easiest way to get started without managing a server.
    • Go to MongoDB Atlas, sign up for a free tier, and create a new cluster.
    • Once your cluster is running, you'll get a connection string. This is all you need to connect from Python.

B. Install the Python Driver (pymongo)

Open your terminal or command prompt and install the library using pip:

pip install pymongo
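
To confirm that the install worked, you can ask Python which version it sees. This is a small sketch using the standard library's importlib.metadata; the helper name installed_version is an assumption for illustration.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("pymongo")
print(f"pymongo version: {version}" if version else "pymongo is not installed")
```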

Step 2: Connecting to MongoDB

Create a Python file (e.g., connect.py) and let's establish a connection.

A. Connecting to a Local Server

If you installed MongoDB locally on your machine, it typically runs on localhost at port 27017.

from pymongo import MongoClient
# The default connection URI for a local MongoDB instance
# URI format: mongodb://<host>:<port>/
client = MongoClient('mongodb://localhost:27017/')
# Now, let's get a handle to a specific database
# If the database doesn't exist, MongoDB will create it when you first store data in it.
db = client['my_database']
# You can also get a handle to a collection (like a table)
# If the collection doesn't exist, it will be created.
users_collection = db['users']
print("Successfully connected to MongoDB!")

B. Connecting to MongoDB Atlas

When you create a cluster in Atlas, it adds security by default. You need to:

  1. Create a database user with a username and password.
  2. Whitelist your IP address (or 0.0.0.0/0 to allow access from anywhere, for development only).

Atlas will give you a connection string. It will look something like this:

mongodb+srv://<username>:<password>@mycluster.xxxxx.mongodb.net/my_database?retryWrites=true&w=majority

Replace <username> and <password> with your database user's credentials, and my_database with your database name.
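
One detail that often trips people up: special characters in the username or password (such as @ or /) must be percent-escaped before they go into the URI. A minimal sketch using the standard library follows; the helper name build_atlas_uri and the sample credentials are assumptions for illustration, and the host is the same placeholder as above.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username, password, host, database):
    """Assemble a mongodb+srv URI with percent-escaped credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{host}/{database}?retryWrites=true&w=majority"


# Hypothetical credentials; '@' and '/' in the password are escaped.
uri = build_atlas_uri("alice", "p@ss/word", "mycluster.xxxxx.mongodb.net", "my_database")
print(uri)
```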

from pymongo import MongoClient
import os # Best practice to load from environment variables
# It's bad practice to hardcode credentials. Use environment variables.
# In your terminal, you would set them like this:
# export MONGO_URI="mongodb+srv://..."
uri = os.getenv("MONGO_URI") 
# If you don't use env vars, replace with your actual connection string
# uri = "mongodb+srv://<username>:<password>@mycluster.xxxxx.mongodb.net/my_database?retryWrites=true&w=majority"
# Create a new client and connect to the server
client = MongoClient(uri)
# Get a handle to the database
db = client['my_database']
# Test the connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Step 3: Core CRUD Operations

Let's perform basic operations on our users_collection.

C - Create: Inserting Documents

# --- Insert a single document ---
user1 = {
    "name": "Alice",
    "age": 30,
    "email": "alice@example.com"
}
insert_result = users_collection.insert_one(user1)
print(f"Inserted document with id: {insert_result.inserted_id}")
# --- Insert multiple documents at once ---
user2 = {"name": "Bob", "age": 25, "city": "New York"}
user3 = {"name": "Charlie", "age": 35, "city": "London"}
insert_many_result = users_collection.insert_many([user2, user3])
print(f"Inserted {len(insert_many_result.inserted_ids)} documents.")

R - Read: Querying Documents

# --- Find one document ---
print("\n--- Finding one document ---")
found_user = users_collection.find_one({"name": "Alice"})
print(found_user)
# --- Find all documents matching a query ---
print("\n--- Finding all users older than 28 ---")
older_users = users_collection.find({"age": {"$gt": 28}})
for user in older_users:
    print(user)
# --- Find with multiple conditions (AND) ---
print("\n--- Finding Bob in New York ---")
bob_ny = users_collection.find({"name": "Bob", "city": "New York"})
print(bob_ny.next()) # .next() gets the first result from the cursor
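
Because query filters are plain dictionaries, you can build them with ordinary Python code. The sketch below combines a range condition with an optional $in condition; the helper name age_range_filter is an assumption, and nothing here touches the database.

```python
def age_range_filter(min_age, max_age, cities=None):
    """Build a filter for min_age <= age <= max_age, optionally limited to some cities."""
    query = {"age": {"$gte": min_age, "$lte": max_age}}
    if cities:
        query["city"] = {"$in": list(cities)}
    return query


query = age_range_filter(25, 35, cities=["New York", "London"])
# Projection: return only name and age, and hide the _id field.
projection = {"name": 1, "age": 1, "_id": 0}
print(query)
```

You would then pass these to the driver, for example users_collection.find(query, projection).sort("age", -1).limit(10).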

U - Update: Modifying Documents

Use update operators like $set to modify fields without overwriting the entire document.

# --- Update one document ---
# Set Alice's age to 31
update_result = users_collection.update_one(
    {"name": "Alice"},
    {"$set": {"age": 31}}
)
print(f"\nMatched {update_result.matched_count} document and modified {update_result.modified_count} document.")
# --- Update multiple documents ---
# Add a "status" field to all users
update_many_result = users_collection.update_many(
    {}, # Empty filter means ALL documents
    {"$set": {"status": "active"}}
)
print(f"Updated {update_many_result.modified_count} documents with 'status' field.")
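
$set is only one of several update operators. As a rough sketch, the helper below (build_update is a hypothetical name) assembles an update document that also uses $inc to increment a number and $addToSet with $each to add tags without duplicates.

```python
def build_update(set_fields=None, increments=None, add_tags=None):
    """Assemble an update document from optional $set, $inc and $addToSet parts."""
    update = {}
    if set_fields:
        update["$set"] = dict(set_fields)
    if increments:
        update["$inc"] = dict(increments)
    if add_tags:
        # $each adds several values; $addToSet skips ones already in the array.
        update["$addToSet"] = {"tags": {"$each": list(add_tags)}}
    return update


update = build_update(
    set_fields={"status": "active"},
    increments={"age": 1},
    add_tags=["python"],
)
print(update)
```

The result can be passed as the second argument to update_one or update_many, for example users_collection.update_one({"name": "Alice"}, update).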

D - Delete: Removing Documents

# --- Delete one document ---
# Delete Charlie
delete_result = users_collection.delete_one({"name": "Charlie"})
print(f"\nDeleted {delete_result.deleted_count} document.")
# --- Delete multiple documents ---
# Delete all users older than 28
delete_many_result = users_collection.delete_many({"age": {"$gt": 28}})
print(f"Deleted {delete_many_result.deleted_count} documents.")
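
Since an empty filter matches every document, delete_many({}) empties the whole collection. One possible safeguard, sketched here under the assumption that you want to forbid that case, is a thin wrapper; safe_delete_many is a hypothetical helper, not part of PyMongo.

```python
def safe_delete_many(collection, query):
    """Delete matching documents, refusing an empty filter that would match everything."""
    if not query:
        raise ValueError("Refusing to delete with an empty filter")
    # collection is expected to behave like a PyMongo collection.
    return collection.delete_many(query).deleted_count
```

Called as safe_delete_many(users_collection, {"age": {"$gt": 28}}), it returns the number of deleted documents.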

Step 4: Advanced Concepts

Working with ObjectId

When you insert a document, MongoDB automatically adds a unique _id field. This is an ObjectId, not a simple string.

# When you insert a document, you get its ID back
insert_result = users_collection.insert_one({"name": "David"})
object_id = insert_result.inserted_id
print(f"Type of _id: {type(object_id)}")
# Query _id with an ObjectId, not its string form.
# inserted_id is already an ObjectId; ObjectId(...) is needed when the ID arrives as a string.
from bson.objectid import ObjectId
found_user = users_collection.find_one({"_id": ObjectId(str(object_id))})
print(found_user)
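
An ObjectId is 12 bytes, usually shown as 24 hex characters, and its first 4 bytes hold the creation time in seconds since the Unix epoch. The sketch below decodes that with the standard library only, to show what is inside; the helper names are assumptions, and with PyMongo installed, ObjectId.is_valid and the generation_time attribute give you the same information.

```python
import re
from datetime import datetime, timezone

_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value):
    """Check that a value is a 24-character hex string."""
    return isinstance(value, str) and _OBJECT_ID_RE.fullmatch(value) is not None


def object_id_created_at(hex_id):
    """Decode the creation time stored in the first 4 bytes (8 hex characters)."""
    if not looks_like_object_id(hex_id):
        raise ValueError(f"not a 24-character hex string: {hex_id!r}")
    seconds = int(hex_id[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


created = object_id_created_at("63f9c1b3f1b3b1b3b1b3b1b3")
print(created.isoformat())
```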

Handling Nested Documents and Arrays

Use "dot notation" to access fields inside nested documents or elements in arrays.

# Insert a document with nested data
address_doc = {
    "name": "Eve",
    "contact": {
        "email": "eve@example.com",
        "phone": "123-456-7890"
    },
    "tags": ["developer", "python"]
}
users_collection.insert_one(address_doc)
# Query a nested field
print("\n--- Querying nested field ---")
eve = users_collection.find_one({"contact.email": "eve@example.com"})
print(eve)
# Query an array (looks for documents where the array contains the value)
print("\n--- Querying array field ---")
python_dev = users_collection.find_one({"tags": "python"})
print(python_dev)
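
To make dot notation concrete, here is a local illustration of how a path such as contact.email walks through a document. It is a simplified sketch in plain Python (get_by_path is a hypothetical helper), not how the server evaluates queries; in particular, it ignores MongoDB's extra matching rules for arrays of sub-documents.

```python
def get_by_path(document, path):
    """Resolve a dotted path such as 'contact.email'; numeric parts index into lists."""
    current = document
    for part in path.split("."):
        if isinstance(current, list):
            current = current[int(part)]
        else:
            current = current[part]
    return current


eve = {
    "name": "Eve",
    "contact": {"email": "eve@example.com", "phone": "123-456-7890"},
    "tags": ["developer", "python"],
}
print(get_by_path(eve, "contact.email"))
print(get_by_path(eve, "tags.0"))
```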

Aggregation Framework

Aggregation allows you to process data records and return computed results. It's like SQL's GROUP BY on steroids.

Goal: Find the number of users in each city.

# Let's re-insert some users with cities for this example
users_collection.insert_many([
    {"name": "Frank", "city": "New York"},
    {"name": "Grace", "city": "London"},
    {"name": "Heidi", "city": "New York"}
])
# The aggregation pipeline is a list of stages
pipeline = [
    {
        "$group": {
            "_id": "$city",  # Group by the 'city' field
            "count": {"$sum": 1} # Count the number of documents in each group
        }
    }
]
print("\n--- Aggregation: User count by city ---")
user_counts = users_collection.aggregate(pipeline)
for count in user_counts:
    print(count)
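
Pipelines usually chain several stages. As a sketch, the version below first keeps only documents that have a city ($match), then groups, then sorts by count in descending order ($sort). To show what result to expect, it also computes the same answer in memory with collections.Counter over the three sample users; the in-memory part is only an illustration and does not query MongoDB.

```python
from collections import Counter

pipeline = [
    {"$match": {"city": {"$exists": True}}},             # Skip users without a city
    {"$group": {"_id": "$city", "count": {"$sum": 1}}},  # Count users per city
    {"$sort": {"count": -1}},                            # Largest groups first
]

# In-memory equivalent for the three sample users.
users = [
    {"name": "Frank", "city": "New York"},
    {"name": "Grace", "city": "London"},
    {"name": "Heidi", "city": "New York"},
]
counts = Counter(user["city"] for user in users if "city" in user)
expected = [{"_id": city, "count": n} for city, n in counts.most_common()]
print(expected)
```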

Step 5: Best Practices

Error Handling

Network issues or invalid queries can raise exceptions. Always wrap your operations in a try...except block.

# PyMongoError is the base class for the exceptions PyMongo raises
from pymongo.errors import PyMongoError
try:
    result = users_collection.insert_one({"invalid": "data"})
except PyMongoError as e:
    print(f"An error occurred: {e}")
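
For transient network failures, printing the error is often not enough, and a retry with a short backoff can help. The following is a generic sketch, not a PyMongo feature: with_retries and its parameters are hypothetical, and it defaults to Python's built-in ConnectionError so that it runs without a database.

```python
import time


def with_retries(operation, retriable=(ConnectionError,), attempts=3, base_delay=0.1):
    """Call operation(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retriable:
            if attempt == attempts:
                raise  # Out of attempts: let the caller see the error
            time.sleep(base_delay * 2 ** (attempt - 1))
```

With PyMongo you could pass retriable=(pymongo.errors.AutoReconnect,) and wrap a read such as lambda: users_collection.find_one({"name": "Alice"}). Only retry operations that are safe to repeat.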

Using Environment Variables for Credentials

Never hardcode your database username and password in your code. Use environment variables.

# In your Python script
import os
from pymongo import MongoClient
uri = os.environ.get("MONGO_URI")
if not uri:
    raise ValueError("MONGO_URI environment variable not set!")
client = MongoClient(uri)
# ... rest of your code
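
A related precaution is to avoid printing the full URI in logs, since it contains the password. Here is a minimal sketch that masks it using the standard library's urllib.parse; redact_uri is a hypothetical helper and the sample URI is made up.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_uri(uri):
    """Return the URI with the password replaced by ***, for safe logging."""
    parts = urlsplit(uri)
    if parts.password is None:
        return uri  # Nothing to hide
    host = parts.netloc.rpartition("@")[2]
    netloc = f"{parts.username}:***@{host}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


safe = redact_uri("mongodb+srv://alice:s3cret@mycluster.xxxxx.mongodb.net/my_database?retryWrites=true&w=majority")
print(safe)
```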

Connection Management

The MongoClient object manages a connection pool. It's best practice to create a single instance of MongoClient for your application and reuse it. Don't create a new client for every operation. Make sure to close the connection when your application shuts down.

# At the start of your application
client = MongoClient('mongodb://localhost:27017/')
# ... perform all your operations ...
# At the end of your application (e.g., in a shutdown function)
client.close()

Full Example: A Simple To-Do List App

Here’s a complete script demonstrating all CRUD operations for a to-do list.

from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime
# --- Setup ---
# Assuming a local MongoDB instance
client = MongoClient('mongodb://localhost:27017/')
db = client['todo_app']
todos_collection = db['todos']
def add_task(task_text):
    """Adds a new task to the to-do list."""
    new_task = {
        "text": task_text,
        "completed": False,
        # utcnow() is deprecated since Python 3.12; use a timezone-aware UTC time
        "created_at": datetime.datetime.now(datetime.timezone.utc)
    }
    result = todos_collection.insert_one(new_task)
    print(f"Added new task with id: {result.inserted_id}")
def list_tasks():
    """Lists all tasks, showing their status."""
    print("\n--- Your To-Do List ---")
    tasks = todos_collection.find().sort("created_at", 1) # Sort by creation date
    if todos_collection.count_documents({}) == 0: # Cursor.count() was removed in PyMongo 4
        print("No tasks yet.")
        return
    for task in tasks:
        status = "✓" if task["completed"] else " "
        print(f"[{status}] {task['text']} (ID: {task['_id']})")
def complete_task(task_id):
    """Marks a task as completed."""
    result = todos_collection.update_one(
        {"_id": ObjectId(task_id)},
        {"$set": {"completed": True}}
    )
    if result.matched_count > 0: # matched_count also covers tasks that were already completed
        print(f"Task {task_id} marked as complete!")
    else:
        print(f"Task with id {task_id} not found.")
def delete_task(task_id):
    """Deletes a task from the list."""
    result = todos_collection.delete_one({"_id": ObjectId(task_id)})
    if result.deleted_count > 0:
        print(f"Task {task_id} deleted.")
    else:
        print(f"Task with id {task_id} not found.")
# --- Main Program ---
if __name__ == "__main__":
    add_task("Learn Python")
    add_task("Learn MongoDB")
    add_task("Build a cool app")
    list_tasks()
    complete_task("63f9c1b3f1b3b1b3b1b3b1b3") # Replace with a real _id from your output
    list_tasks()
    delete_task("63f9c1b3f1b3b1b3b1b3b1b4") # Replace with a real _id
    list_tasks()
    # Clean up (optional)
    # todos_collection.delete_many({})
    client.close()