Wearable Activity Classification with SGLD¶
Overview¶
We build a Bayesian time-series classifier for wearable motion data. A convolutional network ingests synthetic tri-axial accelerometer traces and predicts whether a subject is resting, walking, or running. SGLD delivers posterior weight samples so that predictions come with uncertainty estimates instead of a single point estimate.
Learning Objectives¶
- Synthesise a realistic multi-class time-series dataset mimicking wearable IMU signals.
- Train a 1D convolutional encoder and inspect the effect of a short warm-up phase.
- Run SGLD to gather posterior weight samples and compute Bayesian predictive summaries.
- Visualise model confidence by contrasting low- and high-variance sequences.
Prerequisites¶
- Install this repository with `pip install -e .` so the `deepuq` package is importable.
- Ensure PyTorch and Matplotlib are available in the active environment.
- A GPU helps but is optional; batch sizes remain moderate.
# Configure Python path so the notebook sees the local deepuq package
import os
import sys
from pathlib import Path
# Use the working directory as the project root when it contains `src`;
# otherwise fall back to its parent directory.
_cwd = Path(os.getcwd())
PROJECT_ROOT = _cwd if (_cwd / 'src').exists() else _cwd.parent
SRC_PATH = str(PROJECT_ROOT / 'src')
# Prepend only when missing so re-running the cell does not duplicate the entry.
if SRC_PATH not in sys.path:
    sys.path.insert(0, SRC_PATH)
# Core libraries and deepuq utilities
import math
from typing import Dict, Tuple
import matplotlib.pyplot as plt
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from deepuq.methods.mcmc import collect_posterior_samples, predict_with_samples
from deepuq.utils import set_seed
# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(f'Running on {DEVICE}')
Synthetic Activity Dataset¶
We emulate a tri-axial accelerometer strapped to a person performing three activities:
- Resting: low-amplitude drift with sporadic micro-movements.
- Walking: medium-frequency oscillations with phase shifts between axes.
- Running: high-frequency, high-amplitude oscillations with impact bursts.

Each sequence spans 120 time steps in normalised time units.
# Call deepuq's seeding helper before any model weights are created.
# NOTE(review): presumably this seeds the global Python/NumPy/PyTorch RNGs —
# confirm against deepuq.utils.set_seed.
set_seed(7)
class ActivitySequenceDataset(Dataset):
"""Generate labelled tri-axial accelerometer traces for activity recognition."""
def __init__(
self,
n_series: int,
seq_len: int,
noise_std: float = 0.08,
seed: int | None = None,
) -> None:
super().__init__()
self.seq_len = seq_len
self.channels = 3
self.time_grid = torch.linspace(0.0, 1.0, seq_len, dtype=torch.float32)
self.data = torch.empty(n_series, self.channels, seq_len, dtype=torch.float32)
self.labels = torch.empty(n_series, dtype=torch.long)
self.class_names: Dict[int, str] = {0: 'rest', 1: 'walk', 2: 'run'}
rng = torch.Generator()
if seed is not None:
rng.manual_seed(seed)
def uniform(low: float, high: float) -> float:
return float(torch.rand(1, generator=rng).item() * (high - low) + low)
for idx in range(n_series):
label = torch.randint(0, len(self.class_names), (1,), generator=rng).item()
base_trace = self._simulate_activity(label, uniform, rng)
noise = torch.randn((self.channels, seq_len), generator=rng) * noise_std
self.data[idx] = base_trace + noise
self.labels[idx] = label
def _simulate_activity(self, label: int, uniform, rng: torch.Generator) -> torch.Tensor:
"""Return a (channels, seq_len) tensor for the requested activity label."""
t = self.time_grid
trace = torch.zeros(self.channels, self.seq_len, dtype=torch.float32)
phase_offsets = torch.rand(self.channels, generator=rng) * (2 * math.pi)
orientation = torch.rand(self.channels, generator=rng) * 0.3
if label == 0: # resting
drift = torch.linspace(0.0, 0.1, self.seq_len)
micro_motion = 0.05 * torch.sin(2 * math.pi * 0.6 * t + phase_offsets.view(-1, 1))
trace = micro_motion + orientation.view(-1, 1) * drift
elif label == 1: # walking
freq = uniform(1.0, 1.8)
amplitude = torch.tensor([uniform(0.8, 1.1), uniform(0.7, 1.0), uniform(0.6, 0.9)])
gait_core = amplitude.view(-1, 1) * torch.sin(2 * math.pi * freq * t + phase_offsets.view(-1, 1))
sway = 0.15 * torch.sin(2 * math.pi * (freq / 2.0) * t)
trace = gait_core + sway
else: # running
freq = uniform(2.2, 3.5)
amplitude = torch.tensor([uniform(1.2, 1.6), uniform(1.1, 1.4), uniform(1.0, 1.3)])
gait_core = amplitude.view(-1, 1) * torch.sin(2 * math.pi * freq * t + phase_offsets.view(-1, 1))
impacts = 0.35 * torch.sin(2 * math.pi * (freq * 2.0) * t)
burst = 0.2 * torch.exp(-50 * (t - 0.5) ** 2)
trace = gait_core + impacts + burst
trace += orientation.view(-1, 1)
return trace
def __len__(self) -> int:
return self.data.size(0)
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
return self.data[index], self.labels[index]
SEQ_LEN = 120
train_dataset = ActivitySequenceDataset(n_series=400, seq_len=SEQ_LEN, noise_std=0.08, seed=12)
test_dataset = ActivitySequenceDataset(n_series=100, seq_len=SEQ_LEN, noise_std=0.08, seed=99)

