Why Kubeflow?
In the rapidly evolving landscape of machine learning operations, the gap between experimental notebooks and production-ready systems remains one of the biggest challenges facing ML engineers. Kubeflow emerged as a comprehensive solution to bridge this gap, providing a platform for orchestrating ML workflows on Kubernetes.
Having deployed dozens of ML models into production over the past few years, I've learned that the tooling you choose for your ML pipeline can make or break your ability to iterate quickly and maintain reliability. Kubeflow has become my go-to platform for several reasons.
Architecture Overview
Core Components
- Kubeflow Pipelines: DAG-based workflow orchestration for ML experiments
- Katib: Hyperparameter tuning and neural architecture search
- KServe (formerly KFServing): Model serving with autoscaling and canary deployments
- Training Operators: Distributed training for TensorFlow, PyTorch, MXNet, etc.
- Notebook Servers: Managed Jupyter environments for development
The beauty of Kubeflow's architecture lies in its modularity. You don't need to adopt everything at once—start with pipelines, then gradually add components as your needs grow.
Setting Up Your First Pipeline
Installation
First, ensure you have a Kubernetes cluster running. For development, I recommend using kind (Kubernetes in Docker) or minikube. For production, use a managed Kubernetes service like GKE, EKS, or AKS.
# Install Kubeflow Pipelines standalone
export PIPELINE_VERSION=2.0.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
# Verify installation
kubectl get pods -n kubeflow
# Port-forward to access UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
Creating a Simple Pipeline
Let's build a complete ML pipeline that trains a text classification model. This example demonstrates data preprocessing, training, evaluation, and model deployment.
from kfp import dsl
from kfp import compiler
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics
@component(
packages_to_install=['pandas', 'scikit-learn'],
base_image='python:3.9'
)
def preprocess_data(
input_data: Input[Dataset],
output_train: Output[Dataset],
output_test: Output[Dataset],
test_size: float = 0.2
):
"""Preprocess and split data into train/test sets."""
import pandas as pd
from sklearn.model_selection import train_test_split
# Load data
df = pd.read_csv(input_data.path)
# Basic preprocessing
df = df.dropna()
df['text'] = df['text'].str.lower()
# Split data
train_df, test_df = train_test_split(
df,
test_size=test_size,
random_state=42
)
# Save splits
train_df.to_csv(output_train.path, index=False)
test_df.to_csv(output_test.path, index=False)
print(f"Train size: {len(train_df)}, Test size: {len(test_df)}")
@component(
    packages_to_install=['pandas', 'scikit-learn', 'transformers', 'torch', 'accelerate'],
base_image='python:3.9'
)
def train_model(
train_data: Input[Dataset],
model_output: Output[Model],
metrics: Output[Metrics],
learning_rate: float = 2e-5,
epochs: int = 3
):
"""Train a text classification model."""
import pandas as pd
import torch
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer
)
from sklearn.metrics import accuracy_score, f1_score
# Load training data
train_df = pd.read_csv(train_data.path)
# Initialize tokenizer and model
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
num_labels=2
)
# Tokenize data
train_encodings = tokenizer(
train_df['text'].tolist(),
truncation=True,
padding=True,
max_length=128
)
# Create PyTorch dataset
class TextDataset(torch.utils.data.Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
item = {k: torch.tensor(v[idx]) for k, v in self.encodings.items()}
item['labels'] = torch.tensor(self.labels[idx])
return item
train_dataset = TextDataset(
train_encodings,
train_df['label'].tolist()
)
# Training arguments
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=epochs,
per_device_train_batch_size=16,
learning_rate=learning_rate,
logging_steps=100,
save_strategy="epoch",
)
# Train model
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
)
trainer.train()
# Save model
model.save_pretrained(model_output.path)
tokenizer.save_pretrained(model_output.path)
# Log metrics
metrics.log_metric("learning_rate", learning_rate)
metrics.log_metric("epochs", epochs)
print(f"Model saved to {model_output.path}")
@component(
packages_to_install=['pandas', 'scikit-learn', 'transformers', 'torch'],
base_image='python:3.9'
)
def evaluate_model(
test_data: Input[Dataset],
model: Input[Model],
metrics: Output[Metrics],
threshold: float = 0.8
) -> bool:
"""Evaluate model and determine if it meets quality threshold."""
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.metrics import accuracy_score, f1_score, classification_report
# Load test data
test_df = pd.read_csv(test_data.path)
# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(model.path)
model_obj = AutoModelForSequenceClassification.from_pretrained(model.path)
model_obj.eval()
# Tokenize test data
test_encodings = tokenizer(
test_df['text'].tolist(),
truncation=True,
padding=True,
max_length=128,
return_tensors='pt'
)
# Make predictions
with torch.no_grad():
outputs = model_obj(**test_encodings)
predictions = torch.argmax(outputs.logits, dim=1).numpy()
# Calculate metrics
true_labels = test_df['label'].values
accuracy = accuracy_score(true_labels, predictions)
f1 = f1_score(true_labels, predictions, average='weighted')
# Log metrics
metrics.log_metric("accuracy", accuracy)
metrics.log_metric("f1_score", f1)
metrics.log_metric("test_samples", len(test_df))
# Print detailed report
print(classification_report(true_labels, predictions))
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
# Check if model meets threshold
meets_threshold = accuracy >= threshold
print(f"Model {'PASSES' if meets_threshold else 'FAILS'} threshold of {threshold}")
return meets_threshold
@component(
    packages_to_install=['kubernetes'],
base_image='python:3.9'
)
def deploy_model(
model: Input[Model],
model_name: str = "text-classifier",
namespace: str = "default"
):
"""Deploy model to KServe."""
    from kubernetes import client, config

    # Use in-cluster service-account credentials (the component runs as a pod)
    config.load_incluster_config()
# Create InferenceService spec
inference_service = {
"apiVersion": "serving.kserve.io/v1beta1",
"kind": "InferenceService",
"metadata": {
"name": model_name,
"namespace": namespace
},
"spec": {
"predictor": {
"pytorch": {
"storageUri": model.uri,
"resources": {
"limits": {
"cpu": "1",
"memory": "2Gi"
},
"requests": {
"cpu": "500m",
"memory": "1Gi"
}
}
}
}
}
}
# Deploy using custom resource
api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
group="serving.kserve.io",
version="v1beta1",
namespace=namespace,
plural="inferenceservices",
body=inference_service
)
print(f"Model {model_name} deployed successfully!")
@dsl.pipeline(
name='Text Classification Pipeline',
description='End-to-end pipeline for text classification'
)
def text_classification_pipeline(
data_path: str,
learning_rate: float = 2e-5,
epochs: int = 3,
test_size: float = 0.2,
accuracy_threshold: float = 0.8
):
"""Complete ML pipeline with conditional deployment."""
    # Step 1: Import the raw CSV as a Dataset artifact and preprocess it.
    # KFP v2 components declare input_data as Input[Dataset], so a plain
    # string path must be wrapped in an artifact via dsl.importer first.
    raw_data = dsl.importer(
        artifact_uri=data_path,
        artifact_class=Dataset,
        reimport=False
    )
    preprocess_task = preprocess_data(
        input_data=raw_data.output,
        test_size=test_size
    )
# Step 2: Train model
train_task = train_model(
train_data=preprocess_task.outputs['output_train'],
learning_rate=learning_rate,
epochs=epochs
)
# Step 3: Evaluate model
eval_task = evaluate_model(
test_data=preprocess_task.outputs['output_test'],
model=train_task.outputs['model_output'],
threshold=accuracy_threshold
)
    # Step 4: Deploy if evaluation passes. Because evaluate_model also emits a
    # Metrics artifact, its boolean return value must be referenced by name.
    with dsl.Condition(eval_task.outputs['Output'] == True):
deploy_task = deploy_model(
model=train_task.outputs['model_output']
)
# Compile pipeline
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=text_classification_pipeline,
package_path='text_classification_pipeline.yaml'
)
print("Pipeline compiled successfully!")
Running and Monitoring Pipelines
Submitting a Pipeline Run
import kfp
# Connect to Kubeflow Pipelines (uses the port-forward from the install step)
client = kfp.Client(host='http://localhost:8080')
# Upload pipeline (the kfp 2.x client returns a pipeline object, not a bare ID)
pipeline = client.upload_pipeline(
    pipeline_package_path='text_classification_pipeline.yaml',
    pipeline_name='Text Classification Pipeline'
)
# Create experiment
experiment = client.create_experiment(name='text-classification-exp')
# Run pipeline
run = client.run_pipeline(
    experiment_id=experiment.experiment_id,
    job_name='text-classifier-run-1',
    pipeline_id=pipeline.pipeline_id,
    params={
        'data_path': 'gs://my-bucket/training-data.csv',
        'learning_rate': 2e-5,
        'epochs': 5,
        'accuracy_threshold': 0.85
    }
)
print(f"Run created: {run.run_id}")
Best Practices
💡 Production Tips
- Version everything: Data, code, models, and pipeline definitions
- Use component caching: Speed up iterations by letting KFP reuse results from unchanged components (see the sketch after this list)
- Implement proper monitoring: Track pipeline metrics, success rates, and execution times
- Handle failures gracefully: Add retry logic and fallback mechanisms (see the sketch after this list)
- Secure your pipelines: Use RBAC and secrets management
- Document extensively: Clear docs save debugging time later
- Test components independently: Unit test before pipeline integration
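A couple of these tips map directly onto task-level settings in the kfp 2.x SDK. As a sketch, the preprocessing and training tasks inside text_classification_pipeline could be configured like this (the retry and backoff values are arbitrary):
# Inside text_classification_pipeline, after the tasks are created:

# Preprocessing is deterministic, so let KFP reuse cached outputs...
preprocess_task.set_caching_options(True)

# ...but always retrain, and retry transient failures with exponential backoff.
train_task.set_caching_options(False)
train_task.set_retry(num_retries=3, backoff_duration='60s', backoff_factor=2.0)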
Advanced Patterns
Hyperparameter Tuning with Katib
Integrate automated hyperparameter optimization into your pipelines:
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
name: text-classifier-hpo
spec:
algorithm:
algorithmName: random
parallelTrialCount: 3
maxTrialCount: 12
objective:
type: maximize
goal: 0.95
objectiveMetricName: accuracy
parameters:
- name: learning_rate
parameterType: double
feasibleSpace:
min: "0.00001"
max: "0.001"
- name: batch_size
parameterType: int
feasibleSpace:
min: "16"
max: "64"
step: "16"
  trialTemplate:
    primaryContainerName: training-container
    # Map the search-space parameters above onto the ${trialParameters.*}
    # placeholders used in the trial spec below.
    trialParameters:
      - name: learningRate
        description: Learning rate passed to the training script
        reference: learning_rate
      - name: batchSize
        description: Batch size passed to the training script
        reference: batch_size
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: myregistry/text-classifier:latest
command:
- python
- train.py
- --learning-rate=${trialParameters.learningRate}
- --batch-size=${trialParameters.batchSize}
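You can submit this Experiment with kubectl apply, or from Python using the same Kubernetes client pattern as the deploy_model component. A minimal sketch, assuming the manifest above is saved as katib-experiment.yaml and kubeconfig credentials for the cluster are available:
# Submit the Katib Experiment defined above to the kubeflow namespace.
import yaml
from kubernetes import client, config

config.load_kube_config()  # use load_incluster_config() when running in a pod

with open("katib-experiment.yaml") as f:
    experiment = yaml.safe_load(f)

api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="kubeflow.org",
    version="v1beta1",
    namespace="kubeflow",
    plural="experiments",
    body=experiment,
)
print("Katib experiment submitted")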
A/B Testing Deployments
Use canary deployments to gradually roll out new models:
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: text-classifier
spec:
  predictor:
    # Send 20% of traffic to this new revision (v2). KServe keeps the
    # previously rolled-out revision (v1) serving the remaining 80%.
    canaryTrafficPercent: 20
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://models/text-classifier/v2
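Once the canary revision looks healthy, promote it by raising canaryTrafficPercent to 100 (or removing the field). A minimal sketch using the Kubernetes Python client; the service name and namespace are the ones assumed above:
# Promote the canary: route 100% of traffic to the new revision.
from kubernetes import client, config

config.load_kube_config()

api = client.CustomObjectsApi()
api.patch_namespaced_custom_object(
    group="serving.kserve.io",
    version="v1beta1",
    namespace="default",
    plural="inferenceservices",
    name="text-classifier",
    body={"spec": {"predictor": {"canaryTrafficPercent": 100}}},
)
print("Canary promoted to 100% of traffic")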
Conclusion
Kubeflow provides a powerful platform for building production ML systems, but like any complex tool, it requires investment to master. Start simple—perhaps just with Kubeflow Pipelines—and gradually adopt more components as your needs grow.
The key to success with Kubeflow is treating your ML workflows as first-class software engineering artifacts. Apply the same rigor you would to any production system: version control, testing, monitoring, and documentation.
In upcoming posts, we'll dive deeper into specific aspects like optimizing pipeline performance, implementing CI/CD for ML, and building custom Kubeflow components.
Resources
- Official Kubeflow Documentation: https://kubeflow.org/docs/
- Kubeflow Pipelines SDK: https://kubeflow-pipelines.readthedocs.io/
- Katib Documentation: https://www.kubeflow.org/docs/components/katib/
- KServe Documentation: https://kserve.github.io/website/