본문 바로가기

java framework/kafka

windows 환경에서 kafka 설치 및 실행

1.   카프카 설치

https://kafka.apache.org/downloads 링크 클릭(컨트롤 클릭) -> 공식 다운로드 사이트 or 미러 사이트 에서 카프카 압축 파일 다운로드 -> 압축 해제 ->

 

 

2.   카프카 설정

디렉토리 config로 이동 ( ex) C:\dev\kafka_2.11-1.1.0\config) ->

server.properties 파일 텍스트 편집기로 오픈 ->

log.dirs=/tmp/kafka-logs 를 카프카 설치 경로로 변경 ->

 ( ex ) log.dirs=C:\\dev\\kafka_2.11-1.1.0\\logs)

 


3.   카프카 실행

3-1. 주키퍼 실행

   %주키퍼 설치 및 설정은 [문서] ZOOKEEPER 설치 및 실행.docx 참고

Zookeeper 실행 ->

 

3-2. 카프카 실행

주키퍼 cmd창 유지한 채로 새로운 cmd 실행 ->

kafka 설치 경로로 이동 ->

( ex) cd C:\dev\kafka_2.11-1.1.0)

bin\windows\kafka-server-start.bat config\server.properties 입력 ->

 

 

 

4.   카프카 예제

4-1. 카프카 토픽 생성

주키퍼, 카프카 실행한 상태에서 새로운 cmd 실행 ->

kafka 설치 경로로 이동 ( ex) cd C:\dev\kafka_2.11-1.1.0) ->

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190715 입력 ->

Created topic “test20190715” 가 나오면 정상 ->

 

4-2. 카프카 토픽 리스트 조회

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 입력 ->

test20190715 가 나오면 정상 ->

 

4-3. 카프카 컨슈머 시작

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190715 입력 -> 아무 변화가 없어야 정상 ->

 

 

 

4-4. 카프카 프로듀서 시작

주키퍼, 카프카, 컨슈머 cmd창 유지한 채로 새로운 cmd 실행 ->

kafka 설치 경로로 이동 ( ex) cd C:\dev\kafka_2.11-1.1.0) ->

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190715 입력->

메시지 입력 ->

 

consumer cmd 창에서 메시지 수신 확인

 

 

5.   메이븐과 이클립스를 이용한 카프카 클러스터 구축

5-1. 프로젝트 생성

이클립스 실행 -> maven project 생성 ->

 

 

 

5-2. dependency 추가

pom.xml에 의존성 추가 -> 저장 ->

<dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>1.1.0</version>

       </dependency>

       <dependency>

           <groupId>org.apache.kafka</groupId>

           <artifactId>kafka-streams</artifactId>

           <version>1.1.0</version>

       </dependency>

       <dependency>

           <groupId>org.apache.kafka</groupId>

           <artifactId>kafka_2.11</artifactId>

           <version>0.8.2.1</version>

       </dependency>

 

 

 

5-3. consumer 작성

consumer code 작성 ->

package com.test.kafka.meta;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

 

public class consumer {

 

    public static void main(String[] args) {

        Properties configs = new Properties();

        // 환경 변수 설정

        configs.put("bootstrap.servers", "localhost:9092");     // kafka server host port

        configs.put("session.timeout.ms", "10000");             // session 설정

        configs.put("group.id", "test20190715");                // topic 설정

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer

        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성

        consumer.subscribe(Arrays.asList("test20190715"));      // topic 설정

        while (true) {  // 계속 loop 돌면서 producer message 띄운다.

            ConsumerRecords<String, String> records = consumer.poll(500);

            for (ConsumerRecord<String, String> record : records) {

                String s = record.topic();

                if ("test20190715".equals(s)) {

                    System.out.println(record.value());

                } else {

                    throw new IllegalStateException("get message on topic " + record.topic());

                }

            }

        }  

    }

   

}

 

 

 

5-4. producer 작성

producer code 작성 ->

package com.test.kafka.meta;

 

import java.io.IOException;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

 

public class Producer {

 

    public static void main(String[] args) throws IOException {

 

        Properties configs = new Properties();

        configs.put("bootstrap.servers", "localhost:9092"); // kafka host server 설정

        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.

        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 사용할 있는 전체 메모리의 바이트수

        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정

        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

 

        // producer 생성

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

       

        // message 전달

        for (int i = 0; i < 5; i++) {

            String v = "hello"+i;

            producer.send(new ProducerRecord<String, String>("test20190715", v));

        }

       

        // 종료

        producer.flush();

        producer.close();

    }

 

}

 

 

5-5. 실행

주키퍼 실행 -> 카프카 실행 ->

Run Producer -> Run Consumer

 

 

 

6.   참고

1.     https://kafka.apache.org/quickstart

2.     http://junil-hwang.com/blog/kafka-java/

 

'java framework > kafka' 카테고리의 다른 글

Kafka 용어 정리  (0) 2019.08.08