Async 시작

Posted on March 25, 2021

왜 그냥 fork를 쓰지 않고, async가 필요한지 알아 보겠습니다.

IO a 결과값 받아오기

forkIO :: IO () -> IO ThreadId

forkIO는 받는 작업이 IO a 가 아니라, IO () 입니다. 결과값이 없는 IO작업만 받습니다.
하스켈에선 한 쓰레드에서 다른 쓰레드의 결과값에 바로 접근할 수 없습니다. 그러니 당연히 결과값이 없는 IO () 작업만 받으면 됩니다.
그럼, 결과값이 필요한 IO a 작업은 어떻게 fork 시킬까요?

기본 아이디어는 IO a의 결과값 a타입 값을 받아 올 MVar 변수를 만들어, IO 작업의 결과를 거기에 넣는 작업을 만드는 것입니다.

import Control.Concurrent
import Control.Concurrent.STM

-- IO () 타입이 아니기 때문에 바로 fork 할 수 없습니다.
oneChar :: IO Char
oneChar = do
    c <- getChar
    return c

-- oneChar 를 fork해서 실행한다면 결과값을 어떻게 받아 올까요?
-- 애초에 fork 는 IO () 타입 작업만 받으니, 이대로는 fork할 수 없습니다.

-- 아래와 같이 원래 작업을 부르는 IO () 타입의 작업을 만듭니다.
-- 이 작업은 MVar 성격의 변수를 받아 작업 결과를 그 MVar에 넣기만 하니, 결과 타입은 IO () 입니다.
oneCharFork :: TMVar Char -> IO ()
oneCharFork resultVar = do
    result <- oneChar
    atomically $ putTMVar resultVar result

main :: IO ()
main = do
    v <- newEmptyTMVarIO
    tid <- forkIO (oneCharFork v)  -- fork할 때, 외부에서 만든 MVar를 같이 넘깁니다.
    r <- atomically (readTMVar v)
    putChar r

위와 같이 작업하면 다른 쓰레드의 결과값을 받아 올 수 있습니다. 소통 채널로 MVar를 쓰는 형태로 만들 수 밖에 없습니다.

async 함수도 같은 아이디어로 되어 있고, 예외 핸들링을 위해 mask로 짜여져 있습니다. 그리고, 나중에 결과값에 접근 할 수 있도록 MVar 변수와 쓰레드 ID를 묶은 Async값을 만들어 리턴합니다. 이제 나중에 Async 안에 들어있는 MVar 값에 Read를 걸어두면, 결과값을 가져올 수 있고, 쓰레드가 종료하는 걸 알아낼 수도 있습니다.

async :: IO a -> IO (Async a)
async action = do
  m <- newEmptyVar
  t <- mask $ \restore -> forkIO
    (do -- 원래 IO a 였던 액션을 IO () 으로 바꾸고, 타입 a 값은 MVar에 담아 놓습니다.
      r <- try (restore action)
      putMVar m r
    )
  return (Async t m)

Async 작업 체인을 만드는 withAsync의 구현을 보면,

withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO
-- bracket을 써서 구현해도 되지만 느려서 hand-coding 했다고 합니다.
withAsyncUsing :: (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    t <- doFork $ try (restore action) >>= atomically . putTMVar var 
                                  -- 이 부분이 async 구현이랑 같습니다.
    let a = Async t (readTMVar var)
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    uninterruptibleCancel a
    return r

fpcomplete async 문서
withAsync 실제 소스

Github 계정이 없는 분은 메일로 보내주세요. lionhairdino at gmail.com