Skip to content

Commit

Permalink
fix(graphql/upsertIngestionSource): Validate cron schedule; parse err…
Browse files Browse the repository at this point in the history
…or in CLI (#11011)
  • Loading branch information
asikowitz committed Jul 26, 2024
1 parent 1f7c92b commit bc75f7a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 57 deletions.
1 change: 1 addition & 0 deletions datahub-graphql-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation externalDependency.opentelemetryAnnotations

implementation externalDependency.slf4jApi
implementation externalDependency.springContext
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.support.CronExpression;

/** Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege. */
@Slf4j
Expand All @@ -46,55 +50,51 @@ public UpsertIngestionSourceResolver(final EntityClient entityClient) {
public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
if (IngestionAuthUtils.canManageIngestion(context)) {

final Optional<String> ingestionSourceUrn =
Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
if (!IngestionAuthUtils.canManageIngestion(context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
}
final Optional<String> ingestionSourceUrn = Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()),
INGESTION_INFO_ASPECT_NAME,
info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}
// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()), INGESTION_INFO_ASPECT_NAME, info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}

try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
},
this.getClass().getSimpleName(),
"get");
Expand Down Expand Up @@ -137,9 +137,38 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig

private DataHubIngestionSourceSchedule mapSchedule(
final UpdateIngestionSourceScheduleInput input) {

final String modifiedCronInterval = adjustCronInterval(input.getInterval());
try {
CronExpression.parse(modifiedCronInterval);
} catch (IllegalArgumentException e) {
throw new DataHubGraphQLException(
String.format("Invalid cron schedule `%s`: %s", input.getInterval(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
try {
ZoneId.of(input.getTimezone());
} catch (DateTimeException e) {
throw new DataHubGraphQLException(
String.format("Invalid timezone `%s`: %s", input.getTimezone(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}

final DataHubIngestionSourceSchedule result = new DataHubIngestionSourceSchedule();
result.setInterval(input.getInterval());
result.setTimezone(input.getTimezone());
return result;
}

// Copied from IngestionScheduler.java
private String adjustCronInterval(final String origCronInterval) {
Objects.requireNonNull(origCronInterval, "origCronInterval must not be null");
// Typically we support 5-character cron. Spring's lib only supports 6 character cron so we make
// an adjustment here.
final String[] originalCronParts = origCronInterval.split(" ");
if (originalCronParts.length == 5) {
return String.format("0 %s", origCronInterval);
}
return origCronInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
Expand All @@ -22,14 +23,17 @@

public class UpsertIngestionSourceResolverTest {

private static final UpdateIngestionSourceInput TEST_INPUT =
new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
private static final UpdateIngestionSourceInput TEST_INPUT = makeInput();

private static UpdateIngestionSourceInput makeInput() {
return new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
}

@Test
public void testGetSuccess() throws Exception {
Expand Down Expand Up @@ -104,4 +108,54 @@ public void testGetEntityClientException() throws Exception {

assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
}

@Test
public void testUpsertWithInvalidCron() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * 123", "UTC"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("null", "UTC"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}

@Test
public void testUpsertWithInvalidTimezone() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "Invalid"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "America/Los_Angel"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}
}
16 changes: 14 additions & 2 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import datahub as datahub_package
from datahub.cli import cli_utils
from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH
from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, GraphError
from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mce_builder import datahub_guid
from datahub.ingestion.graph.client import get_default_graph
Expand Down Expand Up @@ -372,7 +372,19 @@ def deploy(
"""
)

response = datahub_graph.execute_graphql(graphql_query, variables=variables)
try:
response = datahub_graph.execute_graphql(
graphql_query, variables=variables, format_exception=False
)
except GraphError as graph_error:
try:
error = json.loads(str(graph_error).replace('"', '\\"').replace("'", '"'))
click.secho(error[0]["message"], fg="red", err=True)
except Exception:
click.secho(
f"Could not create ingestion source:\n{graph_error}", fg="red", err=True
)
sys.exit(1)

click.echo(
f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:"
Expand Down
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ def execute_graphql(
query: str,
variables: Optional[Dict] = None,
operation_name: Optional[str] = None,
format_exception: bool = True,
) -> Dict:
url = f"{self.config.server}/api/graphql"

Expand All @@ -1127,7 +1128,10 @@ def execute_graphql(
)
result = self._post_generic(url, body)
if result.get("errors"):
raise GraphError(f"Error executing graphql query: {result['errors']}")
if format_exception:
raise GraphError(f"Error executing graphql query: {result['errors']}")
else:
raise GraphError(result["errors"])

return result["data"]

Expand Down

0 comments on commit bc75f7a

Please sign in to comment.