Skip to content

Commit

Permalink
Campaigns, Issues, and Issue Categories now syncing from Tijuana to I…
Browse files Browse the repository at this point in the history
…dentity
  • Loading branch information
benmort committed Jul 13, 2023
1 parent 4f5332a commit e5cbd97
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 123 deletions.
53 changes: 53 additions & 0 deletions app/models/identity_tijuana/campaign.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module IdentityTijuana
class Campaign < ReadWrite
self.table_name = 'campaigns'

scope :deleted_campaigns, -> (last_updated_at, exclude_from) {
where('deleted_at is not null and deleted_at >= ? and deleted_at < ?', last_updated_at, exclude_from)
.order('deleted_at, id')
}

scope :updated_campaigns, -> (last_updated_at, last_id) {
updated_campaigns_all(last_updated_at, last_id)
.order('updated_at, id')
.limit(Settings.tijuana.pull_batch_amount)
}

scope :updated_campaigns_all, -> (last_updated_at, last_id) {
where('updated_at > ? or (updated_at = ? and id > ?)', last_updated_at, last_updated_at, last_id)
}

def import(sync_id)
begin
# The campaigns table in TJ maps onto the issues table in ID.
issue = ::Issue.find_or_create_by(external_id: self.id.to_s, external_source: 'tijuana')
issue.name = self.name
issue.save!
# The campaigns.accounts_key column in TJ maps onto the issue_categories table in ID.
issue.issue_categories.clear
accounts_key = self.accounts_key
if accounts_key.present?
issue_category = ::IssueCategory.find_or_create_by(name: accounts_key)
issue.issue_categories << issue_category
end
rescue Exception => e
Rails.logger.error "Tijuana campaigns sync id:#{self.id}, error: #{e.message}"
raise
end
end

def erase(sync_id)
begin
issue = ::Issue.find_by(external_id: self.id.to_s, external_source: 'tijuana')
if issue.present?
issue.campaigns.destroy_all # TODO: Will need to cascade to other tables.
issue.issue_categories.clear
issue.destroy
end
rescue Exception => e
Rails.logger.error "Tijuana campaigns delete id:#{self.id}, error: #{e.message}"
raise
end
end
end
end
162 changes: 47 additions & 115 deletions lib/identity_tijuana.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
require "identity_tijuana/engine"
require "identity_tijuana/campaign_helper"
require "identity_tijuana/mutex_helper"
require "identity_tijuana/redis_helper"

module IdentityTijuana
SYSTEM_NAME = 'tijuana'
SYNCING = 'tag'
CONTACT_TYPE = 'email'
PULL_JOBS = [[:fetch_user_updates, 10.minutes]]
PULL_JOBS = [[:fetch_campaign_updates, 10.minutes], [:fetch_user_updates, 10.minutes]]
MEMBER_RECORD_DATA_TYPE='object'
MUTEX_EXPIRY_DURATION = 10.minutes

include CampaignHelper
include MutexHelper
include RedisHelper

def self.push(sync_id, member_ids, external_system_params)
begin
members = Member.where(id: member_ids).with_email.order(:id)
Expand Down Expand Up @@ -56,33 +63,20 @@ def self.get_push_jobs
def self.pull(sync_id, external_system_params)
begin
pull_job = JSON.parse(external_system_params)['pull_job'].to_s
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
rescue => e
raise e
end
end

def self.fetch_user_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
mutex_acquired = acquire_mutex_lock(pull_job, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_user_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
release_mutex_lock(pull_job) if mutex_acquired # Check to make sure that mutex lock is always released.
end
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_user_updates_impl(sync_id)
def self.fetch_user_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:users:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:users:last_id' } || 0).to_i
Expand All @@ -100,29 +94,30 @@ def self.fetch_user_updates_impl(sync_id)
updated_member_ids = Member.connection.execute(<<~SQL
SELECT id as member_id
FROM members
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id;
WHERE (updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}')
OR id IN (
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id
);
SQL
).map {|member_id_row| member_id_row['member_id']}

Expand Down Expand Up @@ -154,7 +149,11 @@ def self.fetch_user_updates_impl(sync_id)
false
)

updated_users.count < updated_users_all.count
release_mutex_lock(:fetch_user_updates)
need_another_batch = updated_users.count < updated_users_all.count
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_users_for_dedupe
Expand All @@ -173,22 +172,6 @@ def self.fetch_users_for_dedupe
end

def self.fetch_donation_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_donation_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_donation_updates_impl(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:donations:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:donations:last_id' } || 0).to_i
Expand Down Expand Up @@ -221,26 +204,12 @@ def self.fetch_donation_updates_impl(sync_id)
false
)

updated_donations.count < updated_donations_all.count
release_mutex_lock(:fetch_donation_updates)
need_another_batch = updated_donations.count < updated_donations_all.count
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_tagging_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_tagging_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

