-
Hi, someone asked something very similar on Slack yesterday. Is this part of a course, or are you perhaps on the same team? Anyway, it might be faster to go through the vaex tutorial, since it explains how vaex works. You should not iterate over rows yourself. Is this the line that breaks? (You should state where the issue happens, and a reproducible example would be nice; we can't run the example above.) If you want to get outside of vaex, you can use [...]
-
I tried to refactor to "apply" but get another exception. I'm running out of ideas about what I'm doing wrong:

import vaex

def vaex_version(idx_from, idx_to, df1, df2, signals, threshold_max, threshold_min):
    """
    Return indices (need to be later transformed to date time index) when a value from series1
    is greater than value in series2 (the first occurrence).
    :params df1: vaex.dataframe.DataFrameLocal with 1 column "series1"
    :params df2: vaex.dataframe.DataFrameLocal with 1 column "series2"
    :params signals: vaex.dataframe.DataFrameLocal with 1 column "signals"
    """
    def fun(idx, val):
        window = df2[idx:]
        window_selection = window[window.series2 > val + threshold_min]
        return window_selection.vindex[0]

    indices_above_threshold_min = []
    # define range of interest
    df1 = df1[idx_from:idx_to]
    signals = signals[idx_from:idx_to]
    df1_joined = df1.join(signals)
    df1_joined["vindex"] = vaex.vrange(0, len(df1), dtype='int64')
    # since we don't know at what index the value in series1 will be higher than in series2, we leave the interval open
    df2 = df2[idx_from:]
    df2["vindex"] = vaex.vrange(0, len(df2), dtype='int64')
    # select values of interest
    selected_indices = df1_joined[df1_joined.signals >= threshold_max].vindex
    df_selection = df1_joined.take(selected_indices.tolist())
    return df_selection.apply(fun, arguments=[df_selection.vindex, df_selection.series1])

---------------------------------------------------------------------------
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/toaster/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/scopes.py", line 106, in evaluate
result = self[expression]
File "/home/toaster/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/scopes.py", line 166, in __getitem__
raise KeyError("Unknown variables or column: %r" % (variable,))
KeyError: "Unknown variables or column: 'lambda_function(vindex, series1)'"
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/toaster/PROGS/miniconda3/envs/puma-lab/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/home/toaster/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py", line 1614, in _apply
scalar_result = self.f(*[fix_type(k[i]) for k in args], **{key: value[i] for key, value in kwargs.items()})
File "/tmp/ipykernel_900334/1192567667.py", line 4, in fun
File "/home/toaster/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py", line 591, in __getitem__
raise NotImplementedError
NotImplementedError
"""
The above exception was the direct cause of the following exception:
NotImplementedError Traceback (most recent call last)
/tmp/ipykernel_900334/3848261726.py in <module>
----> 1 test.evaluate()
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py in evaluate(self, i1, i2, out, selection, parallel, array_type)
1107
1108 def evaluate(self, i1=None, i2=None, out=None, selection=None, parallel=True, array_type=None):
-> 1109 return self.ds.evaluate(self, i1, i2, out=out, selection=selection, array_type=array_type, parallel=parallel)
1110
1111 # TODO: it is not so elegant we need to have a custom version of this
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/dataframe.py in evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size)
2893 return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size)
2894 else:
-> 2895 return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size)
2896
2897 def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, prefetch=True):
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/dataframe.py in _evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw)
6065
6066 for expression in set(expressions):
-> 6067 dtypes[expression] = dtype = df.data_type(expression).internal
6068 if expression not in df.columns:
6069 virtual.add(expression)
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/dataframe.py in data_type(self, expression, array_type, internal, axis)
2047 data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
2048 except:
-> 2049 data = self.evaluate(expression, 0, 1, filtered=True, array_type=array_type, parallel=False)
2050 if data_type is None:
2051 # means we have to determine it from the data
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/dataframe.py in evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size)
2893 return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size)
2894 else:
-> 2895 return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size)
2896
2897 def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, prefetch=True):
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/dataframe.py in _evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw)
6153 if out is not None:
6154 scope.buffers[expression] = out
-> 6155 value = scope.evaluate(expression)
6156 # if isinstance(value, ColumnString) and not internal:
6157 # value = value.to_numpy()
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/scopes.py in evaluate(self, expression, out)
110 # logger.debug("in eval")
111 # eval("def f(")
--> 112 result = eval(expression, expression_namespace, self)
113 result = auto_encode(self.df, expression, result)
114 self.values[expression] = wrap(result)
<string> in <module>
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/arrow/numpy_dispatch.py in wrapper(*args, **kwargs)
134 args = list(map(unwrap, args))
135 kwargs = {k: unwrap(v) for k, v, in kwargs.items()}
--> 136 result = f(*args, **kwargs)
137 return wrap(result)
138 return wrapper
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py in __call__(self, *args, **kwargs)
1597 def __call__(self, *args, **kwargs):
1598 import vaex.multiprocessing
-> 1599 return vaex.multiprocessing.apply(self._apply, args, kwargs, self.multiprocessing)
1600
1601 def _apply(self, *args, **kwargs):
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/multiprocessing.py in apply(f, args, kwargs, multiprocessing)
30 args = [_trim(k) for k in args]
31 kwargs = {k:_trim(v) for k, v in kwargs.items()}
---> 32 result = _get_pool().apply(f, args, kwargs)
33 return result
34 else:
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/multiprocessing/pool.py in apply(self, func, args, kwds)
259 Pool must be running.
260 '''
--> 261 return self.apply_async(func, args, kwds).get()
262
263 def map(self, func, iterable, chunksize=None):
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/multiprocessing/pool.py in worker()
119 job, i, func, args, kwds = task
120 try:
--> 121 result = (True, func(*args, **kwds))
122 except Exception as e:
123 if wrap_exception and func is not _helper_reraises_exception:
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py in _apply()
1612 args = [vaex.array_types.tolist(k) for k in args]
1613 for i in range(length):
-> 1614 scalar_result = self.f(*[fix_type(k[i]) for k in args], **{key: value[i] for key, value in kwargs.items()})
1615 result.append(scalar_result)
1616 result = np.array(result)
/tmp/ipykernel_900334/1192567667.py in fun()
2 window = df2[idx:]
3 window_selection = window[window.series2 > val + 10]
----> 4 return window_selection.vindex[0]
5
6 df1=bids_vaex
~/PROGS/miniconda3/envs/puma-lab/lib/python3.7/site-packages/vaex/expression.py in __getitem__()
589 indices, fields = slicer
590 else:
--> 591 raise NotImplementedError
592
593 if indices != slice(None):
NotImplementedError:

Wouldn't expect that since [...]
-
Where is the [...]? I find the code sample difficult to follow. Can you perhaps isolate only the part you have trouble with, so we can look at that specifically? There are a bunch of things in there that distract from the real issue, I guess. In principle you should not iterate yourself (I guess you are still doing it somewhere, given the error). Note that vaex's "columns" are not in-memory structures like those in pandas, so working with them can be a bit different at times (hence the tutorial, to see how it works).
-
The simplest form of the problem would be: for a data frame with one column "series1", find the first index at which a value from the series is greater than some threshold. Now, if we're interested in all values from series1, I'm not sure how else to design the algorithm other than iterating through series1 and finding the minimum index for each iteration.
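
To make the simplest form concrete, here is a minimal plain-Python sketch of the search described above. It is not vaex code: the function names `first_index_above` and `first_index_above_from` and the sample data are made up for illustration, and returning -1 when nothing qualifies is an assumption about what a "not found" result should look like.

```python
from typing import Sequence


def first_index_above(values: Sequence[float], threshold: float) -> int:
    """Return the first position whose value exceeds threshold, or -1 if none does."""
    return next((i for i, v in enumerate(values) if v > threshold), -1)


def first_index_above_from(values: Sequence[float], start: int, threshold: float) -> int:
    """Same search, but only considering positions >= start (an open-ended window)."""
    for i in range(start, len(values)):
        if values[i] > threshold:
            return i
    return -1


# Hypothetical sample data standing in for the "series1" column.
series1 = [3, 8, 2, 9, 4]

print(first_index_above(series1, 5))          # searches the whole series
print(first_index_above_from(series1, 2, 5))  # searches from position 2 onwards
```

The second function corresponds to the "for each row, look only at the rows from here on" variant, which is where the per-row iteration in the question comes from.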
-
Ok, I think I know what you want. Check out the code below; it is a self-contained example:

import vaex
import numpy as np
from tqdm.notebook import tqdm

# Generate data
size = 100_000
x = np.random.randint(1, 100, size=size)
y = np.random.randint(1, 100, size=size)

# Create a vaex dataframe with a virtual index for convenience
df = vaex.from_arrays(x=x, y=y, index=vaex.vrange(0, size, dtype='int'))
df.head(3)

# Find the unique values in the reference column,
# so that repeated values do not have to be looked up again
uniques = df.x.unique()

# Store the findings in a "mapping" dictionary
mapper = {}

# For every unique value in x, find the first index
# at which that value is larger than the value in y
# Note this is inefficient, but I don't know a better way off the top of my head
for value in tqdm(uniques):
    # The try and except is there to catch cases for which the
    # [value > df.y] condition is never satisfied
    # and thus no index value can be found
    try:
        idx = df[value > df.y].index[:1].tolist()[0]
    except Exception:
        idx = -1  # Special value to assign when this exception occurs
    mapper[value] = idx

# Create the result via the map method:
df['result'] = df.x.map(mapper=mapper)

# Done
df.head(5)

Some points: [...]

If anything, I hope the above example will serve as a "quick fix" or as inspiration for a better solution.

Edit:

return window_selection.vindex[:1].tolist()[0]
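
As one possible refinement of the loop above, the lookup "first index where `value > y`" can be answered without scanning `y` once per unique value. The sketch below is plain Python, not vaex, and it is not the method used in the answer: it swaps in a running-minimum plus binary-search technique. The helper names `build_lookup` and `first_smaller_index` and the small sample lists are made up for illustration. The idea rests on one observation: the running minimum of `y` never increases, and the first position where it drops below `value` is also the first position where `y` itself is below `value`.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import List, Sequence


def build_lookup(y: Sequence[float]) -> List[float]:
    """Precompute the negated running minimum of y.

    The running minimum is non-increasing, so its negation is
    non-decreasing, which is the order bisect expects.
    """
    return [-m for m in accumulate(y, min)]


def first_smaller_index(neg_running_min: Sequence[float], value: float) -> int:
    """Return the first index j with y[j] < value, or -1 if there is none."""
    # bisect_right skips every entry with running_min >= value
    j = bisect_right(neg_running_min, -value)
    return j if j < len(neg_running_min) else -1


# Hypothetical sample data standing in for the x and y columns.
x = [5, 1, 7, 3]
y = [6, 4, 4, 2]

lookup = build_lookup(y)  # built once, reused for every value
result = [first_smaller_index(lookup, value) for value in x]
print(result)  # [1, -1, 0, 3]
```

With this layout the precomputation is a single pass over `y` and each lookup is a binary search, so caching per unique value becomes optional. Whether it pays off inside vaex depends on how the columns are materialized, which this sketch does not address.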
-
Thank you very much for the long answer!
-
As a first-time user I'm having trouble converting my Pandas function to a Vaex version. When I try to iterate over rows I get NotImplementedError. Do I have to somehow first evaluate ("materialize"/compute) the expressions? Running the above produces: