본문 바로가기
Computer Sience/Java

[JAVA8] CompletableFuture

by 제우제우 2024. 10. 4.

목차

  • 자바 Concurrent 프로그래밍 
  • Executors
  • Callable과 Future
  • CompletableFuture

자바 Concurrent 프로그래밍 

Concurent 소프트웨어?

  • 동시에 여러 작업을 할 수 있는 소프트웨어 
  • 예) 웹 브라우저로 유투브를 보면서 키보드로 문서에 타이핑을 할 수 있다. 
  • 동시성을 활용한 소프트웨어
  • 동시성(Concurrency): 여러 작업이 동시에 실행되는 것처럼 보이도록 프로그램을 작성하는 개념, 실제로는 
    단일 또는 다중 CPU에서 여러 작업이 교차적(컨텍스트 스위칭)으로 실행 

자바에서 지원하는 Concurrent

  • 멀티프로세싱 (ProcessBuilder)
  • 멀티쓰레드

자바 멀티쓰레드 프로그래밍

  • Thread / Runnable

스레드 이름 출력  

public static void main(String[] args) {
    System.out.println(Thread.currentThread().getName()); // main
}

현재 실행되는 스레드 이름을 가져온다. → main

 

스레드 생성 방법 - Thread 상속 

public class App {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
    }
    static class MyThread extends Thread{
        @Override
        public void run() {
            System.out.println("Thread: " + Thread.currentThread().getName());
        }
    }
}

// 실행 결과 
Hello: main
Thread: Thread-0

 

실행 결과를 보면 코드에서는 myThread.start()를 먼저 했지만 메인 스레드의 동작이 먼저 실행되었다는 걸 확인할 수 있다.

→ 스레드의 실행 순서는 알 수 없다. 

 

스레드의 실행 순서는 제어할 수 없고 보장되지 않는다. 

스레드가 언제 실행을 시작할지는 자바의 스레드 스케줄러가 결정하며, 이 스케줄링 과정은 운영체제에 의해 관리된다. 

즉 예측할 수 없으며, 다시 실행할 때마다 결과가 달라질 수 있다.

 

스레드 생성 방법 -  Runnable - 익명 클래스 

public static void main(String[] args) {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("Thread: " + Thread.currentThread().getName());
        }
    });
    thread.start();
    System.out.println("Hello: " + Thread.currentThread().getName());
}

 

스레드 생성 방법 -  Runnable - 람다 표현식

 public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
    });
    thread.start();
    System.out.println("Hello: " + Thread.currentThread().getName());
}

 

쓰레드 주요 기능 

  • 현재 쓰레드 멈춰두기 (sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있겠죠.)
  • 다른 쓰레드 깨우기 (interupt): 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.
  • 다른 쓰레드 기다리기 (join): 다른 쓰레드가 끝날 때까지 기다린다.

스레드 sleep()

public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            Thread.sleep(1000L); // 스레드 1초 대기 상태 
        }
        catch (InterruptedException e){
            e.printStackTrace();
        }
        System.out.println("Thread: " + Thread.currentThread().getName());
    });
    thread.start();
    System.out.println("Hello: " + Thread.currentThread().getName());
}

// 출력 
Hello: main
Thread: Thread-0

 

Thread.sleep() 메소드를 호출하면 해당 스레드는 일정 시간 동안 waiting / time waiting 상태로 변경된다. 

이 상태에서는 CPU 스케줄러에 의해서 CPU 할당을 받지 않고 지정된 시간만큼 기다린다. 

시간이 지나면 다시 대기 상태(runnable)로 돌아가고 CPU 스케줄링에 따라 실행이 재개될 수 있다. 

 

스레드 interrupt()

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (true){
            try{
                System.out.println("Thread: " + Thread.currentThread().getName());
                Thread.sleep(1000L); // 1초
            }
            catch (InterruptedException e){
                System.out.println("interrupted");
                return; // Runnable 인터페이스 public abstract void run();
            }
        }
    });
    thread.start();

    System.out.println("Hello: " + Thread.currentThread().getName());
    Thread.sleep(3000L);
    thread.interrupt();
}

// 출력1 
Thread: Thread-0
Hello: main
Thread: Thread-0
Thread: Thread-0
interrupted

// 출력2
Hello: main
Thread: Thread-0
Thread: Thread-0
Thread: Thread-0
interrupted

 

.interrupt() 메서드는 스레드가 차단 상태(blocked, waiting, sleeping)에 있을 때만 InterruptedException을 발생시킨다. 

 

똑같은 로직을 2번 돌렸을 때 출력 결과가 다른 이유는 스레드 실행 순서는 보장을 할 수 없기 때문이다. 

그래서 메인 스레드가 먼저 출력될지 생성된 스레드가 먼저 출력될지는 알 수 없다.

하지만 그 이후에는 메인 스레드가 3초 동안 대기 상태로 바뀌기 때문에 생성된 스레드만 동작한다.

3초 이후에 메인 스레드가 깨어나면 interrupt() 호출을 하면서 생성된 스레드는 InterruptedException 발생하고 
return 된다. 

 

