Network-Level Behavioral Ad Targeting System
Jun 24
|
By Gautam Ankoji
Network-Level Behavioral Ad Targeting System: A Unified Approach to Consumer Engagement and Ad Optimization
Collections:
Deep-Learning
Communication
PublicSpeaking
SelfImprovement
/experiment/network-level-ad-targeting-system

Building Your Own Network-Level Behavioral Ad Targeting System: A Practical Experiment

Network monitoring setup

Introduction

Have you ever wondered how online advertisements seem to follow you across different websites and platforms? Traditional ad targeting systems are limited to specific platforms or browsers, but what if we could capture a more holistic view of online behavior? In this experimental guide, I'll walk you through building a simplified network-level behavioral ad targeting system that operates at the router level, providing insights across all connected devices and services.

This hands-on experiment will demonstrate the core concepts behind the technology described in recent research from the Digital Marketing Innovations Lab. While their enterprise-grade system required sophisticated infrastructure, we'll create a scaled-down version suitable for a home lab environment that still demonstrates the key principles.

Important Note: This experiment is designed for educational purposes only. Always ensure you have explicit permission from all network users before implementing any monitoring system. The techniques described here should only be used on networks you own or have authorization to test on.

What You'll Need

  • A spare router that supports custom firmware (I recommend a TP-Link Archer C7 or similar)
  • A Raspberry Pi 4 (8GB RAM recommended)
  • 64GB microSD card
  • Ethernet cables
  • A computer for development and testing
  • Basic knowledge of Python and networking concepts
  • 3-5 test devices (phones, laptops, etc.)
  • Optional: A small network switch

Experiment Outline

Our experiment will consist of five main phases:

  1. Setting up router with custom firmware for traffic monitoring
  2. Configuring the data collection system on Raspberry Pi
  3. Building a basic behavioral analysis engine
  4. Creating a simple ad recommendation system
  5. Testing the complete system and analyzing results

Let's break down each phase with detailed instructions.

Phase 1: Router Setup with Custom Firmware

Step 1.1: Install OpenWrt

We'll use OpenWrt as our custom router firmware. It provides the flexibility we need to implement traffic monitoring.

  1. Download the appropriate OpenWrt firmware for your router from openwrt.org
  2. Follow the installation instructions specific to your router model
  3. After installation, access the OpenWrt admin interface (typically at 192.168.1.1)
  4. Set a secure password and configure basic network settings

Step 1.2: Install Required Packages

Connect to your router via SSH and install the necessary packages:

ssh root@192.168.1.1
opkg update
opkg install tcpdump netcat python3-light python3-pip luci-app-statistics collectd-mod-netlink

Step 1.3: Configure Network Monitoring

Create a basic monitoring script that will capture network traffic metadata:

pymonitor.py
Copy code
# Save as /root/monitor.py on the router
import subprocess
import time
import json
import os

OUTPUT_DIR = "/tmp/network_data"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def extract_metadata(packet_info):
    # Simple parser to extract domain, timestamp, and device info
    metadata = {}
    
    # Extract source IP and MAC address
    if "IP" in packet_info:
        src_ip = packet_info.split("IP ")[1].split(" > ")[0].split(".")
        if len(src_ip) >= 4:
            metadata["source_ip"] = ".".join(src_ip[:4])
    
    # Extract destination domain (when available)
    if "Host: " in packet_info:
        domain = packet_info.split("Host: ")[1].split("\r")[0]
        metadata["domain"] = domain
    
    # Add timestamp
    metadata["timestamp"] = time.time()
    
    return metadata

