Haskell Servant + WebSocket で非同期ジョブを作る
とある理由で、Ruby の Resque や Sidekiq のような非同期ジョブを行うホビープログラムを Haskell で自作したくなったので、そのメモ書きです。 リポジトリはこちら:
非同期ジョブプログラム
雰囲気として、非同期ジョブの状態確認や実行を HTTP リクエストで受け取るサーバーを一つと、実際にジョブを処理するクライアントを複数用意します。 そして、サーバーとクライアントは WebSocket で繋がるイメージです。 ジョブの設定はサーバーが適当に YAML で読み込んで、クライアントへ渡します。 とりあえず、ジョブはただ単に docker run
だけすることをゴールにします。
Servant による Web API
まずは、簡単な Web API をサーバー側に定義しておきます。 サーバーと接続したクライアントを返すだけです。 そのためにサーバーと接続したクライアントを表す型を定義しておきます。
module JobWorker.Worker where
import Data.Aeson (FromJSON, ToJSON)
import GHC.Generics (Generic)
newtype Id = Id Int32
deriving newtype (Show, Eq, Ord, Num, FromJSON, ToJSON)
data Worker = Worker
id :: Id
{ working :: Bool
,deriving (Generic, Show, Eq)
}
instance ToJSON Worker
instance FromJSON Worker
GHC 9.2 のレコードドット記法を利用するので id
というフィールド名をつけちゃいます(GHC 9.2 最高)。 Worker
型の値は雑に STM を利用して状態管理することにします。
module JobWorker.DB where
import Control.Concurrent.STM qualified as STM
data DB = DB
workers :: STM.TVar (Map Worker.Id (Maybe Worker))
{
}
new :: IO DB
= DB <$> STM.newTVarIO mempty
new
getAllWorker :: DB -> IO [Worker]
=
getAllWorker db . Map.elems <$> STM.readTVarIO db.workers catMaybes
Maybe Worker
型を利用しているのは、確保した ID が被らないようにクライアントのコネクションが切れたら Nothing
で置き換えるためです。 WebSocket のコネクションを取得して、DB
型に Worker
型の値を保存する部分は後述します。
そして、Web API の定義には Servant を使います。
module JobWorker.Server where
import JobWorker.DB qualified as DB
import Servant
type API = "api" :> WebAPI
api :: Proxy API
= Proxy
api
type WebAPI
= "workers" :> Get '[JSON] [Worker]
server :: DB -> Server API
server db = liftIO (DB.getAllWorker db)
あとは適当に main
を書いて起動し、curl
を使ってリクエストを投げてみます。
$ curl -s localhost:8080/api/workers | jq
[]
Server-Client を WebSocket で繋ぐ
Haskell で WebSocket を利用する簡単な方法は websockets パッケージを使うことです。 まずは Worker
型を拡張してコネクションを持たせます。
module JobWorker.Worker where
import Network.WebSockets qualified as WS
data Worker = Worker
id :: Id
{ conn :: WS.Connection
, working :: Bool
,
}
new :: Id -> WS.Connection -> Worker
= Worker wid conn False
new wid conn
data Info = Info
id :: Id
{ working :: Bool
,deriving (Generic, Show, Eq) }
WS.Connection
型は、さすがに JSON へエンコードできないので、Web API で返す用の型として Worker.Info
型を用意しました。 次に WebSocket サーバー側を定義します。 WebSocket のルーティングを Servant に乗せる簡単な方法は servant-websockets パッケージを使うことです:
module JobWorker.Server where
import Control.Concurrent (threadDelay)
import Control.Exception (finally)
import Control.Monad (forever)
import Network.WebSockets qualified as WS
type API
= "api" :> JobAPI
:<|> "runner" :> WS.WebSocket
server :: DB -> Server API
server db= liftIO (DB.getAllWorker db)
:<|> (liftIO . serveRunner config db)
serveWorker :: DB -> WS.Connection -> IO ()
= do
serveWorker db conn <- DB.connectedWorker db conn
worker `finally` DB.disconnectedWorker db worker.id forever (threadDelay 1_000_000)
まだメッセージのやりとりはせず、ただ db
にコネクションを保存して、threadDelay
でひたすら待ってるだけです。 connectedWorker
が STM に Worker
型の値を保存する関数で、disconnectedWorker
が接続が切れたとして Nothing
を保存する関数です:
module JobWorker.DB where
connectedWorker :: DB -> WS.Connection -> IO Worker
= STM.atomically $ do
connectedWorker db conn <- Map.size <$> STM.readTVar db.workers
maxId let worker = Worker.new (fromIntegral $ maxId + 1) conn
.workers (Map.insert worker.id $ Just worker)
STM.modifyTVar dbpure worker
disconnectedWorker :: DB -> Worker.Id -> IO ()
=
disconnectedWorker db wid $ STM.modifyTVar db.workers (Map.update (\_ -> Just Nothing) wid) STM.atomically
あとはクライアント側を定義します。 websockets パッケージにはクライアント側の関数や型もあります:
module JobWorker.Client where
import Network.WebSockets qualified as WS
data Client = Client
host :: String
{ port :: Int
, path :: String
,
}
-- 'localhost:8080/hoge/fuga' を適当にパースする
new :: String -> Maybe Client
= ...
new dest
run :: Client -> IO ()
= WS.runClient client.host client.port client.path $ \conn ->
run client 15 (pure ()) $ do
WS.withPingThread conn forever (threadDelay 1_000_000)
withPingThread
によって第一引数で与えたコネクションに対し、第二引数で与えた秒毎で Ping を別スレッドで送り続けます。 WebSocket サーバーは一定時間やり取りがない場合にタイムアウトする可能性があり、多くの場合はタイムアウトが60秒なので半分の30秒にして動作させておくと良いと Hackage には書いてあります。 Haskell の Warp の場合(Warp.run
)はデフォルトだと30秒でタイムアウトするようになっているので、半分の15秒にしてみました(サーバー側をいじっても良い)。
これらを適当に Docker イメージ化して、docker-compose で起動した後に curl すると無事クライアントが接続されました:
$ curl -s localhost:8080/api/workers | jq
[
{
"id": 1,
"working": false
},
{
"id": 2,
"working": false
},
{
"id": 3,
"working": false
}
]
WebSocket上のプロトコルを定義
サーバー・クライアント間でやり取りするためのプロトコルっぽいものを定義します。 websockets パッケージの場合は WebSocketsData
型クラスというのがあります。 これを使うことで WebSockets 上でやりとりする ByteString
型と任意の型への相互変換を定義できます。 例えば、サーバーからクライアントへ送るデータ型を次のように定義しました:
module JobWorker.Protocol where
import Data.Aeson qualified as JSON
import Data.Binary qualified as Binary
import Data.ByteString.Lazy qualified as LBS
import JobWorker.Job qualified as Job
import Network.WebSockets qualified as WS
data Server
= JobConfigs [Job.Config]
| Enqueue Job.Id Job.Name
| SUndefined
instance WS.WebSocketsData Server where
WS.Text bl _) = WS.fromLazyByteString bl
fromDataMessage (WS.Binary bl) = WS.fromLazyByteString bl
fromDataMessage (
= case LBS.uncons lbs of
fromLazyByteString lbs Just (1, rest) ->
case JSON.decode rest of
Just configs ->
JobConfigs configs
Nothing ->
SUndefined lbs
Just (2, rest) ->
uncurry Enqueue (Binary.decode rest)
->
_ SUndefined
= case p of
toLazyByteString p JobConfigs configs ->
1 (JSON.encode configs)
LBS.cons
Enqueue wid name ->
2 (Binary.encode (wid, name))
LBS.cons
SUndefined ->
""
クライアントへ送るメッセージの種類は、ByteString
の先頭で判定することにしました。 Job.Config
型は YAML で定義したジョブの設定を読み込んだ型です。 aeson パッケージで JSON にして送ります。 ジョブ(Job
型)はサーバー側で生成して、クライアントへ処理を任せます。 Job.Id
型は、その生成したジョブのユニークIDで、Job.Name
型は Job.Config
にあるジョブの種類を指すユニークIDです。 Binary
型クラスを利用してエンコード・デコードをすることにしました。
クライアントからサーバーへはジョブの状態を返すようなプロトコルを定義しました:
data Client
= JobRunning Job.Id
| JobSuccess Job.Id
| JobFailure Job.Id
| CUndefined
instance WS.WebSocketsData Client where
WS.Text bl _) = WS.fromLazyByteString bl
fromDataMessage (WS.Binary bl) = WS.fromLazyByteString bl
fromDataMessage (
= case LBS.uncons lbs of
fromLazyByteString lbs Just (1, rest) ->
JobRunning (Binary.decode rest)
Just (2, rest) ->
JobSuccess (Binary.decode rest)
Just (3, rest) ->
JobFailure (Binary.decode rest)
->
_ CUndefined
= case p of
toLazyByteString p JobRunning wid ->
1 (Binary.encode wid)
LBS.cons
JobSuccess wid ->
2 (Binary.encode wid)
LBS.cons
JobFailure wid ->
3 (Binary.encode wid)
LBS.cons
CUndefined ->
""
websockets パッケージでメッセージを送受信するには sendBinaryData
関数と receiveData
関数を使います:
-- Server 側
serveWorker :: [Job.Config] -> DB -> WS.Connection -> IO ()
= do
serveWorker configs db conn <- DB.connectedWorker db conn
worker .conn (Protocol.JobConfigs configs)
WS.sendBinaryData worker`finally` DB.disconnectedWorker db worker.id
forever (receive worker) where
= do
receive worker <- WS.receiveData worker.conn
p case p of
Protocol.JobRunning jid ->
.id jid
DB.runningJob db workerProtocol.JobSuccess jid ->
.id jid
DB.successJob db workerProtocol.JobFailure jid ->
.id jid
DB.failureJob db worker->
_ pure ()
-- Client 側
run :: Client -> IO ()
= WS.runClient client.host client.port client.path $ \conn ->
run client 15 (pure ()) $ do
WS.withPingThread conn
forever (receive conn)where
= do
receive conn <- WS.receiveData conn
p case p of
Protocol.JobConfigs configs ->
$ STM.writeTVar client.configs configs
STM.atomically Protocol.Enqueue jid jname ->
runJob conn client jid jname->
_ pure ()
runJob :: WS.Connection -> Client -> Job.Id -> Job.Name -> IO ()
= do
runJob conn client jid jname <- STM.atomically $ STM.readTVar client.configs
configs case List.find (\config -> config.name == jname) configs of
Nothing ->
Protocol.JobFailure jid)
WS.sendBinaryData conn (Just config -> do
Protocol.JobRunning jid)
WS.sendBinaryData conn (
threadDelay 10_000_000Protocol.JobSuccess jid) WS.sendBinaryData conn (
クライアントは、サーバーから受け取った設定の情報を STM で保存することにします。 現状はとりあえず、10秒待って成功のメッセージを返すようにします。 これだけだとまだ、ジョブを積む側の処理がありません。ジョブは Web API を介して積むようにしました:
type JobAPI
= "workers" :> Get '[JSON] [Worker.Info]
:<|> "jobs" :> Get '[JSON] [Job]
:<|> "jobs" :> Capture "name" Job.Name :> Post '[JSON] Job
server :: [Job.Config] -> DB -> Server API
server configs db= (getWorkers :<|> getJobs :<|> kickJob)
:<|> (liftIO . serveRunner config db)
where
= liftIO $ DB.getAllWorkerInfo db
getWorkers = liftIO $ DB.getAllJob db
getJobs =
kickJob name case List.find (\config -> config.name == name) configs of
Nothing ->
throwError err404Just _ -> do
<- liftIO $ randomWorker db
w case w of
Nothing ->
$ err500 { errBody = "worker is not exist." }
throwError Just worker -> liftIO $ do
<- DB.enqueueJob db name
job .conn (Protocol.Enqueue job.id job.name)
WS.sendBinaryData workerpure job
Job
型も Worker
型と同じように STM で管理します(割愛)。 クライアントは複数あるので、ランダムで1つ取得するための処理が randomWorker
関数です。 雑に randomRIO
関数を使ってとってくるだけです:
import System.Random (randomRIO)
randomWorker :: DB -> IO (Maybe Worker)
= do
randomWorker db <- DB.getAllWorker db
workers case workers of
->
[] pure Nothing
-> do
_ <- (\x -> x - 1) <$> randomRIO (1, length workers)
idx pure $ Just (workers !! idx)
Docker イメージを更新して、適当に curl するとこうなります:
$ curl -s localhost:8080/api/workers | jq
[
{
"id": 1,
"working": false
},
{
"id": 2,
"working": false
}
]
$ curl -s localhost:8080/api/jobs | jq
[]
$ curl -s -XPOST localhost:8080/api/jobs/hello-world | jq
{
"id": 1,
"name": "hello-world",
"queuing": true,
"running": false,
"success": false
}
$ curl -s localhost:8080/api/workers | jq
[
{
"id": 1,
"working": false
},
{
"id": 2,
"working": true
}
]
$ curl -s localhost:8080/api/jobs | jq
[
{
"id": 1,
"name": "hello-world",
"queuing": false,
"running": true,
"success": false
}
]
Client にキューを導入
runJob
のところでジョブを実行しても良いですが、そうすると WebSocket が詰まってしまいそうなので、クライアント側に非同期用のキューをサクッと実装します。 Haskell では stm パッケージの TQueue
型を利用することで、簡単に非同期処理用のメッセージキューを実装できます:
module JobWorker.Client where
...
data Client = Client
host :: String
{ port :: Int
, path :: String
, configs :: STM.TVar [Job.Config]
, queue :: STM.TQueue Job -- コレ
,
}
run :: Client -> IO ()
= WS.runClient client.host client.port client.path $ \conn ->
run client 15 (pure ()) $ do
WS.withPingThread conn $ forever (runJob conn client) -- ジョブの処理を非同期にする
forkIO
forever (receive conn)where
= do
receive conn <- WS.receiveData conn
p case p of
Protocol.Enqueue jid jname ->
-- WebSocketからのをそのままキューに積むだけ
$ STM.writeTQueue client.queue (Job.new jname jid)
STM.atomically ...
runJob :: WS.Connection -> Client -> IO ()
= do
runJob conn client -- キューからジョブを取り出す(取り出せるまで待つ)
<- STM.atomically $ STM.readTQueue client.queue
job <- STM.atomically $ STM.readTVar client.configs
configs ...
Docker コンテナの実行
最後に、ジョブを Docker コンテナで実行します。 といっても、ただプロセスを雑に実行するだけです。 外部プロセスを実行するのには process パッケージの eadProcessWithExitCode
関数 を使います:
module JobWorker.Docker where
import JobWorker.Job qualified as Job
import System.Exit (ExitCode)
import System.Process qualified as Process
run :: Job.Config -> IO (ExitCode, String, String)
=
run config "docker" ["run", "--rm", config.image, config.command] "" Process.readProcessWithExitCode
最後の引数 ""
はプロセスに与える標準入力なので空文字列です。 後はこれをクライアントで呼び出して、結果に応じてサーバーへの返答を変えるだけです。