스레드 join()

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        try{
            Thread.sleep(10000L); // 10초
        }
        catch (InterruptedException e){
            throw new IllegalStateException(e);
        }
    });
    thread.start();
    System.out.println(LocalDateTime.now());
    System.out.println("Hello: " + Thread.currentThread().getName());
    thread.join(); // 다른 쓰레드가 끝날 때까지 기다린다
    System.out.println(LocalDateTime.now());
    System.out.println(thread + " is finished");
}

// 출력
Thread: Thread-0
2024-10-04T18:51:04.495400100
Hello: main
2024-10-04T18:51:14.340432200
Thread[Thread-0,5,] is finished

 

join() 메서드는 해당 스레드(thread)가 종료될 때까지 메인 스레드를 차단 상태로 만든다.

즉 메인 스레드는 새로운 스레드가 종료될 때 까지 대기한다. 

출력 시간을 보면 10초 차이가 난다. 


Executors

고수준 (High-Level) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리.
  • 그런 기능을 Executors에게 위임

Executors가 하는 일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.
  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

주요 인터페이스

  • package java.util.concurrent;
  • Executor: execute(Runnable)
  • ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

Executor 인터페이스 

public interface Executor {
    void execute(Runnable command);
}

 

Executor는 가장 간단한 스레드 실행 인터페이스로, 기본적인 작업을 실행하는 기능을 제공

스레드 생성 및 실행의 세부 사항을 숨기고, 사용자는 작업을 Runnable로 제공하면 된다.

 

ExecutorService 인터페이스

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    // 생략
}

 

public class App {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            System.out.println("Thread " + Thread.currentThread().getName());
        });

        executorService.shutdown(); // 작업을 모두 끝내고 끝낸다.
        // executorService.shutdownNow(); // 끝낸다 -> 작업을 모두 끝내지 핞고 종료될 가능성이 있다.
    }
}
// 출력  
Thread pool-1-thread-1

 

 ExecutorService executorService = Executors.newSingleThreadExecutor();

 

Executors.newSingleThreadExecutor() 메서드를 호출하여 단일 스레드를 사용하는 ExecutorService 인스턴스를 생성

이 인스턴스는 요청된 작업을 하나의 스레드에서 순차적으로 실행

executorService.shutdown();

 

shutdown() 메서드를 호출하여 ExecutorService를 종료

이 메서드는 현재 진행 중인 작업이 완료된 후, 더 이상 새로운 작업을 받지 않고 종료

shutdown()을 호출한 후에는 더 이상 작업을 제출할 수 없다.

기존에 제출된 작업이 완료될 때까지 대기

executorService.shutdownNow();

 

shutdownNow() 메서드는 ExecutorService를 즉시 종료하려고 시도

이 메서드는 현재 진행 중인 작업을 중단하고, 대기 중인 작업도 취소할 수 있는 기능을 제공

이 메서드는 현재 실행 중인 작업에 Thread.interrupt()를 호출하여 중단 요청을 하며, 작업이 종료될 가능성이 있다.

중단 요청을 받은 작업은 InterruptedException을 발생시킬 수 있으므로, 이를 처리해야 할 수도 있다.

이 메서드는 List<Runnable>을 반환하여 종료되지 않은 작업을 확인할 수 있다.

마찬가지로 shutdownNow() 을 호출한 후에는 더 이상 작업을 제출할 수 없다.

 

FixedThreadPool(2) 

2개의 고정된 스레드를 사용하여 작업을 처리하는 구조 

public class App {
    public static void main(String[] args) throws InterruptedException {
 		
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(getRunnable("Hello "));
        executorService.submit(getRunnable("jeu "));
        executorService.submit(getRunnable("The "));
        executorService.submit(getRunnable("Java "));
        executorService.submit(getRunnable("Test "));

        executorService.shutdown();
    }
    private static Runnable getRunnable(String message){ // effectively final
        return () -> System.out.println(message + Thread.currentThread().getName());
    }
}

// 출력
jeu pool-1-thread-2
Hello pool-1-thread-1
The pool-1-thread-2
Java pool-1-thread-1
Test pool-1-thread-2

 

동작 방식 

1. ExecutorService에는 크게 2가지가 존재한다. Thread Pool / BlockingQueue 

    ThreadPool에는 미리 스레드 2개를 생성해 두었다. Executors.newFixedThreadPool(2)     

    BlockingQueue는 요청된 작업을  임시로 큐에 저장한다. (작업을 동시에 처리할 수 있는 스레드보다 많은 경우, 그 초과      된 작업을 일시적으로 보관해두기 위해)
    할일이 없는 스레드가 큐에서 consume해서 일을 처리한다. 

2. main 스레드 → ExecutorService 에게 작업(Runnable)을 보낸다.  → 큐에 쌓인다.

3. 스레드 풀에 있는 스레드가 큐에 쌓인 작업을 처리한다. 

4. 작업을 처리하고 큐에 쌓인 작업이 없다면 대기 상태가 되고 있다면 다음 작업을 큐에서 consume해서 반복한다. 

 

ScheduledExecutorService 인터페이스

public interface ScheduledExecutorService extends ExecutorService {
		// 생략 ...
}

 

공식 문서 설명

