Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Campaigns, Issues, Issue Categories Synced from TJ to ID #57

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions app/models/identity_tijuana/campaign.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module IdentityTijuana
class Campaign < ReadWrite
self.table_name = 'campaigns'

scope :updated_campaigns, -> (last_updated_at, last_id) {
updated_campaigns_all(last_updated_at, last_id)
.order('updated_at, id')
.limit(Settings.tijuana.pull_batch_amount)
}

scope :updated_campaigns_all, -> (last_updated_at, last_id) {
where('deleted_at is null and updated_at > ? or (updated_at = ? and id > ?)', last_updated_at, last_updated_at, last_id)
}

def import(sync_id)
begin
# The campaigns table in TJ maps onto the issues table in ID.
issue = ::Issue.find_or_create_by(external_id: self.id.to_s, external_source: 'tijuana')
issue.name = self.name
issue.save!
# The campaigns.accounts_key column in TJ maps onto the issue_categories table in ID.
issue.issue_categories.clear
accounts_key = self.accounts_key
if accounts_key.present?
issue_category = ::IssueCategory.find_or_create_by(name: accounts_key)
issue.issue_categories << issue_category
end
rescue Exception => e
Rails.logger.error "Tijuana campaigns sync id:#{self.id}, error: #{e.message}"
raise
end
end
end
end
162 changes: 47 additions & 115 deletions lib/identity_tijuana.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
require "identity_tijuana/engine"
require "identity_tijuana/campaign_helper"
require "identity_tijuana/mutex_helper"
require "identity_tijuana/redis_helper"

module IdentityTijuana
SYSTEM_NAME = 'tijuana'
SYNCING = 'tag'
CONTACT_TYPE = 'email'
PULL_JOBS = [[:fetch_user_updates, 10.minutes]]
PULL_JOBS = [[:fetch_campaign_updates, 10.minutes], [:fetch_user_updates, 10.minutes]]
MEMBER_RECORD_DATA_TYPE='object'
MUTEX_EXPIRY_DURATION = 10.minutes

include CampaignHelper
include MutexHelper
include RedisHelper

def self.push(sync_id, member_ids, external_system_params)
begin
members = Member.where(id: member_ids).with_email.order(:id)
Expand Down Expand Up @@ -56,33 +63,20 @@ def self.get_push_jobs
def self.pull(sync_id, external_system_params)
begin
pull_job = JSON.parse(external_system_params)['pull_job'].to_s
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
rescue => e
raise e
end
end

def self.fetch_user_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
mutex_acquired = acquire_mutex_lock(pull_job, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_user_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
release_mutex_lock(pull_job) if mutex_acquired # Check to make sure that mutex lock is always released.
end
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_user_updates_impl(sync_id)
def self.fetch_user_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:users:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:users:last_id' } || 0).to_i
Expand All @@ -100,29 +94,30 @@ def self.fetch_user_updates_impl(sync_id)
updated_member_ids = Member.connection.execute(<<~SQL
SELECT id as member_id
FROM members
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id;
WHERE (updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}')
OR id IN (
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id
);
SQL
).map {|member_id_row| member_id_row['member_id']}

Expand Down Expand Up @@ -154,7 +149,11 @@ def self.fetch_user_updates_impl(sync_id)
false
)

updated_users.count < updated_users_all.count
release_mutex_lock(:fetch_user_updates)
need_another_batch = updated_users.count < updated_users_all.count
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_users_for_dedupe
Expand All @@ -173,22 +172,6 @@ def self.fetch_users_for_dedupe
end

def self.fetch_donation_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_donation_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_donation_updates_impl(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:donations:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:donations:last_id' } || 0).to_i
Expand Down Expand Up @@ -221,26 +204,12 @@ def self.fetch_donation_updates_impl(sync_id)
false
)

updated_donations.count < updated_donations_all.count
release_mutex_lock(:fetch_donation_updates)
need_another_batch = updated_donations.count < updated_donations_all.count
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_tagging_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_tagging_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

def self.fetch_tagging_updates_impl(sync_id)
latest_tagging_scope_limit = 50000
started_at = DateTime.now
last_id = (Sidekiq.redis { |r| r.get 'tijuana:taggings:last_id' } || 0).to_i
Expand Down Expand Up @@ -369,50 +338,13 @@ def self.fetch_tagging_updates_impl(sync_id)
false
)

results.count < tags_remaining_count
release_mutex_lock(:fetch_tagging_updates)
need_another_batch = results.count < tags_remaining_count
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

private

def self.acquire_mutex_lock(method_name, sync_id)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
unless mutex_acquired
mutex_expiry = get_redis_date(mutex_name)
if mutex_expiry.past?
unless worker_currently_running?(method_name, sync_id)
delete_redis_date(mutex_name)
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
end
end
end
mutex_acquired
end

