Skip to content

Commit

Permalink
Merge pull request #2052 from mabel-dev/#2051
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Oct 7, 2024
2 parents a3e9898 + a7df06c commit 2e220ef
Show file tree
Hide file tree
Showing 31 changed files with 575 additions and 493 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 818
__build__ = 819

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
179 changes: 92 additions & 87 deletions opteryx/compiled/cross_join/cython_cross_join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,57 @@
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=True
# cython: wraparound=False
# cython: boundscheck=False

from libc.stdlib cimport malloc, free
import numpy as np
cimport numpy as cnp
cimport cython
from libc.stdint cimport int32_t
from libc.math cimport INFINITY

cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):
cdef int32_t i, total_size = 0
cdef int32_t length
cdef int32_t row_count = len(column_data)
cdef int32_t *lengths = <int32_t *>malloc(row_count * sizeof(int32_t))
if lengths is NULL:
raise MemoryError("Failed to allocate memory for lengths array.")

# Calculate the total size and fill lengths array
cdef Py_ssize_t row_count = column_data.shape[0]
cdef cnp.int32_t[::1] lengths = np.empty(row_count, dtype=np.int32)
cdef cnp.int32_t[::1] offsets = np.empty(row_count + 1, dtype=np.int32)
cdef Py_ssize_t i
cdef Py_ssize_t total_size = 0
cdef cnp.dtype element_dtype = None

# Calculate lengths and total_size
for i in range(row_count):
length = column_data[i].shape[0]
lengths[i] = length
total_size += length

# If the total size is zero, return empty arrays
if column_data[i] is None:
lengths[i] = 0
else:
lengths[i] = column_data[i].shape[0]
if element_dtype is None:
element_dtype = column_data[i].dtype
total_size += lengths[i]

# Early exit if total_size is zero
if total_size == 0:
free(lengths)
return (np.array([], dtype=np.int32), np.array([], dtype=object))

# Determine the dtype of the elements in the arrays, handling the case where the first element is None
element_dtype = object
for i in range(row_count):
if column_data[i] is not None:
element_dtype = column_data[i].dtype
break

# Preallocate arrays for indices and flat data
flat_data = np.empty(total_size, dtype=element_dtype) # More efficient than list
cdef int32_t *indices = <int32_t *>malloc(total_size * sizeof(int32_t))
if indices is NULL:
free(lengths)
raise MemoryError("Failed to allocate memory for indices.")
if element_dtype is None:
element_dtype = np.object_

cdef int32_t start = 0
cdef int32_t end = 0

# Flatten the data and fill indices
# Compute offsets for efficient slicing
offsets[0] = 0
for i in range(row_count):
end = start + lengths[i]
flat_data[start:end] = column_data[i] # NumPy handles the slicing and copying
for j in range(start, end):
indices[j] = i
start = end

free(lengths) # Free the lengths array

# Create a NumPy array from indices
cdef cnp.int32_t[:] mv = <cnp.int32_t[:total_size]>indices
np_array = np.array(mv, copy=True) # Copy the memoryview into a NumPy array
free(indices) # Free the indices memory now that we've copied it
offsets[i + 1] = offsets[i] + lengths[i]
cdef cnp.int32_t[::1] indices = np.empty(total_size, dtype=np.int32)
cdef cnp.ndarray flat_data = np.empty(total_size, dtype=element_dtype)

return (np_array, flat_data)
# Fill indices and flat_data
for i in range(row_count):
start = offsets[i]
end = offsets[i + 1]
if end > start:
indices[start:end] = i
flat_data[start:end] = column_data[i]

return (indices, flat_data)

from libc.stdlib cimport malloc, realloc, free
import numpy as np
cimport numpy as cnp
cimport cython
from libc.stdint cimport int32_t


cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set valid_values):
Expand All @@ -84,53 +67,75 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
A set of values to filter the rows by during the cross join.
Returns:
tuple of (ndarray, list)
Returns a tuple containing an array of indices and a list of flattened data for rows that match the filter.
tuple of (ndarray, ndarray)
Returns a tuple containing an array of indices and an array of flattened data for rows that match the filter.
"""
cdef int32_t i, index = 0, allocated_size
cdef int32_t row_count = len(column_data)
cdef int32_t initial_alloc_size = row_count * 2
allocated_size = initial_alloc_size
cdef int32_t *indices = <int32_t *>malloc(allocated_size * sizeof(int32_t))
cdef int32_t *new_indices

if indices is NULL:
raise MemoryError("Failed to allocate memory for indices.")

# Determine the dtype of the elements in the arrays, handling the case where the first element is None
element_dtype = object
cdef Py_ssize_t row_count = column_data.shape[0]
cdef Py_ssize_t allocated_size = row_count * 4 # Initial allocation size
cdef Py_ssize_t index = 0
cdef Py_ssize_t i, j, len_i
cdef object array_i
cdef cnp.ndarray flat_data
cdef cnp.int32_t[::1] indices
cdef cnp.dtype element_dtype = None
cdef object value

# Typed sets for different data types
cdef set[int] valid_values_int
cdef set[double] valid_values_double
cdef set[unicode] valid_values_str
cdef set valid_values_typed = None

# Determine the dtype of the elements
for i in range(row_count):
if column_data[i] is not None:
element_dtype = column_data[i].dtype
array_i = column_data[i]
if array_i is not None and array_i.size > 0:
element_dtype = array_i.dtype
break

cdef flat_data = np.empty(allocated_size, dtype=element_dtype)
if element_dtype is None:
element_dtype = np.object_

# Initialize indices and flat_data arrays
indices = np.empty(allocated_size, dtype=np.int32)
flat_data = np.empty(allocated_size, dtype=element_dtype)

# Handle set initialization based on element dtype
if np.issubdtype(element_dtype, np.integer):
valid_values_typed = set([int(v) for v in valid_values])
elif np.issubdtype(element_dtype, np.floating):
valid_values_typed = set([float(v) for v in valid_values])
elif np.issubdtype(element_dtype, np.str_):
valid_values_typed = set([unicode(v) for v in valid_values])
else:
valid_values_typed = valid_values # Fallback to generic Python set

# Main loop
for i in range(row_count):
for value in column_data[i]:
if value in valid_values:
if index == allocated_size: # Check if we need to expand the memory allocation
allocated_size = allocated_size * 2 # Double the allocation size to reduce reallocations
# Handle realloc for indices safely
new_indices = <int32_t *>realloc(indices, allocated_size * sizeof(int32_t))
if new_indices is NULL:
free(indices) # Free previously allocated memory to avoid memory leak
raise MemoryError("Failed to reallocate memory for indices.")
indices = new_indices
array_i = column_data[i]
if array_i is None:
continue
len_i = array_i.shape[0]
if len_i == 0:
continue

for j in range(len_i):
value = array_i[j]
if value in valid_values_typed:
if index >= allocated_size:
# Reallocate arrays
allocated_size *= 2
indices = np.resize(indices, allocated_size)
flat_data = np.resize(flat_data, allocated_size)
if indices is NULL:
raise MemoryError("Failed to reallocate memory for indices.")
flat_data[index] = value
indices[index] = i
index += 1

if index == 0:
free(indices)
return (np.array([], dtype=np.int32), [])
return (np.array([], dtype=np.int32), np.array([], dtype=element_dtype))

# Slice to actual used size before creating final arrays
cdef int32_t[:] indices_view = <int32_t[:index]>indices
cdef cnp.ndarray final_indices = np.array(indices_view, dtype=np.int32)
free(indices) # Free the original buffer now
# Slice arrays to the actual used size
indices = indices[:index]
flat_data = flat_data[:index]

return (final_indices, flat_data[:index])
return (indices, flat_data)
1 change: 1 addition & 0 deletions opteryx/compiled/functions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .functions import generate_random_strings
from .ip_address import ip_in_cidr
from .vectors import possible_match
from .vectors import possible_match_indices
Expand Down
49 changes: 49 additions & 0 deletions opteryx/compiled/functions/functions.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
cimport numpy as cnp
import numpy as np
from libc.time cimport time
cimport cython

# Lookup table of output symbols: 26 lowercase + 26 uppercase + 10 digits + "_" and "/"
# gives exactly 64 entries, so a 6-bit value (0..63) indexes it without a range check.
cdef bytes alphabet = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_/"

# Seed for xorshift32 PRNG
# Module-level generator state, initialised from the C library clock when the module
# is loaded and overwritten on every call to xorshift32().
# NOTE(review): a state of 0 would stay 0 forever (every xor/shift of 0 is 0); the
# time-based seed is assumed never to be 0 in practice.
cdef unsigned int xorshift32_state = <unsigned int>time(NULL)

@cython.boundscheck(False)
@cython.wraparound(False)
def generate_random_strings(int row_count, int width) -> cnp.ndarray:
    """
    Generate a NumPy array of `row_count` random fixed-width byte strings.

    Every byte is drawn independently from the module-level 64-symbol
    `alphabet` using the module's xorshift32 generator, so the output is
    fast to produce but not suitable for anything security-sensitive.

    Parameters:
        row_count: int
            The number of random strings to generate. Must not be negative.
        width: int
            The length, in bytes, of each random string. Must be at least 1.

    Returns:
        A NumPy array of dtype `S{width}` containing `row_count` random strings.

    Raises:
        ValueError
            If `row_count` is negative or `width` is less than 1.
    """
    cdef cnp.ndarray result
    cdef Py_ssize_t total_chars
    cdef Py_ssize_t i
    cdef unsigned char rand_value
    cdef char* ptr

    # The fill loop below writes exactly row_count * width bytes straight into
    # the array buffer, so it relies on the item size being exactly `width`.
    if row_count < 0:
        raise ValueError("row_count must not be negative.")
    if width < 1:
        raise ValueError("width must be at least 1.")

    # Allocate NumPy array with fixed-width strings, dtype='S{width}'
    result = np.empty((row_count,), dtype=f'S{width}')

    # Widen before multiplying: the product of two C ints can exceed the int
    # range even though the allocation above succeeded.
    total_chars = <Py_ssize_t>row_count * <Py_ssize_t>width
    ptr = <char*>result.data

    for i in range(total_chars):
        # Keep the low 6 bits: a value in 0..63, matching the alphabet size
        rand_value = xorshift32() & 0x3F
        ptr[i] = alphabet[rand_value]

    return result

cdef inline unsigned int xorshift32():
    """
    Advance the module-level xorshift32 state by one step and return the new value.

    The state is scrambled with three xor-shift operations (left 13, right 17,
    left 5) on a 32-bit unsigned integer and written back to
    `xorshift32_state`, so consecutive calls yield a pseudo-random sequence.
    """
    # The generator state lives at module level and is rebound on every call
    global xorshift32_state
    cdef unsigned int state = xorshift32_state
    state = state ^ (state << 13)
    state = state ^ (state >> 17)
    state = state ^ (state << 5)
    xorshift32_state = state
    return state
1 change: 1 addition & 0 deletions opteryx/compiled/list_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from .cython_list_ops import cython_arrow_op
from .cython_list_ops import cython_get_element_op
from .cython_list_ops import cython_long_arrow_op
from .cython_list_ops import list_contains_any
22 changes: 21 additions & 1 deletion opteryx/compiled/list_ops/cython_list_ops.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,24 @@ cpdef cnp.ndarray array_encode_utf8(cnp.ndarray inp):
for i in range(n):
inp_view[i] = PyUnicode_AsUTF8String(inp_view[i])

return inp
return inp


cpdef cnp.ndarray list_contains_any(cnp.ndarray array, cnp.ndarray items):
    """
    Flag each row of `array` that shares at least one value with the lookup list.

    Cython optimized version that works with object arrays.

    Parameters:
        array: ndarray
            Array whose entries are each an iterable of hashable values, or None.
        items: ndarray
            Array whose first entry is the iterable of values to look for.
            Only `items[0]` is read; any further entries are ignored.

    Returns:
        ndarray
            Boolean array with one entry per element of `array`: True where the
            row contains any of the lookup values, False otherwise. Rows that
            are None are always False. If `items` is empty, or its first entry
            is None or has no values, every row is False.
    """
    cdef Py_ssize_t size = array.size
    cdef cnp.ndarray res = numpy.zeros(size, dtype=numpy.bool_)
    cdef Py_ssize_t i
    cdef object test_set, el, wanted
    cdef set items_set

    # An empty `items` (e.g. an empty batch) has no first entry to read;
    # indexing it would raise IndexError, so there is simply nothing to match.
    if items.size == 0:
        return res

    wanted = items[0]
    # A None lookup list is treated like a None row: it matches nothing.
    if wanted is None:
        return res

    items_set = set(wanted)
    if not items_set:
        # No lookup values, so no row can match; skip the scan entirely.
        return res

    for i in range(size):
        test_set = array[i]
        if test_set is not None:
            for el in test_set:
                if el in items_set:
                    res[i] = True
                    # One hit is enough for this row
                    break
    return res
Loading

0 comments on commit 2e220ef

Please sign in to comment.