Part VII: Real-World Applications
Chapter 19: Case Study - Log File Analyzer
The Problem: Understanding Server Behavior
You've been asked to analyze log files from a web application. Each log entry is JSON, but the structure varies by event type. Some events are nested several levels deep. You need to:
- Find all error events across any log level
- Extract user IDs regardless of where they appear
- Calculate response times, which might be in different formats
- Identify patterns in failed authentication attempts
Here's a sample log entry:
{
"timestamp": "2025-11-10T14:23:11Z",
"level": "error",
"service": "api-gateway",
"event": {
"type": "request_failed",
"details": {
"user": {"id": "user_123", "session": "abc"},
"request": {"path": "/api/data", "method": "GET"},
"error": {"code": 500, "message": "Database timeout"}
},
"metadata": {"retry_count": 3, "duration_ms": 5000}
}
}
But you've also seen entries like this:
{
"timestamp": "2025-11-10T14:25:03Z",
"level": "info",
"auth": {"user_id": "user_456", "result": "success"}
}
First Steps: Explore Before You Code
Let's follow Rule #2: The Path to Wisdom and start with the obvious approach before seeking elegance.
import json
# Load one entry and explore it
with open('sample.log', 'r') as f:
entry = json.loads(f.readline())
print(type(entry)) # dict
print(entry.keys()) # See what's at the top level
# Try accessing what we know
print(entry['level']) # Works
print(entry.get('event', {}).get('details', {}).get('user', {}).get('id'))
# This works, but is fragile and hard to read
The problem reveals itself: we don't know where user IDs live in advance. This is a search problem, not a navigation problem.
The Simple Path: Recursive Search
Let's build a function that finds all occurrences of a key, no matter how deeply nested:
def find_all_values(data, target_key):
"""Find all values for a key in nested structure."""
results = []
# What type are we looking at?
if isinstance(data, dict):
# Check each key-value pair
for key, value in data.items():
if key == target_key:
results.append(value)
# Recurse into the value
results.extend(find_all_values(value, target_key))
elif isinstance(data, list):
# Check each item in list
for item in data:
results.extend(find_all_values(item, target_key))
# Base case: primitive types, just return empty list
return results
# Test it
user_ids = find_all_values(entry, 'user_id')
print(f"Found user IDs: {user_ids}")
# Also find 'id' keys (might catch user.id)
ids = find_all_values(entry, 'id')
print(f"Found IDs: {ids}")
The Living Code moment: Notice how we handle three types? Let's pause and make this clearer:
def find_all_values(data, target_key):
"""Find all values for a key in nested structure.
Handles dicts, lists, and primitives gracefully.
"""
results = []
if isinstance(data, dict):
for key, value in data.items():
if key == target_key:
results.append(value)
# Always recurse - value might contain more matches
results.extend(find_all_values(value, target_key))
elif isinstance(data, list):
for item in data:
results.extend(find_all_values(item, target_key))
# If it's neither dict nor list, we're done (base case)
return results
Applying It: Processing All Logs
Now let's apply Rule #5: The Gentle Ascent. We'll build this incrementally.
Step 1: Read and parse logs
def load_logs(filename):
"""Load log entries from a JSONL file."""
entries = []
with open(filename, 'r') as f:
for line in f:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
# Skip malformed lines
continue
return entries
logs = load_logs('server.log')
print(f"Loaded {len(logs)} entries")
Run this. See it work. Feel good about it.
Step 2: Extract user activity
def extract_user_activity(logs):
"""Find all user-related events."""
user_events = []
for entry in logs:
# Find user IDs - they might be under 'user_id' or nested in 'user'
user_ids = find_all_values(entry, 'user_id')
# Also check for 'id' keys inside 'user' objects
user_objects = find_all_values(entry, 'user')
for user_obj in user_objects:
if isinstance(user_obj, dict) and 'id' in user_obj:
user_ids.append(user_obj['id'])
if user_ids:
user_events.append({
'timestamp': entry['timestamp'],
'user_ids': list(set(user_ids)), # Deduplicate
'level': entry.get('level', 'unknown'),
'entry': entry
})
return user_events
user_activity = extract_user_activity(logs)
print(f"Found {len(user_activity)} user-related events")
Step 3: Find error patterns
def find_errors(logs):
"""Extract all error events with context."""
errors = []
for entry in logs:
if entry.get('level') == 'error':
# Extract error details
error_messages = find_all_values(entry, 'message')
error_codes = find_all_values(entry, 'code')
errors.append({
'timestamp': entry['timestamp'],
'messages': error_messages,
'codes': error_codes,
'full_entry': entry
})
return errors
errors = find_errors(logs)
print(f"Found {len(errors)} errors")
The More Elegant Path
Following Rule #2 again, let's look at what we've learned and improve it. Our current approach has repetition. Let's create a more flexible tool:
def extract_fields(data, field_spec):
"""Extract multiple fields from nested data.
Args:
data: Nested dict/list structure
        field_spec: Dict mapping output field names to the keys to search for
Returns:
Dict with extracted values
"""
result = {}
for field_name, key_to_find in field_spec.items():
values = find_all_values(data, key_to_find)
# Take first value if we only expect one
result[field_name] = values[0] if len(values) == 1 else values
return result
# Now our extraction is cleaner
for entry in logs[:5]: # Test on first 5
fields = extract_fields(entry, {
'user_id': 'user_id',
'error_code': 'code',
'duration': 'duration_ms'
})
print(fields)
Why is this better? We can now declare what we want without repeating how to get it.
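One refinement worth considering before we move on: when a key never appears, extract_fields hands back an empty list, which downstream code then has to special-case. Here's a small sketch of a variant with a default value; extract_fields_with_default and its default parameter are our own additions, not part of the chapter's toolkit.
def extract_fields_with_default(data, field_spec, default=None):
    """Like extract_fields, but returns `default` when a key is absent."""
    result = {}
    for field_name, key_to_find in field_spec.items():
        values = find_all_values(data, key_to_find)
        if not values:
            result[field_name] = default      # key never appeared anywhere
        elif len(values) == 1:
            result[field_name] = values[0]    # unwrap a single match
        else:
            result[field_name] = values       # keep multiple matches as a list
    return result
With this variant, fields['duration'] is simply None for entries that never carried a duration_ms field.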
Handling the Messy Reality
Let's apply Rule #4: The Virtuous Flaw. Our code works on clean data, but what about this?
broken_entry = {
"timestamp": "2025-11-10T14:30:00Z",
"level": "error",
"event": {
"details": {
"user": "user_789", # Not a dict! Just a string
"error": None # Missing error details
}
}
}
Run our extractor on this. It won't crash, but it returns unexpected results. Now let's write a test:
def test_handles_irregular_structure():
"""Ensure we handle non-dict user fields."""
entry = {
"event": {
"details": {
"user": "user_789", # String, not dict
}
}
}
# This shouldn't crash
result = find_all_values(entry, 'user')
assert result == ["user_789"]
# And we shouldn't find 'id' since there's no dict
ids = find_all_values(entry, 'id')
assert ids == []
test_handles_irregular_structure()
print("✓ Test passed")
Good! Our recursive function already handles this because we only recurse into dicts and lists.
The Complete Analyzer
Now let's put it together with Rule #5: The Gentle Ascent in mind:
class LogAnalyzer:
"""Analyze structured log files."""
def __init__(self, log_file):
self.logs = self._load_logs(log_file)
def _load_logs(self, filename):
"""Load and parse log entries."""
entries = []
with open(filename, 'r') as f:
for i, line in enumerate(f, 1):
try:
entries.append(json.loads(line))
except json.JSONDecodeError as e:
print(f"Warning: Skipped malformed line {i}: {e}")
return entries
def find_field_values(self, field_name):
"""Find all occurrences of a field across all logs."""
all_values = []
for entry in self.logs:
values = find_all_values(entry, field_name)
all_values.extend(values)
return all_values
def filter_by_level(self, level):
"""Return all entries with specified log level."""
return [e for e in self.logs if e.get('level') == level]
def user_error_summary(self):
"""Summarize errors by user."""
errors = self.filter_by_level('error')
summary = {}
for error in errors:
user_ids = find_all_values(error, 'user_id')
# Also check nested user objects
for user_obj in find_all_values(error, 'user'):
if isinstance(user_obj, dict) and 'id' in user_obj:
user_ids.append(user_obj['id'])
for user_id in user_ids:
if user_id not in summary:
summary[user_id] = {'count': 0, 'errors': []}
summary[user_id]['count'] += 1
summary[user_id]['errors'].append(error)
return summary
# Use it
analyzer = LogAnalyzer('server.log')
print(f"Analyzed {len(analyzer.logs)} log entries")
# Find all errors
errors = analyzer.filter_by_level('error')
print(f"Found {len(errors)} errors")
# Which users experienced errors?
user_errors = analyzer.user_error_summary()
for user_id, data in user_errors.items():
print(f"User {user_id}: {data['count']} errors")
Optimization Journey
Our initial version loads all logs into memory. For large files, this won't work. Let's improve it using Rule #3: The Living Code:
def analyze_logs_streaming(filename, target_field):
"""Analyze logs without loading all into memory."""
field_counts = {}
with open(filename, 'r') as f:
for line in f:
try:
entry = json.loads(line)
values = find_all_values(entry, target_field)
for value in values:
field_counts[value] = field_counts.get(value, 0) + 1
except json.JSONDecodeError:
continue
return field_counts
# This processes logs one at a time
user_counts = analyze_logs_streaming('large_server.log', 'user_id')
print(f"Found {len(user_counts)} unique users")
Why is this better? It uses constant memory regardless of file size. Trade-off: we can only do one analysis pass at a time.
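If you do need several answers from one pass, a generator keeps the constant-memory property while letting multiple accumulators share the same stream. A minimal sketch; iter_logs is our own helper name, not part of the analyzer above:
def iter_logs(filename):
    """Yield parsed log entries one at a time, skipping malformed lines."""
    with open(filename, 'r') as f:
        for line in f:
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue

# Two analyses sharing a single pass over the file
user_counts = {}
error_count = 0
for entry in iter_logs('large_server.log'):
    for user_id in find_all_values(entry, 'user_id'):
        user_counts[user_id] = user_counts.get(user_id, 0) + 1
    if entry.get('level') == 'error':
        error_count += 1
print(f"{len(user_counts)} unique users, {error_count} errors")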
Lessons Learned
- Exploration first: We spent time understanding the data structure before writing complex code
- Simple, then elegant: The recursive find_all_values function solved our core problem cleanly
- Test the edges: Irregular data revealed assumptions we didn't know we had
- Incremental building: Each step produced a working tool we could verify
- Optimization when needed: We started with the clear approach, then optimized for real constraints
Your Turn: Extend It
Try these extensions:
- Add a method to find events within a time range
- Track which paths through the nested structure actually contained values
- Build a report showing the most common error types
- Handle compressed log files (.gz); a sketch follows this list
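For the compressed-file extension, the standard library's gzip module can stand in for open. A minimal sketch, assuming the .gz file holds the same JSONL format as before:
import gzip

def load_logs_gz(filename):
    """Load log entries from a gzip-compressed JSONL file."""
    entries = []
    # 'rt' opens the compressed file in text mode so json.loads receives strings
    with gzip.open(filename, 'rt') as f:
        for line in f:
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return entries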
Chapter 20: Case Study - Documentation Generator
The Problem: Auto-Document Python Code
Your team writes Python libraries but documentation lags behind code changes. You need to:
- Extract all function definitions with their signatures
- Find docstrings and parameter descriptions
- Identify which functions are public (no leading underscore)
- Generate Markdown documentation automatically
Here's sample code to document:
def calculate_score(player_data, bonus_multiplier=1.0):
"""Calculate final score with bonuses.
Args:
player_data: Dict with 'base_score' and 'achievements'
bonus_multiplier: Multiplier for bonus points
Returns:
Final score as integer
"""
base = player_data['base_score']
bonus = len(player_data.get('achievements', [])) * 100
return int(base + bonus * bonus_multiplier)
def _internal_helper(value):
"""Internal function - not for public use."""
return value * 2
Exploration: Understanding AST Structure
Following our principles, let's explore before building:
import tree_sitter_python as tspython
from tree_sitter import Language, Parser
# Set up parser
PY_LANGUAGE = Language(tspython.language())
parser = Parser(PY_LANGUAGE)
# Parse sample code
source_code = b'''
def calculate_score(player_data, bonus_multiplier=1.0):
"""Calculate final score with bonuses."""
return player_data['base_score']
'''
tree = parser.parse(source_code)
root = tree.root_node
# What's at the root?
print(f"Root type: {root.type}")
print(f"Number of children: {len(root.children)}")
# Look at the first child
if root.children:
first_child = root.children[0]
print(f"\nFirst child type: {first_child.type}")
print(f"First child has {len(first_child.children)} children")
Output shows us:
Root type: module
Number of children: 1
First child type: function_definition
First child has 5 children
Let's dig deeper into that function definition:
func_node = root.children[0]
for i, child in enumerate(func_node.children):
print(f"{i}: {child.type:20} | {source_code[child.start_byte:child.end_byte]}")
This reveals the structure:
0: def | b'def'
1: identifier | b'calculate_score'
2: parameters | b'(player_data, bonus_multiplier=1.0)'
3: : | b':'
4: block | b'""" ... return ...'
Key insight: Function parts are predictable children. We can use direct access!
The Simple Path: Direct Extraction
def extract_function_info(func_node, source):
"""Extract info from a function_definition node."""
# Get function name (child at index 1)
name_node = func_node.children[1]
name = source[name_node.start_byte:name_node.end_byte].decode('utf8')
# Get parameters (child at index 2)
params_node = func_node.children[2]
params = source[params_node.start_byte:params_node.end_byte].decode('utf8')
# Find the block (contains body and docstring)
block_node = func_node.children[4]
# Check if first statement is a docstring
docstring = None
if block_node.children:
first_stmt = block_node.children[0]
if first_stmt.type == 'expression_statement':
# Check if it's a string
expr = first_stmt.children[0]
if expr.type == 'string':
doc_bytes = source[expr.start_byte:expr.end_byte]
docstring = doc_bytes.decode('utf8').strip('"""').strip("'''").strip()
return {
'name': name,
'parameters': params,
'docstring': docstring,
'is_public': not name.startswith('_')
}
# Test it
func_info = extract_function_info(func_node, source_code)
print(func_info)
Applying Rule #4: The Virtuous Flaw
Our code works for the happy path. But what if there's no docstring?
source_with_no_doc = b'''
def helper(x):
return x * 2
'''
tree = parser.parse(source_with_no_doc)
func_node = tree.root_node.children[0]
# This might crash if we access children[0] without checking
try:
info = extract_function_info(func_node, source_with_no_doc)
print(info)
except IndexError as e:
print(f"Bug found: {e}")
Write a test that catches this:
def test_function_without_docstring():
"""Ensure we handle functions without docstrings."""
source = b'def f(x):\n return x'
tree = parser.parse(source)
func = tree.root_node.children[0]
info = extract_function_info(func, source)
assert info['docstring'] is None
assert info['name'] == 'f'
test_function_without_docstring()
print("✓ Test passed")
Good! Our code already handles this because we check if block_node.children first.
Finding All Functions: Search Pattern
Now we need to find functions anywhere in a file:
def find_all_functions(node):
"""Recursively find all function_definition nodes."""
functions = []
# Is this node a function definition?
if node.type == 'function_definition':
functions.append(node)
# Recurse into children
for child in node.children:
functions.extend(find_all_functions(child))
return functions
# Parse a file with multiple functions
multi_func_source = b'''
def public_func(x):
"""Public function."""
return x
def _private_func(y):
"""Private function."""
return y * 2
class MyClass:
def method(self):
"""A method."""
pass
'''
tree = parser.parse(multi_func_source)
all_funcs = find_all_functions(tree.root_node)
print(f"Found {len(all_funcs)} functions")
for func in all_funcs:
info = extract_function_info(func, multi_func_source)
print(f"- {info['name']} (public: {info['is_public']})")
Building the Documentation Generator
Let's combine everything into a working tool:
class PythonDocGenerator:
"""Generate documentation from Python source files."""
def __init__(self):
self.parser = Parser(Language(tspython.language()))
def parse_file(self, filepath):
"""Parse a Python file and return the AST."""
with open(filepath, 'rb') as f:
source = f.read()
return self.parser.parse(source), source
def extract_functions(self, node, source):
"""Extract all functions with their info."""
functions = []
def recurse(n):
if n.type == 'function_definition':
info = self._extract_function_details(n, source)
functions.append(info)
for child in n.children:
recurse(child)
recurse(node)
return functions
def _extract_function_details(self, func_node, source):
"""Extract detailed info from function node."""
# Name
name_node = func_node.child_by_field_name('name')
name = source[name_node.start_byte:name_node.end_byte].decode('utf8')
# Parameters
params_node = func_node.child_by_field_name('parameters')
params = source[params_node.start_byte:params_node.end_byte].decode('utf8')
# Body and docstring
body_node = func_node.child_by_field_name('body')
docstring = self._extract_docstring(body_node, source)
return {
'name': name,
'signature': f"{name}{params}",
'docstring': docstring,
'is_public': not name.startswith('_'),
'line': func_node.start_point[0] + 1
}
def _extract_docstring(self, body_node, source):
"""Extract docstring from function body."""
if not body_node or not body_node.children:
return None
# First statement might be docstring
first = body_node.children[0]
if first.type == 'expression_statement':
expr = first.children[0] if first.children else None
if expr and expr.type == 'string':
doc_text = source[expr.start_byte:expr.end_byte].decode('utf8')
# Remove quotes
return doc_text.strip('"""').strip("'''").strip()
return None
def generate_markdown(self, filepath, include_private=False):
"""Generate Markdown documentation for a file."""
tree, source = self.parse_file(filepath)
functions = self.extract_functions(tree.root_node, source)
# Filter out private functions if requested
if not include_private:
functions = [f for f in functions if f['is_public']]
# Generate markdown
lines = [f"# Documentation: {filepath}\n"]
for func in functions:
lines.append(f"## `{func['signature']}`\n")
if func['docstring']:
lines.append(func['docstring'])
lines.append("")
else:
lines.append("*No documentation provided*\n")
lines.append(f"*Defined at line {func['line']}*\n")
return "\n".join(lines)
# Use it
generator = PythonDocGenerator()
docs = generator.generate_markdown('mymodule.py')
print(docs)
# Save to file
with open('API.md', 'w') as f:
f.write(docs)
Handling Edge Cases
Let's make our tool more robust:
def test_nested_functions():
"""Ensure we find nested function definitions."""
source = b'''
def outer():
def inner():
"""Nested function."""
pass
return inner
'''
gen = PythonDocGenerator()
    tree = gen.parser.parse(source)
funcs = gen.extract_functions(tree.root_node, source)
# Should find both outer and inner
assert len(funcs) == 2
names = [f['name'] for f in funcs]
assert 'outer' in names
assert 'inner' in names
test_nested_functions()
print("✓ Nested functions handled")
Extending: Parse Parameter Descriptions
Docstrings often have structured parameter docs:
def _parse_docstring_params(docstring):
"""Extract parameter descriptions from docstring.
Expects Google-style docstrings with Args: section.
"""
if not docstring or 'Args:' not in docstring:
return {}
params = {}
in_args_section = False
for line in docstring.split('\n'):
line = line.strip()
if line == 'Args:':
in_args_section = True
continue
if in_args_section:
# Stop at next section
if line.endswith(':') and not line.startswith(' '):
break
# Parse parameter line
if ':' in line:
param_name = line.split(':')[0].strip()
param_desc = line.split(':', 1)[1].strip()
params[param_name] = param_desc
return params
# Test it
doc = """Calculate score.
Args:
player_data: Dict with player info
bonus_multiplier: Multiplier for bonuses
Returns:
Final score
"""
params = _parse_docstring_params(doc)
print(params)
# {'player_data': 'Dict with player info',
# 'bonus_multiplier': 'Multiplier for bonuses'}
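To fold this into the generated docs, one option (a sketch, not the only design) is to render the parsed parameters as a bullet list under each function. render_param_docs is a hypothetical helper; the commented line shows where it could hook into generate_markdown:
def render_param_docs(docstring):
    """Return Markdown lines for the Args: section of a docstring, if present."""
    params = _parse_docstring_params(docstring)
    if not params:
        return []
    lines = ["**Parameters:**", ""]
    for name, desc in params.items():
        lines.append(f"- `{name}`: {desc}")
    lines.append("")
    return lines

# Inside generate_markdown, after appending the docstring:
#     lines.extend(render_param_docs(func['docstring']))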
Lessons Learned
- Field access is powerful: Using .child_by_field_name() made extraction reliable
- AST structure is predictable: Function definitions always have the same children
- Recursion finds everything: Simple recursive search found all functions, even nested ones
- Docstrings have structure: Parsing them added real value
- Tests catch edge cases: Functions without docstrings, nested functions
Your Turn
Extend the generator to:
- Extract class definitions and their methods (a starting sketch follows this list)
- Parse return type hints
- Generate a table of contents
- Handle async functions
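For the first extension, class definitions follow the same recursive pattern. In tree-sitter-python they show up as class_definition nodes with name and body fields; verify the node and field names against your grammar version before relying on them. A starting sketch that reuses extract_function_info:
def find_all_classes(node):
    """Recursively collect class_definition nodes."""
    classes = []
    if node.type == 'class_definition':
        classes.append(node)
    for child in node.children:
        classes.extend(find_all_classes(child))
    return classes

def extract_class_info(class_node, source):
    """Pull the class name and its directly defined methods."""
    name_node = class_node.child_by_field_name('name')
    name = source[name_node.start_byte:name_node.end_byte].decode('utf8')
    body = class_node.child_by_field_name('body')
    methods = []
    for child in body.children:
        if child.type == 'function_definition':
            methods.append(extract_function_info(child, source))
    return {'name': name, 'methods': methods}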
Chapter 21: Case Study - Data Pipeline for Nested APIs
The Problem: Multi-Level Data Collection
You're building a dashboard that pulls data from a REST API. The API has this structure:
- /api/teams → List of team IDs
- /api/teams/{id} → Team details with member IDs
- /api/teams/{id}/members/{member_id} → Individual member data
You need to:
- Fetch all teams
- For each team, get all members
- For each member, collect their activity data
- Normalize into a flat structure for analysis
This is a nested traversal where each level requires API calls.
Starting Simple: Single-Level Fetch
import requests
import time
API_BASE = "https://api.example.com"
def fetch_teams():
"""Fetch all teams."""
response = requests.get(f"{API_BASE}/api/teams")
response.raise_for_status()
return response.json()
# Test it
teams = fetch_teams()
print(f"Found {len(teams)} teams")
print(f"First team: {teams[0]}")
Output might look like:
Found 3 teams
First team: {'id': 'team_1', 'name': 'Engineering', 'member_count': 5}
Level Two: Fetching Team Details
def fetch_team_details(team_id):
"""Fetch detailed info for a specific team."""
response = requests.get(f"{API_BASE}/api/teams/{team_id}")
response.raise_for_status()
data = response.json()
return data
# Test it
team_details = fetch_team_details('team_1')
print(f"Team name: {team_details['name']}")
print(f"Members: {team_details['members']}")
This might return:
{
'id': 'team_1',
'name': 'Engineering',
'members': [
{'id': 'mem_1', 'name': 'Alice', 'role': 'Lead'},
{'id': 'mem_2', 'name': 'Bob', 'role': 'Developer'}
]
}
The Naive Approach: Nested Loops
Rule #2: Show the simple path first:
def collect_all_data_naive():
"""Collect all data - simple but slow."""
all_data = []
# Get all teams
teams = fetch_teams()
# For each team, get details
for team in teams:
team_data = fetch_team_details(team['id'])
# For each member, get their data
for member in team_data.get('members', []):
member_detail = fetch_member_data(team['id'], member['id'])
# Combine everything
all_data.append({
'team_id': team['id'],
'team_name': team_data['name'],
'member_id': member['id'],
'member_name': member['name'],
'member_role': member['role'],
'member_details': member_detail
})
# Be nice to the API
time.sleep(0.1)
return all_data
def fetch_member_data(team_id, member_id):
"""Fetch individual member data."""
url = f"{API_BASE}/api/teams/{team_id}/members/{member_id}"
response = requests.get(url)
response.raise_for_status()
return response.json()
# Run it
data = collect_all_data_naive()
print(f"Collected data for {len(data)} members")
This works, but it's slow! If you have 10 teams with 10 members each, that's:
- 1 request for teams
- 10 requests for team details
- 100 requests for member details
- In total: 111 sequential requests
The Better Path: Error Handling First
Before optimizing, let's make it robust (Rule #4):
class APIError(Exception):
"""Custom exception for API errors."""
pass
def safe_fetch(url, max_retries=3):
"""Fetch with error handling and retries."""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
print(f"Timeout on attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
else:
raise APIError(f"Failed after {max_retries} attempts")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
# Resource not found - don't retry
return None
elif e.response.status_code == 429:
# Rate limited - wait longer
wait_time = int(e.response.headers.get('Retry-After', 60))
print(f"Rate limited, waiting {wait_time}s")
time.sleep(wait_time)
else:
raise APIError(f"HTTP error: {e}")
raise APIError("Max retries exceeded")
# Update our functions to use this
def fetch_teams_safe():
"""Fetch teams with error handling."""
data = safe_fetch(f"{API_BASE}/api/teams")
return data if data else []
Write a test to verify this works:
def test_handles_404():
"""Ensure we handle missing resources gracefully."""
# This team doesn't exist
result = safe_fetch(f"{API_BASE}/api/teams/nonexistent")
assert result is None
# Run it (will actually call API, so be careful)
# test_handles_404()
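If you would rather not hit the real API while testing, one alternative (standard library only, and our own addition rather than anything from this chapter) is to patch requests.get with unittest.mock so the 404 path can be exercised offline. A sketch of that idea:
from unittest.mock import patch, MagicMock
import requests

def test_handles_404_mocked():
    """safe_fetch should return None on a 404 without retrying."""
    fake_response = MagicMock()
    fake_response.status_code = 404
    # raise_for_status raises an HTTPError that carries the fake response
    fake_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
        response=fake_response
    )
    with patch('requests.get', return_value=fake_response):
        result = safe_fetch(f"{API_BASE}/api/teams/nonexistent")
    assert result is None

test_handles_404_mocked()
print("✓ Mocked 404 test passed")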
Optimization: Parallel Requests
Now let's apply Rule #2 and show the elegant solution:
from concurrent.futures import ThreadPoolExecutor, as_completed
def collect_all_data_parallel():
"""Collect all data using parallel requests."""
all_data = []
# Step 1: Get all teams
teams = fetch_teams_safe()
print(f"Fetching details for {len(teams)} teams...")
# Step 2: Fetch team details in parallel
team_details = {}
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_team = {
executor.submit(fetch_team_details_safe, team['id']): team['id']
for team in teams
}
for future in as_completed(future_to_team):
team_id = future_to_team[future]
try:
details = future.result()
if details:
team_details[team_id] = details
except Exception as e:
print(f"Error fetching team {team_id}: {e}")
print(f"Got details for {len(team_details)} teams")
# Step 3: Fetch all member data in parallel
member_tasks = []
for team_id, team_data in team_details.items():
for member in team_data.get('members', []):
member_tasks.append({
'team_id': team_id,
'team_name': team_data['name'],
'member': member
})
print(f"Fetching data for {len(member_tasks)} members...")
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_task = {
executor.submit(
fetch_member_data_safe,
task['team_id'],
task['member']['id']
): task
for task in member_tasks
}
for future in as_completed(future_to_task):
task = future_to_task[future]
try:
member_detail = future.result()
if member_detail:
all_data.append({
'team_id': task['team_id'],
'team_name': task['team_name'],
'member_id': task['member']['id'],
'member_name': task['member']['name'],
'member_role': task['member']['role'],
'member_details': member_detail
})
except Exception as e:
print(f"Error fetching member {task['member']['id']}: {e}")
return all_data
def fetch_team_details_safe(team_id):
"""Fetch team details with error handling."""
return safe_fetch(f"{API_BASE}/api/teams/{team_id}")
def fetch_member_data_safe(team_id, member_id):
"""Fetch member data with error handling."""
url = f"{API_BASE}/api/teams/{team_id}/members/{member_id}"
return safe_fetch(url)
# Run it
data = collect_all_data_parallel()
print(f"Collected data for {len(data)} members")
Why is this better?
- The same 111 requests are still made, but they now run concurrently: team details 5 at a time, member details 10 at a time
- Assuming roughly one second per request, total time drops from ~111 seconds to about 13 seconds (1 + 10/5 + 100/10 rounds of requests)
- It still handles errors gracefully
Transformation: Normalizing the Data
Now we have nested data. Let's flatten it for analysis:
def normalize_member_data(raw_data):
"""Transform nested API data into flat records."""
normalized = []
for record in raw_data:
# Extract nested details
details = record.get('member_details', {})
flat_record = {
'team_id': record['team_id'],
'team_name': record['team_name'],
'member_id': record['member_id'],
'member_name': record['member_name'],
'member_role': record['member_role'],
# Flatten member details
'email': details.get('email'),
'joined_date': details.get('joined_date'),
'active': details.get('active', True),
# Nested stats
'commits': details.get('stats', {}).get('commits', 0),
'reviews': details.get('stats', {}).get('reviews', 0),
'projects': len(details.get('projects', []))
}
normalized.append(flat_record)
return normalized
# Apply it
flat_data = normalize_member_data(data)
# Now you can easily analyze
import pandas as pd
df = pd.DataFrame(flat_data)
print(df.groupby('team_name')['commits'].sum())
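Once the data is flat, other questions become one-liners too; the column names below come straight from normalize_member_data:
# Average commits per role, and how many members are currently active
print(df.groupby('member_role')['commits'].mean())
print(df['active'].value_counts())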
Building a Reusable Pipeline
Let's wrap this into a class following Rule #5: The Gentle Ascent:
class APIPipeline:
"""Reusable pipeline for multi-level API data collection."""
def __init__(self, base_url, max_workers=10):
self.base_url = base_url
self.max_workers = max_workers
self.cache = {}
def fetch_with_cache(self, url):
"""Fetch URL with caching."""
if url in self.cache:
return self.cache[url]
data = safe_fetch(url)
if data:
self.cache[url] = data
return data
def fetch_level(self, urls, show_progress=True):
"""Fetch multiple URLs in parallel."""
results = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_url = {
executor.submit(self.fetch_with_cache, url): url
for url in urls
}
completed = 0
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
results[url] = data
completed += 1
if show_progress and completed % 10 == 0:
print(f"Progress: {completed}/{len(urls)}")
except Exception as e:
print(f"Error fetching {url}: {e}")
results[url] = None
return results
def collect_team_data(self):
"""Complete data collection pipeline."""
print("Step 1: Fetching teams...")
teams = self.fetch_with_cache(f"{self.base_url}/api/teams")
if not teams:
return []
print(f"Step 2: Fetching {len(teams)} team details...")
team_urls = [f"{self.base_url}/api/teams/{t['id']}" for t in teams]
team_details = self.fetch_level(team_urls)
print("Step 3: Collecting member URLs...")
member_urls = []
member_context = {}
for url, team_data in team_details.items():
if not team_data:
continue
team_id = team_data['id']
for member in team_data.get('members', []):
member_url = (
f"{self.base_url}/api/teams/{team_id}"
f"/members/{member['id']}"
)
member_urls.append(member_url)
member_context[member_url] = {
'team': team_data,
'member_basic': member
}
print(f"Step 4: Fetching {len(member_urls)} member details...")
member_details = self.fetch_level(member_urls)
print("Step 5: Combining data...")
combined = []
for url, detail in member_details.items():
if not detail:
continue
context = member_context[url]
combined.append({
'team': context['team'],
'member_basic': context['member_basic'],
'member_detail': detail
})
return combined
# Use it
pipeline = APIPipeline("https://api.example.com", max_workers=10)
data = pipeline.collect_team_data()
print(f"\nCollected complete data for {len(data)} members")
print(f"Cache contains {len(pipeline.cache)} entries")
Handling Incremental Updates
What if you want to update just one team's data?
import json  # needed below for the on-disk cache

class IncrementalPipeline(APIPipeline):
"""Pipeline that supports incremental updates."""
def __init__(self, base_url, cache_file='cache.json', max_workers=10):
super().__init__(base_url, max_workers)
self.cache_file = cache_file
self.load_cache()
def load_cache(self):
"""Load cache from disk."""
try:
with open(self.cache_file, 'r') as f:
self.cache = json.load(f)
print(f"Loaded {len(self.cache)} cached entries")
except FileNotFoundError:
self.cache = {}
def save_cache(self):
"""Save cache to disk."""
with open(self.cache_file, 'w') as f:
json.dump(self.cache, f)
print(f"Saved {len(self.cache)} entries to cache")
def invalidate_team(self, team_id):
"""Remove team and its members from cache."""
keys_to_remove = [
k for k in self.cache.keys()
if f'/teams/{team_id}' in k
]
for key in keys_to_remove:
del self.cache[key]
print(f"Invalidated {len(keys_to_remove)} cache entries")
def update_team(self, team_id):
"""Update just one team's data."""
# Invalidate old data
self.invalidate_team(team_id)
# Fetch fresh data
team_url = f"{self.base_url}/api/teams/{team_id}"
team_data = self.fetch_with_cache(team_url)
if not team_data:
return []
# Fetch member details
member_urls = [
f"{self.base_url}/api/teams/{team_id}/members/{m['id']}"
for m in team_data.get('members', [])
]
member_details = self.fetch_level(member_urls, show_progress=False)
# Save updated cache
self.save_cache()
return member_details
# Use it
pipeline = IncrementalPipeline("https://api.example.com")
# Full update
all_data = pipeline.collect_team_data()
pipeline.save_cache()
# Later: update just one team
updated = pipeline.update_team('team_1')
print(f"Updated {len(updated)} members")
Production Considerations
Let's add monitoring and rate limiting:
import logging
from datetime import datetime, timedelta
class ProductionPipeline(IncrementalPipeline):
"""Production-ready pipeline with monitoring."""
def __init__(self, base_url, cache_file='cache.json',
max_workers=10, rate_limit_per_second=10):
super().__init__(base_url, cache_file, max_workers)
self.rate_limit = rate_limit_per_second
self.request_times = []
# Set up logging
logging.basicConfig(
filename='pipeline.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def _wait_for_rate_limit(self):
"""Ensure we don't exceed rate limit."""
now = datetime.now()
# Remove requests older than 1 second
self.request_times = [
t for t in self.request_times
if now - t < timedelta(seconds=1)
]
# If at limit, wait
if len(self.request_times) >= self.rate_limit:
sleep_time = 1.0 - (now - self.request_times[0]).total_seconds()
if sleep_time > 0:
time.sleep(sleep_time)
self.request_times.append(now)
def fetch_with_cache(self, url):
"""Fetch with rate limiting and logging."""
self._wait_for_rate_limit()
self.logger.info(f"Fetching: {url}")
try:
data = super().fetch_with_cache(url)
self.logger.info(f"Success: {url}")
return data
except Exception as e:
self.logger.error(f"Failed: {url} - {e}")
raise
def collect_team_data(self):
"""Collect with timing information."""
start_time = datetime.now()
self.logger.info("Starting data collection")
try:
data = super().collect_team_data()
duration = (datetime.now() - start_time).total_seconds()
self.logger.info(
f"Completed: {len(data)} records in {duration:.1f}s"
)
return data
except Exception as e:
self.logger.error(f"Collection failed: {e}")
raise
# Use it
pipeline = ProductionPipeline(
"https://api.example.com",
max_workers=5,
rate_limit_per_second=10
)
data = pipeline.collect_team_data()
Lessons Learned
- Start sequential: The naive nested loop approach worked and was easy to understand
- Add error handling before optimization: Robust code > fast code
- Parallelize wisely: ThreadPoolExecutor made it 10x faster with minimal complexity
- Cache aggressively: Repeated requests are wasteful
- Make it incremental: Production systems need partial updates
- Monitor everything: Logging reveals issues before users do
Your Turn: Extend It
Try these challenges:
- Add retry with exponential backoff for transient failures
- Implement pagination for APIs that return data in pages (a sketch follows the example tests below)
- Add a progress bar using the tqdm library
- Handle authentication tokens that expire
- Write tests using the responses library to mock API calls
Example test structure:
import responses
@responses.activate
def test_handles_empty_teams():
"""Ensure pipeline handles empty team list."""
responses.add(
responses.GET,
'https://api.example.com/api/teams',
json=[],
status=200
)
pipeline = APIPipeline('https://api.example.com')
data = pipeline.collect_team_data()
assert data == []
@responses.activate
def test_handles_team_with_no_members():
"""Ensure pipeline handles teams without members."""
responses.add(
responses.GET,
'https://api.example.com/api/teams',
json=[{'id': 'team_1', 'name': 'Empty Team'}],
status=200
)
responses.add(
responses.GET,
'https://api.example.com/api/teams/team_1',
json={'id': 'team_1', 'name': 'Empty Team', 'members': []},
status=200
)
pipeline = APIPipeline('https://api.example.com')
data = pipeline.collect_team_data()
assert data == []
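And for the pagination challenge from the list above, a minimal sketch. It assumes a common but hypothetical response shape in which each page carries a 'results' list and a 'next' URL; check your API's actual contract before using it:
def fetch_all_pages(first_url):
    """Follow 'next' links until the API stops returning them."""
    items = []
    url = first_url
    while url:
        page = safe_fetch(url)
        if not page:
            break
        items.extend(page.get('results', []))
        url = page.get('next')  # None ends the loop
    return items

# Example (hypothetical paginated endpoint):
# teams = fetch_all_pages(f"{API_BASE}/api/teams?page=1")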
Reflection on Part VII
What We Learned Across All Three Case Studies
The Pattern of Mastery:
- Explore first (Rule #1: The Designer's Mind)
  - Log analyzer: We examined the JSON structure before writing extraction code
  - Doc generator: We parsed sample files to understand AST node relationships
  - API pipeline: We made single requests before building the full traversal
- Simple, then elegant (Rule #2: The Path to Wisdom)
  - Log analyzer: Basic recursive search → Generic field extraction
  - Doc generator: Direct child access → Reusable extractor class
  - API pipeline: Nested loops → Parallel fetching with caching
- Code as a living thing (Rule #3: The Living Code)
  - We refactored live in each case study
  - Showed where assumptions broke
  - Improved incrementally based on what we learned
- Tests reveal purpose (Rule #4: The Virtuous Flaw)
  - Log analyzer: Tested irregular data structures
  - Doc generator: Tested functions without docstrings
  - API pipeline: Tested error handling and retries
- Build incrementally (Rule #5: The Gentle Ascent)
  - Each case study had working code after every step
  - No "wait until the end to see if it works"
  - Each addition built on proven foundation
- Introduce concepts slowly (Rule #6: The Unburdened Mind)
  - Started with familiar patterns (recursion, loops)
  - Added parallel execution only when needed
  - Introduced caching, logging, and monitoring one at a time
The Universal Lessons
When facing any nested data problem:
- Identify the pattern: Is it search, navigation, or transformation?
- Check for libraries: Does BeautifulSoup, tree-sitter, or pandas already solve this?
- Start simple: Get something working before optimizing
- Handle errors early: Production means expecting failure
- Optimize when necessary: Measure before parallelizing
- Make it reusable: Future you will thank present you
Your Next Steps
You now have three complete examples showing:
- Recursive search in nested JSON (log analyzer)
- AST traversal with direct access (doc generator)
- Multi-level API navigation with parallelization (data pipeline)
These patterns appear everywhere:
- Processing configuration files
- Analyzing code repositories
- Scraping structured websites
- Building ETL pipelines
- Creating monitoring tools
The techniques are universal. The mindset is transferable. The practice is what makes you fluent.
Now go build something.