지정된 지연 후에 실행되거나 주기적으로 실행되도록 명령을 예약할 수 있는 ExecutorService 이다.

schedule 메서드는 다양한 지연이 있는 작업을 생성하고 실행을 취소하거나 확인하는 데 사용할 수 있는 작업 개체를 반환.

scheduleAtFixedRate 및 scheduleWithFixedDelay 메서드는 취소될 때까지 주기적으로 실행되는 작업을 생성하고 실행합니다.

Executor.execute(Runnable) 및 ExecutorService submit 메소드를 사용하여 제출된 명령은 요청된 지연 시간이 0으로 예약된다.

schedule 방법에서는 0 및 음수 지연(기간 제외)도 허용되며 즉시 실행 요청으로 처리된다.

모든 schedule 방법은 절대 시간이나 날짜가 아닌 상대적인 지연 및 기간을 인수로 허용한다.

java.util.Date 로 표현된 절대 시간을 필요한 형식으로 변환하는 것은 간단한 문제이다.

예를 들어, 특정 미래 date 에 예약하려면 다음을 사용할 수 있다.

schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS) .

그러나 상대적 지연 만료가 네트워크 시간 동기화 프로토콜, 시계 드리프트 또는 기타 요인으로 인해 작업이 활성화되는 현재 Date 와 일치할 필요는 없다는 점에 유의하라.

Executors 클래스는 이 패키지에 제공된 ScheduledExecutorService 구현을 위한 편리한 팩토리 메서드를 제공한다.

 

ScheduledExecutorService 예제 delay 주기 

public class App {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(LocalDateTime.now());
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.schedule(getRunnable("Hello"), 3, TimeUnit.SECONDS); // 3초 delay

        scheduledExecutorService.shutdown();
    }
    private static Runnable getRunnable(String message){ // effectively final
        return () -> System.out.println(message + Thread.currentThread().getName() + " Time= " + LocalDateTime.now());
    }
}

// 출력 
2024-10-05T00:41:03.534137400
Hellopool-1-thread-1 Time= 2024-10-05T00:41:06.546261800

 

지금 출력을 보면 맨 처음 메인 스레드에서의 시간 출력은 41:03 인데 

스레드 풀에 있는 스레드 작업의 시간 출력은 41:06 이다.

의도한 대로 3초 지연(delay)가 지난 후에 작업이 수행된 것을 확인할 수 있었다. 

 

또한 shutdown()을 호출함으로써 모든 작업이 완료된 후 스레드 풀이 정상적으로 종료된다.

 

ScheduledExecutorService 예제 - delay + 반복 

public class App {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(LocalDateTime.now());
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // (시작 지연) initialDelay 1, (반복) period 2
        scheduledExecutorService.scheduleAtFixedRate(getRunnable("Hello "), 1, 2, TimeUnit.SECONDS);

        scheduledExecutorService.shutdown();
    }
    private static Runnable getRunnable(String message){ // effectively final
        return () -> System.out.println(message + Thread.currentThread().getName() + " Time= " + LocalDateTime.now());
    }
}

 

출력이 어떻게 나올까?

2024-10-05T00:50:16.628552600 메인 스레드에서 LocalDateTime.now() 출력만 나온다. 

 

나는 분명히 반복 작업을 요청하였고 내 예상은 내가 애플리케이션을 종료할 때 까지 무한하게 Runnable에서 정의한 출력물이 2초 간격으로 나올꺼라고 생각했다. 

 

shutdown()은 현재 ExecutorService(Queue)에 있는 작업은 모두 마치고 종료하지만 추가적으로 작업을 받지 않고 

종료되도록 한다.

그럼 scheduledExecutorService.scheduleAtFixedRate(getRunnable("Hello "), 1, 2, TimeUnit.SECONDS);

해당 작업은 1초 후 첫 번째 작업을 실행하고 이후 2초 간격으로 반복되도록 스케줄링되었다. 
그런데 이 작업은 1초 후에 큐에 들어가는거니까 메인 스레드에서 shutdown()이 호출된 시점에 큐에 작업은 없었던 것이다. 

그래서 첫 번째 작업조차 실행되지 않고 끝난다. 

System.out.println(LocalDateTime.now());
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(getRunnable("Hello"), 3, TimeUnit.SECONDS);

scheduledExecutorService.shutdown();

 

이 코드는 왜 근데 출력을 할까? 

해당 코드는 큐에 바로 작업이 들어가서 3초 후에 동작하는 방식이다.

핵심은 바로 큐에 들어간 것이다. 

그래서 shut down이 호출되어도 이미 예약된 작업이니 동작하는 것이다.

 scheduledExecutorService.scheduleAtFixedRate(getRunnable("Hello "), 1, 2, TimeUnit.SECONDS);

 

이 코드는 1초 후에 큐에 작업이 들어가는 건데 그전에 shutdown이 호출되어서 모든 작업이 시작도 되지 않고 끝나는 원리이다.

 

shutdown() 호출이 없으면 2초마다 무한하게 작업이 실행된다. 


Callable과 Future

Callable

  • Runnable과 유사하지만 작업의 결과를 받을 수 있다.
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

 

Future

