본문 바로가기

Language

[Kotlin] - Channels

 

Channel을 사용하면 서로 다른 코루틴들이 손쉽게 하나의 값을 공유할 수 있다.

채널은 BlockingQueue 와 유사하게 동작하며 값을넣기 위해 send() 를 사용하고,

값을 꺼내기 위해서 recieve() 를 사용한다.

 

Channel

fun main(){
   val ch = Channel<Int>()

    runBlocking {
        launch {
            for(x in 1..5) {
                ch.send(x * x)
                println("-----")
            }
        }
        repeat(5){
            println(ch.receive())
        }
    }
}

// -----
// 1
// 4
// -----
// -----
// 9
// 16
// -----
// -----
// 25

- "----"의 순서는 바뀔 수 있음

 

 

Channel 닫힘

.close()를 사용하면 채널이 닫힐수 있고 닫힌 채널에서는 더이상 값을 수신하지 못한다

채널 큐의 마지막에 닫힘 여부를 추가 하는 방식으로 동작하기 때문에 채널이 닫히기 직전까지의 수신은 보장한다

fun main(){
   val channels = Channel<Int>()
    runBlocking {
            launch {
                for(x in 1..5) {
                    channels.send(x * x)
                    println("----")
                }
                channels.close()
            }
            println("Done !")

        for(ch in channels){
            println(ch)
        }
    }
}
fun main(){
   val channels = Channel<Int>()
    runBlocking {
            launch {
                for(x in 1..5) {
                    channels.send(x * x)
                    println("----")
                }
            }
            channels.close()
            println("Done !")

        for(ch in channels){
            println(ch)
        }
    }
}

// Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: 
// Channel was closed

- 그러나 위와 같이 launch 바깥에 close()를 사용할 경우 이미 채널이 닫힌상태에서 send()를 호출하여 exception 이 발생한다

 

 

Channel Producer

- produce{ }라는 코루틴 빌더와 이렇게 생성된 프로듀서가 생성하는 값들의 수신을 돕기 위한 consumeEach() 라는 확장 함수를 사용

- 채널을 생성하고 작업이 끝나면 자동으로 닫아줘 위의 exception 의 문제를 해결할 수 있다

fun main(){
    runBlocking {
        val sq = produceSq(5)
        sq.consumeEach {
            println(it)
        }
        println("DONE")
    }
}
fun CoroutineScope.produceSq(max: Int): ReceiveChannel<Int> = produce {
    for(x in 1..max){
        send(x * x)
    }
}

 

 

Channel Pipeline

하나의 코루틴이 데이터 스트림 (무한 스트림 포함)을 생산해내고 다른 하나 이상의 코루틴들이 이 스트림을 수신 받아 필요한 작업을 수행 한 후 가공된 결과를 다시 전송 가능하다

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers(5)
    val doubledNumbers = produceDouble(numbers)
    doubledNumbers.consumeEach { println(it) }
    println("Done")
}

fun CoroutineScope.produceNumbers(max: Int): ReceiveChannel<Int> = produce {
    for (x in 1..max) {
        send(x)
    }
}

fun CoroutineScope.produceDouble(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    numbers.consumeEach { send(it * 2) }
}

- 위 예제는 어떤 정수를 produce {} 를 통해 만들어내고 그 결과를 produceDouble()의 인자로 전달하여 수의 두배를 생성해내는 프로듀서를 만들어 전달하는 예제이다

 

 

Channel (produce) vs Iterator (buildIterator)

데이터를 비동기적으로 연속적으로 생산해야 한다면 produce {}를 사용한다

ex) 센서, 네트워크 요청, 비동기 스트리밍

데이터를 순차적으로 동기적으로 제공해야 한다면 buildIterator {}를 사용한다

ex) 트리 탐색, 수열 생성, 단순 반복 연산

개념 동작방식 데이터 전송방식 비동기 여부
produce { } 코루틴에서 채널 생성 send(value) 비동기
send () produce {} 내부에서 값 전송 send(value) 비동기
receive () produce {}에서 값 받기 receive() 비동기
buildIterator { } 일반함수처럼 동기적으로 실행 yield(value) 동기
yield() buildIterator {}에서 값 반환 yield(value) 동기
next() Iterator에서 값 가져오기 next() 동기

 

 

그렇다면 여기서 정리해야 하는 부분이 생긴다

Queue/List 대신 굳이 buildIterator를 쓰는 이유는 무엇일까?

 

Queue/List가 적합한 경우

  • 데이터 크기가 작고 미리 생성이 가능할 때
  • 한 번만 사용할 데이터일 때
  • 값이 사라지는 게 상관없을 때

buildIterator가 적합한 경우

  • 필요할 때만 값을 생성하고 싶을 때 (Lazy)
  • 메모리를 아끼고 싶을 때 (큰 데이터 처리)
  • 값을 재사용해야 할 때
  • 무한 데이터 생성 (ex. 피보나치 수열, DFS 탐색)

즉, 데이터를 미리 알고 있으면 Queue/List, 데이터를 즉석에서 생성해야 하면 buildIterator 를 사용하면 된다 !

 

 

'Language' 카테고리의 다른 글

[Kotlin] - 기본 문법 정리  (1) 2025.02.02
[Kotlin] 제네릭 generic타입  (0) 2025.02.02
[Kotlin] 코루틴 (2) - Cancellation and Timeouts  (1) 2025.02.01
[Kotlin] 코루틴 (1) - Basic  (1) 2025.02.01