Python Development Guide

This comprehensive guide covers Python programming, focusing on data science, machine learning, and general development practices. It includes key libraries and frameworks essential for modern Python development.

Core Python Concepts

Data Structures and Operations

Lists:

# Creating and manipulating lists
my_list = [1, 2, 3, 4, 5]
my_list.append(6)          # Add element
my_list.insert(0, 0)       # Insert at position
my_list.remove(3)          # Remove first occurrence of 3
my_list.pop()              # Remove last element
my_list.sort()             # Sort in place
sorted_list = sorted(my_list)  # Return sorted copy

Dictionaries:

# Dictionary operations
my_dict = {'name': 'John', 'age': 30, 'city': 'New York'}
my_dict['email'] = 'john@example.com'  # Add key-value
del my_dict['city']       # Remove key
my_dict.update({'age': 31, 'job': 'Developer'})  # Update multiple

# Dictionary methods
keys = my_dict.keys()
values = my_dict.values()
items = my_dict.items()

Sets:

# Set operations
set1 = {1, 2, 3, 4}
set2 = {3, 4, 5, 6}

union = set1 | set2        # Union
intersection = set1 & set2 # Intersection
difference = set1 - set2   # Difference
symmetric_diff = set1 ^ set2  # Symmetric difference

File Operations

Reading and Writing Files:

# Reading files (each call consumes the stream; use one per open,
# or call f.seek(0) to rewind between them)
with open('file.txt', 'r') as f:
    content = f.read()          # Read entire file
    f.seek(0)
    lines = f.readlines()       # Read all lines
    f.seek(0)
    line = f.readline()         # Read single line

# Writing files
with open('output.txt', 'w') as f:
    f.write('Hello World\n')
    f.writelines(['Line 1\n', 'Line 2\n'])

# Binary files
with open('image.jpg', 'rb') as f:
    data = f.read()

Error Handling

Exception Handling:

try:
    # Code that might raise an exception
    result = 10 / 0
except ZeroDivisionError as e:
    print(f"Division by zero: {e}")
except Exception as e:
    print(f"General error: {e}")
else:
    print("No exceptions occurred")
finally:
    print("Always executed")

Custom Exceptions:

class CustomError(Exception):
    pass

def validate_age(age):
    if age < 0:
        raise CustomError("Age cannot be negative")
    if age > 150:
        raise CustomError("Age cannot be greater than 150")
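The custom exception can then be caught like any built-in one. A small self-contained sketch (repeating the definitions above so it runs on its own; `describe_age` is an illustrative helper, not part of any library):

```python
class CustomError(Exception):
    pass

def validate_age(age):
    if age < 0:
        raise CustomError("Age cannot be negative")
    if age > 150:
        raise CustomError("Age cannot be greater than 150")

def describe_age(age):
    # Turn a validation failure into a user-facing message
    try:
        validate_age(age)
    except CustomError as e:
        return f"Invalid: {e}"
    return "Valid"

print(describe_age(30))   # Valid
print(describe_age(-5))   # Invalid: Age cannot be negative
```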

NumPy: Numerical Computing

NumPy provides efficient arrays and mathematical operations for scientific computing.

Array Creation and Operations

Basic Arrays:

import numpy as np

# 1D array (vector)
arr1 = np.array([1, 2, 3, 4])
print(arr1)  # [1 2 3 4]

# 2D array (matrix)
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
print(arr2)
# [[1 2 3]
#  [4 5 6]]

# Special arrays
zeros = np.zeros((2, 3))      # 2x3 matrix of zeros
ones = np.ones((3, 3))        # 3x3 matrix of ones
eye = np.eye(3)               # 3x3 identity matrix
random = np.random.rand(2, 3) # 2x3 random matrix (0-1)
normal = np.random.randn(2, 3) # 2x3 normal distribution

# Sequences
arange = np.arange(0, 10, 2)  # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5)  # 5 values between 0 and 1

Array Properties:

arr = np.array([[1, 2, 3], [4, 5, 6]])

print(arr.shape)  # (2, 3) - dimensions
print(arr.ndim)   # 2 - number of dimensions
print(arr.size)   # 6 - total elements
print(arr.dtype)  # int64 - data type

Indexing and Slicing:

arr = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])

print(arr[0])      # [1 2 3 4] - first row
print(arr[:, 0])   # [1 5 9] - first column
print(arr[1, :])   # [5 6 7 8] - second row
print(arr[0:2, 1:3])  # [[2 3], [6 7]] - submatrix
print(arr[::2])    # Every other row

Mathematical Operations:

arr = np.array([1, 2, 3, 4])

# Element-wise operations
print(arr * 2)     # [2 4 6 8]
print(arr + arr)   # [2 4 6 8]
print(np.sqrt(arr)) # [1. 1.414 1.732 2.]
print(np.exp(arr))  # [2.718 7.389 20.085 54.598]
print(np.log(arr))  # [0. 0.693 1.099 1.386]

# Statistics
print(arr.mean())  # 2.5
print(arr.sum())   # 10
print(arr.max())   # 4
print(arr.min())   # 1
print(arr.std())   # 1.118
print(arr.var())   # 1.25
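The statistics above operate on the whole array. For 2-D arrays, the same methods accept an `axis` argument to aggregate along rows or columns:

```python
import numpy as np

m = np.array([[1, 2, 3], [4, 5, 6]])

# axis=0 collapses the rows: one result per column
col_means = m.mean(axis=0)  # [2.5 3.5 4.5]

# axis=1 collapses the columns: one result per row
row_sums = m.sum(axis=1)    # [ 6 15]
```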

Array Manipulation:

arr = np.array([[1, 2, 3], [4, 5, 6]])

# Reshaping
reshaped = arr.reshape(3, 2)  # Change to 3x2
flattened = arr.flatten()      # 1D array
transposed = arr.T             # Transpose

# Concatenation (shapes must match along the non-concatenated axis)
arr2 = np.array([[7, 8, 9]])
concat_rows = np.concatenate([arr, arr2], axis=0)  # Add a row -> 3x3
col = np.array([[10], [11]])
concat_cols = np.concatenate([arr, col], axis=1)   # Add a column -> 2x4

# Stacking
stacked = np.stack([arr, arr], axis=0)  # New dimension

Linear Algebra:

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
v = np.array([1, 2])

# Matrix operations
dot_product = A @ B          # Matrix multiplication
dot_product = np.dot(A, B)   # Alternative
vector_norm = np.linalg.norm(v)  # Vector norm
matrix_inv = np.linalg.inv(A)    # Matrix inverse
determinant = np.linalg.det(A)   # Determinant

# Eigenvalues and eigenvectors
eigenvals, eigenvecs = np.linalg.eig(A)

# Singular value decomposition
U, s, Vt = np.linalg.svd(A)

Pandas: Data Manipulation

Pandas provides powerful data structures for data analysis and manipulation.

DataFrame and Series

Creating DataFrames:

import pandas as pd

# From dictionary
data = {
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'city': ['NYC', 'LA', 'Chicago']
}
df = pd.DataFrame(data)

# From list of dictionaries
data_list = [
    {'name': 'Alice', 'age': 25, 'city': 'NYC'},
    {'name': 'Bob', 'age': 30, 'city': 'LA'},
    {'name': 'Charlie', 'age': 35, 'city': 'Chicago'}
]
df = pd.DataFrame(data_list)

# From CSV
df = pd.read_csv('data.csv')
df = pd.read_excel('data.xlsx')

DataFrame Operations:

# Basic information
print(df.head())      # First 5 rows
print(df.tail())      # Last 5 rows
print(df.info())      # Data types and non-null counts
print(df.describe())  # Statistical summary
print(df.shape)       # Dimensions
print(df.columns)     # Column names
print(df.index)       # Index

Data Selection:

# Column selection
names = df['name']           # Single column (Series)
subset = df[['name', 'age']] # Multiple columns (DataFrame)

# Row selection
first_row = df.loc[0]        # By label
second_row = df.iloc[1]      # By position

# Conditional selection
adults = df[df['age'] > 25]
nyc_people = df[df['city'] == 'NYC']

# Multiple conditions
young_nyc = df[(df['age'] < 30) & (df['city'] == 'NYC')]

Data Modification:

# Adding columns
df['age_in_10_years'] = df['age'] + 10
df['full_name'] = df['first_name'] + ' ' + df['last_name']  # assumes these columns exist

# Modifying values
df.loc[0, 'age'] = 26
df['city'] = df['city'].str.upper()

# Renaming columns
df = df.rename(columns={'name': 'full_name', 'city': 'location'})

Data Cleaning:

# Handling missing values (each returns a new DataFrame; reassign to keep)
df.dropna()                    # Remove rows with NaN
df.fillna(0)                   # Fill NaN with 0
df.fillna(df.mean(numeric_only=True))  # Fill with numeric column means

# Removing duplicates
df.drop_duplicates()

# Data type conversion
df['age'] = df['age'].astype(int)
df['date'] = pd.to_datetime(df['date'])

Grouping and Aggregation:

# Group by single column
grouped = df.groupby('city')
city_counts = grouped.size()
city_avg_age = grouped['age'].mean()

# Group by multiple columns (assumes 'department' and 'salary' columns)
grouped_multi = df.groupby(['city', 'department'])
stats = grouped_multi['salary'].agg(['mean', 'min', 'max', 'count'])

# Custom aggregation
def custom_agg(x):
    return x.max() - x.min()

result = df.groupby('city')['age'].agg(custom_agg)
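For multiple aggregations with readable output columns, pandas also supports named aggregation (available since pandas 0.25); the sample data below is hypothetical:

```python
import pandas as pd

df = pd.DataFrame({
    'city': ['NYC', 'NYC', 'LA'],
    'age': [25, 35, 30],
})

# Named aggregation: each keyword names an output column
summary = df.groupby('city').agg(
    avg_age=('age', 'mean'),
    age_range=('age', lambda x: x.max() - x.min()),
)
```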

Merging and Joining:

# Creating sample DataFrames
df1 = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})

df2 = pd.DataFrame({
    'id': [1, 2, 4],
    'salary': [50000, 60000, 70000]
})

# Inner join (default)
inner_join = pd.merge(df1, df2, on='id', how='inner')

# Left join
left_join = pd.merge(df1, df2, on='id', how='left')

# Outer join
outer_join = pd.merge(df1, df2, on='id', how='outer')

# Concatenation
df3 = pd.DataFrame({
    'id': [5, 6],
    'name': ['David', 'Eve']
})

concatenated = pd.concat([df1, df3], ignore_index=True)

Data Export:

# To CSV
df.to_csv('output.csv', index=False)

# To Excel
df.to_excel('output.xlsx', index=False)

# To JSON
df.to_json('output.json', orient='records')

# To SQL
from sqlalchemy import create_engine
engine = create_engine('sqlite:///data.db')
df.to_sql('table_name', engine, index=False, if_exists='replace')

Scikit-learn: Machine Learning

Scikit-learn provides tools for machine learning and data analysis.

Data Preprocessing

Train-Test Split:

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

Feature Scaling:

from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Standardization (mean=0, std=1)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Normalization (0-1 range)
minmax_scaler = MinMaxScaler()
X_train_normalized = minmax_scaler.fit_transform(X_train)
X_test_normalized = minmax_scaler.transform(X_test)

Encoding Categorical Variables:

from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Label encoding
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y_categorical)

# One-hot encoding (scikit-learn >= 1.2; older versions use sparse=False)
onehot_encoder = OneHotEncoder(sparse_output=False)
X_encoded = onehot_encoder.fit_transform(X_categorical)

Model Training and Evaluation

Classification Example:

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Load data
iris = load_iris()
X, y = iris.data, iris.target

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_scaled, y_train)

# Make predictions
y_pred = model.predict(X_test_scaled)

# Evaluate model
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:")
print(classification_report(y_test, y_pred))
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))

Regression Example:

from sklearn.datasets import fetch_california_housing
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

# Load data (load_boston was removed in scikit-learn 1.2)
housing = fetch_california_housing()
X, y = housing.data, housing.target

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train model
regressor = LinearRegression()
regressor.fit(X_train, y_train)

# Make predictions
y_pred = regressor.predict(X_test)

# Evaluate model
print("Mean Squared Error:", mean_squared_error(y_test, y_pred))
print("R² Score:", r2_score(y_test, y_pred))

Cross-Validation

K-Fold Cross-Validation:

from sklearn.model_selection import cross_val_score, KFold

# K-fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=kf, scoring='accuracy')

print("Cross-validation scores:", scores)
print("Mean accuracy:", scores.mean())
print("Standard deviation:", scores.std())

Grid Search for Hyperparameter Tuning:

from sklearn.model_selection import GridSearchCV

# Define parameter grid
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10]
}

# Grid search
grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

print("Best parameters:", grid_search.best_params_)
print("Best score:", grid_search.best_score_)

Clustering

K-Means Clustering:

from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# Generate sample data
X, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.60, random_state=0)

# K-means clustering
kmeans = KMeans(n_clusters=4, random_state=42)
clusters = kmeans.fit_predict(X)

# Get cluster centers
centers = kmeans.cluster_centers_

Dimensionality Reduction

Principal Component Analysis (PCA):

from sklearn.decomposition import PCA

# PCA for dimensionality reduction (X_scaled: standardized feature matrix)
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# Explained variance ratio
print("Explained variance ratio:", pca.explained_variance_ratio_)
print("Total explained variance:", sum(pca.explained_variance_ratio_))

Apache Airflow: Workflow Orchestration

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It’s widely used for data pipelines, ETL processes, and task automation.

Installation

# Install Airflow (pinning against the official constraints file
# is recommended outside local development)
pip install apache-airflow

# Initialize database (SQLite for development)
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start web server
airflow webserver --port 8080

# Start scheduler (in another terminal)
airflow scheduler

Core Concepts

DAG (Directed Acyclic Graph): A collection of tasks with dependencies
Task: A single unit of work
Operator: A template for a task (defines what to do)
Sensor: A special type of operator that waits for a condition

Basic DAG Structure

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments for the DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval=timedelta(days=1),  # Run daily
    catchup=False,
)

# Define tasks
def print_hello():
    print("Hello from Airflow!")

task1 = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,
    dag=dag,
)

task2 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

# Set task dependencies
task1 >> task2  # task1 runs before task2

Common Operators

BashOperator: Execute bash commands

bash_task = BashOperator(
    task_id='run_script',
    bash_command='python /path/to/script.py',
    dag=dag,
)

PythonOperator: Execute Python functions

def process_data():
    import pandas as pd
    # Data processing logic
    df = pd.read_csv('input.csv')
    # Process data...
    df.to_csv('output.csv')

python_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

EmailOperator: Send emails

from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='recipient@example.com',
    subject='Pipeline Complete',
    html_content='<p>The data pipeline has completed successfully.</p>',
    dag=dag,
)

HttpSensor: Wait for HTTP endpoint

# Requires the apache-airflow-providers-http package
from airflow.providers.http.sensors.http import HttpSensor

http_sensor = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='http_default',
    endpoint='api/v1/status',
    request_params={},
    response_check=lambda response: response.json()['status'] == 'ready',
    poke_interval=30,  # Check every 30 seconds
    timeout=600,       # Timeout after 10 minutes
    dag=dag,
)

Data Pipeline Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for customer data',
    schedule_interval='@daily',
    catchup=False,
)

def extract_data():
    """Extract data from source"""
    # Simulate data extraction
    data = {
        'customer_id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'email': ['alice@example.com', 'bob@example.com',
                 'charlie@example.com', 'david@example.com', 'eve@example.com']
    }
    df = pd.DataFrame(data)
    df.to_csv('/tmp/customers_raw.csv', index=False)
    return 'Data extracted successfully'

def transform_data():
    """Transform and clean data"""
    df = pd.read_csv('/tmp/customers_raw.csv')

    # Data cleaning
    df['name'] = df['name'].str.title()
    df['email'] = df['email'].str.lower()

    # Add derived columns
    df['domain'] = df['email'].str.split('@').str[1]
    df['signup_date'] = datetime.now().date()

    df.to_csv('/tmp/customers_clean.csv', index=False)
    return 'Data transformed successfully'

def load_data():
    """Load data to database"""
    df = pd.read_csv('/tmp/customers_clean.csv')

    # Connect to PostgreSQL
    hook = PostgresHook(postgres_conn_id='postgres_default')

    # Create table if not exists
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS customers (
        customer_id INTEGER PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100),
        domain VARCHAR(100),
        signup_date DATE
    );
    """
    hook.run(create_table_sql)

    # Insert data with a parameterized query (avoids SQL injection)
    insert_sql = """
    INSERT INTO customers (customer_id, name, email, domain, signup_date)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (customer_id) DO UPDATE SET
        name = EXCLUDED.name,
        email = EXCLUDED.email,
        domain = EXCLUDED.domain;
    """
    for _, row in df.iterrows():
        hook.run(insert_sql, parameters=(
            int(row['customer_id']), row['name'], row['email'],
            row['domain'], row['signup_date'],
        ))

    return 'Data loaded successfully'

# Define tasks
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Data quality check
data_quality_check = PostgresOperator(
    task_id='data_quality_check',
    postgres_conn_id='postgres_default',
    sql="""
    SELECT COUNT(*) as record_count FROM customers
    WHERE customer_id IS NOT NULL;
    """,
    dag=dag,
)

# Set dependencies
extract_task >> transform_task >> load_task >> data_quality_check

Scheduling and Triggers

Schedule Intervals:

# Common schedule intervals
schedule_interval='@daily'        # Every day at midnight
schedule_interval='@hourly'       # Every hour
schedule_interval='@weekly'       # Every week
schedule_interval='0 9 * * *'     # Every day at 9 AM
schedule_interval='*/30 * * * *'  # Every 30 minutes
schedule_interval=None            # Manual only

Manual Trigger:

# Trigger DAG manually
airflow dags trigger example_dag

# Trigger with configuration
airflow dags trigger example_dag \
    --conf '{"key": "value"}'

Variables and Connections

Airflow Variables:

from airflow.models import Variable

# Set variable via UI or CLI
# airflow variables set my_var "my_value"

# Get variable in code
my_var = Variable.get("my_var")

# Set variable programmatically
Variable.set("processed_count", 100)

Connections:

from airflow.hooks.base import BaseHook

# Get connection
conn = BaseHook.get_connection('postgres_default')
host = conn.host
user = conn.login
password = conn.password
database = conn.schema

Best Practices

  1. Idempotent Tasks: Ensure tasks can be run multiple times safely
  2. Atomic Operations: Use transactions for database operations
  3. Error Handling: Implement proper error handling and retries
  4. Logging: Use Airflow’s logging for debugging
  5. Modular DAGs: Break complex workflows into smaller, reusable DAGs
  6. Testing: Test DAGs locally before deployment
  7. Documentation: Document DAGs and tasks with docstrings
  8. Resource Management: Set appropriate resource limits
  9. Monitoring: Use Airflow’s web UI for monitoring
  10. Version Control: Keep DAGs in version control
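Practice 1 (idempotency) can be illustrated without Airflow at all. In the hypothetical `load_partition` below, output is keyed by the task's logical date and written atomically, so rerunning the task after a failure overwrites rather than duplicates:

```python
import json
import os
import tempfile

def load_partition(records, out_dir, ds):
    """Idempotent load: output is keyed by the logical date (ds) and
    written atomically, so re-running for the same date replaces the
    previous output instead of appending duplicates."""
    os.makedirs(out_dir, exist_ok=True)
    final_path = os.path.join(out_dir, f"customers_{ds}.json")
    # Write to a temp file first, then atomically replace the target
    fd, tmp_path = tempfile.mkstemp(dir=out_dir)
    with os.fdopen(fd, 'w') as f:
        json.dump(records, f)
    os.replace(tmp_path, final_path)
    return final_path
```

Running it twice for the same `ds` leaves exactly one file with the latest contents, which is what makes Airflow retries and backfills safe.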

Testing DAGs

from airflow.models import DagBag

def test_dag():
    """Test DAG structure and dependencies"""
    dag_bag = DagBag()
    dag = dag_bag.get_dag('example_dag')

    assert dag is not None
    assert len(dag.tasks) == 2

    # Test task dependencies (task ids from the example DAG above)
    hello_task = dag.get_task('print_hello')
    date_task = dag.get_task('print_date')
    assert date_task in hello_task.downstream_list

if __name__ == '__main__':
    test_dag()

Deployment Considerations

Production Setup:

  • Use external database (PostgreSQL/MySQL) instead of SQLite
  • Configure proper logging and monitoring
  • Set up proper authentication and authorization
  • Use connection pools for database connections
  • Implement proper backup and recovery procedures

Scaling:

  • Use CeleryExecutor for distributed task execution
  • Configure worker nodes for parallel processing
  • Monitor resource usage and adjust accordingly

Additional Libraries

Requests: HTTP Library

import requests

# GET request
response = requests.get('https://api.example.com/data')
print(response.status_code)
print(response.json())

# POST request
data = {'key': 'value'}
response = requests.post('https://api.example.com/submit', json=data)

# With authentication
response = requests.get('https://api.example.com/protected',
                       auth=('username', 'password'))

# Headers and timeout
headers = {'Authorization': 'Bearer token'}
response = requests.get('https://api.example.com/data',
                       headers=headers, timeout=5)

Virtual Environments

Using venv:

# Create virtual environment
python -m venv myenv

# Activate (Linux/Mac)
source myenv/bin/activate

# Activate (Windows)
myenv\Scripts\activate

# Install packages
pip install package-name

# Deactivate
deactivate

# Export requirements
pip freeze > requirements.txt

# Install from requirements
pip install -r requirements.txt

Best Practices

  1. Code Style: Follow PEP 8 guidelines
  2. Type Hints: Use type annotations for better code documentation
  3. Error Handling: Implement proper exception handling
  4. Testing: Write unit tests with pytest or unittest
  5. Documentation: Use docstrings and comments
  6. Virtual Environments: Always use virtual environments
  7. Dependencies: Keep requirements.txt updated
  8. Performance: Use appropriate data structures and algorithms
  9. Security: Validate inputs and handle sensitive data carefully
  10. Version Control: Use Git for all projects
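Practices 1 to 4 can be combined in a few lines; `safe_divide` and its test are illustrative examples, not library code:

```python
from typing import Optional

def safe_divide(numerator: float, denominator: float) -> Optional[float]:
    """Divide two numbers, returning None instead of raising on zero.

    Type hints document the interface, the docstring explains behavior,
    and the explicit check replaces an easy-to-miss ZeroDivisionError.
    """
    if denominator == 0:
        return None
    return numerator / denominator

# A minimal pytest-style unit test
def test_safe_divide():
    assert safe_divide(10, 2) == 5.0
    assert safe_divide(1, 0) is None
```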

This guide provides a comprehensive foundation for Python development, covering core concepts, data manipulation, machine learning, and best practices.