Dockerize Edilmiş Kafka ile API Verilerini Okuma ve İşleme

Seda Kayademir
4 min readJun 21, 2024

--

Bu yazıda, NewsAPI’den veri çekip Kafka ile nasıl işleyeceğimizi ve bu verileri MySQL veritabanına nasıl yazacağımızı öğreneceğiz. Tüm kurulumu Dockerize edeceğiz.

Önkoşullar:

Docker

Docker Compose

NewsAPI API Anahtarı

1. API Anahtarı Oluşturma:

İlk adım olarak, NewsAPI web sitesine kaydolun ve API anahtarınızı alın. Aldığınız tokenı test etmek için kendi token bilginizi YOUR_TOKEN kısmına yazarak aşağıdaki adresten test edebilirsiniz:

https://newsapi.org/v2/everything?q=*&apiKey=YOUR_TOKEN

2. Proje Yapısı Oluşturalım:

Projeye github üzerinden de erişebilirsiniz. Projenin dizin yapısı aşağıdaki gibidir:

kafka_api_ingestion
├── app
│ ├── consumer
│ │ ├── consumer.py
│ │ └── Dockerfile
│ ├── producer
│ │ ├── producer.py
│ │ └── Dockerfile
│ └── requirements.txt
├── docker-compose.yml
├── etc
│ └── config
│ └── server.properties
└── README.md

Not: Kafka için gerekli olan server.properties dosyasını repodan alabilirsiniz burada paylaşmayacağım.

3. Docker Compose Dosyası:

Kafka, MySQL, Producer ve Consumer için servisleri tanımlamak için bir `docker-compose.yml` dosyası oluşturalım:

version: "3.8"

networks:
kafka:
name: kafka
driver: overlay
attachable: true

services:
kafka:
container_name: kafka
image: "kayademirseda/kafka:1.0"
ports:
- "9092:9092"
- "9091:9091"
networks:
- kafka
volumes:
- ./etc/config/server.properties:/kafka/config/server.properties

mysql:
restart: always
image: mysql:8.0.31
container_name: mysql
networks:
- kafka
ports:
- "3306:3306"
environment:
- MYSQL_DATABASE=kafka
- MYSQL_USER=kafka
- MYSQL_PASSWORD=kafka
- MYSQL_ROOT_PASSWORD=kafka
- MYSQL_ROOT_HOST="%"

producer:
container_name: producer
build:
context: .
dockerfile: ./app/producer/Dockerfile
depends_on:
- kafka
environment:
- NEWS_API_KEY=YOUR_TOKEN
networks:
- kafka

consumer:
container_name: consumer
build:
context: .
dockerfile: ./app/consumer/Dockerfile
depends_on:
- kafka
- mysql
networks:
- kafka

volumes:
kafka:
mysql:

Not: Producer kısmındaki YOUR_TOKEN kısmını kendi token bilginiz ile değiştirmeyi unutmayın.

4 . Producer Oluşturma:

NewsAPI’den veri almak için aşağıdaki adımları izleyerek bir Dockerfile ve producer.py dosyalarını içeren producer dizinini oluşturalım.

1. app/producer/producer.py Dosyası:

# app/producer/producer.py

import os
import requests
from kafka import KafkaProducer
import json
import time

# API URL ve Parametreler
api_url = "https://newsapi.org/v2/everything"
api_key = os.getenv('NEWS_API_KEY', 'your_default_api_key_here') # Replace 'your_default_api_key_here' with a default or remove it

params = {
"q": "*",
"apiKey": api_key
}

producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic = 'news-data'

def fetch_data_from_api():
response = requests.get(api_url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"API Error: {response.status_code}")
return None

def produce_to_kafka(data):
for article in data.get('articles', []):
producer.send(topic, value=article)
producer.flush()

while True:
data = fetch_data_from_api()
if data:
print(data)
produce_to_kafka(data)
time.sleep(10)

2. app/producer/Dockerfile Dosyası:

# app/producer/Dockerfile
# Base image
FROM python:3.9
# Set working directory
WORKDIR /app
# Copy the application code
COPY ./app/producer .
# Copy and install the dependencies
COPY ./app/requirements.txt .
RUN pip install - no-cache-dir -r requirements.txt
# Command to run the producer.py script
CMD ["python", "producer.py"]

Bu dosyalar, NewsAPI’den veri çeken ve Kafka’ya yazan bir Python Producer uygulamasını içerir. producer.py dosyası verileri NewsAPI'den çeker ve Kafka'ya yazar, Dockerfile dosyası ise bu Python uygulamasını Docker konteynerinde çalıştırmak için gerekli adımları içerir.

Not: NEWS_API_KEY değişkenini kendi NewsAPI API anahtarınızla değiştirmeyi unutmayın.

5. Consumer Oluşturma:

