카프카 따라해보기
이전 글 : 2019/03/09 - [프로그래밍/아키텍쳐] - 주키퍼(ZooKeeper)
카프카 공홈의 퀵스타트를 따라함
테스트환경은 라즈베리파이3(라파이/rbp), 라즈비안OS(64비트)
굳이 데탑(윈도우)에서 안하고 따로 서버 세팅하느라 좀 오래걸렸는데
AWS 프리티어가 아직 남아서 t2.small에서 해볼려다가 카프카 성능 테스트겸 라파이에 했다.
원래 라즈비안이 32비트환경이었는데 이왕하는거 64비트로 바꿔서 하려고 새로 포맷하고 다시 세팅했다.
집 분전함에 전화+인터넷이 같이 있는데 거기에 라파이를 보관 중이라
이렇게 매번 뜯을때마다 고통이다.(식탁 밀어내고 뚜껑따서 꾸역꾸역 선정리..)
그리고 메모리가 기본 1G이기 때문에 별도로 swap 4G를 할당했다. 주로 메모리를 사용하는게 아니고 디스크IO를 사용하기 때문에 상관없긴하지만 CPU도 1코어에 1Ghz도 안되기 때문에 메모리라도 넉넉히 쓰려고 크게 잡아놨다.
※주의 : 유닉스 계열은 사이트 예제를 그대로 따라하면 되는데 윈도우는 폴더랑 확장자가 다르다...(step1 전에 언급된다)
Step 1. 다운로드
흠... 그냥 받으면 된다.(근데 벌써 2.1.0 버전이네...)
Step 2. 서버 시작
(난 ssh으로 접속해서 별도로 screen을 띄워서 작업을 했다.)
앞에서 굳이 주키퍼에 대해서 좀 공부했는데
막상 실습에서는 주키퍼가 뭔지 설명은 안해주고 일단 실행부터 시킨다.
그런데 좋은점은 이미 주키퍼/카프카에 대한 config가 있어서 크게 신경쓰지 않아도 된다는 것이다.
그래서 주키퍼와 카프카 각각을 screen으로 띄워서 실행시켰다.
디렉토리 구조를 보니 아파치 아니랄까봐 매우매우 아파치스러운 구조였다.(자주보던 config, logs 등등)
로그구조도 아파치스러운데 controller.log가 신기해보여서 까보니 토픽에 대한 브로커관련 내용인 것 같았다.
(정확히 무슨 뜻인지는 모르겠으나 토픽의 consumer 오프셋 내용이었다)
만약에 실무에서 사용한다면 docker-container로 묶어서 사용해야할 듯 싶다.
포트 충돌 나는건 물론이고 프로세스 관리해야되는게 주키퍼/카프카해서 벌써 2개이다;;
Step 3. 토픽 생성
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
이렇게 생성하는데 토픽을 생성할때 주키퍼를 옵션으로 넣어줘야되는 것 같았다.
그리고 테스트겸 로그도 볼겸 토픽 2개를 생성해봤다.
Step 4&5. 메세지 보내기 & consumer 시작
엔터를 기준으로 메세지를 구별하는데 최대 메세지 크기가 얼마인가 봤더니
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
OOM 보호를 위해서 기본으로 정해진 사이즈가 100MB(104857600)였다. 그런데 버퍼는 100KB(102400)인걸보면 되도록 메세지 큰건 지양해야될 듯하다. 아무튼 버퍼 자체가 생각보다 작은걸보면 저사양에서도 쓰기 좋은듯하다.(우리집은 빨라야 10MB/s니깐... ㅠㅠ)
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
레코드(로그)는 기본이 tmp에 쌓이는듯했다.
consumer 오프셋이 따로 폴더별로 있고 토픽에 대한 로그는 토픽명-파이션번호로 폴더로 분류되어 저장된다.
그런데 test-0의 로그 파일을 봤는데 따로 암호화는 안되는듯하고 저 정체모를 yyyyyyy가 delimeter인듯하다
밑에 test2 4 3은 심심해서 더 넣어본건데 암호화가 안된다는게 좀 흠...
중요한 내용이라면 메세지를 보낼때부터 암호화하던가 인증방식을 따로 쓰게끔 해야될 듯하다.
Step 6. 멀티 브로커 클러스터 세팅
사실 이걸 해보고 싶어서 퀵스타트 따라하는 중인데 id 1,2는 라파이에 id 0은 지금 글을 쓰고 있는 윈도우 데탑에 카프카를 설치해서 동작시켰다.
[내가 테스트 중인 카프카 클러스터의 대략적인 구조]
그리고 바로 토픽을 생성해보았다.
gnidoc327@raspberrypi:~/workspace/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2
여기서 leader, replicas, isr가 뭐냐
- leader는 모든 read/write를 하는 노드를 뜻함
- replicas는 현재 살아있든 리더이든 아니든 상관없이 복제하고 있는 노드 수다.(즉, 모든 node)
- isr는 동기화 중인 replicas를 뜻한다. 살아있고 리더를 caught-up(따라가다)하고 있는 노드 수다.
현재 my-replicated-topic 토픽은 3개의 replicas로 구성되어 있고 id 2번이 leader 인 상태이다.
그런데 isr이 1,2 즉, 0이 죽어있다고 나왔다.
그런데 제대로 연결했다면 0, 1, 2해서 3개가 나와야되는거 아닌가???
(사실 이때는 주키퍼가 라파이에 있었고 삽질하다가 윈도우로 옮겼다 ㅋㅋ)
그래서 삽질한 결과 DNS 문제라는 것을 알았다.
예제대로 했고 로그들이 정상적으로 뜨는걸 봤는데 뭐지 했는데 정말 기본적인 실수였다.
[퀵스타트에서 config 수정하는 부분]
서버개발 2년차가 됐는데도 아직도 이런 실수를 한다...
listeners가 난 포트번호만 해당하는 줄 알고 포트만 변경했는데
사실은 PLAINTEXT://192.168.0.100:9093 처럼 IP를 직접 넣어줘야된다.
왜냐하면 kafka 클러스터의 노드들은 주키퍼와 소켓통신을 하는데
출발/목적지 DNS를 모른다면 어떻게 서로 양방향 통신을 하겠는가???
(그래서 그냥 예제에서는 하나의 서버에서 3개의 노드를 생성했기 때문에 필요없었지만 나처럼 2개 이상의 서버라면 IP까지 입력해주어야한다.)
고치고 나서 다시 확인해보니 매우 잘된다는 것을 확인해볼 수 있다.
$ bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
결과적으론 아래와 같은 구조가 되었다.
[예제 최종 결과]
그리고 producer, consumer API를 통해서 메세지 확인하는 스크립트가
아래의 오류를 뱉으면서 이상하게 윈도우만 동작을 안했다...
// Producer
[2019-03-13 03:50:58,341] WARN [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
// Consumer
[2019-03-13 03:51:40,682] WARN [Consumer clientId=consumer-1, groupId=console-consumer-73936] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
결국 기본 timeout인 6000ms를 초과해서 에러가 발생했는데 아직도 딱히 이유를 모르겠다;;
그래서 장애 내결함성 테스트도 리더의 위치는 상관없이 윈도우 노드를 죽이면 잘안되고 라파이쪽은 죽여도 잘된다...
(윈도우서버 접읍시다)
Step 7. 카프카 connect로 데이터 import/export 하기
여기도 마찬가지로 윈도우 스크립트는 안된다.
그리고 라파이에선 매우매우 느린듯 싶다. 10분째 기다리는데 아래와 같은 로그만 뜬다.
넘나... 슬픈것...
Step 8. 카프카 Streams으로 데이터 처리하기
실시간이나 마이크로서비스를 위한게 카프카 streams 클라이언트 라이브러리란다. 이 라이브러리는 카프카 클러스터에 input/output data를 저장한다. 그리고 streams API를 쓰는 예제도 있다. 이 건 나중에 해보는걸로 ㅎㅎ
느낀점
카프카를 보니 기능이 많고 잘 사용한다면 안정적이라는 것을 알 수 있는데
개인적인 생각으로는 이 클러스터 운영하는거 자체가 일일듯 싶다.
왜냐
- 서버에 장애 발생시 자동 승격 기능으로 leader가 변경되는건 좋은데 문제는 이런 장애를 캐치해야되는게 일이다.
- leader에 장애가 발생했을시 leader에서 아직 replicas로 복제되지 않은 데이터는 유실된다.
(이건 어쩔수없는 부분이지만 흠.. 방법이 없나) - API를 사용해야되는데 어떤건 주키퍼를 통해서, 어떤건 카프카를 통해서 하는게 이상하고 별도의 스크립트로 돌아가서 클라이언트 라이브러리나 cli를 따로 만들어야됨
(토픽 생성은 주키퍼로하는데 브로커리스트는 브로커에 직접 요청 날림) - 어쨌든 메세지(데이터)들이 파일로 관리되니깐 스토리지 죽으면 지옥이 펼쳐짐
- 파일에 저장되는 메세지가 암호화되지 않음
- 주키퍼가 죽으면 답이 없는거 같다.(second, multi 주키퍼 설정이 없음)
- 주키퍼 죽으면 카프카 프로세스가 ctrl+c 인터럽트로 안죽는다.
- config마다 ip들을 다 입력해야되는데 이런 설정바꿔야할 cli도 필요할 듯
- 트래픽 몰려서 디스크 꽉차면 어떡하나
그냥 소개읽고 예제정도만 써본 나도 이정도 걱정하는데 흠...
이미 기업들이 다 사용하고 있으니 어떤 정답이 있지 않을까 싶다.
그리고 AWS의 SQS를 생각해보면 적어도 제품 문제 때문에 메세지가 날아간 경우도 없고
메세지가 쌓이면 쌓이는대로 들고 있고
메세지당 돈을 받아서 소규모 + 불규칙한 트래픽에서 사용하기엔 좋다.
대신 SQS는 메세지 크기가 256KB 고정이고 카프카는 기본 메세지 크기가 100MB이니
정말 큰 빅데이터를 처리하기에는 카프카가 적합하지 않을까 싶다.
(물론, 네트워크가 메세지 크기를 버틸지는 모르겠지만 ㅎㅎ)
대신 완벽한 클러스터 구성하는건 좀...
지옥일 듯 싶다.(지옥의 인프라)
이걸 쓸 기회가 있을진 모르겠지만 배웠다는것에 만족!
나중에 개인 플젝에서 한번 써봐야겠다.