public class App {
    public static void main(String[] args) throws Exception {
        // 싱글 스레드풀 스레드 풀에 1개의 스레드가 생성 & BlockingQueue 존재 
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        // Callable Runnable 차이점 return 존재 
        Callable<String> hello = () -> {
              Thread.sleep(2000L);
              return "Hello";
        };
        // executor 할일(Callable) 제출 / 비동기 실행 / 반환 -> Future<T> 
        Future<String> future = executorService.submit(hello);
        System.out.println(future.isDone()); // 상태 조회 false
        System.out.println("Started: " + LocalTime.now());
        future.get(); // Future 비동기 계산의 결과를 나타낸다. 완료될 때까지 기다리고, 계산 결과를 검색하는 메서드가 제공
        System.out.println(future.isDone()); // 상태 조회 true
        System.out.println("End " + LocalTime.now()); // 시작과 2초 차이

        executorService.shutdown();
    }
}

// 출력 
false
Started: 16:44:12.721072700
true
End 16:44:14.590812900

 

동작 흐름 

작업 제출 시점 submit() 호출하는 순간 Callable 작업이 스레드 풀에서 실행을 시작한다. 

즉 Callable의 Thread.sleep(2000L)이 실행되고 2초 동안 대기하게 된다. 

 

중요한 점은 future.get()을 호출하지 않더라도 작업은 이미 백그라운드에서 실행 중이라는 거다. 

future.get()은 비동기 작업이 완료되기를 기다리는 역할을 할 뿐이다. 

 

future.isDone() 

future에는 isDone() 메소드를 제공하는데 해당 메소드는 작업의 완료 여부를 boolean 타입으로 반환한다. 

get() 이후 작업이 끝났는지 출력하면 true가 나오고 

get() 이전에 작업이 끝났는지 출력하면 false가 나오는 걸 확인할 수 있었다. 

 

출력 결과를 보면 2초 차이가 난다. 

 

future.get()은 블로킹 콜이다. 

future.get()의 호출 스레드는 main 스레드이다. 

main 스레드는 작업을 하는 스레드가 작업이 완료될 때까지 기다리므로 스레드는 아무 일도 하지 않고 멈춘 상태로 대기한다.

이를 블로킹이라고 한다. 

 

타임아웃 설정

또한 futuer.get() 메소드는 매개변수를 받을 수 있는데 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다. 

 future.get(1L, TimeUnit.SECONDS);

 

만약 지정한 시간 동안만 기다리다가 작업이 끝나지 않으면 예외를 던진다. 

public class App {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        Callable<String> hello = () -> {
              Thread.sleep(2000L);
              return "Hello";
        };
        Future<String> future = executorService.submit(hello);
        future.get(1L, TimeUnit.SECONDS); // TimeoutException 발생 
        executorService.shutdown();
    }
}

Exception in thread "main" java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
	at com.example.java8.App.main(App.java:22)

 

현재 Callable에 정의된 작업은 최소 2초 이상이 걸린다. 

하지만 future.get()에서 설정한 타임아웃은 1초이다. 

작업의 시간이 제한 시간을 넘겼으니 TimeoutException이 발생하게 된다. 

 

future.cancel()

Future 인터페이스에서 제공하는 cancel() 메서드는 비동기 작업을 취소할 때 사용한다.

이 메서드가 호출되면, 해당 작업이 완료되지 않았을 경우 중단을 시도합니다.

boolean cancel(boolean mayInterruptIfRunning);

 

cancel()의 매개변수인 mayInterruptIfRunning는 작업이 이미 실행 중일 때 어떻게 처리할지 결정한다. 

 

○ true

작업이 이미 실행 중이라면 스레드를 중단시키려고 시도 

○ false

작업이 이미 실행 중이라면 작업을 중단하지 않고 그냥 완료될 때까지 기다린다. 

즉 작업이 이미 시작되지 않았다면 취소되지만, 이미 시작된 작업은 계속 실행 

 

cancel()의 반환 값

○ true

작업이 성공적으로 취소되었음을 의미

취소가 완료되었다는 것은 아직 작업이 시작되지 않았거나, 작업 중이더라도 중단이 가능했음을 뜻한다. 

○ false

false: 작업이 이미 완료되었거나 취소할 수 없는 상태였기 때문에 취소할 수 없다는 뜻

 

만약 취소된 작업의 결과물(future)을 가져오려 하면 예외가 발생한다. 

public class App {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        Callable<String> hello = () -> {
              Thread.sleep(2000L);
              return "Hello";
        };
        Future<String> future = executorService.submit(hello);
        future.cancel(true);
        future.get(); // -> CancellationException
        executorService.shutdown();
    }
}

// 출력 
Exception in thread "main" java.util.concurrent.CancellationException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.example.java8.App.main(App.java:26)

 

만약 future.cancel(false); 이렇게 요청하면 결과는 어떻게 나올까?

내 예상은 작업이 이미 submit()하는 순간 시작되었으니까 작업 또한 정상 완료가 되고 결과 조회도 가능하다고 생각했다.

하지만 똑같인 CancellationException이 발생한다.

 

이유

cancel(false)의 의미는 이미 시작된 작업이 있다면 중단하지 않고 계속 실행시키지만, 이후에 그 결과를 요청하지 못하도록 설정

