********A. Cài server apache-airflow trên môi trường Ubuntu-20.04********
****I. Môi trường****
Install WSL & update to WSL 2. Open PowerShell as Administrator and run:
dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
Download & Install -> https://wslstorestorage.blob.core.windows.net/wslblob/wsl_update_x64.msi
wsl — set-default-version 2
Download & Install Linux distribution [Ubuntu 20.04 LTS]
Check danh sách các máy ảo đang chạy và verion WSL đang sử dụng
wsl — list — verbose
Nếu máy ảo đang chạy trên version WSL v1 thì set lại version cho nó:
wsl — set-version <distribution name> <versionNumber>
sudo apt update && sudo apt upgrade
sudo apt install python3
sudo apt install python3-pip
****II. Install Apache-Airflow 2.0.1****
sudo apt-get install software-properties-common
sudo apt-add-repository universe
sudo apt-get update
sudo apt-get install python-setuptools
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev
export AIRFLOW_HOME=~/airflow
sudo pip3 install apache-airflow
sudo pip3 install typing_extensions
Sau khi cài apache-airflow phải khởi tạo database back-end(Repository data) để lưu cấu hình. Mặc định khi cài Apache-Airflow sẽ sử dụng SQLite để làm database back-end . Mình muốn sử dụng Postgres để làm database back-end.
Install postgres:
sudo apt install postgresql postgresql-contrib
sudo -i -u postgres
pg_lsclusters — Hiển thị thông tin các clusters PostgreSQL
pg_ctlcluster {Ver} main start — start một cluster PostgreSQL
Mở psql để tạo user và database vd:
CREATE USER airflow;
Trong AIRFLOW_HOME có file airflow.cfg sử lại tham số sql_alchemy_conn để trỏ tới serv Postgres:
sql_alchemy_conn = postgresql+psycopg2://airflow:admin@localhost:5432/airflow
Khởi tạo database-backend:
airflow db init
Tạo acc vào Airflow:
airflow users create \
— username admin \
— firstname Trump \
— lastname Donald \
— role Admin \
— email trumpd@gmail.com
****III. Apache-Airflow****
Trước khi lunch Apache-Airflow kiểm tra xem db back-end postgres đang online không(Khi reboot host db bị down). Nếu đang down thì phải start lại:
sudo service postgresql restart
Start scheduler instance(service để execute các requests) và Airflow webserver instance(UI để xem và thực thi các DAGs) trên 2 remote session khác nhau cùng lúc:
airflow scheduler
airflow webserver [-p 8080]
****IV. Extract — Load data from/to database with Apache-Airflow****
Để connect tới db thì cần cài các provider tương ứng.
Danh sách providers pakage:
1. Oracle
Cài provider :
pip3 install apache-airflow-providers-oracle
Đối với database Oracle, cần cài thêm oracle client để có thể connect tới database server:
download instant client package:
download sqlplus package:
sudo apt update && sudo apt upgrade
sudo apt-get install -y unzip
sudo unzip <download>/instantclient-basic-linux.x64– -d /opt/oracle
sudo unzip <download>/instantclient-sqlplus-linux.x64– -d /opt/oracle/
nano ~/.profile
export PATH=”$PATH:/opt/oracle/instantclient_19_8"
export LD_LIBRARY_PATH=”$LD_LIBRARY_PATH:/opt/oracle/instantclient_19_8"
source ~/.profile
**DAG example:**
from datetime import datetime
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.operators.python_operator import PythonOperator
import requests
import json
from bs4 import BeautifulSoup
def get_data_api():
db = OracleHook(oracle_conn_id=’oracle_test’)
db_conn = db.get_conn()
db_cursor = db_conn.cursor()
url = “https://khcn.vista.gov.vn/vuejx/”
payload="{\"query\":\"\\n query search($token: String, $body: JSON, $db: String, $collection: String) {\\n results: search(token: $token, body: $body, db: $db, collection: $collection )\\n }\\n \”,\”variables\”:\”{\\\”body\\\”:{\\\”size\\\”:30,\\\”query\\\”:{\\\”bool\\\”:{\\\”filter\\\”:{\\\”match\\\”:{\\\”site\\\”:\\\”guest\\\”}},\\\”must\\\”:[{\\\”match\\\”:{\\\”storage\\\”:\\\”regular\\\”}}]}},\\\”highlight\\\”:{\\\”pre_tags\\\”:[\\\”<es_em>\\\”],\\\”post_tags\\\”:[\\\”</es_em>\\\”],\\\”fields\\\”:{\\\”*\\\”:{}}},\\\”_source\\\”:{\\\”includes\\\”:[\\\”id\\\”,\\\”Collection\\\”,\\\”Type\\\”,\\\”Title\\\”,\\\”Authors\\\”,\\\”PublicationDate\\\”,\\\”Authors\\\”,\\\”Keyword\\\”,\\\”Publisher\\\”,\\\”Subject\\\”,\\\”Journal\\\”,\\\”Number\\\”,\\\”ISSN\\\”,\\\”StartPage\\\”,\\\”EndPage\\\”,\\\”Updated_Date\\\”]},\\\”sort\\\”:[{\\\”_score\\\”:\\\”desc\\\”},{\\\”PublicationDate\\\”:\\\”desc\\\”}],\\\”from\\\”:0},\\\”db\\\”:\\\”khcn\\\”,\\\”collection\\\”:\\\”Publication\\\”,\\\”token\\\”:null}\”}”
headers = {‘Content-Type’: ‘application/json’}
res = requests.post(url, headers=headers, data=payload,verify=False)
if res.status_code == 200:
# Kiểm tra được certificate
result = json.loads(res.content)
# for h in (result[‘data’][‘results’][‘hits’][‘hits’]):
# print(h[‘_id’])
# print(h[‘_source’][‘Title’])
# print(h)
# break
for item in result[‘data’][‘results’][‘hits’][‘hits’]:
_id = (item.get(‘_source’,None).get(‘id’,None))
_value = (item.get(‘_source’,None).get(‘Title’,None).get(‘_source’,None).get(‘_value’,None))
db_cursor.execute(‘insert into TPR_POC_DB.test_vuejx (“_id”,”Title”) VALUES (:1,:2)’, (_id,_value))
# Certificate lỗi
def get_data_html():
db = OracleHook(oracle_conn_id=’oracle_test’)
db_conn = db.get_conn()
db_cursor = db_conn.cursor()
# Make a GET request to fetch the raw HTML content
html_content = requests.get(url).text
# Parse the html content
soup = BeautifulSoup(html_content, “lxml”)
table = soup.find(“table”, attrs={“class”: “table”})
header = table.thead.find_all(“th”)
detail = table.tbody.find_all(“tr”)
for th in header:
for tr in detail:
# print(tr)
_values = []
for td in tr.find_all(“td”):
db_cursor.execute(‘INSERT INTO TPR_POC_DB.TEST_DIEM(“stt”, “truong”, “diem”, “ghichu”) VALUES(:1, :2, :3, :4)’, tuple(_values))
dag = DAG(
dag_id = ‘get_data’,
start_date = datetime(2021, 1, 1),
schedule_interval = ‘*/10 * * * *’,
task_1 = PythonOperator(
task_2 = PythonOperator(
Miscellaneous commands
airflow cheat-sheet | Display cheat sheet
airflow info | Show information about current Airflow and environment
airflow kerberos | Start a kerberos ticket renewer
airflow plugins | Dump information about loaded plugins
airflow rotate-fernet-key | Rotate encrypted connection credentials and variables
airflow scheduler | Start a scheduler instance
airflow sync-perm | Update permissions for existing roles and DAGs
airflow version | Show the version
airflow webserver | Start a Airflow webserver instance
Celery components
airflow celery flower | Start a Celery Flower
airflow celery stop | Stop the Celery worker gracefully
airflow celery worker | Start a Celery worker node
View configuration
airflow config get-value | Print the value of the configuration
airflow config list | List options for the configuration
Manage connections
airflow connections add | Add a connection
airflow connections delete | Delete a connection
airflow connections export | Export all connections
airflow connections get | Get a connection
airflow connections list | List connections
Manage DAGs
airflow dags backfill | Run subsections of a DAG for a specified date range
airflow dags delete | Delete all DB records related to the specified DAG
airflow dags list | List all the DAGs
airflow dags list-jobs | List the jobs
airflow dags list-runs | List DAG runs given a DAG id
airflow dags next-execution | Get the next execution datetimes of a DAG
airflow dags pause | Pause a DAG
airflow dags report | Show DagBag loading report
airflow dags show | Displays DAG’s tasks with their dependencies
airflow dags state | Get the status of a dag run
airflow dags test | Execute one single DagRun
airflow dags trigger | Trigger a DAG run
airflow dags unpause | Resume a paused DAG
Database operations
airflow db check | Check if the database can be reached
airflow db check-migrations | Check if migration have finished
airflow db init | Initialize the metadata database
airflow db reset | Burn down and rebuild the metadata database
airflow db shell | Runs a shell to access the database
airflow db upgrade | Upgrade the metadata database to latest version
Tools to help run the KubernetesExecutor
airflow kubernetes cleanup-pods | Clean up Kubernetes pods in evicted/failed/succeeded states
airflow kubernetes generate-dag-yaml | Generate YAML files for all tasks in DAG. Useful for debugging tasks without launching into a cluster
Manage pools
airflow pools delete | Delete pool
airflow pools export | Export all pools
airflow pools get | Get pool size
airflow pools import | Import pools
airflow pools list | List pools
airflow pools set | Configure pool
Display providers
airflow providers behaviours | Get information about registered connection types with custom behaviours
airflow providers get | Get detailed information about a provider
airflow providers hooks | List registered provider hooks
airflow providers links | List extra links registered by the providers
airflow providers list | List installed providers
airflow providers widgets | Get information about registered connection form widgets
Manage roles
airflow roles create | Create role
airflow roles list | List roles
Manage tasks
airflow tasks clear | Clear a set of task instance, as if they never ran
airflow tasks failed-deps | Returns the unmet dependencies for a task instance
airflow tasks list | List the tasks within a DAG
airflow tasks render | Render a task instance’s template(s)
airflow tasks run | Run a single task instance
airflow tasks state | Get the status of a task instance
airflow tasks states-for-dag-run | Get the status of all task instances in a dag run
airflow tasks test | Test a task instance
Manage users
airflow users add-role | Add role to a user
airflow users create | Create a user
airflow users delete | Delete a user
airflow users export | Export all users
airflow users import | Import users
airflow users list | List users
airflow users remove-role | Remove role from a user
Manage variables
airflow variables delete | Delete variable
airflow variables export | Export all variables
airflow variables get | Get variable
airflow variables import | Import variables
airflow variables list | List variables
airflow variables set | Set variable
B. Cài Apache-Airflow đầy đủ cho việc phát triển.
I. Cài đặt Docker trên Ubuntu
Xóa, dọn dẹp các cài đặt cũ:
sudo apt — purge autoremove docker docker-engine docker.io containerd runc
Cài các lib cần
sudo apt update && sudo apt install software-properties-common gnupg2 curl ca-certificates apt-transport-https
Thêm key fingerprint GPG vào danh sách tin cây của APT
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
Thêm kho lưu trữ Docker cho Ubuntu vào nguồn phần mềm.
sudo add-apt-repository “deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable”
Cài đặt Docker.
sudo apt update && sudo apt install docker-ce
sudo apt install docker-compose
sudo adduser $USER docker
II. Cài Docker Compose.
COMPOSE_VERSION=”$(curl -s https://api.github.com/repos/docker/compose/releases/latest | grep ‘“tag_name”:’\
| cut -d ‘“‘ -f 4)”
docker-compose-$(uname -s)-$(uname -m)”
sudo curl -L “${COMPOSE_URL}” -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose — version
III. Cài đặt Pyenv
Checking required packages
sudo add-apt-repository ppa:deadsnakes/ppa && sudo apt update
sudo apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl git
sudo apt install build-essential python3.6-dev python3.7-dev python3.8-dev python-dev openssl sqlite3 libsqlite3-dev default-libmysqlclient-dev libmariadbd-dev libmariadb-dev libmysqlclient-dev:i386 libmysqlclient-dev postgresql
Install pyenv
curl https://pyenv.run | bash
thêm các dòng suggested vào cuối ~/.bashrc
Restart your shell
exec $SHELL
installing required Python version to pyenv and verifying it
pyenv install — list
pyenv install 3.8.5
pyenv versions
Creating new virtual environment named airflow-env
pyenv virtualenv 3.8.5 airflow-env
IV. Cài Apache-Airflow với Breeze
clone source code về: (vd)
git clone https://github.com/dqtrung1702/airflow.git
Thêm dòng sau vào ~/.bashrc để call breeze cho tiện: (vd)
export PATH=${PATH}:”/home/trung/Apache-Airflow/airflow”
source ~/.bashrc
activate venv lên:
pyenv activate airflow-env
mở tới project directory: (vd)
cd /home/trung/Apache-Airflow/airflow
Init sth:
breeze setup-autocomplete
breeze — python 3.8 — backend postgres
airflow db reset
airflow users create — role Admin — username admin — password admin — email trung@dang — firstname trung — lastname dang
chạy airflow map vào db đã tạo:
breeze start-airflow initialize-local-virtualenv — python 3.8 — backend postgres
Port Forwarding
When you run Airflow Breeze, the following ports are automatically forwarded:
28080 -> forwarded to Airflow webserver -> airflow:8080
25555 -> forwarded to Flower dashboard -> airflow:5555
25433 -> forwarded to Postgres database -> postgres:5432
23306 -> forwarded to MySQL database -> mysql:3306
26379 -> forwarded to Redis broker -> redis:6379