At BetterDoc we started extracting parts of our Rails monolith into services and decided to use Kafka as our messaging system.
We wrote a script that allows us to produce “initial” events so new services can quickly pick up the current state. The idea was also to have an easy way to “reset” if something goes out of sync.
We started with something like this (simplified for brevity):
Inquiry.find_each do |inquiry|
  # Code to produce init events to kafka
end
It was immediately clear that this is prone to race conditions because we are producing update events at the same time (whenever an update happens in the main database). We decided to use a database lock to fix this.
Rails (ActiveRecord) provides two locking mechanisms out of the box:
Optimistic locking assumes that conflicts are very rare. It uses a version number on the record to track changes and raises an ActiveRecord::StaleObjectError when another user tries to update a record that has changed since it was read. To implement it, just add a lock_version column to the table you want to lock and Rails will automatically check this column before updating the record.
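For illustration, a minimal sketch of the optimistic setup (the migration class name and Rails version here are just examples):
# Add the column that optimistic locking relies on
class AddLockVersionToInquiries < ActiveRecord::Migration[6.0]
  def change
    add_column :inquiries, :lock_version, :integer, default: 0, null: false
  end
end
Once the column exists, Rails bumps and checks lock_version on every update of an Inquiry without any further code changes.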
Pessimistic locking, on the other hand, assumes that conflicts are very likely. It locks the record until the transaction is done. If the record is currently locked and another user starts a transaction, that second transaction will wait until the lock from the first transaction is released.
We didn’t expect race conditions to happen that often, but in the end we preferred pessimistic locking for the following reasons:
- No need to add a new database column (lock_version).
- We didn’t want to deal with conflicts by rescuing the ActiveRecord::StaleObjectError exception and applying the logic needed to resolve them (roughly the kind of handling sketched below).
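What we wanted to avoid looks roughly like this (a sketch, not code we shipped; the attribute being updated is just an example):
inquiry = Inquiry.find(1234)
begin
  inquiry.update!(state: "initialized")
rescue ActiveRecord::StaleObjectError
  # Another transaction changed the row since we loaded it; we would
  # have to reload it and decide how to merge or retry our change
  inquiry.reload
end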
Our first locking implementation looked like this, and it worked :)
Inquiry.find_each do |inquiry|
  inquiry.with_lock do
    # Code to produce init events to kafka
  end
end
By calling with_lock we were safe from race conditions. It works by starting a database transaction (BEGIN..COMMIT) and then locking the selected row against concurrent updates (FOR UPDATE). This prevents the row from being modified or deleted by other transactions until the current transaction ends.
BEGIN
SELECT "inquiries".*
FROM "inquiries"
WHERE "inquiries"."id" = 1234
LIMIT 1
FOR UPDATE
COMMIT
We can also see that our record is reloaded in memory (“SELECT”) so that the values on the record match those in the locked database row. Other transactions cannot modify or delete that row, and anyone else trying to acquire a lock on it will have to wait until our lock is released.
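To make the waiting concrete, here is a rough sketch of two processes touching the same row (not actual code from our app):
# Process A: the init script holds the row lock while producing the event
inquiry = Inquiry.find(1234)
inquiry.with_lock do
  # Code to produce init events to kafka
end
# Process B: a regular update running at the same time has to wait on the
# row lock, so it only proceeds once process A has committed
Inquiry.find(1234).touch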
But there was still one issue with this approach. We were first fetching a batch of records from the database with find_each and then running an additional query for each record just to lock it. We should instead acquire the lock when initially loading the records.
So we switched to using the lock query method.
Inquiry.lock.find_each do |inquiry|
  # Code to produce init events to kafka
end
SELECT "inquiries".* FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
SELECT "inquiries".* FROM "inquiries" WHERE ("inquiries"."id" > 1000) ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
We quickly realized that this does not open a new transaction, so the lock does not actually work: each SELECT ... FOR UPDATE runs in its own implicit transaction and the row locks are released as soon as the statement finishes. To fix it we needed to wrap the whole thing in a transaction.
Inquiry.transaction do
  Inquiry.lock.find_each do |inquiry|
    # Code to produce init events to kafka
  end
end
BEGIN
SELECT "inquiries".* FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
SELECT "inquiries".* FROM "inquiries" WHERE ("inquiries"."id" > 1000) ORDER BY "inquiries"."id" ASC LIMIT 1000 FOR UPDATE
COMMIT
So it looks like we finally have all we wanted: we are free of race conditions and we lock multiple rows at once for each batch. Looks like we did a great job.
Or did we just introduce a major issue? Do you see the problem?
Since the lock is not released until the transaction ends, we will not be able to write to the inquiries table until find_each finishes all batches, effectively taking our app down. To fix it we should change our code to start a new transaction for each batch.
Inquiry.in_batches(of: 500).each do |relation|
  relation.transaction do
    relation.lock.each do |inquiry|
      # Code to produce init events to kafka
    end
  end
end
SELECT "inquiries"."id" FROM "inquiries" ORDER BY "inquiries"."id" ASC LIMIT 500
BEGIN
SELECT "inquiries".* FROM "inquiries" WHERE "inquiries"."id" IN (1, 2, 3, ...) FOR UPDATE
COMMIT
SELECT "inquiries"."id" FROM "inquiries" WHERE ("inquiries"."id" > 500) ORDER BY "inquiries"."id" ASC LIMIT 500
BEGIN
SELECT "inquiries".* FROM "inquiries" WHERE "inquiries"."id" IN (501, 502, 503, ...) FOR UPDATE
COMMIT
Keep in mind that you should adjust the batch size depending on how long one batch takes to process, so that you don’t lock your database rows for too long.
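If you’re unsure what a safe batch size is, one way to get a feel for it is to measure how long each batch holds its locks. A rough sketch (the batch size and the logging are just examples):
require "benchmark"

Inquiry.in_batches(of: 500).each do |relation|
  elapsed = Benchmark.realtime do
    relation.transaction do
      relation.lock.each do |inquiry|
        # Code to produce init events to kafka
      end
    end
  end
  # Rows in this batch were locked for roughly `elapsed` seconds;
  # if this grows too large, lower the batch size
  Rails.logger.info("Init events batch locked rows for #{elapsed.round(2)}s")
end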
Conclusion
I would say that Rails has all you need to make tasks like this super easy. It took much more time to write this blog post than to actually implement it :)
Having said that, it is sometimes hard to test this stuff, so you should be careful with the implementation: you could easily slow your app down or get the wrong impression that you fixed it (missing transaction).
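One sanity check that could have caught the missing transaction earlier (a sketch of an idea, not a test we actually wrote) is to assert inside the loop that a transaction is really open:
# Temporary assertion while developing the script (or inside a test)
Inquiry.transaction do
  Inquiry.lock.find_each do |inquiry|
    # Without the surrounding transaction this is false and the FOR UPDATE
    # lock is released right after each batch's SELECT
    raise "lock is not effective" unless Inquiry.connection.transaction_open?
  end
end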
Two rules to remember: always wrap the lock in a transaction, and lock in small enough batches so you don’t block writes for too long!
Credits
- https://guides.rubyonrails.org/active_record_querying.html#locking-records-for-update
- https://www.postgresql.org/docs/9.0/sql-select.html#SQL-FOR-UPDATE-SHARE
- https://gist.github.com/ryanermita/464bf88e2fc292e75c9353820c2f0475
- https://www.peterdebelak.com/blog/pessimistic-locking-in-rails-by-example