In this tutorial, we explore how federated learning behaves when the traditional centralized aggregation server is removed and replaced with a fully decentralized, peer-to-peer gossip mechanism. We implement both centralized FedAvg and decentralized Gossip Federated Learning from scratch and introduce client-side differential privacy by injecting calibrated noise into local model updates. By running controlled experiments on non-IID MNIST data, we examine how privacy strength, as measured by different epsilon values, directly affects convergence speed, stability, and final model accuracy. We also study the practical trade-offs between privacy guarantees and learning efficiency in real-world decentralized learning systems.
import os, math, random, time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import subprocess, sys
def pip_install(pkgs):
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)
pip_install(["torch", "torchvision", "numpy", "matplotlib", "networkx", "tqdm"])
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import networkx as nx
from tqdm import trange
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = False  # favor speed over bitwise determinism
torch.backends.cudnn.benchmark = True
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.MNIST(root="/content/data", train=True, download=True, transform=transform)
test_ds = datasets.MNIST(root="/content/data", train=False, download=True, transform=transform)
We set up the execution environment and install all required dependencies. We seed every random number generator and select the compute device so that runs are broadly repeatable (cuDNN benchmarking stays enabled, trading bitwise determinism for speed). We also load the MNIST dataset, which serves as a lightweight yet effective benchmark for federated learning experiments.
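As a quick sanity check before partitioning, we can confirm the device and dataset sizes; a minimal sketch using the objects defined above.
print("device:", device)
print("train samples:", len(train_ds), "| test samples:", len(test_ds))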
def make_noniid_clients(dataset, num_clients=20, shards_per_client=2, seed=SEED):
rng = np.random.default_rng(seed)
y = np.array([dataset[i][1] for i in range(len(dataset))])
idx = np.arange(len(dataset))
idx_sorted = idx[np.argsort(y)]
num_shards = num_clients * shards_per_client
shard_size = len(dataset) // num_shards
shards = [idx_sorted[i*shard_size:(i+1)*shard_size] for i in range(num_shards)]
rng.shuffle(shards)
client_indices = []
for c in range(num_clients):
take = shards[c*shards_per_client:(c+1)*shards_per_client]
client_indices.append(np.concatenate(take))
return client_indices
NUM_CLIENTS = 20
client_indices = make_noniid_clients(train_ds, num_clients=NUM_CLIENTS, shards_per_client=2)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, num_workers=2, pin_memory=True)
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(28*28, 256)
self.fc2 = nn.Linear(256, 128)
self.fc3 = nn.Linear(128, 10)
def forward(self, x):
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
We construct a non-IID data distribution by partitioning the training dataset into label-based shards across multiple clients. We define a compact neural network model that balances expressiveness and computational efficiency. This setup lets us realistically simulate data heterogeneity, a critical challenge in federated learning systems.
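To verify that the split is genuinely non-IID, we can count the labels each client holds; a minimal sanity check, assuming the train_ds and client_indices objects defined above.
from collections import Counter
labels = np.asarray(train_ds.targets)
for cid in range(3):  # the first few clients are enough to see the skew
    counts = Counter(labels[client_indices[cid]].tolist())
    print(f"client {cid}: {dict(sorted(counts.items()))}")
# with shards_per_client=2, each client typically sees only 1-2 distinct digits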
def get_model_params(model):
return {k: v.detach().clone() for k, v in model.state_dict().items()}
def set_model_params(model, params):
model.load_state_dict(params, strict=True)
def add_params(a, b):
return {k: a[k] + b[k] for k in a.keys()}
def sub_params(a, b):
return {k: a[k] - b[k] for k in a.keys()}
def scale_params(a, s):
return {k: a[k] * s for k in a.keys()}
def mean_params(params_list):
out = {k: torch.zeros_like(params_list[0][k]) for k in params_list[0].keys()}
for p in params_list:
for k in out.keys():
out[k] += p[k]
for k in out.keys():
out[k] /= len(params_list)
return out
def l2_norm_params(delta):
sq = 0.0
for v in delta.values():
sq += float(torch.sum(v.float() * v.float()).item())
return math.sqrt(sq)
def dp_sanitize_update(delta, clip_norm, epsilon, delta_dp, rng):
norm = l2_norm_params(delta)
scale = min(1.0, clip_norm / (norm + 1e-12))
clipped = scale_params(delta, scale)
if epsilon is None or math.isinf(epsilon) or epsilon <= 0:
return clipped
sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon
noised = {}
for k, v in clipped.items():
noise = torch.normal(mean=0.0, std=sigma, size=v.shape, generator=rng, device=v.device, dtype=v.dtype)
noised[k] = v + noise
return noised
We implement parameter-manipulation utilities that enable addition, subtraction, scaling, and averaging of model weights across clients. We introduce differential privacy by clipping each local update and injecting Gaussian noise whose scale is calibrated to the clipping threshold and the chosen privacy budget (epsilon, delta). This sanitization step is the core privacy mechanism that lets us study the privacy–utility trade-off in both centralized and decentralized settings.
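To see the mechanism in isolation, we can push a toy update through dp_sanitize_update and confirm that clipping caps the norm and that noise is added only when epsilon is finite; a minimal sketch with arbitrary tensor shapes, assuming the helpers above.
toy_delta = {"w": torch.ones(4, 4) * 3.0, "b": torch.ones(4)}  # hypothetical shapes
print("raw L2 norm:", round(l2_norm_params(toy_delta), 3))
gen = torch.Generator().manual_seed(0)
clipped = dp_sanitize_update(toy_delta, clip_norm=2.0, epsilon=math.inf, delta_dp=1e-5, rng=gen)
print("clipped L2 norm:", round(l2_norm_params(clipped), 3))  # capped at clip_norm
noised = dp_sanitize_update(toy_delta, clip_norm=2.0, epsilon=1.0, delta_dp=1e-5, rng=gen)
print("noised L2 norm:", round(l2_norm_params(noised), 3))  # clipped update plus Gaussian noise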
def local_train_one_client(base_params, client_id, epochs, lr, batch_size, weight_decay=0.0):
model = MLP().to(device)
set_model_params(model, base_params)
model.train()
loader = DataLoader(
        Subset(train_ds, client_indices[client_id].tolist()),
batch_size=batch_size,
shuffle=True,
num_workers=2,
pin_memory=True
)
opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
for _ in range(epochs):
for xb, yb in loader:
xb, yb = xb.to(device), yb.to(device)
opt.zero_grad(set_to_none=True)
logits = model(xb)
loss = F.cross_entropy(logits, yb)
loss.backward()
opt.step()
return get_model_params(model)
@torch.no_grad()
def evaluate(params):
model = MLP().to(device)
set_model_params(model, params)
model.eval()
total, correct = 0, 0
loss_sum = 0.0
for xb, yb in test_loader:
xb, yb = xb.to(device), yb.to(device)
logits = model(xb)
loss = F.cross_entropy(logits, yb, reduction="sum")
loss_sum += float(loss.item())
pred = torch.argmax(logits, dim=1)
correct += int((pred == yb).sum().item())
total += int(yb.numel())
return loss_sum / total, correct / total
We define the local training loop that each client executes independently on its private data. We also implement a unified evaluation routine to measure test loss and accuracy for any given model state. Together, these functions simulate realistic federated learning behavior where training and evaluation are fully decoupled from data ownership.
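As a quick smoke test, we can train a single client for one local epoch from a fresh initialization and evaluate it on the global test set; a minimal sketch, assuming the functions above.
init_params = get_model_params(MLP().to(device))
_, acc_before = evaluate(init_params)
trained_params = local_train_one_client(init_params, client_id=0, epochs=1, lr=0.06, batch_size=64)
_, acc_after = evaluate(trained_params)
print(f"client 0 smoke test: acc {acc_before:.3f} -> {acc_after:.3f}")
# accuracy improves but plateaus well below global training,
# since client 0 holds only a couple of digit classes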
@dataclass
class FedAvgConfig:
rounds: int = 25
clients_per_round: int = 10
local_epochs: int = 1
lr: float = 0.06
batch_size: int = 64
clip_norm: float = 2.0
epsilon: float = math.inf
delta_dp: float = 1e-5
def run_fedavg(cfg):
global_params = get_model_params(MLP().to(device))
history = {"test_loss": [], "test_acc": []}
for r in trange(cfg.rounds):
chosen = random.sample(range(NUM_CLIENTS), k=cfg.clients_per_round)
start_params = global_params
updates = []
for cid in chosen:
local_params = local_train_one_client(start_params, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
delta = sub_params(local_params, start_params)
rng = torch.Generator(device=device)
rng.manual_seed(SEED * 10000 + r * 100 + cid)
delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
updates.append(delta_dp)
avg_update = mean_params(updates)
global_params = add_params(start_params, avg_update)
tl, ta = evaluate(global_params)
history["test_loss"].append(tl)
history["test_acc"].append(ta)
return history, global_params
We implement the centralized FedAvg algorithm, where a subset of clients trains locally and sends differentially private updates to a central aggregator. We track model performance across communication rounds to observe convergence behavior under varying privacy budgets. This serves as the baseline against which decentralized gossip-based learning is compared.
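Before launching the full sweep, a two-round dry run with privacy disabled (epsilon = inf) confirms the pipeline end to end; a minimal sketch using the config above.
smoke_cfg = FedAvgConfig(rounds=2, clients_per_round=5, epsilon=math.inf)
smoke_hist, _ = run_fedavg(smoke_cfg)
print("dry-run test acc per round:", [round(a, 3) for a in smoke_hist["test_acc"]])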
@dataclass
class GossipConfig:
rounds: int = 25
local_epochs: int = 1
lr: float = 0.06
batch_size: int = 64
clip_norm: float = 2.0
epsilon: float = math.inf
delta_dp: float = 1e-5
topology: str = "ring"
p: float = 0.2
gossip_pairs_per_round: int = 10
def build_topology(cfg):
if cfg.topology == "ring":
G = nx.cycle_graph(NUM_CLIENTS)
elif cfg.topology == "erdos_renyi":
G = nx.erdos_renyi_graph(NUM_CLIENTS, cfg.p, seed=SEED)
if not nx.is_connected(G):
comps = list(nx.connected_components(G))
for i in range(len(comps) - 1):
a = next(iter(comps[i]))
b = next(iter(comps[i+1]))
G.add_edge(a, b)
else:
        raise ValueError(f"unknown topology: {cfg.topology}")
return G
def run_gossip(cfg):
node_params = [get_model_params(MLP().to(device)) for _ in range(NUM_CLIENTS)]
G = build_topology(cfg)
history = {"avg_test_loss": [], "avg_test_acc": []}
for r in trange(cfg.rounds):
new_params = []
for cid in range(NUM_CLIENTS):
p0 = node_params[cid]
p_local = local_train_one_client(p0, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
delta = sub_params(p_local, p0)
rng = torch.Generator(device=device)
rng.manual_seed(SEED * 10000 + r * 100 + cid)
delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
p_local_dp = add_params(p0, delta_dp)
new_params.append(p_local_dp)
node_params = new_params
edges = list(G.edges())
for _ in range(cfg.gossip_pairs_per_round):
i, j = random.choice(edges)
avg = mean_params([node_params[i], node_params[j]])
node_params[i] = avg
node_params[j] = avg
losses, accs = [], []
for cid in range(NUM_CLIENTS):
tl, ta = evaluate(node_params[cid])
losses.append(tl)
accs.append(ta)
history["avg_test_loss"].append(float(np.mean(losses)))
history["avg_test_acc"].append(float(np.mean(accs)))
return history, node_params
We implement decentralized Gossip Federated Learning, in which peers exchange model parameters over a predefined network topology. We simulate repeated local training and pairwise parameter averaging without relying on a central server. This lets us analyze how privacy noise propagates through decentralized communication patterns and affects convergence.
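To build intuition for why a ring mixes information slowly, we can run the same pairwise-averaging step on plain scalars and watch the spread between nodes shrink; a toy sketch independent of the model code.
G_demo = nx.cycle_graph(NUM_CLIENTS)
vals = np.arange(NUM_CLIENTS, dtype=float)  # each node starts with a distinct value
edges_demo = list(G_demo.edges())
for step in range(1, 201):
    i, j = random.choice(edges_demo)
    vals[i] = vals[j] = (vals[i] + vals[j]) / 2.0
    if step % 50 == 0:
        print(f"after {step} gossip steps: spread = {vals.max() - vals.min():.3f}")
# the spread decays slowly on a ring; denser topologies (e.g., erdos_renyi) mix much faster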
eps_sweep = [math.inf, 8.0, 4.0, 2.0, 1.0]
ROUNDS = 20
fedavg_results = {}
gossip_results = {}
common_local_epochs = 1
common_lr = 0.06
common_bs = 64
common_clip = 2.0
common_delta = 1e-5
for eps in eps_sweep:
fcfg = FedAvgConfig(
rounds=ROUNDS,
clients_per_round=10,
local_epochs=common_local_epochs,
lr=common_lr,
batch_size=common_bs,
clip_norm=common_clip,
epsilon=eps,
delta_dp=common_delta
)
hist_f, _ = run_fedavg(fcfg)
fedavg_results[eps] = hist_f
gcfg = GossipConfig(
rounds=ROUNDS,
local_epochs=common_local_epochs,
lr=common_lr,
batch_size=common_bs,
clip_norm=common_clip,
epsilon=eps,
delta_dp=common_delta,
topology="ring",
gossip_pairs_per_round=10
)
hist_g, _ = run_gossip(gcfg)
gossip_results[eps] = hist_g
plt.figure(figsize=(10, 5))
for eps in eps_sweep:
plt.plot(fedavg_results[eps]["test_acc"], label=f"FedAvg eps={eps}")
plt.xlabel("Round")
plt.ylabel("Accuracy")
plt.legend()
plt.grid(True)
plt.show()
plt.figure(figsize=(10, 5))
for eps in eps_sweep:
plt.plot(gossip_results[eps]["avg_test_acc"], label=f"Gossip eps={eps}")
plt.xlabel("Round")
plt.ylabel("Avg Accuracy")
plt.legend()
plt.grid(True)
plt.show()
final_fed = [fedavg_results[eps]["test_acc"][-1] for eps in eps_sweep]
final_gos = [gossip_results[eps]["avg_test_acc"][-1] for eps in eps_sweep]
x = [100.0 if math.isinf(eps) else eps for eps in eps_sweep]  # plot eps=inf at x=100 as a stand-in
plt.figure(figsize=(8, 5))
plt.plot(x, final_fed, marker="o", label="FedAvg")
plt.plot(x, final_gos, marker="o", label="Gossip")
plt.xlabel("Epsilon")
plt.ylabel("Final Accuracy")
plt.legend()
plt.grid(True)
plt.show()
def rounds_to_threshold(acc_curve, threshold):
for i, a in enumerate(acc_curve):
if a >= threshold:
return i + 1
return None
best_f = fedavg_results[math.inf]["test_acc"][-1]
best_g = gossip_results[math.inf]["avg_test_acc"][-1]
th_f = 0.9 * best_f
th_g = 0.9 * best_g
for eps in eps_sweep:
rf = rounds_to_threshold(fedavg_results[eps]["test_acc"], th_f)
rg = rounds_to_threshold(gossip_results[eps]["avg_test_acc"], th_g)
    print(f"eps={eps}: rounds to reach 90% of best final accuracy -> FedAvg: {rf}, Gossip: {rg}")
We run controlled experiments across multiple privacy levels and collect results for both centralized and decentralized training strategies. We visualize convergence trends and final accuracy to clearly expose the privacy–utility trade-off. We also compute convergence speed metrics to quantitatively compare how different aggregation schemes respond to increasing privacy constraints.
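To connect the epsilon values to concrete noise magnitudes, we can print the per-coordinate Gaussian standard deviation that dp_sanitize_update applies at each privacy level; a small sketch using the sweep constants above.
for eps in eps_sweep:
    if math.isinf(eps):
        print("eps=inf -> clipping only, no noise")
    else:
        sigma = common_clip * math.sqrt(2.0 * math.log(1.25 / common_delta)) / eps
        print(f"eps={eps} -> per-coordinate noise std = {sigma:.3f}")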
In conclusion, we demonstrated that decentralization fundamentally changes how differential privacy noise propagates through a federated system. We observed that while centralized FedAvg typically converges faster under weak privacy constraints, gossip-based federated learning is more robust to noisy updates at the cost of slower convergence. Our experiments highlighted that stronger privacy guarantees significantly slow learning in both settings, but the effect is amplified in decentralized topologies due to delayed information mixing. Overall, we showed that designing privacy-preserving federated systems requires jointly reasoning about aggregation topology, communication patterns, and privacy budgets rather than treating them as independent choices.