Consumer dosyamızı oluşturalım ve ardından dockerize edelim.

1. app/consumer/consumer.py Dosyası:

# app/consumer/consumer.py
from kafka import KafkaConsumer
import json
import logging
import mysql.connector
from datetime import datetime

try:
consumer = KafkaConsumer(
'news-data',
bootstrap_servers='kafka:9092',
auto_offset_reset='latest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# MySQL bağlantısı oluştur
mysql_connection = mysql.connector.connect(
host='mysql',
database='kafka',
user='kafka',
password='kafka'
)
mysql_cursor = mysql_connection.cursor()

for message in consumer:
data = message.value
print("Received data:", data)

source_id = data['source']['id']
source_name = data['source']['name']
author = data['author']
title = data['title']
description = data['description']
url = data['url']
urlToImage = data['urlToImage']
publishedAt = datetime.strptime(data['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
content = data['content']

insert_query = "INSERT INTO news_data (source_id, source_name, author, title, description, url, urlToImage, publishedAt, content) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
record_to_insert = (source_id, source_name, author, title,
description, url, urlToImage, publishedAt, content)
mysql_cursor.execute(insert_query, record_to_insert)
mysql_connection.commit()

except Exception as e:
print("Error:", e)

finally:
if 'consumer' in locals():
consumer.close()
if 'mysql_connection' in locals():
mysql_connection.close()

2. app/consumer/Dockerfile Dosyası:

# app/consumer/Dockerfile
# Base image
FROM python:3.9
# Set working directory
WORKDIR /app
# Copy the application code
COPY ./app/consumer .
COPY app/requirements.txt .
RUN pip install - no-cache-dir -r requirements.txt
# Command to run the consumer.py script
CMD ["python", "consumer.py"]

Bu dosyalar, Kafka’dan veri alan ve MySQL veritabanına yazan bir Python Consumer uygulamasını içerir. consumer.py dosyası Kafka'dan verileri alır ve MySQL veritabanına yazar, Dockerfile dosyası ise bu Python uygulamasını Docker konteynerinde çalıştırmak için gerekli adımları içerir.

6. Gereksinimler Dosyasını Oluşturun:

Bağımlılıkları listeleyen bir requirements.txt dosyası oluşturarak gerekli Python paketlerini ekleyelim.

# app/requirements.txt
kafka-python==2.0.2
mysql-connector-python==8.4.0
requests==2.32.2

Kafka ile iletişim kurmak için kafka-python, MySQL ile iletişim kurmak için mysql-connector-python ve API'den veri almak için requests paketlerini kullanıyoruz.

7. MySQL’de Tablo Oluşturma:

MySQL konteynerine bağlanarak `news_data` tablosunu oluşturmak için aşağıdaki adımları takip edelim:

1. MySQL Konteynerini Başlatma:

docker-compose up -d mysql

2. MySQL Konteynerine Bağlanma:

docker exec -it mysql bash

3. MySQL’e Giriş Yapma:

mysql -u root -p

Ardından şifre olarak docker-compose da belirttiğimiz gibi kafka yazıyoruz ve aşağıdaki SQL komutlarını çalıştırarak news_data tablosunu oluşturabiliriz:

use kafka;

CREATE TABLE news_data (
id INT AUTO_INCREMENT PRIMARY KEY,
source_id VARCHAR(255),
source_name VARCHAR(255),
author VARCHAR(255),
title TEXT,
description TEXT,
url TEXT,
urlToImage TEXT,
publishedAt DATETIME,
content TEXT
);

8. Docker-compose’u Çalıştırma:

Docker-compose’u çalıştırmadan önce, compose dosyası içerisindeki producer servisinin NEWS_API_KEY değişkenini düzenlememiz gerekmektedir. Ardından aşağıdaki komutu kullanarak tüm konteynerları çalıştıralım:

docker-compose up -d — build

Bu komut, compose dosyasındaki servisleri başlatırken, gerekirse imajları yeniden oluşturur (--build flag'ı ile) ve tüm konteynerları arka planda çalıştırır (-d flag'ı ile).

9. Veri tabanına Bağlanıp Tabloyu Kontrolü:

MySQL konteynerine bağlanarak verilerin yazılıp/yazılmadığını kontrol edelim.

docker exec -it mysql bash
mysql -u root -p
# Password: kafka
use kafka;
select * from news_data;

Bu yazıda, NewsAPI’den haber makaleleri almak için Kafka Producer, veriyi işlemek için Kafka Consumer oluşturduk. API’den aldığımız veriyi MySQL veritabanında kaydettik. Tüm kurulumu, dağıtımı ve yönetimi kolaylaştırmak için Dockerize ettik. Okuduğunuz için teşekkürler. Sonraki yazılarda görüşmek üzere.

Kaynaklar

--

--