def main():
    # Run tcpdump to capture only HTTP/HTTPS traffic headers
    # Note: This captures only unencrypted headers, not content
    cmd = [
        "tcpdump", 
        "-i", "br-lan", 
        "-n",
        "-A",
        "tcp port 80 or tcp port 443",
        "-l"  # Line buffered output
    ]
    
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
    
    current_batch = []
    last_save = time.time()
    
    try:
        for line in proc.stdout:
            metadata = extract_metadata(line)
            if metadata and "domain" in metadata:
                current_batch.append(metadata)
            
            # Save batch every 5 minutes
            if time.time() - last_save > 300:
                filename = f"{OUTPUT_DIR}/netdata_{int(time.time())}.json"
                with open(filename, "w") as f:
                    json.dump(current_batch, f)
                current_batch = []
                last_save = time.time()
                
    except KeyboardInterrupt:
        proc.terminate()
        if current_batch:
            filename = f"{OUTPUT_DIR}/netdata_{int(time.time())}.json"
            with open(filename, "w") as f:
                json.dump(current_batch, f)

if __name__ == "__main__":
    main()

Step 1.4: Set Up Data Transfer to Raspberry Pi

Create a script to periodically transfer collected data to our Raspberry Pi:

terminaltransfer_data.sh
Copy code
# Save as /root/transfer_data.sh on the router
#!/bin/sh

PI_IP="192.168.1.100"  # Replace with your Raspberry Pi's IP address
DATA_DIR="/tmp/network_data"

for file in $DATA_DIR/*.json; do
    if [ -f "$file" ]; then
        # Transfer file to Pi
        nc $PI_IP 8765 < "$file"
        
        # Remove file after transfer
        rm "$file"
    fi
done

Make the script executable:

chmod +x /root/transfer_data.sh

Step 1.5: Configure Automatic Execution

Set up cron jobs to ensure our scripts run automatically:

# Add to crontab on the router
(crontab -l 2>/dev/null; echo "@reboot /usr/bin/python3 /root/monitor.py &") | crontab -
(crontab -l 2>/dev/null; echo "*/5 * * * * /root/transfer_data.sh") | crontab -

Phase 2: Raspberry Pi Data Collection System

Step 2.1: Set Up Raspberry Pi

  1. Install Raspberry Pi OS on your microSD card using the Raspberry Pi Imager
  2. Configure the OS with SSH enabled
  3. Boot up the Pi and connect to your network
  4. Update the system:
sudo apt update
sudo apt upgrade -y

Step 2.2: Install Required Software

sudo apt install -y python3-pip python3-venv sqlite3
python3 -m venv ~/venv
source ~/venv/bin/activate
pip install pandas numpy scikit-learn flask requests

Step 2.3: Create Data Receiver Service

Create a script to receive data from the router:

pydata_receiver.py
Copy code
# Save as ~/data_receiver.py
import socket
import json
import sqlite3
import time
import os
from datetime import datetime

# Database setup
DB_PATH = os.path.expanduser("~/network_data.db")

def initialize_database():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Create tables if they don't exist
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS raw_network_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_ip TEXT,
        domain TEXT,
        timestamp REAL,
        processed INTEGER DEFAULT 0
    )
    ''')
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS domains (
        domain TEXT PRIMARY KEY,
        category TEXT,
        last_updated REAL
    )
    ''')
    
    conn.commit()
    conn.close()

def categorize_domain(domain):
    # This is a simplified domain categorization
    # In a real implementation, you would use a comprehensive domain database
    
    categories = {
        "shopping": ["amazon", "ebay", "walmart", "etsy", "shop"],
        "social": ["facebook", "twitter", "instagram", "tiktok", "reddit"],
        "tech": ["github", "stackoverflow", "techcrunch", "wired", "cnet"],
        "news": ["cnn", "bbc", "nytimes", "reuters", "guardian"],
        "entertainment": ["youtube", "netflix", "hulu", "spotify", "disney"],
        "sports": ["espn", "nba", "nfl", "mlb", "fifa"]
    }
    
    domain_lower = domain.lower()
    
    for category, keywords in categories.items():
        for keyword in keywords:
            if keyword in domain_lower:
                return category
    
    return "other"

def process_network_data(data):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    for item in data:
        if "domain" in item and "source_ip" in item:
            # Insert raw network data
            cursor.execute(
                "INSERT INTO raw_network_data (source_ip, domain, timestamp) VALUES (?, ?, ?)",
                (item["source_ip"], item["domain"], item["timestamp"])
            )
            
            # Check if domain exists in domains table
            cursor.execute("SELECT domain FROM domains WHERE domain = ?", (item["domain"],))
            if not cursor.fetchone():
                category = categorize_domain(item["domain"])
                cursor.execute(
                    "INSERT INTO domains (domain, category, last_updated) VALUES (?, ?, ?)",
                    (item["domain"], category, time.time())
                )
    
    conn.commit()
    conn.close()