BATCH_SIZE = 64
# Pinned host memory only helps host-to-GPU copies; requesting it on a
# CPU-only machine makes DataLoader emit a warning, so enable it conditionally.
PIN_MEMORY = torch.cuda.is_available()
# drop_last=True discards the final partial training batch each epoch.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    pin_memory=PIN_MEMORY,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    pin_memory=PIN_MEMORY,
)
print(f'Training sequences: {len(train_dataset)} | Test sequences: {len(test_dataset)}')
Class balance check¶
Labels are drawn uniformly at random, so verify that each activity appears roughly (not exactly) equally often.
def class_distribution(dataset: 'ActivitySequenceDataset') -> Dict[str, int]:
    """Count how many sequences carry each activity label.

    Args:
        dataset: Object exposing ``labels`` (1-D integer tensor) and
            ``class_names`` (mapping from class id to display name).

    Returns:
        Mapping from class name to its number of occurrences; classes that
        never occur are reported with a count of 0.
    """
    # One vectorised histogram instead of a Python loop with per-element
    # `.item()` calls; `minlength` keeps absent classes in the result.
    counts = torch.bincount(dataset.labels, minlength=len(dataset.class_names)).tolist()
    return {dataset.class_names[class_id]: count for class_id, count in enumerate(counts)}
# Report the label histogram of both splits, training first.
for heading, split in (
    ('Train distribution:', train_dataset),
    ('Test distribution :', test_dataset),
):
    print(heading, class_distribution(split))
Train distribution: {'rest': 769, 'walk': 791, 'run': 840}
Test distribution : {'rest': 231, 'walk': 172, 'run': 197}
Visualise sample traces¶
Display two random sequences per class to sanity-check the simulator.
def plot_activity_examples(dataset: ActivitySequenceDataset, samples_per_class: int = 2) -> None:
    """Plot randomly chosen sequences of every class on a grid of axes.

    Columns correspond to classes and rows to the sampled sequences. If a
    class has fewer than ``samples_per_class`` members, the remaining axes in
    its column stay empty.

    Args:
        dataset: Dataset providing ``class_names``, ``labels``, ``time_grid``
            and ``(sequence, label)`` items.
        samples_per_class: Number of rows, i.e. sequences drawn per class.
    """
    class_ids = list(dataset.class_names.keys())
    # squeeze=False keeps `axes` two-dimensional for every grid size, so
    # `axes[row, col]` also works with a single row and/or a single class.
    fig, axes = plt.subplots(
        samples_per_class,
        len(class_ids),
        figsize=(14, 4 * samples_per_class),
        sharex=True,
        squeeze=False,
    )
    time_grid = dataset.time_grid.numpy()
    for col, class_id in enumerate(class_ids):
        candidates = (dataset.labels == class_id).nonzero(as_tuple=True)[0]
        # Random subset drawn with the global torch RNG.
        picks = candidates[torch.randperm(len(candidates))[:samples_per_class]]
        for row, idx_tensor in enumerate(picks):
            idx = int(idx_tensor.item())
            series, label = dataset[idx]
            ax = axes[row, col]
            for axis_idx, axis_name in enumerate(['x', 'y', 'z']):
                ax.plot(time_grid, series[axis_idx].numpy(), label=f'axis {axis_name}')
            ax.set_title(f"class {dataset.class_names[label.item()]} (idx {idx})")
            ax.set_xlabel('time (normalised)')
            ax.set_ylabel('acceleration')
            ax.legend(loc='upper right')
    plt.tight_layout()


