Kafka集群环境搭建及使用教程


wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz

tar -zxvf kafka_2.13-2.7.0.tgz 

/root/kafka_2.13-2.7.0/k_data
/root/kafka_2.13-2.7.0/zk_data
/root/kafka_2.13-2.7.0/logs

vi /root/kafka_2.13-2.7.0/config/zookeeper.properties 
dataDir=/root/kafka_2.13-2.7.0/zk_data
clientPort=2181
maxClientCnxns=2000
initLimit=5
syncLimit=2
server.1=192.168.172.137:2888:3888
server.2=192.168.172.138:2888:3888
server.3=192.168.172.139:2888:3888


echo 1 > /root/kafka_2.13-2.7.0/zk_data/myid
echo 2 > /root/kafka_2.13-2.7.0/zk_data/myid
echo 3 > /root/kafka_2.13-2.7.0/zk_data/myid

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >logs/zookeeper.log 2>1 &


vi /root/kafka_2.13-2.7.0/config/server.properties 
broker.id=1
listeners=PLAINTEXT://192.168.172.138:9092
advertised.listeners=PLAINTEXT://192.168.172.138:9092
log.dirs=/root/kafka_2.13-2.7.0/k_data
zookeeper.connect==192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181

nohup ./bin/kafka-server-start.sh config/server.properties >logs/kafka.log 2>1 &

./bin/kafka-topics.sh --list --zookeeper 192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181

./bin/kafka-topics.sh --create --zookeeper 192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181 --replication-factor 1 --partitions 1 --topic test

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --from-beginning --topic test

./bin/kafka-console-producer.sh --broker-list 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --topic test

./bin/kafka-console-producer.sh --broker-list 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --topic test --key-deserializer key999





import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092");
        //设置数据key和value的序列化处理类
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        //创建生产者实例
        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        ProducerRecord record = new ProducerRecord<String, String>("test","key3","userName3");
        //发送记录
        producer.send(record);
        producer.close();
    }
}




import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092");
        //必须指定消费者组
        props.put("group.id", "test");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
        //订阅topic1的消息
        consumer.subscribe(Arrays.asList("test"));
        //到服务器中读取记录
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord<String,String> record : records){
                System.out.println("key:" + record.key() + "" + ",value:" + record.value());
            }
        }
    }
}