1. 카프카 설치
https://kafka.apache.org/downloads 링크 클릭(컨트롤 클릭) -> 공식 다운로드 사이트 or 미러 사이트 에서 카프카 압축 파일 다운로드 -> 압축 해제 ->
2. 카프카 설정
디렉토리 config로 이동 ( ex) C:\dev\kafka_2.11-1.1.0\config) ->
server.properties 파일 텍스트 편집기로 오픈 ->
log.dirs=/tmp/kafka-logs 를 카프카 설치 경로로 변경 ->
( ex ) log.dirs=C:\\dev\\kafka_2.11-1.1.0\\logs)
3. 카프카 실행
3-1. 주키퍼 실행
%주키퍼 설치 및 설정은 [문서] ZOOKEEPER 설치 및 실행.docx 참고
Zookeeper 실행 ->
3-2. 카프카 실행
주키퍼 cmd창 유지한 채로 새로운 cmd 실행 ->
kafka 설치 경로로 이동 ->
( ex) cd C:\dev\kafka_2.11-1.1.0)
bin\windows\kafka-server-start.bat config\server.properties 입력 ->
4. 카프카 예제
4-1. 카프카 토픽 생성
주키퍼, 카프카 실행한 상태에서 새로운 cmd 실행 ->
kafka 설치 경로로 이동 ( ex) cd C:\dev\kafka_2.11-1.1.0) ->
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190715 입력 ->
Created topic “test20190715” 가 나오면 정상 ->
4-2. 카프카 토픽 리스트 조회
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 입력 ->
test20190715 가 나오면 정상 ->
4-3. 카프카 컨슈머 시작
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190715 입력 -> 아무 변화가 없어야 정상 ->
4-4. 카프카 프로듀서 시작
주키퍼, 카프카, 컨슈머 cmd창 유지한 채로 새로운 cmd 실행 ->
kafka 설치 경로로 이동 ( ex) cd C:\dev\kafka_2.11-1.1.0) ->
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190715 입력->
메시지 입력 ->
consumer cmd 창에서 메시지 수신 확인
5. 메이븐과 이클립스를 이용한 카프카 클러스터 구축
5-1. 프로젝트 생성
이클립스 실행 -> maven project 생성 ->
pom.xml에 의존성 추가 -> 저장 ->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
consumer code 작성 ->
package com.test.kafka.meta;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class consumer {
public static void main(String[] args) {
Properties configs = new Properties();
// 환경 변수 설정
configs.put("bootstrap.servers", "localhost:9092"); // kafka server host 및 port
configs.put("session.timeout.ms", "10000"); // session 설정
configs.put("group.id", "test20190715"); // topic 설정
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer 생성
consumer.subscribe(Arrays.asList("test20190715")); // topic 설정
while (true) { // 계속 loop를 돌면서 producer의 message를 띄운다.
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String s = record.topic();
if ("test20190715".equals(s)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
producer code 작성 ->
package com.test.kafka.meta;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka host 및 server 설정
configs.put("acks", "all"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// message 전달
for (int i = 0; i < 5; i++) {
String v = "hello"+i;
producer.send(new ProducerRecord<String, String>("test20190715", v));
}
// 종료
producer.flush();
producer.close();
}
}
5-5. 실행
주키퍼 실행 -> 카프카 실행 ->
Run Producer -> Run Consumer
1. https://kafka.apache.org/quickstart
2. http://junil-hwang.com/blog/kafka-java/
'java framework > kafka' 카테고리의 다른 글
Kafka 용어 정리 (0) | 2019.08.08 |
---|