DAG example using KubernetesPodOperator; the idea is to run a Docker container in Kubernetes from Airflow every 30 minutes.

Features:
- Scheduled every 30 minutes.
- Sets an environment variable (RULES) for the pod.
- Runs the pod in the default namespace.
- Mounts a volume into the container. It's just an example mounting /tmp from the host.
- Sets imagePullSecrets for a private Docker registry.
import airflow
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount

default_args = {
    'owner': 'devops',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0
}

dag = DAG(
    'run_container',
    schedule_interval='*/30 * * * *',
    default_args=default_args
)

# hostPath volume exposing the host's /tmp directory
volume = Volume(
    name="tmp",
    configs={"hostPath": {"path": "/tmp", "type": "Directory"}}
)

# Mount the "tmp" volume at /opt/tmp inside the container, read-only
volume_mount = VolumeMount(
    "tmp",
    mount_path="/opt/tmp",
    sub_path=None,
    read_only=True
)

with dag:
    k = KubernetesPodOperator(
        name="my-container",
        image="hello-world:latest",
        namespace="default",
        env_vars={
            "RULES": "Testing"
        },
        task_id="run_container",
        image_pull_secrets="service-user-for-pull",
        volumes=[volume],
        volume_mounts=[volume_mount],
        get_logs=True,
        in_cluster=False,
        xcom_push=False
    )
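As a side note, the schedule_interval of '*/30 * * * *' fires at minutes 0 and 30 of every hour. A tiny stand-alone sketch (no Airflow needed, and the helper name is just an illustration) of how a cron "*/N" minute field expands:

```python
# Stand-alone sketch: expanding the "*/30" minute field of the
# cron expression used in the DAG above. expand_step_field is a
# hypothetical helper, not part of Airflow.
def expand_step_field(field: str, upper: int) -> list[int]:
    """Expand a cron "*/N" field into the matching values in [0, upper)."""
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(0, upper, step))
    return [int(field)]

print(expand_step_field("*/30", 60))  # [0, 30] -> minutes 0 and 30 of each hour
```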
Troubleshooting
After upgrading to Airflow 1.10.7 we hit a few issues.
Issue 1
The first issue was related to the kubeconfig and the context: the default value for the in_cluster field changed from False to True, so if you have your kubeconfig defined in ~/.kube/config you need to set in_cluster to False. Otherwise you get:
kubernetes.config.config_exception.ConfigException: Service host/port is not set.
Add the field in_cluster=False in your DAG.
Issue 2
The second issue is related to the default value for the xcom_push field, which changed from False to True. This creates an extra sidecar container in the pod (the one Airflow uses to retrieve the XCom result), and it was failing to retrieve it.

Add the field xcom_push=False in your DAG.
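Note that in newer Airflow releases the parameter was renamed to do_xcom_push, so depending on your version the fix looks like one of the two lines below (a sketch, not a complete task definition):

```python
# Sketch: disabling the XCom sidecar so no extra container is created.
k = KubernetesPodOperator(
    task_id="run_container",
    name="my-container",
    image="hello-world:latest",
    namespace="default",
    xcom_push=False,          # Airflow 1.10.x contrib operator
    # do_xcom_push=False,     # newer releases renamed the parameter
)
```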