Dockerize Edilmiş Kafka ile API Verilerini Okuma ve İşleme
Bu yazıda, NewsAPI’den veri çekip Kafka ile nasıl işleyeceğimizi ve bu verileri MySQL veritabanına nasıl yazacağımızı öğreneceğiz. Tüm kurulumu Dockerize edeceğiz.
Önkoşullar:
Docker
Docker Compose
NewsAPI API Anahtarı
1. API Anahtarı Oluşturma:
İlk adım olarak, NewsAPI web sitesine kaydolun ve API anahtarınızı alın. Aldığınız tokenı test etmek için kendi token bilginizi YOUR_TOKEN
kısmına yazarak aşağıdaki adresten test edebilirsiniz:
https://newsapi.org/v2/everything?q=*&apiKey=YOUR_TOKEN
2. Proje Yapısı Oluşturalım:
Projeye github üzerinden de erişebilirsiniz. Projenin dizin yapısı aşağıdaki gibidir:
kafka_api_ingestion
├── app
│ ├── consumer
│ │ ├── consumer.py
│ │ └── Dockerfile
│ ├── producer
│ │ ├── producer.py
│ │ └── Dockerfile
│ └── requirements.txt
├── docker-compose.yml
├── etc
│ └── config
│ └── server.properties
└── README.md
Not: Kafka için gerekli olan server.properties dosyasını repodan alabilirsiniz burada paylaşmayacağım.
3. Docker Compose Dosyası:
Kafka, MySQL, Producer ve Consumer için servisleri tanımlamak için bir `docker-compose.yml` dosyası oluşturalım:
version: "3.8"
networks:
kafka:
name: kafka
driver: overlay
attachable: true
services:
kafka:
container_name: kafka
image: "kayademirseda/kafka:1.0"
ports:
- "9092:9092"
- "9091:9091"
networks:
- kafka
volumes:
- ./etc/config/server.properties:/kafka/config/server.properties
mysql:
restart: always
image: mysql:8.0.31
container_name: mysql
networks:
- kafka
ports:
- "3306:3306"
environment:
- MYSQL_DATABASE=kafka
- MYSQL_USER=kafka
- MYSQL_PASSWORD=kafka
- MYSQL_ROOT_PASSWORD=kafka
- MYSQL_ROOT_HOST="%"
producer:
container_name: producer
build:
context: .
dockerfile: ./app/producer/Dockerfile
depends_on:
- kafka
environment:
- NEWS_API_KEY=YOUR_TOKEN
networks:
- kafka
consumer:
container_name: consumer
build:
context: .
dockerfile: ./app/consumer/Dockerfile
depends_on:
- kafka
- mysql
networks:
- kafka
volumes:
kafka:
mysql:
Not: Producer kısmındaki
YOUR_TOKEN
kısmını kendi token bilginiz ile değiştirmeyi unutmayın.
4 . Producer Oluşturma:
NewsAPI’den veri almak için aşağıdaki adımları izleyerek bir Dockerfile
ve producer.py
dosyalarını içeren producer
dizinini oluşturalım.
1. app/producer/producer.py
Dosyası:
# app/producer/producer.py
import os
import requests
from kafka import KafkaProducer
import json
import time
# API URL ve Parametreler
api_url = "https://newsapi.org/v2/everything"
api_key = os.getenv('NEWS_API_KEY', 'your_default_api_key_here') # Replace 'your_default_api_key_here' with a default or remove it
params = {
"q": "*",
"apiKey": api_key
}
producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic = 'news-data'
def fetch_data_from_api():
response = requests.get(api_url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"API Error: {response.status_code}")
return None
def produce_to_kafka(data):
for article in data.get('articles', []):
producer.send(topic, value=article)
producer.flush()
while True:
data = fetch_data_from_api()
if data:
print(data)
produce_to_kafka(data)
time.sleep(10)
2. app/producer/Dockerfile
Dosyası:
# app/producer/Dockerfile
# Base image
FROM python:3.9
# Set working directory
WORKDIR /app
# Copy the application code
COPY ./app/producer .
# Copy and install the dependencies
COPY ./app/requirements.txt .
RUN pip install - no-cache-dir -r requirements.txt
# Command to run the producer.py script
CMD ["python", "producer.py"]
Bu dosyalar, NewsAPI’den veri çeken ve Kafka’ya yazan bir Python Producer uygulamasını içerir. producer.py
dosyası verileri NewsAPI'den çeker ve Kafka'ya yazar, Dockerfile
dosyası ise bu Python uygulamasını Docker konteynerinde çalıştırmak için gerekli adımları içerir.
Not:
NEWS_API_KEY
değişkenini kendi NewsAPI API anahtarınızla değiştirmeyi unutmayın.
5. Consumer Oluşturma:
Consumer dosyamızı oluşturalım ve ardından dockerize edelim.
1. app/consumer/consumer.py
Dosyası:
# app/consumer/consumer.py
from kafka import KafkaConsumer
import json
import logging
import mysql.connector
from datetime import datetime
try:
consumer = KafkaConsumer(
'news-data',
bootstrap_servers='kafka:9092',
auto_offset_reset='latest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# MySQL bağlantısı oluştur
mysql_connection = mysql.connector.connect(
host='mysql',
database='kafka',
user='kafka',
password='kafka'
)
mysql_cursor = mysql_connection.cursor()
for message in consumer:
data = message.value
print("Received data:", data)
source_id = data['source']['id']
source_name = data['source']['name']
author = data['author']
title = data['title']
description = data['description']
url = data['url']
urlToImage = data['urlToImage']
publishedAt = datetime.strptime(data['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
content = data['content']
insert_query = "INSERT INTO news_data (source_id, source_name, author, title, description, url, urlToImage, publishedAt, content) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
record_to_insert = (source_id, source_name, author, title,
description, url, urlToImage, publishedAt, content)
mysql_cursor.execute(insert_query, record_to_insert)
mysql_connection.commit()
except Exception as e:
print("Error:", e)
finally:
if 'consumer' in locals():
consumer.close()
if 'mysql_connection' in locals():
mysql_connection.close()
2. app/consumer/Dockerfile
Dosyası:
# app/consumer/Dockerfile
# Base image
FROM python:3.9
# Set working directory
WORKDIR /app
# Copy the application code
COPY ./app/consumer .
COPY app/requirements.txt .
RUN pip install - no-cache-dir -r requirements.txt
# Command to run the consumer.py script
CMD ["python", "consumer.py"]
Bu dosyalar, Kafka’dan veri alan ve MySQL veritabanına yazan bir Python Consumer uygulamasını içerir. consumer.py
dosyası Kafka'dan verileri alır ve MySQL veritabanına yazar, Dockerfile
dosyası ise bu Python uygulamasını Docker konteynerinde çalıştırmak için gerekli adımları içerir.
6. Gereksinimler Dosyasını Oluşturun:
Bağımlılıkları listeleyen bir requirements.txt
dosyası oluşturarak gerekli Python paketlerini ekleyelim.
# app/requirements.txt
kafka-python==2.0.2
mysql-connector-python==8.4.0
requests==2.32.2
Kafka ile iletişim kurmak için kafka-python
, MySQL ile iletişim kurmak için mysql-connector-python
ve API'den veri almak için requests
paketlerini kullanıyoruz.
7. MySQL’de Tablo Oluşturma:
MySQL konteynerine bağlanarak `news_data` tablosunu oluşturmak için aşağıdaki adımları takip edelim:
1. MySQL Konteynerini Başlatma:
docker-compose up -d mysql
2. MySQL Konteynerine Bağlanma:
docker exec -it mysql bash
3. MySQL’e Giriş Yapma:
mysql -u root -p
Ardından şifre olarak docker-compose da belirttiğimiz gibi kafka
yazıyoruz ve aşağıdaki SQL komutlarını çalıştırarak news_data
tablosunu oluşturabiliriz:
use kafka;
CREATE TABLE news_data (
id INT AUTO_INCREMENT PRIMARY KEY,
source_id VARCHAR(255),
source_name VARCHAR(255),
author VARCHAR(255),
title TEXT,
description TEXT,
url TEXT,
urlToImage TEXT,
publishedAt DATETIME,
content TEXT
);
8. Docker-compose’u Çalıştırma:
Docker-compose’u çalıştırmadan önce, compose dosyası içerisindeki producer
servisinin NEWS_API_KEY
değişkenini düzenlememiz gerekmektedir. Ardından aşağıdaki komutu kullanarak tüm konteynerları çalıştıralım:
docker-compose up -d — build
Bu komut, compose dosyasındaki servisleri başlatırken, gerekirse imajları yeniden oluşturur (--build
flag'ı ile) ve tüm konteynerları arka planda çalıştırır (-d
flag'ı ile).
9. Veri tabanına Bağlanıp Tabloyu Kontrolü:
MySQL konteynerine bağlanarak verilerin yazılıp/yazılmadığını kontrol edelim.
docker exec -it mysql bash
mysql -u root -p
# Password: kafka
use kafka;
select * from news_data;
Bu yazıda, NewsAPI’den haber makaleleri almak için Kafka Producer, veriyi işlemek için Kafka Consumer oluşturduk. API’den aldığımız veriyi MySQL veritabanında kaydettik. Tüm kurulumu, dağıtımı ve yönetimi kolaylaştırmak için Dockerize ettik. Okuduğunuz için teşekkürler. Sonraki yazılarda görüşmek üzere.