plot_activity_examples(train_dataset, samples_per_class=2)
Temporal Encoder¶
A compact convolutional network extracts rhythmic patterns while residual connections preserve gradients. Dropout encourages diversity for the subsequent Bayesian sampling stage.
class ResidualConvBlock(nn.Module):
    """Conv -> ReLU -> Dropout -> Conv stack wrapped in a skip connection.

    Args:
        channels: Number of input and output channels of both convolutions.
        kernel_size: Width of both convolution kernels.
        dropout: Dropout probability applied between the two convolutions.
    """

    def __init__(self, channels: int, kernel_size: int = 5, dropout: float = 0.1) -> None:
        super().__init__()
        # Padding of kernel_size // 2 preserves the sequence length for odd
        # kernel sizes, which the residual addition in `forward` relies on.
        conv_kwargs = dict(kernel_size=kernel_size, padding=kernel_size // 2)
        layers = [
            nn.Conv1d(channels, channels, **conv_kwargs),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Conv1d(channels, channels, **conv_kwargs),
        ]
        self.block = nn.Sequential(*layers)
        self.activation = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the convolutional branch to the input and apply ReLU."""
        residual = self.block(x)
        return self.activation(x + residual)
class ActivityClassifier(nn.Module):
    """1D convolutional classifier for multi-channel sequences.

    The network applies a convolutional stem, two residual blocks plus one
    further convolution, global average pooling over time, and a linear head.

    Args:
        num_classes: Number of output logits.
        hidden_channels: Channel width used throughout the network.
        dropout: Dropout probability in the stem and the residual blocks.
        in_channels: Number of input channels; defaults to 3 for the
            tri-axial signals used in this notebook.
    """

    def __init__(
        self,
        num_classes: int = 3,
        hidden_channels: int = 96,
        dropout: float = 0.2,
        in_channels: int = 3,
    ) -> None:
        super().__init__()
        self.stem = nn.Sequential(
            nn.Conv1d(in_channels, hidden_channels, kernel_size=7, padding=3),
            nn.ReLU(),
            nn.Dropout(dropout),
        )
        self.encoder = nn.Sequential(
            ResidualConvBlock(hidden_channels, kernel_size=5, dropout=dropout),
            ResidualConvBlock(hidden_channels, kernel_size=5, dropout=dropout),
            nn.Conv1d(hidden_channels, hidden_channels, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        # Averaging over the time axis makes the head independent of the
        # sequence length.
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.head = nn.Linear(hidden_channels, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a ``(batch, in_channels, time)`` tensor to ``(batch, num_classes)`` logits."""
        x = self.stem(x)
        x = self.encoder(x)
        # The pooled tensor is (batch, hidden, 1); drop the trailing time axis.
        x = self.pool(x).squeeze(-1)
        return self.head(x)
# Build the classifier with its default sizes and move it to the chosen device.
model = ActivityClassifier()
model = model.to(DEVICE)
# Total element count over all registered parameters.
param_sizes = [param.numel() for param in model.parameters()]
total_params = sum(param_sizes)
print(model)
print(f'Total parameters: {total_params:,}')
ActivityClassifier(
(stem): Sequential(
(0): Conv1d(3, 96, kernel_size=(7,), stride=(1,), padding=(3,))
(1): ReLU()
(2): Dropout(p=0.2, inplace=False)
)
(encoder): Sequential(
(0): ResidualConvBlock(
(block): Sequential(
(0): Conv1d(96, 96, kernel_size=(5,), stride=(1,), padding=(2,))
(1): ReLU()
(2): Dropout(p=0.2, inplace=False)
(3): Conv1d(96, 96, kernel_size=(5,), stride=(1,), padding=(2,))
)
(activation): ReLU()
)
(1): ResidualConvBlock(
(block): Sequential(
(0): Conv1d(96, 96, kernel_size=(5,), stride=(1,), padding=(2,))
(1): ReLU()
(2): Dropout(p=0.2, inplace=False)
(3): Conv1d(96, 96, kernel_size=(5,), stride=(1,), padding=(2,))
)
(activation): ReLU()
)
(2): Conv1d(96, 96, kernel_size=(3,), stride=(1,), padding=(1,))
(3): ReLU()
)
(pool): AdaptiveAvgPool1d(output_size=1)
(head): Linear(in_features=96, out_features=3, bias=True)
)
Total parameters: 214,851
Warm-Up Training (Adam)¶
A brief Adam phase, without injected Langevin noise, moves the weights to a reasonable region before SGLD sampling begins.
def train_epoch(model: nn.Module, loader: DataLoader, optimizer: optim.Optimizer, device: torch.device) -> float:
    """Run one optimisation pass over ``loader`` with cross-entropy loss.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable of ``(sequences, labels)`` batches that supports ``len``.
        optimizer: Optimiser stepped once per batch.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Sum of the per-batch losses divided by ``len(loader)`` (0.0 for an
        empty loader).
    """
    criterion = nn.CrossEntropyLoss()
    model.train()
    batch_losses = []
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad(set_to_none=True)
        batch_loss = criterion(model(batch_x), batch_y)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    # max(..., 1) guards against division by zero for an empty loader.
    return sum(batch_losses) / max(len(loader), 1)
def evaluate_accuracy(model: nn.Module, loader: DataLoader, device: torch.device) -> float:
    """Return the fraction of samples in ``loader`` classified correctly.

    The model is switched to evaluation mode and the forward passes run under
    ``torch.inference_mode``. An empty loader yields 0.0.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.inference_mode():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            predicted = model(batch_x).argmax(dim=1)
            hits += predicted.eq(batch_y).sum().item()
            seen += batch_y.size(0)
    # max(..., 1) avoids dividing by zero when no samples were seen.
    return hits / max(seen, 1)
WARMUP_EPOCHS = 2
optimizer = optim.Adam(model.parameters(), lr=2e-3, weight_decay=1e-4)

# Train for a few Adam epochs, reporting train and test accuracy after each.
for epoch in range(1, WARMUP_EPOCHS + 1):
    loss = train_epoch(model, train_loader, optimizer, DEVICE)
    train_acc, test_acc = (
        evaluate_accuracy(model, split_loader, DEVICE)
        for split_loader in (train_loader, test_loader)
    )
    report = ' | '.join([
        f'Epoch {epoch:02d}',
        f'loss={loss:.3f}',
        f'train_acc={train_acc:.3f}',
        f'test_acc={test_acc:.3f}',
    ])
    print(report)
Posterior Sampling via SGLD¶
With a solid starting point, we turn on SGLD to explore a posterior over weights.
SGLD_STEPS = 100
SGLD_BURN_IN = 0.35
SGLD_LR = 5e-5
SGLD_WEIGHT_DECAY = 5e-5

# Sampler settings gathered in one mapping and forwarded as keyword arguments.
# NOTE(review): burn_in looks like a fraction of n_steps — confirm against
# deepuq.methods.mcmc.collect_posterior_samples.
sgld_kwargs = {
    'n_steps': SGLD_STEPS,
    'lr': SGLD_LR,
    'weight_decay': SGLD_WEIGHT_DECAY,
    'burn_in': SGLD_BURN_IN,
}
samples = collect_posterior_samples(
    model=model,
    data_loader=train_loader,
    device=DEVICE,
    **sgld_kwargs,
)
print(f'Stored posterior samples: {len(samples)} of {SGLD_STEPS} steps')
Bayesian Predictions¶
We aggregate softmax outputs across posterior samples to compute mean probabilities and variances.
model.eval()
mean_probs_list = []
var_probs_list = []
labels_list = []
inputs_list = []

with torch.inference_mode():
    for batch_inputs, batch_labels in test_loader:
        batch_mean, batch_var = predict_with_samples(
            model=model,
            samples=samples,
            x=batch_inputs,
            apply_softmax=True,
            device=DEVICE,
        )
        # Labels and inputs from the loader stay on the CPU. Predictions are
        # requested on DEVICE, so bring them back to the CPU before they are
        # compared with the labels (`.cpu()` is a no-op for CPU tensors).
        # NOTE(review): the device of the tensors returned by
        # predict_with_samples is not visible here — confirm.
        mean_probs_list.append(batch_mean.cpu())
        var_probs_list.append(batch_var.cpu())
        labels_list.append(batch_labels)
        inputs_list.append(batch_inputs)

# Stitch the per-batch results back into whole-test-set tensors.
mean_probs = torch.cat(mean_probs_list, dim=0)
var_probs = torch.cat(var_probs_list, dim=0)
true_labels = torch.cat(labels_list, dim=0)
inputs = torch.cat(inputs_list, dim=0)

pred_labels = mean_probs.argmax(dim=1)
test_accuracy = (pred_labels == true_labels).float().mean().item()
# Per-sequence uncertainty score: variance averaged over the class dimension.
avg_variance = var_probs.mean(dim=1)
print(f'Posterior mean accuracy: {test_accuracy:.3f}')
print(f'Mean predictive variance: {avg_variance.mean().item():.4f}')
Posterior mean accuracy: 0.712
Mean predictive variance: 0.1449
Confusion matrix¶
Quantify which activities remain most confusable after marginalising over weights.
def compute_confusion_matrix(preds: torch.Tensor, targets: torch.Tensor, num_classes: int) -> torch.Tensor:
    """Tally predictions against ground truth.

    Args:
        preds: 1-D tensor of predicted class ids.
        targets: 1-D tensor of true class ids, aligned with ``preds``.
        num_classes: Side length of the square result.

    Returns:
        ``(num_classes, num_classes)`` int64 tensor whose entry ``[i, j]``
        counts the samples with true class ``i`` predicted as class ``j``.
    """
    matrix = torch.zeros((num_classes, num_classes), dtype=torch.int64)
    for true_id, pred_id in zip(targets.tolist(), preds.tolist()):
        matrix[true_id, pred_id] += 1
    return matrix
def plot_confusion_matrix(matrix: torch.Tensor, class_names) -> None:
    """Draw ``matrix`` as an annotated heat map.

    Rows are labelled as true classes and columns as predicted classes; each
    cell is annotated with its integer count.
    """
    fig, ax = plt.subplots(figsize=(6, 5))
    heatmap = ax.imshow(matrix.numpy(), cmap='Blues')
    fig.colorbar(heatmap, ax=ax)

    tick_positions = range(len(class_names))
    ax.set_xticks(tick_positions)
    ax.set_yticks(tick_positions)
    ax.set_xticklabels(class_names)
    ax.set_yticklabels(class_names)
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('True label')

    # imshow puts columns on the x axis, so text goes at (column, row).
    for row_idx, row_counts in enumerate(matrix.tolist()):
        for col_idx, count in enumerate(row_counts):
            ax.text(col_idx, row_idx, int(count), ha="center", va="center", color="black")
    plt.tight_layout()


# Class names ordered by class id, then the confusion matrix on the test set.
n_classes = len(train_dataset.class_names)
class_names = [train_dataset.class_names[class_id] for class_id in range(n_classes)]
confusion = compute_confusion_matrix(pred_labels, true_labels, num_classes=n_classes)
plot_confusion_matrix(confusion, class_names)
Uncertainty Diagnostics¶
Contrast sequences with the lowest and highest posterior variance to understand confidence.
num_sequences = 6
half = num_sequences // 2

# Test sequences with the lowest and the highest class-averaged variance.
lowest_idx = avg_variance.topk(half, largest=False).indices
highest_idx = avg_variance.topk(half, largest=True).indices
selected = torch.cat([lowest_idx, highest_idx])
descriptors = ['low variance' for _ in range(half)] + ['high variance' for _ in range(half)]
time_axis = train_dataset.time_grid.numpy()
channel_colors = ['tab:blue', 'tab:orange', 'tab:green']

# Top row: low-variance sequences; bottom row: high-variance sequences.
fig, axes = plt.subplots(2, half, figsize=(14, 6), sharex=True, sharey=True)
for ax, idx_tensor, desc in zip(axes.flatten(), selected, descriptors):
    idx = int(idx_tensor.item())
    sequence = inputs[idx].numpy()
    for channel_trace, color in zip(sequence, channel_colors):
        ax.plot(time_axis, channel_trace, color=color, alpha=0.8)
    true_name = train_dataset.class_names[true_labels[idx].item()]
    p_run = mean_probs[idx, 2].item()  # column 2 is the 'run' class
    seq_var = avg_variance[idx].item()
    ax.set_title(
        f"{desc} | true {true_name} | p(run) {p_run:.2f} | var {seq_var:.4f}",
        fontsize=9,
    )
    ax.set_xlabel('time')
    ax.set_ylabel('acceleration')
plt.tight_layout()
Next Steps¶
- Increase the number of SGLD iterations and reduce the step size once warm-up accuracy plateaus.
- Extend the simulator with additional activities (e.g., cycling, stair climbing) to study multi-class behaviour.
- Swap the encoder for a temporal transformer or GRU to compare posterior characteristics.
- Evaluate calibration by binning mean probabilities and plotting reliability diagrams.