Database Practical Examples

This document provides hands-on examples and code snippets for various database operations and scenarios.
SQL Examples
Section titled “SQL Examples”Basic CRUD Operations
Section titled “Basic CRUD Operations”Create Table with Constraints
-- NOTE: departments must be created before employees, because employees
-- declares a FOREIGN KEY referencing departments(id); creating the child
-- table first fails under enforced referential integrity.

-- Create departments table
CREATE TABLE departments (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    location VARCHAR(100),
    budget DECIMAL(12,2),
    manager_id INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create a comprehensive employees table
CREATE TABLE employees (
    id INT PRIMARY KEY AUTO_INCREMENT,
    employee_id VARCHAR(10) UNIQUE NOT NULL,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    phone VARCHAR(20),
    hire_date DATE NOT NULL,
    salary DECIMAL(10,2) CHECK (salary > 0),
    department_id INT,
    manager_id INT,                 -- self-reference: an employee's manager
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (department_id) REFERENCES departments(id),
    FOREIGN KEY (manager_id) REFERENCES employees(id)
);
Advanced Queries
Section titled “Advanced Queries”
-- Complex join with aggregation:
-- per-department salary statistics, restricted to departments with a
-- budget over 100k and more than 5 employees.
SELECT
    d.name AS department_name,
    COUNT(e.id) AS employee_count,
    AVG(e.salary) AS average_salary,
    MAX(e.salary) AS highest_salary,
    MIN(e.salary) AS lowest_salary
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id
WHERE d.budget > 100000
GROUP BY d.id, d.name
HAVING COUNT(e.id) > 5
ORDER BY average_salary DESC;

-- Window functions for ranking:
-- dept_rank restarts per department; the other windows span all rows.
SELECT
    first_name,
    last_name,
    salary,
    department_id,
    ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC) AS dept_rank,
    RANK() OVER (ORDER BY salary DESC) AS overall_rank,
    DENSE_RANK() OVER (ORDER BY salary DESC) AS dense_rank,
    LAG(salary, 1) OVER (ORDER BY salary DESC) AS prev_salary,
    LEAD(salary, 1) OVER (ORDER BY salary DESC) AS next_salary
FROM employees
ORDER BY salary DESC;

