Apache Kafka是一个分布式流处理平台,用于构建实时数据管道和流应用。它主要用于高吞吐量、低延迟的数据传输,提供发布和订阅消息、存储消息流,以及实时处理消息流的功能。
tar -xzf kafka_2.13-<version>.tgz
mv kafka_2.13-<version> /usr/local/kafka
~/.bashrc
或~/.profile
文件中添加以下内容:
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
启动Zookeeper
编辑$KAFKA_HOME/config/zookeeper.properties
文件,配置Zookeeper:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=60
admin.enableServer=false
启动Zookeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
配置Kafka Broker
编辑$KAFKA_HOME/config/server.properties
文件,配置Kafka Broker:
broker.id=0
log.dirs=/var/lib/kafka
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092
启动Kafka Broker
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
然后在控制台输入消息,每行一个消息。
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic my_topic --bootstrap-server localhost:9092 --from-beginning
server-1.properties
、server-2.properties
等,修改以下参数:
broker.id=1
log.dirs=/var/lib/kafka-1
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9093
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-2.properties
Kafka Streams API用于构建实时流处理应用。
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
public class SimpleStreamProcessing {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("source-topic");
source.to("sink-topic");
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, new Properties());
streams.start();
}
}
Kafka Connect用于将Kafka与各种数据源和数据目标进行集成。
# 配置连接器
$KAFKA_HOME/config/connect-standalone.properties
# 启动连接器
$KAFKA_HOME/bin/connect-standalone.sh connect-standalone.properties source-connector.properties sink-connector.properties
通过掌握Kafka的基本概念、安装与配置方法,以及如何使用Kafka进行实时数据流处理和集成,你可以构建高吞吐量、低延迟的实时数据管道和流处理应用。Kafka的分布式架构和丰富的API使其适用于各种大数据处理和实时分析场景。