Leveraging Sidekiq’s Middleware & API for an Application DSL

[Graduating from startup to scaleup means having to constantly evolve your applications to keep pace with your customers’ growth. In this Freshworks Engineering series, Rails@Scale, we talk about some of the techniques we employ to scale up our products.]

In one of our previous blogs, “Sidekiq queue management using custom middleware”, we looked at how Freshworks’ products use Sidekiq as their background processing system. We also briefly touched upon leveraging Sidekiq’s middlewares for intelligent routing and efficient use of our infrastructure. In this blog, we go deeper into the Domain Specific Language (DSL) that some of our applications use, built on top of Sidekiq’s middleware and APIs.

Freshworks builds multitenant SaaS applications and has had to adopt a multi-tenancy architecture from Day One. To ensure that this architecture scales across different teams and modules, we defined our own pipeline for enqueuing jobs to Sidekiq, built on custom middlewares and Sidekiq’s API. This pipeline puts sensible defaults in place, covering both multi-tenancy and robustness.

Leveraging Sidekiq API for DSL

With multitenant architectures, there is always a global context and a tenant-specific context. The same applies to background jobs: some jobs are tenant-specific and others are application-wide. To separate them clearly, we introduced custom classes that enqueue jobs into these “virtual buckets”. This helps developers, new and existing, see at a glance the context in which a particular job is enqueued and processed. These classes also set a handful of app-wide “defaults” that specific worker classes can override.

SidekiqWorker uses the Sidekiq API to push a job to Redis, where the Sidekiq servers pick it up. It also adds some high-level attributes, such as the request UUID and tenant_id, to the job message for our custom middlewares to use.

# app/workers/sidekiq_worker.rb

# frozen_string_literal: true

class SidekiqWorker
  # Pushes a job to Redis through Sidekiq::Client for processing.
  #
  # @param class_name [Class] The worker class to be enqueued
  # @param params [Hash] Params to the Sidekiq job
  # @param belongs_to_tenant [Boolean] flag to identify whether the job
  #   is tenant specific. If true, adds the tenant_id to the job message
  # @param at [Float, nil] epoch timestamp for enqueuing in the future
  def self.enqueue(class_name, params, belongs_to_tenant, at)
    args = {}
    args['args'] = Array.wrap(params)
    args['belongs_to_tenant'] = belongs_to_tenant
    args['tenant_id'] = Tenant.current.id if belongs_to_tenant
    args['uuid'] = current_request_uuid # hypothetical helper returning the request UUID
    args['class'] = class_name
    args['at'] = at if at
    Sidekiq::Client.push(args)
  end
end
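Sidekiq keeps extra top-level keys such as belongs_to_tenant, tenant_id, and uuid in the job hash, stores the job in Redis as JSON, and hands the parsed hash to server middleware. The dependency-free sketch below imitates that round trip to show what a middleware would see. `build_job_payload`, `FakeTenant`, and the sample values are hypothetical stand-ins for SidekiqWorker.enqueue and Tenant.current, not part of Sidekiq.

```ruby
require 'json'
require 'securerandom'

# Hypothetical stand-in for the object returned by Tenant.current.
FakeTenant = Struct.new(:id)

# Hypothetical helper: builds the same hash shape SidekiqWorker.enqueue
# passes to Sidekiq::Client.push (without actually pushing it).
def build_job_payload(class_name, params, belongs_to_tenant, at: nil, tenant: nil, request_uuid: nil)
  payload = {
    'class' => class_name,
    'args' => [params], # what Array.wrap(params) returns for a single Hash
    'belongs_to_tenant' => belongs_to_tenant,
    'uuid' => request_uuid
  }
  payload['tenant_id'] = tenant.id if belongs_to_tenant
  payload['at'] = at if at
  payload
end

# Imitates the Redis round trip: the job is stored as a JSON string.
def round_trip(payload)
  JSON.parse(JSON.generate(payload))
end

job = round_trip(
  build_job_payload('Sidekiq::NotificationsWorker', { id: 42, model: 'some_model' }, true,
                    tenant: FakeTenant.new(7), request_uuid: SecureRandom.uuid)
)

puts job['tenant_id']               # 7
puts job['args'].first.keys.inspect # ["id", "model"] (symbol keys come back as strings)
```

The custom keys survive the trip intact, while the params lose their symbol keys, which is why a server middleware has to re-symbolize them before the worker runs.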

BaseWorker is the base class for all worker classes. It holds common functionality and the application DSL for enqueuing jobs. It also establishes the default: worker classes that inherit from BaseWorker run in the tenant context.

# app/workers/sidekiq/base_worker.rb

# frozen_string_literal: true

module Sidekiq
  class BaseWorker
    include Sidekiq::Worker

    # specific options can be overridden in the child classes.
    sidekiq_options queue: :default, retry: 0, backtrace: 10

    sidekiq_retries_exhausted do |msg, _e|
      # default retries exhausted logging here.
    end

    # Kept public (no private_constant) so the self:: lookup in .enqueue
    # can resolve a subclass's override, such as GlobalWorker's.
    BELONGS_TO_TENANT = true

    class << self
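      # Passes the class itself (not its name) so Sidekiq::Client merges the
      # worker's sidekiq_options (queue, retry) into the job.
      def enqueue(args = {}, at = nil)
        SidekiqWorker.enqueue(self, args, self::BELONGS_TO_TENANT, at)
      end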

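      # `at` may be an interval (e.g. 5.minutes) or an absolute time/epoch;
      # the 1_000_000_000 cutoff mirrors Sidekiq's own perform_in.
      def enqueue_at(at, args = {})
        int = at.to_f
        now = Time.now.to_f
        at = (int < 1_000_000_000 ? now + int : int)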

        # Optimization to enqueue something now that is scheduled to go out now or in the past
        at = nil if at <= now

        enqueue(args, at)
      end
      alias enqueue_in enqueue_at
    end
  end
end
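enqueue_at treats small numbers as an interval and large ones as an absolute epoch timestamp, with the same 1_000_000_000 cutoff and the same “enqueue now if already due” shortcut that Sidekiq’s own perform_in uses. Pulled out as a pure function with an injectable clock (`normalize_at` is a hypothetical name, not a method in the codebase), the rule is easy to check:

```ruby
# Returns the epoch timestamp to store as the job's 'at', or nil to enqueue now.
# `at` is an interval in seconds when below 1_000_000_000 (an epoch in
# September 2001); otherwise it is read as an absolute time (Time or epoch).
def normalize_at(at, now = Time.now.to_f)
  value = at.to_f
  ts = value < 1_000_000_000 ? now + value : value
  ts <= now ? nil : ts
end

now = 1_700_000_000.0
p normalize_at(300, now)               # 1700000300.0 (interval: 5 minutes from now)
p normalize_at(Time.at(now + 60), now) # 1700000060.0 (absolute time)
p normalize_at(now - 5, now)           # nil (already due: enqueue immediately)
```

Because enqueue_in is an alias of enqueue_at, both names accept either form; in a Rails app, enqueue_in(5.minutes, args) and enqueue_at(1.hour.from_now, args) both work, since durations and times respond to to_f.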

GlobalWorker inherits from BaseWorker but does not set the tenant context, i.e. it runs in the application context.

# app/workers/sidekiq/global_worker.rb

# frozen_string_literal: true

module Sidekiq
  class GlobalWorker < BaseWorker
    # Overrides BaseWorker's default; public so BaseWorker.enqueue can read it via self::
    BELONGS_TO_TENANT = false
  end
end
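The tenant/global switch is just a constant that subclasses shadow. BaseWorker.enqueue reads it as self::BELONGS_TO_TENANT, so the lookup starts at the class enqueue was called on and walks up its ancestors; a bare BELONGS_TO_TENANT would resolve lexically to BaseWorker’s own value for every subclass. The plain-Ruby sketch below shows the difference. The class names echo the post, but there is no Sidekiq involved, NotificationsWorker inherits from BaseWorker directly, and `tenant_flag`/`lexical_tenant_flag` are hypothetical helpers.

```ruby
class BaseWorker
  BELONGS_TO_TENANT = true

  # Dynamic lookup: starts at the receiving class and walks up its ancestors.
  def self.tenant_flag
    self::BELONGS_TO_TENANT
  end

  # Lexical lookup: always finds BaseWorker's own constant.
  def self.lexical_tenant_flag
    BELONGS_TO_TENANT
  end
end

class GlobalWorker < BaseWorker
  BELONGS_TO_TENANT = false
end

class TenantRegistrationWorker < GlobalWorker; end
class NotificationsWorker < BaseWorker; end

p TenantRegistrationWorker.tenant_flag         # false
p NotificationsWorker.tenant_flag              # true
p TenantRegistrationWorker.lexical_tenant_flag # true: the GlobalWorker override is ignored
```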

Enough definitions; let’s look at an example:

# app/workers/sidekiq/tenant_registration_worker.rb

# frozen_string_literal: true

module Sidekiq
  class TenantRegistrationWorker < GlobalWorker
    sidekiq_options queue: :signup, retry: 3

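    ACTIONS = %w[create_tenant destroy_tenant].freeze

    # args arrive with symbol keys (see the WorkerWrapper middleware)
    def perform(args)
      action = args[:action].to_s
      raise ArgumentError, "unknown action: #{action}" unless ACTIONS.include?(action)

      send(action, args)
    end

    private

    def create_tenant(args)
      TenantRegistration.new(args).create!
    end

    def destroy_tenant(args)
      TenantRegistration.new(args).destroy!
    end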
  end
end

# tenants_controller.rb

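def create
  # post processing
  # create_tenant_args is expected to include action: 'create_tenant'
  Sidekiq::TenantRegistrationWorker.enqueue(create_tenant_args)
end

def destroy
  # post processing
  # destroy_tenant_args is expected to include action: 'destroy_tenant'
  Sidekiq::TenantRegistrationWorker.enqueue(destroy_tenant_args)
end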

TenantRegistrationWorker inherits from GlobalWorker because it registers and deregisters tenants in the global records. The tenant may not be available (yet, or any longer) when the job is processed in the background, so we skip the tenant context. Everything the job needs is enqueued with its payload and made available to the worker.

Leveraging Sidekiq API for better job routing & processing

Imagine walking into a crowded pizzeria and finding out that one cook makes the Chef’s Special while another makes all the other varieties. If you order the Special, as many other customers probably will, you will wait much longer than for any other pizza.

Now suppose the kitchen had more cooks, each skilled in all the recipes. The owner would assign most of them to the Chef’s Special and spread the rest evenly across the other varieties. Each cook knows what to make, orders are processed faster, and customers have a better experience. The owner can decide on this split based on historical order patterns.

Similarly, we serve many tenants on different subscription plans who use the application to different degrees, and we wanted jobs to be distributed evenly. Based on usage patterns, we created multiple subscription-specific queues for some high-velocity jobs like automations, notifications, and events. Handling this the traditional way would mean creating duplicate worker classes that differ only in their queue names, and adding checks across the codebase to pick the right class before enqueuing. That doesn’t scale for large teams, and a missing check is easy to overlook. Hence, we created a third Sidekiq worker class to define this behaviour. We modified SidekiqWorker and BaseWorker slightly to accommodate it: SidekiqWorker.enqueue gained a queue argument, which BaseWorker.enqueue leaves as nil.

# app/workers/sidekiq/load_balanced_worker.rb

# frozen_string_literal: true

module Sidekiq
  class LoadBalancedWorker < BaseWorker
    # Routes the job to the worker's queue suffixed with the current tenant's subscription state
    def self.enqueue(args = {}, at = nil)
      queue_name = SidekiqWorker.load_balanced_queue(get_sidekiq_options['queue'])
      SidekiqWorker.enqueue(self, args, self::BELONGS_TO_TENANT, queue_name, at)
    end
  end
end

# app/workers/sidekiq_worker.rb

# frozen_string_literal: true

class SidekiqWorker
  # Pushes a job to Redis through Sidekiq::Client for processing.
  #
  # @param class_name [Class] The worker class to be enqueued
  # @param params [Hash] Params to the Sidekiq job
  # @param belongs_to_tenant [Boolean] flag to identify whether the job
  #   is tenant specific. If true, adds the tenant_id to the job message
  # @param queue [String, Symbol, nil] queue override for load-balanced workers
  # @param at [Float, nil] epoch timestamp for enqueuing in the future
  def self.enqueue(class_name, params, belongs_to_tenant, queue, at)
    args = {}
    args['args'] = Array.wrap(params)
    args['belongs_to_tenant'] = belongs_to_tenant
    args['tenant_id'] = Tenant.current.id if belongs_to_tenant
    args['uuid'] = current_request_uuid # hypothetical helper returning the request UUID
    args['class'] = class_name
    args['queue'] = queue if queue
    args['at'] = at if at
    Sidekiq::Client.push(args)
  end

  # Suffixes the queue with the current tenant's subscription state;
  # outside a tenant context the queue name is returned unchanged.
  def self.load_balanced_queue(queue_name)
    return queue_name unless Tenant.current

    "#{queue_name}_#{Tenant.current.state_name}"
  end
end

# app/workers/sidekiq/notifications_worker.rb

# frozen_string_literal: true

module Sidekiq
  class NotificationsWorker < LoadBalancedWorker
    sidekiq_options queue: :notifications, retry: 3

    def perform(args)
      # DSL
    end
  end
end

# some_model.rb

class SomeModel < ApplicationRecord
  # ...
  after_commit -> { Sidekiq::NotificationsWorker.enqueue({ id: id, model: 'some_model' }) }
  # ...
end

Jobs enqueued by workers that inherit from LoadBalancedWorker are automatically load balanced: they go to a queue named after the worker’s queue plus the current tenant’s subscription state. This lets us distribute jobs better and allocate resources to each subscription state according to its load. It also gives us a virtual separation between our free, trial, paid, and premium customers without duplicate child classes or additional code to maintain.
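Stripped of Rails, the routing rule in load_balanced_queue is a string suffix. The sketch below takes the tenant as an argument instead of reading Tenant.current; the `Tenant` struct and the state names ("paid", "trial", "premium") are hypothetical, since the post does not list the actual state_name values.

```ruby
# Hypothetical tenant with a subscription state.
Tenant = Struct.new(:id, :state_name)

# Same rule as SidekiqWorker.load_balanced_queue, with the tenant passed in
# explicitly; always returns a String.
def load_balanced_queue(queue_name, tenant)
  return queue_name.to_s unless tenant

  "#{queue_name}_#{tenant.state_name}"
end

p load_balanced_queue(:notifications, Tenant.new(1, 'paid'))  # "notifications_paid"
p load_balanced_queue(:notifications, Tenant.new(2, 'trial')) # "notifications_trial"
p load_balanced_queue(:notifications, nil)                    # "notifications"
```

Each suffixed queue also has to appear in the queue list of the Sidekiq processes meant to drain it; a job pushed to a queue that no process listens on stays in Redis until one does. That per-process queue list (optionally weighted) is where resources actually get allocated per subscription state.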

Leveraging the Middleware chain

As mentioned in the previous sections, we use custom middlewares to handle multi-tenancy and keep our background servers running robustly. The first one is shown below:

# lib/middleware/sidekiq/server/belongs_to_tenant.rb

# frozen_string_literal: true

module Middleware
  module Sidekiq
    module Server
      class BelongsToTenant
        def call(_worker, msg, _queue, &block)
          if msg['belongs_to_tenant']
            set_current_tenant(msg['tenant_id'], &block)
          else
            yield
          end
        end

        private

        def set_current_tenant(tenant_id)
          shard = Shard.by_tenant_id(tenant_id)
          return unless shard # no shard found: the job is skipped without running

          shard.make_current
          DatabaseProxy.on_current do
            tenant = Tenant.by_id(tenant_id)
            return unless tenant # tenant not found on its shard: skip the job

            tenant.make_current
            yield
          end
        end
      end
    end
  end
end

More about the multi-tenant architecture and the associated classes like Tenant and Shard can be found in our previous blog – Horizontal sharding in a multi-tenant app with Rails 6.1. This middleware takes care of two things:

  1. It checks whether the job’s message has the belongs_to_tenant attribute set.
  2. If so, it looks up the tenant by tenant_id, makes its shard and then the tenant current, and only then ‘yields’ to the job. If either lookup fails, the job returns without running. Jobs without the attribute are yielded to directly.
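Sidekiq server middleware wraps each job: call receives the worker, the job hash, and the queue name, and runs the rest of the chain (ultimately the job) by yielding. The dependency-free sketch below imitates that contract to show BelongsToTenant’s three paths. `TENANTS`, `CurrentTenant`, and `run_with_middleware` are hypothetical stand-ins for the Shard/Tenant lookups, Tenant.current, and Sidekiq’s middleware chain, and the shard step is folded into a single lookup. Unlike the post’s version, this sketch restores the previous tenant itself, since it has no WorkerWrapper clearing thread-local state.

```ruby
# Hypothetical tenant registry standing in for Shard.by_tenant_id + Tenant.by_id.
TENANTS = { 7 => 'acme' }.freeze

# Hypothetical thread-local holder standing in for Tenant.current / make_current.
module CurrentTenant
  def self.get
    Thread.current[:current_tenant]
  end

  def self.set(tenant)
    Thread.current[:current_tenant] = tenant
  end
end

class BelongsToTenant
  def call(_worker, job, _queue)
    return yield unless job['belongs_to_tenant'] # global job: run as-is

    tenant = TENANTS[job['tenant_id']]
    return unless tenant # unknown tenant: skip the job without running it

    previous = CurrentTenant.get
    CurrentTenant.set(tenant)
    begin
      yield
    ensure
      CurrentTenant.set(previous)
    end
  end
end

# Imitates a server middleware chain: the first middleware is the outermost.
def run_with_middleware(middlewares, job, &job_block)
  chain = middlewares.reverse.inject(job_block) do |inner, middleware|
    -> { middleware.call(nil, job, 'default') { inner.call } }
  end
  chain.call
end

ran_as = []
middlewares = [BelongsToTenant.new]
run_with_middleware(middlewares, { 'belongs_to_tenant' => true, 'tenant_id' => 7 }) { ran_as << CurrentTenant.get }
run_with_middleware(middlewares, { 'belongs_to_tenant' => true, 'tenant_id' => 99 }) { ran_as << CurrentTenant.get }
run_with_middleware(middlewares, { 'belongs_to_tenant' => false }) { ran_as << CurrentTenant.get }
p ran_as # ["acme", nil]
```

The tenant job runs as "acme", the job for unknown tenant 99 never runs, and the global job runs with no tenant set.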

The next middleware takes care of setting up the message to be handed over to the actual worker and some post-processing:

# lib/middleware/sidekiq/server/worker_wrapper.rb

# frozen_string_literal: true

module Middleware
  module Sidekiq
    module Server
      class WorkerWrapper
        include DebugLogging

        def call(_worker, job, _queue, &block)
          params = job['args'].first
          params.deep_symbolize_keys!

          job['start_time'] = Time.zone.now.to_f

          set_debug_mode

          # Tag log lines with the request UUID set in SidekiqWorker, plus the job ID
          Rails.logger.tagged(job['uuid'], job['jid']) do
            block.call
          end

          job['response_time'] = (Time.zone.now.to_f - job['start_time']).round(2)
          log_details(job, params)
        ensure
          revert_debug_mode
          RequestStore.clear!
        end

        private

        def log_details(job, params)
          # log job details in a structured format for log processing systems & alerting
        end
      end
    end
  end
end

 

The DebugLogging module allows us to enable debug logging on the fly at runtime by leveraging memoize_until. We have covered this as part of an earlier blog – Optimizing cache with MemoizeUntil.

All our job params are expected to be hashes with symbol keys, and this middleware enforces that. Here we also use the request UUID set in SidekiqWorker for tagged logging. This improves traceability: we can map each background job to the web request that enqueued it. Once the job has executed, we record the time the worker class took for its “actual execution”. This information, along with other metadata like the worker class, queue, and tenant details, is logged in a structured format, which our internal log processing tools consume for monitoring, alerting, and scaling. The last responsibility of this class is to clear all thread-local state after processing; because this happens in an ensure block, it runs even when the job raises. We use a PORO class on top of RequestStore to maintain thread locals.
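Two details of WorkerWrapper are easy to miss: job args come back from Redis as parsed JSON, so hash keys are strings again on every run, and the ensure clause is what guarantees thread-local cleanup even when a job raises. The dependency-free sketch below shows both. `deep_symbolize` is a hypothetical re-implementation of what ActiveSupport’s deep_symbolize_keys does, `Thread.current[:request_store]` stands in for RequestStore, and timing uses a monotonic clock rather than Time.zone.now.

```ruby
require 'json'

# Hypothetical helper: recursively converts Hash keys to symbols,
# similar to ActiveSupport's deep_symbolize_keys.
def deep_symbolize(value)
  case value
  when Hash  then value.each_with_object({}) { |(k, v), out| out[k.to_sym] = deep_symbolize(v) }
  when Array then value.map { |v| deep_symbolize(v) }
  else value
  end
end

class WorkerWrapper
  attr_reader :last_response_time

  def call(_worker, job, _queue)
    params = job['args'].first
    params.replace(deep_symbolize(params)) # in place, like deep_symbolize_keys!
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
    @last_response_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round(2)
  ensure
    Thread.current[:request_store] = nil # runs whether the job succeeded or raised
  end
end

wrapper = WorkerWrapper.new
job = JSON.parse(JSON.generate('args' => [{ action: 'create_tenant', tenant: { name: 'acme' } }]))

seen = nil
Thread.current[:request_store] = { tenant_id: 7 }
wrapper.call(nil, job, 'default') { seen = job['args'].first }

Thread.current[:request_store] = { tenant_id: 7 }
begin
  wrapper.call(nil, job, 'default') { raise 'boom' }
rescue RuntimeError
  # In Sidekiq, the processor's own error handling would take over here.
end
p Thread.current[:request_store] # nil
```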

We at Freshworks are always committed to evolving our processes and applications to create delightful experiences for the customers of our suite of products.