def self.release_mutex_lock(method_name)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
delete_redis_date(mutex_name)
end

def self.get_redis_date(redis_identifier, default_value=Time.at(0))
date_str = Sidekiq.redis { |r| r.get redis_identifier }
date_str ? Time.parse(date_str) : default_value
end

def self.set_redis_date(redis_identifier, date_time_value, as_mutex=false)
date_str = date_time_value.utc.to_fs(:inspect) # Ensures fractional seconds are retained
if as_mutex
Sidekiq.redis { |r| r.setnx redis_identifier, date_str }
else
Sidekiq.redis { |r| r.set redis_identifier, date_str }
end
end

def self.delete_redis_date(redis_identifier)
Sidekiq.redis { |r| r.del redis_identifier }
end

def self.schedule_pull_batch(pull_job)
sync = Sync.create!(
external_system: SYSTEM_NAME,
Expand Down
60 changes: 60 additions & 0 deletions lib/identity_tijuana/campaign_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module IdentityTijuana
module CampaignHelper
module ClassMethods
def fetch_campaign_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:campaigns:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:campaigns:last_id' } || 0).to_i
campaigns_dependent_data_cutoff = DateTime.now
updated_campaigns = IdentityTijuana::Campaign.updated_campaigns(last_updated_at, last_id)
updated_campaigns_all = IdentityTijuana::Campaign.updated_campaigns_all(last_updated_at, last_id)

updated_campaigns.each do |campaign|
campaign.import(sync_id)
end

unless updated_campaigns.empty?
campaigns_dependent_data_cutoff = updated_campaigns.last.updated_at if updated_campaigns.count < updated_campaigns_all.count
end

unless updated_campaigns.empty?
set_redis_date('tijuana:campaigns:last_updated_at', updated_campaigns.last.updated_at)
Sidekiq.redis { |r| r.set 'tijuana:campaigns:last_id', updated_campaigns.last.id }
end

set_redis_date('tijuana:campaigns:dependent_data_cutoff', campaigns_dependent_data_cutoff)

execution_time_seconds = ((DateTime.now - started_at) * 24 * 60 * 60).to_i
yield(
updated_campaigns.size,
updated_campaigns.pluck(:id),
{
scope: 'tijuana:campaigns:last_updated_at',
scope_limit: Settings.tijuana.pull_batch_amount,
from: last_updated_at,
to: updated_campaigns.empty? ? nil : updated_campaigns.last.updated_at,
started_at: started_at,
completed_at: DateTime.now,
execution_time_seconds: execution_time_seconds,
remaining_behind: updated_campaigns_all.count
},
false
)

release_mutex_lock(:fetch_campaign_updates)
need_another_batch = updated_campaigns.count < updated_campaigns_all.count
if need_another_batch
schedule_pull_batch(:fetch_campaign_updates)
else
schedule_pull_batch(:fetch_page_sequence_updates)
schedule_pull_batch(:fetch_push_updates)
end
end
end

extend ClassMethods
def self.included(other)
other.extend(ClassMethods)
end
end
end
37 changes: 37 additions & 0 deletions lib/identity_tijuana/mutex_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require "identity_tijuana/redis_helper"

module IdentityTijuana
module MutexHelper
include IdentityTijuana::RedisHelper

module ClassMethods
private

def acquire_mutex_lock(method_name, sync_id)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
unless mutex_acquired
mutex_expiry = get_redis_date(mutex_name)
if mutex_expiry.past?
unless worker_currently_running?(method_name, sync_id)
delete_redis_date(mutex_name)
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
end
end
end
mutex_acquired
end

def release_mutex_lock(method_name)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
delete_redis_date(mutex_name)
end
end

extend ClassMethods
def self.included(other)
other.extend(ClassMethods)
end
end
end
30 changes: 30 additions & 0 deletions lib/identity_tijuana/redis_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module IdentityTijuana
module RedisHelper
module ClassMethods
private

def get_redis_date(redis_identifier, default_value=Time.at(0))
date_str = Sidekiq.redis { |r| r.get redis_identifier }
date_str ? Time.parse(date_str) : default_value
end

def set_redis_date(redis_identifier, date_time_value, as_mutex=false)
date_str = date_time_value.utc.strftime("%Y-%m-%d %H:%M:%S.%9N %z") # Ensures fractional seconds are retained
if as_mutex
Sidekiq.redis { |r| r.setnx redis_identifier, date_str }
else
Sidekiq.redis { |r| r.set redis_identifier, date_str }
end
end

def delete_redis_date(redis_identifier)
Sidekiq.redis { |r| r.del redis_identifier }
end
end

extend ClassMethods
def self.included(other)
other.extend(ClassMethods)
end
end
end
Loading