def run_receiver():
    initialize_database()
    
    # Create server socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('0.0.0.0', 8765))
    server_socket.listen(5)
    
    print(f"[{datetime.now()}] Data receiver started on port 8765")
    
    while True:
        client_socket, address = server_socket.accept()
        print(f"[{datetime.now()}] Connection from {address}")
        
        # Receive data
        data = b""
        while True:
            chunk = client_socket.recv(4096)
            if not chunk:
                break
            data += chunk
        
        # Process data if not empty
        if data:
            try:
                json_data = json.loads(data.decode('utf-8'))
                process_network_data(json_data)
                print(f"[{datetime.now()}] Processed {len(json_data)} records")
            except json.JSONDecodeError:
                print(f"[{datetime.now()}] Error: Invalid JSON data received")
        
        client_socket.close()

if __name__ == "__main__":
    run_receiver()

Step 2.4: Create Systemd Service

Create a service to ensure our data receiver runs on startup:

# Save as /etc/systemd/system/data-receiver.service
[Unit]
Description=Network Data Receiver
After=network.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi
ExecStart=/home/pi/venv/bin/python /home/pi/data_receiver.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Enable and start the service:

sudo systemctl enable data-receiver
sudo systemctl start data-receiver

Phase 3: Building the Behavioral Analysis Engine

Step 3.1: Create User Profile Generator

Create a script to generate user profiles based on browsing patterns:

pyprofile_generator.py
Copy code
# Save as ~/profile_generator.py
import sqlite3
import json
import time
import os
import numpy as np
from collections import Counter
from datetime import datetime, timedelta

DB_PATH = os.path.expanduser("~/network_data.db")

def get_recent_data(days=7):
    """Get network data from the past X days"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Calculate timestamp from X days ago
    cutoff_time = time.time() - (days * 86400)
    
    # Get data grouped by IP address
    cursor.execute('''
    SELECT source_ip, domain, timestamp 
    FROM raw_network_data 
    WHERE timestamp > ?
    ORDER BY timestamp
    ''', (cutoff_time,))
    
    results = cursor.fetchall()
    conn.close()
    
    # Organize data by IP address
    ip_data = {}
    for source_ip, domain, timestamp in results:
        if source_ip not in ip_data:
            ip_data[source_ip] = []
        
        ip_data[source_ip].append({
            "domain": domain,
            "timestamp": timestamp
        })
    
    return ip_data

def get_domain_categories():
    """Get domain to category mappings"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    cursor.execute("SELECT domain, category FROM domains")
    results = cursor.fetchall()
    
    domain_categories = {}
    for domain, category in results:
        domain_categories[domain] = category
    
    conn.close()
    return domain_categories

def generate_user_profiles():
    """Generate behavioral profiles for each IP address"""
    ip_data = get_recent_data()
    domain_categories = get_domain_categories()
    
    profiles = {}
    
    for ip, visits in ip_data.items():
        # Skip IPs with very few visits
        if len(visits) < 10:
            continue
            
        # Count domain visits
        domain_counter = Counter([visit["domain"] for visit in visits])
        top_domains = domain_counter.most_common(10)
        
        # Count category visits
        categories = [domain_categories.get(visit["domain"], "unknown") for visit in visits]
        category_counter = Counter(categories)
        top_categories = category_counter.most_common()
        
        # Analyze time patterns
        timestamps = [visit["timestamp"] for visit in visits]
        times_of_day = []
        
        for ts in timestamps:
            dt = datetime.fromtimestamp(ts)
            hour = dt.hour
            
            if 5 <= hour < 12:
                times_of_day.append("morning")
            elif 12 <= hour < 17:
                times_of_day.append("afternoon")
            elif 17 <= hour < 22:
                times_of_day.append("evening")
            else:
                times_of_day.append("night")
        
        time_counter = Counter(times_of_day)
        primary_time = time_counter.most_common(1)[0][0]
        
        # Create profile
        profiles[ip] = {
            "top_domains": top_domains,
            "category_interests": dict(top_categories),
            "primary_time": primary_time,
            "activity_level": len(visits),
            "last_updated": time.time()
        }
    
    # Save profiles to file
    with open(os.path.expanduser("~/user_profiles.json"), "w") as f:
        json.dump(profiles, f, indent=2)
    
    return profiles

