fake.hooks
Some users will use hook in Python operator or custom operator, for example, we have a custom Python task like:
from contextlib import closing
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
def demo():
connection = PostgresHook.get_connection("postgres_default")
hook = PostgresHook(connection=connection)
with closing(hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("SELECT 1")
print(cursor.fetchall())
demo = PythonOperator(
task_id="demo",
python_callable=test,
)
when you use air2phin to migrate it to dolphinscheduler python SDK, the result will be:
from contextlib import closing
from pydolphinscheduler.tasks.python import Python
from airflow.providers.postgres.hooks.postgres import PostgresHook
def demo():
connection = PostgresHook.get_connection("postgres_default")
hook = PostgresHook(connection=connection)
with closing(hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("SELECT 1")
print(cursor.fetchall())
demo = Python(
name="demo",
definition=test,
)
As you can see, the task demo
’s class name and its attributes is migrated to dolphinscheduler’s, but the function
demo
is not migrated, because it is a function and not an airflow task.
We can find it used airflow.PostgresHook
to connect to airflow’s metadata database, and then execute SQL.
This code cannot be run in dolphinscheduler python SDK, because airflow.PostgresHook
is a concept of airflow
only. There are two ways if you want it successfully run by dolphinscheduler python SDK, one is to rewrite the function
to make it work with dolphinscheduler, another is to use air2phin.fake
without any modification.
Usage
Basic Usage
When you run air2phin migrate
subcommand, it will automatically detect if there are any airflow.hooks
in your DAG. If so, it will automatically migrate the hook module into module air2phin.fake
, which means
you can do nothing for hook migration.
Note
Module air2phin.fake
only support two hooks migration, which are airflow.PostgresHook
and airflow.MySqlHook
. If you want to migrate other hooks, you can use custom rules
With air2phin.fake
module, the original DAG can be migrated to:
from contextlib import closing
from pydolphinscheduler.tasks.python import Python
from air2phin.fake.hooks.postgres import PostgresHook
def demo():
connection = PostgresHook.get_connection("postgres_default")
hook = PostgresHook(connection=connection)
with closing(hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("SELECT 1")
print(cursor.fetchall())
demo = Python(
name="demo",
definition=test,
)
And you can see air2phin migrate the hook module from airflow.providers.postgres.hooks.postgres.PostgresHook
to
air2phin.fake.hooks.postgres.PostgresHook
. When you run the code in dolphinscheduler, air2phin.fake
will
query dolphinscheduler metadata database to get the connection information, you can use it just like you use
airflow.providers.postgres.hooks.postgres.PostgresHook
Requirement
The network of dolphinscheduler workers can connect to dolphinscheduler metadata database is be required. Because
air2phin.fake
will query the connection information from dolphinscheduler metadata database.The data source named
postgres_default
(same as airflow’s connection) must exist in dolphinscheduler metadata database for air2phin.fake to get the connection information.Methods can connect to the dolphinscheduler metabase, any one of the following is acceptable:
Package pydolphinscheduler is installed in the dolphinscheduler worker’s python environment, and make sure the token is correct.
An environment variable named
AIR2PHIN_FAKE_CONNECTION
set with the connection information of the dolphinscheduler metadata database. It is use sqlalchemy connection string format for example:postgresql+psycopg2://scott:tiger@localhost:5432/mydatabase
. We recommend you use dolphinscheduler’s Environmental Management to do that, all you should do is add a new environment with content likeexport AIR2PHIN_FAKE_CONNECTION=postgresql+psycopg2://scott:tiger@localhost:5432/mydatabase
and use it in your dolphinscheduler’s Python task.
Note
The priority of package
pydolphinscheduler
is higher than the environment variablesAIR2PHIN_FAKE_CONNECTION
. If you want less connections for your dolphinscheduler metadata database, please use package pydolphinscheduler which will reuse the connection pool of dolphinscheduler itself. But if you do not case much of the connections number(such as do not have many tasks using air2phin.fake), or do not want to install pydolphinscheduler dolphinscheduler worker, environment variableAIR2PHIN_FAKE_CONNECTION
is a better choice.
With Non-unique Datasource Name
Dolphinscheduler datasource uses joint unique index (type, name)
to ensure the datasource name is unique.
but the airflow connection id is unique. So when your dolphinscheduler metadata database has two datasource with the same name,
air2phin.fake will raise an error, in this case, you should add the type of datasource
# When you have two datasource named "postgres_default" in the dolphinscheduler metadata database
from air2phin.fake.hooks.postgres import PostgresHook
connection = PostgresHook.get_connection("postgres_default")
# You should add the type of datasource, into the format "type.name"
from air2phin.fake.hooks.postgres import PostgresHook
connection = PostgresHook.get_connection("postgres.postgres_default")
or you can change your datasource name to make it unique.
# Change the datasource name to make it unique, for example, change from "postgres_default" to "postgres_default_uniq"
from air2phin.fake.hooks.postgres import PostgresHook
connection = PostgresHook.get_connection("postgres_default_uniq")
And dolphinscheduler only supports the following types of datasource, which mean your type must be one of them:
mysql
postgresql
hive
spark
clickhouse
oracle
sqlserver
db2
presto
h2
redshift
dameng
starrocks