💼나에게 주어진 Task
kafka connect를 세팅하고, kafka-redis connector plugin 세팅을 해서 kafka에 들어온 메세지를 redis에 저장해라. (근데 이제 도커 세팅을 곁들인..)
- kafka connect 세팅
- kafka-redis plugin 컨테이너에 설치
- sink connector 등록 (api 요청)
📜kafka-connect 컨테이너 설정
- zookeeper, kafka, redis 설정은 되어있다고 가정합니다.
docker-compose.yml
kafka-connect:
image: confluentinc/cp-kafka-connect:6.0.14
container_name: kafka-connect
depends_on:
- kafka1
- kafka2
- kafka3
ports:
- "8083:8083"
environment:
#kafka 컨테이너에 hostname설정이 되어 있어야 한다.
CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_PLUGIN_PATH: "/home"
volumes:
- "./var/run/docker.sock:/var/run/docker.sock"
restart: on-failure
📜kafka-redis plugin
오픈소스 플러그인 다운로드
https://docs.confluent.io/kafka-connectors/redis/current/overview.html
- ‘kafka-connect’ 컨테이너의 /home 디렉토리에 다운받은 jcustenborder-kafka-connect-redis-0.0.4를 위치시킨다.
#서버에 위치한 플러그인을 컨테이너 내부로 이동시키는 명령어
docker cp /home/jcustenborder-kafka-connect-redis-0.0.4 kafka-connect:/home
#kafka-connect 컨테이너로 들어가서 잘 있나 확인
docker exec -it kafka-connect /bin/bash
#추가된 플러그인 적용을 위해 도커 kafka-plugin 컨테이너를 재시작
docker restart kafka-connect
📜redis-sink connector 등록
curl -X POST -H "Content-Type: application/json" \\
--data '{
"name": "redis-sink",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"redis.hosts": "redis-kafka:6380",
"redis.database": "0",
"redis.client.mode": "Standalone",
"task.max": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"internal.key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"internal.value.converter": "org.apache.kafka.connect.storage.StringConverter",
"plugin.path": "/home",
#kafka의 메세지 중, stock이라는 topic의 메세지들을 redis에 키:값 형태로 저장합니다.
"topics": "stock"
}
}' \\
http://<server-host>:8083/connectors/redis-sink/config
- 내가 생성한 커넥터가 잘 작동중인지 확인
http://<server-host>:8083/connectors/redis-sink/status
#혹시 만들었던 sink connector의 설정을 변경하고자 한다면?
curl -X PUT -H "Content-Type: application/json" \\
--data '{
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"redis.hosts": "redis-kafka:6380",
"redis.database": "0",
"redis.client.mode": "Standalone",
"task.max": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"internal.key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"internal.value.converter": "org.apache.kafka.connect.storage.StringConverter",
"plugin.path": "/home",
"topics": "stock"
}' \\
http://<server-host>:8083/connectors/redis-sink/config
#변경 후 커넥터 재시작!
curl -X POST http://<server-host>:8083/connectors/redis-sink/restart
🙄이번에 docker로 kafka connect 세팅하면서 알게된 점
- docker-compose.yml로 실행되는 여러 컨테이너는 독립적이며, 각각의 컨테이너 별로 작업을 할 수 있다.
- 컨테이너 내 환경설정 변경사항이 있다면, 컨테이너 restart를 통해 적용시켜준다.
- docker-compose down 명령어는 기존에 실행되고 있는 모든 컨테이너를 중지시키고 삭제한다. 만약 데이터를 보존하고자 한다면, 도커 볼륨 세팅을 해주어야 한다.
- 볼륨에 대해 모르고 막 down 해버렸다가 데이터가 날아간 경험이 있다 😇
- docker-compose.yml 내 컨테이너에 문제가 생겼을 경우 docker logs <해당컨테이너명> 으로 로그 확인하면 편했다.
- docker를 사용하기 전에 일일히 zookeeper,kafka,redis 설치 명령어 다 하고 하나하나 config 파일 설정해주는데 정말 오래 걸렸었는데, docker-compose 파일 하나로 묶어서 설치와 관리가 정말 간편했다. 쓸 이유가 충분하니 앞으로 웬만한 프로젝트엔 도커를 사용할 듯 싶음.