Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test of malicious behavior during zero check protocol #1205

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions ipa-core/src/helpers/transport/in_memory/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::borrow::Cow;
use std::{borrow::Cow, future::ready, pin::Pin};

use futures::{Future, FutureExt};

use crate::{
helpers::{HelperIdentity, Role, RoleAssignment},
Expand Down Expand Up @@ -42,19 +44,33 @@ pub trait StreamInterceptor: Send + Sync {
/// from additive attacks without additional measures implemented
/// at the transport layer, like checksumming, share consistency
/// checks, etc.
fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>);
fn peek<'a>(
&'a self,
ctx: &'a Self::Context,
data: &'a mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

impl<F: Fn(&InspectContext, &mut Vec<u8>) + Send + Sync + 'static> StreamInterceptor for F {
impl<F> StreamInterceptor for F
where
for<'a> F: Fn(&'a InspectContext, &'a mut Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'a,
{
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
(self)(ctx, data);
fn peek(
&self,
ctx: &Self::Context,
data: &mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
(self)(ctx, data)
}
}

/// The general context provided to stream inspectors.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct InspectContext {
/// The shard index of this instance.
/// This is `None` for non-sharded helpers.
Expand All @@ -76,7 +92,7 @@ pub struct InspectContext {
#[inline]
#[must_use]
pub fn passthrough() -> Arc<dyn StreamInterceptor<Context = InspectContext>> {
Arc::new(|_ctx: &InspectContext, _data: &mut Vec<u8>| {})
Arc::new(|_ctx: &InspectContext, _data: &mut Vec<u8>| ready(()).boxed())
}

/// This narrows the implementation of stream seeker
Expand All @@ -93,7 +109,10 @@ pub struct MaliciousHelper<F> {
inner: F,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> MaliciousHelper<F> {
impl<F> MaliciousHelper<F>
where
F: Fn(MaliciousHelperContext, &mut Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + Send>>,
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this given that it is only used in non-async context? it may be better to convert non-async code to async inside this helper - non-async code is much easier to deal with

pub fn new(role: Role, role_assignment: &RoleAssignment, peeker: F) -> Arc<Self> {
Arc::new(Self {
identity: role_assignment.identity(role),
Expand Down Expand Up @@ -131,14 +150,23 @@ pub struct MaliciousHelperContext {
pub gate: Gate,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> StreamInterceptor
for MaliciousHelper<F>
impl<F> StreamInterceptor for MaliciousHelper<F>
where
F: Fn(MaliciousHelperContext, &mut Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync,
{
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
fn peek(
&self,
ctx: &Self::Context,
data: &mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
if ctx.identity == self.identity {
(self.inner)(&self.context(ctx), data);
(self.inner)(self.context(ctx), data)
} else {
ready(()).boxed()
}
}
}
20 changes: 12 additions & 8 deletions ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
future::ready,
io,
pin::Pin,
task::{Context, Poll},
Expand All @@ -12,7 +13,7 @@ use ::tokio::sync::{
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
#[cfg(all(feature = "shuttle", test))]
use shuttle::future as tokio;
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -21,8 +22,7 @@ use tracing::Instrument;
use crate::{
error::BoxError,
helpers::{
in_memory_config,
in_memory_config::DynStreamInterceptor,
in_memory_config::{self, DynStreamInterceptor},
transport::{
in_memory::config::InspectContext,
routing::{Addr, RouteId},
Expand Down Expand Up @@ -195,12 +195,16 @@ impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
channel
.send((
addr,
InMemoryStream::wrap(data.map({
move |mut chunk| {
if let Some(ref context) = context {
this.config.stream_interceptor.peek(context, &mut chunk);
InMemoryStream::wrap(data.then(move |mut chunk| {
let interceptor = Arc::clone(&this.config.stream_interceptor);
if let Some(context) = context.clone() {
async move {
interceptor.peek(&context, &mut chunk).await;
Ok(Bytes::from(chunk))
}
Ok(Bytes::from(chunk))
.boxed()
} else {
ready(Ok(Bytes::from(chunk))).boxed()
}
})),
ack_tx,
Expand Down
197 changes: 189 additions & 8 deletions ipa-core/src/protocol/basics/check_zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ use crate::{
error::Error,
ff::Field,
protocol::{
basics::{mul::semi_honest_multiply, reveal::Reveal, step::CheckZeroStep as Step},
basics::{malicious_reveal, mul::semi_honest_multiply, step::CheckZeroStep as Step},
context::Context,
prss::{FromRandom, SharedRandomness},
RecordId,
},
secret_sharing::replicated::semi_honest::AdditiveShare as Replicated,
};

#[cfg(test)]
static SH1: once_cell::sync::Lazy<tests::NotifyOnceCell<crate::ff::Fp32BitPrime>> =
once_cell::sync::Lazy::new(|| tests::NotifyOnceCell::new());

/// A very simple protocol to check if a replicated secret sharing is a sharing of zero.
///
/// NOTE: this protocol leaks information about `v` the helpers. Please only use this in cases where
Expand Down Expand Up @@ -39,6 +43,9 @@ use crate::{
/// ## Errors
/// Lots of things may go wrong here, from timeouts to bad output. They will be signalled
/// back via the error response
/// ## Panics
/// If the full reveal of `rv_share` does not return a value, which would only happen if the
/// reveal implementation is broken.
pub async fn check_zero<C, F>(ctx: C, record_id: RecordId, v: &Replicated<F>) -> Result<bool, Error>
where
C: Context,
Expand All @@ -48,26 +55,82 @@ where

let rv_share =
semi_honest_multiply(ctx.narrow(&Step::MultiplyWithR), record_id, &r_sharing, v).await?;
tracing::info!("{:?}", &rv_share);
let rv = F::from_array(
&rv_share
.reveal(ctx.narrow(&Step::RevealR), record_id)
.await?,
&malicious_reveal(ctx.narrow(&Step::RevealR), record_id, None, &rv_share)
.await?
.expect("full reveal should always return a value"),
);

tracing::info!("{:?}", &rv);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::info!("{:?}", &rv);

Ok(rv == F::ZERO)
}

#[cfg(test)]
pub async fn check_zero_fp32bitprime<C>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have plans to use it outside of this module or we can move it into tests?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same argument applies to SH1 as well

ctx: C,
record_id: RecordId,
v: &Replicated<crate::ff::Fp32BitPrime>,
) -> Result<bool, Error>
where
C: Context,
{
use crate::{
ff::Fp32BitPrime,
secret_sharing::{replicated::ReplicatedSecretSharing, SharedValue},
};

let r_sharing: Replicated<Fp32BitPrime> = ctx.prss().generate(record_id);

let rv_share =
semi_honest_multiply(ctx.narrow(&Step::MultiplyWithR), record_id, &r_sharing, v).await?;
tracing::info!("{:?}", &rv_share);
if ctx.role() == crate::helpers::Role::H1 {
SH1.set(rv_share.right()).unwrap();
tracing::info!("sent sh1 = {:?}", rv_share.right());
}
let rv = Fp32BitPrime::from_array(
&malicious_reveal(ctx.narrow(&Step::RevealR), record_id, None, &rv_share)
.await?
.expect("full reveal should always return a value"),
);

Ok(rv == Fp32BitPrime::ZERO)
}

#[cfg(all(test, unit_test))]
mod tests {
use futures_util::future::try_join3;
use std::{
future::ready,
pin::Pin,
sync::{Arc, Mutex},
};

use futures::{
future::{join, try_join3},
Future, FutureExt,
};
use generic_array::GenericArray;
use once_cell::sync::OnceCell;
use rand::Rng;
use tokio::sync::Notify;
use typenum::U4;

use crate::{
error::Error,
ff::{Fp31, PrimeField, U128Conversions},
protocol::{basics::check_zero, context::Context, RecordId},
ff::{Fp31, Fp32BitPrime, PrimeField, Serializable, U128Conversions},
helpers::{
in_memory_config::{InspectContext, StreamInterceptor},
HelperIdentity, TransportIdentity,
},
protocol::{
basics::check_zero::{check_zero, check_zero_fp32bitprime},
context::Context,
RecordId,
},
rand::thread_rng,
secret_sharing::{IntoShares, SharedValue},
test_fixture::TestWorld,
test_fixture::{Runner, TestWorld, TestWorldConfig},
};

#[tokio::test]
Expand Down Expand Up @@ -124,4 +187,122 @@ mod tests {

Ok(())
}

pub struct NotifyOnceCell<T: Clone + Send + Sync> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is it different from https://docs.rs/tokio/latest/tokio/sync/struct.OnceCell.html? Maybe you could elaborate that in the documentation to this struct

inner: Mutex<NotifyOnceCellInner<T>>,
}

struct NotifyOnceCellInner<T: Clone + Send + Sync> {
cell: OnceCell<T>,
notify: Arc<Notify>,
}

impl<T: Clone + Send + Sync> NotifyOnceCell<T> {
pub fn new() -> Self {
Self {
inner: Mutex::new(NotifyOnceCellInner {
cell: OnceCell::new(),
notify: Arc::new(Notify::new()),
}),
}
}

pub fn set(&self, value: T) -> Result<(), T> {
let inner = self.inner.lock().unwrap();
inner.cell.set(value)?;
inner.notify.notify_waiters();
Ok(())
}

pub fn get(&self) -> Pin<Box<dyn Future<Output = T> + Send + '_>> {
let inner = self.inner.lock().unwrap();
if let Some(value) = inner.cell.get() {
return ready(value.clone()).boxed();
}
let notify = inner.notify.clone();
async move {
notify.notified().await;
self.inner.lock().unwrap().cell.get().unwrap().clone()
}
.boxed()
}
}

struct MaliciousCheckZeroInterceptor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets explain what is the goal for this interceptor because it does seem very specific to check zero use case

sh2: NotifyOnceCell<Fp32BitPrime>,
}

impl MaliciousCheckZeroInterceptor {
fn new() -> Self {
Self {
sh2: NotifyOnceCell::new(),
}
}
}

impl StreamInterceptor for MaliciousCheckZeroInterceptor {
type Context = InspectContext;

fn peek<'a>(
&'a self,
ctx: &'a Self::Context,
data: &'a mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
if ctx
.gate
.as_ref()
.contains(super::Step::MultiplyWithR.as_ref())
&& ctx.identity == HelperIdentity::ONE
|| ctx.gate.as_ref().contains(super::Step::RevealR.as_ref())
&& ctx.identity == HelperIdentity::ONE
&& ctx.dest == HelperIdentity::TWO.as_str()
{
async {
assert_eq!(data.len(), 4);
let (sh1, sh2) = join(super::SH1.get(), self.sh2.get()).await;
tracing::info!("got shares: {sh1:?} {sh2:?}");
let adjusted_share = -sh1 - sh2;
tracing::info!("adjusted share {adjusted_share:?}");
adjusted_share.serialize(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added serialize_to_slice and deserialize_from_slice methods to Serializable trait (only for tests). I think you may want to use them here

<&mut GenericArray<u8, U4>>::try_from(data.as_mut_slice()).unwrap(),
);
}
.boxed()
} else if ctx.gate.as_ref().contains(super::Step::RevealR.as_ref())
&& ctx.identity == HelperIdentity::TWO
&& ctx.dest == HelperIdentity::ONE.as_str()
{
assert_eq!(data.len(), 4);
let sh2 = Fp32BitPrime::deserialize_unchecked(
<&GenericArray<u8, U4>>::try_from(data.as_slice()).unwrap(),
);
self.sh2.set(sh2).unwrap();
tracing::info!("sent sh2 = {sh2:?}");
ready(()).boxed()
} else {
ready(()).boxed()
}
}
}

#[tokio::test]
async fn malicious_check_zero() {
let mut config = TestWorldConfig::default();
config.stream_interceptor = Arc::new(MaliciousCheckZeroInterceptor::new());
let world = TestWorld::new_with(&config);
let mut rng = thread_rng();
let v = rng.gen::<Fp32BitPrime>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test benefit from using a random value or we can use a constant here to avoid false negatives?


let [res0, res1, res2] = world
.semi_honest(v, |ctx, v| async move {
check_zero_fp32bitprime(ctx.set_total_records(1), RecordId::FIRST, &v)
.await
.unwrap()
})
.await;

assert_eq!(res0, false, "zero check failed on H1");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you prefer this syntax over assert!(!res0, "...")?

assert_eq!(res1, false, "zero check failed on H2");
assert_eq!(res2, false, "zero check failed on H3");
}
}
Loading
Loading