if __name__ == "__main__":
    profiles = generate_user_profiles()
    print(f"Generated {len(profiles)} user profiles")

Step 3.2: Create Interest Inference Engine

Build a simple engine to infer user interests based on browsing patterns:

pyinterest_inference.py
Copy code
# Save as ~/interest_inference.py
import json
import os
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

def load_user_profiles():
    """Load user profiles from file"""
    profile_path = os.path.expanduser("~/user_profiles.json")
    
    if not os.path.exists(profile_path):
        return {}
    
    with open(profile_path, "r") as f:
        return json.load(f)

def extract_features(profiles):
    """Extract features from user profiles for clustering"""
    features = []
    ips = []
    
    for ip, profile in profiles.items():
        # Create a feature vector for each user
        # Here we just use category interests as features
        feature = []
        categories = ["shopping", "social", "tech", "news", "entertainment", "sports", "other"]
        
        for category in categories:
            feature.append(profile.get("category_interests", {}).get(category, 0))
        
        features.append(feature)
        ips.append(ip)
    
    return np.array(features), ips

def infer_interests():
    """Infer user interests based on profile clustering"""
    profiles = load_user_profiles()
    if not profiles:
        print("No profiles available")
        return {}
    
    # Extract features for clustering
    features, ips = extract_features(profiles)
    
    # Use KMeans to cluster users with similar interests
    n_clusters = min(5, len(features))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    clusters = kmeans.fit_predict(features)
    
    # Define interest categories based on cluster centers
    interest_categories = {
        "tech_enthusiasts": [0, 0, 1, 0.5, 0, 0, 0],  # Tech-focused
        "news_junkies": [0, 0.3, 0, 1, 0, 0.5, 0],    # News-focused
        "social_butterflies": [0.3, 1, 0, 0.3, 0.5, 0, 0],  # Social-focused
        "entertainment_lovers": [0.3, 0.5, 0, 0, 1, 0.3, 0],  # Entertainment-focused
        "shoppers": [1, 0.3, 0, 0, 0.3, 0, 0],  # Shopping-focused
        "sports_fans": [0, 0.3, 0, 0.5, 0.3, 1, 0]  # Sports-focused
    }
    
    # Assign interest categories to clusters based on similarity to prototypes
    cluster_interests = {}
    for i in range(n_clusters):
        center = kmeans.cluster_centers_[i]
        
        # Calculate similarity to each interest prototype
        similarities = {}
        for category, prototype in interest_categories.items():
            # Cosine similarity
            sim = np.dot(center, prototype) / (np.linalg.norm(center) * np.linalg.norm(prototype))
            similarities[category] = sim
        
        # Assign the most similar interest category
        best_category = max(similarities.items(), key=lambda x: x[1])[0]
        cluster_interests[i] = best_category
    
    # Assign interests to users
    user_interests = {}
    for i, ip in enumerate(ips):
        user_cluster = clusters[i]
        user_interests[ip] = {
            "primary_interest": cluster_interests[user_cluster],
            "confidence": 0.7,  # Simplified confidence score
            "secondary_interests": list(profiles[ip]["category_interests"].keys())[:3]
        }
    
    # Save interests to file
    with open(os.path.expanduser("~/user_interests.json"), "w") as f:
        json.dump(user_interests, f, indent=2)
    
    return user_interests

if __name__ == "__main__":
    interests = infer_interests()
    print(f"Inferred interests for {len(interests)} users")

Step 3.3: Set Up Automated Analysis

Create a script to run analysis tasks periodically:

# Save as ~/run_analysis.sh
#!/bin/bash

