Short guide: How to use PostgresOperator in Apache Airflow?

SQLAlchemy inside

Set up connection to PostgreSQL

Edit the postgres_default connection under Admin > Connections in the Airflow UI so that it points to your PostgreSQL server.
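As an alternative to the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID in upper case> that holds a connection URI. The sketch below only builds such a URI for postgres_default. The helper name postgres_conn_uri and all credentials are hypothetical, and the variable has to be set in the environment of the scheduler and the workers, not just in a Python session.

```python
from urllib.parse import quote


def postgres_conn_uri(user, password, host, port, database):
    """Build a PostgreSQL connection URI (hypothetical helper for this sketch)."""
    # Percent-encode the credentials so characters such as '@' or '/'
    # cannot be confused with the URI delimiters.
    return "postgres://{}:{}@{}:{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port, database
    )


# Made-up credentials, for illustration only.
uri = postgres_conn_uri("airflow_user", "p@ss/word", "localhost", 5432, "airflow_db")

# Shell line that would expose the URI under the postgres_default conn id.
print("export AIRFLOW_CONN_POSTGRES_DEFAULT='{}'".format(uri))
```

With the variable exported, an operator that uses the postgres_default connection id should pick it up without any change in the UI.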

DAG

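from datetime import datetime
from airflow import DAG
# Airflow 1.10 import path; Airflow 2 moved this operator to the Postgres provider package
from airflow.operators.postgres_operator import PostgresOperator

dag_params = {
    'dag_id': 'PostgresOperator_dag',
    'start_date': datetime(2019, 10, 7),
    'schedule_interval': None  # no schedule: the DAG runs only when triggered manually
}

with DAG(**dag_params) as dag:

    # postgres_conn_id is not set, so the operator uses the 'postgres_default' connection
    create_table = PostgresOperator(
        task_id='create_table',
        sql='''CREATE TABLE new_table(
            custom_id integer NOT NULL, timestamp TIMESTAMP NOT NULL, user_id VARCHAR (50) NOT NULL
            );''',
    )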
[Figure: the create_table task succeeds on its first run]
[Figure: on a rerun the task fails because new_table already exists]
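The "table exists" failure comes from running a plain CREATE TABLE twice. One common way around it, not used in the DAG above, is CREATE TABLE IF NOT EXISTS, which PostgreSQL supports. The sketch below reproduces both behaviours with Python's built-in sqlite3 module, used here purely as a stand-in so the example runs without a PostgreSQL server. The variable names are assumptions made for this illustration.

```python
import sqlite3

# sqlite3 stands in for PostgreSQL only so that the sketch is runnable anywhere.
conn = sqlite3.connect(":memory:")

plain_ddl = """CREATE TABLE new_table(
    custom_id integer NOT NULL, timestamp TIMESTAMP NOT NULL, user_id VARCHAR (50) NOT NULL
);"""
# Same statement, but it becomes a no-op when the table is already there.
idempotent_ddl = plain_ddl.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)

conn.execute(plain_ddl)  # first run: the table is created

try:
    conn.execute(plain_ddl)  # second run: the table already exists
    second_run_failed = False
except sqlite3.OperationalError as exc:
    second_run_failed = True
    print("second run failed:", exc)

conn.execute(idempotent_ddl)  # repeating this statement raises no error
conn.close()
```

Passing the IF NOT EXISTS variant as the sql argument of create_table would let the DAG be rerun without that task failing.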
Insert a row with a parameterized query: INSERT INTO new_table VALUES(%s, %s, %s)
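    # Still inside the `with DAG(**dag_params) as dag:` block.
    # This task needs two more imports at the top of the file:
    #   import uuid
    #   from airflow.utils.trigger_rule import TriggerRule
    insert_row = PostgresOperator(
        task_id='insert_row',
        sql='INSERT INTO new_table VALUES(%s, %s, %s)',
        # ALL_DONE: run once create_table has finished, even if it failed
        trigger_rule=TriggerRule.ALL_DONE,
        # one value per %s placeholder, in column order
        parameters=(uuid.uuid4().int % 123456789, datetime.now(), uuid.uuid4().hex[:10])
    )

    create_table >> insert_row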
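To make the parameters tuple easier to read, the same three values can be built in a small helper. This is only a sketch: make_row_params is a hypothetical name, not part of Airflow. Note that in the DAG the expressions are evaluated whenever the DAG file is parsed, so the values are generated in Python and then sent to PostgreSQL in place of the %s placeholders.

```python
import uuid
from datetime import datetime


def make_row_params():
    """Return (custom_id, timestamp, user_id) in the column order of new_table."""
    # Random integer below 123456789, which fits a PostgreSQL integer column.
    custom_id = uuid.uuid4().int % 123456789
    # Current local time for the TIMESTAMP column.
    timestamp = datetime.now()
    # First 10 hex characters of a random UUID, well under VARCHAR(50).
    user_id = uuid.uuid4().hex[:10]
    return (custom_id, timestamp, user_id)


params = make_row_params()
print(params)
```

The tuple could then be passed as parameters=make_row_params() when the operator is defined.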
[Figure: full DAG body]
[Figure: the row is inserted into PostgreSQL successfully]
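The insert succeeds even when create_table fails because insert_row uses TriggerRule.ALL_DONE instead of the default all_success rule. The toy model below illustrates the difference between the two rules. It is a simplified sketch, not Airflow's scheduling code, and ready_to_run and FINISHED_STATES are names invented for this example.

```python
# States in which an upstream task is considered finished in this toy model.
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def ready_to_run(trigger_rule, upstream_states):
    """Decide whether a task may start, given the states of its upstream tasks."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # ALL_DONE: every upstream task must be finished, whatever the outcome.
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError("rule not covered by this sketch: {}".format(trigger_rule))


# create_table failed because new_table already exists.
print(ready_to_run("all_success", ["failed"]))  # False
print(ready_to_run("all_done", ["failed"]))     # True
```

Under the default rule a failed create_table would block insert_row, which is why the DAG sets ALL_DONE on it.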