-- Recursive CTE for hierarchical data:
-- walks the manager_id chain, tracking depth in `level`.
WITH RECURSIVE employee_hierarchy AS (
    -- Base case: top-level managers
    SELECT id, first_name, last_name, manager_id, 0 AS level
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case: subordinates
    SELECT e.id, e.first_name, e.last_name, e.manager_id, eh.level + 1
    FROM employees e
    JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
    CONCAT(REPEAT(' ', level), first_name, ' ', last_name) AS hierarchy,
    level
FROM employee_hierarchy
ORDER BY level, first_name;
Indexing Strategies
Section titled “Indexing Strategies”
-- Create composite index for common query patterns
CREATE INDEX idx_employee_dept_salary ON employees(department_id, salary DESC);

-- Create covering index to avoid table lookups
CREATE INDEX idx_employee_covering ON employees(department_id, salary, first_name, last_name);

-- Create partial index for specific conditions
-- NOTE(review): a WHERE clause on CREATE INDEX is PostgreSQL syntax; the
-- AUTO_INCREMENT tables above imply MySQL, which does not support partial
-- indexes — confirm the target engine.
CREATE INDEX idx_high_salary_employees ON employees(salary) WHERE salary > 100000;

-- Create functional index
-- NOTE(review): MySQL 8 requires an extra pair of parentheses around a
-- functional key part — confirm against the target engine.
CREATE INDEX idx_employee_email_domain ON employees(SUBSTRING(email, LOCATE('@', email) + 1));

-- Analyze index usage
EXPLAIN FORMAT=JSON
SELECT first_name, last_name, salary
FROM employees
WHERE department_id = 1 AND salary > 50000;
NoSQL Examples
Section titled “NoSQL Examples”MongoDB Operations
Section titled “MongoDB Operations”Document Structure and Queries
Section titled “Document Structure and Queries”
// Insert documents with embedded data:
// one employee document with nested personal/employment sub-documents,
// an array of skills, and an array of project sub-documents.
db.employees.insertMany([
  {
    _id: ObjectId(),
    employee_id: "EMP001",
    personal_info: {
      first_name: "John",
      last_name: "Doe",
      email: "john.doe@company.com",
      phone: "+1-555-0123",
    },
    employment: {
      hire_date: ISODate("2023-01-15"),
      salary: 75000,
      department: "Engineering",
      position: "Software Engineer",
    },
    skills: ["JavaScript", "Python", "React", "Node.js"],
    projects: [
      {
        name: "E-commerce Platform",
        role: "Lead Developer",
        start_date: ISODate("2023-02-01"),
        end_date: ISODate("2023-08-31"),
      },
    ],
    performance: {
      rating: 4.5,
      goals_met: 8,
      goals_total: 10,
    },
  },
]);

// Complex aggregation pipeline: per-skill salary statistics for the
// Engineering department, top 10 skills by frequency.
db.employees.aggregate([
  // Match employees in Engineering department
  { $match: { "employment.department": "Engineering" } },

  // Unwind skills array to analyze individual skills
  { $unwind: "$skills" },

  // Group by skill and calculate statistics
  {
    $group: {
      _id: "$skills",
      count: { $sum: 1 },
      avg_salary: { $avg: "$employment.salary" },
      max_salary: { $max: "$employment.salary" },
      min_salary: { $min: "$employment.salary" },
    },
  },

  // Sort by count descending
  { $sort: { count: -1 } },

  // Limit to top 10 skills
  { $limit: 10 },
]);

// Text search with scoring
// NOTE(review): $text requires a text index on the searched fields —
// confirm one exists on this collection.
db.employees
  .find(
    { $text: { $search: "JavaScript React Node.js" } },
    {
      score: { $meta: "textScore" },
      "personal_info.first_name": 1,
      "personal_info.last_name": 1,
      "employment.position": 1,
      skills: 1,
    }
  )
  .sort({ score: { $meta: "textScore" } });

// Geospatial queries (if location data exists)
// NOTE(review): $near requires a 2dsphere index on `location` — confirm.
db.employees.find({
  location: {
    $near: {
      $geometry: {
        type: "Point",
        coordinates: [-74.0059, 40.7128], // New York coordinates
      },
      $maxDistance: 10000, // 10km radius
    },
  },
});
Redis Operations
Section titled “Redis Operations”Caching Patterns
Section titled “Caching Patterns”
import redis
import json
import time

# Connect to Redis
# Module-level client shared by all helpers below.
r = redis.Redis(host='localhost', port=6379, db=0)
# Cache-aside pattern
def get_employee(employee_id):
    """Return an employee record, serving it from Redis when possible.

    On a cache miss the record is loaded from the database, cached with a
    one-hour TTL, and returned.
    """
    key = f"employee:{employee_id}"
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    # Miss: load from the authoritative store, then prime the cache.
    record = database.get_employee(employee_id)
    r.setex(key, 3600, json.dumps(record))
    return record
# Write-through pattern
def update_employee(employee_id, data):
    """Persist *data* to the database, then mirror it into the cache (1h TTL)."""
    # Write-through: update the authoritative store first, then the cache.
    database.update_employee(employee_id, data)

    cache_key = f"employee:{employee_id}"
    r.setex(cache_key, 3600, json.dumps(data))
# Cache invalidation
def delete_employee(employee_id):
    """Delete the employee from the database and drop its cache entry."""
    database.delete_employee(employee_id)

    # Invalidate so a later read cannot serve the stale record.
    r.delete(f"employee:{employee_id}")
# Session storagedef create_user_session(user_id, session_data): session_id = f"session:{user_id}:{int(time.time())}" r.setex(session_id, 1800, json.dumps(session_data)) # 30 minutes return session_id
# Rate limiting
def check_rate_limit(user_id, limit=100, window=3600):
    # Fixed-window rate limiter: one Redis counter per user, reset every
    # `window` seconds. Returns True while the caller is within `limit`.
    key = f"rate_limit:{user_id}"
    current = r.incr(key)

    # The first hit in a window creates the key, so the TTL is attached here.
    # NOTE(review): if the process dies between INCR and EXPIRE the key never
    # expires and the user is throttled forever — a pipeline or Lua script
    # would make the pair atomic.
    if current == 1:
        r.expire(key, window)

    return current <= limit
Database Design Patterns
Section titled “Database Design Patterns”Repository Pattern Implementation
Section titled “Repository Pattern Implementation”Python Example
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
import sqlite3


class EmployeeRepository(ABC):
    """Abstract data-access interface for employee records."""

    @abstractmethod
    def find_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        """Return the employee with *employee_id*, or None if absent."""
        pass

    @abstractmethod
    def find_by_department(self, department_id: int) -> List[Dict[str, Any]]:
        """Return all employees belonging to *department_id*."""
        pass

    @abstractmethod
    def save(self, employee: Dict[str, Any]) -> int:
        """Insert *employee* and return the new row id."""
        pass

    @abstractmethod
    def update(self, employee_id: int, employee: Dict[str, Any]) -> bool:
        """Overwrite the record; return True if a row was changed."""
        pass

    @abstractmethod
    def delete(self, employee_id: int) -> bool:
        """Delete the record; return True if a row was removed."""
        pass


class SQLiteEmployeeRepository(EmployeeRepository):
    """SQLite-backed implementation of EmployeeRepository.

    Opens a short-lived connection per call. Note that sqlite3's connection
    context manager only manages the *transaction* (commit on success,
    rollback on error) — it does NOT close the connection, so the original
    version leaked one connection per call. Every method now closes its
    connection in a ``finally`` block.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path

    def _get_connection(self) -> sqlite3.Connection:
        return sqlite3.connect(self.db_path)

    def find_by_id(self, employee_id: int) -> Optional[Dict[str, Any]]:
        conn = self._get_connection()
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM employees WHERE id = ?", (employee_id,)
            )
            row = cursor.fetchone()
            return dict(row) if row else None
        finally:
            conn.close()

    def find_by_department(self, department_id: int) -> List[Dict[str, Any]]:
        conn = self._get_connection()
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM employees WHERE department_id = ?", (department_id,)
            )
            return [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()

    def save(self, employee: Dict[str, Any]) -> int:
        conn = self._get_connection()
        try:
            with conn:  # commits on success, rolls back on error
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO employees (first_name, last_name, email, department_id, salary)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    employee['first_name'],
                    employee['last_name'],
                    employee['email'],
                    employee['department_id'],
                    employee['salary']
                ))
                return cursor.lastrowid
        finally:
            conn.close()

    def update(self, employee_id: int, employee: Dict[str, Any]) -> bool:
        conn = self._get_connection()
        try:
            with conn:  # commits on success, rolls back on error
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE employees
                    SET first_name = ?, last_name = ?, email = ?, department_id = ?, salary = ?
                    WHERE id = ?
                """, (
                    employee['first_name'],
                    employee['last_name'],
                    employee['email'],
                    employee['department_id'],
                    employee['salary'],
                    employee_id
                ))
                return cursor.rowcount > 0
        finally:
            conn.close()

    def delete(self, employee_id: int) -> bool:
        conn = self._get_connection()
        try:
            with conn:  # commits on success, rolls back on error
                cursor = conn.cursor()
                cursor.execute("DELETE FROM employees WHERE id = ?", (employee_id,))
                return cursor.rowcount > 0
        finally:
            conn.close()
