Apache Kafka

  • Apache Software Foundation의 scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
  • 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공

  • Kafka 이전에 시스템 아키텍처
    • End-to-End 연결 방식의 아키텍처
    • 데이터 연동의 복잡성 증가(HW, 운영체제, 장애 등)
    • 서로 다른 데이터 Pipeline 연결 구조
    • 확장이 어려운 구조
  • Kafka 특징
    • Kafka를 통해 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있는 시스템
    • 데이터가 많아지더라도 확장이 용이한 시스템이 되었다.
    • Producer/Consumer 분리
    • 메시지를 여러 Consumer에게 허용
    • 높은 처리량을 위한 메시지
    • Scale-out 가능, Eco-system
  • Kafka Broker (Server)

    image

    • 이미지 출처: https://data-flair.training/blogs/kafka-broker/

    • 실행된 Kafka 애플리케이션 서버
    • 3대 이상의 Broker Cluster 구성을 권장
    • Zookeeper 연동
      • 역할: 메타데이터(Broker ID, Controller ID 등) 저장
      • Controller 정보 저장
    • n개 Broker 중 1대는 Controller 기능 수행
      • Controller 역할
        • 각 Broker에게 담당 파티션 할당 수행
        • Broker 정상 동작 모니터링 관리

Kafka 설치

  • https://kafka.apache.org/downloads 에서 원하는 버전의 Binary 다운로드
  • 압축 해제
    $ tar xvf kafka_2.13-2.7.0.tgz
    
  • 폴더 확인
    • config
      • properties로 설정 파일들이 저장되어 있다.
      • zookeper.properties, server.properties 등 zookeeper, server, producer, consumer 등에 대한 설정을 제공한다.
    • bin
      • linux, max, window(sh, batch) 등에서 사용할 수 있는 실행 파일을 제공한다

#### Producer와 Consumer

  • Kafka client
    • Kafka 와 데이터를 주고받기 위해 사용하는 Java Library
    • Producer, Consumer, Admin, Stream 등 Kafak 관련 API 제공
    • 다양한 3rd party library 종재
      • 참고: https://cwiki.apache.org/confluence/display/KAFKA/Clients
  • 서버 기동
    • Zookeeper 및 Kafka 서버 기동
      • $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
      • $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
    • Topic 생성
      • $KAFKA_HOME/bin/kafka-topics.sh –create –topic quickstart-events –bootstrap-server localhost:9092 –partitions 1
      • Producer에서 메세지를 보내면 Topic에 메세지가 저장이 되며 Consumer가 Topic에 있는 메세지를 읽어올 수 있다.
      • –create –topic으로 토픽을 생성할 수 있다(quickstart-events는 토픽이름)
    • Topic 목록 확인
      • $KAFKA_HOME/bin/kafka-topics.sh –bootstrap-server localhost:9092 –list
    • Topic 정보 확인
      • $KAFKA_HOME/bin/kafka-topics.sh –describe –topic quickstart-events –bootstrap-server localhost:9092
    • kafka-console-producer, kafka-console-consumer 을 통해서 콘솔상에서 테스트가 가능하다.
      • $KAFKA_HOME/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic quickstart-events

        image

      • $KAFKA_HOME/bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic quickstart-events –from-begining

        image

      • producer에서 작성하면 consumer에게 전달된다
      • from-begining 은 topic에 저장되어 있는 처음 메세지부터 다 받을 수 있다.

