深入探讨airflow中的日期问题

最近用airflow做流程调度,遇到了不少跟时间相关的问题。现在整理一下遇到的几个时间,并给出他们在源码中的定义。

模板时间

在models.py get_template_context(self, session=None)函数中定义

1
2
3
4
5
6
7
8
9
10
11
12
ds = self.execution_date.isoformat()[:10]
ts = self.execution_date.isoformat()
yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10]
tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10]

prev_execution_date = task.dag.previous_schedule(self.execution_date)
next_execution_date = task.dag.following_schedule(self.execution_date)

ds_nodash = ds.replace('-', '')
ts_nodash = ts.replace('-', '').replace(':', '')
yesterday_ds_nodash = yesterday_ds.replace('-', '')
tomorrow_ds_nodash = tomorrow_ds.replace('-', '')

可见模板时间全部依赖execution_date, execution_date从哪定义的呢?稍后说

使用模板变量是还遇到一个格式化字符串问题:例如我有下面一个模板字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sqoop_inc_cmd = r"""
sqoop import \
--connect 'jdbc:sqlserver://hostname:port;username=USERNAME;password=PASSWORD;database=DBNAME' \
--query "{{ params.query }} and \$CONDITIONS" \
--delete-target-dir \
--target-dir {{ params.targetdir }} \
--hive-import \
--hive-table {{ params.hivetable }} \
--fields-terminated-by '\t' \
--hive-partition-key ds \
--hive-partition-value {{ params.ds}} \
--null-string '\\N' \
--null-non-string '\\N' \
-m 1
"""

如果params.ds这个变量是用bash_operator参数传递的,模板的变量替换会失败。猜测可能是因为ds变量命名问题
改成params.ds_value后,模板变量替换正常。

execution_date

找到airflow web 页面,execution date随处可见,分两种:

  • 一种是scheduled run, dag的execution date 是start date减去一天
  • 另一种是external triggered run, execution date就是当时触发的时间