Skip to content

Commit

Permalink
Feature/carecon 792 add Job and CheckJobStatus into go-force (#1)
Browse files Browse the repository at this point in the history
* adds jobstatus check
* adds job type definition
* adds test for job
  • Loading branch information
sBurmester authored Aug 13, 2024
1 parent 34aaec2 commit 8f491ab
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ jobs:
build-test:
strategy:
matrix:
go-version: ["1.20", 1.21.x]
go-version: ["1.21.13", 1.22.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Setup Go ${{ matrix.go-version }}
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Install dependencies
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea/
31 changes: 31 additions & 0 deletions force/JobTypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package force

import (
"net/http"
)

type OptionsFunc func(*Job)

type BulkClient interface {
Do(req *http.Request) (*http.Response, error)
}

type ObjectMapper func(objects any) [][]string

type Job struct {
info *JobInfo
operation JobOperation
forceApi *ForceApi
objectMapper ObjectMapper
client BulkClient
apiVersion string
}

type JobInfo struct {
Id string `json:"id"`
State string `json:"state"`
NumberRecordsFailed int `json:"numberRecordsFailed"`
NumberRecordsProcessed int `json:"numberRecordsProcessed"`
JobMessage string `json:"errorMessage"`
ContentURL string `json:"contentUrl"`
}
2 changes: 1 addition & 1 deletion force/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ForceApi struct {
apiSObjects map[string]*SObjectMetaData
apiSObjectDescriptions map[string]*SObjectDescription
apiMaxBatchSize int64
logger ForceApiLogger
logger ApiLogger
logPrefix string

httpClient *http.Client
Expand Down
16 changes: 8 additions & 8 deletions force/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
)

const (
version = "1.0.0"
version = "1.1.0"
userAgent = "go-force/" + version
contentType = "application/json"
responseType = "application/json"
)

// Get issues a GET to the specified path with the given params and put the
// umarshalled (json) result in the third parameter
// unmarshalled (json) result in the third parameter
func (forceApi *ForceApi) Get(path string, params url.Values, out interface{}) error {
return forceApi.request("GET", path, params, nil, out)
}
Expand Down Expand Up @@ -59,7 +59,7 @@ func (forceApi *ForceApi) NewRequest(method, path string, params url.Values) (*h
// Build Request
req, err := http.NewRequest(method, uri.String(), nil)
if err != nil {
return nil, fmt.Errorf("Error creating %v request: %v", method, err)
return nil, fmt.Errorf("error creating %v request: %w", method, err)
}

// Add Headers
Expand All @@ -73,7 +73,7 @@ func (forceApi *ForceApi) NewRequest(method, path string, params url.Values) (*h

func (forceApi *ForceApi) request(method, path string, params url.Values, payload, out interface{}) error {
if err := forceApi.oauth.Validate(); err != nil {
return fmt.Errorf("Error creating %v request: %v", method, err)
return fmt.Errorf("error creating %v request: %w", method, err)
}

req, err := forceApi.NewRequest(method, path, params)
Expand All @@ -87,7 +87,7 @@ func (forceApi *ForceApi) request(method, path string, params url.Values, payloa

jsonBytes, err := forcejson.Marshal(payload)
if err != nil {
return fmt.Errorf("Error marshaling encoded payload: %v", err)
return fmt.Errorf("error marshaling encoded payload: %w", err)
}

body = io.NopCloser(bytes.NewReader(jsonBytes))
Expand All @@ -98,7 +98,7 @@ func (forceApi *ForceApi) request(method, path string, params url.Values, payloa
forceApi.traceRequest(req)
resp, err := forceApi.httpClient.Do(req)
if err != nil {
return fmt.Errorf("Error sending %v request: %v", method, err)
return fmt.Errorf("error sending %v request: %w", method, err)
}
defer resp.Body.Close()
forceApi.traceResponse(resp)
Expand All @@ -110,7 +110,7 @@ func (forceApi *ForceApi) request(method, path string, params url.Values, payloa

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("Error reading response bytes: %v", err)
return fmt.Errorf("error reading response bytes: %w", err)
}
forceApi.traceResponseBody(respBytes)

Expand Down Expand Up @@ -144,7 +144,7 @@ func (forceApi *ForceApi) request(method, path string, params url.Values, payloa

if objectUnmarshalErr != nil {
// Not a force.com api error. Just an unmarshalling error.
return fmt.Errorf("unable to unmarshal response to object: %v", objectUnmarshalErr)
return fmt.Errorf("unable to unmarshal response to object: %w", objectUnmarshalErr)
}

// Sometimes no response is expected. For example delete and update. We still have to make sure an error wasn't returned.
Expand Down
12 changes: 11 additions & 1 deletion force/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
)

// Custom Error to handle salesforce api responses.
// ApiErrors to handle salesforce api responses.
type ApiErrors []*ApiError

type ApiError struct {
Expand Down Expand Up @@ -48,3 +48,13 @@ func (e ApiError) Validate() bool {

return false
}

type FailedResultsError struct {
ApiError
SfId string `json:"sf__Id"`
}

func (e FailedResultsError) Validate() bool {
return len(e.Fields) != 0 || len(e.Message) != 0 || len(e.ErrorCode) != 0 ||
len(e.ErrorName) != 0 || len(e.ErrorDescription) != 0 || len(e.SfId) != 0
}
20 changes: 10 additions & 10 deletions force/force.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// A Go package that provides bindings to the force.com REST API
// Package force A Go package that provides bindings to the force.com REST API
//
// See http://www.salesforce.com/us/developer/docs/api_rest/
package force
Expand All @@ -11,17 +11,17 @@ import (
)

const (
testVersion = "v36.0"
testVersion = "v53.0"
testClientId = "3MVG9A2kN3Bn17hs8MIaQx1voVGy662rXlC37svtmLmt6wO_iik8Hnk3DlcYjKRvzVNGWLFlGRH1ryHwS217h"
testClientSecret = "4165772184959202901"
testUserName = "go-force@jalali.net"
testPassword = "golangrocks3"
testSecurityToken = "kAlicVmti9nWRKRiWG3Zvqtte"
testSecurityToken = "kAlicVmti9nWRKRiWG3Zvqtte" //nolint:gosec Just for testing purpose
testEnvironment = "production"
)

const (
DefaultAPIVersion = "v58.0"
DefaultAPIVersion = "v53.0"
)

type APIConfig func(*ForceApi)
Expand Down Expand Up @@ -85,7 +85,7 @@ func NewClient(cfg ...APIConfig) (*ForceApi, error) {
apiResources: make(map[string]string),
apiSObjects: make(map[string]*SObjectMetaData),
apiSObjectDescriptions: make(map[string]*SObjectDescription),
apiVersion: "v58.0",
apiVersion: "v53.0",
httpClient: http.DefaultClient,
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func NewClient(cfg ...APIConfig) (*ForceApi, error) {
return nil, fmt.Errorf("failed to initialize oauth: %w", err)
}

// Init Api Resources
// Init ForceApi Resources
err = f.getApiResources()
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +171,7 @@ func CreateWithRefreshToken(version, clientId, accessToken, instanceUrl string)
return nil, err
}

// Init Api Resources
// Init ForceApi Resources
err := forceApi.getApiResources()
if err != nil {
return nil, err
Expand All @@ -195,7 +195,7 @@ func createTest() *ForceApi {
return forceApi
}

type ForceApiLogger interface {
type ApiLogger interface {
Printf(format string, v ...interface{})
}

Expand All @@ -206,10 +206,10 @@ type ForceApiLogger interface {
//
// Use TraceOn if you want to spy on the ForceApi requests and responses.
//
// Note that the base log.Logger type satisfies ForceApiLogger, but adapters
// Note that the base log.Logger type satisfies ApiLogger, but adapters
// can easily be written for other logging packages (e.g., the
// golang-sanctioned glog framework).
func (forceApi *ForceApi) TraceOn(prefix string, logger ForceApiLogger) {
func (forceApi *ForceApi) TraceOn(prefix string, logger ApiLogger) {
forceApi.logger = logger
if prefix == "" {
forceApi.logPrefix = prefix
Expand Down
143 changes: 143 additions & 0 deletions force/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package force

import (
"bytes"
"encoding/csv"
"errors"
"fmt"
"io"
"net/http"
"strings"
)

// CreateJob creates a new pointer to an instance of Job. Can be Modified with the given JobOptionsFuncs
func CreateJob(fapi *ForceApi, opts ...OptionsFunc) *Job {
job := &Job{
forceApi: fapi,
operation: JobOperation{},
objectMapper: func(objects any) [][]string { return nil },
info: &JobInfo{},
apiVersion: DefaultAPIVersion,
client: http.DefaultClient,
}
for _, opt := range opts {
opt(job)
}

return job
}

// JobWithHTTPClient adds a HTTPClient to the Job, to communicate with salesforce
func JobWithHTTPClient(client BulkClient) OptionsFunc {
return func(job *Job) {
job.client = client
}
}

// JobWithJobInfo adds Job Information to the Job
func JobWithJobInfo(info *JobInfo) OptionsFunc {
return func(job *Job) {
job.info = info
}
}

// JobWithApiVersion set the ApiVersion of a Job
func JobWithApiVersion(apiVersion string) OptionsFunc {
return func(job *Job) {
job.apiVersion = apiVersion
}
}

// JobWithMapper adds a given ObjectMapper to the Job
func JobWithMapper(mapper ObjectMapper) OptionsFunc {
return func(job *Job) {
job.objectMapper = mapper
}
}

// JobWithOperation adds a given JobOperation to the Job
func JobWithOperation(operation JobOperation) OptionsFunc {
return func(job *Job) {
job.operation = operation
}
}

func (job *Job) Start() error {
params := map[string]string{
"object": job.operation.Object,
"operation": job.operation.Operation,
}

if err := job.forceApi.Post("/services/data/"+job.apiVersion+"/jobs/ingest", nil, params, job.info); err != nil {
return err
}
job.operation.ProgressReporter("job created")
return nil
}

// Run marshals the given payload to csv with the given ObjectMapper
// for the Job and sends the csv to the given SalesforceJob
func (job *Job) Run(payload any) error {
if payload == nil {
return errors.New("could not send payload because it is empty")
}

body, err := job.marshalCSV(payload)
if err != nil {
return fmt.Errorf("cannot marshal csv. %w", err)
}

contentUrl := job.info.ContentURL
if !strings.HasPrefix(contentUrl, "/") {
contentUrl = "/" + contentUrl
}

req, err := http.NewRequest("PUT", fmt.Sprintf("%s%s", job.forceApi.GetInstanceURL(), contentUrl), body)
if err != nil {
return fmt.Errorf("could not create new HTTP Request. %w", err)
}

req.Header.Set("Content-Type", "text/csv")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", job.forceApi.GetAccessToken()))

res, err := job.client.Do(req)
if err != nil {
return fmt.Errorf("could not put csv bulk data. %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusCreated {
errb, err := io.ReadAll(res.Body)
if err != nil {
return err
}

return fmt.Errorf("unexpected StatusCode on PUT batch: %d (%s), %s", res.StatusCode, res.Status, string(errb))
}

statusURI := fmt.Sprintf("/services/data/%s/jobs/ingest/%s", job.apiVersion, job.info.Id)
params := map[string]string{
"state": "UploadComplete",
}

if err := job.forceApi.Patch(statusURI, nil, params, job.info); err != nil {
return err
}

job.operation.AddJobID(job.info.Id)

return nil
}

func (job *Job) marshalCSV(payload any) (io.Reader, error) {
// Map Objects to a csv Reader, for bulk api
var bulkData bytes.Buffer
w := csv.NewWriter(&bulkData)
var records [][]string
records = append(records, job.operation.Fields)
records = append(records, job.objectMapper(payload)...)
if err := w.WriteAll(records); err != nil {
return nil, fmt.Errorf("could not create csv from records. %w", err)
}
return bytes.NewReader(bulkData.Bytes()), nil
}
20 changes: 20 additions & 0 deletions force/jobOperation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package force

import "io"

type JobOperation struct {
Operation string
Object string
Fields []string

NumberRecordsFailed int
NumberRecordsProcessed int
ResponseMessages []string
JobIDs []string
WriteLine func(w io.Writer) bool
ProgressReporter func(msg string)
}

func (op *JobOperation) AddJobID(id string) {
op.JobIDs = append(op.JobIDs, id)
}
Loading

0 comments on commit 8f491ab

Please sign in to comment.