# Usage examplerepo = SQLiteEmployeeRepository("company.db")employee = repo.find_by_id(1)employees = repo.find_by_department(1)Unit of Work Pattern
Section titled “Unit of Work Pattern”class UnitOfWork: def __init__(self, repository: EmployeeRepository): self.repository = repository self.new_objects = [] self.dirty_objects = [] self.removed_objects = []
def register_new(self, employee: Dict[str, Any]): self.new_objects.append(employee)
def register_dirty(self, employee: Dict[str, Any]): if employee not in self.dirty_objects: self.dirty_objects.append(employee)
def register_removed(self, employee_id: int): self.removed_objects.append(employee_id)
def commit(self): try: # Insert new objects for employee in self.new_objects: self.repository.save(employee)
# Update dirty objects for employee in self.dirty_objects: self.repository.update(employee['id'], employee)
# Delete removed objects for employee_id in self.removed_objects: self.repository.delete(employee_id)
# Clear tracking self.new_objects.clear() self.dirty_objects.clear() self.removed_objects.clear()
except Exception as e: # Rollback logic would go here raise eDatabase Migration Examples
Section titled “Database Migration Examples”SQL Migration Scripts
Section titled “SQL Migration Scripts”
-- Migration: Add employee skills table
-- Version: 1.1.0
-- Date: 2024-01-15