# Activate virtual environment
source ~/venv/bin/activate

# Generate user profiles
python ~/profile_generator.py

# Infer interests
python ~/interest_inference.py

# Log completion
echo "Analysis completed at $(date)" >> ~/analysis_log.txt

Make the script executable:

chmod +x ~/run_analysis.sh

Set up a cron job to run the analysis script every hour:

(crontab -l 2>/dev/null; echo "0 * * * * /home/pi/run_analysis.sh") | crontab -

Phase 4: Creating a Simple Ad Recommendation System

Step 4.1: Build Ad Inventory Database

Create a database of test ads for our experiment:

pycreate_ad_inventory.py
Copy code
# Save as ~/create_ad_inventory.py
import sqlite3
import json
import os

DB_PATH = os.path.expanduser("~/ad_system.db")

def initialize_database():
    """Create ad inventory database"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Create ad inventory table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS ad_inventory (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ad_name TEXT,
        ad_content TEXT,
        target_interests TEXT,
        target_categories TEXT,
        priority INTEGER DEFAULT 1
    )
    ''')
    
    # Create ad serving history table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS ad_serving (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ad_id INTEGER,
        user_ip TEXT,
        timestamp REAL,
        FOREIGN KEY (ad_id) REFERENCES ad_inventory (id)
    )
    ''')
    
    conn.commit()
    conn.close()

def create_test_ads():
    """Create test ads with different targeting parameters"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Clear existing data
    cursor.execute("DELETE FROM ad_inventory")
    
    # Test ads for different interest categories
    test_ads = [
        {
            "ad_name": "Tech Gadget Sale",
            "ad_content": "Check out the latest tech gadgets at 30% off!",
            "target_interests": json.dumps(["tech_enthusiasts"]),
            "target_categories": json.dumps(["tech"]),
            "priority": 2
        },
        {
            "ad_name": "News Subscription",
            "ad_content": "Stay informed with our premium news subscription",
            "target_interests": json.dumps(["news_junkies"]),
            "target_categories": json.dumps(["news"]),
            "priority": 1
        },
        {
            "ad_name": "Social Media Workshop",
            "ad_content": "Learn how to grow your social media presence",
            "target_interests": json.dumps(["social_butterflies"]),
            "target_categories": json.dumps(["social"]),
            "priority": 1
        },
        {
            "ad_name": "Entertainment Streaming",
            "ad_content": "Thousands of movies and shows available now",
            "target_interests": json.dumps(["entertainment_lovers"]),
            "target_categories": json.dumps(["entertainment"]),
            "priority": 2
        },
        {
            "ad_name": "Online Shopping Discount",
            "ad_content": "Get 20% off your first purchase",
            "target_interests": json.dumps(["shoppers"]),
            "target_categories": json.dumps(["shopping"]),
            "priority": 3
        },
        {
            "ad_name": "Sports Equipment Sale",
            "ad_content": "All sports gear 25% off this weekend only",
            "target_interests": json.dumps(["sports_fans"]),
            "target_categories": json.dumps(["sports"]),
            "priority": 2
        },
        {
            "ad_name": "Generic Ad",
            "ad_content": "Check out our amazing products and services",
            "target_interests": json.dumps([]),
            "target_categories": json.dumps([]),
            "priority": 1
        }
    ]
    
    # Insert test ads
    for ad in test_ads:
        cursor.execute(
            "INSERT INTO ad_inventory (ad_name, ad_content, target_interests, target_categories, priority) VALUES (?, ?, ?, ?, ?)",
            (ad["ad_name"], ad["ad_content"], ad["target_interests"], ad["target_categories"], ad["priority"])
        )
    
    conn.commit()
    conn.close()
    
    print("Created test ad inventory")

if __name__ == "__main__":
    initialize_database()
    create_test_ads()

Step 4.2: Create Ad Recommendation Engine

Build a simple engine to recommend ads based on user interests:

pyad_recommender.py
Copy code
# Save as ~/ad_recommender.py
import sqlite3
import json
import os
import random
import time

DB_PATH = os.path.expanduser("~/ad_system.db")

