Apache Airflow ile Telegram’a Bildirim Mesajı Gönderme

Seda Kayademir
7 min readJun 18, 2024

--

Apache Airflow, veri işleme ve zamanlama işlerini otomatikleştirmek için kullanılan güçlü bir araçtır. Bu yazıda, Airflow’u kullanarak Telegram’a bildirim mesajları göndermeyi öğreneceğiz. Bu sayede, Airflow iş akışlarımızın durumu hakkında anlık bildirimler alabilir ve veri işleme süreçlerimiz hakkında bilgi sahibi olabiliriz.

Telegram Bot ve Kanal Oluşturma

Bot Oluşturma

Bilgilendirme mesajı gönderebilmek için bir Telegram botu oluşturalım. Bunu yapmak için aşağıdaki adımları izleyin:

  1. Telegram arama çubuğuna BotFather yazın. Onaylı olan hesabın üzerine tıklayıp /start komutuyla mesajlaşmayı başlatın.
BotFather

2. BotFather’a /newbot komutunu göndererek yeni bir bot oluşturacağınızı belirtin.

3. Bot oluşturma sırasında istenilen bot ismi ve kullanıcı adını belirtip devam edin. Bot oluştuktan sonra size verilen token bilgisini not edin, bu token’i Telegram API’ye bağlanmak için kullanacağız.

Kanal Oluşturma ve Kanal ID’sini Öğrenme

“Airflow Notification” adında bir kanal oluşturun (farklı bir isim de verebilirsiniz, bu ismi sonrasında title bilgisi olarak kullanacağız). Oluşturduğunuz kanala botu yönetici olarak ekleyin.

  1. .env dosyası oluşturun ve içine botunuzun token'ını ve oluşturduğunuz kanalın adını ekleyin:
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
TITLE = "YOUR_TELEGRAM_TITLE"

2. Ardından, Telegram’da kanalınıza bir mesaj gönderin.

3. Aşağıdaki Python scriptini kullanarak kanalınızın ID’sini alın:

import os
import requests
from dotenv import load_dotenv

load_dotenv()

token = os.getenv("TELEGRAM_TOKEN")
title = os.getenv("TITLE")

def get_chat_id(token, title):
# Construct the URL for the getUpdates method
url = f"https://api.telegram.org/bot{token}/getUpdates"
# Define the parameters for the request
params = {"offset": "-1", "limit": "1", "allowed_updates": ["channel_post"]}
# Send a GET request to the Telegram API
response = requests.get(url, params=params)
# Parse the JSON response
data = response.json()
# Check if the response contains a "result" key and it's not empty
if "result" in data and len(data["result"]) > 0:
# Loop through each result in the response
for result in data["result"]:
# Check if the result contains a "channel_post" key
# and if the "title" of the chat matches the specified title
if "channel_post" in result and result["channel_post"]["chat"]["title"] == title:
# Return the chat ID if a match is found
return result["channel_post"]["chat"]["id"]
# Return None if no matching chat is found
return None


# Get the chat ID for the specified title
chat_id = get_chat_id(token, title)
# Print the chat ID
print(f"The ID of the chat with the title '{title}' is:", chat_id)

Docker ile Airflow Kurulumu

Airflow’u Docker üzerinde kurarak Telegram’a bildirim mesajı gönderme işlemini gerçekleştireceğiz. Gerekli kütüphanelerin yüklenmesi için Dockerfile kullanıyoruz. Bu Dockerfile, python-telegram-bot Python kütüphanesi ve Airflow için gerekli olan apache-airflow-providers-telegram kütüphanesini yükleyecektir.

Proje kodlarına repodan erişebilirsiniz. Proje kodlarını indirdikten sonra aşağıdaki adımları izleyerek test ortamınızı oluşturabilirsiniz:

1. Proje kodlarına repodan erişin ve projenin bulunduğu dizine gidin. Ardından terminalde aşağıdaki komutu çalıştırarak Docker konteynerlerini oluşturun:

docker-compose up - build -d

2. Telegram’a ulaşmak için kullanacağımız değişkenleri ve bağlantıları tanımlamak için airflow-webserver konteynerına bağlanmamız gerekecek. Bu değişkenleri ve Telegram bağlantısını aşağıdaki komutlarla tanımlayabiliriz:

# NOT: <YOUR_TOKEN> ve <YOUR_CHATID> kısımlarını kendi bilgilerinize göre düzenlemelisiniz.

docker exec -it airflow-webserver bash

airflow variables set TELEGRAM_TOKEN <YOUR_TOKEN>

airflow variables set TELEGRAM_CHATID <YOUR_CHATID>

airflow connections add telegram \
- conn-type=telegram \
- conn-password=<YOUR_TOKEN>

Mesaj Gönderme için DAG’lerin Oluşturulması

Mesaj göndermek için üç farklı yöntem kullanabiliriz:

Python Operator ile Mesaj Gönderme

Airflow TelegramOperator ile Mesaj Gönderme

Özel TelegramOperator’un Kullanılması

1. Python Operator ile Mesaj Gönderme

Python ile mesaj göndermek için python-telegram-bot kütüphanesini kullanacağız. Bu kütüphane, Telegram ile iletişim kurmamızı sağlayacak araçları içerir. send_telegram_message_async adında bir fonksiyon oluşturacağız ve bu fonksiyonu Python Operator ile çağırarak mesaj gönderme işlemini gerçekleştireceğiz. Asenkron kullanımın nedeni, Telegram'a mesaj gönderirken diğer işlemlerin aynı anda devam etmesini sağlamaktır. Bu sayede, Telegram'a mesaj gönderme işlemi tamamlanana kadar diğer görevlerimizde çalışmaya devam edebilir.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from telegram import Bot
from airflow.models import Variable
import telegram
import asyncio

# Retrieve chat_id and token from Airflow Variables
chat_id = Variable.get('TELEGRAM_CHATID')
token = Variable.get('TELEGRAM_TOKEN')

# Asynchronous function to send a message via Telegram
async def send_telegram_message_async():
bot_token = token
message = '''
Hello from Airflow!

This message is sent using the Airflow Python Operator.
'''

bot = telegram.Bot(token=bot_token)
await bot.send_message(chat_id=chat_id, text=message)

# Synchronous wrapper to run the async function


def send_telegram_message():
asyncio.run(send_telegram_message_async())


# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
}

# Define the DAG
dag = DAG(
'python_telegram_notification',
default_args=default_args,
schedule_interval='@daily',
start_date=days_ago(1),
tags=['example'],
catchup=False
)

# DummyOperator to mark the start of the DAG
start = DummyOperator(
task_id='start',
retries=3,
dag=dag
)

# PythonOperator to send a message via Telegram
python_task = PythonOperator(
task_id='python_task',
python_callable=send_telegram_message,
dag=dag,
)

# DummyOperator to mark the end of the DAG
end = DummyOperator(
task_id='end',
retries=3,
dag=dag
)

# Set the task dependencies
start >> python_task >> end

2. Airflow TelegramOperator ile Mesaj Gönderme

Airflow TelegramOperator’ü kullanarak da mesaj gönderebiliriz. Bunun için apache-airflow-providers-telegram paketini kullanıyoruz.

from airflow import DAG
from airflow.providers.telegram.operators.telegram import TelegramOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

# Retrieve the chat ID from Airflow Variables
chat_id = Variable.get('TELEGRAM_CHATID')

# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
}

# Define the DAG
with DAG('telegram_notification_dag',
default_args=default_args,
schedule_interval='@daily',
start_date=days_ago(1),
tags=['example'],
catchup=False) as dag:

# Task to send a message using the TelegramOperator
send_message = TelegramOperator(
task_id='send_message',
telegram_conn_id='telegram', # Ensure this connection is set up in Airflow
chat_id=chat_id,
text = '''
Hello from Airflow!

This message is sent using the Airflow Telegram Operator.''',
)

3. Kullanıcı Tanımlı TelegramOperator

