At BetterDoc we started extracting parts of our Rails monolith into services and decided to use Kafka as our messaging system.

We wrote a script that allows us to produce “initial” events so new services can quickly pick up the current state. The idea was also to have an easy way to “reset” if something goes out of sync.

We started with something like this (simplified for brevity):

Inquiry.find_each do |inquiry|
  # Code to produce init events to kafka
end

It was immediately clear that this is prone to race conditions, because we are producing update events at the same time (whenever an update happens in the main database). We decided to use a database lock to fix this.

Rails (ActiveRecord) provides two locking mechanisms out of the box:

Optimistic locking assumes that conflicts are rare. It uses a version number on the record to track changes, and raises an ActiveRecord::StaleObjectError when a user tries to update a record that was modified since they read it. To implement it, just add a lock_version column to the table you want to lock and Rails will automatically check this column before updating the record.
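
To make the flow concrete, here is a minimal sketch of optimistic locking in action (the migration class and the status column are hypothetical, not part of our schema):

# Hypothetical migration; the Rails version tag is illustrative
class AddLockVersionToInquiries < ActiveRecord::Migration[6.0]
  def change
    add_column :inquiries, :lock_version, :integer, default: 0, null: false
  end
end

# Two in-memory copies of the same row
inquiry_a = Inquiry.find(1234)
inquiry_b = Inquiry.find(1234)

# "status" is a made-up column, just for illustration
inquiry_a.update!(status: "in_progress") # bumps lock_version

begin
  inquiry_b.update!(status: "done") # still carries the old lock_version
rescue ActiveRecord::StaleObjectError
  # Conflict resolution is up to us, e.g. reload and retry
  inquiry_b.reload
end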

Pessimistic locking, on the other hand, assumes that conflicts are likely. It locks the record until the transaction is done. If the record is currently locked and another user starts a transaction that touches it, that second transaction will wait until the lock from the first transaction is released.
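
A sketch of that waiting behavior with two concurrent sessions, e.g. two Rails consoles (the state column is again made up):

# Console A
Inquiry.transaction do
  inquiry = Inquiry.lock.find(1234) # SELECT ... FOR UPDATE, lock acquired
  inquiry.update!(state: "processing") # "state" is a made-up column
  sleep 10                             # keep the transaction (and lock) open
end

# Console B, started while A is sleeping
Inquiry.transaction do
  inquiry = Inquiry.lock.find(1234) # blocks here until console A commits
  inquiry.update!(state: "done")
end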

We didn’t expect race conditions to happen that often, but in the end we preferred pessimistic locking for the following reasons:

  • No need to add a new database column (lock_version).
  • We didn’t want to deal with conflicts by rescuing the ActiveRecord::StaleObjectError exception and applying the logic needed to resolve them.

Our first locking implementation looked like this, and it worked :)

Inquiry.find_each do |inquiry|
  inquiry.with_lock do
    # Code to produce init events to kafka
  end
end

By calling with_lock we were safe from race conditions. It works by starting a database transaction (BEGIN..COMMIT) and then locking the selected row against concurrent updates (FOR UPDATE). This prevents the row from being modified or deleted by other transactions until the current transaction ends.

BEGIN
  SELECT "inquiries".*
  FROM "inquiries"
  WHERE "inquiries"."id" = 1234
  LIMIT 1
  FOR UPDATE
COMMIT

We can also see that the record is reloaded in memory (the SELECT) so that the values on the record match those in the locked database row. Anyone else trying to modify or delete that row, or to acquire their own lock on it, will have to wait for our lock to be released.

But there was still one issue with this approach: we were first fetching a batch of records from the database with find_each and then running an additional query for each record to lock it. We should instead acquire the lock when initially loading the records.

So we switched to using the lock query method.

Inquiry.lock.find_each do |inquiry|
  # Code to produce init events to kafka
end
SELECT "inquiries".* FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
SELECT "inquiries".* FROM "inquiries" WHERE ("inquiries"."id" > 1000) ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE

We quickly realized that this does not open a new transaction, and therefore the lock does not work. To fix it we need to wrap it in a transaction.

Inquiry.transaction do
  Inquiry.lock.find_each do |inquiry|
    # Code to produce init events to kafka
  end
end

BEGIN
  SELECT "inquiries".* FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
  SELECT "inquiries".* FROM "inquiries" WHERE ("inquiries"."id" > 1000) ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
COMMIT

So it looks like we finally have everything we wanted: we are free of race conditions and we lock multiple rows at once for each batch. Looks like we did a great job.

Or did we just introduce a major issue? Do you see the problem?

Since the lock is not released until the transaction ends, we would not be able to write to the inquiries table until find_each finished all batches, effectively taking our app down. To fix it we should change our code to start a new transaction for each batch.

Inquiry.in_batches(of: 500).each do |relation|
  relation.transaction do
    relation.lock.each do |inquiry|
      # Code to produce init events to kafka
    end
  end
end
SELECT "inquiries"."id" FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 500
BEGIN
  SELECT "inquiries".* FROM "inquiries" WHERE "inquiries"."id" IN (1, 2, 3, ...) FOR UPDATE
COMMIT

SELECT  "inquiries"."id" FROM "inquiries" WHERE ("inquiries"."id" > 500) ORDER BY "inquiries"."id" ASC LIMIT 500
BEGIN
  SELECT "inquiries".* FROM "inquiries" WHERE "inquiries"."id" IN (501, 502, 503, ...) FOR UPDATE
COMMIT

Keep in mind that you should adjust the batch size depending on the time it takes to process one batch, so that you don't lock your database rows for too long.
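
As a sketch of how you could keep an eye on that (the Benchmark timing and logging are our illustration here, not part of the original script):

require "benchmark"

Inquiry.in_batches(of: 500).each do |relation|
  elapsed = Benchmark.realtime do
    relation.transaction do
      relation.lock.each do |inquiry|
        # Code to produce init events to kafka
      end
    end
  end
  # Rows stay locked for the whole transaction; if this number grows,
  # shrink the batch size
  Rails.logger.info("Batch locked and processed in #{elapsed.round(2)}s")
end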

Conclusion

I would say that Rails has all you need to make tasks like this super easy. It took much longer to write this blog post than to actually implement it :)

Having said that, this stuff is sometimes hard to test. So you should be careful with the implementation, because you could easily slow your app down or get the wrong impression that you fixed it (e.g. by missing the transaction).
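
One cheap sanity check, sketched below, is to assert that the relation really generates FOR UPDATE (remember that even then, the clause only helps inside a transaction):

sql = Inquiry.lock.to_sql
# Proves the lock made it into the query; it still needs a surrounding
# transaction to actually hold the lock
raise "expected FOR UPDATE in: #{sql}" unless sql.include?("FOR UPDATE")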

Two rules to remember!

  • Never use the lock and lock! methods without a transaction.
  • Do not lock your row(s) for too long.

Credits