読者です 読者をやめる 読者になる 読者になる

夜が静寂を取り戻す頃

日記 << 考えや感情. 外に発散する場として. たまに備忘録

ruby-gem 'Resque' コードリーディング

Resque is a Redis-backed library for creating background jobs, placing those jobs on multiple queues, and processing them later.

f:id:asaokiru:20160402004216p:plain

Flowchart Maker & Online Diagram Software で作った図。

guiアプリから(引数が毎回異なる)パッチジョブをバックグラウンドで起動・管理する必要があったので、何をやっているのか知るために Resque を調べた。

概略

  • Jobをenqueueするプログラムと処理するプログラム(worker)は別
  • Jobは class Job1 def perform end end のようにperformメソッドで実質定義され、Redis(key-value型オンメモリDB)に保存される
  • workerはループして仕事を順次処理していくrakeタスク

詳細

/lib 以下がメインコード
worker.rb
tasks.rb
job.rb

euqueue

  • enqueueするプログラムは、enqueueした後即時に終了してよい
  • Jobは QUEUE ごとに管理されるが、Job定義のクラスのインスタンス変数にシンボルを代入しておくことで
class Archive
  @queue = :file_serve
  def self.perform(repo_id, branch = 'master')
    repo = Repository.find(repo_id)
    repo.create_archive(branch)
  end
end

のように定義される

worker

# The following events occur during a worker's life cycle:
# 1. Startup: Signals are registered, dead workers are pruned, # and this worker is registered.
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown: This worker is unregistered.

  • 環境変数QUEUEを QUEUE=default rake resque:work のように指定しておくことで、ワーカーがどの種類のジョブを処理するのか指定できる。 ワイルドカード "*" も指定できる。
  • workerは複数起動できる
  • workerは子プロセスを fork して、 そのプロセスを wait する。これをループするのがworker。説明 => why fork?
  • "$0"を書き換えることで ps したときに表示される情報をみやすくしている。

resque-web

resque-web

とすると、

  • worker 一覧
  • successed/ failed Job 一覧

などを表示してくれる sinatra-application が立ち上がる。これも rake-task で表側が実装されてる。
過去のタスクの詳細は表示されない。(そもそも過去のタスクは、ログファイルの形以外で記録されていない。)

知りたかったこと

Redisが保存している情報

  • queues ob jobs (queue of jobs が複数ある)
  • set of workers
  • heartbeats of workers (後述) など。

Q.どうやってlinuxプロセスを追跡しているか.特に、pid=~~~ を保持していたとして、それが本当に自分が起動したworkerなのか、予期せずworkerが終了した後に起動した、pidが偶然一緒になったまったく関係ないプロセスなのかをどうやって区別するのか

A. redisの set に Worker#to_s(pidの情報を含む) を登録しておくことで、 workerのidentityを確保している。

  • 合計プロセス数は (worker , その子プロセス) * 起動したワーカーの数
  • 各ワーカーは Redis にStringで保存され、 idで区別される。
    # The string representation is the same as the id for this worker
    # instance. Can be used with `Worker.find`.
    def to_s
      @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
    end
    alias_method :id, :to_s

to_sが暗黙に呼ばれる箇所(多分)

 # Registers ourself as a worker. Useful when entering the worker
    # lifecycle on startup.
    def register_worker
      redis.pipelined do
        redis.sadd(:workers, self)
        started!
      end
    end

worker一覧

class Worker
    def self.all
          Array(redis.smembers(:workers)).map { |id| find(id, :skip_exists => true) }.compact
    end
end

ps を使って workerのpid一覧表示

  # Returns an Array of string pids of all the other workers on this
    # machine. Useful when pruning dead workers on startup.
    def worker_pids
      if RUBY_PLATFORM =~ /solaris/
        solaris_worker_pids
      elsif RUBY_PLATFORM =~ /mingw32/
        windows_worker_pids
      else
        linux_worker_pids
      end
    end

いろいろ見た結果、

  • workerが fork した子プロセスの面倒を必ずみる。それは、 signal を trap することで保障される。
  • 新しいworkerが起動したタイミングで、 dead-worker が切り取られる(pruned)。具体的には、Worker.all のうち、死んでいるものは kill する
  • dead-workerの判定は heartbeatを通して行われる.
heartbeat

worker が作成された時に、 redisにおいてあるタイムスタンプを更新するスレッドを作成。もしワーカーが死んだら、このスレッドも一緒に死に、redisのタイムスタンプが更新されなくなるので、それを検知する。
つまり、 worker自身に一定時間毎に生存報告をさせて、 一定時間以上報告がなかったらdead判定する。 worker自体に生存報告をさせるアイデアは、はてぶにあがってた何かのスライドで見たけどどれだったか覚えてない。

  def heartbeat!(time = Time.now)
      redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601)
    end
    def start_heartbeat
      heartbeat!
      @heart = Thread.new do
        loop do
          sleep(Resque.heartbeat_interval)
          heartbeat!
        end
      end
    end

to_s はさっき出てきた Worker のインスタンスメソッド。hsetのhはhash、hsetの引数は(hashname, key, val)

ps: linux - Is it reasonable to use resque(ruby) to manage external long-running commands (and log tasks) - Stack Overflow 意気揚々とstackoverflowに質問したのに0回答で悲しい