Have you ever wondered how online advertisements seem to follow you across different websites and platforms? Traditional ad targeting systems are limited to specific platforms or browsers, but what if we could capture a more holistic view of online behavior? In this experimental guide, I'll walk you through building a simplified network-level behavioral ad targeting system that operates at the router level, providing insights across all connected devices and services.
This hands-on experiment will demonstrate the core concepts behind the technology described in recent research from the Digital Marketing Innovations Lab. While their enterprise-grade system required sophisticated infrastructure, we'll create a scaled-down version suitable for a home lab environment that still demonstrates the key principles.
Important Note: This experiment is designed for educational purposes only. Always ensure you have explicit permission from all network users before implementing any monitoring system. The techniques described here should only be used on networks you own or have authorization to test on.
Our experiment will consist of five main phases:

1. Configuring the router to capture network traffic metadata
2. Setting up a Raspberry Pi to receive and store the collected data
3. Generating behavioral profiles and inferring user interests
4. Building an ad inventory, recommendation engine, and ad server
5. Testing and evaluating the end-to-end system

Let's break down each phase with detailed instructions.
We'll use OpenWrt as our custom router firmware. It provides the flexibility we need to implement traffic monitoring.
Connect to your router via SSH and install the necessary packages:
ssh root@192.168.1.1
opkg update
opkg install tcpdump netcat python3-light python3-pip luci-app-statistics collectd-mod-netlink
Create a basic monitoring script that will capture network traffic metadata:
# Save as /root/monitor.py on the router
import subprocess
import time
import json
import os

OUTPUT_DIR = "/tmp/network_data"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def extract_source_ip(line):
    # Parse the source IP from a tcpdump header line such as
    # "12:00:01.000000 IP 192.168.1.50.54321 > 93.184.216.34.80: ..."
    if "IP " in line and " > " in line:
        parts = line.split("IP ")[1].split(" > ")[0].split(".")
        if len(parts) >= 4:
            return ".".join(parts[:4])
    return None

def extract_domain(line):
    # Parse the Host header from a plain-HTTP payload line.
    # HTTPS payloads are encrypted, so Host headers are only visible
    # for port-80 traffic; port 443 yields no readable headers.
    if "Host: " in line:
        return line.split("Host: ")[1].split("\r")[0].strip()
    return None

def main():
    # Run tcpdump to capture HTTP/HTTPS traffic headers.
    # The filter expression must come after all options.
    cmd = [
        "tcpdump",
        "-i", "br-lan",
        "-n",
        "-A",
        "-l",  # Line-buffered output
        "tcp port 80 or tcp port 443"
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
    current_batch = []
    # Host headers arrive on lines after the packet's IP header line,
    # so remember the most recent source IP seen.
    current_src_ip = None
    last_save = time.time()
    try:
        for line in proc.stdout:
            src_ip = extract_source_ip(line)
            if src_ip:
                current_src_ip = src_ip
            domain = extract_domain(line)
            if domain and current_src_ip:
                current_batch.append({
                    "source_ip": current_src_ip,
                    "domain": domain,
                    "timestamp": time.time()
                })
            # Save a batch every 5 minutes
            if time.time() - last_save > 300:
                if current_batch:
                    filename = f"{OUTPUT_DIR}/netdata_{int(time.time())}.json"
                    with open(filename, "w") as f:
                        json.dump(current_batch, f)
                    current_batch = []
                last_save = time.time()
    except KeyboardInterrupt:
        proc.terminate()
        if current_batch:
            filename = f"{OUTPUT_DIR}/netdata_{int(time.time())}.json"
            with open(filename, "w") as f:
                json.dump(current_batch, f)

if __name__ == "__main__":
    main()
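Before leaving the monitor running, it's worth sanity-checking the two parsers against representative tcpdump lines. A minimal sketch, assuming you run it from /root so monitor.py is importable (the sample lines are illustrative):

# Quick check of the parsing helpers with hand-written tcpdump-style lines
from monitor import extract_source_ip, extract_domain

header = "12:00:01.000000 IP 192.168.1.50.54321 > 93.184.216.34.80: Flags [P.], length 120"
payload = "Host: example.com\r"

assert extract_source_ip(header) == "192.168.1.50"
assert extract_domain(payload) == "example.com"
print("parsers OK")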
Create a script to periodically transfer collected data to our Raspberry Pi:
#!/bin/sh
# Save as /root/transfer_data.sh on the router
PI_IP="192.168.1.100"  # Replace with your Raspberry Pi's IP address
DATA_DIR="/tmp/network_data"
for file in $DATA_DIR/*.json; do
    if [ -f "$file" ]; then
        # Transfer the file to the Pi; delete it only if the transfer succeeded
        if nc $PI_IP 8765 < "$file"; then
            rm "$file"
        fi
    fi
done
Make the script executable:
chmod +x /root/transfer_data.sh
Set up the scripts to run automatically. Note that BusyBox crond on OpenWrt does not support @reboot entries, so start the monitor from /etc/rc.local instead and schedule the transfer with cron:

# Add this line to /etc/rc.local, above the final 'exit 0'
/usr/bin/python3 /root/monitor.py &

# Schedule the transfer script and make sure crond is running
(crontab -l 2>/dev/null; echo "*/5 * * * * /root/transfer_data.sh") | crontab -
/etc/init.d/cron enable
/etc/init.d/cron start

For this session, start the monitor by hand rather than rebooting:

/usr/bin/python3 /root/monitor.py &
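After a few minutes of browsing from any device on the network, confirm that capture batches are accumulating on the router:

ls -l /tmp/network_data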
With the router collecting data, switch to the Raspberry Pi. Update the system and install the base packages:

sudo apt update
sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv sqlite3

Create a virtual environment and install the Python libraries used in the rest of this guide (matplotlib and tabulate are needed later by the testing script):

python3 -m venv ~/venv
source ~/venv/bin/activate
pip install pandas numpy scikit-learn flask requests matplotlib tabulate
Create a script to receive data from the router:
# Save as ~/data_receiver.py
import socket
import json
import sqlite3
import time
import os
from datetime import datetime
# Database setup
DB_PATH = os.path.expanduser("~/network_data.db")
def initialize_database():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create tables if they don't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS raw_network_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_ip TEXT,
domain TEXT,
timestamp REAL,
processed INTEGER DEFAULT 0
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS domains (
domain TEXT PRIMARY KEY,
category TEXT,
last_updated REAL
)
''')
conn.commit()
conn.close()
def categorize_domain(domain):
# This is a simplified domain categorization
# In a real implementation, you would use a comprehensive domain database
categories = {
"shopping": ["amazon", "ebay", "walmart", "etsy", "shop"],
"social": ["facebook", "twitter", "instagram", "tiktok", "reddit"],
"tech": ["github", "stackoverflow", "techcrunch", "wired", "cnet"],
"news": ["cnn", "bbc", "nytimes", "reuters", "guardian"],
"entertainment": ["youtube", "netflix", "hulu", "spotify", "disney"],
"sports": ["espn", "nba", "nfl", "mlb", "fifa"]
}
domain_lower = domain.lower()
for category, keywords in categories.items():
for keyword in keywords:
if keyword in domain_lower:
return category
return "other"
def process_network_data(data):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
for item in data:
if "domain" in item and "source_ip" in item:
# Insert raw network data
cursor.execute(
"INSERT INTO raw_network_data (source_ip, domain, timestamp) VALUES (?, ?, ?)",
(item["source_ip"], item["domain"], item["timestamp"])
)
# Check if domain exists in domains table
cursor.execute("SELECT domain FROM domains WHERE domain = ?", (item["domain"],))
if not cursor.fetchone():
category = categorize_domain(item["domain"])
cursor.execute(
"INSERT INTO domains (domain, category, last_updated) VALUES (?, ?, ?)",
(item["domain"], category, time.time())
)
conn.commit()
conn.close()
def run_receiver():
    initialize_database()
    # Create server socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts without "Address already in use" errors
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('0.0.0.0', 8765))
    server_socket.listen(5)
    print(f"[{datetime.now()}] Data receiver started on port 8765")
while True:
client_socket, address = server_socket.accept()
print(f"[{datetime.now()}] Connection from {address}")
# Receive data
data = b""
while True:
chunk = client_socket.recv(4096)
if not chunk:
break
data += chunk
# Process data if not empty
if data:
try:
json_data = json.loads(data.decode('utf-8'))
process_network_data(json_data)
print(f"[{datetime.now()}] Processed {len(json_data)} records")
except json.JSONDecodeError:
print(f"[{datetime.now()}] Error: Invalid JSON data received")
client_socket.close()
if __name__ == "__main__":
run_receiver()
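To verify the receiver end to end without waiting for the router, you can mimic the router's transfer from the Pi itself. A minimal sketch; the script name and record values are illustrative:

# Save as ~/send_test_batch.py (hypothetical helper, not part of the pipeline)
import json
import socket
import time

batch = [{"source_ip": "192.168.1.50", "domain": "example.com", "timestamp": time.time()}]

# Mimic transfer_data.sh: open a TCP connection, send the JSON, close
with socket.create_connection(("127.0.0.1", 8765)) as s:
    s.sendall(json.dumps(batch).encode("utf-8"))
print(f"Sent {len(batch)} test record(s)")

A matching row should then appear in raw_network_data, with example.com categorized as "other".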
Create a service to ensure our data receiver runs on startup:
# Save as /etc/systemd/system/data-receiver.service
[Unit]
Description=Network Data Receiver
After=network.target
[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi
ExecStart=/home/pi/venv/bin/python /home/pi/data_receiver.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Enable and start the service:
sudo systemctl enable data-receiver
sudo systemctl start data-receiver
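Check that the service came up, and watch its logs while testing:

sudo systemctl status data-receiver
sudo journalctl -u data-receiver -f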
Create a script to generate user profiles based on browsing patterns:
# Save as ~/profile_generator.py
import sqlite3
import json
import time
import os
from collections import Counter
from datetime import datetime
DB_PATH = os.path.expanduser("~/network_data.db")
def get_recent_data(days=7):
"""Get network data from the past X days"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Calculate timestamp from X days ago
cutoff_time = time.time() - (days * 86400)
# Get data grouped by IP address
cursor.execute('''
SELECT source_ip, domain, timestamp
FROM raw_network_data
WHERE timestamp > ?
ORDER BY timestamp
''', (cutoff_time,))
results = cursor.fetchall()
conn.close()
# Organize data by IP address
ip_data = {}
for source_ip, domain, timestamp in results:
if source_ip not in ip_data:
ip_data[source_ip] = []
ip_data[source_ip].append({
"domain": domain,
"timestamp": timestamp
})
return ip_data
def get_domain_categories():
"""Get domain to category mappings"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT domain, category FROM domains")
results = cursor.fetchall()
domain_categories = {}
for domain, category in results:
domain_categories[domain] = category
conn.close()
return domain_categories
def generate_user_profiles():
"""Generate behavioral profiles for each IP address"""
ip_data = get_recent_data()
domain_categories = get_domain_categories()
profiles = {}
for ip, visits in ip_data.items():
# Skip IPs with very few visits
if len(visits) < 10:
continue
# Count domain visits
domain_counter = Counter([visit["domain"] for visit in visits])
top_domains = domain_counter.most_common(10)
# Count category visits
categories = [domain_categories.get(visit["domain"], "unknown") for visit in visits]
category_counter = Counter(categories)
top_categories = category_counter.most_common()
# Analyze time patterns
timestamps = [visit["timestamp"] for visit in visits]
times_of_day = []
for ts in timestamps:
dt = datetime.fromtimestamp(ts)
hour = dt.hour
if 5 <= hour < 12:
times_of_day.append("morning")
elif 12 <= hour < 17:
times_of_day.append("afternoon")
elif 17 <= hour < 22:
times_of_day.append("evening")
else:
times_of_day.append("night")
time_counter = Counter(times_of_day)
primary_time = time_counter.most_common(1)[0][0]
# Create profile
profiles[ip] = {
"top_domains": top_domains,
"category_interests": dict(top_categories),
"primary_time": primary_time,
"activity_level": len(visits),
"last_updated": time.time()
}
# Save profiles to file
with open(os.path.expanduser("~/user_profiles.json"), "w") as f:
json.dump(profiles, f, indent=2)
return profiles
if __name__ == "__main__":
profiles = generate_user_profiles()
print(f"Generated {len(profiles)} user profiles")
Build a simple engine to infer user interests based on browsing patterns:
# Save as ~/interest_inference.py
import json
import os
import numpy as np
from sklearn.cluster import KMeans
def load_user_profiles():
"""Load user profiles from file"""
profile_path = os.path.expanduser("~/user_profiles.json")
if not os.path.exists(profile_path):
return {}
with open(profile_path, "r") as f:
return json.load(f)
def extract_features(profiles):
    """Build per-user category-proportion vectors for clustering"""
    categories = ["shopping", "social", "tech", "news", "entertainment", "sports", "other"]
    features = []
    ips = []
    for ip, profile in profiles.items():
        interests = profile.get("category_interests", {})
        counts = [interests.get(category, 0) for category in categories]
        # Fold domains the categorizer never saw ("unknown") into "other"
        counts[-1] += interests.get("unknown", 0)
        # Normalize to proportions so heavy users don't dominate the clusters
        total = sum(counts) or 1
        features.append([count / total for count in counts])
        ips.append(ip)
    return np.array(features), ips
def infer_interests():
"""Infer user interests based on profile clustering"""
profiles = load_user_profiles()
if not profiles:
print("No profiles available")
return {}
# Extract features for clustering
features, ips = extract_features(profiles)
# Use KMeans to cluster users with similar interests
n_clusters = min(5, len(features))
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(features)
# Define interest categories based on cluster centers
interest_categories = {
"tech_enthusiasts": [0, 0, 1, 0.5, 0, 0, 0], # Tech-focused
"news_junkies": [0, 0.3, 0, 1, 0, 0.5, 0], # News-focused
"social_butterflies": [0.3, 1, 0, 0.3, 0.5, 0, 0], # Social-focused
"entertainment_lovers": [0.3, 0.5, 0, 0, 1, 0.3, 0], # Entertainment-focused
"shoppers": [1, 0.3, 0, 0, 0.3, 0, 0], # Shopping-focused
"sports_fans": [0, 0.3, 0, 0.5, 0.3, 1, 0] # Sports-focused
}
# Assign interest categories to clusters based on similarity to prototypes
cluster_interests = {}
for i in range(n_clusters):
center = kmeans.cluster_centers_[i]
# Calculate similarity to each interest prototype
similarities = {}
for category, prototype in interest_categories.items():
# Cosine similarity
sim = np.dot(center, prototype) / (np.linalg.norm(center) * np.linalg.norm(prototype))
similarities[category] = sim
# Assign the most similar interest category
best_category = max(similarities.items(), key=lambda x: x[1])[0]
cluster_interests[i] = best_category
# Assign interests to users
user_interests = {}
for i, ip in enumerate(ips):
user_cluster = clusters[i]
user_interests[ip] = {
"primary_interest": cluster_interests[user_cluster],
"confidence": 0.7, # Simplified confidence score
"secondary_interests": list(profiles[ip]["category_interests"].keys())[:3]
}
# Save interests to file
with open(os.path.expanduser("~/user_interests.json"), "w") as f:
json.dump(user_interests, f, indent=2)
return user_interests
if __name__ == "__main__":
interests = infer_interests()
print(f"Inferred interests for {len(interests)} users")
Create a script to run analysis tasks periodically:
#!/bin/bash
# Save as ~/run_analysis.sh
# Activate the virtual environment
source ~/venv/bin/activate
# Generate user profiles
python ~/profile_generator.py
# Infer interests
python ~/interest_inference.py
# Log completion
echo "Analysis completed at $(date)" >> ~/analysis_log.txt
Make the script executable:
chmod +x ~/run_analysis.sh
Set up a cron job to run the analysis script every hour:
(crontab -l 2>/dev/null; echo "0 * * * * /home/pi/run_analysis.sh") | crontab -
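Rather than waiting for the first hourly run, trigger the analysis once by hand and inspect the outputs:

~/run_analysis.sh
cat ~/user_profiles.json
cat ~/user_interests.json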
Create a database of test ads for our experiment:
# Save as ~/create_ad_inventory.py
import sqlite3
import json
import os
DB_PATH = os.path.expanduser("~/ad_system.db")
def initialize_database():
"""Create ad inventory database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create ad inventory table
cursor.execute('''
CREATE TABLE IF NOT EXISTS ad_inventory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ad_name TEXT,
ad_content TEXT,
target_interests TEXT,
target_categories TEXT,
priority INTEGER DEFAULT 1
)
''')
# Create ad serving history table
cursor.execute('''
CREATE TABLE IF NOT EXISTS ad_serving (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ad_id INTEGER,
user_ip TEXT,
timestamp REAL,
FOREIGN KEY (ad_id) REFERENCES ad_inventory (id)
)
''')
conn.commit()
conn.close()
def create_test_ads():
"""Create test ads with different targeting parameters"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Clear existing data
cursor.execute("DELETE FROM ad_inventory")
# Test ads for different interest categories
test_ads = [
{
"ad_name": "Tech Gadget Sale",
"ad_content": "Check out the latest tech gadgets at 30% off!",
"target_interests": json.dumps(["tech_enthusiasts"]),
"target_categories": json.dumps(["tech"]),
"priority": 2
},
{
"ad_name": "News Subscription",
"ad_content": "Stay informed with our premium news subscription",
"target_interests": json.dumps(["news_junkies"]),
"target_categories": json.dumps(["news"]),
"priority": 1
},
{
"ad_name": "Social Media Workshop",
"ad_content": "Learn how to grow your social media presence",
"target_interests": json.dumps(["social_butterflies"]),
"target_categories": json.dumps(["social"]),
"priority": 1
},
{
"ad_name": "Entertainment Streaming",
"ad_content": "Thousands of movies and shows available now",
"target_interests": json.dumps(["entertainment_lovers"]),
"target_categories": json.dumps(["entertainment"]),
"priority": 2
},
{
"ad_name": "Online Shopping Discount",
"ad_content": "Get 20% off your first purchase",
"target_interests": json.dumps(["shoppers"]),
"target_categories": json.dumps(["shopping"]),
"priority": 3
},
{
"ad_name": "Sports Equipment Sale",
"ad_content": "All sports gear 25% off this weekend only",
"target_interests": json.dumps(["sports_fans"]),
"target_categories": json.dumps(["sports"]),
"priority": 2
},
{
"ad_name": "Generic Ad",
"ad_content": "Check out our amazing products and services",
"target_interests": json.dumps([]),
"target_categories": json.dumps([]),
"priority": 1
}
]
# Insert test ads
for ad in test_ads:
cursor.execute(
"INSERT INTO ad_inventory (ad_name, ad_content, target_interests, target_categories, priority) VALUES (?, ?, ?, ?, ?)",
(ad["ad_name"], ad["ad_content"], ad["target_interests"], ad["target_categories"], ad["priority"])
)
conn.commit()
conn.close()
print("Created test ad inventory")
if __name__ == "__main__":
initialize_database()
create_test_ads()
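Run the script, then confirm the seven test ads landed in the inventory:

python ~/create_ad_inventory.py
sqlite3 ~/ad_system.db "SELECT id, ad_name, priority FROM ad_inventory;"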
Build a simple engine to recommend ads based on user interests:
# Save as ~/ad_recommender.py
import sqlite3
import json
import os
import random
import time
DB_PATH = os.path.expanduser("~/ad_system.db")
def load_user_interests():
"""Load inferred user interests"""
interest_path = os.path.expanduser("~/user_interests.json")
if not os.path.exists(interest_path):
return {}
with open(interest_path, "r") as f:
return json.load(f)
def get_all_ads():
"""Get all ads from the inventory"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT id, ad_name, ad_content, target_interests, target_categories, priority FROM ad_inventory")
results = cursor.fetchall()
    ads = []
    # Avoid shadowing Python's built-in id()
    for ad_id, name, content, target_interests, target_categories, priority in results:
        ads.append({
            "id": ad_id,
            "name": name,
            "content": content,
            "target_interests": json.loads(target_interests),
            "target_categories": json.loads(target_categories),
            "priority": priority
        })
conn.close()
return ads
def record_ad_serving(ad_id, user_ip):
"""Record that an ad was served to a user"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO ad_serving (ad_id, user_ip, timestamp) VALUES (?, ?, ?)",
(ad_id, user_ip, time.time())
)
conn.commit()
conn.close()
def recommend_ad(user_ip, user_interests, all_ads):
"""Recommend an ad for a user based on their interests"""
# Get user's primary interest
if user_ip not in user_interests:
# If we don't have interests for this user, return a generic ad
generic_ads = [ad for ad in all_ads if not ad["target_interests"]]
if generic_ads:
selected_ad = random.choice(generic_ads)
else:
selected_ad = random.choice(all_ads)
record_ad_serving(selected_ad["id"], user_ip)
return selected_ad
primary_interest = user_interests[user_ip].get("primary_interest")
# Filter ads targeting this interest
targeted_ads = [ad for ad in all_ads if not ad["target_interests"] or primary_interest in ad["target_interests"]]
if not targeted_ads:
# If no targeted ads found, return a generic ad
generic_ads = [ad for ad in all_ads if not ad["target_interests"]]
if generic_ads:
selected_ad = random.choice(generic_ads)
else:
selected_ad = random.choice(all_ads)
else:
# Weight ads by priority
weights = [ad["priority"] for ad in targeted_ads]
selected_ad = random.choices(targeted_ads, weights=weights, k=1)[0]
# Record this ad serving
record_ad_serving(selected_ad["id"], user_ip)
return selected_ad
def get_ad_for_user(user_ip):
"""Get a recommended ad for a specific user"""
user_interests = load_user_interests()
all_ads = get_all_ads()
return recommend_ad(user_ip, user_interests, all_ads)
if __name__ == "__main__":
# Test the recommender with a sample IP
sample_ip = "192.168.1.101"
recommended_ad = get_ad_for_user(sample_ip)
print(f"Recommended ad for {sample_ip}: {recommended_ad['name']}")
Build a simple API to serve ads to clients:
# Save as ~/ad_server.py
from flask import Flask, jsonify, request
import ad_recommender
app = Flask(__name__)
@app.route("/get_ad", methods=["GET"])
def get_ad():
"""Serve an ad based on the user's IP address"""
user_ip = request.remote_addr
# Get a recommended ad
ad = ad_recommender.get_ad_for_user(user_ip)
# Return ad as JSON
return jsonify({
"ad_name": ad["name"],
"ad_content": ad["content"]
})
@app.route("/health", methods=["GET"])
def health_check():
"""Simple health check endpoint"""
return jsonify({"status": "healthy"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
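With the server running, any device on the LAN can request an ad; the response below is illustrative:

curl http://192.168.1.100:5000/get_ad
# {"ad_category": "tech", "ad_content": "Check out the latest tech gadgets at 30% off!", "ad_name": "Tech Gadget Sale"}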
Create a systemd unit so the ad server also runs on startup:

# Save as /etc/systemd/system/ad-server.service
[Unit]
Description=Ad Server API
After=network.target
[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi
ExecStart=/home/pi/venv/bin/python /home/pi/ad_server.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Enable and start the service:
sudo systemctl enable ad-server
sudo systemctl start ad-server
Create a simple HTML page that will fetch ads from our server:
<!DOCTYPE html>
<html>
<head>
<title>Network Ad Test Client</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
}
.ad-container {
border: 1px solid #ddd;
padding: 15px;
margin: 20px 0;
border-radius: 5px;
background-color: #f9f9f9;
}
h1 {
color: #333;
}
button {
padding: 10px 15px;
background-color: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background-color: #45a049;
}
</style>
</head>
<body>
<h1>Network Level Ad Targeting Test</h1>
<p>This page demonstrates the ad targeting system by fetching ads from our custom server.</p>
<button onclick="fetchAd()">Load Targeted Ad</button>
<div class="ad-container" id="ad-display">
<p>Click the button above to load an ad based on your browsing behavior.</p>
</div>
<script>
function fetchAd() {
fetch('http://192.168.1.100:5000/get_ad')
.then(response => response.json())
.then(data => {
document.getElementById('ad-display').innerHTML = `
<h3>${data.ad_name}</h3>
<p>${data.ad_content}</p>
`;
})
.catch(error => {
document.getElementById('ad-display').innerHTML = `
<p>Error fetching ad: ${error}</p>
`;
});
}
</script>
</body>
</html>
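Save the page as index.html on the Pi. Because the page and the API are served from different ports (and therefore different origins), the browser will block the fetch unless the ad server sends CORS headers; the flask-cors package is one way to allow it. A minimal way to host everything:

pip install flask-cors

# In ad_server.py, after creating the app, add two lines:
#   from flask_cors import CORS
#   CORS(app)

# Serve the test page from the Pi
python3 -m http.server 8080

Any device on the LAN can then open http://192.168.1.100:8080 and click the button.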
Create a Python script to automate the testing process:
# Save as ~/testing_script.py
import requests
import time
import random
import matplotlib
matplotlib.use("Agg")  # Render charts without a display (headless Pi)
import matplotlib.pyplot as plt
from tabulate import tabulate

def simulate_browsing_pattern(pattern_type):
    """Simulate browsing by issuing real HTTP requests from this machine.
    The Pi sits on the LAN, so the router's monitor captures this traffic;
    plain HTTP is used deliberately so the Host header stays visible."""
    patterns = {
        "tech": ["techcrunch.com", "wired.com", "arstechnica.com", "theverge.com", "cnet.com"],
        "news": ["cnn.com", "bbc.com", "nytimes.com", "reuters.com", "theguardian.com"],
        "sports": ["espn.com", "nba.com", "nfl.com", "fifa.com", "bleacherreport.com"],
        "entertainment": ["youtube.com", "netflix.com", "hulu.com", "spotify.com", "disneyplus.com"]
    }
    # Select the appropriate sites based on pattern_type
    sites = patterns.get(pattern_type, patterns["tech"])
    # Simulate visiting 10 sites from the selected category
    for _ in range(10):
        site = random.choice(sites)
        print(f"Simulating visit to {site}")
        try:
            requests.get(f"http://{site}", timeout=5)
        except requests.RequestException:
            pass  # Only the outgoing request matters, not the response
        time.sleep(1)
def test_ad_system(num_tests=20):
    """Test the ad recommendation system with different browsing patterns"""
    print("Starting ad system test...")
    patterns = ["tech", "news", "sports", "entertainment"]
    results = []
    for _ in range(num_tests):
        # Choose a random pattern
        pattern = random.choice(patterns)
        # Simulate browsing
        print(f"\nSimulating {pattern} browsing pattern...")
        simulate_browsing_pattern(pattern)
        # Wait for data processing. The router transfers batches every
        # 5 minutes and the analysis cron runs hourly, so for a live run
        # either trigger /root/transfer_data.sh and ~/run_analysis.sh
        # manually at this point or lengthen this delay accordingly.
        time.sleep(3)
        # Request an ad
        print("Requesting ad...")
        response = requests.get("http://192.168.1.100:5000/get_ad")
        ad_data = response.json()
        # Record result
        results.append({
            "pattern": pattern,
            "ad_category": ad_data.get("ad_category", "unknown"),
            "ad_name": ad_data.get("ad_name", "unknown"),
            "relevance_score": 1 if pattern == ad_data.get("ad_category", "unknown") else 0
        })
        print(f"Received ad: {ad_data.get('ad_name')}, Category: {ad_data.get('ad_category')}")
        time.sleep(2)
    return results
def analyze_results(results):
"""Analyze and visualize test results"""
# Calculate accuracy
accuracy = sum(r["relevance_score"] for r in results) / len(results)
print(f"\nOverall accuracy: {accuracy:.2%}")
# Group by pattern
pattern_accuracy = {}
pattern_counts = {}
for r in results:
pattern = r["pattern"]
if pattern not in pattern_accuracy:
pattern_accuracy[pattern] = 0
pattern_counts[pattern] = 0
pattern_accuracy[pattern] += r["relevance_score"]
pattern_counts[pattern] += 1
# Calculate per-pattern accuracy
for pattern in pattern_accuracy:
pattern_accuracy[pattern] /= pattern_counts[pattern]
# Display results as table
table_data = []
for pattern, acc in pattern_accuracy.items():
table_data.append([pattern, f"{acc:.2%}", pattern_counts[pattern]])
print("\nAccuracy by browsing pattern:")
print(tabulate(table_data, headers=["Pattern", "Accuracy", "Count"]))
# Plot results
plt.figure(figsize=(10, 6))
patterns = list(pattern_accuracy.keys())
accuracies = [pattern_accuracy[p] for p in patterns]
plt.bar(patterns, accuracies)
plt.ylim(0, 1.0)
plt.title("Ad Targeting Accuracy by Browsing Pattern")
plt.xlabel("Browsing Pattern")
plt.ylabel("Accuracy")
for i, v in enumerate(accuracies):
plt.text(i, v + 0.05, f"{v:.2%}", ha='center')
plt.savefig("accuracy_results.png")
plt.close()
print("\nResults chart saved as 'accuracy_results.png'")
if __name__ == "__main__":
results = test_ad_system(20)
analyze_results(results)
With every component in place, run the end-to-end experiment:

# Confirm both services are running on the Raspberry Pi
ssh pi@192.168.1.100 "systemctl is-active data-receiver ad-server"

# Confirm the monitor is running on the router
ssh root@192.168.1.1 "ps | grep monitor.py"

# Run the automated test suite (from the Pi, inside the virtualenv)
python testing_script.py
Then round out the evaluation by hand:

Manual testing: browse from different devices on the network, then open the test client page and note which ads each device receives.
Collect the results: pull the serving history from the ad_serving table in ad_system.db alongside the test script's output.
Examine the quantitative results: overall targeting accuracy and the per-pattern breakdown printed by the script.
Create visualizations: the accuracy chart the script saves (accuracy_results.png) is a good starting point.
Identify system limitations: note where targeting failed and trace why (sparse data, miscategorized domains, encrypted traffic).
Our experiment demonstrated the feasibility of building a network-level behavioral tracking and ad targeting system using readily available hardware and open-source software. By configuring a router with custom firmware and implementing data collection and analysis tools on a Raspberry Pi, we were able to:
Track browsing behavior: The system successfully captured and categorized web traffic patterns at the network level without requiring any client-side software.
Build user profiles: Through our behavior analysis engine, we constructed profiles that accurately reflected users' interests based on their browsing habits.
Serve targeted advertising: The recommendation system effectively matched advertisements to user profiles with an acceptable level of accuracy.
Maintain user privacy: all data collection and processing stayed local to the network; nothing ever left the LAN. We did not, however, implement real anonymization, so hashing or rotating device identifiers remains necessary before the system could be called privacy-preserving.
This proof-of-concept shows that network-level advertising systems can be developed without complex infrastructure or client-side tracking software. However, several challenges remain:

Widespread HTTPS means Host headers are visible only for plain-HTTP traffic; broader coverage would require parsing TLS SNI or DNS queries instead.
IP addresses are weak identifiers: DHCP churn and shared devices blur the line between a user and a device.
Keyword-based domain categorization is crude and mislabels many sites.
The hard-coded interest prototypes and fixed confidence score are stand-ins for real models.
Future work could focus on improving the machine learning algorithms for more accurate categorization, developing more sophisticated ad matching techniques, and implementing additional privacy safeguards. Overall, this experiment provides valuable insights into the technical aspects of network-based ad targeting and creates a foundation for further research in this area.