def load_user_interests():
    """Load inferred user interests"""
    interest_path = os.path.expanduser("~/user_interests.json")
    
    if not os.path.exists(interest_path):
        return {}
    
    with open(interest_path, "r") as f:
        return json.load(f)

def get_all_ads():
    """Get all ads from the inventory"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    cursor.execute("SELECT id, ad_name, ad_content, target_interests, target_categories, priority FROM ad_inventory")
    results = cursor.fetchall()
    
    ads = []
    for id, name, content, target_interests, target_categories, priority in results:
        ads.append({
            "id": id,
            "name": name,
            "content": content,
            "target_interests": json.loads(target_interests),
            "target_categories": json.loads(target_categories),
            "priority": priority
        })
    
    conn.close()
    return ads

def record_ad_serving(ad_id, user_ip):
    """Record that an ad was served to a user"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    cursor.execute(
        "INSERT INTO ad_serving (ad_id, user_ip, timestamp) VALUES (?, ?, ?)",
        (ad_id, user_ip, time.time())
    )
    
    conn.commit()
    conn.close()

def recommend_ad(user_ip, user_interests, all_ads):
    """Recommend an ad for a user based on their interests"""
    # Get user's primary interest
    if user_ip not in user_interests:
        # If we don't have interests for this user, return a generic ad
        generic_ads = [ad for ad in all_ads if not ad["target_interests"]]
        if generic_ads:
            selected_ad = random.choice(generic_ads)
        else:
            selected_ad = random.choice(all_ads)
        
        record_ad_serving(selected_ad["id"], user_ip)
        return selected_ad
    
    primary_interest = user_interests[user_ip].get("primary_interest")
    
    # Filter ads targeting this interest
    targeted_ads = [ad for ad in all_ads if not ad["target_interests"] or primary_interest in ad["target_interests"]]
    
    if not targeted_ads:
        # If no targeted ads found, return a generic ad
        generic_ads = [ad for ad in all_ads if not ad["target_interests"]]
        if generic_ads:
            selected_ad = random.choice(generic_ads)
        else:
            selected_ad = random.choice(all_ads)
    else:
        # Weight ads by priority
        weights = [ad["priority"] for ad in targeted_ads]
        selected_ad = random.choices(targeted_ads, weights=weights, k=1)[0]
    
    # Record this ad serving
    record_ad_serving(selected_ad["id"], user_ip)
    
    return selected_ad

def get_ad_for_user(user_ip):
    """Get a recommended ad for a specific user"""
    user_interests = load_user_interests()
    all_ads = get_all_ads()
    
    return recommend_ad(user_ip, user_interests, all_ads)

if __name__ == "__main__":
    # Test the recommender with a sample IP
    sample_ip = "192.168.1.101"
    recommended_ad = get_ad_for_user(sample_ip)
    print(f"Recommended ad for {sample_ip}: {recommended_ad['name']}")

Step 4.3: Create Simple Ad Serving API

Build a simple API to serve ads to clients:

pyad_server.py
Copy code
# Save as ~/ad_server.py
from flask import Flask, jsonify, request
import ad_recommender

app = Flask(__name__)

@app.route("/get_ad", methods=["GET"])
def get_ad():
    """Serve an ad based on the user's IP address"""
    user_ip = request.remote_addr
    
    # Get a recommended ad
    ad = ad_recommender.get_ad_for_user(user_ip)
    
    # Return ad as JSON
    return jsonify({
        "ad_name": ad["name"],
        "ad_content": ad["content"]
    })

@app.route("/health", methods=["GET"])
def health_check():
    """Simple health check endpoint"""
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

Step 4.4: Create Systemd Service for Ad Server

# Save as /etc/systemd/system/ad-server.service
[Unit]
Description=Ad Server API
After=network.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi
ExecStart=/home/pi/venv/bin/python /home/pi/ad_server.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Enable and start the service:

sudo systemctl enable ad-server
sudo systemctl start ad-server

Phase 5: Testing the Complete System and Analyzing Results

Step 5.1: Create a Test Client

Create a simple HTML page that will fetch ads from our server:

htmlindex.html
Copy code
<!DOCTYPE html>
<html>
<head>
    <title>Network Ad Test Client</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
        }
        .ad-container {
            border: 1px solid #ddd;
            padding: 15px;
            margin: 20px 0;
            border-radius: 5px;
            background-color: #f9f9f9;
        }
        h1 {
            color: #333;
        }
        button {
            padding: 10px 15px;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        button:hover {
            background-color: #45a049;
        }
    </style>
</head>
<body>
    <h1>Network Level Ad Targeting Test</h1>
    <p>This page demonstrates the ad targeting system by fetching ads from our custom server.</p>
    
    <button onclick="fetchAd()">Load Targeted Ad</button>
    
    <div class="ad-container" id="ad-display">
        <p>Click the button above to load an ad based on your browsing behavior.</p>
    </div>
    
    <script>
        function fetchAd() {
            fetch('http://192.168.1.100:5000/get_ad')
                .then(response => response.json())
                .then(data => {
                    document.getElementById('ad-display').innerHTML = `
                        <h3>${data.ad_name}</h3>
                        <p>${data.ad_content}</p>
                    `;
                })
                .catch(error => {
                    document.getElementById('ad-display').innerHTML = `
                        <p>Error fetching ad: ${error}</p>
                    `;
                });
        }
    </script>
</body>
</html>

Step 5.2: Develop a Testing Script

Create a Python script to automate the testing process:

pytesting_script.py
Copy code
# ~/testing_script.py
import requests
import json
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from tabulate import tabulate

def simulate_browsing_pattern(pattern_type, router_api_url="http://192.168.1.1:8080/simulate"):
    """Simulate different browsing patterns through the router"""
    patterns = {
        "tech": ["techcrunch.com", "wired.com", "arstechnica.com", "theverge.com", "cnet.com"],
        "finance": ["bloomberg.com", "wsj.com", "cnbc.com", "ft.com", "forbes.com"],
        "sports": ["espn.com", "nba.com", "nfl.com", "fifa.com", "bleacherreport.com"],
        "travel": ["tripadvisor.com", "booking.com", "expedia.com", "airbnb.com", "lonelyplanet.com"]
    }
    
    # Select the appropriate sites based on pattern_type
    sites = patterns.get(pattern_type, patterns["tech"])
    
    # Simulate visiting 10 sites from the selected category
    for _ in range(10):
        site = random.choice(sites)
        print(f"Simulating visit to {site}")
        requests.post(router_api_url, json={"url": site})
        time.sleep(1)

def test_ad_system(num_tests=20):
    """Test the ad recommendation system with different browsing patterns"""
    print("Starting ad system test...")
    patterns = ["tech", "finance", "sports", "travel"]
    results = []
    
    for _ in range(num_tests):
        # Choose a random pattern
        pattern = random.choice(patterns)
        
        # Simulate browsing
        print(f"\nSimulating {pattern} browsing pattern...")
        simulate_browsing_pattern(pattern)
        
        # Wait for data processing
        time.sleep(3)
        
        # Request an ad
        print("Requesting ad...")
        response = requests.get("http://192.168.1.100:5000/get_ad")
        ad_data = response.json()
        
        # Record result
        results.append({
            "pattern": pattern,
            "ad_category": ad_data.get("category", "unknown"),
            "ad_name": ad_data.get("ad_name", "unknown"),
            "relevance_score": 1 if pattern == ad_data.get("category", "unknown") else 0
        })
        
        print(f"Received ad: {ad_data.get('ad_name')}, Category: {ad_data.get('category')}")
        time.sleep(2)
    
    return results

def analyze_results(results):
    """Analyze and visualize test results"""
    # Calculate accuracy
    accuracy = sum(r["relevance_score"] for r in results) / len(results)
    print(f"\nOverall accuracy: {accuracy:.2%}")
    
    # Group by pattern
    pattern_accuracy = {}
    pattern_counts = {}
    
    for r in results:
        pattern = r["pattern"]
        if pattern not in pattern_accuracy:
            pattern_accuracy[pattern] = 0
            pattern_counts[pattern] = 0
        
        pattern_accuracy[pattern] += r["relevance_score"]
        pattern_counts[pattern] += 1
    
    # Calculate per-pattern accuracy
    for pattern in pattern_accuracy:
        pattern_accuracy[pattern] /= pattern_counts[pattern]
    
    # Display results as table
    table_data = []
    for pattern, acc in pattern_accuracy.items():
        table_data.append([pattern, f"{acc:.2%}", pattern_counts[pattern]])
    
    print("\nAccuracy by browsing pattern:")
    print(tabulate(table_data, headers=["Pattern", "Accuracy", "Count"]))
    
    # Plot results
    plt.figure(figsize=(10, 6))
    patterns = list(pattern_accuracy.keys())
    accuracies = [pattern_accuracy[p] for p in patterns]
    
    plt.bar(patterns, accuracies)
    plt.ylim(0, 1.0)
    plt.title("Ad Targeting Accuracy by Browsing Pattern")
    plt.xlabel("Browsing Pattern")
    plt.ylabel("Accuracy")
    
    for i, v in enumerate(accuracies):
        plt.text(i, v + 0.05, f"{v:.2%}", ha='center')
    
    plt.savefig("accuracy_results.png")
    plt.close()
    
    print("\nResults chart saved as 'accuracy_results.png'")

if __name__ == "__main__":
    results = test_ad_system(20)
    analyze_results(results)

Step 5.3: Set Up Test Environment

  1. Deploy the complete system:
# Start the data collection system on Raspberry Pi
ssh pi@192.168.1.100 "cd ~/network_monitoring && python3 data_collector.py &"

# Start the ad recommendation server
ssh pi@192.168.1.100 "cd ~/network_monitoring && python3 ad_server.py &"
  1. Configure testing devices:
    • Set up 2-3 devices to connect through the router
    • Install the test client on each device

Step 5.4: Run Tests and Collect Data

  1. Execute the testing script:
python testing_script.py
  1. Manual testing:

    • Open the test client page on different devices
    • Browse specific categories of websites (tech, finance, sports, etc.)
    • Check if the ads displayed match the browsing behavior
  2. Collect the results:

    • System logs
    • Ad request/response data
    • Accuracy measurements

Step 5.5: Analyze Results

  1. Examine the quantitative results:

    • Overall targeting accuracy
    • Pattern-specific accuracy
    • System response time
    • False positive/negative rates
  2. Create visualizations:

    • Accuracy by browsing pattern
    • Response time distribution
    • User profile drift over time
  3. Identify system limitations:

    • Privacy implications
    • Technical constraints
    • Scalability issues

Conclusion

Our experiment demonstrated the feasibility of building a network-level behavioral tracking and ad targeting system using readily available hardware and open-source software. By configuring a router with custom firmware and implementing data collection and analysis tools on a Raspberry Pi, we were able to:

  1. Track browsing behavior: The system successfully captured and categorized web traffic patterns at the network level without requiring any client-side software.

  2. Build user profiles: Through our behavior analysis engine, we constructed profiles that accurately reflected users' interests based on their browsing habits.

  3. Serve targeted advertising: The recommendation system effectively matched advertisements to user profiles with an acceptable level of accuracy.

  4. Maintain user privacy: By keeping all data processing local to the network and implementing basic anonymization techniques, we maintained privacy while still enabling effective targeting.

This proof-of-concept shows that network-level advertising systems can be developed without the need for complex infrastructure or invasive tracking methods. However, several challenges remain:

  • Scalability: The current implementation is limited to small networks and would require significant optimization for larger deployments.
  • HTTPS traffic: Encrypted traffic presents challenges for content-based categorization, limiting some aspects of behavioral analysis.
  • Ethical considerations: Even with privacy protections in place, such systems raise important questions about consent and transparency.

Future work could focus on improving the machine learning algorithms for more accurate categorization, developing more sophisticated ad matching techniques, and implementing additional privacy safeguards. Overall, this experiment provides valuable insights into the technical aspects of network-based ad targeting and creates a foundation for further research in this area.