Skip to content

Conversation

yunyad
Copy link
Contributor

@yunyad yunyad commented Aug 19, 2025

This PR implements basic operator-level parallelism optimization by modifying the GUI interface for UDFs (User Defined Functions). It corresponds to [PR 2] in the Basic Ramen plan. The details and context are discussed in issue #3605.

The Basic Ramen strategy assumes that between two executions of the same workflow, the workflow structure remains unchanged. This allows us to reuse past runtime statistics for optimizing operator-level resource allocation (e.g., worker count).

The full implementation will be split into two PRs:

  • PR 1 (this PR): Add UI and backend support for operator-level parallelism in UDFs
  • PR 2: Extend support to all other parallelizable operators

Key Changes in This PR

Updated UDF UI:

  • Added number-of-workers input field to the UDF operator panel
  • Ensures users can configure parallelism directly through the UI

Backend Modifications:

  • Refactored ResourceAllocator to support configurable parallelism
  • Implemented GreedyResourceAllocator to select parallelism level based on historical runtime
  • Integrated GreedyResourceAllocator with the UDF operator execution logic

Configuration Support:

  • Added workflow-level flags to enable/disable GreedyResourceAllocator
  • Allows flexible toggling of Basic Ramen mode

@yunyad yunyad changed the title feat(amber): add basic operator-level resource allocator feat(amber): [PR 1/2] Add Basic Ramen Support for UDF Operators Aug 19, 2025
@yunyad yunyad self-assigned this Aug 20, 2025
Copy link
Contributor

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the PR is a bit too large. better to split it into two PRs: one for user interface change, one for new allocator implementation.

}


private def readStatsFromUri(uriStr: String): Map[String, (Double, Int)] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is not clear at all. what stats? what is the uri pointing to? please clarify by renaming and add comments.

val document = DocumentFactory.openDocument(uri)

document._1.get().foldLeft(Map.empty[String, (Double, Int)]) { (acc, tuple) =>
val record = tuple.asInstanceOf[Tuple]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is a record? please give meaningful naming.

* represented as a Double value (currently set to 0, but will be
* updated in the future).
* @param region Region to allocate.
* @return (updated Region, estimated cost)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per comments in #3660, we hope to only return resourceConfig instead of the updated region.

operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig],
seedLinkPartitions: Map[PhysicalLink, PartitionInfo] = Map.empty
): Map[PhysicalLink, PartitionInfo] = {
val linkPartitionInfos = mutable.HashMap[PhysicalLink, PartitionInfo]() ++= seedLinkPartitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you saving a copy of the link partitions inside this method? the return type is already a map of partition infos. why do you need to pass an input map seedLinkPartitions?


schedule-generator {
max-concurrent-regions = 1
max-concurrent-regions = 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we change this default value?

Comment on lines +64 to +68
@JsonProperty(required = true, defaultValue = "true")
@JsonSchemaTitle("Parallelizable?")
@JsonPropertyDescription("Default: True")
@JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""")
val parallelizable: Boolean = Boolean.box(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit against this three-step design. Why do we ask users to click a check box (parallelizible), then click another one (advanced), then provide a number? This is way too complicated. Can we simplify it?

Comment on lines +127 to +152
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "java")
)
.withDerivePartition(_ => UnknownPartition())
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPartitionRequirement(partitionRequirement)
.withIsOneToManyOp(true)
.withParallelizable(true)
.withSuggestedWorkerNum(workers)
.withPropagateSchema(SchemaPropagationFunc(propagateSchema))
} else {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "java")
)
.withDerivePartition(_ => UnknownPartition())
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge the common code. only apply a difference part (i.e., .withParallelizable(true)) to different cases

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see pythonUDFSourceOpDescV2 for example.

Comment on lines +99 to +117
val physicalOp = if (parallelizable) {
if (advanced) {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
)
.withSuggestedWorkerNum(workers)
} else {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Comment on lines +128 to +146
val physicalOp = if (parallelizable) {
if (advanced) {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
)
.withSuggestedWorkerNum(workers)
} else {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Comment on lines +128 to +146
if (parallelizable) {
if (advanced) {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, r_operator_type)
)
} else {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, r_operator_type)
)
.withSuggestedWorkerNum(workers)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@yunyad yunyad changed the title feat(amber): [PR 1/2] Add Basic Ramen Support for UDF Operators feat(amber): Add Basic Ramen Support for UDF Operators Aug 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants