Commit
Fix a rare privacy bug in DistinctPerKey in Privacy on Beam. (#84)
The bug occurred when there were outlier users in the input that
contributed to many partitions and/or contributed many values, AND the
contributed values were the same as values from other users (the second
part is critical: if the contributed values come from a single user only,
the bug does not occur). In that case, the output might not have been DP
due to incorrect contribution bounding. See the comments in the newly added
tests for concrete examples of when/how the bug used to occur.
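
To make the failure mode concrete, here is a minimal sketch of an input of
that shape, mirroring the newly added per-partition bounding test
(testutils.TripleWithIntValue is the input type those tests use):

	// 100 users each contribute one distinct value to partition 0, and one
	// outlier user (ID=100) re-contributes all 100 of those same values.
	// Without contribution bounding *before* deduplication, each value has
	// two candidate owners; the dedup step picks one at random, so roughly
	// 50 values end up owned by user 100 and are then dropped when that
	// user is bounded to a single contribution.
	var triples []testutils.TripleWithIntValue
	for i := 0; i < 100; i++ {
		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
		triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
	}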

This is cherry-picked from the main branch commit
e149618.

This commit includes a minor change compared to the one on the main
branch: it removes the error output of the kv.Codec.Encode/Decode
functions used in aggregations.go, since the error output for these
functions wasn't implemented in v1.0.0.
miracvbasaran committed Jul 27, 2021
1 parent 68bdbb2 commit 4d91d7c
Showing 6 changed files with 261 additions and 22 deletions.
109 changes: 102 additions & 7 deletions privacy-on-beam/pbeam/aggregations.go
@@ -42,8 +42,11 @@ func init() {
beam.RegisterType(reflect.TypeOf((*decodePairInt64Fn)(nil)))
beam.RegisterType(reflect.TypeOf((*decodePairFloat64Fn)(nil)))
beam.RegisterType(reflect.TypeOf((*dropValuesFn)(nil)))
beam.RegisterType(reflect.TypeOf((*encodeKVFn)(nil)))
beam.RegisterType(reflect.TypeOf((*encodeIDKFn)(nil)))
beam.RegisterType(reflect.TypeOf((*decodeIDKFn)(nil)))
beam.RegisterType(reflect.TypeOf((*expandValuesCombineFn)(nil)))
beam.RegisterType(reflect.TypeOf((*expandFloat64ValuesCombineFn)(nil)))
beam.RegisterType(reflect.TypeOf((*decodePairArrayFloat64Fn)(nil)))
beam.RegisterType(reflect.TypeOf((*partitionsMapFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*prunePartitionsVFn)(nil)).Elem())
@@ -736,6 +739,26 @@ func (fn *dropValuesFn) ProcessElement(id beam.Z, kv kv.Pair) (beam.Z, beam.W) {
return id, k
}

// encodeKVFn takes a PCollection<kv.Pair{ID,K}, codedV> as input, and returns a
// PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID has been
// decoded.
type encodeKVFn struct {
InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K}
}

func newEncodeKVFn(idkCodec *kv.Codec) *encodeKVFn {
return &encodeKVFn{InputPairCodec: idkCodec}
}

func (fn *encodeKVFn) Setup() error {
return fn.InputPairCodec.Setup()
}

func (fn *encodeKVFn) ProcessElement(pair kv.Pair, codedV []byte) (beam.W, kv.Pair) {
id, _ := fn.InputPairCodec.Decode(pair)
return id, kv.Pair{pair.V, codedV} // pair.V is the K in PCollection<kv.Pair{ID,K}, codedV>
}

// encodeIDKFn takes a PCollection<ID,kv.Pair{K,V}> as input, and returns a
// PCollection<kv.Pair{ID,K},V>; where ID and K have been coded, and V has been
// decoded.
@@ -766,6 +789,36 @@ func (fn *encodeIDKFn) ProcessElement(id beam.W, pair kv.Pair) (kv.Pair, beam.V)
return kv.Pair{idBuf.Bytes(), pair.K}, v
}

// decodeIDKFn is the reverse operation of encodeIDKFn. It takes a PCollection<kv.Pair{ID,K},V>
// as input, and returns a PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID
// has been decoded.
type decodeIDKFn struct {
VType beam.EncodedType // Type information of the value V
vEnc beam.ElementEncoder // Encoder for the value V, set during Setup() according to VType
InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K}
}

func newDecodeIDKFn(vType typex.FullType, idkCodec *kv.Codec) *decodeIDKFn {
return &decodeIDKFn{
VType: beam.EncodedType{vType.Type()},
InputPairCodec: idkCodec,
}
}

func (fn *decodeIDKFn) Setup() error {
fn.vEnc = beam.NewElementEncoder(fn.VType.T)
return fn.InputPairCodec.Setup()
}

func (fn *decodeIDKFn) ProcessElement(pair kv.Pair, v beam.V) (beam.W, kv.Pair, error) {
var vBuf bytes.Buffer
if err := fn.vEnc.Encode(v, &vBuf); err != nil {
return nil, kv.Pair{}, fmt.Errorf("pbeam.decodeIDKFn.ProcessElement: couldn't encode V %v: %w", v, err)
}
id, _ := fn.InputPairCodec.Decode(pair)
return id, kv.Pair{pair.V, vBuf.Bytes()}, nil // pair.V is the K in PCollection<kv.Pair{ID,K},V>
}

// decodePairArrayFloat64Fn transforms a PCollection<pairArrayFloat64<codedX,[]float64>> into a
// PCollection<X,[]float64>.
type decodePairArrayFloat64Fn struct {
@@ -862,29 +915,71 @@ func convertUint64ToFloat64Fn(z beam.Z, i uint64) (beam.Z, float64) {
}

type expandValuesAccum struct {
Values [][]byte
}

// expandValuesCombineFn converts a PCollection<K,V> to a PCollection<K,[]V> where all values
// corresponding to the same key are collected in a slice. The resulting PCollection has a
// single slice for each key.
type expandValuesCombineFn struct {
VType beam.EncodedType
vEnc beam.ElementEncoder
}

func newExpandValuesCombineFn(vType beam.EncodedType) *expandValuesCombineFn {
return &expandValuesCombineFn{VType: vType}
}

func (fn *expandValuesCombineFn) Setup() {
fn.vEnc = beam.NewElementEncoder(fn.VType.T)
}

func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum {
return expandValuesAccum{Values: make([][]byte, 0)}
}

func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value beam.V) (expandValuesAccum, error) {
var vBuf bytes.Buffer
if err := fn.vEnc.Encode(value, &vBuf); err != nil {
return a, fmt.Errorf("pbeam.expandValuesCombineFn.AddInput: couldn't encode V %v: %w", value, err)
}
a.Values = append(a.Values, vBuf.Bytes())
return a, nil
}

func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum {
a.Values = append(a.Values, b.Values...)
return a
}

func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) [][]byte {
return a.Values
}

type expandFloat64ValuesAccum struct {
Values []float64
}

// expandValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>
// expandFloat64ValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>
// where all values corresponding to the same key are collected in a slice. The resulting
// PCollection has a single slice for each key.
type expandValuesCombineFn struct{}
type expandFloat64ValuesCombineFn struct{}

func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum {
return expandValuesAccum{Values: make([]float64, 0)}
func (fn *expandFloat64ValuesCombineFn) CreateAccumulator() expandFloat64ValuesAccum {
return expandFloat64ValuesAccum{Values: make([]float64, 0)}
}

func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value float64) expandValuesAccum {
func (fn *expandFloat64ValuesCombineFn) AddInput(a expandFloat64ValuesAccum, value float64) expandFloat64ValuesAccum {
a.Values = append(a.Values, value)
return a
}

func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum {
func (fn *expandFloat64ValuesCombineFn) MergeAccumulators(a, b expandFloat64ValuesAccum) expandFloat64ValuesAccum {
a.Values = append(a.Values, b.Values...)
return a
}

func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) []float64 {
func (fn *expandFloat64ValuesCombineFn) ExtractOutput(a expandFloat64ValuesAccum) []float64 {
return a.Values
}

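For orientation, a minimal usage sketch of the new generic combine fn; the
names s, vType, and input are placeholders, and the real call site is in
distinct_per_key.go below:

	// Collapse a PCollection<K,V> into a PCollection<K,[][]byte> with one
	// slice of encoded values per key; vType describes V so the fn can
	// encode each value before collecting it.
	combined := beam.CombinePerKey(s, newExpandValuesCombineFn(vType), input)
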
11 changes: 11 additions & 0 deletions privacy-on-beam/pbeam/coders.go
@@ -33,6 +33,7 @@ func init() {
beam.RegisterCoder(reflect.TypeOf(boundedMeanAccumFloat64{}), encodeBoundedMeanAccumFloat64, decodeBoundedMeanAccumFloat64)
beam.RegisterCoder(reflect.TypeOf(boundedQuantilesAccum{}), encodeBoundedQuantilesAccum, decodeBoundedQuantilesAccum)
beam.RegisterCoder(reflect.TypeOf(expandValuesAccum{}), encodeExpandValuesAccum, decodeExpandValuesAccum)
beam.RegisterCoder(reflect.TypeOf(expandFloat64ValuesAccum{}), encodeExpandFloat64ValuesAccum, decodeExpandFloat64ValuesAccum)
beam.RegisterCoder(reflect.TypeOf(partitionSelectionAccum{}), encodePartitionSelectionAccum, decodePartitionSelectionAccum)
}

@@ -96,6 +97,16 @@ func decodeExpandValuesAccum(data []byte) (expandValuesAccum, error) {
return ret, err
}

func encodeExpandFloat64ValuesAccum(v expandFloat64ValuesAccum) ([]byte, error) {
return encode(v)
}

func decodeExpandFloat64ValuesAccum(data []byte) (expandFloat64ValuesAccum, error) {
var ret expandFloat64ValuesAccum
err := decode(&ret, data)
return ret, err
}

func encodePartitionSelectionAccum(v partitionSelectionAccum) ([]byte, error) {
return encode(v)
}
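The generic encode/decode helpers called above are outside this diff; a
minimal sketch of their likely shape, assuming gob-based serialization (an
assumption, not confirmed by this commit):

	// Hypothetical gob-based helpers; the real encode/decode live elsewhere
	// in coders.go.
	func encode(v interface{}) ([]byte, error) {
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(v); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	}

	func decode(v interface{}, data []byte) error {
		return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
	}
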
60 changes: 54 additions & 6 deletions privacy-on-beam/pbeam/distinct_per_key.go
@@ -74,7 +74,7 @@ type DistinctPerKeyParams struct {
func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKeyParams) beam.PCollection {
s = s.Scope("pbeam.DistinctPerKey")
// Obtain type information from the underlying PCollection<K,V>.
_, kvT := beam.ValidateKVType(pcol.col)
idT, kvT := beam.ValidateKVType(pcol.col)
if kvT.Type() != reflect.TypeOf(kv.Pair{}) {
log.Exitf("DistinctPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", kvT)
}
@@ -103,14 +103,62 @@ func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKey
log.Exit(err)
}

// Perform partition selection
// Do initial per- and cross-partition contribution bounding and swap kv.Pair<K,V> and ID.
// This is not great in terms of utility, since dropping contributions randomly might
// mean that we keep duplicates instead of distinct values. However, this is necessary
// for the current algorithm to be DP.
if spec.testMode != noNoiseWithoutContributionBounding {
// First, rekey by kv.Pair{ID,K} and do per-partition contribution bounding.
rekeyed := beam.ParDo(
s,
newEncodeIDKFn(idT, pcol.codec),
pcol.col,
beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T}) // PCollection<kv.Pair{ID,K}, V>.
// Keep only maxContributionsPerPartition values per (privacyKey, partitionKey) pair.
sampled := boundContributions(s, rekeyed, params.MaxContributionsPerPartition)

// Collect all values per kv.Pair{ID,K} in a slice.
combined := beam.CombinePerKey(s,
newExpandValuesCombineFn(pcol.codec.VType),
sampled) // PCollection<kv.Pair{ID,K}, []codedV}>, where codedV=[]byte

_, codedVSliceType := beam.ValidateKVType(combined)

decoded := beam.ParDo(
s,
newDecodeIDKFn(codedVSliceType, kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
combined,
beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,[]codedV}>, where codedV=[]byte

// Second, do cross-partition contribution bounding.
decoded = boundContributions(s, decoded, params.MaxPartitionsContributed)

rekeyed = beam.ParDo(
s,
newEncodeIDKFn(idT, kv.NewCodec(pcol.codec.KType.T, codedVSliceType.Type())),
decoded,
beam.TypeDefinition{Var: beam.VType, T: codedVSliceType.Type()}) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte

flattened := beam.ParDo(s, flattenValuesFn, rekeyed) // PCollection<kv.Pair{ID,K}, codedV>, where codedV=[]byte

pcol.col = beam.ParDo(
s,
newEncodeKVFn(kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
flattened,
beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,V}>
}

// Perform partition selection.
// We do partition selection after cross-partition contribution bounding because
// we want to keep the same contributions across partitions for partition selection
// and Count.
noiseEpsilon, partitionSelectionEpsilon, noiseDelta, partitionSelectionDelta := splitBudget(epsilon, delta, noiseKind)
partitions := SelectPartitions(s, pcol, SelectPartitionsParams{Epsilon: partitionSelectionEpsilon, Delta: partitionSelectionDelta, MaxPartitionsContributed: params.MaxPartitionsContributed})

// Deduplicate (partitionKey,value) pairs across users.
rekeyed := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>.
// Only keep one privacyKey per (partitionKey,value) pair.
sampled := boundContributions(s, rekeyed, 1)
// Keep only one privacyKey per (partitionKey, value) pair
// (i.e. remove duplicate values for each partition).
swapped := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>
sampled := boundContributions(s, swapped, 1)

// Drop V's, each <privacyKey, partitionKey> pair now corresponds to a unique V.
sampled = beam.SwapKV(s, sampled) // PCollection<ID, kv.Pair{K,V}>.
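boundContributions itself is outside this diff; a hypothetical sketch of its
contract (the real pbeam implementation samples randomly, whereas this sketch
keeps the first values it sees): for each key, keep at most limit values and
drop the rest.

	// Sketch only; a real pipeline would also beam.RegisterType the DoFn.
	func boundContributionsSketch(s beam.Scope, col beam.PCollection, limit int64) beam.PCollection {
		s = s.Scope("boundContributionsSketch")
		grouped := beam.GroupByKey(s, col) // PCollection<K, iter<V>>
		return beam.ParDo(s, &takeUpToFn{Limit: limit}, grouped)
	}

	type takeUpToFn struct {
		Limit int64
	}

	// ProcessElement re-emits at most Limit values per key. A faithful
	// version would reservoir-sample so the kept values are chosen
	// uniformly at random rather than by iteration order.
	func (fn *takeUpToFn) ProcessElement(k beam.W, values func(*beam.V) bool, emit func(beam.W, beam.V)) {
		var v beam.V
		var kept int64
		for kept < fn.Limit && values(&v) {
			emit(k, v)
			kept++
		}
	}
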
99 changes: 92 additions & 7 deletions privacy-on-beam/pbeam/distinct_per_key_test.go
@@ -30,11 +30,15 @@ import (
// are correctly counted (without duplicates).
func TestDistinctPrivacyKeyNoNoise(t *testing.T) {
var triples []testutils.TripleWithIntValue
for i := 0; i < 100; i++ { // Add 400 values of which 200 are distinct to Partition 0.
for i := 0; i < 100; i++ { // Add 200 distinct values to Partition 0.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i})
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i}) // Duplicate each value. Should be discarded by DistinctPerKey.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i}) // Duplicate each value. Should be discarded by DistinctPerKey.
}
for i := 100; i < 200; i++ { // Add 200 additional values, all of which are duplicates of the existing distinct values, to Partition 0.
// The duplicates come from users different from the 100 users above in order to not discard
// any distinct values during the initial per-partition contribution bounding step.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i - 100}) // Duplicate. Should be discarded by DistinctPerKey.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i}) // Duplicate. Should be discarded by DistinctPerKey.
}
for i := 0; i < 50; i++ { // Add 200 values of which 100 are distinct to Partition 1.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 1, Value: i})
@@ -183,6 +187,49 @@ func TestDistinctPerKeyPerKeyCrossPartitionContributionBounding(t *testing.T) {
}
}

// Checks that DistinctPrivacyKey bounds cross-partition contributions before doing deduplication of
// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
// rare cases.
func TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
var triples []testutils.TripleWithIntValue
for i := 0; i < 100; i++ { // Add value=1 to 100 partitions.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: i, Value: 1})
}
for i := 0; i < 100; i++ { // Add a user that contributes value=1 to all 100 partitions.
triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: i, Value: 1})
}
// Assume cross-partition contribution bounding is not done before deduplication of values.
// Each value=1 in each of the i ∈ {0, ..., 99} partitions would have two users associated
// with it: user with ID=i and user with ID=100. We pick one of these two users randomly,
// so in expectation about 50 of 100 partitions' deduplicated values would have user with ID=100
// associated with them. After cross-partition contribution bounding happens, we would be
// left with around 50 partitions with a single distinct value each and the test would fail.
result := []testutils.TestInt64Metric{}
for i := 0; i < 100; i++ {
result = append(result, testutils.TestInt64Metric{i, 1})
}
p, s, col, want := ptest.CreateList2(triples, result)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)

// ε=50, δ=1-10⁻¹⁵ and l1Sensitivity=1 gives a threshold of ≈2.
// However, since δ is very large, a partition with a single user
// is kept with probability almost 1.
// We have 100 partitions. So, to get an overall flakiness of 10⁻²³,
// we can have each partition fail with 1-10⁻²⁵ probability (k=25).
epsilon, delta, k, l1Sensitivity := 50.0, 1-1e-15, 25.0, 1.0
// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
t.Fatalf("TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
}
}

// Checks that DistinctPrivacyKey bounds per-partition contributions correctly.
func TestDistinctPrivacyKeyPerPartitionContributionBounding(t *testing.T) {
var triples []testutils.TripleWithIntValue
@@ -218,19 +265,57 @@ func TestDistinctPrivacyKeyPerPartitionContributionBounding(t *testing.T) {
// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=6 gives a threshold of ≈33.
// We have 3 partitions. So, to get an overall flakiness of 10⁻²³,
// we can have each partition fail with 1-10⁻²⁵ probability (k=25).
// To see the logic and the math behind flakiness and tolerance calculation,
// see https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf.
epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 25.0, 6.0
// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 3, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 2})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
t.Fatalf("TestDistinctPerKeyNoNoise: %v", err)
t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("TestDistinctPerKeyNoNoise: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
}
}

// Checks that DistinctPrivacyKey bounds per-partition contributions before doing deduplication of
// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
// rare cases.
func TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
var triples []testutils.TripleWithIntValue
for i := 0; i < 100; i++ { // Add 100 distinct values to Partition 0.
triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
}
for i := 0; i < 100; i++ { // Add a user that contributes all these 100 distinct values to Partition 0.
triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
}
// Assume per-partition contribution bounding is not done before deduplication of values.
// Each value i ∈ {0, ..., 99} would have two users associated with it: user with ID=i and
// user with ID=100. We pick one of these two users randomly, so in expectation about 50
// of 100 deduplicated values would have user with ID=100 associated with them. After
// per-partition contribution bounding happens, we would be left with around 50 distinct
// values and the test would fail.
result := []testutils.TestInt64Metric{
{0, 100},
}
p, s, col, want := ptest.CreateList2(triples, result)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)

// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=1 gives a threshold of ≈6.
// We have 1 partition. So, to get an overall flakiness of 10⁻²³,
// we need to have each partition pass with 1-10⁻²³ probability (k=23).
epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 23.0, 1.0
// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
}
}

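The tests above rely on splitBudget halving ε between noise and partition
selection ("ε is split by 2 for noise and for partition selection"). A
minimal sketch of that behavior, assuming an even ε split and, for Laplace
noise, all of δ going to partition selection; isLaplace is a placeholder for
the actual noise-kind parameter:

	// Hypothetical sketch; the real splitBudget lives in distinct_per_key.go
	// and may differ in detail.
	func splitBudgetSketch(epsilon, delta float64, isLaplace bool) (noiseEps, partitionEps, noiseDelta, partitionDelta float64) {
		noiseEps, partitionEps = epsilon/2, epsilon/2
		if isLaplace {
			// Laplace noise is (ε,0)-DP, so all of δ can go to partition selection.
			return noiseEps, partitionEps, 0, delta
		}
		// Gaussian noise consumes δ as well, so δ must be shared.
		return noiseEps, partitionEps, delta/2, delta/2
	}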