Skip to content

Commit

Permalink
feat(DailyUsage): Add daily_usages:fill_history task (#2751)
Browse files Browse the repository at this point in the history
## Context

This PR is part of the Usage Revenue and unit.

Today, Lago does not offer a way to retrieve customer usage with a
granularity lower than the billing period (via invoices and fees). On
the other end, it is possible to get the current usage for a
customer/subscription, but this usage is just a “snapshot” of the usage
a the current time.

## Description

This PR adds a rake task to fill the daily usage history for a specific
organization
  • Loading branch information
vincent-pochet authored Nov 8, 2024
1 parent 9df5d09 commit 8da36e9
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 14 deletions.
2 changes: 1 addition & 1 deletion app/services/daily_usages/compute_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def call
customer: subscription.customer,
subscription:,
external_subscription_id: subscription.external_id,
usage: ::V1::Customers::UsageSerializer.new(current_usage).serialize,
usage: ::V1::Customers::UsageSerializer.new(current_usage, includes: %i[charges_usage]).serialize,
from_datetime: current_usage.from_datetime,
to_datetime: current_usage.to_datetime,
refreshed_at: timestamp
Expand Down
33 changes: 20 additions & 13 deletions app/services/invoices/customer_usage_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@

module Invoices
class CustomerUsageService < BaseService
def initialize(customer:, subscription:, apply_taxes: true)
def initialize(customer:, subscription:, apply_taxes: true, with_cache: true, max_to_datetime: nil)
super

@apply_taxes = apply_taxes
@customer = customer
@subscription = subscription
@with_cache = with_cache

# NOTE: used to force charges_to_datetime boundary
@max_to_datetime = max_to_datetime
end

def self.with_external_ids(customer_external_id:, external_subscription_id:, organization_id:, apply_taxes: true)
customer = Customer.find_by!(external_id: customer_external_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(external_id: external_subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def self.with_ids(organization_id:, customer_id:, subscription_id:, apply_taxes: true)
customer = Customer.find_by(id: customer_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(id: subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def call
return result.not_found_failure!(resource: 'customer') unless @customer
return result.not_allowed_failure!(code: 'no_active_subscription') if subscription.blank?
return result.not_found_failure!(resource: "customer") unless @customer
return result.not_allowed_failure!(code: "no_active_subscription") if subscription.blank?

result.usage = compute_usage
result.invoice = invoice
Expand All @@ -37,8 +41,7 @@ def call

private

attr_reader :invoice, :subscription, :apply_taxes

attr_reader :invoice, :subscription, :apply_taxes, :with_cache, :max_to_datetime
delegate :plan, to: :subscription
delegate :organization, to: :subscription

Expand Down Expand Up @@ -71,12 +74,12 @@ def compute_usage
def add_charge_fees
query = subscription.plan.charges.joins(:billable_metric)
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC'))
.order(Arel.sql("lower(unaccent(billable_metrics.name)) ASC"))

# we're capturing the context here so we can re-use inside the threads. This will correctly propagate spans to this current span
context = OpenTelemetry::Context.current

invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 0) do |charge|
invoice.fees = Parallel.flat_map(query.all, in_threads: ENV["LAGO_PARALLEL_THREADS_COUNT"]&.to_i || 0) do |charge|
OpenTelemetry::Context.with_current(context) do
ActiveRecord::Base.connection_pool.with_connection do
charge_usage(charge)
Expand All @@ -90,11 +93,15 @@ def charge_usage(charge)
subscription:,
charge:,
to_datetime: boundaries[:charges_to_datetime],
cache: !organization.clickhouse_events_store? # NOTE: Will be turned on in the future
# NOTE: Will be turned on for clickhouse in the future
cache: organization.clickhouse_events_store? ? false : with_cache
)

applied_boundaries = boundaries
applied_boundaries = applied_boundaries.merge(charges_to_datetime: max_to_datetime) if max_to_datetime

Fees::ChargeService
.call(invoice:, charge:, subscription:, boundaries:, current_usage: true, cache_middleware:)
.call(invoice:, charge:, subscription:, boundaries: applied_boundaries, current_usage: true, cache_middleware:)
.raise_if_error!
.fees
end
Expand Down Expand Up @@ -171,10 +178,10 @@ def compute_amounts_with_provider_taxes

def provider_taxes_cache_key
[
'provider-taxes',
"provider-taxes",
subscription.id,
plan.updated_at.iso8601
].join('/')
].join("/")
end

def format_usage
Expand Down
58 changes: 58 additions & 0 deletions lib/tasks/daily_usages.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require 'timecop'

namespace :daily_usages do
desc "Fill past daily usage"
task :fill_history, [:organization_id, :days_ago] => :environment do |_task, args|
abort "Missing organization_id\n\n" unless args[:organization_id]

Rails.logger.level = Logger::INFO

days_ago = (args[:days_ago] || 120).to_i.days.ago
organization = Organization.find(args[:organization_id])

subscriptions = organization.subscriptions
.where(status: [:active, :terminated])
.where.not(started_at: nil)
.where('terminated_at IS NULL OR terminated_at >= ?', days_ago)
.includes(customer: :organization)

subscriptions.find_each do |subscription|
from = subscription.started_at.to_date
if from < days_ago
from = days_ago.to_date
end

to = (subscription.terminated_at || Time.current).to_date

(from..to).each do |date|
datetime = date.in_time_zone(subscription.customer.applicable_timezone).beginning_of_day.utc

next if date == Date.today &&
DailyUsage.refreshed_at_in_timezone(datetime).where(subscription_id: subscription.id).exists?

Timecop.freeze(datetime + 5.minutes) do
usage = Invoices::CustomerUsageService.call(
customer: subscription.customer,
subscription: subscription,
apply_taxes: false,
with_cache: false,
max_to_datetime: datetime
).raise_if_error!.usage

DailyUsage.create!(
organization:,
customer: subscription.customer,
subscription:,
external_subscription_id: subscription.external_id,
usage: ::V1::Customers::UsageSerializer.new(usage, includes: %i[charges_usage]).serialize,
from_datetime: usage.from_datetime,
to_datetime: usage.to_datetime,
refreshed_at: datetime
)
end
end
end
end
end

0 comments on commit 8da36e9

Please sign in to comment.