Skip to content

Commit

Permalink
Fetch only 3 months if we already imported the model
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineAugusti committed Jan 17, 2024
1 parent 5b32985 commit b614446
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 40 deletions.
46 changes: 31 additions & 15 deletions apps/transport/lib/jobs/import_dataset_monthly_metrics_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ defmodule Transport.Jobs.ImportDatasetMonthlyMetricsJob do
@moduledoc """
Import monthly metrics related to datasets coming from the data.gouv.fr's API.
This job is executed daily and imports metrics for all datasets for the last 2 years.
This job is executed daily and imports metrics for all datasets.
If dataset metrics have not been imported previously, we will fetch metrics for the last 2 years.
Otherwise we will fetch metrics only for the last 3 months.
Records are not supposed to change in the past, except for the current month.
"""
use Oban.Worker, max_attempts: 3
Expand Down Expand Up @@ -33,30 +36,39 @@ defmodule Transport.Jobs.ImportMonthlyMetrics do
Shared methods to import monthly metrics from the data.gouv.fr's API.
"""
require Logger
import Ecto.Query

# Maximum number of months to fetch for each model
# 12*2 = 24 months
@nb_records 12 * 2

@doc """
iex> api_url(:dataset, "datagouv_id")
iex> api_url(:dataset, "datagouv_id", page_size: 24)
"https://metric-api.data.gouv.fr/api/datasets/data/?dataset_id__exact=datagouv_id&page_size=24&metric_month__sort=desc"
iex> api_url(:resource, "datagouv_id", page_size: 5)
"https://metric-api.data.gouv.fr/api/resources/data/?resource_id__exact=datagouv_id&page_size=5&metric_month__sort=desc"
"""
def api_url(model_name, datagouv_id, page_size \\ @nb_records) when model_name in [:dataset, :resource] do
def api_url(model_name, datagouv_id, page_size: page_size) when model_name in [:dataset, :resource] do
model_name
|> api_base_url()
|> URI.append_query(api_args(model_name, datagouv_id, page_size))
|> URI.append_query(api_args(model_name, datagouv_id: datagouv_id, page_size: page_size))
|> URI.to_string()
end

def import_metrics(model_name, datagouv_id) when model_name in [:dataset, :resource] do
url = api_url(model_name, datagouv_id)
# If we already imported metrics for this model, fetch only the last 3 months
url =
if already_imported?(model_name, datagouv_id) do
api_url(model_name, datagouv_id, page_size: 3)
else
api_url(model_name, datagouv_id, page_size: @nb_records)
end

case http_client().get(url, []) do
{:ok, %Req.Response{status: 200, body: body}} ->
body
|> Map.fetch!("data")
|> Enum.each(fn data -> insert_or_update(model_name, data, datagouv_id) end)
|> Enum.each(fn data -> insert_or_update(model_name, datagouv_id, data) end)

other ->
Logger.error(
Expand All @@ -65,11 +77,15 @@ defmodule Transport.Jobs.ImportMonthlyMetrics do
end
end

defp insert_or_update(
model_name,
%{"metric_month" => metric_month} = data,
datagouv_id
)
@doc """
Tells whether at least one monthly metric row is already stored for the
given data.gouv.fr identifier.

