Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update and propagate error message in ci pipeline material if fetch fails #24

Merged
merged 24 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions git-sensor/internals/sql/CiPipelineMaterial.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type CiPipelineMaterialRepository interface {
FindById(id int) (*CiPipelineMaterial, error)
Exists(id int) (bool, error)
Save(material []*CiPipelineMaterial) ([]*CiPipelineMaterial, error)
UpdateMaterialsErroredForGitMaterialId(gitMaterialId int, materialType SourceType, errorMessage string) error
UpdateMaterialsNonErroredForGitMaterialId(gitMaterialId int, materialType SourceType) error
}

type CiPipelineMaterialRepositoryImpl struct {
Expand Down Expand Up @@ -104,3 +106,25 @@ func (impl CiPipelineMaterialRepositoryImpl) FindById(id int) (*CiPipelineMateri
Where("active = ?", true).Select()
return materials, err
}

func (impl CiPipelineMaterialRepositoryImpl) UpdateMaterialsErroredForGitMaterialId(gitMaterialId int, materialType SourceType, errorMessage string) error {
_, err := impl.dbConnection.Model(&CiPipelineMaterial{}).
Set("errored = ?", true).
Set("error_msg = ?", errorMessage).
Where("active = ?", true).
Where("git_material_id = ?", gitMaterialId).
Where("type = ?", materialType).
Update()
return err
}

func (impl CiPipelineMaterialRepositoryImpl) UpdateMaterialsNonErroredForGitMaterialId(gitMaterialId int, materialType SourceType) error {
_, err := impl.dbConnection.Model(&CiPipelineMaterial{}).
Set("errored = ?", false).
Set("error_msg = ?", "").
Where("active = ?", true).
Where("git_material_id = ?", gitMaterialId).
Where("type = ?", materialType).
Update()
return err
}
8 changes: 4 additions & 4 deletions git-sensor/internals/sql/GitMaterial.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type GitMaterial struct {
CheckoutMsgAny string `sql:"checkout_msg_any"`
Deleted bool `sql:"deleted,notnull"`
//------
LastFetchTime time.Time `json:"last_fetch_time"`
FetchStatus bool `json:"fetch_status"`
LastFetchErrorCount int `json:"last_fetch_error_count"` //continues fetch error
FetchErrorMessage string `json:"fetch_error_message"`
LastFetchTime time.Time `json:"last_fetch_time" sql:"last_fetch_time"`
FetchStatus bool `json:"fetch_status" sql:"fetch_status,notnull"`
LastFetchErrorCount int `json:"last_fetch_error_count" sql:"last_fetch_error_count"` //continues fetch error
FetchErrorMessage string `json:"fetch_error_message" sql:"fetch_error_message"`
CloningMode string `json:"cloning_mode" sql:"-"`
FilterPattern []string `sql:"filter_pattern"`
GitProvider *GitProvider
Expand Down
52 changes: 32 additions & 20 deletions git-sensor/pkg/RepoManages.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (impl RepoManagerImpl) UpdateRepo(gitCtx git.GitContext, material *sql.GitM
existingMaterial.GitProviderId = material.GitProviderId
existingMaterial.Deleted = material.Deleted
existingMaterial.CheckoutStatus = false
existingMaterial.FetchStatus = false
existingMaterial.FetchSubmodules = material.FetchSubmodules
existingMaterial.FilterPattern = material.FilterPattern
err = impl.materialRepository.Update(existingMaterial)
Expand Down Expand Up @@ -372,9 +373,11 @@ func (impl RepoManagerImpl) checkoutMaterial(gitCtx git.GitContext, material *sq
impl.logger.Errorw("context error in git checkout", "err", gitCtx.Err())
return material, gitCtx.Err()
} else if err != nil {
impl.logger.Errorw("error encountered in adding git repo", "materialId", material.Id, "err", err, "errMsg", errMsg)
material.CheckoutStatus = false
material.CheckoutMsgAny = err.Error()
material.FetchErrorMessage = util2.BuildDisplayErrorMessage(errMsg, err)
errorMessage := util2.BuildDisplayErrorMessage(errMsg, err)
material.CheckoutMsgAny = errorMessage
return nil, errors.New(errorMessage)
} else {
material.CheckoutLocation = checkoutLocationForFetching
material.CheckoutStatus = true
Expand Down Expand Up @@ -494,16 +497,7 @@ func (impl RepoManagerImpl) FetchChanges(pipelineMaterialId int, from string, to
func (impl RepoManagerImpl) FetchGitCommitsForBranchFixPipeline(pipelineMaterial *sql.CiPipelineMaterial, gitMaterial *sql.GitMaterial, showAll bool) (*git.MaterialChangeResp, error) {
response := &git.MaterialChangeResp{}
response.LastFetchTime = gitMaterial.LastFetchTime
if pipelineMaterial.Errored {
impl.logger.Infow("errored material ", "id", pipelineMaterial.Id, "errMsg", pipelineMaterial.ErrorMsg)
if !gitMaterial.CheckoutStatus {
response.IsRepoError = true
response.RepoErrorMsg = gitMaterial.FetchErrorMessage
} else {
response.IsBranchError = true
response.BranchErrorMsg = pipelineMaterial.ErrorMsg
}

if impl.CheckAndSetErrorTypeAndMsgInResponse(pipelineMaterial, gitMaterial, response) {
return response, nil
}
commits := make([]*git.GitCommitBase, 0)
Expand Down Expand Up @@ -533,12 +527,33 @@ func (impl RepoManagerImpl) FetchGitCommitsForBranchFixPipeline(pipelineMaterial
return response, nil
}

func (impl RepoManagerImpl) CheckAndSetErrorTypeAndMsgInResponse(pipelineMaterial *sql.CiPipelineMaterial, gitMaterial *sql.GitMaterial, response *git.MaterialChangeResp) bool {
if pipelineMaterial.Errored {
impl.logger.Infow("errored material ", "id", pipelineMaterial.Id, "gitMaterialId", gitMaterial.Id, "fetchErrorMessage", gitMaterial.FetchErrorMessage, "checkoutMsgAny", gitMaterial.CheckoutMsgAny, "errMsg", pipelineMaterial.ErrorMsg)
if !gitMaterial.CheckoutStatus {
response.IsRepoError = true
// doing this as previously fetch message was stored with checkoutStatus flag, if empty return fetchErrormessage
if len(gitMaterial.CheckoutMsgAny) > 0 {
response.RepoErrorMsg = gitMaterial.CheckoutMsgAny
} else {
response.RepoErrorMsg = gitMaterial.FetchErrorMessage
}
} else if !gitMaterial.FetchStatus {
response.IsRepoError = true
response.RepoErrorMsg = gitMaterial.FetchErrorMessage
} else {
response.IsBranchError = true
response.BranchErrorMsg = pipelineMaterial.ErrorMsg
}
return true
}
return false
}

func (impl RepoManagerImpl) FetchGitCommitsForWebhookTypePipeline(pipelineMaterial *sql.CiPipelineMaterial, gitMaterial *sql.GitMaterial) (*git.MaterialChangeResp, error) {
response := &git.MaterialChangeResp{}
response.LastFetchTime = gitMaterial.LastFetchTime
if pipelineMaterial.Errored && !gitMaterial.CheckoutStatus {
response.IsRepoError = true
response.RepoErrorMsg = gitMaterial.FetchErrorMessage
if impl.CheckAndSetErrorTypeAndMsgInResponse(pipelineMaterial, gitMaterial, response) {
return response, nil
}

Expand Down Expand Up @@ -674,15 +689,12 @@ func (impl RepoManagerImpl) GetLatestCommitForBranch(gitCtx git.GitContext, pipe
impl.logger.Warn("repository is up to date")
}
if err != nil {
gitMaterial.CheckoutStatus = false
gitMaterial.CheckoutMsgAny = err.Error()
gitMaterial.FetchStatus = false
gitMaterial.FetchErrorMessage = util2.BuildDisplayErrorMessage(errMsg, err)

impl.logger.Errorw("error in fetching the repository ", "gitMaterial", gitMaterial, "err", err)
return nil, err
} else {
gitMaterial.CheckoutStatus = true

gitMaterial.FetchStatus = true
}

err = impl.materialRepository.Update(gitMaterial)
Expand Down
45 changes: 28 additions & 17 deletions git-sensor/pkg/git/Util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ import (
)

const (
GIT_BASE_DIR = "/git-base/"
SSH_PRIVATE_KEY_DIR = GIT_BASE_DIR + "ssh-keys/"
TLS_FILES_DIR = GIT_BASE_DIR + "tls-files/"
SSH_PRIVATE_KEY_FILE_NAME = "ssh_pvt_key"
TLS_KEY_FILE_NAME = "tls_key.key"
TLS_CERT_FILE_NAME = "tls_cert.pem"
CA_CERT_FILE_NAME = "ca_cert.pem"
CLONE_TIMEOUT_SEC = 600
FETCH_TIMEOUT_SEC = 30
GITHUB_PROVIDER = "github.com"
GITLAB_PROVIDER = "gitlab.com"
CloningModeShallow = "SHALLOW"
CloningModeFull = "FULL"
GIT_BASE_DIR = "/git-base/"
SSH_PRIVATE_KEY_DIR = GIT_BASE_DIR + "ssh-keys/"
TLS_FILES_DIR = GIT_BASE_DIR + "tls-files/"
SSH_PRIVATE_KEY_FILE_NAME = "ssh_pvt_key"
TLS_KEY_FILE_NAME = "tls_key.key"
TLS_CERT_FILE_NAME = "tls_cert.pem"
CA_CERT_FILE_NAME = "ca_cert.pem"
CLONE_TIMEOUT_SEC = 600
FETCH_TIMEOUT_SEC = 30
GITHUB_PROVIDER = "github.com"
GITLAB_PROVIDER = "gitlab.com"
CloningModeShallow = "SHALLOW"
CloningModeFull = "FULL"
NO_COMMIT_GIT_ERROR_MESSAGE = "unknown revision or path not in the working tree."
NO_COMMIT_CUSTOM_ERROR_MESSAGE = "No Commit Found"
)
Expand Down Expand Up @@ -79,10 +79,13 @@ func GetUserNamePassword(gitProvider *sql.GitProvider) (userName, password strin
return "", "", fmt.Errorf("unsupported %s", gitProvider.AuthMode)
}
}

func GetOrCreateSshPrivateKeyOnDisk(gitProviderId int, sshPrivateKeyContent string) (privateKeyPath string, err error) {
func getSSHPrivateKeyFolderAndFilePath(gitProviderId int) (string, string) {
sshPrivateKeyFolderPath := path.Join(SSH_PRIVATE_KEY_DIR, strconv.Itoa(gitProviderId))
sshPrivateKeyFilePath := path.Join(sshPrivateKeyFolderPath, SSH_PRIVATE_KEY_FILE_NAME)
return sshPrivateKeyFolderPath, sshPrivateKeyFilePath
}
func GetOrCreateSshPrivateKeyOnDisk(gitProviderId int, sshPrivateKeyContent string) (privateKeyPath string, err error) {
sshPrivateKeyFolderPath, sshPrivateKeyFilePath := getSSHPrivateKeyFolderAndFilePath(gitProviderId)

// if file exists then return
if _, err := os.Stat(sshPrivateKeyFilePath); os.IsExist(err) {
Expand All @@ -104,9 +107,17 @@ func GetOrCreateSshPrivateKeyOnDisk(gitProviderId int, sshPrivateKeyContent stri
return sshPrivateKeyFilePath, nil
}

func CheckIfSshPrivateKeyExists(gitProviderId int) bool {
_, sshPrivateKeyFilePath := getSSHPrivateKeyFolderAndFilePath(gitProviderId)
// if file exists then return true
if _, err := os.Stat(sshPrivateKeyFilePath); os.IsExist(err) {
return true
}
return false
}

func CreateOrUpdateSshPrivateKeyOnDisk(gitProviderId int, sshPrivateKeyContent string) error {
sshPrivateKeyFolderPath := path.Join(SSH_PRIVATE_KEY_DIR, strconv.Itoa(gitProviderId))
sshPrivateKeyFilePath := path.Join(sshPrivateKeyFolderPath, SSH_PRIVATE_KEY_FILE_NAME)
sshPrivateKeyFolderPath, sshPrivateKeyFilePath := getSSHPrivateKeyFolderAndFilePath(gitProviderId)

// if file exists then delete file
if _, err := os.Stat(sshPrivateKeyFilePath); os.IsExist(err) {
Expand Down
78 changes: 59 additions & 19 deletions git-sensor/pkg/git/Watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,48 @@ func (impl GitWatcherImpl) pollAndUpdateGitMaterial(materialReq *sql.GitMaterial
return material, err
}

// Helper function to handle SSH key creation and retry fetching material
func (impl GitWatcherImpl) handleSshKeyCreationAndRetry(gitCtx GitContext, material *sql.GitMaterial, location string, gitProvider *sql.GitProvider) (updated bool, repo *GitRepository, errMsg string, err error) {
if strings.Contains(material.CheckoutLocation, "/.git") {
location, _, _, err = impl.repositoryManager.GetCheckoutLocationFromGitUrl(material, gitCtx.CloningMode)
if err != nil {
impl.logger.Errorw("error in getting clone location ", "material", material, "errMsg", errMsg, "err", err)
return false, nil, errMsg, err
}
}
_, errMsg, err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(gitCtx, location, gitProvider.Id, gitProvider.SshPrivateKey)
if err != nil {
impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "errMsg", errMsg, "err", err)
return false, nil, errMsg, err
} else {
impl.logger.Info("Retrying fetching for", "repo", material.Url)
updated, repo, errMsg, err = impl.FetchAndUpdateMaterial(gitCtx, material, location)
if err != nil {
impl.logAndUpdateDbError(material.Id, errMsg)
impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err)
return false, nil, errMsg, err
}
}
return updated, repo, errMsg, err
}

// Helper function to log and update database with error message for CI pipeline material
func (impl GitWatcherImpl) logAndUpdateDbError(materialId int, errMsg string) {
dbErr := impl.ciPipelineMaterialRepository.UpdateMaterialsErroredForGitMaterialId(materialId, sql.SOURCE_TYPE_BRANCH_FIXED, errMsg)
if dbErr != nil {
// made this non-blocking
impl.logger.Errorw("error encountered in updating ci pipeline material", "materialId", materialId, "dbErr", dbErr)
}
}

func (impl GitWatcherImpl) logAndUpdateDbNonError(materialId int) {
dbErr := impl.ciPipelineMaterialRepository.UpdateMaterialsNonErroredForGitMaterialId(materialId, sql.SOURCE_TYPE_BRANCH_FIXED)
if dbErr != nil {
// made this non-blocking
impl.logger.Errorw("error encountered in updating ci pipeline material", "materialId", materialId, "dbErr", dbErr)
}
}

func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) (string, error) {
gitProvider := material.GitProvider
userName, password, err := GetUserNamePassword(gitProvider)
Expand All @@ -201,31 +243,22 @@ func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) (
if err != nil {
impl.logger.Errorw("error in fetching material details ", "repo", material.Url, "errMsg", errMsg, "err", err)
// there might be the case if ssh private key gets flush from disk, so creating and single retrying in this case
if gitProvider.AuthMode == sql.AUTH_MODE_SSH {
if strings.Contains(material.CheckoutLocation, "/.git") {
location, _, _, err = impl.repositoryManager.GetCheckoutLocationFromGitUrl(material, gitCtx.CloningMode)
if err != nil {
impl.logger.Errorw("error in getting clone location ", "material", material, "errMsg", errMsg, "err", err)
return "", err
}
}
_, errMsg, err = impl.repositoryManager.CreateSshFileIfNotExistsAndConfigureSshCommand(gitCtx, location, gitProvider.Id, gitProvider.SshPrivateKey)
// Retry mechanism for SSH-based authentication
if gitProvider.AuthMode == sql.AUTH_MODE_SSH && !CheckIfSshPrivateKeyExists(gitProvider.Id) {
updated, repo, errMsg, err = impl.handleSshKeyCreationAndRetry(gitCtx, material, location, gitProvider)
if err != nil {
impl.logger.Errorw("error in creating/configuring ssh private key on disk ", "repo", material.Url, "gitProviderId", gitProvider.Id, "errMsg", errMsg, "err", err)
impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "materialId", material.Id, "errMsg", errMsg, "err", err)
return errMsg, err
} else {
impl.logger.Info("Retrying fetching for", "repo", material.Url)
updated, repo, errMsg, err = impl.FetchAndUpdateMaterial(gitCtx, material, location)
if err != nil {
impl.logger.Errorw("error in fetching material details in retry", "repo", material.Url, "err", err)
return errMsg, err
}
}
} else {
// Log and update database if retry not possible or SSH key already exists
impl.logAndUpdateDbError(material.Id, errMsg)
return errMsg, err
}
}
if !updated {
// update set errored false in ci pipeline material as fetch is successful
impl.logAndUpdateDbNonError(material.Id)
return "", nil
}
materials, err := impl.ciPipelineMaterialRepository.FindByGitMaterialId(material.Id)
Expand All @@ -241,7 +274,7 @@ func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) (
if material.Type != sql.SOURCE_TYPE_BRANCH_FIXED {
continue
}
impl.logger.Debugw("Running changesBySinceRepository for material - ", material)
impl.logger.Debugw("Running changesBySinceRepository for material - ", "materialId", material.Id)
impl.logger.Debugw("---------------------------------------------------------- ")
// parse env variables here, then search for the count field and pass here.
lastSeenHash := ""
Expand All @@ -251,7 +284,9 @@ func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) (
}
fetchCount := impl.configuration.GitHistoryCount
commits, errMsg, err := impl.repositoryManager.ChangesSinceByRepository(gitCtx, repo, material.Value, lastSeenHash, "", fetchCount, checkoutLocation, false)
impl.logger.Debugw("Got changesBySinceRepository for material ", "commits", commits, "errMsg", errMsg, "err", err)
if err != nil {
impl.logger.Errorw("error in fetching ChangesSinceByRepository", "err", err, "errMsg", errMsg, "Value", material.Value)
material.Errored = true
material.ErrorMsg = util2.BuildDisplayErrorMessage(errMsg, err)

Expand Down Expand Up @@ -287,6 +322,11 @@ func (impl GitWatcherImpl) pollGitMaterialAndNotify(material *sql.GitMaterial) (
updatedMaterialsModel = append(updatedMaterialsModel, material)
}
middleware.GitMaterialUpdateCounter.WithLabelValues().Inc()
} else if len(commits) == 0 {
// no new commit found, this is the case when no new commits are found but all git operations are working fine so update the error save in pipeline material to false
material.Errored = false
material.ErrorMsg = ""
updatedMaterialsModel = append(updatedMaterialsModel, material)
}
}
if len(updatedMaterialsModel) > 0 {
Expand Down Expand Up @@ -314,7 +354,7 @@ func (impl GitWatcherImpl) FetchAndUpdateMaterial(gitCtx GitContext, material *s
updated, repo, errMsg, err := impl.repositoryManager.Fetch(gitCtx, material.Url, location)
if err == nil {
material.CheckoutLocation = location
material.CheckoutStatus = true
material.FetchStatus = true
}
return updated, repo, errMsg, err
}
Expand Down
1 change: 1 addition & 0 deletions git-sensor/scripts/sql/14_git_material_update.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- NO down Script as we can have messages stored in the table with more than 250 char so down will break.
1 change: 1 addition & 0 deletions git-sensor/scripts/sql/14_git_material_update.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE git_material ALTER COLUMN checkout_msg_any TYPE text;