def self.fetch_tagging_updates_impl(sync_id)
latest_tagging_scope_limit = 50000
started_at = DateTime.now
last_id = (Sidekiq.redis { |r| r.get 'tijuana:taggings:last_id' } || 0).to_i
Expand Down Expand Up @@ -369,50 +338,13 @@ def self.fetch_tagging_updates_impl(sync_id)
false
)

results.count < tags_remaining_count
release_mutex_lock(:fetch_tagging_updates)
need_another_batch = results.count < tags_remaining_count
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

private

def self.acquire_mutex_lock(method_name, sync_id)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
unless mutex_acquired
mutex_expiry = get_redis_date(mutex_name)
if mutex_expiry.past?
unless worker_currently_running?(method_name, sync_id)
delete_redis_date(mutex_name)
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
end
end
end
mutex_acquired
end

def self.release_mutex_lock(method_name)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
delete_redis_date(mutex_name)
end

def self.get_redis_date(redis_identifier, default_value=Time.at(0))
date_str = Sidekiq.redis { |r| r.get redis_identifier }
date_str ? Time.parse(date_str) : default_value
end

def self.set_redis_date(redis_identifier, date_time_value, as_mutex=false)
date_str = date_time_value.utc.to_fs(:inspect) # Ensures fractional seconds are retained
if as_mutex
Sidekiq.redis { |r| r.setnx redis_identifier, date_str }
else
Sidekiq.redis { |r| r.set redis_identifier, date_str }
end
end

def self.delete_redis_date(redis_identifier)
Sidekiq.redis { |r| r.del redis_identifier }
end

def self.schedule_pull_batch(pull_job)
sync = Sync.create!(
external_system: SYSTEM_NAME,
Expand Down
66 changes: 66 additions & 0 deletions lib/identity_tijuana/campaign_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module IdentityTijuana
module CampaignHelper
module ClassMethods
def fetch_campaign_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:campaigns:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:campaigns:last_id' } || 0).to_i
campaigns_dependent_data_cutoff = DateTime.now
updated_campaigns = IdentityTijuana::Campaign.updated_campaigns(last_updated_at, last_id)
updated_campaigns_all = IdentityTijuana::Campaign.updated_campaigns_all(last_updated_at, last_id)

updated_campaigns.each do |campaign|
campaign.import(sync_id)
end

unless updated_campaigns.empty?
campaigns_dependent_data_cutoff = updated_campaigns.last.updated_at if updated_campaigns.count < updated_campaigns_all.count
end

# Erase any logically deleted campaigns from ID.
deleted_campaigns = IdentityTijuana::Campaign.deleted_campaigns(last_updated_at, campaigns_dependent_data_cutoff)
deleted_campaigns.each do |campaign|
campaign.erase(sync_id)
end

unless updated_campaigns.empty?
set_redis_date('tijuana:campaigns:last_updated_at', updated_campaigns.last.updated_at)
Sidekiq.redis { |r| r.set 'tijuana:campaigns:last_id', updated_campaigns.last.id }
end

set_redis_date('tijuana:campaigns:dependent_data_cutoff', campaigns_dependent_data_cutoff)

execution_time_seconds = ((DateTime.now - started_at) * 24 * 60 * 60).to_i
yield(
updated_campaigns.size,
updated_campaigns.pluck(:id),
{
scope: 'tijuana:campaigns:last_updated_at',
scope_limit: Settings.tijuana.pull_batch_amount,
from: last_updated_at,
to: updated_campaigns.empty? ? nil : updated_campaigns.last.updated_at,
started_at: started_at,
completed_at: DateTime.now,
execution_time_seconds: execution_time_seconds,
remaining_behind: updated_campaigns_all.count
},
false
)

release_mutex_lock(:fetch_campaign_updates)
need_another_batch = updated_campaigns.count < updated_campaigns_all.count
if need_another_batch
schedule_pull_batch(:fetch_campaign_updates)
else
schedule_pull_batch(:fetch_page_sequence_updates)
schedule_pull_batch(:fetch_push_updates)
end
end
end

extend ClassMethods
def self.included(other)
other.extend(ClassMethods)
end
end
end
35 changes: 35 additions & 0 deletions lib/identity_tijuana/mutex_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
require "identity_tijuana/redis_helper"

module IdentityTijuana
module MutexHelper
include IdentityTijuana::RedisHelper

module ClassMethods
def acquire_mutex_lock(method_name, sync_id)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
unless mutex_acquired
mutex_expiry = get_redis_date(mutex_name)
if mutex_expiry.past?
unless worker_currently_running?(method_name, sync_id)
delete_redis_date(mutex_name)
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
end
end
end
mutex_acquired
end

def release_mutex_lock(method_name)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
delete_redis_date(mutex_name)
end
end

extend ClassMethods
def self.included(other)
other.extend(ClassMethods)
end
end
end
Loading

0 comments on commit e5cbd97

Please sign in to comment.