Kafka best practices

This article was last updated on July 24, 2024.

Preface

Kafka best practices, covering:

  1. Typical usage scenarios
  2. Best practices for using Kafka

Typical use cases for Kafka

Data Streaming

Kafka integrates with mainstream stream-processing technologies such as Spark, Flink, and Flume. Taking advantage of Kafka’s high throughput, customers can build a transmission channel through Kafka to move massive amounts of data from the application side to a stream-processing engine; after the data is processed and analyzed, it can support services such as back-end big data analysis and AI model training.

Kafka Data Stream

Log platform

The scenario Kafka is most commonly used in, and the one I am most familiar with, is the log analysis system. A typical implementation deploys a log collector (such as Fluentd, Filebeat, or Logstash) on the client side to collect logs and send them to Kafka; the data is then processed in a back-end Elasticsearch cluster, and a presentation layer such as Kibana is built on top for statistical analysis and display.

fluentd-kafka-es

fluentd-kafka-logstash

Internet of Things

With the emergence of valuable use cases, the Internet of Things (IoT) is gaining more and more attention. A key challenge, however, is integrating devices and machines to process data in real time and at scale. Apache Kafka and its surrounding ecosystem, including Kafka Connect and Kafka Streams, have become the technology of choice for integrating and processing such data.

Kafka is already being used in many IoT deployments, including consumer IoT and Industrial IoT (IIoT). Most scenarios require reliable, scalable, and secure end-to-end integration to support real-time, two-way communication and data processing. Some specific use cases are:

  • Connected automotive infrastructure
  • Smart cities and smart homes
  • Smart Retail and Customer 360
  • Smart manufacturing

The specific implementation architecture is shown in the following figure:

edge-datacenter-cloud

kafka-gateway

Best practices

Reliability best practices

Meeting different reliability requirements with producer and consumer configurations

Producer At Least Once

The producer needs to set request.required.acks = ALL, so that the server acknowledges a write only after the leader (master) node has written the message successfully and the standby replicas have synchronized it.
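
As a reference, a minimal Java sketch of a producer configured this way (the broker address and topic are placeholders; in the modern Java client the setting is named acks):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtLeastOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");   // wait for the leader and all in-sync replicas
        props.put("retries", 3);    // retry transient failures (may produce duplicates)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}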

Consumer At Least Once

After the consumer receives a message, it should perform the corresponding business operation first and only then commit the offset to mark the message as processed. This ensures that a message can be consumed again if business processing fails. Note that the consumer’s enable.auto.commit parameter must be set to false so that the commit action is controlled manually.
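
A minimal sketch of this pattern with the current Java client (placeholder address, group, and topic), committing only after the business logic has succeeded:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");              // placeholder consumer group
        props.put("enable.auto.commit", "false");         // commits are controlled manually
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // business processing first
                }
                consumer.commitSync(); // commit only after processing succeeds
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) { /* business logic */ }
}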

Producer At Most Once

To ensure that a message is delivered at most once, set request.required.acks = 0 and retries = 0. The principle is that the producer never retries on any exception and does not check whether the broker acknowledged the write.
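
The corresponding producer settings, sketched with the modern Java client property names and the serializers and imports from the previous sketch:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "0");    // do not wait for any broker acknowledgement
props.put("retries", 0);   // never retry, so each message is sent at most once
KafkaProducer<String, String> producer = new KafkaProducer<>(props);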

Consumer At Most Once

To ensure that a message is consumed at most once, the consumer should commit the offset to mark the message as processed immediately after receiving it, and only then perform the corresponding business operation. The principle is that the consumer does not need to care about the actual business processing result and tells the broker that the message was handled successfully as soon as it receives it. Note that the consumer’s enable.auto.commit parameter must be set to false so that the commit action is controlled manually.
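
A sketch of the commit-first loop, reusing the consumer configuration from the at-least-once sketch above (with enable.auto.commit set to false):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    // commit first: the broker considers these messages handled even if processing fails later
    consumer.commitSync();
    for (ConsumerRecord<String, String> record : records) {
        process(record); // business processing happens after the commit
    }
}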

Producer Exactly-once

Kafka 0.11 added idempotent message semantics: setting the enable.idempotence=true parameter enables message idempotence within a single partition.

If a topic involves multiple partitions, or several messages need to be wrapped into one protected transaction, transaction control must be added, for example:

// Enable idempotence and set the transactional ID (both must be set in the
// producer properties before the producer is created)
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "id-001");

// Initialize transactions
producer.initTransactions();

try {
    // Begin a transaction and send 2 messages within it
    producer.beginTransaction();
    producer.send(record0);
    producer.send(record1);
    // Commit the transaction
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    producer.close();
}
Consumer Exactly-once

Set isolation.level=read_committed and enable.auto.commit=false to ensure that the consumer only consumes messages whose transactions the producer has committed. The consumer-side business must also handle messages transactionally to avoid duplicate processing, for example by persisting the message to a database and then committing the offset to the server.
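
A sketch of the corresponding consumer settings (placeholder values; the poll/commit loop is the same as in the earlier consumer sketches):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "demo-group");              // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level", "read_committed");   // only read messages from committed transactions
props.put("enable.auto.commit", "false");         // commit manually, together with the business transaction
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);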

Select the appropriate semantics according to the business scenario

Use At Least Once semantics for businesses that tolerate a small number of duplicate messages

At Least Once is the most commonly used semantic. It guarantees that messages are not lost during sending and consumption and strikes a good balance between performance and reliability, and it can be used as the default mode. The business side can additionally guarantee idempotency by adding a unique business primary key to the message body, so that the consumer side processes messages with the same business primary key only once.
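
As an illustration, a hypothetical consumer-side deduplication helper keyed on such a business primary key; the in-memory set here stands in for what would normally be a database unique index or a cache:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentHandler {
    // in production this would be a database unique index or a distributed cache
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    public void handle(ConsumerRecord<String, String> record) {
        String businessKey = record.key(); // the unique business primary key carried by the message
        if (!processedKeys.add(businessKey)) {
            return; // duplicate delivery: this business key was already processed
        }
        // ... business processing ...
    }
}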

Use Exactly Once semantics for businesses that require strong idempotency

Exactly Once semantics are generally used for critical businesses that absolutely cannot tolerate duplication; typical cases are order- and payment-related scenarios.

Use At Most Once semantics for non-critical businesses

At Most Once semantics are generally used for non-critical businesses that are not sensitive to message loss, where it is enough to make a best effort to produce and consume messages successfully. A typical scenario for At Most Once semantics is message notification, where missing a few messages has little impact, whereas sending the same notification repeatedly results in a worse user experience.

Performance tuning best practices

Set the number of partitions for a topic reasonably

The following table summarizes the dimensions to consider when tuning performance through the number of partitions; we recommend tuning the overall performance of the system based on both theoretical analysis and stress testing.

| Dimension | Description |
| --- | --- |
| Throughput | Increasing the number of partitions increases the concurrency of message consumption. When the system bottleneck is on the consumer side and consumers can scale horizontally, adding partitions increases system throughput. In Kafka, each partition of a topic is an independent message-processing channel; a partition can be consumed by only one consumer within a consumer group at a time, and when the number of consumers in a group exceeds the number of partitions, the excess consumers are idle. |
| Message order | Kafka guarantees message order within a partition, but not across partitions, so consider the impact of message ordering on the business when adding partitions. |
| Partition limit per instance | Each additional partition consumes more underlying resources such as memory, IO, and file handles. When planning the number of partitions for a topic, take into account the maximum number of partitions the Kafka cluster can support. |

The relationship between producers, consumers, and partitions is illustrated below.

kafka_groups

Set the batch size appropriately

If a topic has multiple partitions, the producer must decide which partition to send each message to. When multiple messages are sent to the same partition, the Producer client packs the related messages into one batch and sends them to the server together. In general, small batches cause the Producer client to issue a large number of requests, which queue up on both the client and the server and push up the overall latency of message sending and consumption.

A suitable batch size reduces the number of requests the client issues to the server when sending messages, improving the overall throughput and latency of message sending.

The Batch parameters are described as follows:

| Parameter | Description |
| --- | --- |
| batch.size | The amount of message data cached per partition (total bytes of message content, not the number of messages). When the configured value is reached, a network request is triggered and the Producer client sends the messages to the server in a batch. |
| linger.ms | The maximum time each message stays in the cache. If this time is exceeded, the Producer client ignores batch.size and immediately sends the messages to the server. |
| buffer.memory | When the total size of all cached messages exceeds this value, the messages are sent to the server, ignoring the batch.size and linger.ms limits. The default value of buffer.memory is 32 MB, which guarantees sufficient performance for a single Producer. |

There is no universal method for selecting Batch-related parameter values, and we recommend that you perform stress test tuning for performance-sensitive business scenarios.
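
As a starting point for such tuning, a producer configuration sketch with example values (not universal recommendations; imports as in the earlier producer sketch):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 64 * 1024);                 // 64 KB per-partition batch (example value)
props.put("linger.ms", 10);                         // wait up to 10 ms for a batch to fill (example value)
props.put("buffer.memory", 64 * 1024 * 1024);       // 64 MB total send buffer (example value)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);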

Use sticky partitioning to handle bulk sends

Kafka producers and the server use a batching mechanism when sending messages, and only messages sent to the same partition are put into the same batch. In large-volume sending scenarios, if messages are scattered across multiple partitions, many small batches may be formed, defeating the batching mechanism and reducing performance.

Kafka’s default partition selection strategy is as follows

| Scenario | Policy |
| --- | --- |
| The message specifies a key | Hash the message key and select a partition based on the hash result, so that messages with the same key are always sent to the same partition. |
| The message does not specify a key | The default policy cycles through all partitions of the topic, sending messages to each partition in a round-robin fashion. |

As the default mechanism shows, partition selection is fairly random. In large-volume sending scenarios, it is therefore recommended to set the partitioner.class parameter to a custom partition-selection algorithm that implements sticky partitioning.

One way to implement this is to keep using the same partition for a fixed period of time and then switch to the next one, which avoids scattering the data across many different partitions.
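
A minimal sketch of such a time-window sticky partitioner (the class name and window length are illustrative):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical sticky partitioner: stay on one partition for a fixed window, then move on.
public class TimeStickyPartitioner implements Partitioner {
    private static final long WINDOW_MS = 1000L; // switch partition every second (example value)

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        long window = System.currentTimeMillis() / WINDOW_MS;
        return (int) (window % numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered on the producer with props.put("partitioner.class", TimeStickyPartitioner.class.getName()).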

Common best practices

Kafka’s guarantee of message order

Kafka guarantees message order within a partition; order across partitions cannot be guaranteed, so a topic with multiple partitions cannot ensure global order. If global order is required, limit the number of partitions to one.

Set a unique key for the message

Kafka messages have two fields: Key (message ID) and Value (message content). For easy tracing, it is recommended to set a unique key for each message. You can then use the key to track a message, print its send and consumption logs, and understand its production and consumption status.
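
For example, reusing a producer configured as in the earlier sketches (the topic name and UUID-based ID are illustrative):

String messageId = java.util.UUID.randomUUID().toString(); // unique message/business ID used as the key
ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", messageId, "payload");
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        // send log: the same ID can later be matched against the consumer's log
        System.out.printf("sent id=%s partition=%d offset=%d%n",
                messageId, metadata.partition(), metadata.offset());
    } else {
        System.err.println("send failed, id=" + messageId + ": " + exception);
    }
});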

Set the retry policy of the queue reasonably

In a distributed environment, messages occasionally fail to be sent because of network or other issues: either the message was actually sent successfully but the acknowledgement (ACK) was lost, or the message was genuinely not sent. The default parameters meet most scenarios, but you can adjust the following retry parameters according to your business needs:

| Parameter | Description |
| --- | --- |
| retries | Number of retries; the default value is 3. For applications with zero tolerance for data loss, consider setting it to Integer.MAX_VALUE (effectively unlimited). |
| retry.backoff.ms | Retry interval; a value of 1000 is recommended. |

Note:

If you want to implement At Most Once semantics, retries need to be turned off.
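
A sketch of these settings on the producer (example values; max.in.flight.requests.per.connection is capped here because retries can otherwise reorder messages):

props.put("retries", Integer.MAX_VALUE);                 // retry indefinitely for loss-sensitive apps
props.put("retry.backoff.ms", 1000);                     // wait 1 s between retries
props.put("max.in.flight.requests.per.connection", 1);   // keep ordering when retries happen
// For At Most Once semantics, use retries = 0 instead, as noted above.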

Access best practices

Spark Streaming accesses Kafka

Spark Streaming is an extension of Spark Core for high-throughput, fault-tolerant processing of continuous data streams. It currently supports external inputs such as Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP sockets.

Spark Streaming abstracts continuous data into a DStream (Discretized Stream). A DStream consists of a series of continuous RDDs (Resilient Distributed Datasets), where each RDD contains the data generated within a certain time interval; processing a DStream with functions is essentially processing these RDDs.

When using Kafka as a data source for Spark Streaming, both the stable and the experimental integration packages are supported:

| | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
| --- | --- | --- |
| Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
| API Maturity | Deprecated | Stable |
| Language Support | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API | No | Yes |
| Dynamic Topic Subscription | No | Yes |

This exercise uses version 0.10.2.1 of Kafka dependencies.

Procedure

Step 1: Create a Kafka cluster and topic

The steps for creating a Kafka cluster are omitted here. After the cluster is created, create a topic named test.

Step 2: Prepare the server environment

CentOS 6.8 system

| Package | Version |
| --- | --- |
| sbt | 0.13.16 |
| hadoop | 2.7.3 |
| spark | 2.1.0 |
| protobuf | 2.5.0 |
| ssh | installed by default on CentOS |
| Java | 1.8 |

The specific installation steps are omitted, including the following steps:

  1. Install SBT
  2. Install Protobuf
  3. Install Hadoop
  4. Install Spark
Step 3: Connect to Kafka
Produce messages to Kafka

Here the 0.10.2.1 version of the Kafka dependency is used.

  1. Add the dependency in build.sbt:
name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
  2. Configure producer_example.scala:

    import java.util.Properties
    import org.apache.kafka.clients.producer._

    object ProducerExample extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "172.0.0.1:9092") // private IP and port from the instance details

      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      val TOPIC = "test" // the topic to produce to
      for (i <- 1 to 50) {
        // produce messages with key "key" and value "hello <i>"
        val record = new ProducerRecord(TOPIC, "key", s"hello $i")
        producer.send(record)
      }
      val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
      producer.send(record)
      producer.close() // close the producer at the end
    }

For more information on how to use ProducerRecord, see ProducerRecord Documentation.

Consume messages from Kafka

DirectStream

  1. Add the dependencies in build.sbt:
name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
  2. Configure DirectStream_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import collection.JavaConversions._
import Array._

object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.0.0.1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream_test1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false"
    )

    val sparkConf = new SparkConf()
    sparkConf.setMaster("local")
    sparkConf.setAppName("Kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topics = Array("spark_test")

    // Starting offsets (partition -> 0) for the 3 partitions; pass them as the third
    // argument of Subscribe if you want to control where consumption starts.
    val offsets: Map[TopicPartition, Long] =
      (0 until 3).map(i => new TopicPartition("spark_test", i) -> 0L).toMap

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    println("directStream")
    stream.foreachRDD { rdd =>
      // print the messages received
      rdd.foreach { iter =>
        val i = iter.value
        println(s"${i}")
      }
      // obtain the offset ranges of this RDD
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

RDD

  1. Configure build.sbt (the configuration is the same as above).
  2. Configure RDD_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._

object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.0.0.1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val sc = new SparkContext("local", "Kafka", new SparkConf())
    val java_kafkaParams: java.util.Map[String, Object] = kafkaParams
    // Pull the messages in the given offset range from each partition in order; if nothing
    // can be pulled, block until the wait time is exceeded or enough new messages arrive.
    val offsetRanges = Array[OffsetRange](
      OffsetRange("spark_test", 0, 0, 5),
      OffsetRange("spark_test", 1, 0, 5),
      OffsetRange("spark_test", 2, 0, 5)
    )
    val range = KafkaUtils.createRDD[String, String](
      sc,
      java_kafkaParams,
      offsetRanges,
      PreferConsistent
    )
    range.foreach(record => println(record.value))
    sc.stop()
  }
}

For more kafkaParams usage, see the kafkaParams documentation.

Flume accesses Kafka

Apache Flume is a distributed, reliable, and highly available log collection system that supports a variety of data sources (such as HTTP, Log files, JMS, listening port data, etc.), and can efficiently collect, aggregate, move, and store massive log data from these data sources into designated storage systems (such as Kafka, distributed file systems, Solr search servers, etc.).

The basic structure of Flume is as follows:

Flume uses an agent as its smallest independent unit of operation. An agent is a JVM process, and a single agent consists of three components: Source, Channel, and Sink.

Flume and Kafka

When storing data in downstream storage or compute modules such as HDFS or HBase, you need to consider various complex scenarios, such as the volume of concurrent writes, the load the system can bear, and network latency. Flume is a flexible distributed system with multiple interfaces and customizable pipelines.
In the production and processing stages, Kafka can act as a buffer when production and processing speeds differ. Kafka's partitioned, append-only structure gives it excellent throughput, and its replication makes it highly fault-tolerant.
Combining Flume and Kafka therefore meets most requirements in a production environment.

Preparations

  • Download Apache Flume (Kafka compatible with version 1.6.0 or above)
  • Download Kafka toolkit (Version 0.9.x or above, 0.8 is no longer supported)
  • Confirm that the Kafka Source and Sink components are included in your Flume installation.

Access method

Kafka can be used as a source or sink to import or export messages.

Kafka Source

Configure Kafka as the source; that is, Flume acts as a consumer that pulls data from Kafka into the specified sink. The main configuration options are as follows:

| Configuration Item | Description |
| --- | --- |
| channels | The configured Channel |
| type | Must be: org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | Kafka broker server addresses |
| kafka.consumer.group.id | The consumer group ID |
| kafka.topics | The Kafka topics used as the data source |
| batchSize | The size of each batch written to the Channel |
| batchDurationMillis | The maximum interval between writes |

Example:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

For more information, please refer to Apache Flume website

Kafka Sink

Configure Kafka as the sink; that is, Flume acts as a producer that pushes data into Kafka for subsequent processing. The main configuration options are as follows:

| Configuration Item | Description |
| --- | --- |
| channel | The configured Channel |
| type | Must be: org.apache.flume.sink.kafka.KafkaSink |
| kafka.bootstrap.servers | Kafka broker server addresses |
| kafka.topic | The Kafka topic to write to |
| kafka.flumeBatchSize | Batch size per write |
| kafka.producer.acks | Acknowledgement strategy of the Kafka producer |

Example:

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

For more information, please refer to Apache Flume website

Storm accesses Kafka

Storm is a distributed real-time computing framework that can process streaming data and provide general-purpose distributed RPC calls. It achieves sub-second processing latency and is therefore suitable for real-time data processing scenarios with strict latency requirements.

How Storm works

A Storm cluster has two types of nodes: the control node (Master Node) and the worker nodes (Worker Node). The Master Node runs the Nimbus process, which is responsible for resource allocation and status monitoring. Each Worker Node runs a Supervisor process that listens for work tasks and starts executors to run them. The whole Storm cluster depends on ZooKeeper for public data storage, cluster status monitoring, task assignment, and other functions.

The data-processing program a user submits to Storm is called a topology, and the smallest unit of message it processes is a tuple, an array of arbitrary objects. A topology is composed of spouts and bolts: a spout is the source that produces tuples, while a bolt can subscribe to tuples emitted by any spout or bolt and process them.

Storm with Kafka

Storm can use Kafka as a spout to consume data for processing, and as a bolt to store processed data for other components to consume.

CentOS 6.8 system

| Package | Version |
| --- | --- |
| maven | 3.5.0 |
| storm | 2.1.0 |
| ssh | 5.3 |
| Java | 1.8 |

Prerequisites

Procedure

Step 1: Create a topic
Step 2: Add Maven dependencies

The pom.xml configuration is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Step 3: Produce messages
Use spout/bolt

topology code:

//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerSpout {
    // ip:port of the Kafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // the topic to write messages to
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        // set producer properties
        // API reference: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // configuration reference: http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create the bolt that writes to Kafka; by default the fields ("key", "message") are used
        // as the key and message of the produced record, which can also be set in FieldNameBasedTupleToKafkaMapper()
        KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(properties)
                .withTopicSelector(new DefaultTopicSelector(TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        TopologyBuilder builder = new TopologyBuilder();
        // a spout that generates messages sequentially; its output field is "sentence"
        SerialSentenceSpout spout = new SerialSentenceSpout();
        AddMessageKeyBolt bolt = new AddMessageKeyBolt();
        builder.setSpout("kafka-spout", spout, 1);
        // add the fields required to produce to Kafka to the tuple
        builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
        // write to Kafka
        builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");

        Config config = new Config();
        if (args != null && args.length > 0) {
            // cluster mode: package the jar and submit it to the Storm cluster
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            // local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

Create a spout class that generates messages sequentially:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class SerialSentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        // produce a UUID string and send it to the next component
        spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

Add the key and message fields to the tuple. When the key is null, the produced messages are distributed evenly across all partitions; once a key is specified, the message is hashed to a specific partition based on the key value:

//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AddMessageKeyBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // take the value of the first field
        String message = tuple.getString(0);
        // emit to the next component (a null key spreads messages evenly across partitions)
        basicOutputCollector.emit(new Values(null, message));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // declare the schema sent to the next component
        outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
}
Use trident

Generate the topology using the Trident classes:

//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerTrident {
    // ip:port of the Kafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // the topic to write messages to
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        // set producer properties
        // API reference: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        // configuration reference: http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // set up Trident
        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withProducerProperties(properties)
                .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
                // use fields ("key", "value") for the message written; unlike
                // FieldNameBasedTupleToKafkaMapper there is no default here
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
        TridentTopology builder = new TridentTopology();
        // a spout that produces sentences in batches; its output field is "sentence"
        builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
                .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
                .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

        Config config = new Config();
        if (args != null && args.length > 0) {
            // cluster mode: package the jar and submit it to the Storm cluster
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
        } else {
            // local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.build());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

    private static class AddMessageKey extends BaseFunction {

        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            // take the value of the first field
            String message = tridentTuple.getString(0);
            // emit to the next component (a null key spreads messages evenly across partitions)
            tridentCollector.emit(new Values(null, message));
        }
    }
}

Create a spout class that generates messages in bulk:

//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class TridentSerialSentenceSpout implements IBatchSpout {

    private final int batchCount;

    public TridentSerialSentenceSpout(int batchCount) {
        this.batchCount = batchCount;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext) {
    }

    @Override
    public void emitBatch(long l, TridentCollector tridentCollector) {
        Utils.sleep(1000);
        for (int i = 0; i < batchCount; i++) {
            tridentCollector.emit(new Values(UUID.randomUUID().toString()));
        }
    }

    @Override
    public void ack(long l) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("sentence");
    }
}
Step 4: Consume messages
Use spout/bolt
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerSpout {
    // ip:port of the Kafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // the topic to consume messages from
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        // set the retry policy
        KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
                KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        // set consumer parameters
        // API reference: http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // configuration reference: http://kafka.apache.org/0102/documentation.html
        KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // consumer group
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // session timeout
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // request timeout
                }})
                .setOffsetCommitPeriodMs(10_000) // offset commit period
                .setFirstPollOffsetStrategy(LATEST) // start from the latest messages
                .setRetry(kafkaSpoutRetryService)
                .setRecordTranslator(trans)
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt", new BaseRichBolt(){
            private OutputCollector outputCollector;

            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            }

            @Override
            public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
                this.outputCollector = outputCollector;
            }

            @Override
            public void execute(Tuple tuple) {
                System.out.println(tuple.getStringByField("value"));
                outputCollector.ack(tuple);
            }
        }, 1).shuffleGrouping("kafka-spout");

        Config config = new Config();
        config.setMaxSpoutPending(20);
        if (args != null && args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Use trident
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;


public class TopologyKafkaConsumerTrident {
    // ip:port of the Kafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // the topic to consume messages from
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        // set consumer parameters
        // API reference: http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        // configuration reference: http://kafka.apache.org/0102/documentation.html
        KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // consumer group
                    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // enable auto commit
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // session timeout
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // request timeout
                }})
                .setFirstPollOffsetStrategy(LATEST) // start from the latest messages
                .setRecordTranslator(trans)
                .build();

        TridentTopology builder = new TridentTopology();
        // Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // transactional spout
        Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
        spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
            @Override
            public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
                System.out.println(tridentTuple.getStringByField("value"));
                tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
            }
        }, new Fields("message"));

        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setNumWorkers(1);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
        } else {
            StormTopology stormTopology = builder.build();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, stormTopology);
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            stormTopology.clear();
        }
    }
}
Step 5: Submit the topology to Storm

After compiling with mvn package, you can submit the topology to a local cluster for debugging and testing, or submit it to a production cluster to run.

storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name task_name

Logstash accesses Kafka

Logstash is an open-source log processing tool that can collect data from multiple sources, filter the collected data, and store the data for other purposes.

Logstash is flexible, has powerful parsing capabilities, and is rich in plugins to support a variety of input and output sources. As a horizontally scalable data pipeline, Logstash works with Elasticsearch and Kibana to be powerful in log collection and retrieval.

How Logstash works

Logstash data processing can be divided into three stages: inputs → filters → outputs.

  1. Inputs: ingest data from sources such as files, syslog, Redis, and Beats.
  2. Filters: modify and filter the data; this is the intermediate stage of the Logstash pipeline and can change events based on conditions. Common filters include grok, mutate, drop, and clone.
  3. Outputs: send the data elsewhere; a single event can be sent to multiple outputs, and the event ends once the transfer is complete. Elasticsearch is the most common output.

At the same time, Logstash supports encoding and decoding, and the format can be specified on the inputs and outputs sides.

Advantages of connecting Logstash to Kafka

  • Asynchronous processing: bursts of traffic are buffered, preventing overload.
  • Decoupling: when Elasticsearch is abnormal, upstream work is not affected.

Note:
Logstash filtering consumes resources and affects its performance if deployed on a production server.

Procedure

Preparations
Step 1: Create a topic

Create a topic named logstash_test.

Step 2: Access Kafka
Access as inputs
  1. Run bin/logstash-plugin list to check that logstash-input-kafka is among the supported plugins.

  2. Write a configuration file named input.conf in the .bin/ directory.
    Here, standard output is used as the data destination and Kafka as the data source.

    input {
        kafka {
            bootstrap_servers => "xx.xx.xx.xx:xxxx" # Kafka instance access address
            group_id => "logstash_group"            # Kafka consumer group id
            topics => ["logstash_test"]             # Kafka topic name
            consumer_threads => 3                   # number of consumer threads, usually equal to the number of partitions
            auto_offset_reset => "earliest"
        }
    }
    output {
        stdout{codec=>rubydebug}
    }
    
  3. Run the following command to start Logstash and consume messages:

    ./logstash -f input.conf
    

    You will see that the data in the topic just now is consumed.

Access as outputs
  1. Run bin/logstash-plugin list to check that logstash-output-kafka is among the supported plugins.

  2. Write a configuration file named output.conf in the .bin/ directory.
    Here, standard input is used as the data source and Kafka as the data destination.

    input {
        stdin{}
    }

    output {
        kafka {
            bootstrap_servers => "xx.xx.xx.xx:xxxx"  # Kafka instance access address
            topic_id => "logstash_test"              # Kafka topic name
        }
    }
    
  3. Run the following command to start Logstash and send a message to the created topic:

    ./logstash -f output.conf
  4. Start Kafka Consumer to verify the production data from the previous step.

    ./kafka-console-consumer.sh --bootstrap-server 172.0.0.1:9092 --topic logstash_test --from-beginning --new-consumer

Filebeat accesses Kafka

The Beats platform is a collection of single-purpose data collectors. Once installed, these collectors act as lightweight agents that send collected data from hundreds or thousands of machines to the target.

Beats offers a variety of collectors that you can download according to your needs. This topic uses Filebeat (a lightweight log collector) as an example to describe how to connect Filebeat to Kafka and how to solve common problems after connecting.

Prerequisites

Procedure

Step 1: Create a topic

Create a topic named test.

Step 2: Prepare the configuration file

Go to the installation directory of Filebeat and create the monitoring configuration file filebeat.yml.

##======= Filebeat prospectors ==========
filebeat.prospectors:
- input_type: log
  ## path of the files to monitor
  paths:
    - /var/log/messages

##======= Outputs =========

##------------------ kafka -------------------------------------
output.kafka:
  version: '0.10.2'   # configure according to the Kafka cluster version
  # access address of the Kafka instance
  hosts: ["xx.xx.xx.xx:xxxx"]
  # name of the target topic
  topic: 'test'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: none
  max_message_bytes: 1000000

  # SASL requires the two settings below; omit them if SASL is not needed
  username: "yourinstance#yourusername"   # username concatenates the instance ID and the user name
  password: "yourpassword"
Step 4: Send messages with Filebeat
  1. Run the following command to start the client:

    sudo ./filebeat -e -c filebeat.yml 
  2. Add data to the monitored file (for example, by writing to the testlog file being monitored).

    echo ckafka1 >> testlog
    echo ckafka2 >> testlog
    echo ckafka3 >> testlog
  3. Start a consumer on the corresponding topic; you will obtain data like the following.

    {"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
SASL/PLAINTEXT mode

If you need to configure SASL/PLAINTEXT, you must configure a username and password: add the username and password settings to the Kafka output section of the configuration.

Message Queuing CKafka - Documentation Center - Tencent Cloud (tencent.com)