@@ -78,6 +78,7 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
7878 lead_pos = []
7979 bindings = [] # type: List[Dict[Text,Text]]
8080 binding = None # type: Dict[Text,Any]
81+ value_from_expression = False
8182 if "inputBinding" in schema and isinstance (schema ["inputBinding" ], dict ):
8283 binding = copy .copy (schema ["inputBinding" ])
8384
@@ -87,29 +88,33 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
8788 binding ["position" ] = aslist (lead_pos ) + [0 ] + aslist (tail_pos )
8889
8990 binding ["datum" ] = datum
91+ if "valueFrom" in binding :
92+ value_from_expression = True
9093
9194 # Handle union types
9295 if isinstance (schema ["type" ], list ):
93- for t in schema ["type" ]:
94- if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
95- avsc = self .names .get_name (t , "" )
96- elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
97- avsc = self .names .get_name (t ["name" ], "" )
98- else :
99- avsc = AvroSchemaFromJSONData (t , self .names )
100- if validate .validate (avsc , datum ):
101- schema = copy .deepcopy (schema )
102- schema ["type" ] = t
103- return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
104- raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
96+ if not value_from_expression :
97+ for t in schema ["type" ]:
98+ if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
99+ avsc = self .names .get_name (t , "" )
100+ elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
101+ avsc = self .names .get_name (t ["name" ], "" )
102+ else :
103+ avsc = AvroSchemaFromJSONData (t , self .names )
104+ if validate .validate (avsc , datum ):
105+ schema = copy .deepcopy (schema )
106+ schema ["type" ] = t
107+ return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
108+ raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
105109 elif isinstance (schema ["type" ], dict ):
106- st = copy .deepcopy (schema ["type" ])
107- if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
108- st ["inputBinding" ] = {}
109- for k in ("secondaryFiles" , "format" , "streamable" ):
110- if k in schema :
111- st [k ] = schema [k ]
112- bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
110+ if not value_from_expression :
111+ st = copy .deepcopy (schema ["type" ])
112+ if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
113+ st ["inputBinding" ] = {}
114+ for k in ("secondaryFiles" , "format" , "streamable" ):
115+ if k in schema :
116+ st [k ] = schema [k ]
117+ bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
113118 else :
114119 if schema ["type" ] in self .schemaDefs :
115120 schema = self .schemaDefs [schema ["type" ]]
0 commit comments