그래서 작업이 완료되어도, 그 작업의 결과를 얻으려고 get()을 호출하면 CancellationException이 발생하는 것

 

 invokeAll() VS  invokeAny()

 

invokeAll() 

사용자의 모든 주식을 조회 주식 a 조회는 1초 주식 b 조회는 3초 걸린다고 가정 

invokeAll()은 어떤 주식 조회가 얼마나 걸리는지와 상관없이 모두 작업(조회)이 완료될 때까지 기다린다. 

즉 어떤 주식이 조회에 오래 걸린다고 가져오지 않는 어이없는 상황이 없는 것처럼 모두 가져와야 하는 상황에서 사용한다. 

 

invokeAny() 

각 서버는 현재 내가 필요한 파일 a를 모두 가지고 있다. 

해당 파일 다운을 각 서버에 요청했을 때 각 서버는 모두 다른 시간이 걸린다. 

이럴 때 필요한 파일 a는 결국 모든 서버에서 보내줄 수 있으니 가장 빠르게 작업(다운)이 완료된 작업 말고는 다른 작업은 필요 없다. 

 

invokeAny() 

public class InvokeTest {
    public static ExecutorService executorService = Executors.newSingleThreadExecutor();
    public static void main(String[] args) throws Exception{
        Callable<String> a = () ->{
            Thread.sleep(1000L);
            return "a";
        };
        Callable<String> b = () ->{
            Thread.sleep(2000L);
            return "b";
        };
        Callable<String> c = () ->{
            Thread.sleep(3000L);
            return "c";
        };
        List<Callable<String>> callables = Arrays.asList(a, b, c);
        invokeAny(callables);

        executorService.shutdown();
    }
    public static void invokeAny(List<Callable<String>> list) throws Exception{
        System.out.println("START invokeAny " + LocalDateTime.now());
        String future = executorService.invokeAny(list);
        System.out.println(future); // a
        System.out.println("END   invokeAny " + LocalDateTime.now()); // 1초
    }
}

// 출력 
START invokeAny 2024-10-05T17:31:58.112584700
a
END   invokeAny 2024-10-05T17:31:59.127564

 

가장 빠른 작업인 a만 출력되고 애초에 내부에서 하나의 future을 가져오고 바로 get()까지 해서 가져온다. 

1초가 걸린다.

 

invokeAll() 

public class InvokeTest {
    public static ExecutorService executorService = Executors.newSingleThreadExecutor();
    public static void main(String[] args) throws Exception{
        Callable<String> a = () ->{
            Thread.sleep(1000L);
            return "a";
        };
        Callable<String> b = () ->{
            Thread.sleep(2000L);
            return "b";
        };
        Callable<String> c = () ->{
            Thread.sleep(3000L);
            return "c";
        };
        List<Callable<String>> callables = Arrays.asList(a, b, c);
        invokeAll(callables);

        executorService.shutdown();
    }
    public static void invokeALL(List<Callable<String>> list) throws Exception{
        System.out.println("START invokeAll " + LocalDateTime.now());
        List<Future<String>> futures = executorService.invokeAll(list);
        for (Future<String> future : futures) {
            System.out.println(future.get());
        }
        System.out.println("END   invokeAll " + LocalDateTime.now()); // 6초
    }
}

// 출력 
START invokeAll 2024-10-05T17:34:10.126199500
a
b
c
END   invokeAll 2024-10-05T17:34:16.165

 

모든 작업이 완료된다. 

동시에 실행된 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.

하지만 현재 스레드풀은 싱글 스레드이기 때문에 각 작업이 순차적으로

모든 작업시간이 합쳐진 1 + 2 + 3 = 6초가 걸린다. 

 

모든 작업이 완료되면 각 작업의 결과물인 Future 객체의 리스트를 반환한다. 

 

newSingleThreadExecutor -> newFixedThreadPool(3)

// newSingleThreadExecutor -> newFixedThreadPool(3)
public static ExecutorService executorService = Executors.newFixedThreadPool(3);

START invokeAll 2024-10-05T17:38:44.123376200
a
b
c
END   invokeAll 2024-10-05T17:38:47.145500300

전체 작업시간이 가장 오래걸리는 시간인 3초가 걸린다.


CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스

Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다

 

콜백(callback)

프로그래밍에서 특정 작업이 완료되거나 특정 이벤트가 발생했을 때 호출되는 함수를 의미한다.

일반적으로 비동기 프로그래밍에서 많이 사용되며, 비동기 작업의 결과를 처리하기 위해 특정 함수를 전달하고, 해당 작업이 완료되면 그 함수가 호출된다. 

 

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없다. 물론 cancel() - 취소하거나, get()에 타임아웃을 설정할 수는 있다.
  • 블로킹 코드 get() 를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
  • 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
  • 예외 처리용 API를 제공하지 않는다.

Future - 비블로킹 콜백 실행 불가

Future의 get() 메서드는 결과가 준비될 때까지 현재 스레드를 블로킹한다.

이는 비동기 프로그래밍의 장점을 활용하기 어렵게 만든다. 

