This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Accessing DO_WHILE 'iteration' value inside the looped tasks. #1677

Closed
krishnaputhran opened this issue May 11, 2020 · 23 comments

@krishnaputhran

Hi, does anyone know how to access the iterator value inside the tasks that run as part of a DO_WHILE loop? For example, I want to read an array value inside the running tasks. The array would be an input to the looped task, taken from the output of another task that executed before the loop started. I am not sure how to use the DO_WHILE 'iteration' variable to read the array values.

I want to use the 'iteration' value as shown below, inside a task defined in the DO_WHILE loop. Here 'preOrder_service' is a task that runs before the loop starts, and 'products' is an array.

preOrder_service_get.output.response.body..products['iteration'].pluCode

@manan164
Contributor

Hi @krishnaputhran ,
Please check the documentation here: https://netflix.github.io/conductor/configuration/systask/#do-while-task.
You can use a LAMBDA task to parse the input array that comes from the preceding task.
Here is a sample JSON that might help.

{
    "name": "test_loopTask",
    "taskReferenceName": "test_loopTask",
    "inputParameters": {
        "orders_count": "${workflow.input.orders.size}"
    },
    "type": "DO_WHILE",
    "loopCondition": "if ($.test_loopTask['iteration'] <= $.orders_count) { true; } else { false; }",
    "loopOver": [
        {
            "name": "get_order_at_index",
            "taskReferenceName": "get_order_at_index",
            "inputParameters": {
                "orders": "${workflow.input.orders}",
                "iterator": "$.test_loopTask['iteration']",
                "scriptExpression": "return $.orders.get($.iterator);"
            },
            "type": "LAMBDA"
        }
    ]
}
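
A note on the loopCondition in the sample above. The DO_WHILE output posted later in this thread is keyed "1", "2", which suggests that iteration counts from 1. Assuming the condition is checked after each pass (ordinary do-while semantics), a `<=` comparison would run one pass more than the number of orders, while `<` would run exactly that many. The Python sketch below models only this counting; `run_do_while` is a hypothetical helper for illustration, not Conductor code.

```python
def run_do_while(count, keep_going):
    """Model a do-while loop whose counter starts at 1.

    Returns the list of iteration numbers whose body was executed.
    """
    executed = []
    iteration = 1
    while True:
        executed.append(iteration)            # the loop body runs first
        if not keep_going(iteration, count):  # the condition is checked afterwards
            break
        iteration += 1
    return executed


# Two orders, compared with "<=" and with "<".
inclusive = run_do_while(2, lambda i, n: i <= n)
strict = run_do_while(2, lambda i, n: i < n)

print(inclusive)  # [1, 2, 3]
print(strict)     # [1, 2]
```

Under these assumptions the strict comparison matches the two-iteration output reported further down, where the condition uses `<` with a count of 2.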

@krishnaputhran
Author

krishnaputhran commented May 11, 2020

Hi @manan164 thank you very much for the timely response, I appreciate it. I had tried reading the do-while-task documentation but couldn't find an answer to my problem, and somehow I hadn't considered using LAMBDA tasks.
I tried to follow your suggestion. But I feel that the LAMBDA task inside the loop is getting executed only once, so the output from the first execution is passed to subsequent executions without picking up the next value of the array. Please see my test workflow definition and workflow start input below.

Workflow Definition

{
    "name": "test_do_while_workflow",
    "description": "test_do_while_workflow",
    "tasks": [
        {
            "name": "LoopTask",
            "taskReferenceName": "LoopTask",
            "type": "DO_WHILE",
            "inputParameters": {
                "preOrderListSize": "${workflow.input.value.response.body.products.length()}"
            },
            "loopCondition": "if ( $.LoopTask['iteration'] < $.preOrderListSize ) { true; } else { false; }",
            "loopOver": [
                {
                    "name": "get_order_at_index",
                    "taskReferenceName": "get_order_at_index",
                    "inputParameters": {
                        "orders": "${workflow.input.value.response.body.products}",
                        "iterator": "$.LoopTask['iteration']",
                        "scriptExpression": "return $.orders.get($.iterator);"
                    },
                    "type": "LAMBDA"
                }
            ]
        }
    ]
}

Input to the workflow

{
    "name": "test_do_while_workflow",
    "correlationId": "test_do_while_workflow",
    "input": {
        "value": {
            "response": {
                "body": {
                    "products": [
                        {
                            "pluCode": 4900001157498,
                            "quantity": 2
                        },
                        {
                            "pluCode": 4904230037873,
                            "quantity": 3
                        }
                    ]
                }
            }
        }
    }
}

Even the Conductor UI shows the LAMBDA task executing only once, although my input array products has size 2.
[screenshot: Conductor UI execution view]

The DO_WHILE task output, however, shows two executions with the same output from the LAMBDA task. If the LAMBDA task had executed properly, I would also have seen pluCode: 4904230037873.

DO_WHILE task output from the Conductor UI

{
    "1": {
        "get_order_at_index": {
            "result": {
                "pluCode": 4900001157498,
                "quantity": 2
            }
        }
    },
    "2": {
        "get_order_at_index": {
            "result": {
                "pluCode": 4900001157498,
                "quantity": 2
            }
        }
    },
    "iteration": 2
}

If I add a normal task, like HTTP, after the LAMBDA task, it gets executed twice, and in the Conductor UI (as in the screenshot pasted above) I could see two rows for it after the LAMBDA task. I removed it here to reduce the text size.

@krishnaputhran
Author

@manan164 I can see this is a defect. Adding a dummy task before this one makes the next task execute on every iteration. So the issue is that, in a DO_WHILE loop, the first task in loopOver gets executed only once, and a dummy task needs to be added to work around it.

But I saw one more issue, and I hope you can reply to my concern. Following your sample code, I tried to access the array values inside the loop, but it always gives the 0th array value.

"iterator": "$.test_loopTask['iteration']",
"scriptExpression": "return $.orders.get($.iterator);"

With the above script expression, the value returned is always the zeroth element of orders.

@manan164
Contributor

Hi @krishnaputhran There may be an issue with the sequence number. Please check the do_while task output; there you will see the output of all loopOver tasks.
Until I come up with a finding, please check DYNAMIC_FORK. This should serve your purpose. You have to generate the input to this task from the previous HTTP task. The only catch is that it will execute the tasks in parallel instead of sequentially.

@krishnaputhran
Author

krishnaputhran commented May 13, 2020

@manan164 thanks for the response. I have checked the do_while task output, and it has the expected output. The only issue is getting the "iteration" value in subsequent iterations.
I had some doubt about the usage of $.test_loopTask['iteration'], because as per the JSONPath documentation $ refers to the current object. Inside a task, $ would then refer to the task currently under execution rather than the actual root task. So I gave it another try as shown below.

	"iterator": "${test_loopTask.output.iteration}",
	"scriptExpression": "return $.orders.get($.iterator)"

To my surprise, this worked, but with a strange issue. During execution, the value of iterator fetched using "${test_loopTask.output.iteration}" goes like this:
1,1,2,3....n-1

If there are only two iterations, the iterator will always have the value 1. That is, for the second execution the value of iteration is still 1, and only after the second execution does it start increasing.
But if you check the do_while task output, the results are properly stored as key-value pairs with 1, 2, 3 as keys.

I am afraid that, as per my reading, DYNAMIC_FORK will not solve my problem. I want to fetch the array values one by one, each forming the input to a set of tasks that must execute in sequence. This is a typical looping use case. My task execution list is not dynamic. Anyway, I'll give it a try.
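
The difference between the two iterator spellings can be pictured with a small model. The Python sketch below assumes that only values wrapped in `${...}` are substituted before a task runs and that any other string is handed to the task unchanged; under that assumption, "$.test_loopTask['iteration']" would reach the script as a plain string rather than a number. `resolve` and `context` are hypothetical names for illustration, and this is a simplified model, not Conductor's actual resolver.

```python
import re

# Matches a value that consists entirely of one ${dotted.path} placeholder.
PLACEHOLDER = re.compile(r"^\$\{(.+)\}$")


def resolve(value, context):
    """Substitute a whole-string ${a.b.c} placeholder from context.

    Anything that is not such a placeholder is returned unchanged.
    """
    if not isinstance(value, str):
        return value
    match = PLACEHOLDER.match(value)
    if not match:
        return value
    current = context
    for key in match.group(1).split("."):
        current = current[key]
    return current


# Hypothetical snapshot of the loop task's output during the second pass.
context = {"test_loopTask": {"output": {"iteration": 2}}}

literal = resolve("$.test_loopTask['iteration']", context)
resolved = resolve("${test_loopTask.output.iteration}", context)

print(repr(literal))   # "$.test_loopTask['iteration']"
print(repr(resolved))  # 2
```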

@rambusineni

@krishnaputhran
Did the looping issue get resolved? I used the same code that you mentioned above, but the same object is getting passed in all the iterations.
Please let us know if you get an answer.
Thanks

@apanicker-nflx
Collaborator

@manan164 Can you please chime in on this one? Thanks

@manan164
Contributor

Sure. @apanicker-nflx

@FrankOwen

> Hi @krishnaputhran ,
> Please check the documentation here https://netflix.github.io/conductor/configuration/systask/#do-while-task.
> You can use the LAMBDA task to parse the input array that is coming from the above task.
> Here is the sample json that might help.
>
> {
>     "name": "test_loopTask",
>     "taskReferenceName": "test_loopTask",
>     "inputParameters": {
>         "orders_count": "${workflow.input.orders.size}"
>     },
>     "type": "DO_WHILE",
>     "loopCondition": "if ($.test_loopTask['iteration'] <= $.orders_count) { true; } else { false; }",
>     "loopOver": [
>         {
>             "name": "get_order_at_index",
>             "taskReferenceName": "get_order_at_index",
>             "inputParameters": {
>                 "orders": "${workflow.input.orders}",
>                 "iterator": "$.test_loopTask['iteration']",
>                 "scriptExpression": "return $.orders.get($.iterator);"
>             },
>             "type": "LAMBDA"
>         }
>     ]
> }

Any update on this issue? This proposed solution does not work, and yields the first element of the array for every iteration.
There seems to be no way to iterate over members of an array.

@elisherer
Contributor

There seems to be an error in the workflow task (loopOver[0].inputParameters.iterator), I think it should be ${test_loopTask.iteration} and not $.test_loopTask['iteration']. See:

        {
            "name": "get_order_at_index",
            "taskReferenceName": "get_order_at_index",
            "inputParameters": {
                "orders": "${workflow.input.orders}",
                "iterator": "${test_loopTask.iteration}",
                "scriptExpression": "return $.orders.get($.iterator);"
            },
            "type": "LAMBDA"
        }

@FrankOwen

I tried that. It doesn't work either. iterator becomes null, and the script still returns the 0th element of the array.
I have a workaround using LAMBDA tasks to track a loop_index that I hack onto workflow.input. This should work.

{
    "name": "init_loop_index",
    "taskReferenceName": "init_loop_index",
    "inputParameters": {
        "wfinput": "${workflow.input}",
        "scriptExpression": "$.wfinput.loop_index=0; return 0;"
    },
    "type": "LAMBDA"
},
{
    "name": "test_loopTask",
    "taskReferenceName": "test_loopTask",
    "inputParameters": {
        "orders_count": "${workflow.input.orders.size}"
    },
    "type": "DO_WHILE",
    "loopCondition": "if ($.test_loopTask['iteration'] <= $.orders_count) { true; } else { false; }",
    "loopOver": [
        {
            "name": "get_order_at_index",
            "taskReferenceName": "get_order_at_index",
            "inputParameters": {
                "orders": "${workflow.input.orders}",
                "wfinput": "${workflow.input}",
                "iterator": "${workflow.input.loop_index}",
                "scriptExpression": "$.wfinput.loop_index++;return $.orders.get($.iterator);"
            },
            "type": "LAMBDA"
        }
    ]
}

@elisherer
Contributor

Better not to use workflow.input, as it is immutable (or at least supposed to be).

I found out that iteration is stored in the DO_WHILE output. See the following:

{
  "schemaVersion": 2,
  "name": "test_do_while_workflow",
  "description": "test_do_while_workflow",
  "tasks": [
    {
      "name": "mock_data",
      "taskReferenceName": "mock_data",
      "inputParameters": {
        "scriptExpression": "var products = new java.util.ArrayList();\nproducts.add({\"pluCode\": 4900001157498,\"quantity\": 2});\nproducts.add({\"pluCode\": 4904230037873,\"quantity\": 3});\nreturn {\"products\": products};"
      },
      "type": "LAMBDA"
    },
    {
      "name": "test_loopTask",
      "taskReferenceName": "test_loopTask",
      "inputParameters": {
        "orders_count": 2
      },
      "type": "DO_WHILE",
      "loopCondition": "$.test_loopTask['iteration'] < $.orders_count",
      "loopOver": [
        {
          "name": "get_order_at_index",
          "taskReferenceName": "get_order_at_index",
          "inputParameters": {
            "orders": "${mock_data.output.result.products}",
            "iterator": "${test_loopTask.output.iteration}",
            "scriptExpression": "return $.orders.get($.iterator);"
          },
          "type": "LAMBDA"
        }
      ]
    }
  ],
  "ownerEmail": "test@example.com",
  "version": 1,
  "outputParameters": {
    "result": "${test_loopTask.output}"
  }
}

Returns the following output:

{
  "result": {
    "1": {
      "get_order_at_index": {
        "result": {
          "pluCode": 4900001157498,
          "quantity": 2
        }
      }
    },
    "2": {
      "get_order_at_index": {
        "result": {
          "pluCode": 4904230037873,
          "quantity": 3
        }
      }
    },
    "iteration": 2
  }
}
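
For anyone consuming this output downstream, here is a minimal Python sketch of reading the per-iteration results back in order. It assumes only the shape shown above: numeric string keys for each pass plus an "iteration" counter. `results_in_order` is a hypothetical helper name, and the JSON is copied from the output above.

```python
import json

# Workflow output copied from the run above.
workflow_output = json.loads("""
{
  "result": {
    "1": {"get_order_at_index": {"result": {"pluCode": 4900001157498, "quantity": 2}}},
    "2": {"get_order_at_index": {"result": {"pluCode": 4904230037873, "quantity": 3}}},
    "iteration": 2
  }
}
""")


def results_in_order(do_while_output, task_ref):
    """Return each pass's result for task_ref, ordered by iteration number."""
    # Keep only the numeric keys; "iteration" is the counter, not a pass.
    keys = sorted((k for k in do_while_output if k.isdigit()), key=int)
    return [do_while_output[k][task_ref]["result"] for k in keys]


results = results_in_order(workflow_output["result"], "get_order_at_index")
plu_codes = [r["pluCode"] for r in results]

print(plu_codes)  # [4900001157498, 4904230037873]
```

Sorting the keys as integers matters once a loop runs ten or more passes, since "10" sorts before "2" as a string.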

@FrankOwen

Interesting. I did try output.iteration, but I found, as reported here by others, that output.iteration sequences as "1,1,2,3,4" instead of "0,1,2,3,4", which is not usable. Perhaps this has been fixed lately? This would definitely be preferred to my hack. We are running our own instance of Conductor; perhaps it is old.

@manan164
Contributor

Hi @FrankOwen The seq number issue has been fixed via 1815.
We are putting the iteration value in the DO_WHILE task output. Please check the documentation: https://netflix.github.io/conductor/configuration/systask/#do-while-task.
The DO_WHILE task output contains the key iteration, whose value is the number of iterations.
Putting it in each loop-over task is not needed, as we can figure it out from the DO_WHILE task.

@FrankOwen

Thanks, @manan164 . I guess our deployment must be more than 17 days old! I will see about getting updated. Would be good to remove my hack. Putting it in output is a little weird, since I think of 'output' as FINAL value of a task. But I'll take whatever works!

@FrankOwen

@manan164 I confirmed we are at 2.27.2, which does not have 1815 fix. Does that also fix the issue where first task only executes once? I'm using a dummy task as workaround.

@github-actions
Contributor

github-actions bot commented Apr 7, 2021

This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.

@github-actions github-actions bot added the Stale label Apr 7, 2021
@github-actions
Contributor

This issue was closed, because it has been stalled for 7 days with no activity.

@techyragu

techyragu commented Aug 26, 2021

> Hi @FrankOwen The seq number issue has been fixed via 1815.
> We are putting iteration value in DO_WHILE task output Please check documentation https://netflix.github.io/conductor/configuration/systask/#do-while-task.
> Do while task output number of iterations with iteration as key and value as number of iterations
> Putting it in each loop over task is not needed as we can figure out from DO_WHILE task.

@apanicker-nflx @manan164 - Why do you say not to use it within a loopOver task?
If I want to use the iterator as an input within a loopOver task, how do I use it?
I am able to get the iteration from the loop task output within a loopOver task, but for the first task it comes as nil, and for subsequent tasks as 1, 2.
How do I fix the nil value in the first iteration?

@techyragu

LoopTaskIterationWFDefinitionwithRun.pdf
@manan164 @apanicker-nflx - fyi Attached above WF Run Json

@techyragu

@krishnaputhran @manan164 @FrankOwen

workflow.input.value.response.body.products.length()
workflow.input.orders.size

Do the above work to interpolate the array size in a task input?
I am also using this to calculate the length of the array for my input parameter, but it doesn't seem to work.
Can you please advise?
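
Nobody in this thread confirms that either expression resolves to the array size. One way to avoid depending on them, sketched below in Python, is to compute the count on the client and pass it as a plain workflow input, much like the hard-coded "orders_count": 2 in the example earlier in the thread. The function `build_start_request` and the `products_count` field are hypothetical names for illustration; the loop task would then need to read the count from that input.

```python
import json


def build_start_request(products):
    """Build a workflow start payload that carries the array and its size."""
    return {
        "name": "test_do_while_workflow",
        "correlationId": "test_do_while_workflow",
        "input": {
            "products": products,
            # Hypothetical field: the size is computed here rather than
            # interpolated inside the workflow definition.
            "products_count": len(products),
        },
    }


request = build_start_request([
    {"pluCode": 4900001157498, "quantity": 2},
    {"pluCode": 4904230037873, "quantity": 3},
])

print(json.dumps(request, indent=2))
```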

@Priyank0711

Hi, I am trying to execute a task 'A' inside do while task and I want to change the input to task 'A' on every iteration. Can we do this?

@manan164
Contributor

manan164 commented Mar 5, 2022

Hi @Priyank0711 , Yes it is possible. Please check @elisherer 's comment, #1677 (comment)

aravindanr added a commit that referenced this issue May 5, 2022
…e DoWhile.execute method. The DO_WHILE TaskModel is persisted before the loop-over tasks are created. Therefore, the loop-over tasks will have access the iteration field in DO_WHILE TaskModel.
v1r3n added a commit to orkes-io/conductor that referenced this issue May 6, 2022
… created in th… (#45)

* Add an ack request for each task polled from queue, when polling in batches

* fixed typos

* remove manual acking in tests, since this done by default

* Fix for Netflix#1677, inspired from Netflix#2881. Loop-over tasks are created in the DoWhile.execute method. The DO_WHILE TaskModel is persisted before the loop-over tasks are created. Therefore, the loop-over tasks will have access the iteration field in DO_WHILE TaskModel.

Co-authored-by: gardusig <gustavo.gardusi@gmail.com>
Co-authored-by: Anoop Panicker <34087882+apanicker-nflx@users.noreply.github.com>
Co-authored-by: Amol Katdare <amol.katdare@gmail.com>
Co-authored-by: Anoop Panicker <apanicker@netflix.com>
Co-authored-by: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com>
aravindanr added a commit that referenced this issue May 16, 2022
…e DoWhile.execute method. The DO_WHILE TaskModel is persisted before the loop-over tasks are created. Therefore, the loop-over tasks will have access the iteration field in DO_WHILE TaskModel.

8 participants