-- Create skills table
CREATE TABLE skills (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    category VARCHAR(50),
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create employee_skills junction table
-- (many-to-many between employees and skills, with per-pair metadata)
CREATE TABLE employee_skills (
    employee_id INT NOT NULL,
    skill_id INT NOT NULL,
    proficiency_level ENUM('Beginner', 'Intermediate', 'Advanced', 'Expert') DEFAULT 'Beginner',
    years_experience INT DEFAULT 0,
    certified BOOLEAN DEFAULT FALSE,
    certified_date DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Composite key: one row per employee/skill pair; rows disappear with
    -- either parent thanks to ON DELETE CASCADE.
    PRIMARY KEY (employee_id, skill_id),
    FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE,
    FOREIGN KEY (skill_id) REFERENCES skills(id) ON DELETE CASCADE
);

-- Insert initial skills data
INSERT INTO skills (name, category, description) VALUES
('JavaScript', 'Programming', 'High-level programming language'),
('Python', 'Programming', 'General-purpose programming language'),
('React', 'Frontend', 'JavaScript library for building user interfaces'),
('Node.js', 'Backend', 'JavaScript runtime for server-side development'),
('SQL', 'Database', 'Structured Query Language for database management'),
('MongoDB', 'Database', 'NoSQL document database'),
('Docker', 'DevOps', 'Containerization platform'),
('AWS', 'Cloud', 'Amazon Web Services cloud platform');

-- Add indexes for performance
CREATE INDEX idx_employee_skills_employee ON employee_skills(employee_id);
CREATE INDEX idx_employee_skills_skill ON employee_skills(skill_id);
CREATE INDEX idx_employee_skills_proficiency ON employee_skills(proficiency_level);
Database Schema Versioning
class DatabaseMigrator:
    """Apply and roll back ordered, versioned schema migrations.

    Applied versions are recorded in a ``schema_migrations`` table, which is
    now created on demand — the original crashed on a fresh database because
    ``get_current_version`` queried a table that never existed.
    """

    def __init__(self, connection):
        self.connection = connection
        self.migrations = []  # registered migrations, not necessarily sorted

    def add_migration(self, version, description, up_sql, down_sql):
        """Register a migration; *version* must be unique and orderable."""
        self.migrations.append({
            'version': version,
            'description': description,
            'up_sql': up_sql,
            'down_sql': down_sql
        })

    def _ensure_migrations_table(self):
        """Create the bookkeeping table if this is a fresh database."""
        cursor = self.connection.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS schema_migrations ("
            "version INTEGER PRIMARY KEY, description TEXT)"
        )
        self.connection.commit()

    def get_current_version(self):
        """Return the highest applied version, or 0 for a fresh database."""
        self._ensure_migrations_table()
        cursor = self.connection.cursor()
        cursor.execute("SELECT version FROM schema_migrations ORDER BY version DESC LIMIT 1")
        result = cursor.fetchone()
        return result[0] if result else 0

    def migrate_up(self, target_version=None):
        """Apply registered migrations above the current version, in order.

        Stops at *target_version* (inclusive) when given, otherwise applies
        everything pending. Each migration is committed individually.
        """
        current_version = self.get_current_version()
        migrations_to_run = [
            m for m in self.migrations
            if m['version'] > current_version
            and (target_version is None or m['version'] <= target_version)
        ]

        for migration in sorted(migrations_to_run, key=lambda x: x['version']):
            print(f"Running migration {migration['version']}: {migration['description']}")
            cursor = self.connection.cursor()
            cursor.execute(migration['up_sql'])
            cursor.execute(
                "INSERT INTO schema_migrations (version, description) VALUES (?, ?)",
                (migration['version'], migration['description'])
            )
            self.connection.commit()

    def migrate_down(self, target_version):
        """Roll back applied migrations, newest first, down to *target_version*."""
        current_version = self.get_current_version()
        migrations_to_rollback = [
            m for m in self.migrations
            if m['version'] > target_version and m['version'] <= current_version
        ]

        for migration in sorted(migrations_to_rollback, key=lambda x: x['version'], reverse=True):
            print(f"Rolling back migration {migration['version']}: {migration['description']}")
            cursor = self.connection.cursor()
            cursor.execute(migration['down_sql'])
            cursor.execute(
                "DELETE FROM schema_migrations WHERE version = ?",
                (migration['version'],)
            )
            self.connection.commit()
# Usagemigrator = DatabaseMigrator(connection)
# Add migrationsmigrator.add_migration( version=1, description="Create employees table", up_sql="CREATE TABLE employees (id INT PRIMARY KEY, name VARCHAR(100))", down_sql="DROP TABLE employees")
migrator.add_migration( version=2, description="Add email column to employees", up_sql="ALTER TABLE employees ADD COLUMN email VARCHAR(100)", down_sql="ALTER TABLE employees DROP COLUMN email")
# Run migrationsmigrator.migrate_up()Performance Optimization Examples
Section titled “Performance Optimization Examples”Query Optimization
Section titled “Query Optimization”
-- Before: Inefficient query (no supporting index, unbounded result set)
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;

-- After: Optimized with proper indexing
-- Create covering index (satisfies filter, sort, and selected columns)
CREATE INDEX idx_employee_salary_dept ON employees(salary DESC, department_id, first_name, last_name);

-- Optimized query (LIMIT bounds the result set)
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC
LIMIT 100;

-- Use EXPLAIN to analyze query plan
EXPLAIN FORMAT=JSON
SELECT e.first_name, e.last_name, d.name as department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;
Connection Pooling
Section titled “Connection Pooling”import psycopg2from psycopg2 import poolimport threadingimport time
class DatabaseConnectionPool:
    """Thin wrapper around psycopg2's ThreadedConnectionPool.

    Keeps between *min_connections* and *max_connections* PostgreSQL
    connections alive; any extra keyword arguments are forwarded verbatim
    to psycopg2 (host, database, user, ...).
    """

    def __init__(self, min_connections=5, max_connections=20, **kwargs):
        self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
            min_connections,
            max_connections,
            **kwargs,
        )
        self.lock = threading.Lock()

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.connection_pool.getconn()

    def return_connection(self, connection):
        """Hand *connection* back to the pool for reuse."""
        self.connection_pool.putconn(connection)

    def close_all(self):
        """Tear down every pooled connection."""
        self.connection_pool.closeall()