Son olarak, BaseHook ve BaseOperator sınıflarını genişleterek kendi TelegramHook ve TelegramOperator sınıflarımızı yazabiliriz. Bu sınıfları kullanarak mesaj gönderme işlemini gerçekleştirebiliriz.

Öncelikle, TelegramHook sınıfını oluşturalım:

from airflow.hooks.base import BaseHook
import requests

class TelegramHook(BaseHook):
def __init__(self, telegram_conn_id):
"""
Initialize the TelegramHook with the given connection ID.

:param telegram_conn_id: The connection ID for Telegram.
"""
super().__init__()
self.telegram_conn_id = telegram_conn_id
self.base_url = "https://api.telegram.org/bot"

def get_conn(self):
"""
Get the connection for Telegram.

:return: The Telegram connection.
"""
connection = self.get_connection(self.telegram_conn_id)
return connection

def send_message(self, chat_id, message):
"""
Send a message to a Telegram chat.

:param chat_id: The ID of the chat to send the message to.
:param message: The message to send.
:return: The response from the Telegram API.
"""
connection = self.get_conn()
token = connection.password
url = f"{self.base_url}{token}/sendMessage"
data = {
"chat_id": chat_id,
"text": message
}
response = requests.post(url, data=data)
return response.json()

Ardından, TelegramOperator sınıfını oluşturalım:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from util.telegram.telegram_hook import TelegramHook

class TelegramOperator(BaseOperator):
"""
Operator for sending messages via Telegram.

:param telegram_conn_id: The connection ID to use for Telegram.
:type telegram_conn_id: str
:param chat_id: The ID of the chat where the message will be sent.
:type chat_id: str
:param message: The message to send.
:type message: str
:param args: Additional arguments for the BaseOperator
:param kwargs: Additional keyword arguments for the BaseOperator
"""
@apply_defaults
def __init__(self, telegram_conn_id, chat_id, message, *args, **kwargs):
super().__init__(*args, **kwargs)
self.telegram_conn_id = telegram_conn_id
self.chat_id = chat_id
self.message = message


def execute(self, context):
"""
Execute the TelegramOperator.

:param context: The context of the execution.
"""
hook = TelegramHook(self.telegram_conn_id)
hook.send_message(self.chat_id, self.message)

Son olarak, bu sınıfları kullanarak bir DAG oluşturalım:

from airflow import DAG
from datetime import datetime
from util.telegram.telegram_operator import TelegramOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

# Retrieve the chat ID from Airflow Variables
chat_id = Variable.get('TELEGRAM_CHATID')

# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
}

# Define the DAG
dag = DAG('custom_telegram_notification_dag',
default_args=default_args,
schedule_interval='@daily',
start_date=days_ago(1),
tags=['example'],
catchup=False)

# Task to send a message using the custom TelegramOperator
send_telegram_message = TelegramOperator(
task_id='custom_send_telegram_message',
telegram_conn_id='telegram', # Ensure this connection is set up in Airflow
chat_id=chat_id,
message='''
Hello from Airflow!

This message is sent using the Custom Airflow Telegram Operator.
''',
dag=dag
)

send_telegram_message

DAG’leri Çalıştırılması

Airflow arayüzüne bağlanarak DAG’leri sırasıyla çalıştırabilir ve Telegram kanalınızda mesajları gözlemleyebilirsiniz. Tarayıcınızda Airflow web arayüzüne gidin (http://localhost:8080). Kullanıcı adı ve şifre olarak “airflow” yazarak giriş yapın. DAG’leri çalıştırdıktan sonra Telegram’a girip mesajları kontrol edebilirsiniz.

DAG’leri çalıştırdıktan sonra Telegram’a girip mesajları kontrol edebilirsiniz.

Bu yazıda, Apache Airflow kullanarak Telegram’a bildirim mesajları göndermeyi ele aldık. Airflow’un güçlü özelliklerini kullanarak veri işleme süreçlerini otomatikleştirmeyi ve iş akışlarının durumu hakkında anlık bildirimler almayı öğrendik. Bir sonraki yazıda tekrar görüşmek üzere, hoşça kalın!

Kaynaklar

--

--