Skip to content

Commit

Permalink
Merge pull request #362 from e-picsa/feat/forecasts-server-2
Browse files Browse the repository at this point in the history
refactor: forecast api
  • Loading branch information
chrismclarke authored Feb 27, 2025
2 parents 9e82e68 + d2c59d8 commit 3b74488
Show file tree
Hide file tree
Showing 15 changed files with 1,228 additions and 1,045 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export const ApiMapping = (

// TODO - handle error if filename already exists
const { error: dbError } = await supabaseService.db
.table('climate_forecasts')
.table('forecasts')
.update<IForecastUpdate>({ storage_file: fullPath })
.eq('id', row.id);
if (dbError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { CommonModule } from '@angular/common';
import { ChangeDetectionStrategy, Component, computed, effect, signal } from '@angular/core';
import { RouterModule } from '@angular/router';
import { RefreshSpinnerComponent } from '@picsa/components';
import { FunctionResponses } from '@picsa/server-types';
import { IDataTableOptions, PicsaDataTableComponent } from '@picsa/shared/features';
import { PicsaNotificationService } from '@picsa/shared/services/core/notification.service';
import { SupabaseService } from '@picsa/shared/services/core/supabase';
import { FunctionsHttpError } from '@supabase/supabase-js';

import { DashboardMaterialModule } from '../../../../material.module';
import { DeploymentDashboardService } from '../../../deployment/deployment.service';
Expand All @@ -16,6 +18,8 @@ interface IForecastTableRow extends IForecastRow {
file_name: string;
}

type IForecastDBAPIResponse = { data: FunctionResponses['Dashboard']['forecast-db']; error?: any };

const DISPLAY_COLUMNS: (keyof IForecastTableRow)[] = [
'country_code',
'forecast_type',
Expand Down Expand Up @@ -61,7 +65,7 @@ export class ClimateForecastPageComponent {
public activeDownloads = signal<Record<string, 'pending' | 'complete'>>({});

private get db() {
return this.supabase.db.table('climate_forecasts');
return this.supabase.db.table('forecasts');
}

constructor(
Expand Down Expand Up @@ -116,29 +120,31 @@ export class ClimateForecastPageComponent {
/** Invoke backend function that fetches forecasts from climate api and updates db */
private async refreshAPIData() {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { country_code } = this.deploymentService.activeDeployment()!;
const formData = new FormData();
formData.append('country_code', country_code);
formData.append('query_prefix', this.apiQueryPrefix());
const { data, error } = await this.supabase.functions
.invoke('dashboard/climate-forecast-update', {
method: 'POST',
body: formData,
})
.catch((error) => ({ data: [], error }));
if (error) {
console.error(error);
const country_code = this.countryCode() as string;
const query_prefix = this.apiQueryPrefix();

const { data, error } = await this.supabase.functions.invoke<IForecastDBAPIResponse>('dashboard/forecast-db', {
method: 'POST',
body: { country_code, query_prefix },
});

// Errors thrown from functions in JS client need to wait for message
// https://github.com/supabase/functions-js/issues/45
if (error && error instanceof FunctionsHttpError) {
const errorMessage = await error.context.json();
console.error('refreshAPIData', JSON.parse(errorMessage));
this.notificationService.showErrorNotification('Forecast Update Failed. See console logs for details');
return [];
}
if (data.length > 0) {
this.forecastData.update((v) => ([] as IForecastTableRow[]).concat(this.toTableData(data), v));
}
console.log('[Api Data Updated]', data);
return data;
const forecasts = data?.[country_code] || [];

this.forecastData.update((v) => ([] as IForecastTableRow[]).concat(this.toTableData(forecasts), v));
console.log('[Api Data Updated]', { country_code, data, forecasts });

return forecasts;
}

private toTableData(data: IForecastRow[]): IForecastTableRow[] {
private toTableData(data: IForecastRow[] = []): IForecastTableRow[] {
return data
.map((el) => {
// compute file_name column from storage file path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ export type IClimateSummaryRainfallRow = Database['public']['Tables']['climate_s
};
export type IClimateSummaryRainfallInsert = Database['public']['Tables']['climate_summary_rainfall']['Insert'];

export type IForecastRow = Database['public']['Tables']['climate_forecasts']['Row'];
export type IForecastInsert = Database['public']['Tables']['climate_forecasts']['Insert'];
export type IForecastUpdate = Database['public']['Tables']['climate_forecasts']['Update'];
export type IForecastRow = Database['public']['Tables']['forecasts']['Row'];
export type IForecastInsert = Database['public']['Tables']['forecasts']['Insert'];
export type IForecastUpdate = Database['public']['Tables']['forecasts']['Update'];

export type IStationRow = Database['public']['Tables']['climate_stations']['Row'];
export type IStationInsert = Database['public']['Tables']['climate_stations']['Insert'];
2 changes: 1 addition & 1 deletion apps/picsa-server/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"dependsOn": [],
"outputs": [],
"options": {
"commands": ["npx supabase gen types typescript --local > supabase/types/index.ts"],
"commands": ["npx supabase gen types typescript --local > supabase/types/db.types.ts"],
"cwd": "apps/picsa-server"
}
},
Expand Down
6 changes: 3 additions & 3 deletions apps/picsa-server/supabase/functions/_shared/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ export const getFormData = async (req: Request): Promise<Form> => {
return { fields: {}, files: {} };
};

export const getJsonData = async (req: Request): Promise<Record<string, any>> => {
export const getJsonData = async <T = Record<string, any>>(req: Request): Promise<T> => {
if (req.headers.has('content-type') && req.headers.get('content-type')?.startsWith('application/json')) {
const json = await req.json();
return json;
return json as T;
}
console.error('Request does not contain json body');
return {};
return {} as T;
};
20 changes: 18 additions & 2 deletions apps/picsa-server/supabase/functions/_shared/response.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import { corsHeaders } from './cors.ts';

export function ErrorResponse(msg: string, status = 400) {
return new Response(JSON.stringify({ msg }), {
/**
* Return an erorr response
*
* Note - if reading from js-client need to await error context body for message
* https://github.com/supabase/functions-js/issues/45
*
* @example
* ```ts
* const {dat, error} = await invokeFunction(...)
* if (error && error instanceof FunctionsHttpError) {
* const errorMessage = await error.context.json();
* const errorJson = JSON.parse(errorMessage)
* console.error(errorJson)
* }
* ```
*/
export function ErrorResponse(msg: any, status = 400) {
return new Response(JSON.stringify(msg), {
status,
headers: { ...corsHeaders, 'Content-Type': 'application/json' },
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,59 @@
import type { Database } from '../../types/index.ts';
import type {
paths as climateApiPaths,
components as climateApiComponents,
} from '../../../../picsa-apps/dashboard/src/app/modules/climate/types/api.d.ts';
import { getClient } from '../_shared/client.ts';
import { getJsonData } from '../_shared/request.ts';
import { ErrorResponse } from '../_shared/response.ts';
import { JSONResponse } from '../_shared/response.ts';

import type {
climateApiPaths,
IApiClimateForecast,
IDBClimateForecastInsert,
IForecastDBAPIResponse,
} from './types.ts';

/**
* Read the endpoint from env. Note, if running climate api in local docker container update `.env` to:
* ```env
* CLIMATE_API_ENDPOINT=http://host.docker.internal:8000
* ```
* https://github.com/orgs/supabase/discussions/9837
*/
const CLIMATE_API_ENDPOINT = Deno.env.get('CLIMATE_API_ENDPOINT') || 'https://api.epicsa.idems.international';
const COUNTRY_CODES = ['mw', 'zm'];
export const CLIMATE_API_ENDPOINT = Deno.env.get('CLIMATE_API_ENDPOINT') || 'https://api.epicsa.idems.international';
export const ALL_COUNTRY_CODES = ['mw', 'zm'];

// Create typed fetch client from open-api definition exported by climate api
import createClient from 'openapi-fetch';
const apiClient = createClient<climateApiPaths>({ baseUrl: CLIMATE_API_ENDPOINT, mode: 'cors' });

type IDBClimateForecast = Database['public']['Tables']['climate_forecasts']['Insert'];
type IApiClimateForecast = climateApiComponents['schemas']['DocumentMetadata'];
export const apiClient = createClient<climateApiPaths>({ baseUrl: CLIMATE_API_ENDPOINT, mode: 'cors' });

/**
* Update
* Update cliamte forecast db rows
*/
export const climateForecastUpdate = async (req: Request) => {
// Validate body formData
export const forecastDB = async (req: Request) => {
// TODO - Improve validators and feedback
let { country_codes = COUNTRY_CODES, query_prefix } = await getJsonData(req);
let { country_code, query_prefix } = await getJsonData(req);

// Retrieve single country if specified, default all
const country_codes = country_code ? [country_code] : ALL_COUNTRY_CODES;

// Default query for documents stored in the current month,
if (!query_prefix) {
query_prefix = new Date().toISOString().replace(/-/, '').substring(0, 6);
}

const responses = [];
const response: IForecastDBAPIResponse = {};
const errors = [];

for (const country_code of country_codes) {
try {
const data = await getCountryUpdates(country_code, query_prefix);
responses.push({ country_code, data });
response[country_code] = data;
} catch (error) {
responses.push({ country_code, error });
errors.push(error);
}
}
return JSONResponse(responses);
if (errors.length > 0) {
return ErrorResponse(errors);
}
return JSONResponse(response);
};

async function getCountryUpdates(country_code: string, query_prefix: string) {
Expand All @@ -65,7 +71,7 @@ async function getCountryUpdates(country_code: string, query_prefix: string) {
// map api forecasts to db format and update db
const updates = mapApiForecastToDb(newForecasts, country_code);
const supabaseClient = getClient();
const { error } = await supabaseClient.from('climate_forecasts').insert(updates);
const { error } = await supabaseClient.from('forecasts').insert(updates);
if (error) {
throw error;
}
Expand All @@ -88,7 +94,7 @@ async function getDBForecasts(query: { country_code: string; query_prefix: strin
const { country_code, query_prefix } = query;
console.log('db query', query_prefix, country_code);
const { data, error } = await supabaseClient
.from('climate_forecasts')
.from('forecasts')
.select('*')
.like('id', `${query_prefix}%`)
.eq('country_code', country_code)
Expand All @@ -100,7 +106,7 @@ async function getDBForecasts(query: { country_code: string; query_prefix: strin
return data;
}

function mapApiForecastToDb(apiForecasts: IApiClimateForecast[], country_code: string): IDBClimateForecast[] {
function mapApiForecastToDb(apiForecasts: IApiClimateForecast[], country_code: string): IDBClimateForecastInsert[] {
return apiForecasts.map((v) => ({
country_code,
id: v.name,
Expand Down
100 changes: 100 additions & 0 deletions apps/picsa-server/supabase/functions/dashboard/forecast-storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { getClient } from '../_shared/client.ts';
import { getJsonData } from '../_shared/request.ts';
import { JSONResponse } from '../_shared/response.ts';
import { apiClient } from './forecast-db.ts';
import { IDBClimateForecastRow } from './types.ts';

interface IReqParams {
/**
* Max number of documents to retrieve. As requests are run in parallel smaller number
* reduces server workload. Default 5
*/
limit?: number;
}

/**
* Handle downloading forecast files from api and populating to supabase storage entry
* Checks DB for any entries without storage files and attempts to update
*/
export const forecastStorage = async (req: Request) => {
// ensure api up and running before sending batched requests
await apiClient.GET('/v1/status/');
const params = await getJsonData<IReqParams>(req);
const res = await new ForecastStorageUpdate().populateStorageFiles(params);
return JSONResponse(res);
};

class ForecastStorageUpdate {
supabaseClient = getClient();

private get table() {
return this.supabaseClient.from('forecasts');
}

async populateStorageFiles(params: IReqParams) {
const { limit = 5 } = params;
const pending = await this.listPendingFiles(limit);

const updates: IDBClimateForecastRow[] = [];
const errors: any[] = [];
// TODO - make parallel and allow failure
for (const { country_code, id } of pending) {
const { data, error } = await this.storeForecast(country_code, id);
if (error) {
errors.push(error);
}
if (data) {
updates.push(data);
}
}
return { data: updates, error: errors };
}

/** Check all climate forecast db entries for any that are missing corresponding storage files */
private async listPendingFiles(limit = 5) {
const query = this.table.select('*').is('storage_file', null).order('id', { ascending: false }).limit(limit);
const { data, error } = await query;
if (error) {
throw error;
}
return data;
}

/** Retrieve forecast data from API, store to supabase storage and update DB */
private async storeForecast(
country_code: string,
id: string
): Promise<{ data?: IDBClimateForecastRow; error?: any }> {
const supabaseClient = getClient();
// download from api
const req = apiClient.GET('/v1/documents/{country}/{filepath}', {
params: { path: { country: country_code as any, filepath: id } },
parseAs: 'blob',
});
const { data: fileData, response: apiResponse, error: apiError } = await req;
if (apiError) {
return { error: apiError };
}
if (fileData) {
// upload to supabase storage
const contentType = apiResponse.headers.get('content-type') as string;
const { data: uploadData, error: uploadError } = await supabaseClient.storage
.from(country_code)
.upload(`climate/forecasts/${id}`, fileData, { contentType, upsert: true });
if (uploadError) {
return { error: uploadError };
}
// update db entry
const { data: updateData, error: updateError } = await this.updateForecastDBStorageEntry(id, uploadData.fullPath);
if (updateError) {
return { error: updateError };
}
return { data: updateData?.[0] };
}
return { error: `No filedata found for ${id}` };
}

private updateForecastDBStorageEntry(id: string, storage_file: string) {
return this.table.update({ storage_file }).eq('id', id).select();
}
}
9 changes: 6 additions & 3 deletions apps/picsa-server/supabase/functions/dashboard/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// This enables autocomplete, go to definition, etc.

import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { climateForecastUpdate } from './climate-forecast-update.ts';
import { forecastStorage } from './forecast-storage.ts';
import { forecastDB } from './forecast-db.ts';
import { corsHeaders } from '../_shared/cors.ts';

serve((req) => {
Expand All @@ -18,8 +19,10 @@ serve((req) => {
const endpoint = req.url.split('/').pop();

switch (endpoint) {
case 'climate-forecast-update':
return climateForecastUpdate(req);
case 'forecast-db':
return forecastDB(req);
case 'forecast-storage':
return forecastStorage(req);

default:
return new Response(`Invalid endpoint: ${endpoint}`, {
Expand Down
16 changes: 16 additions & 0 deletions apps/picsa-server/supabase/functions/dashboard/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import type { Database } from '../../types/db.types.ts';
import type { paths, components } from '../../../../picsa-apps/dashboard/src/app/modules/climate/types/api.d.ts';

export type climateApiPaths = paths;
export type climateApiComponents = components;

export type IDBClimateForecastRow = Database['public']['Tables']['forecasts']['Row'];
export type IDBClimateForecastInsert = Database['public']['Tables']['forecasts']['Insert'];

export type IApiClimateForecast = climateApiComponents['schemas']['DocumentMetadata'];

/********* Api Responses ************/

export type IForecastDBAPIResponse = {
[country_code: string]: IDBClimateForecastInsert[];
};
Loading

0 comments on commit 3b74488

Please sign in to comment.