본문 바로가기

리액티브 프로그래밍(Reactive Programming)/리액티브 코프링

[리액티브 코프링] R2DBC 사용법 (데이터 저장 & 수정)

반응형

이전글 : [리액티브 코프링] R2DBC 사용법 (데이터 조회)

모든 예제 코드는 필자의 github 레포지토리 에서 확인할 수 있다.

데이터 저장하기

이번 글에서 다루는 데이터를 저장하는 방법은 아래와 같다.

  1. 레포지토리의 save() 함수 사용
  2. Fluent API 사용
  3. Native Query 사용
  4. Batch Insert

먼저, 1, 2, 3번에 해당하는 코드를 보도록 하자.
각각의 테스트 코드 위에 주석으로 데이터를 저장하는 방법을 명시했다.

@SpringBootTest
class ItemSaveTest {
    @Autowired
    private lateinit var itemRepository: ItemRepository

    @Autowired
    private lateinit var connectionFactory: ConnectionFactory

    @Autowired
    private lateinit var dataBaseClient: DatabaseClient

    @Test
    // 1. 레포지토리의 save() 함수 사용
    fun saveTest() {
        val item = Item(name = "테스트 아이템", price = 110.0)
        itemRepository.save(item)
            .`as`(StepVerifier::create)
            .expectNextMatches {
                Assertions.assertEquals(item.name, it.name)
                Assertions.assertEquals(item.price, it.price)
                true
            }
            .verifyComplete()
    }

    @Test
    // 2. Fluent API 사용
    fun saveByTemplateTest() {
        val r2dbcEntityTemplate = R2dbcEntityTemplate(connectionFactory)
        val item = Item(name = "테스트 아이템", price = 110.0)
        r2dbcEntityTemplate.insert(Item::class.java)
            .using(item)
            .`as`(StepVerifier::create)
            .expectNextMatches {
                Assertions.assertEquals(item.name, it.name)
                Assertions.assertEquals(item.price, it.price)
                true
            }
            .verifyComplete()
    }

    @Test
    // 3. native query 사용
    fun saveByQueryTest() {
        val id = 999L
        dataBaseClient.sql("INSERT INTO item(id, name, price) VALUES ($id, '테스트 아티템 15', 22.99)")
            .fetch().all()
            .`as`(StepVerifier::create)
            .then {
                itemRepository.findById(id)
                    .`as`(StepVerifier::create)
                    .expectNextMatches {
                        Assertions.assertEquals(id, it.id)
                        true
                    }.verifyComplete()
            }
            .verifyComplete()
    }

}

전체 코드 링크

위에서부터 차례로 보자.

  1. 레포지토리의 save() 함수 사용
    1. JPA를 사용해봤다면 익숙한 방법이다.
    2. 이 기능을 사용하기 위해서 엔티티마다 repository를 선언해 줘야 한다는 단점이 있다.
  2. Fluent API 사용
    1. 데이터 조회 편에서 다룬 Fluent API를 사용하고 있다.
    2. IDE 자동 완성 기능을 사용할 수 있다는 장점이 있다.
    3. Custom Repository, Custom DAO를 구현할 때 사용하기 유용하다.
  3. native query 사용
    1. native query를 개발자가 작성해서 사용하는 방법이다.
    2. 반환 값이 Map 데이터타입이라 매핑 로직을 개발자가 직접 구현해야 한다.

필자는 1번은 Item 엔티티를 저장할 때 사용한다.
왜냐하면, Item을 저장하는 기능은 Item 엔티티를 영속화하는 것만으로 끝나기 때문이다.
2번은 Cart를 저장할 때 사용한다.
이후 연관 관계 구현하기에서 자세히 다루겠지만, Cart를 저장하는 기능은 Cart뿐만 아니라 CartItem이라는 엔티티도 같이 영향을 받게 된다.
그리고 CartItem에 대한 변경 점은 Cart에 종속시키고 싶기 때문에 CartItem의 repository를 별도로 만들지 않았다.
그래서 연관 관계를 구현할 때 customRepository를 구현하고 Fluent API 사용할 것이다.