# Usage with context manager
class DatabaseContext:
    """Borrow a pooled connection for the duration of a ``with`` body.

    The connection is returned to the pool on exit even when the body
    raises, so the pool can never be drained by failing queries.
    """

    def __init__(self, pool):
        self.pool = pool
        self.connection = None

    def __enter__(self):
        self.connection = self.pool.get_connection()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Guard clause: nothing to return if __enter__ never ran (or the
        # pool handed back a falsy value).
        if not self.connection:
            return
        self.pool.return_connection(self.connection)
# Example usage
pool = DatabaseConnectionPool(
    min_connections=5,
    max_connections=20,
    host='localhost',
    database='company',
    user='admin',
    password='password'
)

def execute_query(query, params=None):
    # Borrow a pooled connection just for this query; DatabaseContext
    # returns it to the pool automatically, even if execute() raises.
    with DatabaseContext(pool) as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()
Database Security Examples
Section titled “Database Security Examples”SQL Injection Prevention
Section titled “SQL Injection Prevention”import sqlite3import hashlibimport secrets
class SecureDatabase:
    """SQLite access layer demonstrating injection-safe queries and salted
    password hashing (PBKDF2-HMAC-SHA256, 100k iterations)."""

    def __init__(self, db_path):
        self.connection = sqlite3.connect(db_path)
        # Rows support name-based access (row['column']) for callers.
        self.connection.row_factory = sqlite3.Row

    def authenticate_user(self, username, password):
        """Return the user row if *username*/*password* match, else None."""
        # Use parameterized queries to prevent SQL injection
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT id, username, password_hash, salt FROM users WHERE username = ?",
            (username,)
        )
        user = cursor.fetchone()

        if user:
            # Re-derive the hash with the stored salt and compare in constant
            # time: a plain `==` short-circuits on the first differing byte
            # and leaks timing information about the stored hash.
            password_hash = self._hash_password(password, user['salt'])
            if secrets.compare_digest(password_hash, user['password_hash']):
                return user
        return None

    def create_user(self, username, password):
        """Store a new user with a fresh random salt; return the row id."""
        # Generate random salt
        salt = secrets.token_hex(16)
        password_hash = self._hash_password(password, salt)

        cursor = self.connection.cursor()
        cursor.execute(
            "INSERT INTO users (username, password_hash, salt) VALUES (?, ?, ?)",
            (username, password_hash, salt)
        )
        self.connection.commit()
        return cursor.lastrowid

    def _hash_password(self, password, salt):
        """Derive a hex PBKDF2-HMAC-SHA256 digest of *password* with *salt*."""
        return hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000).hex()

    def search_employees_safe(self, search_term):
        """Substring-search employees by first or last name, with bound params."""
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT * FROM employees WHERE first_name LIKE ? OR last_name LIKE ?",
            (f"%{search_term}%", f"%{search_term}%")
        )
        return cursor.fetchall()
