Hiểu về Serialization và Deserialization trong Apache Kafka

·

8 min read

Apache Kafka là một nền tảng xử lý luồng dữ liệu phân tán mạnh mẽ, được thiết kế để xử lý lượng lớn dữ liệu theo thời gian thực. Trong quá trình truyền dữ liệu giữa các hệ thống, việc chuyển đổi dữ liệu sang định dạng byte (serialization) và ngược lại từ byte thành object (deserialization) là cực kỳ quan trọng. Để thực hiện việc này, Kafka cung cấp các cơ chế mạnh mẽ giúp nhà phát triển dễ dàng tùy chỉnh cách dữ liệu được chuyển đổi.

Trong bài blog này, chúng ta sẽ tìm hiểu chi tiết về hai khái niệm cốt lõi: key.serializer/value.serializerkey.deserializer/value.deserializer trong Kafka Producer và Consumer.

Serialization và Deserialization là gì?

  • Serialization là quá trình chuyển đổi dữ liệu từ một đối tượng có cấu trúc (như JSON, Avro, hoặc các object của ngôn ngữ lập trình) sang một chuỗi byte. Dữ liệu này sau đó sẽ được Kafka Producer sử dụng để gửi thông qua các chủ đề (topic) trong Kafka.

  • Deserialization là quá trình ngược lại: chuyển đổi dữ liệu từ byte (khi nhận từ Kafka) thành các đối tượng có cấu trúc để có thể dễ dàng xử lý trong ứng dụng của bạn.

Key.serializer và Value.serializer trong Kafka Producer

Trong Kafka Producer, dữ liệu trước khi được gửi tới Kafka phải được chuyển đổi thành byte. Quá trình này xảy ra với cả keyvalue của mỗi message, sử dụng các cơ chế key.serializervalue.serializer.

1. Key.serializer

  • Mô tả: Đây là cách mà Kafka Producer chuyển đổi key của mỗi message thành byte. key có thể là bất kỳ dữ liệu nào, ví dụ như ID của người dùng, tên giao dịch, hoặc bất kỳ giá trị nào bạn muốn phân biệt message.

  • Mục đích: Việc sử dụng key giúp Kafka xác định phân vùng (partition) của message. Kafka sử dụng key để đảm bảo rằng các message có cùng key sẽ được gửi tới cùng một phân vùng, giữ thứ tự các message cho cùng một key.

2. Value.serializer

  • Mô tả: Đây là cách Kafka Producer chuyển đổi value của message thành byte. value là nội dung chính của message mà bạn muốn truyền tải qua Kafka, ví dụ như dữ liệu giao dịch, trạng thái người dùng, hoặc bất kỳ thông tin nào khác.

  • Cơ chế phổ biến:

    • StringSerializer: Sử dụng cho các chuỗi ký tự.

    • IntegerSerializer: Sử dụng cho các số nguyên.

    • ByteArraySerializer: Sử dụng khi dữ liệu đã ở dạng byte.

    • JsonSerializer: Chuyển đổi dữ liệu từ đối tượng JSON thành chuỗi byte, rất hữu ích khi truyền dữ liệu dạng JSON giữa các hệ thống.

Key.deserializer và Value.deserializer trong Kafka Consumer

Khi dữ liệu được Producer gửi tới Kafka, nó sẽ được lưu trữ dưới dạng byte. Để sử dụng dữ liệu này trong ứng dụng, Consumer cần chuyển đổi dữ liệu từ byte về lại đối tượng ban đầu. Quá trình này được thực hiện thông qua key.deserializervalue.deserializer.

1. Key.deserializer

  • Mô tả: Tương tự như quá trình serialize key, khi Consumer nhận message từ Kafka, key.deserializer giúp chuyển đổi chuỗi byte của key thành đối tượng có thể đọc được.

  • Mục đích: Nếu Producer sử dụng key để chỉ định các phân vùng, Consumer sẽ cần giải mã key để biết được thông tin về key này, giúp tổ chức và xử lý dữ liệu hiệu quả hơn.

2. Value.deserializer

  • Mô tả: Đây là cách Kafka Consumer chuyển đổi value từ byte về lại đối tượng mà ứng dụng có thể hiểu được.

  • Cơ chế phổ biến:

    • StringDeserializer: Dùng để chuyển đổi dữ liệu từ chuỗi byte thành chuỗi ký tự.

    • IntegerDeserializer: Giải mã dữ liệu thành số nguyên.

    • JsonDeserializer: Giúp chuyển đổi dữ liệu từ byte thành đối tượng JSON.

Tại sao Serialization và Deserialization quan trọng trong Kafka?

Kafka là một hệ thống phân tán xử lý luồng dữ liệu khổng lồ, việc quản lý và tối ưu hóa quá trình chuyển đổi dữ liệu là rất quan trọng cho hiệu năng và độ tin cậy. Dưới đây là một số lý do tại sao serialization và deserialization đóng vai trò quan trọng trong Kafka:

  1. Hiệu suất: Chuyển đổi dữ liệu sang byte giúp dữ liệu được truyền qua mạng và lưu trữ trong Kafka nhanh hơn, giảm thiểu độ trễ và tối ưu hóa băng thông.

  2. Tính linh hoạt: Kafka hỗ trợ nhiều loại serializer và deserializer khác nhau, giúp bạn dễ dàng tùy chỉnh cách thức lưu trữ và xử lý dữ liệu theo nhu cầu cụ thể của hệ thống.

  3. Tính tương thích: Các serializer như Avro hoặc Protobuf đảm bảo rằng dữ liệu vẫn tương thích ngay cả khi các schema thay đổi. Điều này cực kỳ hữu ích trong các hệ thống lớn, khi dữ liệu cần phải tiến hóa theo thời gian.

  4. Đảm bảo tính toàn vẹn dữ liệu: Việc sử dụng các serializer có cấu trúc như JSON hoặc Avro giúp đảm bảo rằng dữ liệu được truyền đi không bị mất mát hoặc hỏng trong quá trình truyền qua Kafka.

