Why Kubeflow?

The gap between experimental notebooks and production-ready systems remains one of the biggest challenges facing ML engineers. Kubeflow was built to bridge that gap: it is an open-source platform for orchestrating ML workflows on Kubernetes.

Having deployed dozens of ML models into production over the past few years, I've learned that the tooling you choose for your ML pipeline can make or break your ability to iterate quickly and maintain reliability. Kubeflow has become my go-to platform for several reasons.

Architecture Overview

Core Components

  • Kubeflow Pipelines: DAG-based workflow orchestration for ML experiments
  • Katib: Hyperparameter tuning and neural architecture search
  • KServe (formerly KFServing): Model serving with autoscaling and canary deployments
  • Training Operators: Distributed training for TensorFlow, PyTorch, MXNet, etc.
  • Notebook Servers: Managed Jupyter environments for development

The beauty of Kubeflow's architecture lies in its modularity. You don't need to adopt everything at once—start with pipelines, then gradually add components as your needs grow.

Setting Up Your First Pipeline

Installation

First, ensure you have a Kubernetes cluster running. For development, I recommend using kind (Kubernetes in Docker) or minikube. For production, use a managed Kubernetes service like GKE, EKS, or AKS.

Bash
# Install Kubeflow Pipelines standalone
export PIPELINE_VERSION=2.0.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
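# The platform-agnostic env uses Argo's default emissary executor; the older -pns variant targeted 1.x releases
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"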

# Verify installation
kubectl get pods -n kubeflow

# Port-forward to access UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

Creating a Simple Pipeline

Let's build a complete ML pipeline that trains a text classification model. This example demonstrates data preprocessing, training, evaluation, and model deployment.

Python
from kfp import dsl
from kfp import compiler
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics

@component(
    packages_to_install=['pandas', 'scikit-learn'],
    base_image='python:3.9'
)
def preprocess_data(
    input_data: Input[Dataset],
    output_train: Output[Dataset],
    output_test: Output[Dataset],
    test_size: float = 0.2
):
    """Preprocess and split data into train/test sets."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    
    # Load data
    df = pd.read_csv(input_data.path)
    
    # Basic preprocessing
    df = df.dropna()
    df['text'] = df['text'].str.lower()
    
    # Split data
    train_df, test_df = train_test_split(
        df, 
        test_size=test_size, 
        random_state=42
    )
    
    # Save splits
    train_df.to_csv(output_train.path, index=False)
    test_df.to_csv(output_test.path, index=False)
    
    print(f"Train size: {len(train_df)}, Test size: {len(test_df)}")


@component(
    # Recent transformers releases need accelerate to use Trainer with PyTorch
    packages_to_install=['pandas', 'scikit-learn', 'transformers', 'torch', 'accelerate'],
    base_image='python:3.9'
)
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    learning_rate: float = 2e-5,
    epochs: int = 3
):
    """Train a text classification model."""
    import pandas as pd
    import torch
    from transformers import (
        AutoTokenizer, 
        AutoModelForSequenceClassification,
        TrainingArguments, 
        Trainer
    )
    
    # Load training data
    train_df = pd.read_csv(train_data.path)
    
    # Initialize tokenizer and model
    model_name = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, 
        num_labels=2
    )
    
    # Tokenize data
    train_encodings = tokenizer(
        train_df['text'].tolist(),
        truncation=True,
        padding=True,
        max_length=128
    )
    
    # Create PyTorch dataset
    class TextDataset(torch.utils.data.Dataset):
        def __init__(self, encodings, labels):
            self.encodings = encodings
            self.labels = labels
        
        def __len__(self):
            return len(self.labels)
        
        def __getitem__(self, idx):
            item = {k: torch.tensor(v[idx]) for k, v in self.encodings.items()}
            item['labels'] = torch.tensor(self.labels[idx])
            return item
    
    train_dataset = TextDataset(
        train_encodings, 
        train_df['label'].tolist()
    )
    
    # Training arguments
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=epochs,
        per_device_train_batch_size=16,
        learning_rate=learning_rate,
        logging_steps=100,
        save_strategy="epoch",
    )
    
    # Train model
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
    )
    
    trainer.train()
    
    # Save model
    model.save_pretrained(model_output.path)
    tokenizer.save_pretrained(model_output.path)
    
    # Log metrics
    metrics.log_metric("learning_rate", learning_rate)
    metrics.log_metric("epochs", epochs)
    
    print(f"Model saved to {model_output.path}")


@component(
    packages_to_install=['pandas', 'scikit-learn', 'transformers', 'torch'],
    base_image='python:3.9'
)
def evaluate_model(
    test_data: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    threshold: float = 0.8
) -> bool:
    """Evaluate model and determine if it meets quality threshold."""
    import pandas as pd
    import torch
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    from sklearn.metrics import accuracy_score, f1_score, classification_report
    
    # Load test data
    test_df = pd.read_csv(test_data.path)
    
    # Load model and tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model.path)
    model_obj = AutoModelForSequenceClassification.from_pretrained(model.path)
    model_obj.eval()
    
    # Tokenize test data
    test_encodings = tokenizer(
        test_df['text'].tolist(),
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors='pt'
    )
    
    # Make predictions
    with torch.no_grad():
        outputs = model_obj(**test_encodings)
        predictions = torch.argmax(outputs.logits, dim=1).numpy()
    
    # Calculate metrics
    true_labels = test_df['label'].values
    accuracy = accuracy_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions, average='weighted')
    
    # Log metrics
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("f1_score", f1)
    metrics.log_metric("test_samples", len(test_df))
    
    # Print detailed report
    print(classification_report(true_labels, predictions))
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    
    # Check if model meets threshold
    meets_threshold = accuracy >= threshold
    print(f"Model {'PASSES' if meets_threshold else 'FAILS'} threshold of {threshold}")
    
    return meets_threshold


@component(
    packages_to_install=['kserve'],
    base_image='python:3.9'
)
def deploy_model(
    model: Input[Model],
    model_name: str = "text-classifier",
    namespace: str = "default"
):
    """Deploy model to KServe."""
    from kubernetes import client, config
    import json
    
    # Load kubeconfig
    config.load_incluster_config()
    
    # Create InferenceService spec
    inference_service = {
        "apiVersion": "serving.kserve.io/v1beta1",
        "kind": "InferenceService",
        "metadata": {
            "name": model_name,
            "namespace": namespace
        },
        "spec": {
            "predictor": {
                "pytorch": {
                    "storageUri": model.uri,
                    "resources": {
                        "limits": {
                            "cpu": "1",
                            "memory": "2Gi"
                        },
                        "requests": {
                            "cpu": "500m",
                            "memory": "1Gi"
                        }
                    }
                }
            }
        }
    }
    
    # Deploy using custom resource
    api = client.CustomObjectsApi()
    api.create_namespaced_custom_object(
        group="serving.kserve.io",
        version="v1beta1",
        namespace=namespace,
        plural="inferenceservices",
        body=inference_service
    )
    
    print(f"Model {model_name} deployed successfully!")


@dsl.pipeline(
    name='Text Classification Pipeline',
    description='End-to-end pipeline for text classification'
)
def text_classification_pipeline(
    data_path: str,
    learning_rate: float = 2e-5,
    epochs: int = 3,
    test_size: float = 0.2,
    accuracy_threshold: float = 0.8
):
    """Complete ML pipeline with conditional deployment."""
    
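    # Step 1: Import the raw CSV as a Dataset artifact, then preprocess it.
    # A plain string cannot be passed directly to an Input[Dataset] parameter.
    import_task = dsl.importer(
        artifact_uri=data_path,
        artifact_class=Dataset,
        reimport=False
    )
    preprocess_task = preprocess_data(
        input_data=import_task.output,
        test_size=test_size
    )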
    
    # Step 2: Train model
    train_task = train_model(
        train_data=preprocess_task.outputs['output_train'],
        learning_rate=learning_rate,
        epochs=epochs
    )
    
    # Step 3: Evaluate model
    eval_task = evaluate_model(
        test_data=preprocess_task.outputs['output_test'],
        model=train_task.outputs['model_output'],
        threshold=accuracy_threshold
    )
    
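    # Step 4: Deploy if evaluation passes.
    # evaluate_model has two outputs (the metrics artifact and the bool return
    # value), so the return value must be referenced by its name, 'Output'.
    with dsl.Condition(eval_task.outputs['Output'] == True):
        deploy_task = deploy_model(
            model=train_task.outputs['model_output']
        )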


# Compile pipeline
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=text_classification_pipeline,
        package_path='text_classification_pipeline.yaml'
    )
    print("Pipeline compiled successfully!")
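The deployment gate in evaluate_model boils down to comparing accuracy against a threshold. As a rough sketch, that logic can be pulled into a plain function and checked on a laptop before it ever runs in a cluster. The names accuracy and passes_gate below are hypothetical helpers of my own, not part of the KFP SDK.

```python
from typing import Sequence


def accuracy(true_labels: Sequence[int], predictions: Sequence[int]) -> float:
    """Fraction of predictions that match the true labels."""
    if len(true_labels) != len(predictions):
        raise ValueError("label and prediction counts differ")
    if len(true_labels) == 0:
        raise ValueError("cannot score an empty test set")
    correct = sum(1 for t, p in zip(true_labels, predictions) if t == p)
    return correct / len(true_labels)


def passes_gate(true_labels: Sequence[int], predictions: Sequence[int],
                threshold: float = 0.8) -> bool:
    """Mirror of the `accuracy >= threshold` check used before deployment."""
    return accuracy(true_labels, predictions) >= threshold


if __name__ == "__main__":
    labels = [1, 0, 1, 1, 0]
    preds = [1, 0, 0, 1, 0]  # 4 of 5 predictions are correct
    print(accuracy(labels, preds))
    print(passes_gate(labels, preds, threshold=0.8))
    print(passes_gate(labels, preds, threshold=0.85))
```

Keeping the gate as a pure function means the component body only has to load data, call the model, and hand the results to code that is already tested.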

Running and Monitoring Pipelines

Submitting a Pipeline Run

Python
import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host='http://localhost:8080')

# Upload the compiled pipeline so it is registered on the server
pipeline = client.upload_pipeline(
    pipeline_package_path='text_classification_pipeline.yaml',
    pipeline_name='Text Classification Pipeline'
)

# Create experiment
experiment = client.create_experiment(name='text-classification-exp')

# Run the pipeline from the compiled package.
# With the KFP v2 SDK the IDs live on .experiment_id and .run_id;
# running by pipeline_id would also require a version_id.
run = client.run_pipeline(
    experiment_id=experiment.experiment_id,
    job_name='text-classifier-run-1',
    pipeline_package_path='text_classification_pipeline.yaml',
    params={
        'data_path': 'gs://my-bucket/training-data.csv',
        'learning_rate': 2e-5,
        'epochs': 5,
        'accuracy_threshold': 0.85
    }
)

print(f"Run created: {run.run_id}")
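After submitting, you usually want to block until the run finishes. The SDK offers client.wait_for_run_completion for this, so the sketch below is only meant to show the poll-until-terminal loop behind that kind of helper. Here get_state is a hypothetical callable standing in for a lookup of the run's current state, and the state names are an assumption modeled on the KFP v2 runtime states.

```python
import time
from typing import Callable

# Assumed terminal states, modeled on the KFP v2 runtime state names.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "SKIPPED", "CANCELED"}


def wait_for_terminal_state(get_state: Callable[[], str],
                            timeout_s: float = 3600.0,
                            poll_interval_s: float = 5.0,
                            sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout_s}s")
        sleep(poll_interval_s)


if __name__ == "__main__":
    # Simulated run that is pending, then running, then succeeds.
    states = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
    final = wait_for_terminal_state(lambda: next(states), sleep=lambda _: None)
    print(final)
```

Injecting the sleep function keeps the loop testable without real waiting, which is handy if you wrap run submission in CI.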

Best Practices

💡 Production Tips

  • Version everything: Data, code, models, and pipeline definitions
  • Use component caching: Speed up iterations by caching unchanged components
  • Implement proper monitoring: Track pipeline metrics, success rates, and execution times
  • Handle failures gracefully: Add retry logic and fallback mechanisms
  • Secure your pipelines: Use RBAC and secrets management
  • Document extensively: Clear docs save debugging time later
  • Test components independently: Unit test before pipeline integration
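For the "handle failures gracefully" tip, KFP tasks expose set_retry for task-level retries. Inside a component, transient errors such as a flaky download can also be retried in plain Python. Here is a minimal sketch of exponential backoff; retry_with_backoff and flaky_download are hypothetical names I made up for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(func: Callable[[], T],
                       max_attempts: int = 3,
                       base_delay_s: float = 1.0,
                       backoff_factor: float = 2.0,
                       retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func(), retrying on the listed exceptions with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = base_delay_s
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= backoff_factor
    raise AssertionError("unreachable")


if __name__ == "__main__":
    calls = {"count": 0}

    def flaky_download() -> str:
        # Fails twice, then succeeds, to simulate a transient outage.
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("temporary network error")
        return "ok"

    delays = []
    print(retry_with_backoff(flaky_download, sleep=delays.append))
    print(delays)
```

Only exceptions listed in retry_on are retried, so genuine bugs such as a KeyError still fail fast instead of being masked by retries.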

Advanced Patterns

Hyperparameter Tuning with Katib

Integrate automated hyperparameter optimization into your pipelines:

YAML
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: text-classifier-hpo
spec:
  algorithm:
    algorithmName: random
  parallelTrialCount: 3
  maxTrialCount: 12
  objective:
    type: maximize
    goal: 0.95
    objectiveMetricName: accuracy
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.00001"
        max: "0.001"
    - name: batch_size
      parameterType: int
      feasibleSpace:
        min: "16"
        max: "64"
        step: "16"
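  trialTemplate:
    primaryContainerName: training-container
    # Map the search-space parameters above to the placeholders used below
    trialParameters:
      - name: learningRate
        description: Learning rate for the optimizer
        reference: learning_rate
      - name: batchSize
        description: Training batch size
        reference: batch_size
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: myregistry/text-classifier:latest
                command:
                  - python
                  - train.py
                  - --learning-rate=${trialParameters.learningRate}
                  - --batch-size=${trialParameters.batchSize}
            # Jobs require an explicit restart policy of Never or OnFailure
            restartPolicy: Never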
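To make the experiment spec more concrete, here is a rough pure-Python sketch of what a random search over that feasible space does: sample a learning rate between 0.00001 and 0.001 and a batch size from 16, 32, 48, or 64, run at most 12 trials, and stop early once the goal of 0.95 is reached. This is not Katib's implementation. It ignores parallel trials, and fake_objective is a made-up stand-in for a real training run.

```python
import random
from typing import Callable, Dict, Optional, Tuple

Params = Dict[str, float]


def sample_params(rng: random.Random) -> Params:
    """Draw one trial from the feasible space in the Experiment spec."""
    return {
        "learning_rate": rng.uniform(0.00001, 0.001),
        # min=16, max=64, step=16 gives 16, 32, 48, 64
        "batch_size": rng.choice(range(16, 64 + 1, 16)),
    }


def random_search(objective: Callable[[Params], float],
                  max_trials: int = 12,
                  goal: float = 0.95,
                  seed: int = 0) -> Tuple[Optional[Params], float, int]:
    """Return (best_params, best_score, trials_run) for a maximize objective."""
    rng = random.Random(seed)
    best_params: Optional[Params] = None
    best_score = float("-inf")
    trials_run = 0
    for _ in range(max_trials):
        params = sample_params(rng)
        score = objective(params)
        trials_run += 1
        if score > best_score:
            best_params, best_score = params, score
        if best_score >= goal:
            break  # goal reached, no need to spend the remaining trials
    return best_params, best_score, trials_run


def fake_objective(params: Params) -> float:
    """Made-up accuracy surface peaking near learning_rate=3e-4; illustration only."""
    penalty = abs(params["learning_rate"] - 0.0003) * 500
    return max(0.0, 0.97 - penalty)


if __name__ == "__main__":
    best, score, n = random_search(fake_objective)
    print(f"best params: {best}")
    print(f"best score: {score:.4f} after {n} trials")
```

In a real experiment each call to the objective is a full training job, which is why maxTrialCount and the goal matter: they bound how much compute the search is allowed to spend.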

A/B Testing Deployments

Use canary deployments to gradually roll out new models:

YAML
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: text-classifier
spec:
  predictor:
    # Applying this update over the existing service sends 20% of traffic
    # to the new revision (v2); the previously rolled-out revision (v1)
    # keeps the remaining 80%. v1beta1 has no separate canary section.
    canaryTrafficPercent: 20
    model:
      modelFormat:
        name: pytorch
      storageUri: gs://models/text-classifier/v2
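As a rough illustration of what a 20% canary split means per request, the sketch below routes simulated requests between two revisions by weight. It is a toy model, not how KServe or its networking layer actually routes traffic; pick_revision, simulate, and the revision labels are hypothetical.

```python
import random
from collections import Counter


def pick_revision(rng: random.Random, canary_percent: int) -> str:
    """Return 'canary' for roughly canary_percent% of calls, else 'stable'."""
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    return "canary" if rng.random() < canary_percent / 100 else "stable"


def simulate(requests: int, canary_percent: int, seed: int = 0) -> Counter:
    """Count how many simulated requests land on each revision."""
    rng = random.Random(seed)
    return Counter(pick_revision(rng, canary_percent) for _ in range(requests))


if __name__ == "__main__":
    total = 10_000
    counts = simulate(requests=total, canary_percent=20)
    for revision in ("stable", "canary"):
        share = counts[revision] / total
        print(f"{revision}: {counts[revision]} requests ({share:.1%})")
```

The split is probabilistic, so with low traffic the canary may see very few requests. Keep that in mind when deciding how long to observe a canary before promoting it.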

Conclusion

Kubeflow provides a powerful platform for building production ML systems, but like any complex tool, it requires investment to master. Start simple—perhaps just with Kubeflow Pipelines—and gradually adopt more components as your needs grow.

The key to success with Kubeflow is treating your ML workflows as first-class software engineering artifacts. Apply the same rigor you would to any production system: version control, testing, monitoring, and documentation.

In upcoming posts, we'll dive deeper into specific aspects like optimizing pipeline performance, implementing CI/CD for ML, and building custom Kubeflow components.

Resources

  • Official Kubeflow Documentation: https://kubeflow.org/docs/
  • Kubeflow Pipelines SDK: https://kubeflow-pipelines.readthedocs.io/
  • Katib Documentation: https://www.kubeflow.org/docs/components/katib/
  • KServe Documentation: https://kserve.github.io/website/