Airflow: Kubernetes Operator

November 4, 2019 - Last updated: January 30, 2020

DAG example using KubernetesPodOperator: the idea is to run a Docker container in Kubernetes from Airflow every 30 minutes.

Features:

  • Scheduled every 30 minutes.
  • Set an environment variable (RULES) for the pod.
  • Run the pods in the default namespace.
  • Mount a volume into the container. As an example, it mounts /tmp from the host.
  • Set imagePullSecrets for a private Docker registry.

import airflow
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount

default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0
}

dag = DAG(
    'run_container',
    schedule_interval='*/30 * * * *',
    default_args=default_args
)

volume = Volume(
    name="tmp",
    configs={"hostPath": {"path":"/tmp", "type":"Directory"}}
)

volume_mount = VolumeMount(
    "tmp",
    mount_path="/opt/tmp",
    sub_path=None,
    read_only=True
)

with dag:
    k = KubernetesPodOperator(
        name="my-container",
        image="hello-world:latest",
        namespace="default",
        env_vars={
            "RULES": "Testing"
        },
        task_id="run_container",
        image_pull_secrets="service-user-for-pull",
        volumes=[volume],
        volume_mounts=[volume_mount],
        get_logs=True,
        in_cluster=False,
        xcom_push=False
    )
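
For reference, the pod the operator launches corresponds roughly to the manifest below. This is an illustrative sketch built from the DAG above, not output captured from the cluster (the operator names the main container base):

```yaml
# Approximate pod spec generated by the operator above (illustrative sketch).
apiVersion: v1
kind: Pod
metadata:
  name: my-container
  namespace: default
spec:
  restartPolicy: Never
  imagePullSecrets:
    - name: service-user-for-pull
  containers:
    - name: base
      image: hello-world:latest
      env:
        - name: RULES
          value: "Testing"
      volumeMounts:
        - name: tmp
          mountPath: /opt/tmp
          readOnly: true
  volumes:
    - name: tmp
      hostPath:
        path: /tmp
        type: Directory
```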

Troubleshooting

After updating to Airflow 1.10.7 we ran into a few issues.

Issue 1

The first issue was related to the kubeconfig and the context. The default value of the field in_cluster was changed from False to True, so if your kubeconfig is defined in ~/.kube/config you need to set in_cluster to False; otherwise you get:

kubernetes.config.config_exception.ConfigException: Service host/port is not set.

Add the field in_cluster=False in your DAG.
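
If the kubeconfig lives somewhere other than the default path, or has several contexts, the operator also accepts config_file and cluster_context (a sketch assuming the 1.10.7 contrib operator; the path and context name here are hypothetical):

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

k = KubernetesPodOperator(
    task_id="run_container",
    name="my-container",
    image="hello-world:latest",
    namespace="default",
    in_cluster=False,                           # use a kubeconfig, not the pod's service account
    config_file="/home/airflow/.kube/config",   # hypothetical path; defaults to ~/.kube/config
    cluster_context="my-context",               # hypothetical context name; optional
)
```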

Issue 2

The second issue is related to the field xcom_push: its default value was also changed from False to True, so Airflow creates an extra sidecar container to handle the XCom result and then fails to retrieve it.

Add the field xcom_push=False in your DAG.
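
A quick way to check both fixes is to run the task once from the CLI, without waiting for the scheduler (Airflow 1.10 syntax; the execution date is arbitrary):

```shell
# Run a single task instance of the DAG defined above.
airflow test run_container run_container 2020-01-30
```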