ruby-gem 'Resque' コードリーディング
Resque is a Redis-backed library for creating background jobs, placing those jobs on multiple queues, and processing them later.
Flowchart Maker & Online Diagram Software で作った図。
guiアプリから(引数が毎回異なる)パッチジョブをバックグラウンドで起動・管理する必要があったので、何をやっているのか知るために Resque を調べた。
概略
- Jobをenqueueするプログラムと処理するプログラム(worker)は別
- Jobは
class Job1 def perform end end
のようにperformメソッドで実質定義され、Redis(key-value型オンメモリDB)に保存される - workerはループして仕事を順次処理していくrakeタスク
詳細
/lib 以下がメインコード
worker.rb
tasks.rb
job.rb
euqueue
- enqueueするプログラムは、enqueueした後即時に終了してよい
- Jobは QUEUE ごとに管理されるが、Job定義のクラスのインスタンス変数にシンボルを代入しておくことで
class Archive @queue = :file_serve def self.perform(repo_id, branch = 'master') repo = Repository.find(repo_id) repo.create_archive(branch) end end
のように定義される
worker
# The following events occur during a worker's life cycle:
# 1. Startup: Signals are registered, dead workers are pruned, # and this worker is registered.
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown: This worker is unregistered.
- 環境変数QUEUEを
QUEUE=default rake resque:work
のように指定しておくことで、ワーカーがどの種類のジョブを処理するのか指定できる。 ワイルドカード "*" も指定できる。 - workerは複数起動できる
- workerは子プロセスを
fork
して、 そのプロセスをwait
する。これをループするのがworker。説明 => why fork? - "$0"を書き換えることで
ps
したときに表示される情報をみやすくしている。
resque-web
resque-web
とすると、
- worker 一覧
- successed/ failed Job 一覧
などを表示してくれる sinatra-application が立ち上がる。これも rake-task で表側が実装されてる。
過去のタスクの詳細は表示されない。(そもそも過去のタスクは、ログファイルの形以外で記録されていない。)
知りたかったこと
Redisが保存している情報
- queues ob jobs (queue of jobs が複数ある)
- set of workers
- heartbeats of workers (後述) など。
Q.どうやってlinuxプロセスを追跡しているか.特に、pid=~~~ を保持していたとして、それが本当に自分が起動したworkerなのか、予期せずworkerが終了した後に起動した、pidが偶然一緒になったまったく関係ないプロセスなのかをどうやって区別するのか
A. redisの set に Worker#to_s(pidの情報を含む) を登録しておくことで、 workerのidentityを確保している。
- 合計プロセス数は (worker , その子プロセス) * 起動したワーカーの数
- 各ワーカーは Redis にStringで保存され、 idで区別される。
# The string representation is the same as the id for this worker # instance. Can be used with `Worker.find`. def to_s @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}" end alias_method :id, :to_s
to_sが暗黙に呼ばれる箇所(多分)
# Registers ourself as a worker. Useful when entering the worker # lifecycle on startup. def register_worker redis.pipelined do redis.sadd(:workers, self) started! end end
worker一覧
class Worker def self.all Array(redis.smembers(:workers)).map { |id| find(id, :skip_exists => true) }.compact end end
ps を使って workerのpid一覧表示
# Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def worker_pids if RUBY_PLATFORM =~ /solaris/ solaris_worker_pids elsif RUBY_PLATFORM =~ /mingw32/ windows_worker_pids else linux_worker_pids end end
いろいろ見た結果、
- workerが fork した子プロセスの面倒を必ずみる。それは、 signal を trap することで保障される。
- 新しいworkerが起動したタイミングで、 dead-worker が切り取られる(pruned)。具体的には、Worker.all のうち、死んでいるものは kill する
- dead-workerの判定は heartbeatを通して行われる.
heartbeat
worker が作成された時に、 redisにおいてあるタイムスタンプを更新するスレッドを作成。もしワーカーが死んだら、このスレッドも一緒に死に、redisのタイムスタンプが更新されなくなるので、それを検知する。
つまり、 worker自身に一定時間毎に生存報告をさせて、 一定時間以上報告がなかったらdead判定する。 worker自体に生存報告をさせるアイデアは、はてぶにあがってた何かのスライドで見たけどどれだったか覚えてない。
def heartbeat!(time = Time.now) redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601) end
def start_heartbeat heartbeat! @heart = Thread.new do loop do sleep(Resque.heartbeat_interval) heartbeat! end end end
to_s はさっき出てきた Worker のインスタンスメソッド。hsetのhはhash、hsetの引数は(hashname, key, val)
ps: linux - Is it reasonable to use resque(ruby) to manage external long-running commands (and log tasks) - Stack Overflow 意気揚々とstackoverflowに質問したのに0回答で悲しい