Skip to content

Commit

Permalink
use snapshots when initiating github sync (#2273)
Browse files Browse the repository at this point in the history
* include snapshots when initiating github sync

* update changelog

* Move querying responsiblity out of expoirt utils

* make credo happy

---------

Co-authored-by: Stuart Corbishley <corbish@gmail.com>
  • Loading branch information
midigofrank and stuartc committed Jul 30, 2024
1 parent e2d780d commit dbc9527
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 58 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ and this project adheres to
- Make root layout configurable
[#2310](https://github.com/OpenFn/lightning/pull/2310)

- Use snapshots when initiating Github Sync
[#1827](https://github.com/OpenFn/lightning/issues/1827)

### Fixed

## [v2.7.11] - 2024-07-26
Expand Down
66 changes: 38 additions & 28 deletions lib/lightning/export_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Lightning.ExportUtils do
"""

alias Lightning.Projects
alias Lightning.Repo
alias Lightning.Workflows
alias Lightning.Workflows.Snapshot

defp hyphenate(string) when is_binary(string) do
string |> String.replace(" ", "-")
Expand Down Expand Up @@ -41,36 +41,38 @@ defmodule Lightning.ExportUtils do
else: base
end

defp edge_to_treenode(%{source_job_id: nil} = edge, triggers) do
edge = Repo.preload(edge, [:source_trigger, :target_job])
trigger_name = edge.source_trigger.type |> Atom.to_string()
target_job = edge.target_job.name |> hyphenate()
defp edge_to_treenode(%{source_job_id: nil} = edge, triggers, jobs) do
source_trigger =
Enum.find(triggers, fn t -> t.id == edge.source_trigger_id end)

target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
trigger_name = to_string(source_trigger.type)
target_job_name = hyphenate(target_job.name)

%{
name: "#{trigger_name}->#{target_job}",
source_trigger: find_trigger_name(edge, triggers)
name: "#{trigger_name}->#{target_job_name}",
source_trigger: trigger_name
}
|> merge_edge_common_fields(edge)
|> merge_edge_common_fields(edge, target_job)
end

defp edge_to_treenode(%{source_trigger_id: nil} = edge, _unused_triggers) do
edge = Repo.preload(edge, [:source_job, :target_job])
source_job = edge.source_job.name |> hyphenate()
target_job = edge.target_job.name |> hyphenate()
defp edge_to_treenode(%{source_trigger_id: nil} = edge, _triggers, jobs) do
target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
source_job = Enum.find(jobs, fn j -> j.id == edge.source_job_id end)
source_job_name = hyphenate(source_job.name)
target_job_name = hyphenate(target_job.name)

%{
name: "#{source_job}->#{target_job}",
source_job: source_job
name: "#{source_job_name}->#{target_job_name}",
source_job: source_job_name
}
|> merge_edge_common_fields(edge)
|> merge_edge_common_fields(edge, target_job)
end

defp merge_edge_common_fields(json, edge) do
target_job = edge.target_job.name |> hyphenate()

defp merge_edge_common_fields(json, edge, target_job) do
json
|> Map.merge(%{
target_job: target_job,
target_job: hyphenate(target_job.name),
condition_type: edge.condition_type |> Atom.to_string(),
enabled: edge.enabled,
node_type: :edge
Expand All @@ -87,12 +89,6 @@ defmodule Lightning.ExportUtils do
end)
end

defp find_trigger_name(edge, triggers) do
[trigger] = Enum.filter(triggers, fn t -> t.id == edge.source_trigger_id end)

trigger.name
end

defp pick_and_sort(map) do
ordering_map = %{
project: [:name, :description, :credentials, :globals, :workflows],
Expand Down Expand Up @@ -229,17 +225,21 @@ defmodule Lightning.ExportUtils do
edges =
workflow.edges
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(fn e -> edge_to_treenode(e, triggers) end)
|> Enum.map(fn e ->
edge_to_treenode(e, workflow.triggers, workflow.jobs)
end)

flow_map = %{jobs: jobs, edges: edges, triggers: triggers}

flow_map
|> to_workflow_yaml_tree(workflow)
end

def generate_new_yaml(project_id) do
project = Projects.get_project!(project_id)
@spec generate_new_yaml(Projects.Project.t(), [Snapshot.t()] | nil) ::
{:ok, binary()}
def generate_new_yaml(project, snapshots \\ nil)

def generate_new_yaml(project, nil) do
yaml =
project
|> Workflows.get_workflows_for()
Expand All @@ -248,4 +248,14 @@ defmodule Lightning.ExportUtils do

{:ok, yaml}
end

def generate_new_yaml(project, snapshots) when is_list(snapshots) do
yaml =
snapshots
|> Enum.sort_by(& &1.name)
|> build_yaml_tree(project)
|> to_new_yaml()

{:ok, yaml}
end
end
13 changes: 9 additions & 4 deletions lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Lightning.Projects do
alias Lightning.RunStep
alias Lightning.Services.ProjectHook
alias Lightning.Workflows.Job
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Workflow
alias Lightning.WorkOrder
Expand Down Expand Up @@ -558,11 +559,15 @@ defmodule Lightning.Projects do
{:ok, string}
"""
@spec export_project(:yaml, any) :: {:ok, binary}
def export_project(:yaml, project_id) do
{:ok, yaml} = ExportUtils.generate_new_yaml(project_id)
@spec export_project(atom(), Ecto.UUID.t(), [Ecto.UUID.t()] | nil) ::
{:ok, binary}
def export_project(:yaml, project_id, snapshot_ids \\ nil) do
project = get_project!(project_id)

{:ok, yaml}
snapshots =
if snapshot_ids, do: Snapshot.get_all_by_ids(snapshot_ids), else: nil

{:ok, _yaml} = ExportUtils.generate_new_yaml(project, snapshots)
end

@doc """
Expand Down
11 changes: 9 additions & 2 deletions lib/lightning/projects/provisioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ defmodule Lightning.Projects.Provisioner do
Exclude deleted workflows.
"""
@spec preload_dependencies(Project.t()) :: Project.t()
def preload_dependencies(project) do
@spec preload_dependencies(Project.t(), nil | [Ecto.UUID.t(), ...]) ::
Project.t()
def preload_dependencies(project, snapshots \\ nil)

def preload_dependencies(project, nil) do
w = from(w in Workflow, where: is_nil(w.deleted_at))

Repo.preload(
Expand All @@ -146,6 +149,10 @@ defmodule Lightning.Projects.Provisioner do
)
end

def preload_dependencies(project, snapshots) when is_list(snapshots) do
%{project | workflows: Snapshot.get_all_by_ids(snapshots)}
end

defp project_changeset(project, attrs) do
project
|> cast(attrs, [:id, :name, :description])
Expand Down
50 changes: 42 additions & 8 deletions lib/lightning/version_control/version_control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Lightning.VersionControl do
alias Lightning.VersionControl.GithubError
alias Lightning.VersionControl.ProjectRepoConnection
alias Lightning.VersionControl.VersionControlUsageLimiter
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Workflow

defdelegate subscribe(user), to: Events

Expand Down Expand Up @@ -130,6 +132,8 @@ defmodule Lightning.VersionControl do
VersionControlUsageLimiter.limit_github_sync(
repo_connection.project_id
),
snapshots <-
list_or_create_snapshots_for_project(repo_connection.project_id),
{:ok, client} <-
GithubClient.build_installation_client(
repo_connection.github_installation_id
Expand All @@ -143,20 +147,50 @@ defmodule Lightning.VersionControl do
pull_yml_target_path() |> Path.basename(),
%{
ref: default_branch,
inputs: %{
projectId: repo_connection.project_id,
apiSecretName: api_secret_name(repo_connection),
pathToConfig: config_target_path(repo_connection),
branch: repo_connection.branch,
commitMessage:
"user #{user_email} initiated a sync from Lightning"
}
inputs:
%{
projectId: repo_connection.project_id,
apiSecretName: api_secret_name(repo_connection),
pathToConfig: config_target_path(repo_connection),
branch: repo_connection.branch,
commitMessage:
"user #{user_email} initiated a sync from Lightning"
}
|> maybe_add_snapshots(snapshots)
}
) do
:ok
end
end

defp list_or_create_snapshots_for_project(project_id) do
current_query =
from w in Workflow,
left_join: s in assoc(w, :snapshots),
on: s.lock_version == w.lock_version,
where: w.project_id == ^project_id and is_nil(w.deleted_at),
select: {w, s.id}

workflows = Repo.all(current_query)

Enum.reduce(workflows, [], fn {workflow, snapshot_id}, acc ->
if is_nil(snapshot_id) do
{:ok, snapshot} = Snapshot.get_or_create_latest_for(workflow)
[snapshot.id | acc]
else
[snapshot_id | acc]
end
end)
end

defp maybe_add_snapshots(inputs, snapshot_ids) do
if Enum.empty?(snapshot_ids) do
inputs
else
Map.put(inputs, :snapshots, Enum.join(snapshot_ids, " "))
end
end

def fetch_user_installations(user) do
with {:ok, access_token} <- fetch_user_access_token(user),
{:ok, client} <- GithubClient.build_bearer_client(access_token) do
Expand Down
7 changes: 7 additions & 0 deletions lib/lightning/workflows/snapshot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ defmodule Lightning.Workflows.Snapshot do
|> Repo.all()
end

def get_all_by_ids(ids) do
from(s in __MODULE__,
where: s.id in ^ids
)
|> Repo.all()
end

@doc """
Get the latest snapshot for a workflow, based on the lock_version.
Expand Down
7 changes: 4 additions & 3 deletions lib/lightning_web/controllers/api/provisioning_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ defmodule LightningWeb.API.ProvisioningController do
conn.assigns.current_resource,
project
),
project <- Provisioner.preload_dependencies(project) do
project <-
Provisioner.preload_dependencies(project, params["snapshots"]) do
conn
|> put_status(:ok)
|> render("create.json", project: project)
Expand All @@ -78,7 +79,7 @@ defmodule LightningWeb.API.ProvisioningController do
Returns a description of the project as yaml. Same as the export project to
yaml button (see Downloads Controller) but made for the API.
"""
def show_yaml(conn, %{"id" => id}) do
def show_yaml(conn, %{"id" => id} = params) do
with %Projects.Project{} = project <-
Projects.get_project(id) || {:error, :not_found},
:ok <-
Expand All @@ -88,7 +89,7 @@ defmodule LightningWeb.API.ProvisioningController do
conn.assigns.current_resource,
project
) do
{:ok, yaml} = Projects.export_project(:yaml, id)
{:ok, yaml} = Projects.export_project(:yaml, id, params["snapshots"])

conn
|> put_resp_content_type("text/yaml")
Expand Down
35 changes: 23 additions & 12 deletions lib/lightning_web/controllers/api/provisioning_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule LightningWeb.API.ProvisioningJSON do
alias Lightning.Projects.Project
alias Lightning.Workflows.Edge
alias Lightning.Workflows.Job
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Workflow

Expand All @@ -21,47 +22,57 @@ defmodule LightningWeb.API.ProvisioningJSON do
def as_json(%Project{} = project) do
Ecto.embedded_dump(project, :json)
|> Map.put(
"workflows",
:workflows,
project.workflows
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
end

def as_json(%Workflow{} = workflow) do
Ecto.embedded_dump(workflow, :json)
def as_json(%module{} = workflow_or_snapshot)
when module in [Workflow, Snapshot] do
workflow_id =
if module == Workflow do
workflow_or_snapshot.id
else
workflow_or_snapshot.workflow_id
end

workflow_or_snapshot
|> Ecto.embedded_dump(:json)
|> Map.put(:id, workflow_id)
|> Map.put(
"jobs",
workflow.jobs
:jobs,
workflow_or_snapshot.jobs
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
|> Map.put(
"triggers",
workflow.triggers
:triggers,
workflow_or_snapshot.triggers
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
|> Map.put(
"edges",
workflow.edges
:edges,
workflow_or_snapshot.edges
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
end

def as_json(%Job{} = job) do
def as_json(%module{} = job) when module in [Job, Snapshot.Job] do
Ecto.embedded_dump(job, :json)
|> Map.take(~w(id adaptor body name)a)
end

def as_json(%Trigger{} = trigger) do
def as_json(%module{} = trigger) when module in [Trigger, Snapshot.Trigger] do
Ecto.embedded_dump(trigger, :json)
|> Map.take(~w(id type cron_expression enabled)a)
|> drop_keys_with_nil_value()
end

def as_json(%Edge{} = edge) do
def as_json(%module{} = edge) when module in [Edge, Snapshot.Edge] do
Ecto.embedded_dump(edge, :json)
|> Map.take(~w(
id enabled source_job_id source_trigger_id
Expand Down
Loading

0 comments on commit dbc9527

Please sign in to comment.