#### Kafka Connect

  • Kafka Connect를 통해 Data를 Import/Export 가능
  • 코드 없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution mode 지원
    • RESTful API 지원
    • stream 또는 batch 형태로 데이터 전송 가능
    • 커스텀 Connector를 통해 다양한 Plubin 제공(File, AWS S3, DB etc ..)
  • Source System (Hive, jdbc ..) -> Kafka Connect Source -> Kafka Cluster -> Kafka Connect Sink -> Target System(AWS S3, …)
  • Kafka Connector 설치
    $ curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
    $ curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
    $ tar xvf confluent-community-6.1.0.tar.gz
    $ cd  $KAFKA_CONNECT_HOME
    
  • Kafka Connect 설정(기본으로 사용)
    $ KAFKA_HOME/config/connect-distriuted.properties
    
  • Kafka Connect 실행(KAFKA_HOME: confluent-6.1.0)
    ./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
    
  • Kafka Connect 실행 전에 Topic 목록 확인
    $ ./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list
    __consumer_offsets
    connect-configs
    connect-offsets
    connect-status
    
  • JDBC Connector 설치
    • https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
      • download and extract the ZIP file -> confluentinc-kafka-connect-jdbc-10.2.1.zip 다운로드
    • etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
      • plugin.path=[confluentinc-kafka-connect-jdbc-10.2.1 폴더]
        plugin.path=\D:\\Work\\Excutions\\confluentinc-kafka-connect-jdbc-10.2.1\\lib
        
        • window에 경우 \을 사용하기 위해 \를 하나 더 붙인다
    • JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
      • ./share/java/kafka/ 폴더에 mariadb-java-client-2.7.3.jar 파일 복사
      • ${USER.HOMW}.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.3 폴더에서 maraidb-java-client-2.7.3.jar 파일이 있다
  • Kafka Source Connect 사용
    • Kafka Connect API 활용하여 Source Connector 등록
      POST http://localhost:8083/connectors
      Body:  {
              "name" : "my-source-connect",
              "config" : {
                  "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url":"jdbc:mysql://localhost:3306/mydb",
                  "connection.user":"root",
                  "connection.password":"test1357",
                  "mode": "incrementing",
                  "incrementing.column.name" : "id",
                  "table.whitelist":"users",
                  "topic.prefix" : "my_topic_",
                  "tasks.max" : "1"}}
      
      • mode: incrementing -> 데이터가 등록될 때 자동으로 증가시키는 모드로 설정
      • incrementing.column.name: id -> 자동으로 증가하는 컬럼은 id
      • table.whitelist: “users” -> whitelist는 MariaDB에 변경 사항이 생기면 topic에 저장하게 되는 데, 해당 테이블을 설정
      • topic.prefix: ‘my_topic_’ -> 저장할 topic은 my_topic_으로 시작한다.
    • Kafka Connect 목록 확인
      GET 192.168.56.2:8083/connectors
      [
        "my-source-connect"
      ]
      
      • vm에 kafka를 설치해였기 때문에 8083 방화벽을 오픈하여 연결하였다.
      • my-source-connect가 생성된 것을 확인할 수 있다.
    • Kafka Connect 확인
      GET http://localhost:8083/connectors/my-source-connect/status
      {
          "name": "my-source-connect",
          "connector": {
              "state": "RUNNING",
              "worker_id": "127.0.1.1:8083"
          },
          "tasks": [
              {
                  "id": 0,
              	"state": "RUNNING",
              	"worker_id": "127.0.1.1:8083"
              }
        	],
        	"type": "source"
      }
      
  • Kafka Sink Connect 사용
      POST http://localhost:8083/connectors
      Body:  {
              "name" : "my-sink-connect",
              "config" : {
                  "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
                  "connection.url":"jdbc:mysql://localhost:3306/mydb",
                  "connection.user":"root",
                  "connection.password":"test1357",
                  "auto.create":"true",
                  "auto.evolve": "true",
          "delete.enabled": "false",
          "tasks.max": "1",
          "topics": "my_topic_users"}}
      response-body: 
        {
        "name": "my-sink-connect",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:mysql://localhost:3306/mydb",
            "connection.user": "root",
            "connection.password": "test1357",
            "auto.create": "true",
            "auto.evolve": "true",
            "delete.enabled": "false",
            "tasks.max": "1",
            "topics": "my_topic_users",
            "name": "my-sink-connect"
        },
        "tasks": [],
        "type": "sink"
        }
    
    • connector를 사용하여 Sink 지정
    • “auto.create”: “true” -> 토픽과 같은 테이블을 생성한다는 의미로 테이블 구조는 토픽이 갖고 있는 구조 그대로 사용하여 생성한다.
    • “topics”: “my-topic_users” -> 구독하고자 하는
    • Kafka Connect 목록 확인
      GET 192.168.56.2:8083/connectors
      Response Body:
      [
        "my-sink-connect",
        "my-source-connect"
      ]
      
    • Kafka Connect 확인
      GET 192.168.56.2:8083/connectors/my-sink-connect/status
      Response body: 
      {
      "name": "my-sink-connect",
      "connector": {
          "state": "RUNNING",
          "worker_id": "127.0.1.1:8083"
      },
      "tasks": [
          {
              "id": 0,
              "state": "RUNNING",
              "worker_id": "127.0.1.1:8083"
          }
      ],
      "type": "sink"
      }
      
  • 예제
    • mariaDB 설치(docker 활용)
      • mariadb 설치
        $ docker pull mariadb
        $ docker run --name mariadb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=test1357 --cap-add=NET_ADMIN mariadb 
        
        • port 는 3306, password는 test1357로 세팅했다
        • –cap-add=NET_ADMIN을 통해 외부 클라이언트에서 docker로 접속할 수 있도록 했다 (3306 포트 방화벽 오픈)
      • mariadb 실행
        $ docker exec -i -t mariadb bash
        root@....:/# mysql -uroot -ptest1357
        MariaDB [(none)]> show databases;
        MariaDB [(none)]> create database mydb;
        MariaDB [(none)]> use mydb;
        
        • show databases; 를 통해 사용가능한 Database를 확인할 수 있다.
        • create database mydb: mydb란 신규 데이터베이스 생성
        • use mydb: mydb 사용
    • pom.xml에 mariadb dependency 추가
      <dependency>
        <groupId>org.mariadb.jdbc</groupId>
        <artifactId>mariadb-java-client</artifactId>
      </dependency>
      
    • h2-console에 들어가 테이블 생성
      • url: org.mariadb.jdbc.Driver
      • driverClassName: jdbc:mariadb://192.168.56.2:3306/mydb
      • username: root
      • password: test1357

      image

      • vm에 docker로 mariadb를 설치했기 때문에 docker ip인 192.168.56.2로 설정하였다.
      • 이미지출처: https://empty-cloud.tistory.com/84
      create table users( 
        id int auto_increment primary key,
        name varchar(20),
        pwd varchar(20),
        created_at datetime default NOW()
      );
      
    • Kafka Source Connect 추가(MariaDB)
      • 직접 명령어를 주어 추가할 수 있다.
      • 또는 POST /connectors로
        $ echo '{
        > "name" : "my-source-connect",
        > "config" : {
        >    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        >    "connection.url":"jdbc:mysql://localhost:3306/mydb",
        >    "connection.user":"root",
        >    "connection.password":"test1357",
        >    "mode": "incrementing",
        >    "incrementing.column.name" : "id",
        >    "table.whitelist":"users",
        >    "topic.prefix" : "my_topic_",
        >    "tasks.max" : "1"
        >    }
        >}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
        {"name":"my-source-connect",
        "config":{
          "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url":"jdbc:mysql://localhost:3306/mydb",
          "connection.user":"root",
          "connection.password":"test1357",
          "mode":"incrementing",
          "incrementing.column.name":"id",
          "table.whitelist":"users",
          "topic.prefix":"my_topic_",
          "tasks.max":"1",
          "name":"my-source-connect"},
        "tasks":[],"type":"source"}
        
    • MariaDB에 데이터 추가
      MariaDB [mydb]> insert into users(name, pwd) values('test', 'test1234');
      Query OK, 1 row affected (0.010 sec)
      MariaDB [mydb]> select * from users;
      +----+------+----------+---------------------+
      | id | name | pwd      | created_at          |
      +----+------+----------+---------------------+
      |  1 | test | test1234 | 2021-08-01 11:04:17 |
      +----+------+----------+---------------------+
      1 row in set (0.002 sec)
      
    • my_topic_users에서 확인
      • 토픽 확인
        $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
        __consumer_offsets
        connect-configs
        connect-offsets
        connect-status
        my_topic_users
        
        • my_topic_users가 생성되었다.
      • kafka-console-consumer에서 입력 확인
        $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
        {"schema":
          {"type":"struct",
          "fields":[
          {"type":"int32","optional":false,"field":"id"},
          {"type":"string","optional":true,"field":"name"},
          {"type":"string","optional":true,"field":"pwd"}, 
          {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,field":"created_at"}],
        "optional":false,
        "name":"users"
          },
          "payload":{"id":1,"name":"test","pwd":"test1234","created_at":1627815857000}}
        
    • Kafka sink connect 생성
      $ echo '{
        > "name" : "my-sink-connect",
        > "config" : {
        >    "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        >    "connection.url":"jdbc:mysql://localhost:3306/mydb",
        >    "connection.user":"root",
        >    "connection.password":"test1357",
        >    "auto.create": "true",
        >    "auto.evolve": "true",
        >    "delete.enabled": "false",
        >    "tasks.max": "1",
        >    "topics": "my_topic_users"
        >    }
        >}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
      
      • my_topic_users란 테이블이 생성된 것을 확인할 수 있으며 users에 저장한 데이터가 입력된 것을 확인할 수 있다.
    • Kafka Producer를 이용하여 Kafka Topic에 데이터 직접 전송
      • Kafka-console-producer에서 데이터 전송 -> topic에 추가 -> mariadb에 추가
        $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_users
        > {"schema":
          {"type":"struct",
          "fields":[
          {"type":"int32","optional":false,"field":"id"},
          {"type":"string","optional":true,"field":"name"},
          {"type":"string","optional":true,"field":"pwd"}],
        "optional":false,
        "name":"users"
          },
          "payload":{"id":2,"name":"test2","pwd":"test2234"}}
        
        • schema: 전달하고자 하는 데이터 구조
        • payload: 전달하는 데이터

Service에 적용하기

  • 예제: Orders -> Catalogs
    • Orders Service에 요청된 주문의 수량 정보를 Catalogs Service에 반영
    • Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
    • Catalogs Service에서 Kafka Topic에 전송된 메시지 취득 -> Consumer

    • Catalog Service (Consumer)
      • dependency 추가
        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>      
        
      • KafkaConsumerConfig.java
        • Topic 구독 후 변경사항을 이벤트 리스너로 사용
          @EnableKafka
          @Configuration
          public class KafkaConsumerConfig {
            /* 카프카 접속 정보 저장 빈*/
            @Bean
            public ConsumerFactory<String, String> consumerFactory() {
              Map<String, Object> properties = new HashMap<>();
              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.2:9092");
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // 여러 토픽을 그룹으로 지정하여 사용할 수 있다.
              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);		
              return new DefaultKafkaConsumerFactory<>(properties);
            }	
            /* 리스너 */
            @Bean
            public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = 
                  new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory());
              return factory;
            }
          }
          
      • kafkaConsumer.java ```java
        @Service @Slf4j @Transactional public class KafkaConsumer { private final CatalogRepository catalogRepository; @Autowired public KafkaConsumer(CatalogRepository catalogRepository) { this.catalogRepository = catalogRepository; } @KafkaListener(topics = “example-catalog-topic”) public void upateQty(String kafkaMessage) { log.info(“Kafka message: “ + kafkaMessage); ObjectMapper mapper = new ObjectMapper(); Map<Object, Object> map = new HashMap<>(); try { map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object»() {}); } catch (JsonMappingException e) { e.printStackTrace(); } catch (JsonProcessingException e) { e.printStackTrace(); }

        Catalog catalog = catalogRepository.findById((Long)map.get("productId")).orElseThrow(IllegalArgumentException::new);
        catalog.setQty(catalog.getQty() - (Integer)map.get("qty"));   } }
        
    • Order Service (Producer)
      • dependency 추가
        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>
        
      • KafkaProducerConfig.java
        @EnableKafka
        @Configuration
        public class KafkaProducerConfig {
          /*카프카 접속 정보 저장 빈*/
          @Bean
          public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> properties = new HashMap<>();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.2:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);			
            return new DefaultKafkaProducerFactory<>(properties);
          }
          /* Kafka Template -> 데이터 publish할 때 사용 */
          @Bean
          public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
           }	
        }
        
      • KafkaProducer.java
        @Service
        @Slf4j
        @Transactional
        public class KafkaProducer {
          private final KafkaTemplate<String, String> kafkaTemplate;
          @Autowired 
          public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
          }	
          public Orders send(String topic, Orders order) {
            ObjectMapper mapper = new ObjectMapper();
            String jsonString = "";	
            try {
              jsonString = mapper.writeValueAsString(order);
            }catch(JsonProcessingException e) {
              e.printStackTrace();
            }		
            kafkaTemplate.send(topic, jsonString);	
            log.info("Kafka Producer send data from the order service: " + jsonString);				
            return order;
          }
        }
        
      • 주문 생성 시 KafakProducer를 통해 토픽에 데이터 전송
        public ResponseOrder createOrder(Long userId, RequestOrder requestOrder) {
          Orders order = new Orders();
          order.setProductId(requestOrder.getProductId());
          order.setQty(requestOrder.getQty());
          order.setUnitPrice(requestOrder.getUnitPrice());
          order.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());
          order.setUserId(userId);
          Orders returnOrder = orderRepository.save(order);	
          /* send this order to kafka */
          kafkaProducer.send("example-category-topic", returnOrder);
          return new ResponseOrder(returnOrder.getId(), returnOrder.getUserId(), returnOrder.getQty(), returnOrder.getUnitPrice(), 
              returnOrder.getTotalPrice(), returnOrder.getCreatedAt());
        }
        

Multi Service에서 데이터 동기화 문제

  • 만약 Order Service를 확장하여 여러개를 구동한다면, DB도 분산되기 떄문에 따로 저장되어 문제가 발생한다. (동기화 문제)
  • 해결 방법
    • Orders Service에 요청된 주문 정보를 DB가 아니라 Kafka Topic으로 전송
    • Kafka Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB에 저장 -> 데이터 동기화
  • 예제
    • JPA를 사용하여 DB를 저장하는 부분을 Kafka로 전달하는 것으로 수정
    • mydb에 Order 테이블
    • OrderService의 Procuer에서 발생하기 위한 메시지 등록
      {
        "schema": {"type": "struct", "fields": [{"type": "string", "optinal": true, "field: "id"}, ...], "optinal": false, "name": "orders"},
        "payload": {"id": "..", "user_id": "...", "product_id": "...", "qty": 5, "total_price": 6000, "unit_price": "1200"
      }
      
      • Producer에서 발생하는 메시지를 DTO로 생성
        public class KafkaOrderDto implements Serializable {
          private Schema schema;
          private Payload payload;
        }
        public class Schema {
          private String type;
          private List<Field> fields;
          private String name;
        }
        public class Field {
          private String type;
          private boolean optional;
          private String field;
        }
        public class Payload {
          private Long id;
          private Long userId;
          private Long productId;
          private int qty;
          private int totalPrice;
          private int unitPrice;
        }
        
      • Producer 생성
        @Service
        @Slf4j
        @Transactional
        public class OrderProducer {
          private final KafkaTemplate<String, String> kafkaTemplate;
          private final List<Field> fields = Arrays.asList(
            new Field("int32", true, "id"), new Field("int32", true, "user_id"), new Field("int32", true, "product_id"),
            new Field("int32", true, "qty"), new Field("int32", true, "total_price"), new Field("int32", true, "unit_price"));
          private final Schema schema = Schema.builder().type("struct").fields(fields).optional(false).name("orders").build();
        	
          @Autowired 
          public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
          }	
          public Orders send(String topic, Orders order) {
            Payload payload = Payload.builder().id(order.getId())
              .userId(order.getUserId())
              .productId(order.getProductId())
              .qty(order.getQty())
              .totalPrice(order.getTotalPrice())
              .unitPrice(order.getUnitPrice()).build();		
            KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder().schema(schema).payload(payload).build();
            ObjectMapper mapper = new ObjectMapper();
            String jsonString = "";
            try {
              jsonString = mapper.writeValueAsString(kafkaOrderDto);
            }catch(JsonProcessingException e) {
              e.printStackTrace();
            }	
            kafkaTemplate.send(topic, jsonString);	
            log.info("Order Producer send data from the order service: " + jsonString);			
            return order;
          }
        }
        
      • OrderService에 OrderProducer 추가
        public ResponseOrder createOrder(Long userId, RequestOrder requestOrder) {
          Orders order = new Orders();
          order.setProductId(requestOrder.getProductId());
          order.setQty(requestOrder.getQty());
          order.setUnitPrice(requestOrder.getUnitPrice());
          order.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());
          order.setUserId(userId);
                	
          /* send this order to kafka */
          kafkaProducer.send("example-category-topic", order);
          /* Jpa 대신 Kafka로 전송 */
          // Orders returnOrder = orderRepository.save(order);
          orderProducer.send("orders", order);
          return new ResponseOrder(order.getId(), order.getUserId(), order.getQty(), order.getUnitPrice(), 
            order.getTotalPrice(), order.getCreatedAt());
        }
        
    • Kafka Sink Connector 추가
      • 127.0.0.1:8082/connectors POST로 데이터 추가
      • 한번만 등록하여 사용하면 된다.
        {
          "name": "my-order-sink-connect",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSingConnector",
            "connection.url": "jdbc:mysql://127.0.0.1:3306/mydb",
            "connection.user": "root",
            "connection.password": "test1357",
            "auto.create": "true",
            "auto.evolve": "true",
            "delete.enbaled": "false",
            "tasks.max": "1",
            "topics": "orders"
          }
        }
        

출처

  • Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) 강의