Custom Rules
Create Custom Rule
Sometimes you need to add custom rules to your migration. For example, you may have custom Airflow operators
based on the existing PostgresOperator
that you want to migrate to DolphinScheduler. The custom operator
is defined like this:
# Just create a custom operator based on PostgresOperator that does nothing except rename the
# connection argument from ``postgres_conn_id`` to ``my_custom_conn_id``
from __future__ import annotations

from typing import Iterable, Mapping

from airflow.providers.postgres.operators.postgres import PostgresOperator


class MyCustomOperator(PostgresOperator):
    def __init__(
        self,
        *,
        sql: str | Iterable[str],
        my_custom_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Iterable | Mapping | None = None,
        database: str | None = None,
        runtime_parameters: Mapping | None = None,
        **kwargs,
    ) -> None:
        super().__init__(
            sql=sql,
            postgres_conn_id=my_custom_conn_id,
            autocommit=autocommit,
            parameters=parameters,
            database=database,
            runtime_parameters=runtime_parameters,
            **kwargs,
        )
You put this operator alongside your DAG file, and your Airflow files are laid out like this:
dags
└── dag.py
custom
└── my_custom_operator.py
In dag.py, you use this custom operator like this:
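from airflow import DAG
from airflow.utils.dates import days_ago

from custom.my_custom_operator import MyCustomOperator

# Placeholder: replace with your own default arguments
default_args = {'owner': 'airflow'}

with DAG(
    dag_id='my_custom_dag',
    default_args=default_args,
    schedule_interval='@once',
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    t1 = MyCustomOperator(
        task_id='my_custom_task',
        sql='select * from table',
        my_custom_conn_id='my_custom_conn_id',
    )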
In this case, you can add a custom rule file named MyCustomOperator.yaml
to tell air2phin what to do
during the migration:
name: MyCustomOperator
description: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.
migration:
  module:
    - action: replace
      src: custom.my_custom_operator.MyCustomOperator
      dest: pydolphinscheduler.tasks.sql.Sql
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: replace
      src: my_custom_conn_id
      dest: datasource_name
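To make the parameter section concrete, here is a minimal sketch of the effect the two `replace` actions have on the operator's keyword arguments. It is an illustrative model only, not air2phin's implementation (air2phin rewrites the Python source itself): the rule is written as a plain dict mirroring the YAML above, and the helper `apply_replace` is hypothetical.

```python
# Hypothetical model of the "parameter" section of MyCustomOperator.yaml.
# NOT air2phin's code; the "module" section is omitted because this sketch
# only illustrates how argument names are renamed.
RULE = {
    "name": "MyCustomOperator",
    "migration": {
        "parameter": [
            {"action": "replace", "src": "task_id", "dest": "name"},
            {"action": "replace", "src": "my_custom_conn_id", "dest": "datasource_name"},
        ],
    },
}


def apply_replace(kwargs: dict, rule: dict) -> dict:
    """Return a copy of kwargs with argument names renamed per the rule's replace actions."""
    renames = {
        item["src"]: item["dest"]
        for item in rule["migration"]["parameter"]
        if item["action"] == "replace"
    }
    # Arguments without a matching action (e.g. sql) keep their original name.
    return {renames.get(key, key): value for key, value in kwargs.items()}


airflow_kwargs = {
    "task_id": "my_custom_task",
    "sql": "select * from table",
    "my_custom_conn_id": "my_custom_conn_id",
}
print(apply_replace(airflow_kwargs, RULE))
# {'name': 'my_custom_task', 'sql': 'select * from table', 'datasource_name': 'my_custom_conn_id'}
```

Arguments the rule does not mention, such as `sql`, pass through unchanged, which is why the rule only needs to list the two names that differ between the operators.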
Use Custom Rule
Save the YAML config file to any directory you want, and pass its path when you run the air2phin
command:
air2phin migrate --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags/dag.py
A new file named dag-air2phin.py, the migrated result of dag.py, is then created in the directory ~/airflow/dags.
Use Multiple Custom Rules
Air2phin also supports using multiple custom rules in a single migration, whether the rules are collected in one directory or scattered across several files and directories.
In a Single File or Directory
When all custom rules are in a single file or directory, pass that path once with the option --custom-rules
(or its short form -r):
# single file
air2phin migrate --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags/dag.py
# single directory
air2phin migrate --custom-rules /path/to/rules/dir ~/airflow/dags/dag.py
In Scattered Files or Directories
Sometimes our rules live in different places. air2phin supports passing the option --custom-rules
(or -r)
multiple times in a single migration:
# multiple files
air2phin migrate --custom-rules /path/to/MyCustomOperator1.yaml --custom-rules /path/to/MyCustomOperator2.yaml ~/airflow/dags/dag.py
# multiple directories
air2phin migrate --custom-rules /path/to/rules/dir1 --custom-rules /path/to/rules/dir2 ~/airflow/dags/dag.py
# multiple mixed files and directories
air2phin migrate --custom-rules /path/to/MyCustomOperator1.yaml --custom-rules /path/to/rules/dir1 ~/airflow/dags/dag.py
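If you drive the migration from a script, a small helper can assemble these command lines by repeating --custom-rules once per rule path. This is a hypothetical sketch: `build_migrate_args` is not part of air2phin, it only builds the argument list from the options shown above and does not call any air2phin Python API.

```python
from collections.abc import Iterable


def build_migrate_args(rule_paths: Iterable[str], dag_file: str) -> list[str]:
    """Build an `air2phin migrate` argv that repeats --custom-rules for every rule path.

    Hypothetical helper: each rule path may be a single YAML file or a directory of rules.
    """
    args = ["air2phin", "migrate"]
    for path in rule_paths:
        args += ["--custom-rules", path]
    args.append(dag_file)
    return args


cmd = build_migrate_args(
    ["/path/to/MyCustomOperator1.yaml", "/path/to/rules/dir1"],
    "~/airflow/dags/dag.py",
)
print(" ".join(cmd))
# air2phin migrate --custom-rules /path/to/MyCustomOperator1.yaml --custom-rules /path/to/rules/dir1 ~/airflow/dags/dag.py
```

The resulting list can be passed to `subprocess.run(cmd, check=True)`. Because no shell is involved in that call, expand `~` yourself first (for example with `os.path.expanduser`).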
Use Custom Rules Only, Without Built-ins
All the above examples combine built-in rules with custom ones. Sometimes we only want to apply
custom rules to existing files, much like applying a patch to a codebase. Use the option
--custom-only
(or -R)
to apply only the custom rules and ignore the built-in ones.
# Only use custom rules and ignore the built-in ones
air2phin migrate --custom-rules /path/to/MyCustomOperator1.yaml --custom-only ~/airflow/dags/dag.py
This is useful when you have many files to migrate: if you find that some code needs another change after the first migration run, you can apply just the relevant custom rules instead of rerunning every rule, which can take a long time.
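As a sketch of that patch-style workflow, the helper below builds one --custom-only command per file matching a glob pattern in a folder. The function `custom_only_commands` is hypothetical and only builds the commands without executing them; it assumes the one-file-per-invocation form used in the examples above. Whether you target the original DAG files or files produced by an earlier run depends on your workflow, so the pattern is left as a parameter.

```python
from pathlib import Path


def custom_only_commands(dags_dir: str, pattern: str, rule_paths: list[str]) -> list[list[str]]:
    """Build one `air2phin migrate ... --custom-only <file>` command per matching file.

    Hypothetical helper: commands are only built here, not executed.
    """
    rule_args: list[str] = []
    for rule in rule_paths:
        rule_args += ["--custom-rules", rule]
    # sorted() gives a stable, reproducible order across runs.
    return [
        ["air2phin", "migrate", *rule_args, "--custom-only", str(path)]
        for path in sorted(Path(dags_dir).glob(pattern))
    ]
```

Each returned list can then be handed to `subprocess.run(cmd, check=True)`, so only the patch rules are applied to every file.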
Use Rule Override
Custom rules can override built-in rules. To replace a built-in migration rule with a custom one, give the custom rule the same name as the built-in rule.
For example, the built-in rule PythonOperator.yaml has the following content:
name: PythonOperator
description: The configuration for migrating Airflow PythonOperator to DolphinScheduler Python task.
migration:
  module:
    - action: replace
      src:
        - airflow.operators.python_operator.PythonOperator
        - airflow.operators.python.PythonOperator
      dest: pydolphinscheduler.tasks.python.Python
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: replace
      src: python_callable
      dest: definition
If you want those Python tasks to run in a specific DolphinScheduler environment, the best practice is to use a rule
override. Create a custom rule file named CustomPythonOperator.yaml
with the following content:
name: PythonOperator
description: The configuration for migrating Airflow PythonOperator to DolphinScheduler Python task.
migration:
  module:
    - action: replace
      src:
        - airflow.operators.python_operator.PythonOperator
        - airflow.operators.python.PythonOperator
      dest: pydolphinscheduler.tasks.python.Python
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: replace
      src: python_callable
      dest: definition
    - action: add
      arg: environment_name
      default:
        type: str
        value: airflow_migrate
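The `add` action at the end of this file can be modelled the same way as the renames: it injects an argument that the Airflow call never had. The sketch below is a hypothetical illustration, not air2phin's implementation. The `TYPE_CASTS` mapping and the choice to keep an argument that is already present are our own assumptions, not behaviour stated on this page.

```python
# Hypothetical model of the "add" action from CustomPythonOperator.yaml.
# NOT air2phin's code; it only shows the effect on a task's keyword arguments.
ADD_ACTION = {
    "action": "add",
    "arg": "environment_name",
    "default": {"type": "str", "value": "airflow_migrate"},
}

# Assumed mapping from the rule's "type" field to a Python constructor.
TYPE_CASTS = {"str": str, "int": int}


def apply_add(kwargs: dict, action: dict) -> dict:
    """Return a copy of kwargs with the action's argument added.

    Assumption: an argument already present in the call is kept as-is.
    """
    cast = TYPE_CASTS[action["default"]["type"]]
    result = dict(kwargs)
    result.setdefault(action["arg"], cast(action["default"]["value"]))
    return result


# Arguments after the replace actions (task_id -> name, python_callable -> definition);
# "my_callable" stands in for the real callable.
task_kwargs = {"name": "print_date", "definition": "my_callable"}
print(apply_add(task_kwargs, ADD_ACTION))
# {'name': 'print_date', 'definition': 'my_callable', 'environment_name': 'airflow_migrate'}
```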
We do nothing but add the following five lines to CustomPythonOperator.yaml (note that its name
attribute is the same as the one in the built-in PythonOperator.yaml):
- action: add
  arg: environment_name
  default:
    type: str
    value: airflow_migrate
These lines tell air2phin to add a new argument named environment_name with the default value airflow_migrate.
Then we can use the rule with the command:
air2phin migrate --custom-rules CustomPythonOperator.yaml ~/airflow/dags/dag.py
PythonOperator.yaml is overridden by CustomPythonOperator.yaml because both rules have the same name and CustomPythonOperator.yaml is a custom rule.
Note
We identify a rule by the name attribute in the file content, not by its filename.
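One way to picture this override behaviour is a dictionary keyed by each rule's name attribute, where custom rules are loaded after the built-ins and therefore replace any entry with the same name. This is a minimal sketch under that assumption, not air2phin's actual loader: `merge_rules` is hypothetical, the rule dicts are trimmed stand-ins for the YAML files, and `SomeOtherOperator` is a made-up placeholder. The `custom_only` flag mirrors the --custom-only option described earlier.

```python
def merge_rules(builtin: list[dict], custom: list[dict], custom_only: bool = False) -> dict[str, dict]:
    """Index rules by their `name` attribute; custom rules replace built-ins of the same name.

    Hypothetical model: with custom_only=True the built-in rules are skipped entirely.
    """
    merged: dict[str, dict] = {}
    if not custom_only:
        for rule in builtin:
            merged[rule["name"]] = rule
    for rule in custom:
        # The key is the name inside the file, not the filename.
        merged[rule["name"]] = rule
    return merged


builtin_rules = [
    {"name": "PythonOperator", "source": "PythonOperator.yaml"},
    {"name": "SomeOtherOperator", "source": "SomeOtherOperator.yaml"},  # placeholder
]
custom_rules = [{"name": "PythonOperator", "source": "CustomPythonOperator.yaml"}]

rules = merge_rules(builtin_rules, custom_rules)
print(rules["PythonOperator"]["source"])  # CustomPythonOperator.yaml
print(sorted(rules))  # ['PythonOperator', 'SomeOtherOperator']
```

Keying on the name rather than the filename is what lets CustomPythonOperator.yaml replace PythonOperator.yaml even though the two files are named differently.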