Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): Invoke callback on read failure #1870

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.fury.serializer.ArraySerializers;
import org.apache.fury.serializer.BufferCallback;
import org.apache.fury.serializer.BufferObject;
import org.apache.fury.serializer.FieldMismatchCallback;
import org.apache.fury.serializer.OpaqueObjects;
import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
import org.apache.fury.serializer.Serializer;
Expand Down Expand Up @@ -1584,6 +1585,10 @@ public boolean isBasicTypesRefIgnored() {
return config.isBasicTypesRefIgnored();
}

public FieldMismatchCallback getFieldMismatchCallback() {
return config.getFieldMismatchCallback();
}

public boolean checkClassVersion() {
return config.checkClassVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.apache.fury.builder.Generated.GeneratedMetaSharedSerializer.SERIALIZER_FIELD_NAME;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Map;
import java.util.SortedMap;
Expand All @@ -36,6 +37,7 @@
import org.apache.fury.meta.ClassDef;
import org.apache.fury.reflect.TypeRef;
import org.apache.fury.serializer.CodegenSerializer;
import org.apache.fury.serializer.FieldMismatchCallback;
import org.apache.fury.serializer.MetaSharedSerializer;
import org.apache.fury.serializer.ObjectSerializer;
import org.apache.fury.serializer.Serializer;
Expand Down Expand Up @@ -190,11 +192,49 @@ protected Expression createRecord(SortedMap<Integer, Expression> recordComponent
@Override
protected Expression setFieldValue(Expression bean, Descriptor descriptor, Expression value) {
if (descriptor.getField() == null) {
// Field doesn't exist in current class, skip set this field value.
// Note that the field value shouldn't be an inlined value, otherwise field value read may
// be ignored.
// Add an ignored call here to make expression type to void.
return new Expression.StaticInvoke(ExceptionUtils.class, "ignore", value);
FieldMismatchCallback.FieldAdjustment adjustment =
fury.getFieldMismatchCallback()
.onMismatch(beanClass, descriptor.getTypeName(), descriptor.getName());

if (adjustment == null) {
// Field doesn't exist in current class, skip set this field value.
// Note that the field value shouldn't be an inlined value, otherwise field value read may
// be ignored.
// Add an ignored call here to make expression type to void.
return new Expression.StaticInvoke(ExceptionUtils.class, "ignore", value);
} else {

Field newTargetField = adjustment.getTargetField();

// Field doesn't exist in current class, invoke field mismatch callback.
Expression fieldMismatchCallback =
Expression.Invoke.inlineInvoke(
new Expression.Reference(FURY_NAME, TypeRef.of(Fury.class)),
"getFieldMismatchCallback",
TypeRef.of(FieldMismatchCallback.class),
/* needTryCatch */ false);
Comment on lines +209 to +215
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a private field, instead of being instantiated every time. I'm not used to this kind of metaprogramming so I need to figure out how to do this.


Expression onMismatch =
Expression.Invoke.inlineInvoke(
fieldMismatchCallback,
"onMismatch",
TypeRef.of(FieldMismatchCallback.FieldAdjustment.class),
new Expression.StaticInvoke(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use org.apache.fury.builder.CodecBuilder#beanClassExpr(java.lang.Class<?>) instead

Class.class,
"forName",
TypeRef.of(Class.class),
true,
new Literal(beanClass.getName())),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Literal.ofXXX instead

new Literal(descriptor.getTypeName()),
new Literal(descriptor.getName()));

Expression invokeHandler =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onMismatch can be cached as a field using getOrCreateField function

new Expression.Invoke(onMismatch, "adjustValue", TypeRef.of(Object.class), value);

Descriptor updatedDescriptor = new Descriptor(newTargetField, null, null, null);

return super.setFieldValue(bean, updatedDescriptor, invokeHandler);
}
}
return super.setFieldValue(bean, descriptor, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.fury.Fury;
import org.apache.fury.meta.MetaCompressor;
import org.apache.fury.serializer.FieldMismatchCallback;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.TimeSerializers;
import org.apache.fury.util.Preconditions;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class Config implements Serializable {
private final boolean scalaOptimizationEnabled;
private transient int configHash;
private final boolean deserializeNonexistentEnumValueAsNull;
private final FieldMismatchCallback fieldMismatchCallback;

public Config(FuryBuilder builder) {
name = builder.name;
Expand Down Expand Up @@ -93,6 +95,7 @@ public Config(FuryBuilder builder) {
asyncCompilationEnabled = builder.asyncCompilationEnabled;
scalaOptimizationEnabled = builder.scalaOptimizationEnabled;
deserializeNonexistentEnumValueAsNull = builder.deserializeNonexistentEnumValueAsNull;
fieldMismatchCallback = builder.fieldMismatchCallback;
}

/** Returns the name for Fury serialization. */
Expand Down Expand Up @@ -255,6 +258,10 @@ public boolean isScalaOptimizationEnabled() {
return scalaOptimizationEnabled;
}

public FieldMismatchCallback getFieldMismatchCallback() {
return fieldMismatchCallback;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fury.pool.ThreadPoolFury;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.FieldMismatchCallback;
import org.apache.fury.serializer.JavaSerializer;
import org.apache.fury.serializer.ObjectStreamSerializer;
import org.apache.fury.serializer.Serializer;
Expand Down Expand Up @@ -83,6 +84,14 @@ public final class FuryBuilder {
boolean suppressClassRegistrationWarnings = true;
boolean deserializeNonexistentEnumValueAsNull = false;
MetaCompressor metaCompressor = new DeflaterMetaCompressor();
FieldMismatchCallback fieldMismatchCallback =
new FieldMismatchCallback() {
@Override
public FieldMismatchCallback.FieldAdjustment onMismatch(
Class<?> modifiedClass, String deserializedTypeName, String deserializedFieldName) {
return null;
}
};

public FuryBuilder() {}

Expand Down Expand Up @@ -336,6 +345,11 @@ public FuryBuilder withName(String name) {
return this;
}

public FuryBuilder withFieldMismatchCallback(FieldMismatchCallback callback) {
this.fieldMismatchCallback = callback;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be confusing that the callback can be overridden, not added on top of already registered ones?

return this;
}

private void finish() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.fury.serializer;

import java.lang.reflect.Field;

/** Callback interface for handling field mismatch during deserialization. */
public interface FieldMismatchCallback {

/**
* Called when a field mismatch is detected during deserialization.
*
* @param modifiedClass The class that is being deserialized
* @param deserializedTypeName The name of the type that was deserialized
* @param deserializedFieldName The name of the field that was deserialized
* @return A FieldAdjustment that contains the target Field and a method to map the deserialized
* value to the target field.
*/
FieldAdjustment onMismatch(
Class<?> modifiedClass, String deserializedTypeName, String deserializedFieldName);

abstract class FieldAdjustment {
private final Field targetField;

public FieldAdjustment(Field targetField) {
this.targetField = targetField;
}

public Field getTargetField() {
return targetField;
}

public abstract Object adjustValue(Object deserializedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,30 @@ public T read(MemoryBuffer buffer) {
ObjectSerializer.FinalTypeField fieldInfo = finalFields[i];
boolean isFinal = this.isFinal[i];
FieldAccessor fieldAccessor = fieldInfo.fieldAccessor;
if (fieldAccessor != null) {
short classId = fieldInfo.classId;
short classId = fieldInfo.classId;

if (fieldAccessor == null) {
FieldMismatchCallback.FieldAdjustment fieldAdjustment =
fury.getFieldMismatchCallback()
.onMismatch(
type,
fieldInfo.classInfo.getCls().getName(),
fieldInfo.qualifiedFieldName.substring(
fieldInfo.qualifiedFieldName.lastIndexOf('.') + 1));
FieldAccessor fallbackAccessor = null;
try {
fallbackAccessor = FieldAccessor.createAccessor(fieldAdjustment.getTargetField());
} catch (RuntimeException ignored) {
fallbackAccessor = null;
}

if (fallbackAccessor != null) {
Object fieldValue =
ObjectSerializer.readFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer, classId);
fallbackAccessor.putObject(obj, fieldAdjustment.adjustValue(fieldValue));
}
} else {
if (ObjectSerializer.readPrimitiveFieldValueFailed(
fury, buffer, obj, fieldAccessor, classId)
&& ObjectSerializer.readBasicObjectFieldValueFailed(
Expand All @@ -160,16 +182,6 @@ public T read(MemoryBuffer buffer) {
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
fieldAccessor.putObject(obj, fieldValue);
}
} else {
if (skipPrimitiveFieldValueFailed(fury, fieldInfo.classId, buffer)) {
if (fieldInfo.classInfo == null) {
// TODO(chaokunyang) support registered serializer in peer with ref tracking disabled.
fury.readRef(buffer, classInfoHolder);
} else {
ObjectSerializer.readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
}
}
}
}
for (ObjectSerializer.GenericTypeField fieldInfo : otherFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,135 @@ static boolean readBasicObjectFieldValueFailed(
}
}

static Object readFieldValue(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a rather big method; I'm not sure if HotSpot can compile this. In any case, this shouldn't be in the hot path.

Fury fury,
RefResolver refResolver,
ClassResolver classResolver,
FinalTypeField fieldInfo,
boolean isFinal,
MemoryBuffer buffer,
short classId) {
switch (classId) {
case ClassResolver.PRIMITIVE_BOOLEAN_CLASS_ID:
return buffer.readBoolean();
case ClassResolver.PRIMITIVE_BYTE_CLASS_ID:
return buffer.readByte();
case ClassResolver.PRIMITIVE_CHAR_CLASS_ID:
return buffer.readChar();
case ClassResolver.PRIMITIVE_SHORT_CLASS_ID:
return buffer.readInt16();
case ClassResolver.PRIMITIVE_INT_CLASS_ID:
if (fury.compressInt()) {
return buffer.readVarInt32();
} else {
return buffer.readInt32();
}
case ClassResolver.PRIMITIVE_FLOAT_CLASS_ID:
return buffer.readFloat32();
case ClassResolver.PRIMITIVE_LONG_CLASS_ID:
return fury.readInt64(buffer);
case ClassResolver.PRIMITIVE_DOUBLE_CLASS_ID:
return buffer.readFloat64();
case ClassResolver.STRING_CLASS_ID:
return fury.readJavaStringRef(buffer);
case ClassResolver.BOOLEAN_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this branch can be removed

fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readBoolean();
}
}
case ClassResolver.BYTE_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readByte();
}
}
case ClassResolver.CHAR_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readChar();
}
}
case ClassResolver.SHORT_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readInt16();
}
}
case ClassResolver.INTEGER_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
if (fury.compressInt()) {
return buffer.readVarInt32();
} else {
return buffer.readInt32();
}
}
}
case ClassResolver.FLOAT_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readFloat32();
}
}
case ClassResolver.LONG_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return fury.readInt64(buffer);
}
}
case ClassResolver.DOUBLE_CLASS_ID:
{
if (buffer.readByte() == Fury.NULL_FLAG) {
return null;
} else if (fury.isBasicTypesRefIgnored()) {
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
} else {
return buffer.readFloat64();
}
}
default:
return readFinalObjectFieldValue(
fury, refResolver, classResolver, fieldInfo, isFinal, buffer);
}
}

public static int computeVersionHash(Collection<Descriptor> descriptors) {
// TODO(chaokunyang) use murmurhash
List<Integer> list = new ArrayList<>();
Expand Down
Loading