Creating a custom XCom backend in Airflow
How to store larger Airflow XCom payloads in S3 while keeping DAG task interfaces small and explicit.
In Airflow, XComs let tasks exchange small pieces of data with one another within a DAG run.
Default XCom storage lives in the Airflow metadata database and is not a good fit for larger payloads. Size limits vary by metadata database backend, and pushing bigger objects directly into XCom storage can bloat that database and degrade scheduler and webserver performance.
Use this pattern when payload size is too large for default XCom storage but still small enough to serialize and retrieve safely per task run.
Why this pattern helps
Instead of storing full payloads in XCom tables, you can:
- Serialize task output to object storage (for example, S3)
- Store only an object reference in XCom
- Deserialize downstream when needed
This keeps task boundaries explicit and avoids bloating Airflow metadata storage.
Custom backend approach
Airflow supports custom XCom backends through BaseXCom subclassing. The idea is:
- Override `serialize_value` to intercept selected object types.
- Upload the serialized data to S3.
- Return a custom URI in XCom (for example, `xcom_s3://bucket/key`).
- Override `deserialize_value` to fetch from S3 when that URI is encountered.
Example implementation
import os
import uuid
from typing import Any

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "xcom_s3"
    BUCKET_NAME = os.environ.get("S3_XCOM_BUCKET_NAME")

    @staticmethod
    def _assert_s3_backend():
        if S3XComBackend.BUCKET_NAME is None:
            raise ValueError("Unknown bucket for S3 backend: set S3_XCOM_BUCKET_NAME.")

    @staticmethod
    def serialize_value(value: Any):
        # Intercept only DataFrames; all other values use the default behaviour.
        if isinstance(value, pd.DataFrame):
            S3XComBackend._assert_s3_backend()
            hook = S3Hook()
            key = f"data_{uuid.uuid4()}.csv"
            filename = f"/tmp/{key}"
            value.to_csv(filename, index=False)
            try:
                hook.load_file(
                    filename=filename,
                    key=key,
                    bucket_name=S3XComBackend.BUCKET_NAME,
                    replace=True,
                )
            finally:
                os.remove(filename)  # don't leak temp files on the worker
            # Store only a small reference string in the XCom table.
            value = f"{S3XComBackend.PREFIX}://{S3XComBackend.BUCKET_NAME}/{key}"
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        result = BaseXCom.deserialize_value(result)
        # Only values carrying our URI prefix are fetched back from S3.
        if isinstance(result, str) and result.startswith(S3XComBackend.PREFIX):
            S3XComBackend._assert_s3_backend()
            hook = S3Hook()
            key = result.replace(
                f"{S3XComBackend.PREFIX}://{S3XComBackend.BUCKET_NAME}/", ""
            )
            filename = hook.download_file(
                key=key,
                bucket_name=S3XComBackend.BUCKET_NAME,
                local_path="/tmp",
            )
            result = pd.read_csv(filename)
        return result
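The round trip hinges entirely on the URI convention: the writer stores a reference string, and the reader recognizes and strips the same prefix. A minimal, Airflow-free sketch of that convention (prefix and bucket names here are placeholders, not values the backend mandates):

```python
from typing import Optional

PREFIX = "xcom_s3"
BUCKET_NAME = "example-xcom-bucket"  # placeholder bucket name


def make_reference(key: str) -> str:
    # What serialize_value stores in the XCom table instead of the payload.
    return f"{PREFIX}://{BUCKET_NAME}/{key}"


def parse_reference(result: str) -> Optional[str]:
    # Mirror of the deserialize_value check: only our URIs trigger an S3 fetch.
    marker = f"{PREFIX}://{BUCKET_NAME}/"
    if isinstance(result, str) and result.startswith(marker):
        return result[len(marker):]
    return None  # an ordinary XCom value, passed through unchanged


ref = make_reference("data_1234.csv")
print(ref)                    # xcom_s3://example-xcom-bucket/data_1234.csv
print(parse_reference(ref))   # data_1234.csv
print(parse_reference("42"))  # None
```

Keeping this parsing logic symmetric between the two overrides is what makes the backend transparent to task code.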
Airflow configuration
Set AIRFLOW__CORE__XCOM_BACKEND to your backend class path, for example:
xcom_s3_backend.S3XComBackend
Ensure the module is available in PYTHONPATH and S3_XCOM_BUCKET_NAME is set in the runtime environment.
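In a shell-based deployment this might look as follows; the module path, bucket name, and plugin directory are illustrative and depend on your setup:

```shell
# Illustrative values: adjust the module path and bucket name to your deployment.
export AIRFLOW__CORE__XCOM_BACKEND=xcom_s3_backend.S3XComBackend
export S3_XCOM_BUCKET_NAME=my-airflow-xcom-bucket          # assumed bucket name
export PYTHONPATH="/opt/airflow/plugins:${PYTHONPATH}"     # wherever xcom_s3_backend.py lives
```

All Airflow components that read or write XComs (scheduler, workers, webserver/triggerer) need the same configuration, or deserialization will fall out of sync.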
Operational cautions
- Add lifecycle cleanup for generated objects to avoid unbounded S3 growth.
- Scope bucket permissions tightly so task roles can access only required keys.
- Validate serialization format consistency between write and read paths.
- Include run identifiers in keys if cross-run collisions are possible.
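One way to address the last two points together is to embed DAG and run identifiers in the key layout, so keys never collide across runs and S3 lifecycle rules can expire objects by prefix. The layout below is one possible convention, not what the backend above produces:

```python
import uuid


def make_xcom_key(dag_id: str, run_id: str, task_id: str) -> str:
    # Prefixing by dag/run/task keeps keys collision-free across runs and
    # lets a single lifecycle rule on "xcom/" expire everything uniformly.
    return f"xcom/{dag_id}/{run_id}/{task_id}/data_{uuid.uuid4()}.csv"


key = make_xcom_key("etl_daily", "scheduled__2024-01-01", "extract")
print(key)  # e.g. xcom/etl_daily/scheduled__2024-01-01/extract/data_<uuid>.csv
```

Inside `serialize_value`, the identifiers would have to come from the task context rather than be passed in directly; how to obtain them varies by Airflow version.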
Object storage-backed XCom values should still be treated as transient task exchange, not a long-term data lake interface.