Skip to content

Commit

Permalink
New version of updater batch
Browse files Browse the repository at this point in the history
  • Loading branch information
CriMDev97 committed Nov 21, 2023
1 parent 9cc091b commit 2012e3e
Show file tree
Hide file tree
Showing 26 changed files with 264 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,30 @@ public class AutoUpdaterController {
private final DeadEventService deadEventService;


public void scheduleUpdater() {
log.info("ScheduleUpdater Started: {}", dateTimeFormatter.format(LocalDateTime.now()));
Long lastEventId = this.tracingBatchService.getLastEventIdByTracingBatch();
lastEventId = updateRecursiveFlow(lastEventId);
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED, lastEventId);
public void scheduleUpdater(String applicationType) {
log.info("ScheduleUpdater of {} started at {}", applicationType, dateTimeFormatter.format(LocalDateTime.now()));
Long lastEventId = this.tracingBatchService.getLastEventIdByTracingBatchAndType(applicationType);
lastEventId = updateRecursiveFlow(lastEventId, applicationType);
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED, lastEventId, applicationType);
}

private Long updateRecursiveFlow(Long lastEventId) {
private Long updateRecursiveFlow(Long lastEventId, String type) {
try {
EventsDto events = this.interopService.getAgreementsAndEServices(lastEventId);
updateEvents(events.getEvents());
return updateRecursiveFlow(events.getLastEventId());
EventsDto events = this.interopService.getEventsByType(lastEventId, type);
updateEvents(events.getEvents(), type);
return updateRecursiveFlow(events.getLastEventId(), type);
} catch (PDNDConnectionResetException ex) {
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED, ex.getEventId());
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED, ex.getEventId(), type);
throw ex;
} catch (PDNDNoEventsException ex) {
return lastEventId;
} catch (PDNDEventException ex) {
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED_WITH_ERROR, ex.getEventId());
tracingBatchService.terminateTracingBatch(TracingBatchStateEnum.ENDED_WITH_ERROR, ex.getEventId(), type);
throw ex;
}
}

