
Part VII: Real-World Applications


Chapter 19: Case Study - Log File Analyzer

The Problem: Understanding Server Behavior

You've been asked to analyze log files from a web application. Each log entry is JSON, but the structure varies by event type. Some events are nested several levels deep. You need to:

  1. Find every user ID, wherever it appears in an entry
  2. Extract error events along with their codes and messages
  3. Summarize errors by user

Here's a sample log entry:

{
    "timestamp": "2025-11-10T14:23:11Z",
    "level": "error",
    "service": "api-gateway",
    "event": {
        "type": "request_failed",
        "details": {
            "user": {"id": "user_123", "session": "abc"},
            "request": {"path": "/api/data", "method": "GET"},
            "error": {"code": 500, "message": "Database timeout"}
        },
        "metadata": {"retry_count": 3, "duration_ms": 5000}
    }
}

But you've also seen entries like this:

{
    "timestamp": "2025-11-10T14:25:03Z",
    "level": "info",
    "auth": {"user_id": "user_456", "result": "success"}
}

First Steps: Explore Before You Code

Let's follow Rule #2: The Path to Wisdom and start with the obvious approach before seeking elegance.

import json

# Load one entry and explore it
with open('sample.log', 'r') as f:
    entry = json.loads(f.readline())

print(type(entry))  # dict
print(entry.keys())  # See what's at the top level

# Try accessing what we know
print(entry['level'])  # Works
print(entry.get('event', {}).get('details', {}).get('user', {}).get('id'))
# This works, but is fragile and hard to read

The problem reveals itself: we don't know where user IDs live in advance. This is a search problem, not a navigation problem.

Let's build a function that finds all occurrences of a key, no matter how deeply nested:

def find_all_values(data, target_key):
    """Find all values for a key in nested structure."""
    results = []

    # What type are we looking at?
    if isinstance(data, dict):
        # Check each key-value pair
        for key, value in data.items():
            if key == target_key:
                results.append(value)
            # Recurse into the value
            results.extend(find_all_values(value, target_key))

    elif isinstance(data, list):
        # Check each item in list
        for item in data:
            results.extend(find_all_values(item, target_key))

    # Base case: primitive types, just return empty list
    return results

# Test it
user_ids = find_all_values(entry, 'user_id')
print(f"Found user IDs: {user_ids}")

# Also find 'id' keys (might catch user.id)
ids = find_all_values(entry, 'id')
print(f"Found IDs: {ids}")

The Living Code moment: Notice how we handle three types? Let's pause and make this clearer:

def find_all_values(data, target_key):
    """Find all values for a key in nested structure.

    Handles dicts, lists, and primitives gracefully.
    """
    results = []

    if isinstance(data, dict):
        for key, value in data.items():
            if key == target_key:
                results.append(value)
            # Always recurse - value might contain more matches
            results.extend(find_all_values(value, target_key))

    elif isinstance(data, list):
        for item in data:
            results.extend(find_all_values(item, target_key))

    # If it's neither dict nor list, we're done (base case)
    return results

Applying It: Processing All Logs

Now let's apply Rule #5: The Gentle Ascent. We'll build this incrementally.

Step 1: Read and parse logs

def load_logs(filename):
    """Load log entries from a JSONL file."""
    entries = []
    with open(filename, 'r') as f:
        for line in f:
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                # Skip malformed lines
                continue
    return entries

logs = load_logs('server.log')
print(f"Loaded {len(logs)} entries")

Run this. See it work. Feel good about it.

Step 2: Extract user activity

def extract_user_activity(logs):
    """Find all user-related events."""
    user_events = []

    for entry in logs:
        # Find user IDs - they might be under 'user_id' or nested in 'user'
        user_ids = find_all_values(entry, 'user_id')

        # Also check for 'id' keys inside 'user' objects
        user_objects = find_all_values(entry, 'user')
        for user_obj in user_objects:
            if isinstance(user_obj, dict) and 'id' in user_obj:
                user_ids.append(user_obj['id'])

        if user_ids:
            user_events.append({
                'timestamp': entry['timestamp'],
                'user_ids': list(set(user_ids)),  # Deduplicate
                'level': entry.get('level', 'unknown'),
                'entry': entry
            })

    return user_events

user_activity = extract_user_activity(logs)
print(f"Found {len(user_activity)} user-related events")

Step 3: Find error patterns

def find_errors(logs):
    """Extract all error events with context."""
    errors = []

    for entry in logs:
        if entry.get('level') == 'error':
            # Extract error details
            error_messages = find_all_values(entry, 'message')
            error_codes = find_all_values(entry, 'code')

            errors.append({
                'timestamp': entry['timestamp'],
                'messages': error_messages,
                'codes': error_codes,
                'full_entry': entry
            })

    return errors

errors = find_errors(logs)
print(f"Found {len(errors)} errors")

The More Elegant Path

Following Rule #2 again, let's look at what we've learned and improve it. Our current approach has repetition. Let's create a more flexible tool:

def extract_fields(data, field_spec):
    """Extract multiple fields from nested data.

    Args:
        data: Nested dict/list structure
        field_spec: Dict mapping field names to extraction paths

    Returns:
        Dict with extracted values
    """
    result = {}

    for field_name, key_to_find in field_spec.items():
        values = find_all_values(data, key_to_find)
        # Take first value if we only expect one
        result[field_name] = values[0] if len(values) == 1 else values

    return result

# Now our extraction is cleaner
for entry in logs[:5]:  # Test on first 5
    fields = extract_fields(entry, {
        'user_id': 'user_id',
        'error_code': 'code',
        'duration': 'duration_ms'
    })
    print(fields)

Why is this better? We can now declare what we want without repeating how to get it.
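
As a concrete illustration, here is find_errors re-expressed with extract_fields. Treat it as a sketch rather than a drop-in replacement: extract_fields collapses a single match to a bare value, so 'messages' and 'codes' may come back as scalars instead of lists.

def find_errors_declarative(logs):
    """Sketch: find_errors written in terms of extract_fields."""
    errors = []

    for entry in logs:
        if entry.get('level') != 'error':
            continue

        # Declare which keys we care about; extract_fields does the digging
        fields = extract_fields(entry, {
            'messages': 'message',
            'codes': 'code'
        })

        errors.append({
            'timestamp': entry['timestamp'],
            'messages': fields['messages'],
            'codes': fields['codes'],
            'full_entry': entry
        })

    return errors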

Handling the Messy Reality

Let's apply Rule #4: The Virtuous Flaw. Our code works on clean data, but what about this?

broken_entry = {
    "timestamp": "2025-11-10T14:30:00Z",
    "level": "error",
    "event": {
        "details": {
            "user": "user_789",  # Not a dict! Just a string
            "error": None  # Missing error details
        }
    }
}

Run our extractor on this. It won't crash, but it can return results you didn't expect. Now let's write a test:

def test_handles_irregular_structure():
    """Ensure we handle non-dict user fields."""
    entry = {
        "event": {
            "details": {
                "user": "user_789",  # String, not dict
            }
        }
    }

    # This shouldn't crash
    result = find_all_values(entry, 'user')
    assert result == ["user_789"]

    # And we shouldn't find 'id' since there's no dict
    ids = find_all_values(entry, 'id')
    assert ids == []

test_handles_irregular_structure()
print("✓ Test passed")

Good! Our recursive function already handles this because we only recurse into dicts and lists.

The Complete Analyzer

Now let's put it together with Rule #5: The Gentle Ascent in mind:

class LogAnalyzer:
    """Analyze structured log files."""

    def __init__(self, log_file):
        self.logs = self._load_logs(log_file)

    def _load_logs(self, filename):
        """Load and parse log entries."""
        entries = []
        with open(filename, 'r') as f:
            for i, line in enumerate(f, 1):
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Warning: Skipped malformed line {i}: {e}")
        return entries

    def find_field_values(self, field_name):
        """Find all occurrences of a field across all logs."""
        all_values = []
        for entry in self.logs:
            values = find_all_values(entry, field_name)
            all_values.extend(values)
        return all_values

    def filter_by_level(self, level):
        """Return all entries with specified log level."""
        return [e for e in self.logs if e.get('level') == level]

    def user_error_summary(self):
        """Summarize errors by user."""
        errors = self.filter_by_level('error')
        summary = {}

        for error in errors:
            user_ids = find_all_values(error, 'user_id')
            # Also check nested user objects
            for user_obj in find_all_values(error, 'user'):
                if isinstance(user_obj, dict) and 'id' in user_obj:
                    user_ids.append(user_obj['id'])

            for user_id in user_ids:
                if user_id not in summary:
                    summary[user_id] = {'count': 0, 'errors': []}
                summary[user_id]['count'] += 1
                summary[user_id]['errors'].append(error)

        return summary

# Use it
analyzer = LogAnalyzer('server.log')
print(f"Analyzed {len(analyzer.logs)} log entries")

# Find all errors
errors = analyzer.filter_by_level('error')
print(f"Found {len(errors)} errors")

# Which users experienced errors?
user_errors = analyzer.user_error_summary()
for user_id, data in user_errors.items():
    print(f"User {user_id}: {data['count']} errors")

Optimization Journey

Our initial version loads all logs into memory. For large files, this won't work. Let's improve it using Rule #3: The Living Code:

def analyze_logs_streaming(filename, target_field):
    """Analyze logs without loading all into memory."""
    field_counts = {}

    with open(filename, 'r') as f:
        for line in f:
            try:
                entry = json.loads(line)
                values = find_all_values(entry, target_field)

                for value in values:
                    field_counts[value] = field_counts.get(value, 0) + 1

            except json.JSONDecodeError:
                continue

    return field_counts

# This processes logs one at a time
user_counts = analyze_logs_streaming('large_server.log', 'user_id')
print(f"Found {len(user_counts)} unique users")

Why is this better? It uses constant memory regardless of file size. Trade-off: we can only do one analysis pass at a time.
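
If you need more than one analysis, one option is to tally several fields in a single pass. Here's a minimal sketch along those lines; the function name and the two-field usage are illustrative, and it reuses find_all_values and the json import from earlier in the chapter.

def analyze_logs_streaming_multi(filename, target_fields):
    """Sketch: count values for several fields in one streaming pass."""
    counts = {field: {} for field in target_fields}

    with open(filename, 'r') as f:
        for line in f:
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue

            # One read of the line feeds every requested tally
            for field in target_fields:
                for value in find_all_values(entry, field):
                    counts[field][value] = counts[field].get(value, 0) + 1

    return counts

# One pass, two analyses (illustrative file and field names)
stats = analyze_logs_streaming_multi('large_server.log', ['user_id', 'code'])
print(f"Unique users: {len(stats['user_id'])}, unique error codes: {len(stats['code'])}")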

Lessons Learned

  1. Exploration first: We spent time understanding the data structure before writing complex code
  2. Simple, then elegant: The recursive find_all_values function solved our core problem cleanly
  3. Test the edges: Irregular data revealed assumptions we didn't know we had
  4. Incremental building: Each step produced a working tool we could verify
  5. Optimization when needed: We started with the clear approach, then optimized for real constraints

Your Turn: Extend It

Try these extensions:

  1. Add a method to find events within a time range
  2. Track which paths through the nested structure actually contained values
  3. Build a report showing the most common error types
  4. Handle compressed log files (.gz)

Chapter 20: Case Study - Documentation Generator

The Problem: Auto-Document Python Code

Your team writes Python libraries, but documentation lags behind code changes. You need to:

  1. Extract each function's name, signature, and docstring straight from the source
  2. Skip private helpers (names starting with _) by default
  3. Generate Markdown that can be regenerated whenever the code changes

Here's sample code to document:

def calculate_score(player_data, bonus_multiplier=1.0):
    """Calculate final score with bonuses.

    Args:
        player_data: Dict with 'base_score' and 'achievements'
        bonus_multiplier: Multiplier for bonus points

    Returns:
        Final score as integer
    """
    base = player_data['base_score']
    bonus = len(player_data.get('achievements', [])) * 100
    return int(base + bonus * bonus_multiplier)

def _internal_helper(value):
    """Internal function - not for public use."""
    return value * 2

Exploration: Understanding AST Structure

Following our principles, let's explore before building:

import tree_sitter_python as tspython
from tree_sitter import Language, Parser

# Set up parser
PY_LANGUAGE = Language(tspython.language())
parser = Parser(PY_LANGUAGE)

# Parse sample code
source_code = b'''
def calculate_score(player_data, bonus_multiplier=1.0):
    """Calculate final score with bonuses."""
    return player_data['base_score']
'''

tree = parser.parse(source_code)
root = tree.root_node

# What's at the root?
print(f"Root type: {root.type}")
print(f"Number of children: {len(root.children)}")

# Look at the first child
if root.children:
    first_child = root.children[0]
    print(f"\nFirst child type: {first_child.type}")
    print(f"First child has {len(first_child.children)} children")

Output shows us:

Root type: module
Number of children: 1

First child type: function_definition
First child has 5 children

Let's dig deeper into that function definition:

func_node = root.children[0]

for i, child in enumerate(func_node.children):
    print(f"{i}: {child.type:20} | {source_code[child.start_byte:child.end_byte]}")

This reveals the structure:

0: def                  | b'def'
1: identifier           | b'calculate_score'
2: parameters           | b'(player_data, bonus_multiplier=1.0)'
3: :                    | b':'
4: block                | b'""" ... return ...'

Key insight: Function parts are predictable children. We can use direct access!

The Simple Path: Direct Extraction

def extract_function_info(func_node, source):
    """Extract info from a function_definition node."""

    # Get function name (child at index 1)
    name_node = func_node.children[1]
    name = source[name_node.start_byte:name_node.end_byte].decode('utf8')

    # Get parameters (child at index 2)
    params_node = func_node.children[2]
    params = source[params_node.start_byte:params_node.end_byte].decode('utf8')

    # Find the block (contains body and docstring)
    block_node = func_node.children[4]

    # Check if first statement is a docstring
    docstring = None
    if block_node.children:
        first_stmt = block_node.children[0]
        if first_stmt.type == 'expression_statement':
            # Check if it's a string
            expr = first_stmt.children[0]
            if expr.type == 'string':
                doc_bytes = source[expr.start_byte:expr.end_byte]
                docstring = doc_bytes.decode('utf8').strip('"""').strip("'''").strip()

    return {
        'name': name,
        'parameters': params,
        'docstring': docstring,
        'is_public': not name.startswith('_')
    }

# Test it
func_info = extract_function_info(func_node, source_code)
print(func_info)

Applying Rule #4: The Virtuous Flaw

Our code works for the happy path. But what if there's no docstring?

source_with_no_doc = b'''
def helper(x):
    return x * 2
'''

tree = parser.parse(source_with_no_doc)
func_node = tree.root_node.children[0]

# This might crash if we access children[0] without checking
try:
    info = extract_function_info(func_node, source_with_no_doc)
    print(info)
except IndexError as e:
    print(f"Bug found: {e}")

Write a test that catches this:

def test_function_without_docstring():
    """Ensure we handle functions without docstrings."""
    source = b'def f(x):\n    return x'
    tree = parser.parse(source)
    func = tree.root_node.children[0]

    info = extract_function_info(func, source)
    assert info['docstring'] is None
    assert info['name'] == 'f'

test_function_without_docstring()
print("✓ Test passed")

Good! Our code already handles this: when there's no docstring, the body's first statement is a return_statement rather than a string expression, so the docstring checks fall through and docstring stays None.
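
One more edge case is worth knowing about before we generalize: a return type annotation adds a -> token and a type node to function_definition, so a fixed index like children[4] no longer points at the block. Here's a sketch of a test that exposes this (the PythonDocGenerator below sidesteps the problem by using child_by_field_name):

def test_return_type_annotation():
    """Sketch: a return type annotation shifts the child indices."""
    source = b'def f(x) -> int:\n    """Doubles x."""\n    return x * 2\n'
    tree = parser.parse(source)
    func = tree.root_node.children[0]

    info = extract_function_info(func, source)
    assert info['name'] == 'f'
    # children[4] is now the return type, not the block,
    # so the docstring is silently missed
    assert info['docstring'] is None

test_return_type_annotation()
print("✓ Index-based access misses docstrings when a return type is present")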

Finding All Functions: Search Pattern

Now we need to find functions anywhere in a file:

def find_all_functions(node):
    """Recursively find all function_definition nodes."""
    functions = []

    # Is this node a function definition?
    if node.type == 'function_definition':
        functions.append(node)

    # Recurse into children
    for child in node.children:
        functions.extend(find_all_functions(child))

    return functions

# Parse a file with multiple functions
multi_func_source = b'''
def public_func(x):
    """Public function."""
    return x

def _private_func(y):
    """Private function."""
    return y * 2

class MyClass:
    def method(self):
        """A method."""
        pass
'''

tree = parser.parse(multi_func_source)
all_funcs = find_all_functions(tree.root_node)
print(f"Found {len(all_funcs)} functions")

for func in all_funcs:
    info = extract_function_info(func, multi_func_source)
    print(f"- {info['name']} (public: {info['is_public']})")

Building the Documentation Generator

Let's combine everything into a working tool:

class PythonDocGenerator:
    """Generate documentation from Python source files."""

    def __init__(self):
        self.parser = Parser(Language(tspython.language()))

    def parse_file(self, filepath):
        """Parse a Python file and return the AST."""
        with open(filepath, 'rb') as f:
            source = f.read()
        return self.parser.parse(source), source

    def extract_functions(self, node, source):
        """Extract all functions with their info."""
        functions = []

        def recurse(n):
            if n.type == 'function_definition':
                info = self._extract_function_details(n, source)
                functions.append(info)

            for child in n.children:
                recurse(child)

        recurse(node)
        return functions

    def _extract_function_details(self, func_node, source):
        """Extract detailed info from function node."""
        # Name
        name_node = func_node.child_by_field_name('name')
        name = source[name_node.start_byte:name_node.end_byte].decode('utf8')

        # Parameters
        params_node = func_node.child_by_field_name('parameters')
        params = source[params_node.start_byte:params_node.end_byte].decode('utf8')

        # Body and docstring
        body_node = func_node.child_by_field_name('body')
        docstring = self._extract_docstring(body_node, source)

        return {
            'name': name,
            'signature': f"{name}{params}",
            'docstring': docstring,
            'is_public': not name.startswith('_'),
            'line': func_node.start_point[0] + 1
        }

    def _extract_docstring(self, body_node, source):
        """Extract docstring from function body."""
        if not body_node or not body_node.children:
            return None

        # First statement might be docstring
        first = body_node.children[0]
        if first.type == 'expression_statement':
            expr = first.children[0] if first.children else None
            if expr and expr.type == 'string':
                doc_text = source[expr.start_byte:expr.end_byte].decode('utf8')
                # Remove quotes
                return doc_text.strip('"""').strip("'''").strip()

        return None

    def generate_markdown(self, filepath, include_private=False):
        """Generate Markdown documentation for a file."""
        tree, source = self.parse_file(filepath)
        functions = self.extract_functions(tree.root_node, source)

        # Filter out private functions if requested
        if not include_private:
            functions = [f for f in functions if f['is_public']]

        # Generate markdown
        lines = [f"# Documentation: {filepath}\n"]

        for func in functions:
            lines.append(f"## `{func['signature']}`\n")

            if func['docstring']:
                lines.append(func['docstring'])
                lines.append("")
            else:
                lines.append("*No documentation provided*\n")

            lines.append(f"*Defined at line {func['line']}*\n")

        return "\n".join(lines)

# Use it
generator = PythonDocGenerator()
docs = generator.generate_markdown('mymodule.py')
print(docs)

# Save to file
with open('API.md', 'w') as f:
    f.write(docs)

Handling Edge Cases

Let's make our tool more robust:

def test_nested_functions():
    """Ensure we find nested function definitions."""
    source = b'''
def outer():
    def inner():
        """Nested function."""
        pass
    return inner
'''

    gen = PythonDocGenerator()
    tree = gen.parser.parse(source)
    funcs = gen.extract_functions(tree.root_node, source)

    # Should find both outer and inner
    assert len(funcs) == 2
    names = [f['name'] for f in funcs]
    assert 'outer' in names
    assert 'inner' in names

test_nested_functions()
print("✓ Nested functions handled")

Extending: Parse Parameter Descriptions

Docstrings often have structured parameter docs:

def _parse_docstring_params(docstring):
    """Extract parameter descriptions from docstring.

    Expects Google-style docstrings with Args: section.
    """
    if not docstring or 'Args:' not in docstring:
        return {}

    params = {}
    in_args_section = False

    for line in docstring.split('\n'):
        line = line.strip()

        if line == 'Args:':
            in_args_section = True
            continue

        if in_args_section:
            # Stop at the next section header, e.g. 'Returns:' (line is already stripped)
            if line.endswith(':'):
                break

            # Parse parameter line
            if ':' in line:
                param_name = line.split(':')[0].strip()
                param_desc = line.split(':', 1)[1].strip()
                params[param_name] = param_desc

    return params

# Test it
doc = """Calculate score.

Args:
    player_data: Dict with player info
    bonus_multiplier: Multiplier for bonuses

Returns:
    Final score
"""

params = _parse_docstring_params(doc)
print(params)
# {'player_data': 'Dict with player info',
#  'bonus_multiplier': 'Multiplier for bonuses'}
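
To fold these descriptions into the generated docs, one option is a small rendering helper that the generator could call for each function. A sketch, assuming _parse_docstring_params lives in the same module and func is one of the dicts produced by PythonDocGenerator.extract_functions:

def render_function_markdown(func):
    """Sketch: render one extracted function, including parsed parameters."""
    lines = [f"## `{func['signature']}`", ""]

    if func['docstring']:
        lines.append(func['docstring'])
        lines.append("")

        # Add a parameter list if the docstring has an Args: section
        params = _parse_docstring_params(func['docstring'])
        if params:
            lines.append("**Parameters:**")
            for name, desc in params.items():
                lines.append(f"- `{name}`: {desc}")
            lines.append("")
    else:
        lines.append("*No documentation provided*")
        lines.append("")

    lines.append(f"*Defined at line {func['line']}*")
    return "\n".join(lines)

generate_markdown could then build its output from render_function_markdown(func) for each public function.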

Lessons Learned

  1. Field access is powerful: Using .child_by_field_name() made extraction reliable
  2. AST structure is predictable: Function definitions always have the same children
  3. Recursion finds everything: Simple recursive search found all functions, even nested ones
  4. Docstrings have structure: Parsing them added real value
  5. Tests catch edge cases: Functions without docstrings, nested functions

Your Turn

Extend the generator to:

  1. Extract class definitions and their methods
  2. Parse return type hints
  3. Generate a table of contents
  4. Handle async functions

Chapter 21: Case Study - Data Pipeline for Nested APIs

The Problem: Multi-Level Data Collection

You're building a dashboard that pulls data from a REST API. The API has this structure:

  1. GET /api/teams returns the list of teams
  2. GET /api/teams/{team_id} returns one team's details, including its members
  3. GET /api/teams/{team_id}/members/{member_id} returns a single member's full data

You need to:

  1. Fetch every team, then every member of every team
  2. Combine each member's details with their team context
  3. Do it quickly and robustly enough to refresh the dashboard regularly

This is a nested traversal where each level requires API calls.

Starting Simple: Single-Level Fetch

import requests
import time

API_BASE = "https://api.example.com"

def fetch_teams():
    """Fetch all teams."""
    response = requests.get(f"{API_BASE}/api/teams")
    response.raise_for_status()
    return response.json()

# Test it
teams = fetch_teams()
print(f"Found {len(teams)} teams")
print(f"First team: {teams[0]}")

Output might look like:

Found 3 teams
First team: {'id': 'team_1', 'name': 'Engineering', 'member_count': 5}

Level Two: Fetching Team Details

def fetch_team_details(team_id):
    """Fetch detailed info for a specific team."""
    response = requests.get(f"{API_BASE}/api/teams/{team_id}")
    response.raise_for_status()
    data = response.json()
    return data

# Test it
team_details = fetch_team_details('team_1')
print(f"Team name: {team_details['name']}")
print(f"Members: {team_details['members']}")

This might return:

{
    'id': 'team_1',
    'name': 'Engineering',
    'members': [
        {'id': 'mem_1', 'name': 'Alice', 'role': 'Lead'},
        {'id': 'mem_2', 'name': 'Bob', 'role': 'Developer'}
    ]
}

The Naive Approach: Nested Loops

Rule #2: Show the simple path first:

def collect_all_data_naive():
    """Collect all data - simple but slow."""
    all_data = []

    # Get all teams
    teams = fetch_teams()

    # For each team, get details
    for team in teams:
        team_data = fetch_team_details(team['id'])

        # For each member, get their data
        for member in team_data.get('members', []):
            member_detail = fetch_member_data(team['id'], member['id'])

            # Combine everything
            all_data.append({
                'team_id': team['id'],
                'team_name': team_data['name'],
                'member_id': member['id'],
                'member_name': member['name'],
                'member_role': member['role'],
                'member_details': member_detail
            })

            # Be nice to the API
            time.sleep(0.1)

    return all_data

def fetch_member_data(team_id, member_id):
    """Fetch individual member data."""
    url = f"{API_BASE}/api/teams/{team_id}/members/{member_id}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

# Run it
data = collect_all_data_naive()
print(f"Collected data for {len(data)} members")

This works, but it's slow! If you have 10 teams with 10 members each, that's 1 request for the team list, 10 for team details, and 100 for member data: 111 sequential requests, plus 100 deliberate 0.1-second pauses and the network latency of every single call.

The Better Path: Error Handling First

Before optimizing, let's make it robust (Rule #4):

class APIError(Exception):
    """Custom exception for API errors."""
    pass

def safe_fetch(url, max_retries=3):
    """Fetch with error handling and retries."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            print(f"Timeout on attempt {attempt + 1}/{max_retries}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise APIError(f"Failed after {max_retries} attempts")

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                # Resource not found - don't retry
                return None
            elif e.response.status_code == 429:
                # Rate limited - wait longer
                wait_time = int(e.response.headers.get('Retry-After', 60))
                print(f"Rate limited, waiting {wait_time}s")
                time.sleep(wait_time)
            else:
                raise APIError(f"HTTP error: {e}")

    raise APIError("Max retries exceeded")

# Update our functions to use this
def fetch_teams_safe():
    """Fetch teams with error handling."""
    data = safe_fetch(f"{API_BASE}/api/teams")
    return data if data else []

Write a test to verify this works:

def test_handles_404():
    """Ensure we handle missing resources gracefully."""
    # This team doesn't exist
    result = safe_fetch(f"{API_BASE}/api/teams/nonexistent")
    assert result is None

# Run it (will actually call API, so be careful)
# test_handles_404()

Optimization: Parallel Requests

Now let's apply Rule #2 and show the elegant solution:

from concurrent.futures import ThreadPoolExecutor, as_completed

def collect_all_data_parallel():
    """Collect all data using parallel requests."""
    all_data = []

    # Step 1: Get all teams
    teams = fetch_teams_safe()
    print(f"Fetching details for {len(teams)} teams...")

    # Step 2: Fetch team details in parallel
    team_details = {}
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_team = {
            executor.submit(fetch_team_details_safe, team['id']): team['id']
            for team in teams
        }

        for future in as_completed(future_to_team):
            team_id = future_to_team[future]
            try:
                details = future.result()
                if details:
                    team_details[team_id] = details
            except Exception as e:
                print(f"Error fetching team {team_id}: {e}")

    print(f"Got details for {len(team_details)} teams")

    # Step 3: Fetch all member data in parallel
    member_tasks = []
    for team_id, team_data in team_details.items():
        for member in team_data.get('members', []):
            member_tasks.append({
                'team_id': team_id,
                'team_name': team_data['name'],
                'member': member
            })

    print(f"Fetching data for {len(member_tasks)} members...")

    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_task = {
            executor.submit(
                fetch_member_data_safe,
                task['team_id'],
                task['member']['id']
            ): task
            for task in member_tasks
        }

        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                member_detail = future.result()
                if member_detail:
                    all_data.append({
                        'team_id': task['team_id'],
                        'team_name': task['team_name'],
                        'member_id': task['member']['id'],
                        'member_name': task['member']['name'],
                        'member_role': task['member']['role'],
                        'member_details': member_detail
                    })
            except Exception as e:
                print(f"Error fetching member {task['member']['id']}: {e}")

    return all_data

def fetch_team_details_safe(team_id):
    """Fetch team details with error handling."""
    return safe_fetch(f"{API_BASE}/api/teams/{team_id}")

def fetch_member_data_safe(team_id, member_id):
    """Fetch member data with error handling."""
    url = f"{API_BASE}/api/teams/{team_id}/members/{member_id}"
    return safe_fetch(url)

# Run it
data = collect_all_data_parallel()
print(f"Collected data for {len(data)} members")

Why is this better?

  1. Requests at each level run concurrently, so total time is closer to the slowest request than to the sum of them all
  2. One failing team or member no longer stops the whole collection
  3. The structure stays readable: fetch teams, then team details, then member data
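
If you'd rather measure the speedup than take it on faith, a rough wall-clock comparison is enough. A minimal sketch, assuming both collectors above are defined and the API is reachable (it hits the API twice, so use it sparingly):

start = time.perf_counter()
naive_data = collect_all_data_naive()
naive_seconds = time.perf_counter() - start

start = time.perf_counter()
parallel_data = collect_all_data_parallel()
parallel_seconds = time.perf_counter() - start

print(f"Naive:    {len(naive_data)} records in {naive_seconds:.1f}s")
print(f"Parallel: {len(parallel_data)} records in {parallel_seconds:.1f}s")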

Transformation: Normalizing the Data

Now we have nested data. Let's flatten it for analysis:

def normalize_member_data(raw_data):
    """Transform nested API data into flat records."""
    normalized = []

    for record in raw_data:
        # Extract nested details
        details = record.get('member_details', {})

        flat_record = {
            'team_id': record['team_id'],
            'team_name': record['team_name'],
            'member_id': record['member_id'],
            'member_name': record['member_name'],
            'member_role': record['member_role'],

            # Flatten member details
            'email': details.get('email'),
            'joined_date': details.get('joined_date'),
            'active': details.get('active', True),

            # Nested stats
            'commits': details.get('stats', {}).get('commits', 0),
            'reviews': details.get('stats', {}).get('reviews', 0),
            'projects': len(details.get('projects', []))
        }

        normalized.append(flat_record)

    return normalized

# Apply it
flat_data = normalize_member_data(data)

# Now you can easily analyze
import pandas as pd
df = pd.DataFrame(flat_data)
print(df.groupby('team_name')['commits'].sum())

Building a Reusable Pipeline

Let's wrap this into a class following Rule #5: The Gentle Ascent:

class APIPipeline:
    """Reusable pipeline for multi-level API data collection."""

    def __init__(self, base_url, max_workers=10):
        self.base_url = base_url
        self.max_workers = max_workers
        self.cache = {}

    def fetch_with_cache(self, url):
        """Fetch URL with caching."""
        if url in self.cache:
            return self.cache[url]

        data = safe_fetch(url)
        if data:
            self.cache[url] = data
        return data

    def fetch_level(self, urls, show_progress=True):
        """Fetch multiple URLs in parallel."""
        results = {}

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {
                executor.submit(self.fetch_with_cache, url): url
                for url in urls
            }

            completed = 0
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                    results[url] = data

                    completed += 1
                    if show_progress and completed % 10 == 0:
                        print(f"Progress: {completed}/{len(urls)}")

                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    results[url] = None

        return results

    def collect_team_data(self):
        """Complete data collection pipeline."""
        print("Step 1: Fetching teams...")
        teams = self.fetch_with_cache(f"{self.base_url}/api/teams")

        if not teams:
            return []

        print(f"Step 2: Fetching {len(teams)} team details...")
        team_urls = [f"{self.base_url}/api/teams/{t['id']}" for t in teams]
        team_details = self.fetch_level(team_urls)

        print("Step 3: Collecting member URLs...")
        member_urls = []
        member_context = {}

        for url, team_data in team_details.items():
            if not team_data:
                continue

            team_id = team_data['id']
            for member in team_data.get('members', []):
                member_url = (
                    f"{self.base_url}/api/teams/{team_id}"
                    f"/members/{member['id']}"
                )
                member_urls.append(member_url)
                member_context[member_url] = {
                    'team': team_data,
                    'member_basic': member
                }

        print(f"Step 4: Fetching {len(member_urls)} member details...")
        member_details = self.fetch_level(member_urls)

        print("Step 5: Combining data...")
        combined = []
        for url, detail in member_details.items():
            if not detail:
                continue

            context = member_context[url]
            combined.append({
                'team': context['team'],
                'member_basic': context['member_basic'],
                'member_detail': detail
            })

        return combined

# Use it
pipeline = APIPipeline("https://api.example.com", max_workers=10)
data = pipeline.collect_team_data()

print(f"\nCollected complete data for {len(data)} members")
print(f"Cache contains {len(pipeline.cache)} entries")

Handling Incremental Updates

What if you want to update just one team's data?

import json

class IncrementalPipeline(APIPipeline):
    """Pipeline that supports incremental updates."""

    def __init__(self, base_url, cache_file='cache.json', max_workers=10):
        super().__init__(base_url, max_workers)
        self.cache_file = cache_file
        self.load_cache()

    def load_cache(self):
        """Load cache from disk."""
        try:
            with open(self.cache_file, 'r') as f:
                self.cache = json.load(f)
            print(f"Loaded {len(self.cache)} cached entries")
        except FileNotFoundError:
            self.cache = {}

    def save_cache(self):
        """Save cache to disk."""
        with open(self.cache_file, 'w') as f:
            json.dump(self.cache, f)
        print(f"Saved {len(self.cache)} entries to cache")

    def invalidate_team(self, team_id):
        """Remove team and its members from cache."""
        keys_to_remove = [
            k for k in self.cache.keys()
            # Match the exact team segment so 'team_1' doesn't also match 'team_10'
            if f'/teams/{team_id}/' in k or k.endswith(f'/teams/{team_id}')
        ]
        for key in keys_to_remove:
            del self.cache[key]
        print(f"Invalidated {len(keys_to_remove)} cache entries")

    def update_team(self, team_id):
        """Update just one team's data."""
        # Invalidate old data
        self.invalidate_team(team_id)

        # Fetch fresh data
        team_url = f"{self.base_url}/api/teams/{team_id}"
        team_data = self.fetch_with_cache(team_url)

        if not team_data:
            return []

        # Fetch member details
        member_urls = [
            f"{self.base_url}/api/teams/{team_id}/members/{m['id']}"
            for m in team_data.get('members', [])
        ]

        member_details = self.fetch_level(member_urls, show_progress=False)

        # Save updated cache
        self.save_cache()

        return member_details

# Use it
pipeline = IncrementalPipeline("https://api.example.com")

# Full update
all_data = pipeline.collect_team_data()
pipeline.save_cache()

# Later: update just one team
updated = pipeline.update_team('team_1')
print(f"Updated {len(updated)} members")

Production Considerations

Let's add monitoring and rate limiting:

import logging
from datetime import datetime, timedelta

class ProductionPipeline(IncrementalPipeline):
    """Production-ready pipeline with monitoring."""

    def __init__(self, base_url, cache_file='cache.json',
                 max_workers=10, rate_limit_per_second=10):
        super().__init__(base_url, cache_file, max_workers)

        self.rate_limit = rate_limit_per_second
        self.request_times = []

        # Set up logging
        logging.basicConfig(
            filename='pipeline.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def _wait_for_rate_limit(self):
        """Ensure we don't exceed rate limit."""
        now = datetime.now()

        # Remove requests older than 1 second
        self.request_times = [
            t for t in self.request_times
            if now - t < timedelta(seconds=1)
        ]

        # If at limit, wait
        if len(self.request_times) >= self.rate_limit:
            sleep_time = 1.0 - (now - self.request_times[0]).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)

        self.request_times.append(now)

    def fetch_with_cache(self, url):
        """Fetch with rate limiting and logging."""
        self._wait_for_rate_limit()

        self.logger.info(f"Fetching: {url}")

        try:
            data = super().fetch_with_cache(url)
            self.logger.info(f"Success: {url}")
            return data

        except Exception as e:
            self.logger.error(f"Failed: {url} - {e}")
            raise

    def collect_team_data(self):
        """Collect with timing information."""
        start_time = datetime.now()
        self.logger.info("Starting data collection")

        try:
            data = super().collect_team_data()

            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(
                f"Completed: {len(data)} records in {duration:.1f}s"
            )

            return data

        except Exception as e:
            self.logger.error(f"Collection failed: {e}")
            raise

# Use it
pipeline = ProductionPipeline(
    "https://api.example.com",
    max_workers=5,
    rate_limit_per_second=10
)

data = pipeline.collect_team_data()

Lessons Learned

  1. Start sequential: The naive nested loop approach worked and was easy to understand
  2. Add error handling before optimization: Robust code > fast code
  3. Parallelize wisely: ThreadPoolExecutor made it 10x faster with minimal complexity
  4. Cache aggressively: Repeated requests are wasteful
  5. Make it incremental: Production systems need partial updates
  6. Monitor everything: Logging reveals issues before users do

Your Turn: Extend It

Try these challenges:

  1. Add retry with exponential backoff for transient failures
  2. Implement pagination for APIs that return data in pages
  3. Add a progress bar using the tqdm library
  4. Handle authentication tokens that expire
  5. Write tests using the responses library to mock API calls

Example test structure:

import responses

@responses.activate
def test_handles_empty_teams():
    """Ensure pipeline handles empty team list."""
    responses.add(
        responses.GET,
        'https://api.example.com/api/teams',
        json=[],
        status=200
    )

    pipeline = APIPipeline('https://api.example.com')
    data = pipeline.collect_team_data()

    assert data == []

@responses.activate
def test_handles_team_with_no_members():
    """Ensure pipeline handles teams without members."""
    responses.add(
        responses.GET,
        'https://api.example.com/api/teams',
        json=[{'id': 'team_1', 'name': 'Empty Team'}],
        status=200
    )
    responses.add(
        responses.GET,
        'https://api.example.com/api/teams/team_1',
        json={'id': 'team_1', 'name': 'Empty Team', 'members': []},
        status=200
    )

    pipeline = APIPipeline('https://api.example.com')
    data = pipeline.collect_team_data()

    assert data == []

Reflection on Part VII

What We Learned Across All Three Case Studies

The Pattern of Mastery:

  1. Explore first (Rule #1: The Designer's Mind)
     - Log analyzer: We examined the JSON structure before writing extraction code
     - Doc generator: We parsed sample files to understand AST node relationships
     - API pipeline: We made single requests before building the full traversal

  2. Simple, then elegant (Rule #2: The Path to Wisdom)
     - Log analyzer: Basic recursive search → Generic field extraction
     - Doc generator: Direct child access → Reusable extractor class
     - API pipeline: Nested loops → Parallel fetching with caching

  3. Code as a living thing (Rule #3: The Living Code)
     - We refactored live in each case study
     - Showed where assumptions broke
     - Improved incrementally based on what we learned

  4. Tests reveal purpose (Rule #4: The Virtuous Flaw)
     - Log analyzer: Tested irregular data structures
     - Doc generator: Tested functions without docstrings
     - API pipeline: Tested error handling and retries

  5. Build incrementally (Rule #5: The Gentle Ascent)
     - Each case study had working code after every step
     - No "wait until the end to see if it works"
     - Each addition built on proven foundation

  6. Introduce concepts slowly (Rule #6: The Unburdened Mind)
     - Started with familiar patterns (recursion, loops)
     - Added parallel execution only when needed
     - Introduced caching, logging, and monitoring one at a time

The Universal Lessons

When facing any nested data problem:

  1. Identify the pattern: Is it search, navigation, or transformation?
  2. Check for libraries: Does BeautifulSoup, tree-sitter, or pandas already solve this?
  3. Start simple: Get something working before optimizing
  4. Handle errors early: Production means expecting failure
  5. Optimize when necessary: Measure before parallelizing
  6. Make it reusable: Future you will thank present you

Your Next Steps

You now have three complete examples showing:

  1. Recursive search through irregular JSON (the log analyzer)
  2. Structured traversal of syntax trees (the documentation generator)
  3. Multi-level collection from nested APIs (the data pipeline)

These patterns appear everywhere: configuration files, scraped web pages, API payloads, compiler tooling, analytics pipelines.

The techniques are universal. The mindset is transferable. The practice is what makes you fluent.

Now go build something.