Batch Insert

Batch Insert는 여러 개의 insert 문을 모아서 한 번에 처리하는 기능이다.
하나의 트랜잭션에서 수백 건의 row를 insert 한다고 가정해보자.
이때 개별 row마다 insert를 수행하면 처리 시간이 오래 걸리고 불필요한 네트워크 자원이 소모된다.
만약, 트랜잭션이 걸려있다면 해당 테이블에 lock도 걸릴 것이다.
Batch Insert는 위의 단점을 극복하고자 여러 건의 insert 문을 하나의 구문(Statement)으로 모아서 처리하는 기능이다.

Item 엔티티를 Batch Insert 하는 코드를 보도록 하자.

@Repository
interface ItemRepository : ReactiveCrudRepository<Item, Long>, ItemCustomRepository

interface ItemCustomRepository {
   fun batchSave(items: List<Item>): Flux<Item>
}

@Repository
class ItemCustomRepositoryImpl(
    private val dataBaseClient: DatabaseClient
) : ItemCustomRepository {
    override fun batchSave(items: List<Item>): Flux<Item> {
        return dataBaseClient.inConnectionMany { connection ->
            val statement =
                connection.createStatement("INSERT INTO item(name, price) VALUES ($1, $2)")
                    .returnGeneratedValues("id", "name", "price")
            for (item in items) {
                statement.bind(0, item.name).bind(1, item.price).add()
            }
            Flux.from(statement.execute()).flatMap { result ->
                result.map { row, r ->
                    Item(row["id"] as Long, row["name"] as String, row["price"] as Double)
                }
            }
        }
    }
}

전체 코드 보기

 

위에서부터 차례대로 살펴보도록 하자.

  1. dataBaseClient.inConnectionMany() connection을 가져온다.
  2. connection.createStatement("INSERT INTO item(name, price) VALUES ($1, $2)") 실행할 insert 문을 작성한다.
  3. .returnGeneratedValues("id", "name", "price") insert 수행 결과로 생성된 item 값 중 반환받을 column을 명시해준다.
  4. Item 여러 건을 insert 해야하므로 for문을 사용해서 값을 바인딩한다.
  5. statement.bind(0, item.name).bind(1, item.price).add() 바인딩할 값의 인덱스(index)와 값을 명시해준다.
    1. 인덱스는 0부터 시작하며, 2번의 쿼리 VALUES ($1, $2)에 입력하는 값이다.
  6. statement.execute()로 insert 문을 실행해준다.
  7. statement.execute()의 결과값은 flatMap으로 Item 엔티티로 매핑해준다.

아래의 테스트 코드로 동작을 직접 확인해보자.

@SpringBootTest
class ItemSaveTest {
    @Autowired
    private lateinit var itemRepository: ItemRepository

    @Test
    fun batchSaveTest() {
        val item1 = Item(name = "배치 아이템1", price = 101.0)
        val item2 = Item(name = "배치 아이템2", price = 141.0)
        itemRepository.batchSave(listOf(item1, item2))
            .`as`(StepVerifier::create)
            .thenConsumeWhile {
                println("item save success: $it")
                true
            }.verifyComplete()
    }

}

데이터 수정하기

  1. 레포지토리의 save()
    1. 저장하는 방식
  2. Fluent API
  3. 쿼리 메소드 (@Modifying)

우선 1, 2번에 해당하는 코드를 보도록 하자.
각각의 테스트 코드 위에 주석으로 코드 동작 설명을 작성했다.

@SpringBootTest
class ItemUpdateTest {
    @Autowired
    private lateinit var itemRepository: ItemRepository

    @Autowired
    private lateinit var connectionFactory: ConnectionFactory

