from airflow.operators.python import PythonOperator


def push_function(**context):
    """Push a fixed ``user_id`` value to XCom via the task instance.

    Args:
        **context: Keyword context supplied by the caller. It must contain a
            ``'ti'`` entry exposing ``xcom_push(key=..., value=...)``.
    """
    # Stored under an explicit key so a downstream task can pull it by name.
    context['ti'].xcom_push(key='user_id', value=123)
# Wrap the push/pull callables in operators; the task_id given here
# ('push_task') is the one the pull side uses to look up the XCom value.
# NOTE(review): pull_function must already be defined when these statements
# execute -- confirm its definition precedes them in the assembled file.
push = PythonOperator(task_id='push_task', python_callable=push_function)
pull = PythonOperator(task_id='pull_task', python_callable=pull_function)
def pull_function(**context):
    """Pull ``user_id`` from the ``push_task`` XCom and print it.

    Args:
        **context: Keyword context supplied by the caller. It must contain a
            ``'ti'`` entry exposing ``xcom_pull(task_ids=..., key=...)``.
    """
    user_id = context['ti'].xcom_pull(task_ids='push_task', key='user_id')
    # Interpolate the pulled value; without the placeholder the message
    # printed the literal text "user_id" and the value went unused.
    print(f"Received {user_id}")
@task
def extract() -> dict:
    """Return a sample user record as a dictionary.

    Returns:
        A dict with the keys ``"user_id"`` and ``"name"``.
    """
    # The returned value is pushed automatically (no explicit xcom_push call).
    return {"user_id": 123, "name": "Alice"}
# Make push_task upstream of pull_task.
push >> pull


# Pattern 1: Passing an ID from a query to a processing task.
@task
def get_latest_record_id() -> int:
    """Return the ID of the latest record (hard-coded placeholder)."""
    # Imagine a SQL query here
    return 42


@task
def process_record(record_id: int):
    """Print a message naming the record being processed.

    Args:
        record_id: Identifier of the record to process.
    """
    # Interpolate the argument; the placeholder braces were missing, so the
    # message printed the literal text "record_id".
    print(f"Processing record {record_id}")