
How do you connect to an etcd cluster in Python with import etcd?

This guide shows how to import and use the etcd3 client library in Python, from basic setup to common operations.


Installation

First, you need to install a Python client for etcd. This guide uses etcd3 (python-etcd3), a widely used community-maintained client for the etcd v3 API.

Open your terminal or command prompt and run:

pip install etcd3

Importing the Library

Once installed, you can import the etcd3 client into your Python script.

import etcd3

Connecting to an etcd Server

The etcd3 package talks to an etcd server through a client object. You create one by calling the etcd3.client() factory function (note the lowercase name), which returns an etcd3.Etcd3Client instance.


Basic Connection (Default)

If etcd is running on your local machine with the default settings (localhost:2379), you can simply instantiate the client:

# This connects to localhost:2379 (the defaults)
client = etcd3.client()

Connection with Specific Host and Port

If your etcd server is running on a different host or port:

# Connect to a specific host and port
client = etcd3.client(host='192.168.1.100', port=2379)

Connection with Authentication

If your etcd cluster has authentication enabled (username and password):

# Connect with username and password
client = etcd3.client(
    host='192.168.1.100',
    port=2379,
    user='my-user',
    password='my-secret-password'
)
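
Connecting to a Cluster with Several Members

The examples above each target a single host. For a multi-member cluster, one possible approach is to try each member in turn and keep the first client that answers a status request. The following is only a sketch under stated assumptions: connect_first_available is a hypothetical helper (not part of etcd3), the endpoint addresses are made up, and it relies on the client exposing a status() call that raises when the member is unreachable.

```python
def connect_first_available(endpoints, factory=None, **client_kwargs):
    """Return a client for the first endpoint that answers a status request.

    endpoints: iterable of (host, port) tuples.
    factory:   callable that builds a client; defaults to etcd3.client.
    """
    if factory is None:
        import etcd3  # imported lazily so the helper can be exercised with a stub
        factory = etcd3.client

    last_error = None
    for host, port in endpoints:
        try:
            client = factory(host=host, port=port, **client_kwargs)
            client.status()  # one round trip to check that the member responds
            return client
        except Exception as exc:  # exception types vary between client versions
            last_error = exc
    raise ConnectionError(f"no etcd endpoint reachable: {last_error!r}")


# Hypothetical usage; passing timeout keeps an unreachable member from blocking:
# client = connect_first_available(
#     [("192.168.1.100", 2379), ("192.168.1.101", 2379)], timeout=3
# )
```

This only picks a member at connection time; it does not fail over if that member goes down later.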

Basic CRUD Operations

Here are the most common operations you'll perform.

a. Putting a Key-Value Pair (put)

This sets the value of a key.

# Put a key-value pair
client.put('my-key', 'my-value')
print("Successfully put 'my-key' into etcd.")

b. Getting a Value by Key (get)

This retrieves the value associated with a key.

# Get the value for a key; both items are None if the key does not exist
value, metadata = client.get('my-key')
# The value is returned as bytes, so you need to decode it
print(f"The value for 'my-key' is: {value.decode('utf-8')}")
# metadata describes the key, for example the revision of its last modification
print(f"Key revision: {metadata.mod_revision}")
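
Because get() returns None for a key that does not exist, calling decode() directly on the result fails for missing keys. A small wrapper can handle both cases. This is a minimal sketch; get_text is a hypothetical helper name, not part of etcd3.

```python
def get_text(client, key, default=None, encoding="utf-8"):
    """Return the decoded value for key, or default if the key is absent."""
    value, _metadata = client.get(key)
    if value is None:
        return default
    return value.decode(encoding)


# Hypothetical usage with a real client:
# print(get_text(client, 'my-key', default='<not set>'))
```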

c. Deleting a Key (delete)

This removes a key and its associated value.

# Delete a key
deleted = client.delete('my-key')
if deleted:
    print("Successfully deleted 'my-key'.")
else:
    print("Key 'my-key' not found.")

Advanced Features

etcd is more than a simple key-value store. Here's how to use some of its powerful features.

a. Leases (For Temporary Keys)

Leases are used to create keys that automatically expire after a certain time. This is perfect for service discovery, distributed locks, or configuration that needs to be refreshed.

# Create a lease that will expire in 10 seconds (the TTL is given in seconds)
lease = client.lease(10)
# Put a key associated with the lease
client.put('service-a-status', 'running', lease=lease)
print("Put 'service-a-status' with a 10-second lease.")
# Wait until the lease has expired
import time
time.sleep(11)
# Try to get the key after the lease has expired
value, _ = client.get('service-a-status')
if value is None:
    print("The key has been automatically deleted after its lease expired.")
else:
    print("The key still exists; etcd may need a moment longer to expire the lease.")
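
A fixed sleep depends on timing. An alternative is to poll until the key disappears or a deadline passes. The sketch below assumes only the get() call shown earlier; wait_until_gone is a hypothetical helper name.

```python
import time


def wait_until_gone(client, key, timeout=30.0, poll_interval=0.5):
    """Poll until key no longer exists; return False if timeout elapses first."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        value, _ = client.get(key)
        if value is None:
            return True
        time.sleep(poll_interval)
    return False


# Hypothetical usage with a real client:
# if wait_until_gone(client, 'service-a-status', timeout=20):
#     print("Key expired.")
```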

b. Watching for Changes (watch)

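You can watch a key for changes. The add_watch_callback method registers a callback and returns immediately; the callback is then invoked from a background thread whenever the key is modified or deleted.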

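# In one terminal, run this script to watch for changes
import time
import etcd3
client = etcd3.client()
def watch_callback(response):
    """This function is called from a background thread when a change is detected."""
    # Recent etcd3 releases pass a WatchResponse with an .events list;
    # older releases pass a single event, so handle both shapes.
    for event in getattr(response, 'events', [response]):
        print("Event detected!")
        print(f"  Key: {event.key.decode('utf-8')}")
        print(f"  Value: {event.value.decode('utf-8')}")
        print(f"  Type: {type(event).__name__}") # 'PutEvent' or 'DeleteEvent'
# Start watching a key
watch_id = client.add_watch_callback('my-key', watch_callback)
print(f"Watching 'my-key' with watch ID: {watch_id}")
# Keep the script running to receive events
try:
    while True:
        time.sleep(1)  # sleep instead of spinning so the loop does not burn CPU
except KeyboardInterrupt:
    print("\nStopping watch.")
    client.cancel_watch(watch_id)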

How to test this:

  1. Run the script above. It will wait.
  2. In a different Python script or in a Python interpreter, run:
    import etcd3
    client = etcd3.client()
    client.put('my-key', 'a-new-value')
    client.delete('my-key')
  3. You will see the events printed in the terminal where the first script is running.
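
If you prefer a blocking style over callbacks, the client also offers watch(), which returns an iterator of events together with a cancel function. The sketch below collects a fixed number of events and then cancels the watch. describe_event and collect_events are hypothetical helper names, and telling the event kinds apart by class name (PutEvent or DeleteEvent) is an assumption about how the library represents events.

```python
def describe_event(event):
    """Summarise a watch event as a short string."""
    key = event.key.decode("utf-8")
    if type(event).__name__ == "DeleteEvent":
        return f"DELETE {key}"
    return f"PUT {key}={event.value.decode('utf-8')}"


def collect_events(client, key, limit):
    """Block until `limit` events arrive for key, then cancel the watch."""
    events_iterator, cancel = client.watch(key)
    seen = []
    try:
        for event in events_iterator:
            seen.append(describe_event(event))
            if len(seen) >= limit:
                break
    finally:
        cancel()  # always release the server-side watch
    return seen


# Hypothetical usage with a real client:
# for line in collect_events(client, 'my-key', limit=2):
#     print(line)
```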

c. Transactions

Transactions allow you to perform a series of operations atomically, based on certain conditions. This is extremely powerful for building robust systems.

Example: "Compare-and-Swap" (CAS)

Let's build a simple lock. We'll try to acquire a lock only if it doesn't already exist.

lock_key = 'my-app-global-lock'
lock_owner = 'server-01'
# Try to acquire the lock
# The transaction will only execute the 'success' operations IF the lock_key does not exist
# (a key that has never been written has version 0).
# If it exists, the 'failure' operations will run.
# transaction() returns a (succeeded, responses) tuple, so unpack it;
# testing the tuple itself would always be truthy.
transaction_success, _ = client.transaction(
    compare=[client.transactions.version(lock_key) == 0],
    success=[client.transactions.put(lock_key, lock_owner)],
    failure=[]
)
if transaction_success:
    print(f"Lock acquired by '{lock_owner}'!")
    # Do some work...
    print("Doing some critical work...")
    # Release the lock by deleting it
    client.delete(lock_key)
    print("Lock released.")
else:
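    # Get the current owner of the lock (it may have been released in the meantime)
    current_owner, _ = client.get(lock_key)
    owner_text = current_owner.decode('utf-8') if current_owner else 'unknown'
    print(f"Failed to acquire lock. It is currently held by: {owner_text}")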
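
Releasing the lock with a plain delete removes it even if another owner holds it by then. A second transaction can make the release conditional on the stored value. The following is a sketch only: acquire and release are hypothetical helper names, and the lock still has no lease, so a crashed owner would leave it in place.

```python
def acquire(client, key, owner):
    """Create key with value owner only if the key does not exist yet."""
    succeeded, _responses = client.transaction(
        compare=[client.transactions.version(key) == 0],
        success=[client.transactions.put(key, owner)],
        failure=[],
    )
    return succeeded


def release(client, key, owner):
    """Delete key only if it is still held by owner."""
    succeeded, _responses = client.transaction(
        compare=[client.transactions.value(key) == owner],
        success=[client.transactions.delete(key)],
        failure=[],
    )
    return succeeded


# Hypothetical usage with a real client:
# if acquire(client, 'my-app-global-lock', 'server-01'):
#     try:
#         ...  # critical work
#     finally:
#         release(client, 'my-app-global-lock', 'server-01')
```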

Complete Example: Service Registry with Expiration

Here is a practical example where a service registers itself with etcd using a lease. The registration automatically expires if the service crashes or goes offline.

import etcd3
import time
import threading
import random
# --- Configuration ---
ETCD_HOST = 'localhost'
ETCD_PORT = 2379
SERVICE_NAME = 'my-api-service'
INSTANCE_ID = f"{SERVICE_NAME}-{random.randint(1000, 9999)}"
HEARTBEAT_INTERVAL = 5  # seconds
LEASE_TTL = 15  # seconds (Time To Live)
# --- Setup ---
etcd_client = etcd3.client(host=ETCD_HOST, port=ETCD_PORT)
lease = etcd_client.lease(ttl=LEASE_TTL)
# --- Service Registration ---
def register_service():
    """Registers the service instance with etcd."""
    service_key = f"services/{SERVICE_NAME}/{INSTANCE_ID}"
    service_value = f"http://{INSTANCE_ID}:8080"
    # Associate the key with the lease
    etcd_client.put(service_key, service_value, lease=lease)
    print(f"✅ Registered service '{service_key}' with value '{service_value}' and a {LEASE_TTL}s lease.")
# --- Heartbeat (Lease Renewal) ---
def heartbeat():
    """Keeps the lease alive by renewing its TTL."""
    while True:
        # The refresh() call renews the lease for its original TTL
        lease.refresh()
        print(f"💓 Heartbeat sent for lease {lease.id}. Lease renewed.")
        time.sleep(HEARTBEAT_INTERVAL)
# --- Main Execution ---
if __name__ == "__main__":
    register_service()
    # Start the heartbeat in a background thread
    heartbeat_thread = threading.Thread(target=heartbeat, daemon=True)
    heartbeat_thread.start()
    print("Service is running. Press Ctrl+C to stop.")
    try:
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n🛑 Service shutting down...")
        # Revoke the lease explicitly so etcd deletes the key right away.
        # Without this, the key lingers until the lease TTL runs out.
        lease.revoke()
        print("Service unregistered.")

This example demonstrates the core concepts:

  1. Connection: etcd3.client()
  2. Lease: etcd_client.lease()
  3. Put with Lease: etcd_client.put(..., lease=lease)
  4. Heartbeat: lease.refresh() in a background thread to prevent automatic expiration.
  5. Automatic Cleanup: When heartbeats stop, the lease expires after its TTL and etcd automatically removes the key; revoking the lease explicitly removes the key immediately.
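
The other half of a service registry is discovery: a consumer lists the instances currently registered under the service prefix. The sketch below assumes the services/<name>/<instance> key layout used above and the client's get_prefix() call, which yields (value, metadata) pairs; list_instances is a hypothetical helper name.

```python
def list_instances(client, service_name):
    """Return {instance_id: address} for every live instance of a service."""
    prefix = f"services/{service_name}/"
    instances = {}
    for value, metadata in client.get_prefix(prefix):
        # metadata.key holds the full key as bytes; strip the prefix to get the ID
        instance_id = metadata.key.decode("utf-8")[len(prefix):]
        instances[instance_id] = value.decode("utf-8")
    return instances


# Hypothetical usage with a real client:
# for instance_id, address in list_instances(etcd_client, 'my-api-service').items():
#     print(instance_id, address)
```

Instances whose lease has expired or been revoked no longer appear in the result, because etcd has already removed their keys.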