7
def flowread(flow_path, quantize=False, concat_axis=0, *args, **kwargs):
8
"""Read an optical flow map.
11
flow_path (ndarray or str): Flow path.
12
quantize (bool): whether to read quantized pair, if set to True,
13
remaining args will be passed to :func:`dequantize_flow`.
14
concat_axis (int): The axis that dx and dy are concatenated,
15
can be either 0 or 1. Ignored if quantize is False.
18
ndarray: Optical flow represented as a (h, w, 2) numpy array
21
assert concat_axis in [0, 1]
22
cat_flow = cv2.imread(flow_path, cv2.IMREAD_UNCHANGED)
23
if cat_flow.ndim != 2:
24
raise IOError(f'{flow_path} is not a valid quantized flow file, its dimension is {cat_flow.ndim}.')
25
assert cat_flow.shape[concat_axis] % 2 == 0
26
dx, dy = np.split(cat_flow, 2, axis=concat_axis)
27
flow = dequantize_flow(dx, dy, *args, **kwargs)
29
with open(flow_path, 'rb') as f:
31
header = f.read(4).decode('utf-8')
33
raise IOError(f'Invalid flow file: {flow_path}')
36
raise IOError(f'Invalid flow file: {flow_path}, header does not contain PIEH')
38
w = np.fromfile(f, np.int32, 1).squeeze()
39
h = np.fromfile(f, np.int32, 1).squeeze()
40
flow = np.fromfile(f, np.float32, w * h * 2).reshape((h, w, 2))
42
return flow.astype(np.float32)
45
def flowwrite(flow, filename, quantize=False, concat_axis=0, *args, **kwargs):
46
"""Write optical flow to file.
48
If the flow is not quantized, it will be saved as a .flo file losslessly,
49
otherwise a jpeg image which is lossy but of much smaller size. (dx and dy
50
will be concatenated horizontally into a single image if quantize is True.)
53
flow (ndarray): (h, w, 2) array of optical flow.
54
filename (str): Output filepath.
55
quantize (bool): Whether to quantize the flow and save it to 2 jpeg
56
images. If set to True, remaining args will be passed to
57
:func:`quantize_flow`.
58
concat_axis (int): The axis that dx and dy are concatenated,
59
can be either 0 or 1. Ignored if quantize is False.
62
with open(filename, 'wb') as f:
63
f.write('PIEH'.encode('utf-8'))
64
np.array([flow.shape[1], flow.shape[0]], dtype=np.int32).tofile(f)
65
flow = flow.astype(np.float32)
69
assert concat_axis in [0, 1]
70
dx, dy = quantize_flow(flow, *args, **kwargs)
71
dxdy = np.concatenate((dx, dy), axis=concat_axis)
72
os.makedirs(os.path.dirname(filename), exist_ok=True)
73
cv2.imwrite(filename, dxdy)
76
def quantize_flow(flow, max_val=0.02, norm=True):
77
"""Quantize flow to [0, 255].
79
After this step, the size of flow will be much smaller, and can be
80
dumped as jpeg images.
83
flow (ndarray): (h, w, 2) array of optical flow.
84
max_val (float): Maximum value of flow, values beyond
85
[-max_val, max_val] will be truncated.
86
norm (bool): Whether to divide flow values by image width/height.
89
tuple[ndarray]: Quantized dx and dy.
98
flow_comps = [quantize(d, -max_val, max_val, 255, np.uint8) for d in [dx, dy]]
99
return tuple(flow_comps)
102
def dequantize_flow(dx, dy, max_val=0.02, denorm=True):
103
"""Recover from quantized flow.
106
dx (ndarray): Quantized dx.
107
dy (ndarray): Quantized dy.
108
max_val (float): Maximum value used when quantizing.
109
denorm (bool): Whether to multiply flow values with width/height.
112
ndarray: Dequantized flow.
114
assert dx.shape == dy.shape
115
assert dx.ndim == 2 or (dx.ndim == 3 and dx.shape[-1] == 1)
117
dx, dy = [dequantize(d, -max_val, max_val, 255) for d in [dx, dy]]
122
flow = np.dstack((dx, dy))
126
def quantize(arr, min_val, max_val, levels, dtype=np.int64):
127
"""Quantize an array of (-inf, inf) to [0, levels-1].
130
arr (ndarray): Input array.
131
min_val (scalar): Minimum value to be clipped.
132
max_val (scalar): Maximum value to be clipped.
133
levels (int): Quantization levels.
134
dtype (np.type): The type of the quantized array.
137
tuple: Quantized array.
139
if not (isinstance(levels, int) and levels > 1):
140
raise ValueError(f'levels must be a positive integer, but got {levels}')
141
if min_val >= max_val:
142
raise ValueError(f'min_val ({min_val}) must be smaller than max_val ({max_val})')
144
arr = np.clip(arr, min_val, max_val) - min_val
145
quantized_arr = np.minimum(np.floor(levels * arr / (max_val - min_val)).astype(dtype), levels - 1)
150
def dequantize(arr, min_val, max_val, levels, dtype=np.float64):
151
"""Dequantize an array.
154
arr (ndarray): Input array.
155
min_val (scalar): Minimum value to be clipped.
156
max_val (scalar): Maximum value to be clipped.
157
levels (int): Quantization levels.
158
dtype (np.type): The type of the dequantized array.
161
tuple: Dequantized array.
163
if not (isinstance(levels, int) and levels > 1):
164
raise ValueError(f'levels must be a positive integer, but got {levels}')
165
if min_val >= max_val:
166
raise ValueError(f'min_val ({min_val}) must be smaller than max_val ({max_val})')
168
dequantized_arr = (arr + 0.5).astype(dtype) * (max_val - min_val) / levels + min_val
170
return dequantized_arr