The first argument selects which table is queried: `:dataset` looks at
`DB.DatasetMonthlyMetric`, `:resource` looks at `DB.ResourceMonthlyMetric`.
Returns the boolean produced by `DB.Repo.exists?/1`.
"""
def already_imported?(:dataset, datagouv_id) do
  query = from(m in DB.DatasetMonthlyMetric, where: m.dataset_datagouv_id == ^datagouv_id)
  DB.Repo.exists?(query)
end

def already_imported?(:resource, datagouv_id) do
  query = from(m in DB.ResourceMonthlyMetric, where: m.resource_datagouv_id == ^datagouv_id)
  DB.Repo.exists?(query)
end

defp insert_or_update(model_name, datagouv_id, %{"metric_month" => metric_month} = data)
when model_name in [:dataset, :resource] do
Enum.each(metrics(model_name, data), fn {metric_name, count} ->
count = count || 0
Expand Down Expand Up @@ -100,20 +116,20 @@ defmodule Transport.Jobs.ImportMonthlyMetrics do
end

defp changeset(:dataset, %{datagouv_id: datagouv_id} = params) do
params = Map.merge(params, %{dataset_datagouv_id: datagouv_id})
params = Map.put(params, :dataset_datagouv_id, datagouv_id)
DB.DatasetMonthlyMetric.changeset(%DB.DatasetMonthlyMetric{}, params)
end

defp changeset(:resource, %{datagouv_id: datagouv_id} = params) do
params = Map.merge(params, %{resource_datagouv_id: datagouv_id})
params = Map.put(params, :resource_datagouv_id, datagouv_id)
DB.ResourceMonthlyMetric.changeset(%DB.ResourceMonthlyMetric{}, params)
end

defp api_args(:dataset, datagouv_id, page_size) do
defp api_args(:dataset, datagouv_id: datagouv_id, page_size: page_size) do
[dataset_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc"] |> URI.encode_query()
end

defp api_args(:resource, datagouv_id, page_size) do
defp api_args(:resource, datagouv_id: datagouv_id, page_size: page_size) do
[resource_id__exact: datagouv_id, page_size: page_size, metric_month__sort: "desc"] |> URI.encode_query()
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ defmodule Transport.Jobs.ImportResourceMonthlyMetricsJob do
@moduledoc """
Import monthly metrics related to resources coming from the data.gouv.fr's API.
This job is executed daily and imports metrics for all resources for the last 2 years.
This job is executed daily and imports metrics for all resources.
If resource metrics have not been imported previously, we will fetch metrics for the last 2 years.
Otherwise we will fetch metrics only for the last 3 months.
Records are not supposed to change in the past, except for the current month.
"""
use Oban.Worker, max_attempts: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,23 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
count: 42
)

setup_http_response(datagouv_id, [
%{
"dataset_id" => datagouv_id,
"metric_month" => "2023-12",
"monthly_visit" => 1337,
"monthly_download_resource" => 43
}
])
# This datagouv_id has already been imported, we should only fetch the
# 3 latest records
refute Transport.Jobs.ImportMonthlyMetrics.already_imported?(:dataset, Ecto.UUID.generate())
assert Transport.Jobs.ImportMonthlyMetrics.already_imported?(:dataset, datagouv_id)

setup_http_response(
datagouv_id,
[
%{
"dataset_id" => datagouv_id,
"metric_month" => "2023-12",
"monthly_visit" => 1337,
"monthly_download_resource" => 43
}
],
page_size: 3
)

assert [
%DB.DatasetMonthlyMetric{
Expand Down Expand Up @@ -190,8 +199,9 @@ defmodule Transport.Test.Transport.Jobs.ImportDatasetMonthlyMetricsTestJob do
|> DB.Repo.all()
end

defp setup_http_response(datagouv_id, data) do
metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:dataset, datagouv_id)
defp setup_http_response(datagouv_id, data, options \\ []) do
page_size = Keyword.get(options, :page_size, 24)
metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:dataset, datagouv_id, page_size: page_size)

expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] ->
{:ok, %Req.Response{status: 200, body: %{"data" => data}}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,27 @@ defmodule Transport.Test.Transport.Jobs.ImportResourceMonthlyMetricsTestJob do
count: 42
)

setup_http_response(datagouv_id, [
%{
"resource_id" => datagouv_id,
"metric_month" => "2023-12",
"monthly_download_resource" => 43
},
%{
"resource_id" => datagouv_id,
"metric_month" => "2023-11",
"monthly_download_resource" => 1337
}
])
# This datagouv_id has already been imported, we should only fetch the
# 3 latest records
refute Transport.Jobs.ImportMonthlyMetrics.already_imported?(:resource, Ecto.UUID.generate())
assert Transport.Jobs.ImportMonthlyMetrics.already_imported?(:resource, datagouv_id)

setup_http_response(
datagouv_id,
[
%{
"resource_id" => datagouv_id,
"metric_month" => "2023-12",
"monthly_download_resource" => 43
},
%{
"resource_id" => datagouv_id,
"metric_month" => "2023-11",
"monthly_download_resource" => 1337
}
],
page_size: 3
)

assert [
%DB.ResourceMonthlyMetric{
Expand Down Expand Up @@ -161,8 +170,9 @@ defmodule Transport.Test.Transport.Jobs.ImportResourceMonthlyMetricsTestJob do
|> DB.Repo.all()
end

defp setup_http_response(datagouv_id, data) do
metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:resource, datagouv_id)
defp setup_http_response(datagouv_id, data, options \\ []) do
page_size = Keyword.get(options, :page_size, 24)
metrics_api_url = Transport.Jobs.ImportMonthlyMetrics.api_url(:resource, datagouv_id, page_size: page_size)

expect(Transport.Req.Mock, :get, fn ^metrics_api_url, [] ->
{:ok, %Req.Response{status: 200, body: %{"data" => data}}}
Expand Down

0 comments on commit b614446

Please sign in to comment.