Running concurrent workflows in Ruby

Written by: Maciej Nowak

This article was originally published on Chaps by Maciej Nowak, and with their permission, we are sharing it here for Codeship readers.

Have you ever struggled with a chain of rake tasks that had to be run periodically and whose runtime gave you a headache? Have you asked yourself how to save time by speeding up execution while keeping huge batches of jobs readable? These questions come up a lot in applications with a hefty amount of background processing.

It is my own experience that inspired me to write this blog post. I had to run literally hundreds of jobs, one by one, to fetch, normalize, and enhance content and then refresh it on the page. This normally took around one to two weeks and cost me a few brain cells every time.

In addition, the entire process wasn’t properly documented or systematized, so sometimes I ended up running the same batch of jobs two or more times.

The solution was closer than I expected; my fellow Chaps developers had created it months earlier. Namely: why not group tasks into logical structures and run them concurrently? If they don’t depend on each other, I can’t see any obstacle to running them in parallel.

And then came Gush. Gush is a graph-based gem devised to make it easy to organize jobs into self-describing workflows. Under the hood it builds on Sidekiq to run the jobs and on Redis to store them together with the dependencies among them.

Installation

To install Gush, simply add it to your Gemfile:

gem 'gush'

and then run:

bundle install

From now on, your classes will be able to inherit from two fundamental classes in Gush: Gush::Job and Gush::Workflow. The former defines a single job (which can be compared to a rake task in Rails), and the latter groups all jobs into a cohesive workflow.

Gush::Job

First, let’s take a closer look at Gush::Job and how to create a simple job. It couldn’t get any easier than this:

class FetchPosts < Gush::Job
  def work
    Fetcher.fetch :posts
  end
end

It does remind you of something, doesn’t it? Yes, we are defining a #work method that is the exact counterpart of Sidekiq’s #perform method. When a job is ready to go, its #work method is simply triggered. No rocket science.
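For comparison, here is roughly what the same unit of work would look like as a plain Sidekiq worker (Fetcher is the same application-specific class as above):

class FetchPostsWorker
  include Sidekiq::Worker

  # Sidekiq calls #perform when the job is picked up, just as Gush calls #work
  def perform
    Fetcher.fetch :posts
  end
end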

Gush::Workflow

Once the jobs are defined, we have to build a workflow that gathers them all into one logical set. Every workflow must inherit from Gush::Workflow and implement the configure method.

class PostsAndTopicsProcess < Gush::Workflow
  def configure
    run FetchPosts
    run FetchTopics
    run PersistPosts
    run PersistTopics
    run NormalizeEntities
    run StoreEntities
  end
end

Very basic stuff, but there is one obvious problem with this workflow. Since we haven’t declared any dependencies among the jobs, all of them will be triggered at the same time. To set the execution order of the jobs, we can use either the before or the after param.

class PostsAndTopicsProcess < Gush::Workflow
  def configure
    run FetchPosts
    run FetchTopics
    run PersistPosts, after: FetchPosts
    run PersistTopics, after: FetchTopics
    run NormalizeEntities, after: [PersistPosts, PersistTopics]
    run StoreEntities, after: NormalizeEntities
  end
end

Now it looks much better. We have set dependencies on the jobs, so Gush knows how to trigger consecutive tasks. We can define either a before or an after hook; the names are self-explanatory. I decided to stick to after and not mix it with before, because otherwise the workflow would get nasty.
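Just for illustration, the first dependency above could be expressed the other way around with before (a sketch based on the before param described here; it declares the same edge in the graph):

class PostsProcessSketch < Gush::Workflow
  def configure
    # same edge as `run PersistPosts, after: FetchPosts` in the previous example
    run FetchPosts, before: PersistPosts
    run PersistPosts
  end
end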

What if our jobs or workflow needed additional arguments? Nothing simpler than passing them like this:

class PostsAndTopicsProcess < Gush::Workflow
  def configure(url)
    run FetchPosts, params: { url: url, in_batches: 200 }
    run FetchTopics, params: { url: url, in_batches: 20 }
    run PersistPosts, after: FetchPosts
    run PersistTopics, after: FetchTopics
    run NormalizeEntities, after: [PersistPosts, PersistTopics]
    run StoreEntities, after: NormalizeEntities
  end
end

This is very handy when our workflow has to run the same job two or more times with different params.

Summing up:

The first two jobs are independent, so they will be started at the same time. Their outcome is required by PersistPosts and PersistTopics, so those must wait until it is available. Once posts and topics are persisted, they can finally be normalized and stored. Params passed to a job can easily be accessed inside the #work method body.
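For example, FetchPosts could read those params inside #work like this (a sketch; params is the hash given to run in the workflow definition, and Fetcher remains a hypothetical application class):

class FetchPosts < Gush::Job
  def work
    # params comes from `run FetchPosts, params: { url: url, in_batches: 200 }`
    Fetcher.fetch :posts, url: params[:url], in_batches: params[:in_batches]
  end
end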

Gush uses Redis as its storage, so each job has a unique id and an array of the ids of the jobs it depends on. When the workflow is initialized, all these ids are persisted. All of this is wrapped in Sidekiq. Cool, isn’t it? There’s no need to explain in detail how Sidekiq handles the whole job-processing logic, but over the past few years it has proven to be a trustworthy partner in our applications, so I reckon we can trust it again.
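Because everything lives in Redis, you will usually want to point Gush at the right Redis instance and tune worker concurrency. A minimal configuration sketch, assuming the redis_url and concurrency options described in the Gush README (option names may differ between Gush versions):

# e.g. config/initializers/gush.rb in a Rails app
Gush.configure do |config|
  config.redis_url   = "redis://localhost:6379" # where workflow and job metadata are stored
  config.concurrency = 5                        # number of threads processing Gush jobs
end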

Running workflows

To build and run a new workflow, we first have to fire up the Sidekiq workers:

bundle exec gush workers

and then create a new instance of our workflow, save it, and start it!

workflow = PostsAndTopicsProcess.new('http://example.com') # arguments passed to .new are forwarded to #configure; the URL is just an example value
workflow.save
workflow.start!

The Gush workers running in a separate window should automatically inform us about newly started jobs in their output.

It’s worth checking the Gush workers from time to time, just to make sure the whole process is going smoothly and didn’t fail in the middle of the run.
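If you prefer a console to watching worker output, you can also look the workflow up and inspect its state (a sketch using the find API shown later in this article; the exact predicate methods may differ between Gush versions):

flow = Gush::Workflow.find(workflow.id)  # reload the workflow saved earlier
flow.running?   # jobs are still being processed
flow.failed?    # at least one job raised an exception
flow.finished?  # every job completed successfully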

Job states and exception handling

As in all applications, some of our jobs might fail. That’s a natural part of an application’s life.

How does Gush handle failures? Pretty straightforwardly. When an exception is raised in one of the jobs, it doesn’t mean the entire workflow stops. Only the jobs depending on the failed ones halt and wait, while the rest of the graph keeps going.

So, if some jobs have failed, we can deploy fixes and resume the workflow. Gush will retry the failed jobs and proceed once they complete successfully. To do so, we need to find our workflow and use the continue method.

workflow = Gush::Workflow.find(<workflow_id>)
workflow.continue

If you don’t like using the API directly, there’s a gush-control gem that adds a web GUI you can run on top of Gush.

Bash commands

To simplify managing workflows and jobs, a handful of bash commands are provided out of the box. The only requirement to run them is to create a Gushfile.rb that loads your application files; obviously, the Gushfile should require all your jobs and workflows. For example:

require_relative './lib/your_project'

# In a Rails app, make sure the environment is loaded first (so that Rails.root
# and your classes are available) before requiring the workflow files.
Dir[Rails.root.join("app/workflows/**/*.rb")].each do |file|
  require file
end

Great. With Gushfile.rb in place, we can use a couple of very handy commands:

bundle exec gush list
+--------------------------------------+---------------+-------------------+
|                  id                  |     name      |      status       |
+--------------------------------------+---------------+-------------------+
| 62b2a811-dff6-47ce-a183-db596475c431 | PostsProcess  |       done        |
| 73ea0e32-6350-4d68-814b-fb70e6f5405e | TopicsProcess |      failed       |
+--------------------------------------+---------------+-------------------+
bundle exec gush show 73ea0e32-6350-4d68-814b-fb70e6f5405e
+----------------+--------------------------------------+
|       ID       | 73ea0e32-6350-4d68-814b-fb70e6f5405e |
+----------------+--------------------------------------+
|      Name      | TopicsProcess                        |
+----------------+--------------------------------------+
|      Jobs      | 4                                    |
+----------------+--------------------------------------+
|  Failed jobs   | 1                                    |
+----------------+--------------------------------------+
| Succeeded jobs | 2                                    |
+----------------+--------------------------------------+
| Enqueued jobs  | 1                                    |
+----------------+--------------------------------------+
|  Running jobs  | 0                                    |
+----------------+--------------------------------------+
| Remaining jobs | 2                                    |
+----------------+--------------------------------------+
|   Started at   | 1452580414                           |
+----------------+--------------------------------------+
|     Status     | failed                               |
|                | TopicsNormalize failed               |
+----------------+--------------------------------------+
Jobs list:
[X] TopicsNormalize
[✓] TopicsFetch
[✓] TopicsPersist
[ ] TopicsStore

We might even visualize our workflow by running:

bundle exec gush viz TopicsProcess

This basically builds a graphical version of the workflow shown in the previous section.

Final comments

And that’s it. Gush doesn’t come as a fairy that solves performance problems inside the jobs themselves, but with a relatively small overhead it can significantly scale up your background processing by parallelizing it. If you find this library interesting and useful in your environment but some features are not implemented yet, feel free to drop us an issue or pull request: https://github.com/chaps-io/gush. We are open to improvements.