Khi nào nên sử dụng Key?

Không phải lúc nào bạn cũng cần sử dụng key khi gửi message qua Kafka. Tuy nhiên, việc sử dụng key có thể mang lại lợi ích lớn trong các trường hợp sau:

  • Đảm bảo thứ tự xử lý: Các message có cùng key luôn được gửi tới cùng một phân vùng, điều này đảm bảo rằng chúng được xử lý theo thứ tự.

  • Phân loại dữ liệu: Sử dụng key giúp bạn dễ dàng phân loại và tổ chức dữ liệu trong Kafka. Ví dụ, nếu bạn gửi các giao dịch từ nhiều người dùng, bạn có thể sử dụng key là ID của người dùng để phân biệt.

Dưới đây là ví dụ chi tiết về cách sử dụng key.deserializervalue.deserializer trong Kafka Consumer bằng Python, với một số tham số quan trọng đã liệt kê ở trên.

Cài đặt Thư viện

Trước khi bắt đầu, đảm bảo rằng bạn đã cài đặt thư viện kafka-python:

pip install kafka-python

Producer

from kafka import KafkaProducer
import json

# Hàm serialize cho JSON
def json_serializer(value):
    return json.dumps(value).encode('utf-8')

# Tạo Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=json_serializer,
    value_serializer=json_serializer
)

# Dữ liệu JSON để gửi
key_data = {"id": 1}
value_data = {"name": "David", "role": "Head of Technology"}

# Gửi message với key và value là JSON
producer.send('test-topic', key=key_data, value=value_data)
producer.flush()

print("JSON message sent!")

Consumer sử dụng json deserializer

Dữ liệu đã được gửi dưới dạng JSON, nên ta sẽ sử dụng json để deserialize dữ liệu từ byte về object Python.

from kafka import KafkaConsumer
import json

# Hàm deserialize cho JSON
def json_deserializer(value):
    if value is None:
        return None
    return json.loads(value.decode('utf-8'))

# Tạo Kafka consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='localhost:9092',
    group_id='test-group',
    auto_offset_reset='earliest',   # Đọc từ đầu nếu không tìm thấy offset
    enable_auto_commit=False,       # Tắt auto commit để kiểm soát commit offset
    key_deserializer=json_deserializer,
    value_deserializer=json_deserializer,
    max_poll_records=10,            # Lấy tối đa 10 bản ghi trong một lần poll
    session_timeout_ms=10000,       # Timeout cho consumer là 10 giây
    heartbeat_interval_ms=3000      # Interval giữa các heartbeat là 3 giây
)

# Đọc message từ topic
for message in consumer:
    print(f"Key: {message.key}, Value: {message.value}")

    # Commit manually after processing
    consumer.commit()

consumer.close()

Sử dụng Pickle Serializer (Dữ liệu phức tạp)

Nếu bạn muốn gửi các đối tượng Python phức tạp hơn (ví dụ: dict, list, tuple), có thể sử dụng thư viện pickle để serialize đối tượng thành byte:

import pickle
from kafka import KafkaProducer

# Hàm serialize cho Pickle
def pickle_serializer(value):
    if value is None:
        return None
    return pickle.dumps(value)

# Tạo Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=pickle_serializer,
    value_serializer=pickle_serializer
)

# Dữ liệu Python phức tạp
key_data = ('key_tuple', 123)
value_data = {'name': 'Kafka Producer', 'version': 1.0}

# Gửi message với key và value là dữ liệu phức tạp
producer.send('test-topic', key=key_data, value=value_data)
producer.flush()

print("Pickle message sent!")

Consumer sử dụng pickle deserializer

Nếu Producer đã sử dụng pickle để serialize dữ liệu phức tạp (như tuple hoặc dict), thì consumer sẽ dùng pickle để deserialize.

from kafka import KafkaConsumer
import pickle

# Hàm deserialize cho Pickle
def pickle_deserializer(value):
    if value is None:
        return None
    return pickle.loads(value)

# Tạo Kafka consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='localhost:9092',
    group_id='pickle-group',
    auto_offset_reset='earliest',   # Đọc từ đầu nếu không tìm thấy offset
    enable_auto_commit=True,        # Bật auto commit
    key_deserializer=pickle_deserializer,
    value_deserializer=pickle_deserializer,
    max_poll_records=5,             # Lấy tối đa 5 bản ghi trong một lần poll
    session_timeout_ms=15000,       # Timeout cho consumer là 15 giây
    heartbeat_interval_ms=5000      # Interval giữa các heartbeat là 5 giây
)

# Đọc message từ topic
for message in consumer:
    print(f"Key: {message.key}, Value: {message.value}")

    # Dữ liệu đã tự động commit do enable_auto_commit=True

consumer.close()

Giải thích Các Tham Số Được Sử Dụng

  • auto_offset_reset='earliest': Giúp Consumer đọc từ đầu khi không có offset hợp lệ.

  • enable_auto_commit=False: Tắt auto commit để kiểm soát việc commit offset thủ công, tránh việc mất dữ liệu nếu có sự cố.

  • max_poll_records=10: Giới hạn số lượng bản ghi mà consumer sẽ nhận trong một lần gọi poll(), giúp kiểm soát khối lượng công việc.

  • session_timeout_ms=10000: Nếu Kafka không nhận được heartbeat từ consumer trong 10 giây, consumer sẽ bị coi là "chết".

  • heartbeat_interval_ms=3000: Khoảng thời gian giữa mỗi lần gửi heartbeat để giữ kết nối với broker.