private void updateEvents(List<EventDto> events){
private void updateEvents(List<EventDto> events, String type){
for (EventDto event : events) {
try {
if (Predicates.isEServiceEvent().test(event)) {
Expand All @@ -60,7 +60,7 @@ private void updateEvents(List<EventDto> events){
}
}
catch (PDNDEventException ex) {
deadEventService.saveDeadEvent(event);
deadEventService.saveDeadEvent(event, type);
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class TracingBatchEntity {
public static final String COLUMN_BATCH_ID = "batch_id";
public static final String COLUMN_LAST_EVENT_ID = "last_event_id";
public static final String COLUMN_TYPE = "type";
public static final String COLUMN_STATE = "state";
public static final String COLUMN_DATE_CREATED = "tmst_created";

Expand All @@ -28,6 +29,9 @@ public class TracingBatchEntity {
@Column(name = COLUMN_LAST_EVENT_ID)
private Long lastEventId;

@Column(name = COLUMN_TYPE)
private String type;

@Column(name = COLUMN_STATE)
private String state;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package it.pagopa.interop.signalhub.updater.execution;

import it.pagopa.interop.signalhub.updater.controller.AutoUpdaterController;
import it.pagopa.interop.signalhub.updater.utility.Const;
import it.pagopa.interop.signalhub.updater.utility.Predicates;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Profile("!test")
@Component
@AllArgsConstructor
public class BatchTaskExecutor implements CommandLineRunner {
@Autowired
private AutoUpdaterController autoUpdaterController;

private Environment env;

@Override
public void run(String... args) throws Exception {
autoUpdaterController.scheduleUpdater();
if (Predicates.isCorrectApplicationType(env)) {
String applicationType = env.getProperty(Const.APPLICATION_TYPE_ARG);
autoUpdaterController.scheduleUpdater(applicationType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


public interface InteroperabilityClient {
Events getEventsFromId(Long lastEventId);
Events getEventsFromIdAndType(Long lastEventId, String type);
Agreement getAgreement(String agreementId);
EService getEService(String eserviceId);
EServiceDescriptor getEServiceDescriptor(String eserviceId, String descriptorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import it.pagopa.interop.signalhub.updater.externalclient.InteroperabilityClient;
import it.pagopa.interop.signalhub.updater.generated.openapi.client.interop.api.v1.GatewayApi;
import it.pagopa.interop.signalhub.updater.generated.openapi.client.interop.model.v1.*;
import it.pagopa.interop.signalhub.updater.utility.Const;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatusCode;
Expand All @@ -27,13 +28,23 @@ public class InteroperabilityClientImpl implements InteroperabilityClient {


@Override
public Events getEventsFromId(Long lastEventId) {
return gatewayApi.getEventsFromId(lastEventId, MAX_LIMIT_BLOCK)
.retryWhen(
Retry.backoff(4, Duration.ofMillis(1000))
.filter(this.isRetryException())
)
.block();
public Events getEventsFromIdAndType(Long lastEventId, String type) {
if (type.equals(Const.ESERVICE_EVENT)){
return gatewayApi.getEservicesEventsFromId(lastEventId, MAX_LIMIT_BLOCK)
.retryWhen(
Retry.backoff(4, Duration.ofMillis(1000))
.filter(this.isRetryException())
)
.block();
} else if (type.equals(Const.AGREEMENT_EVENT)) {
return gatewayApi.getAgreementsEventsFromId(lastEventId, MAX_LIMIT_BLOCK)
.retryWhen(
Retry.backoff(4, Duration.ofMillis(1000))
.filter(this.isRetryException())
)
.block();
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (obj instanceof AgreementEventDto other){
return this.agreementId.equals(other.agreementId);
return this.agreementId.equals(other.agreementId)
&& super.getObjectType().equals(other.getObjectType());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (obj instanceof EServiceEventDto other){
return this.eServiceId.equals(other.eServiceId);
return this.eServiceId.equals(other.eServiceId) && super.getDescriptorId().equals(other.getDescriptorId())
&& super.getObjectType().equals(other.getObjectType());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

@Repository
public interface TracingBatchRepository extends JpaRepository<TracingBatchEntity, Long> {
@Query("select trace from TracingBatchEntity trace where trace.lastEventId = (select MAX(t.lastEventId) from TracingBatchEntity t) order by trace.tmstCreated desc" )
List<TracingBatchEntity> findByStateAndLastEventIdMax();
@Query("select trace from TracingBatchEntity trace where trace.lastEventId = (select MAX(t.lastEventId) from TracingBatchEntity t where t.type = :type) order by trace.tmstCreated desc" )
List<TracingBatchEntity> findByStateAndLastEventIdMaxAndType(String type);

@Query("SELECT trace FROM TracingBatchEntity trace WHERE trace.state = :state AND trace.lastEventId = :lastEventId")
List<TracingBatchEntity> findAllStateEndedWithErrorAndLastEventId(String state, Long lastEventId);
@Query("SELECT trace FROM TracingBatchEntity trace WHERE trace.state = :state AND trace.lastEventId = :lastEventId AND trace.type = :type")
List<TracingBatchEntity> findAllStateEndedWithErrorAndLastEventIdAndType(String state, Long lastEventId, String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
import it.pagopa.interop.signalhub.updater.model.EventDto;

public interface DeadEventService {
DeadEvent saveDeadEvent(EventDto eventDto);
DeadEvent saveDeadEvent(EventDto eventDto, String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import it.pagopa.interop.signalhub.updater.model.OrganizationEServiceDto;

public interface InteropService {
EventsDto getAgreementsAndEServices(Long lastEventId);
EventsDto getEventsByType(Long lastEventId, String type);
ConsumerEServiceDto getConsumerEService(String agreementId, Long eventId);
OrganizationEServiceDto getEService(String eserviceId, Long eventId);
OrganizationEServiceDto getEServiceDescriptor(OrganizationEServiceDto organizationEServiceDto);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


public interface TracingBatchService {
Long getLastEventIdByTracingBatch();
TracingBatchDto terminateTracingBatch(TracingBatchStateEnum stateEnum, Long eventId);
Integer countBatchInErrorWithLastEventId(Long lastEventId);
Long getLastEventIdByTracingBatchAndType(String type);
TracingBatchDto terminateTracingBatch(TracingBatchStateEnum stateEnum, Long eventId, String type);
Integer countBatchInErrorWithLastEventIdAndType(Long lastEventId, String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class DeadEventServiceImpl implements DeadEventService {


@Override
public DeadEvent saveDeadEvent(EventDto eventDto) {
Integer occurrence = tracingBatchService.countBatchInErrorWithLastEventId(eventDto.getEventId()-1);
public DeadEvent saveDeadEvent(EventDto eventDto, String type) {
Integer occurrence = tracingBatchService.countBatchInErrorWithLastEventIdAndType(eventDto.getEventId()-1, type);
if((occurrence+1) >= registryUpdaterProps.getAttemptEvent()) {
deadEventRepository.saveAndFlush(deadEventMapperImpl.toDeadEvent(eventDto));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import it.pagopa.interop.signalhub.updater.mapper.OrganizationEServiceMapper;
import it.pagopa.interop.signalhub.updater.model.*;
import it.pagopa.interop.signalhub.updater.service.InteropService;
import it.pagopa.interop.signalhub.updater.utility.Predicates;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
Expand All @@ -34,13 +34,13 @@ public class InteropServiceImpl implements InteropService {


@Override
public EventsDto getAgreementsAndEServices(Long lastEventId) {
public EventsDto getEventsByType(Long lastEventId, String type) {
Events response = null;
try {
log.info("Rerieving events from {} eventId", lastEventId);
response = client.getEventsFromId(lastEventId);
response = client.getEventsFromIdAndType(lastEventId, type);
} catch (WebClientRequestException ex) {
throw new PDNDConnectionResetException("Connection token was expired", lastEventId + 1);
throw new PDNDConnectionResetException("Connection token was expired {}", lastEventId + 1);
} catch (WebClientResponseException ex) {
throw new PDNDEventException("Error with retrieve events", lastEventId);
}
Expand All @@ -53,8 +53,8 @@ public EventsDto getAgreementsAndEServices(Long lastEventId) {
log.info("Total events retrieved {}", response.getEvents().size());
Set<EventDto> events = response.getEvents()
.parallelStream()
.filter(Predicates.isAgreementOrEServiceEvent())
.map(this::toEventDto)
.filter(event -> StringUtils.isNotBlank(event.getDescriptorId()))
.collect(Collectors.toSet());

log.info("Total events filtered {}", events.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ public class OrganizationServiceImpl implements OrganizationService {

@Override
public OrganizationEServiceDto updateOrganizationEService(EServiceEventDto eServiceEventDTO) {
log.info("[{} - {} - {}] Retrieving detail eservice...", eServiceEventDTO.getEventId(), eServiceEventDTO.getEServiceId(), eServiceEventDTO.getDescriptorId());
log.info("[{} - {} - {}] Retrieving producerID eservice...", eServiceEventDTO.getEventId(), eServiceEventDTO.getEServiceId(), eServiceEventDTO.getDescriptorId());
//Only for retrieving Producer ID
OrganizationEServiceDto detailEservice = this.interopService.getEService(eServiceEventDTO.getEServiceId(), eServiceEventDTO.getEventId());
detailEservice.setDescriptorId(eServiceEventDTO.getDescriptorId());

log.info("[{} - {} - {}] Detail eservice retrieved with state {}", detailEservice.getEventId(), detailEservice.getEserviceId(), detailEservice.getDescriptorId(), detailEservice.getState());
log.info("[{} - {} - {}] Detail eservice retrieved with producerID {}", detailEservice.getEventId(), detailEservice.getEserviceId(), detailEservice.getDescriptorId(), detailEservice.getProducerId());

log.info("[{} - {} - {}] Retrieving state of eservice...", eServiceEventDTO.getEventId(), eServiceEventDTO.getEServiceId(), eServiceEventDTO.getDescriptorId());
//Only setting eservices state
detailEservice = this.interopService.getEServiceDescriptor(detailEservice);

log.info("[{} - {} - {}] Detail eservice retrieved with state {}", detailEservice.getEventId(), detailEservice.getEserviceId(), detailEservice.getDescriptorId(), detailEservice.getState());

OrganizationEService entity = this.repository.findByEserviceIdAndProducerIdAndDescriptorId(detailEservice.getEserviceId(), detailEservice.getProducerId())
.orElse(getInitialEService(detailEservice));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ public class TracingBatchServiceImpl implements TracingBatchService {
private final TracingBatchMapper mapper;
private final RegistryUpdaterProps props;

public Integer countBatchInErrorWithLastEventId(Long lastEventId) {
@Override
public Integer countBatchInErrorWithLastEventIdAndType(Long lastEventId, String type) {
log.info("Retrieve a number of batch in error with event Id {}", lastEventId);
List<TracingBatchEntity> list = repository.findAllStateEndedWithErrorAndLastEventId(TracingBatchStateEnum.ENDED_WITH_ERROR.name(), lastEventId);
List<TracingBatchEntity> list = repository.findAllStateEndedWithErrorAndLastEventIdAndType(TracingBatchStateEnum.ENDED_WITH_ERROR.name(), lastEventId, type);
if (list == null || list.isEmpty()) return 0;
log.info("{} batch in error with event id {}", list.size(), lastEventId);
return list.size();
}

public Long getLastEventIdByTracingBatch() {
List<TracingBatchEntity> list = repository.findByStateAndLastEventIdMax();
@Override
public Long getLastEventIdByTracingBatchAndType(String type) {
List<TracingBatchEntity> list = repository.findByStateAndLastEventIdMaxAndType(type);
if (list.isEmpty()) return 0L;
if (list.get(0).getState().equals(TracingBatchStateEnum.ENDED.name())){
return list.get(0).getLastEventId();
Expand All @@ -44,13 +46,14 @@ public Long getLastEventIdByTracingBatch() {
}

@Override
public TracingBatchDto terminateTracingBatch(TracingBatchStateEnum stateEnum, Long eventId) {
public TracingBatchDto terminateTracingBatch(TracingBatchStateEnum stateEnum, Long eventId, String type) {
TracingBatchEntity tracingBatchEntity = new TracingBatchEntity();
tracingBatchEntity.setTmstCreated(Timestamp.from(Instant.now()));
tracingBatchEntity.setLastEventId(eventId-1);
tracingBatchEntity.setState(stateEnum.name());
tracingBatchEntity.setType(type);
tracingBatchEntity = this.repository.saveAndFlush(tracingBatchEntity);
log.debug("Create a tracing batch entity {}", tracingBatchEntity);
log.debug("Create a tracing batch entity of type {} {}", type, tracingBatchEntity);
return mapper.toDto(tracingBatchEntity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class Const {

private Const() {}

public static final String APPLICATION_TYPE_ARG = "application.type";
public static final String AGREEMENT_EVENT = "AGREEMENT";
public static final String ESERVICE_EVENT = "ESERVICE";
public static final String AGREEMENT_KEY_ID = "agreementId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import it.pagopa.interop.signalhub.updater.model.EServiceEventDto;
import it.pagopa.interop.signalhub.updater.model.EventDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;

import java.util.function.Predicate;

import static it.pagopa.interop.signalhub.updater.utility.Const.AGREEMENT_EVENT;
import static it.pagopa.interop.signalhub.updater.utility.Const.ESERVICE_EVENT;
import static it.pagopa.interop.signalhub.updater.utility.Const.*;

@Slf4j
public class Predicates {
Expand All @@ -28,4 +29,12 @@ public static Predicate<EventDto> isEServiceEvent() {
}


public static boolean isCorrectApplicationType(Environment envs) {
if (envs.containsProperty(APPLICATION_TYPE_ARG)){
String applicationType = envs.getProperty(APPLICATION_TYPE_ARG);
return StringUtils.equals(ESERVICE_EVENT, applicationType) || StringUtils.equals(AGREEMENT_EVENT, applicationType);
}
return false;
}

}
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
logging.level.root=INFO
spring.main.web-application-type=none
spring.profiles.active=local


pdnd.client.event.endpoint-url=https://api.uat.interop.pagopa.it/1.0
Expand Down
Loading

0 comments on commit 2012e3e

Please sign in to comment.