    @Test
    // 1. 레포지토리의 save()
    fun updateByRepositoryTest() {
        val updatedName = "updated item name"
        val updatedPrice = 0.0
        // id가 1,2인 item을 불러온다.
        itemRepository.findAllById(listOf(1, 2))
            // item의 name과 price를 수정한다.
            .flatMap {
                it.name = updatedName
                it.price = updatedPrice
                Mono.just(it)
            }
           // save()를 이용해 item을 저장한다.
            .flatMap {
                itemRepository.save(it)
            }
            .log()
            .`as`(StepVerifier::create)
            .thenConsumeWhile { item ->
                Assertions.assertEquals(item.name, updatedName)
                Assertions.assertEquals(item.price, updatedPrice)
                true
            }.verifyComplete()
    }

    @Test
    // 2. 레포지토리의 save()
    fun updateByFluentTest() {
        val r2dbcEntityTemplate = R2dbcEntityTemplate(connectionFactory)
        val updatedName = "updated item name"
        val updatedPrice = 0.0
       // id가 1,2인 item을 불러온다.
        itemRepository.findAllById(listOf(1, 2))
           // item의 name과 price를 수정한다.
            .flatMap {
                it.name = updatedName
                it.price = updatedPrice
                Mono.just(it)
            }
           // Fluent API를 이용해 item을 저장한다.
            .flatMap {
                r2dbcEntityTemplate.update(Item::class.java)
                    .matching(Query.query(Criteria.where("id").`is`(it.id!!)))
                    .apply(Update.update("name", it.name).set("price", it.price))
            }
            .log()
            .`as`(StepVerifier::create)
           // update가 성공적으로 진행되었는지 반환값으로 확인한다.
            .thenConsumeWhile {
                Assertions.assertNotNull(it)
                true
            }
            .then {
               // 업데이트한 item을 불러와서 값을 검증한다.
                itemRepository.findAllById(listOf(1, 2))
                    .`as`(StepVerifier::create)
                    .thenConsumeWhile { item ->
                        Assertions.assertEquals(item.name, updatedName)
                        Assertions.assertEquals(item.price, updatedPrice)
                        true
                    }.verifyComplete()
                true
            }.verifyComplete()
    }
}


쿼리 메소드 (@Modifying)를 사용하는 방식은 itemRepository에 메소드를 추가해야한다.

@Repository
interface ItemRepository : ReactiveCrudRepository<Item, Long>, ReactiveQueryByExampleExecutor<Item>,
    ItemCustomRepository {

    @Modifying
    @Query("UPDATE item SET name = :name, price = :price where id = :id")
    fun updateItem(name: String, price: Double, id: Long): Mono<Int?>
}

 

추가된 메소드의 특징은 아래와 같다.

  1. @Query 어노테이션으로 update문을 추가했다.
  2. @Modifying를 메소드 위에 명시해줬다.
  3. 메소드의 파라미터는 update문의 값과 매핑된다.

테스트 코드를 이용해서 결과를 확인해보도록 하자.

 

@SpringBootTest
class ItemUpdateTest {
    @Autowired
    private lateinit var itemRepository: ItemRepository

    @Test
    fun updateByModifyingQueryTest() {
        val updatedName = "updated item name"
        val updatedPrice = 0.0
        val itemId: Long = 1
        // update할 item의 정보를 입력해준다.
        itemRepository.updateItem(updatedName, updatedPrice, itemId)
            .log()
            .`as`(StepVerifier::create)
            // update가 성공적으로 진행되었는지 반환값으로 확인한다.
            .expectNextMatches {
                Assertions.assertNotNull(it)
                true
            }
            .then {
               // 업데이트한 item을 불러와서 값을 검증한다.
                itemRepository.findById(itemId)
                    .`as`(StepVerifier::create)
                    .thenConsumeWhile { item ->
                        Assertions.assertEquals(item.name, updatedName)
                        Assertions.assertEquals(item.price, updatedPrice)
                        true
                    }.verifyComplete()
                true
            }.verifyComplete()
    }
}

데이터 수정하기 참고 자료

반응형