public class App {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(10000L); // 10초
            return "hello";
        });
        String str = future.get();// 블록킹 콜 -> 작업이 완료전까지 다음 구문은 진행 x -> 10초를 기다린다.
        System.out.println("str = " + str);
    }
}

 

CompletableFuture

/*
명시적인 Executor 인수가 없는 모든 비동기 메서드는ForkJoinPool.commonPool() 사용하여 수행됩니다
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	// 생략 ...
}

 

completableFuture에서 명시적으로 Executor를 제공하지 않으면, 모든 비동기 메서드는 기본적으로 ForkJoinPool.commonPool()에서 실행된다.

이 풀은 자바 7부터 도입된 Fork/Join 프레임워크의 일부로, 작업을 동적으로 할당하고, 병렬 처리를 위한 효율적인 스레드 관리 기능을 제공한다. 

 

ForkJoinPool.commonPool()

 

  • 공유 스레드 풀: ForkJoinPool.commonPool()은 모든 CompletableFuture와 같은 비동기 작업이 기본적으로 사용하는 공유 스레드 풀이다. 여러 개의 비동기 작업이 동시에 실행될 수 있도록 여러 스레드를 관리한다. 
  • 스레드 재사용: 이 풀은 스레드를 재사용하여 성능을 최적화하고, 필요에 따라 동적으로 스레드를 생성하거나 종료한다.
  • 스레드 수 제한: 일반적으로 ForkJoinPool.commonPool()의 스레드 수는 시스템의 프로세서 수에 기반하여 설정되며, 이를 통해 CPU의 코어 수에 최적화된 병렬 처리가 가능하다. 

Executor  큐 

Executor의 큐는 일반적으로 BlockingQueue를 사용한다.

ex) ArrayBlockingQueue, LinkedBlockingQueue 등이 있다. 

작업의 목적이 FIFO 방식으로 처리하거나 우선 순위가 있는 작업을 처리할 수 있도록 설계

큐는 작업 요청을 보관하고, 스레드가 작업을 요청할 때 큐에서 꺼내어 실행한다. 

 

ForkJoinPool의 큐

ForkJoinPool은 "work-stealing" 큐를 사용한다.

각 작업자는 자신의 작업을 보관하는 큐를 가지고 있으며, 다른 작업자들이 작업을 수행

복잡한 재귀적 작업을 효율적으로 처리하기 위해 설계되어있다. 

특히 대규모 작업을 여러 개의 작은 작업으로 나누어 병렬 처리할 때 유리하다.

 ForkJoinPool은 주로 Divide-and-Conquer 알고리즘을 사용하여 작업을 처리한다.
각 스레드는 자신의 큐에서 작업을 처리하고, 다른 스레드가 작업이 없을 때 자신의 큐에서 작업을 빼내어 수행할 수 있다.

 

Executor  Queue VS ForkJoinPool Queue 간단 정리 

ForkJoinPool은 작업이 나누어져서 각 스레드 본인의 큐에 추가되고 

스레드는 자신의 큐에서 작업을 꺼내고 

본인의 큐가 비어있으면 다른 스레드의 큐에서 작업을 꺼내는 방식 

 

Executor는 공통 큐가 존재하고 스레드들이 공통 큐에서 FIFO 방식으로 가져오거나 우선순위 큐 방식으로 작업을 가져와 처리한다. 

 

 

CompletableFuture 명시적 완료

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();

        // 캐시에서 데이터를 가져온다고 가정
        String cachedData = getDataFromCache(); 
        if (cachedData != null) {
            future.complete(cachedData); 
        } else {
            future.complete("배제우"); 
        }
        System.out.println(future.get()); 
    }
    private static String getDataFromCache() {
        return null; // 캐시에서 값이 없다고 가정
    }
}

 

future.complete()

이 메소드는 CompleteFuture의 결과를 명시적으로 설정한다.

예를 들어, 데이터 베이스에서 가져온 값이나 캐시에서 가져온 값을 기반으로 작업의 결과를 설정할 수 있다. 

 

complete()가 호출되면, 이 CompleteableFuture는 완료 상태로 전환되고, 

이 결과를 기다리는 모든 스레드가 깨어나게 된다. 

즉 get() 메소드를 호출 시 대기하지 않고  즉시 결과를 반환한다. 

 

CompletableFuture.completedFuture() - static 메소드 

주어진 값으로 이미 완료된 새로운 CompletableFuture를 반환한다. 

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.completedFuture("배제우");
        System.out.println(future.isDone());
        System.out.println(future.get());
    }
}

// 출력 
true
배제우

 

반환 값이 없는 비동기 작업을 수행 → runAsync()

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
        });
        future.get();
    }
}

// Hello ForkJoinPool.commonPool-worker-1

 

CompletableFuture.runAsync() 메서드를 호출하여 비동기 작업을 생성

해당 메소드는 반환값이 없는 Runnable을 인수로 받는다. 

즉, 이 작업은 결과를 반환하지 않고, 단순히 작업이 수행되면 끝난다.

 

future.get()이 없다면?

비동기 작업은 CompletableFuture.runAsync() 메서드 호출 시 즉시 실행되지만, 메인 스레드가 종료되면 비동기 작업도 종료된다.

future.get()을 호출하면 메인 스레드가 비동기 작업이 완료될 때까지 대기하게 되어, 작업이 정상적으로 실행될 수 있도록 보장한다. (is 블록킹 콜) 

따라서, future.get()이 없으면 비동기 작업이 실행될 가능성이 있지만, 메인 스레드의 종료로 인해 결과를 확인할 수 없게 되는 상황이 발생할 수 있다.

 

반환 값이 있는 비동기 작업을 수행 → supplyAsync()

Supplier<T>를 사용한다. 

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        System.out.println(future.get());
    }
}

 

근데 지금까지 예제로 연습한 코드들은 Future에 Runnable을 넘기고 

future.get() 블록킹 콜을 하는 방식과 똑같다 그래서 지금부터 비동기 콜백을 적용하겠다. 

 

비동기 작업 이후 콜백 수행 반환 값이 있는 경우 thenApply()

Function<T, R> 인터페이스를 사용한다. 

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply(s -> { // future 다른점 callback 적용 
            System.out.println(Thread.currentThread().getName());
           return s.toUpperCase();
        });

        System.out.println(future.get());
    }
}

// 출력 
Hello ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
HELLO

 

CompletableFuture의 .thenApply() 메서드는 비동기 작업의 결과를 변환하는 데 사용되는 강력한 기능이다.

이 메소드 사용하여 이전 단계의 결과를 기반으로 후속 작업을 수행할 수 있다.

 

비동기 작업 이후 콜백 수행 반환 값이 없는 경우 thenAccept()

Consumer<T> 인페이스를 사용한다. 

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept(s -> { 
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });
        future.get()
    }
}

// 출력 
Hello ForkJoinPool.commonPool-worker-1
main
HELLO

 

비동기 작업 이후 콜백 수행 반환 값/ 받는 값 모두 없는 경우 thenRun()

Runnable 인터페이스를 사용한다. 

public class App {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun(() -> System.out.println("안녕 " + Thread.currentThread().getName()));

        future.get();
    }
}

// 출력 1
Hello ForkJoinPool.commonPool-worker-1
안녕 ForkJoinPool.commonPool-worker-1

// 출력 2
Hello ForkJoinPool.commonPool-worker-1
안녕 main

 

이전 콜백에서 반환하는 값이 필요없고(없거나) 이번 콜백에서 반환하는 값이 없으면 

thenRun()을 사용한다. 

 

thenRun()으로 전달한 콜백은 앞선 콜백을 실행한 쓰레드나 그 쓰레드를 파생시킨 부모에서 실행하게 되어있다.

그래서 여러 번 돌려서 출력을 보면 해당 비동기 작업을 호출한 main이나 앞선 콜백을 실행한 ForkJoinPool.commonPool - worker - 1이 이번 콜백을 실행한 것을 알 수 있다. 

 

부모 쓰레드나 이전 콜백을 실행한 쓰레드가 아닌 쓰레드로 실행하고 싶다면 thenRunAsync를 사용해야 한다.

 

중간 정리: Future와 CompletableFuture의 차이점 ★★

 

  • 블로킹 vs 논 블로킹
    • Future는 작업이 완료될 때까지 get() 메서드를 호출하면 블로킹 상태가 된다.
      즉, 결과가 준비될 때까지 현재 스레드가 대기한다. 
    • 반면, CompletableFuture는 비동기적으로 작업을 수행하며, .thenAccept(), .thenApply()와 같은 메서드를 통해 콜백을 정의할 수 있다.
      이 경우, 메인 스레드는 블로킹되지 않고 다른 작업을 계속 수행할 수 있다.
  • 콜백 및 비동기 처리:
    • Future는 결과를 받기 위해 get() 메서드로 직접 결과를 가져와야 하며, 이를 통해 비동기 처리를 구현하기 어렵다.
    • CompletableFuture는 콜백 메서드를 사용하여 비동기 작업의 결과를 처리할 수 있다. 예를 들어, 작업이 완료되었을 때 자동으로 실행되는 후속 작업을 정의할 수 있다.
  • 명시적 완료:
    • CompletableFuture는 complete() 메서드를 사용하여 작업을 명시적으로 완료할 수 있다.
      이는 다른 스레드(호출한 스레드)나 작업에서 결과를 설정할 수 있게 해준다. 
    • Future는 외부에서 결과를 설정할 수 없으며, 단순히 작업이 완료될 때까지 기다려야 한다.

즉 Future는 비동기 방식이라고 하기에는 호출 스레드가 블로킹 되어서 진정한 비동기 방식 같지가 않았는데
CompletableFuture는 콜백과 명시적 완료 기능을 통해 더 비동기 방식 같이 바뀌었다.

 

CompletableFuture  - ForkJoinPool 사용 X

public class App {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }, executor).thenRun(() -> System.out.println("안녕 " + Thread.currentThread().getName()));

        future.get();
        executor.shutdown();
    }
}

// 출력 
Hello pool-1-thread-1
안녕 pool-1-thread-1

 

이렇게 supplyAsync나 runAsync 두 번째 인자로 Executor를 넘길 수 있다. 

 

비동기 작업 조합하기  → thenCompose()

두 작업이 서로 이어져서 실행하도록 조합

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> future = hello.thenCompose(App::getWorld);
        System.out.println(future.get());
    }
    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

// 출력 
Hello ForkJoinPool.commonPool-worker-1
World ForkJoinPool.commonPool-worker-2
Hello World

 

hello 작업에서는 Hello return 

getWorld(String message)는 메시지를 받아서 합쳐서 return 

두 작업이 이어진다.

 

비동기 작업 조합하기  → thenCombine()

두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });
        
        // BiFunction<T,U,R> 인터페이스
        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println(future.get());
    }
}

 

hello, world 작업을 각각 실행하고 

둘 다 작업이 끝나면 콜백을 실행한다. 

이때 BiFunction<T, U, R> 인터페이스를 사용한다. 

 

allOf() → thenAccept()

여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });
        CompletableFuture<Void> future = CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println);

        System.out.println(future.get());
    }
}

// 출력 
World ForkJoinPool.commonPool-worker-2
Hello ForkJoinPool.commonPool-worker-1
null
null

 

thenAccept() 메소드는 Consumer<T> 인터페이스를 사용한다.

hello 작업이 만약 Integer를 반환하여 world 가 반환하는 타입인 String과 다를 수도 있고

각 작업이 정상적으로 끝나지 않았을 수도 있다. 

그래서 해당 반환값은 CompletableFuture<Void>가 된다. 

 

즉 모든 작업의 결과를 합쳐서 사용하는 것이 아니라, 단순히 모든 작업의 완료 여부만 확인하는 경우에 유용하다. 

 

allOf()  thenApply()

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });

        List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

        // f.get() checked exception 발생 f.join() unchecked exception 발생

        CompletableFuture<List<String>> futureList = CompletableFuture
                .allOf(futuresArray)
                .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

        futureList.get().forEach(System.out::println);
    }
}
// 출력 
World ForkJoinPool.commonPool-worker-2
Hello ForkJoinPool.commonPool-worker-1
Hello
World

 

동작 흐름 

작업 2개가 비동기적으로 실행된다. hello, world

allOf() 메소드를 통해서 두 작업이 모두 완료된 이후 콜백 메소드인 thenApply() 메소드가 호출 

thenApply() 메소드는 Function<T, R> 인터페이스를 사용한다. 

이미 모든 작업이 완료되었으니 get() or join()을 통해서 블록킹 되지 않고 가져온다. join() 사용 

해당 결과를  CompletableFuture<List<String>>로 받는다 모두 반환 결과가 String 이기 떄문에 제네릭에 String 명시

타입이 다르면 Object 사용하자 

 

예외 처리

futures.stream().map(CompletableFuture::join) 부분에서 join() 메소드는 결과를 가져오는 과정에서 예외가 발생하면 unchecked exception인 CompletionException을 발생시킨다.

 

get() 메소드는 checked exception을 발생시키므로, throws Exception으로 선언하거나 try-catch 문을 사용해야 한다. 

 

anyOf()

public class App {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<Object> future = CompletableFuture
                .anyOf(hello, world)
                .thenApply(s -> s);
        System.out.println(future.get());
}

 

 

CompletableFuture.anyOf(hello, world) 메소드는 주어진 여러 CompletableFuture 중 하나라도 완료될 때까지 대기한다. 이 메소드는 CompletableFuture<Object>를 반환한다. 

이 반환 값은 hello 또는 world 중 하나의 결과가 된다.

 

예외 처리 → exceptionally

Function<T, R> 인터페이스를 사용한다. 

 

 

public class App {
    public static void main(String[] args) throws Exception {
        boolean throwError = true;
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if(throwError){
                throw new IllegalStateException();
            }
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {
            System.out.println(ex);
            return "Error!";
        });
        System.out.println(hello.get());
}        
// 출력 
java.util.concurrent.CompletionException: java.lang.IllegalStateException
Error!

 

예외 처리  handle

BiFunction<T,U,R> 인터페이스를 사용한다. 

public class App {
    public static void main(String[] args) throws Exception {
        boolean throwError = true;
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if(throwError){
                throw new IllegalStateException();
            }
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if(ex != null){
                System.out.println(ex);
                return "ERROR!";
            }
            return result;
        });
        System.out.println(hello.get());
}
// 출력 
java.util.concurrent.CompletionException: java.lang.IllegalStateException
ERROR!

 

 


참고 자료 

인프런 백기선님 더 자바, Java 8

 

더 자바, Java 8 강의 | 백기선 - 인프런

백기선 | 자바 8에 추가된 기능들은 자바가 제공하는 API는 물론이고 스프링 같은 제 3의 라이브러리 및 프레임워크에서도 널리 사용되고 있습니다. 이 시대의 자바 개발자라면 반드시 알아야 합

www.inflearn.com

 

Executors (JDK 22 executors 공식 문서)

CompletableFuture 공식 문서 

 

'Computer Sience > Java' 카테고리의 다른 글

[JAVA8] 애노테이션의 변화  (0) 2024.10.07
[JAVA] Annotation  (0) 2024.10.07
[JAVA8] Date & Time  (2) 2024.10.03
[JAVA8] Optional  (0) 2024.10.03
[JAVA8] Stream  (0) 2024.10.02