# Example of what NOT to do (vulnerable to SQL injection)def search_employees_unsafe(connection, search_term): cursor = connection.cursor() # This is vulnerable to SQL injection! query = f"SELECT * FROM employees WHERE first_name LIKE '%{search_term}%'" cursor.execute(query) return cursor.fetchall()Data Encryption
Section titled “Data Encryption”from cryptography.fernet import Fernetimport base64import os
class DatabaseEncryption: def __init__(self): # Generate or load encryption key self.key = self._get_or_create_key() self.cipher = Fernet(self.key)
def _get_or_create_key(self): key_file = 'encryption.key' if os.path.exists(key_file): with open(key_file, 'rb') as f: return f.read() else: key = Fernet.generate_key() with open(key_file, 'wb') as f: f.write(key) return key
def encrypt_sensitive_data(self, data): """Encrypt sensitive data before storing""" if isinstance(data, str): data = data.encode() return self.cipher.encrypt(data)
def decrypt_sensitive_data(self, encrypted_data): """Decrypt sensitive data when retrieving""" decrypted = self.cipher.decrypt(encrypted_data) return decrypted.decode()
def store_encrypted_employee(self, connection, employee_data): """Store employee with encrypted sensitive fields""" cursor = connection.cursor()
# Encrypt sensitive fields encrypted_ssn = self.encrypt_sensitive_data(employee_data['ssn']) encrypted_salary = self.encrypt_sensitive_data(str(employee_data['salary']))
cursor.execute(""" INSERT INTO employees (first_name, last_name, email, encrypted_ssn, encrypted_salary) VALUES (?, ?, ?, ?, ?) """, ( employee_data['first_name'], employee_data['last_name'], employee_data['email'], encrypted_ssn, encrypted_salary )) connection.commit()
def retrieve_encrypted_employee(self, connection, employee_id): """Retrieve and decrypt employee data""" cursor = connection.cursor() cursor.execute( "SELECT * FROM employees WHERE id = ?", (employee_id,) ) row = cursor.fetchone()
if row: # Decrypt sensitive fields decrypted_ssn = self.decrypt_sensitive_data(row['encrypted_ssn']) decrypted_salary = float(self.decrypt_sensitive_data(row['encrypted_salary']))
return { 'id': row['id'], 'first_name': row['first_name'], 'last_name': row['last_name'], 'email': row['email'], 'ssn': decrypted_ssn, 'salary': decrypted_salary } return NoneThese examples demonstrate practical database operations, design patterns, and security measures that are commonly used in real-world applications.