monitoring_kafka_in_production_-_main_nqedfe-1280x720

مقدمه ای بر Kafka Streams

Kafka streams کتابخانه ای برای ساخت برنامه های جریان پرداز است که تاپیک های ورودی کافکا را به تاپیک های خروجی تبدیل می کند. در این مقاله به نحوه ی پیاده سازی جریان های کافکا می پردازیم.

اگر شما با داده های بزرگ سروکار داشته اید، مطمئنا در مورد کافکا شنیده اید. در سطح خیلی بالا، کافکا، یک سیستم پیام رسانی توزیع شده و منتشر شده مشترک بوده که برای پردازش سریع داده ها طراحی شده و با توانایی اداره صدها هزار پیام، در برابر خطا نیز تحمل پذیر است. تکنولوژی کافکا به طور مشخص برای پردازش جریان داده ها (Stream Processing) و ارسال و دریافت پیام (Message Broker) مورد استفاده قرار می گیرد.

مفهوم پردازش جریان داده ها stream processing

پردازش جریان در واقع پردازش اطلاعات در زمان بلادرنگ، به طور مداوم، همزمان و بصورت رکورد به رکورد می باشد.

به عبارت دیگر پردازش جریان یک الگوی برنامه نویسی کامپیوتری است که معادل برنامه نویسی dataflow، پردازش جریان رویداد و برنامه نویسی واکنشی است که به بعضی از برنامه ها اجازه می دهد تا به راحتی از یک فرم محدود پردازش موازی بهره مند شوند.

پردازش بلادرنگ real time

کافکا کاربردهای زیادی دارد که یکی از آنها پردازش بلادرنگ می باشد.

درابتدا باید ببینیم که در پردازش بلادرنگ چه اتفاقی رخ می دهد. به عبارت ساده، همه ما می دانیم که پردازش بلادرنگ حاوی یک جریان پیوسته از داده ها می باشد.

بعضی از روش های تجزیه و تحلیل انجام شده و برخی از داده های مفید را از آن دریافت می کنیم. از نظر کافکا، پردازش بلادرنگ به طور معمول شامل خواندن داده ها از یک تاپیک (منبع)، انجام بعضی تجزیه و تحلیل ها و یا تبدیلات بر روی آن و سپس نوشتن نتایج و برگرداندن آن به تاپیک دیگر (Sink) می باشد. در حال حاضر، برای انجام این نوع کار، گزینه های ما عبارتند از:

  • نوشتن کد دلخواه ما با استفاده از KafkaCustomer برای خواندن داده و نوشتن آن با استفاده از KafkaProducer
  • استفاده از یک چارچوب تکامل یافته پردازش جریان مانند اسپارک، فلینک و …

حال به مفهوم جریان کافکا یا Kafka streams می پردازیم.

جریان کافکا چیست؟

Kafka streams کتابخانه ای برای ساخت برنامه های جریان پرداز است که تاپیک های ورودی کافکا را به تاپیک های خروجی کافکا (یا فراخوانی سرویس های خارجی، به روزرسانی پایگاه داده و …) تبدیل می کند. جریان کافکا به شما این امکان را می دهد که کار را با کدی مختصر، به شکل توزیع شده و با تحمل پذیری خطا، انجام دهید.

پیاده سازی جریان کافکا

یک برنامه ی جریان پرداز، با استفاده ازجریان کافکا به صورت زیر ساخته می شود.

پیکربندی ایجاد شده برای جریان:

 

Properties streamsConfiguration = new Properties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "Streaming-QuickStart");

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

دریافت تاپیک (topic) و رشته (serdes):

String topic = configReader.getKStreamTopic();
String producerTopic = configReader.getKafkaTopic();
final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();

ایجاد جریان و واکشی داده:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> inputStreamData = builder.stream(stringSerde, stringSerde, producerTopic);

پردازش جریان:

KStream<String, Long> processedStream = inputStreamData.mapValues(record -> record.length() )

علاوه بر join و aggregate، لیستی از سایر عملیات تبدیل برای KStream وجود دارد. هر یک از این عملیات ممکن است یک یا چند شی KStream تولید کند و همچنین می تواند درون یک یا چند پردازنده ی متصل تحت توپولوژی پردازنده ی اصلی تفسیر شود. همه ی این متدهای انتقال، می توانند با هم ترکیب شده و یک توپولوژی پیچیده پردازنده را تشکیل دهند.

در بین این تبدیلات، filter، map، mapValues و … عملیات تبدیل بی ثمر هستند بدین معنی که هر کاربر می تواند یک تابع سفارشی را به عنوان پارامتر مثلا Predicate برای filter و KeyValueMapper برای map و … را بسته به استفاده ی آنها در یک زبان بکار برند.

نوشتن جریان های بازگشتی در کافکا:

processedStream.to(stringSerde, longSerde, topic);

در این مرحله، ساختار داخلی درج شده است، اما پردازش هنوز شروع نشده است. شما باید جریان کافکا را با استفاده از فراخوانی متد start() شروع کنید:

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);streams.start();

و در نهایت آخرین گام بستن جریان است.

امیدوارم با استفاده از این مقاله بتوانید استفاده از جریان کافکا را شروع کنید. جهت آشنایی بیشتر می توانید به لینک زیر مراجعه نمایید:

مستندات جریان کافکا

ثبت دیدگاه

ایمیل شما نمایش داده نخواهد شد. لطفا فیلد های مرتبط را پر کنید.