Kafka Cluster Setup and Usage Tutorial

This tutorial walks through building a three-node Kafka cluster (192.168.172.137/138/139) using the ZooKeeper bundled with the Kafka distribution, then exercising it from the command line and from Java.

Download and extract Kafka 2.7.0 (Scala 2.13 build) on every node:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zxvf kafka_2.13-2.7.0.tgz
Create the Kafka data, ZooKeeper data, and log directories:
mkdir -p /root/kafka_2.13-2.7.0/k_data
mkdir -p /root/kafka_2.13-2.7.0/zk_data
mkdir -p /root/kafka_2.13-2.7.0/logs
Then edit the ZooKeeper configuration (identical on all three nodes):
vi /root/kafka_2.13-2.7.0/config/zookeeper.properties
dataDir=/root/kafka_2.13-2.7.0/zk_data
clientPort=2181
maxClientCnxns=2000
initLimit=5
syncLimit=2
server.1=192.168.172.137:2888:3888
server.2=192.168.172.138:2888:3888
server.3=192.168.172.139:2888:3888
In the server.N entries, port 2888 is used for follower-to-leader communication and 3888 for leader election. Each node also needs a myid file in dataDir whose content matches its own server.N number. On 192.168.172.137:
echo 1 > /root/kafka_2.13-2.7.0/zk_data/myid
On 192.168.172.138:
echo 2 > /root/kafka_2.13-2.7.0/zk_data/myid
On 192.168.172.139:
echo 3 > /root/kafka_2.13-2.7.0/zk_data/myid
Start ZooKeeper on each node (note 2>&1, so stderr goes to the log file as well):
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >logs/zookeeper.log 2>&1 &
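Once all three nodes are running, a quick sanity check is to connect with the zookeeper-shell tool shipped in the same bin directory and list the root znodes (any node's address works):
./bin/zookeeper-shell.sh 192.168.172.137:2181 ls /
Seeing output such as [zookeeper] means the ensemble is answering requests.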
Next, edit the Kafka broker configuration:
vi /root/kafka_2.13-2.7.0/config/server.properties
broker.id must be unique on each broker (e.g. 1, 2, 3), and listeners/advertised.listeners must carry the local node's own IP; the values below are for one node, so adjust both on the others:
broker.id=1
listeners=PLAINTEXT://192.168.172.138:9092
advertised.listeners=PLAINTEXT://192.168.172.138:9092
log.dirs=/root/kafka_2.13-2.7.0/k_data
zookeeper.connect=192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181
Start the Kafka broker on each node:
nohup ./bin/kafka-server-start.sh config/server.properties >logs/kafka.log 2>&1 &
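To confirm that all brokers registered with ZooKeeper, list their ids under the standard /brokers/ids path (the ids shown will match the broker.id values you configured):
./bin/zookeeper-shell.sh 192.168.172.137:2181 ls /brokers/ids
With three healthy brokers the output should end with something like [1, 2, 3].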
List the topics that currently exist:
./bin/kafka-topics.sh --list --zookeeper 192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181
Create a test topic with one partition and a replication factor of 1:
./bin/kafka-topics.sh --create --zookeeper 192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181 --replication-factor 1 --partitions 1 --topic test
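It can be useful to verify where the partition landed; the same tool supports --describe with the flags already used above:
./bin/kafka-topics.sh --describe --zookeeper 192.168.172.137:2181,192.168.172.138:2181,192.168.172.139:2181 --topic test
The output lists the leader, replicas, and ISR broker ids for each partition.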
In one terminal, start a console consumer that reads the topic from the beginning:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --from-beginning --topic test
In another terminal, start a console producer and type messages; they should appear on the consumer side:
./bin/kafka-console-producer.sh --broker-list 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --topic test
To produce messages with keys, enable key parsing (the console producer has no --key-deserializer option; keys are sent via producer properties):
./bin/kafka-console-producer.sh --broker-list 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --topic test --property parse.key=true --property key.separator=:
Each input line is then split on the separator, so typing key999:hello sends key key999 with value hello.
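On the consuming side, the console consumer can print keys as well via its print.key property:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092 --from-beginning --topic test --property print.key=true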
The same operations are available from the Java client. A minimal producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        // Producer configuration
        Properties props = new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092");
        // Serializer classes for record keys and values
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // A record for topic "test" with key "key3" and value "userName3"
        ProducerRecord<String, String> record = new ProducerRecord<>("test", "key3", "userName3");
        // Send the record; close() flushes any buffered sends
        producer.send(record);
        producer.close();
    }
}
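send() is asynchronous, and the code above never checks whether delivery succeeded. A minimal sketch of inspecting the result using the standard callback overload of send() (only the lambda is new relative to the code above):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed after any client-side retries
        exception.printStackTrace();
    } else {
        // RecordMetadata tells us where the record was written
        System.out.printf("sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});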
And a matching consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "192.168.172.137:9092,192.168.172.138:9092,192.168.172.139:9092");
        // A consumer group id is required
        props.put("group.id", "test");
        // Deserializer classes for record keys and values
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Create the consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the "test" topic
        consumer.subscribe(Arrays.asList("test"));
        // Poll the brokers for records in a loop
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("key:" + record.key() + ",value:" + record.value());
            }
        }
    }
}
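With the settings above, the consumer auto-commits offsets in the background (enable.auto.commit defaults to true), so a crash after an auto-commit but before processing finishes can skip records. For at-least-once processing, a sketch with manual commits (standard consumer API; only the config line and the commitSync() call change relative to the code above):

props.put("enable.auto.commit", "false");
// ... create the consumer and subscribe exactly as above ...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("key:" + record.key() + ",value:" + record.value());
    }
    // Commit offsets only after the batch has been processed
    consumer.commitSync();
}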