Merge pull request #39 from numforge/improve-reduction-API
Add float32 implementation of min/max/sum
mratsim committed Aug 25, 2019
2 parents af191c0 + 0283fd3 commit 2f619fd
Showing 4 changed files with 150 additions and 95 deletions.
43 changes: 21 additions & 22 deletions benchmarks/fp_reduction_latency/reduction_max_bench.nim
@@ -46,7 +46,7 @@ func getIndex[T](t: Tensor[T], idx: varargs[int]): int =

func `[]`*[T](t: Tensor[T], idx: varargs[int]): T {.inline.}=
## Index tensor
t.storage.raw_data[t.getIndex(idx)]
t.storage.raw_buffer[t.getIndex(idx)]

################################################################

@@ -95,7 +95,7 @@ proc mainBench_1_accum_simple(a: Tensor[float32], nb_samples: int) =
var accum = float32(-Inf)
bench("Reduction - 1 accumulator - simple iter", accum):
for i in 0 ..< a.size:
accum = max(accum, a.storage.raw_data[i])
accum = max(accum, a.storage.raw_buffer[i])

proc mainBench_1_accum_macro(a: Tensor[float32], nb_samples: int) =
var accum = float32(-Inf)
Expand All @@ -111,10 +111,10 @@ proc mainBench_2_accum_simple(a: Tensor[float32], nb_samples: int) =
var
accum1 = float32(-Inf)
for i in countup(0, unroll_stop - 1, 2):
accum = max(accum, a.storage.raw_data[i])
accum1 = max(accum1, a.storage.raw_data[i+1])
accum = max(accum, a.storage.raw_buffer[i])
accum1 = max(accum1, a.storage.raw_buffer[i+1])
for i in unroll_stop ..< size:
accum = max(accum, a.storage.raw_data[i])
accum = max(accum, a.storage.raw_buffer[i])
accum = max(accum, accum1)

proc mainBench_3_accum_simple(a: Tensor[float32], nb_samples: int) =
Expand All @@ -126,11 +126,11 @@ proc mainBench_3_accum_simple(a: Tensor[float32], nb_samples: int) =
accum1 = float32(-Inf)
accum2 = float32(-Inf)
for i in countup(0, unroll_stop - 1, 3):
accum = max(accum, a.storage.raw_data[i])
accum1 = max(accum1, a.storage.raw_data[i+1])
accum2 = max(accum2, a.storage.raw_data[i+2])
accum = max(accum, a.storage.raw_buffer[i])
accum1 = max(accum1, a.storage.raw_buffer[i+1])
accum2 = max(accum2, a.storage.raw_buffer[i+2])
for i in unroll_stop ..< size:
accum = max(accum, a.storage.raw_data[i])
accum = max(accum, a.storage.raw_buffer[i])
accum = max(accum, max(accum1, accum2))

proc mainBench_4_accum_simple(a: Tensor[float32], nb_samples: int) =
Expand All @@ -143,12 +143,12 @@ proc mainBench_4_accum_simple(a: Tensor[float32], nb_samples: int) =
accum2 = float32(-Inf)
accum3 = float32(-Inf)
for i in countup(0, unroll_stop - 1, 4):
accum = max(accum, a.storage.raw_data[i])
accum1 = max(accum1, a.storage.raw_data[i+1])
accum2 = max(accum2, a.storage.raw_data[i+2])
accum3 = max(accum3, a.storage.raw_data[i+3])
accum = max(accum, a.storage.raw_buffer[i])
accum1 = max(accum1, a.storage.raw_buffer[i+1])
accum2 = max(accum2, a.storage.raw_buffer[i+2])
accum3 = max(accum3, a.storage.raw_buffer[i+3])
for i in unroll_stop ..< size:
accum = max(accum, a.storage.raw_data[i])
accum = max(accum, a.storage.raw_buffer[i])
accum = max(accum, accum1)
accum2 = max(accum2, accum3)
accum = max(accum, accum2)
Expand All @@ -164,21 +164,21 @@ proc mainBench_5_accum_simple(a: Tensor[float32], nb_samples: int) =
accum3 = float32(-Inf)
accum4 = float32(-Inf)
for i in countup(0, unroll_stop - 1, 5):
accum = max(accum, a.storage.raw_data[i])
accum1 = max(accum1, a.storage.raw_data[i+1])
accum2 = max(accum2, a.storage.raw_data[i+2])
accum3 = max(accum3, a.storage.raw_data[i+3])
accum4 = max(accum4, a.storage.raw_data[i+4])
accum = max(accum, a.storage.raw_buffer[i])
accum1 = max(accum1, a.storage.raw_buffer[i+1])
accum2 = max(accum2, a.storage.raw_buffer[i+2])
accum3 = max(accum3, a.storage.raw_buffer[i+3])
accum4 = max(accum4, a.storage.raw_buffer[i+4])
for i in unroll_stop ..< size:
accum = max(accum, a.storage.raw_data[i])
accum = max(accum, a.storage.raw_buffer[i])
accum2 = max(accum2, max(accum3, accum4))
accum = max(accum, accum1)
accum = max(accum, accum2)

proc mainBench_packed_sse_prod(a: Tensor[float32], nb_samples: int) =
var accum = float32(-Inf)
bench("Max reduction - prod impl", accum):
accum = max(accum, max_sse3(a.storage.raw_data, a.size))
accum = max(accum, reduce_max(a.storage.raw_buffer, a.size))

when defined(fastmath):
{.passC:"-ffast-math".}
@@ -345,4 +345,3 @@ when isMainModule:
# +0x97 addq $32, %rcx
# +0x9b addq $2, %rdx
# +0x9f jne "max_sse3_skqjc9ccvpz3qvNJidlNb9aw+0x70"

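The benchmarks above index the tensor's backing memory directly through t.storage.raw_buffer, the field this commit renames from raw_data. A minimal sketch of the layout that access pattern assumes (a hypothetical, seq-backed simplification for illustration; laser's actual CpuStorage is not defined in this diff):

# Hypothetical, simplified layout matching the access pattern
# t.storage.raw_buffer[t.getIndex(idx)] used in the benchmarks.
type
  CpuStorage[T] = object
    raw_buffer*: seq[T]          # contiguous backing buffer
  Tensor[T] = object
    shape*, strides*: seq[int]
    offset*: int
    storage*: CpuStorage[T]

func getIndex[T](t: Tensor[T], idx: varargs[int]): int =
  ## Map an n-dimensional index to a flat buffer position.
  result = t.offset
  for i in 0 ..< idx.len:
    result += t.strides[i] * idx[i]

func `[]`[T](t: Tensor[T], idx: varargs[int]): T {.inline.} =
  t.storage.raw_buffer[t.getIndex(idx)]
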
16 changes: 8 additions & 8 deletions benchmarks/fp_reduction_latency/reduction_packed_sse.nim
@@ -43,7 +43,7 @@ func getIndex[T](t: Tensor[T], idx: varargs[int]): int =

func `[]`*[T](t: Tensor[T], idx: varargs[int]): T {.inline.}=
## Index tensor
t.storage.raw_data[t.getIndex(idx)]
t.storage.raw_buffer[t.getIndex(idx)]

################################################################

@@ -109,7 +109,7 @@ proc mainBench_4_packed_sse_accums(a: Tensor[float32], nb_samples: int) =
var accums: m128
var ptr_data = a.unsafe_raw_data()
for i in countup(0, unroll_stop - 1, 4):
let data4 = a.storage.raw_data[i].unsafeaddr.mm_load_ps() # Can't use ptr_data, no address :/
let data4 = a.storage.raw_buffer[i].unsafeaddr.mm_load_ps() # Can't use ptr_data, no address :/
accums = mm_add_ps(accums, data4)
for i in unroll_stop ..< size:
accum += ptr_data[i]
Expand All @@ -125,8 +125,8 @@ proc mainBench_8_packed_sse_accums(a: Tensor[float32], nb_samples: int) =
for i in countup(0, unroll_stop - 1, 8):
# Can't use ptr_data, no address :/
let
data4_0 = a.storage.raw_data[i ].unsafeaddr.mm_load_ps()
data4_1 = a.storage.raw_data[i+4].unsafeaddr.mm_load_ps()
data4_0 = a.storage.raw_buffer[i ].unsafeaddr.mm_load_ps()
data4_1 = a.storage.raw_buffer[i+4].unsafeaddr.mm_load_ps()
accums0 = mm_add_ps(accums0, data4_0)
accums1 = mm_add_ps(accums1, data4_1)
for i in unroll_stop ..< size:
Expand All @@ -139,7 +139,7 @@ proc mainBench_8_packed_sse_accums(a: Tensor[float32], nb_samples: int) =
proc mainBench_packed_sse_prod(a: Tensor[float32], nb_samples: int) =
var accum = 0'f32
bench("Reduction - prod impl", accum):
accum += sum_kernel(a.storage.raw_data, a.size)
accum += reduce_sum(a.storage.raw_buffer, a.size)

proc mainBench_8_packed_avx_accums(a: Tensor[float32], nb_samples: int) =
var accum = 0'f32
Expand All @@ -150,7 +150,7 @@ proc mainBench_8_packed_avx_accums(a: Tensor[float32], nb_samples: int) =
var ptr_data = a.unsafe_raw_data()
for i in countup(0, unroll_stop - 1, 8):
# Can't use ptr_data, no address :/
let data8_0 = a.storage.raw_data[i ].unsafeaddr.mm256_load_ps()
let data8_0 = a.storage.raw_buffer[i ].unsafeaddr.mm256_load_ps()
accums0 = mm256_add_ps(accums0, data8_0)
for i in unroll_stop ..< size:
accum += ptr_data[i]
Expand All @@ -165,8 +165,8 @@ proc mainBench_16_packed_avx_accums(a: Tensor[float32], nb_samples: int) =
var ptr_data = a.unsafe_raw_data()
for i in countup(0, unroll_stop - 1, 16):
# Can't use ptr_data, no address :/
let data8_0 = a.storage.raw_data[i ].unsafeaddr.mm256_load_ps()
let data8_1 = a.storage.raw_data[i+8].unsafeaddr.mm256_load_ps()
let data8_0 = a.storage.raw_buffer[i ].unsafeaddr.mm256_load_ps()
let data8_1 = a.storage.raw_buffer[i+8].unsafeaddr.mm256_load_ps()
accums0 = mm256_add_ps(accums0, data8_0)
accums1 = mm256_add_ps(accums1, data8_1)
for i in unroll_stop ..< size:
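
Both benchmark files unroll their loops over several independent accumulators: each partial sum or maximum depends only on its own previous value, so the CPU can overlap the latency of 2-5 in-flight operations instead of serializing them through a single register. A scalar sketch of that pattern (illustrative only, not part of the commit):

# Four independent partial sums; a single accumulator would stall
# each addition on the result of the previous one.
func sum4(data: openArray[float32]): float32 =
  var a0, a1, a2, a3 = 0'f32
  let unroll_stop = data.len - data.len mod 4
  var i = 0
  while i < unroll_stop:
    a0 += data[i]
    a1 += data[i+1]
    a2 += data[i+2]
    a3 += data[i+3]
    i += 4
  for j in unroll_stop ..< data.len:  # leftover tail elements
    a0 += data[j]
  result = (a0 + a1) + (a2 + a3)      # pairwise merge of the partials

echo sum4([1'f32, 2'f32, 3'f32, 4'f32, 5'f32])  # 15.0
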
19 changes: 19 additions & 0 deletions examples/ex05b_raw_buffer_parallel_reduction.nim
@@ -0,0 +1,19 @@
# ################################################################
#
# Example of using the parallel reduction
# primitives on a raw buffer
#
# ################################################################

import
random, sequtils,
../laser/primitives/reductions

proc main() =
let interval = -1f .. 1f
let size = 10_000_000
let buf = newSeqWith(size, rand(interval))

echo reduce_max(buf[0].unsafeAddr, size)

main()
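
The example exercises reduce_max only. Since reduce_sum and reduce_min are generated from the same template in laser/primitives/reductions.nim below, the same buffer should reduce three ways in the same style; a sketch under that assumption, not a shipped example:

import random, sequtils
import ../laser/primitives/reductions

proc main() =
  let buf = newSeqWith(10_000_000, rand(-1f .. 1f))
  echo reduce_sum(buf[0].unsafeAddr, buf.len)   # ~0 in expectation
  echo reduce_min(buf[0].unsafeAddr, buf.len)   # close to -1
  echo reduce_max(buf[0].unsafeAddr, buf.len)   # close to +1

main()
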
167 changes: 102 additions & 65 deletions laser/primitives/reductions.nim
@@ -11,69 +11,106 @@ import
when defined(i386) or defined(amd_64):
import ./simd_math/reductions_sse3

func sum_fallback(data: ptr UncheckedArray[float32], len: Natural): float32 =
## Fallback kernel for sum reduction
## We use 2 accumulators as they should exhibit the best compromise
## of speed and code-size across architectures
## especially when fastmath is turned on.
## Benchmarked: 2x faster than naive reduction and 2x slower than the SSE3 kernel

withCompilerOptimHints()
let data{.restrict.} = data

# Loop peeling is left to the compiler's discretion,
# if optimizing for code size is desired
const step = 2
var accum1 = 0'f32
let unroll_stop = len.round_step_down(step)
for i in countup(0, unroll_stop - 1, 2):
result += data[i]
accum1 += data[i+1]
result += accum1
if unroll_stop != len:
result += data[unroll_stop] # unroll_stop == len - 1, the last element

proc sum_kernel*(data: ptr (float32 or UncheckedArray[float32]), len: Natural): float32 {.sideeffect.}=
## Does a sum reduction on a contiguous range of float32
## Warning:
## This kernel considers floating-point addition associative
## and will reorder additions.
## Due to parallel reduction and floating point rounding,
## the same input can give different results depending on thread timings

# Note that the kernel is memory-bandwidth bound once the
# CPU pipeline is saturated. Using AVX doesn't help
# loading data from memory faster.
when not defined(openmp):
when defined(i386) or defined(amd_64):
if cpuinfo_has_x86_sse3():
return sum_sse3(data, len)
return sum_fallback(data, len)
else:
# TODO: Fastest between a padded seq, a critical section, OMP atomics or CPU atomics?
let
max_threads = omp_get_max_threads()
omp_condition = OMP_MEMORY_BOUND_GRAIN_SIZE * max_threads < len
sse3 = cpuinfo_has_x86_sse3()

{.emit: "#pragma omp parallel if (`omp_condition`)".}
block:
# Fallback reduction operations
# ----------------------------------------------------------------------------------

template reduction_op_fallback(op_name, initial_val, scalar_op, merge_op: untyped) =
func `op_name`(data: ptr UncheckedArray[float32], len: Natural): float32 =
## Fallback kernel for reduction
## We use 2 accumulators as they should exhibit the best compromise
## of speed and code-size across architectures
## especially when fastmath is turned on.
## Benchmarked: 2x faster than naive reduction and 2x slower than the SSE3 kernel

withCompilerOptimHints()
let data{.restrict.} = data

# Loop peeling is left to the compiler's discretion,
# if optimizing for code size is desired
const step = 2
var accum1 = initial_val
let unroll_stop = len.round_step_down(step)
for i in countup(0, unroll_stop - 1, 2):
result = scalar_op(result, data[i])
accum1 = scalar_op(accum1, data[i+1])
result = scalar_op(result, accum1)
if unroll_stop != len:
# unroll_stop == len - 1, the last element
result = scalar_op(result, data[unroll_stop])

reduction_op_fallback(sum_fallback, 0'f32, `+`, `+`)
reduction_op_fallback(min_fallback, float32(Inf), min, min)
reduction_op_fallback(max_fallback, float32(-Inf), max, max)

# Reduction primitives
# ----------------------------------------------------------------------------------

template gen_reduce_kernel_f32(
kernel_name: untyped{ident},
initial_val: static float32,
sse3_kernel, fallback_kernel: untyped{ident},
merge_op: untyped
): untyped =

proc `kernel_name`*(data: ptr (float32 or UncheckedArray[float32]), len: Natural): float32 {.sideeffect.}=
## Does a reduction on a contiguous range of float32
## Warning:
## This kernel considers the reduction operation associative
## and will reorder operations.
## Due to parallel reduction and floating point rounding,
## the same input can give different results depending on thread timings
## for some operations like addition

# Note that the kernel is memory-bandwidth bound once the
# CPU pipeline is saturated. Using AVX doesn't help
# loading data from memory faster.

withCompilerOptimHints()
let data{.restrict.} = cast[ptr UncheckedArray[float32]](data)

when not defined(openmp):
when defined(i386) or defined(amd_64):
if cpuinfo_has_x86_sse3():
return `sse3_kernel`(data, len)
return `fallback_kernel`(data, len)
else:
result = initial_val

let
nb_chunks = omp_get_num_threads()
whole_chunk_size = len div nb_chunks
thread_id = omp_get_thread_num()
`chunk_offset`{.inject.} = whole_chunk_size * thread_id
`chunk_size`{.inject.} = if thread_id < nb_chunks - 1: whole_chunk_size
else: len - chunk_offset
block:
let p_chunk{.restrict.} = cast[ptr UncheckedArray[float32]](
data[chunk_offset].addr
)
when defined(i386) or defined(amd_64):
let local_sum = if sse3: sum_sse3(p_chunk, chunk_size)
else: sum_fallback(p_chunk, chunk_size)
else:
let local_sum = sum_fallback(p_chunk, chunk_size)

{.emit: "#pragma omp atomic".}
{.emit: "`result` += `local_sum`;".}
omp_condition = OMP_MEMORY_BOUND_GRAIN_SIZE * omp_get_max_threads() < len
sse3 = cpuinfo_has_x86_sse3()

omp_parallel_if(omp_condition):
omp_chunks(len, chunk_offset, chunk_size):
let local_ptr_chunk{.restrict.} = cast[ptr UncheckedArray[float32]](
data[chunk_offset].addr
)
when defined(i386) or defined(amd_64):
let local_accum = if sse3: `sse3_kernel`(local_ptr_chunk, chunk_size)
else: `fallback_kernel`(local_ptr_chunk, chunk_size)
else:
let local_accum = `fallback_kernel`(local_ptr_chunk, chunk_size)

omp_critical:
result = merge_op(result, local_accum)

gen_reduce_kernel_f32(
reduce_sum,
0'f32,
sum_sse3, sum_fallback,
`+`
)

gen_reduce_kernel_f32(
reduce_min,
float32(Inf),
min_sse3, min_fallback,
min
)

gen_reduce_kernel_f32(
reduce_max,
float32(-Inf),
max_sse3, max_fallback,
max
)
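
The doc comment's warning is easy to demonstrate: float32 addition is not associative, so the chunked parallel merge can legitimately change the result between runs. A two-line illustration with values chosen so one ordering absorbs the small term (any large magnitude gap works):

let big = 1e8'f32
let eps = 1'f32
echo (big + eps) - big   # 0.0: eps is absorbed, the ulp of big is 8.0
echo (big - big) + eps   # 1.0: reordered, eps survives
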
