Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Milan multiple compression #11083

Merged
merged 7 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"axios": "^0.26.0",
"fastest-json-copy": "^1.0.1",
"lodash": "^4.17.21",
"lz4js": "^0.2.0",
"msgpackr": "^1.4.7",
"pako": "^2.0.4",
"uuid": "^8.3.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { IChannelFactory, IFluidDataStoreRuntime } from "@fluidframework/datastore-definitions";
import { SharedPropertyTree } from "./propertyTree";
import { DeflatedPropertyTreeFactory } from "./propertyTreeExtFactories";
import { DeflatedPropertyTreeFactory, LZ4PropertyTreeFactory } from "./propertyTreeExtFactories";

/**
* This class is the extension of SharedPropertyTree which compresses
Expand All @@ -20,3 +20,13 @@ export class DeflatedPropertyTree extends SharedPropertyTree {
return new DeflatedPropertyTreeFactory();
}
}

export class LZ4PropertyTree extends SharedPropertyTree {
public static create(runtime: IFluidDataStoreRuntime, id?: string, queryString?: string) {
return runtime.createChannel(id, DeflatedPropertyTreeFactory.Type) as LZ4PropertyTree;
Copy link
Contributor

@DLehenbauer DLehenbauer Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this should be 'LZ4PropertyTreeFactory.Type'.

When loading a document, the type string is used to find the right factory to instantiate the DDS.

The way the unit test is written, the LZ4 and Deflate factories are never both registered in the same container instance, so the bug doesn't surface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DLehenbauer thank you for finding this

}

public static getFactory(): IChannelFactory {
return new LZ4PropertyTreeFactory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,133 @@
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
/* eslint-disable @typescript-eslint/no-unsafe-return */
import { deflate, inflate } from "pako";
import { compress, decompress } from "lz4js";
import { bufferToString, stringToBuffer } from "@fluidframework/common-utils";
import {
IChannelAttributes,
IFluidDataStoreRuntime,
IChannelServices,
IChannelFactory,
IChannel,
} from "@fluidframework/datastore-definitions";
import { IPropertyTreeMessage, ISharedPropertyTreeEncDec, ISnapshotSummary, SharedPropertyTreeOptions }
from "./propertyTree";
import { DeflatedPropertyTree } from "./propertyTreeExt";

function encodeSummary(snapshotSummary: ISnapshotSummary) {
const summaryStr = JSON.stringify(snapshotSummary);
const unzipped = new TextEncoder().encode(summaryStr);
const serializedSummary: Buffer = deflate(unzipped);
return serializedSummary;
import {
IPropertyTreeConfig, IPropertyTreeMessage, ISharedPropertyTreeEncDec,
ISnapshotSummary, SharedPropertyTree, SharedPropertyTreeOptions,
}
from "./propertyTree";
import { DeflatedPropertyTree, LZ4PropertyTree } from "./propertyTreeExt";

function decodeSummary(serializedSummary): ISnapshotSummary {
const unzipped = inflate(serializedSummary);
const summaryStr = new TextDecoder().decode(unzipped);
const snapshotSummary: ISnapshotSummary = JSON.parse(summaryStr);
return snapshotSummary;
}
/**
* This class contains builders of the compression methods used to compress
* of summaries and messages with the plugable compression algorithm.
*/
class CompressionMethods {
Copy link
Contributor

@DLehenbauer DLehenbauer Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most JS developers would find a function more natural than a class here:

function createCompressionMethods(encoder, decoder) {
        return {
            messageEncoder: {
                encode: (...) => { ... },
                decode: (...) => { ... },
            },
            summaryEncoder: {
                encode: (...) => { ... },
                decode: (...) => { ... },
            },
        };
    };

Possibly this function should be part of CompressedPropertyTreeFactory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DLehenbauer The idea here is complete separation of compression methods from the CompressedPropertyTreeFactory. The only relevant method here is getEncDec and how it is implemented is the responsibility of the extended Factory class. CompressionMethods is only a helper, just a utility class which helps the extended factories to reduce the code if they want this behavior but that is only one way of many. Puristic approach would require here the method getEncDec to be abstract. I have chosen the default behavior using the CompressionMethods but did not want to bind them too closely with CompressedPropertyTreeFactory so I kept them outside in another util class.

Concerning the method usage instead of class usage, I can do it but it looks to me that it might produce messy code putting various responsibilities in one place (one method). I was trying to follow single responsibility pattern and separate the functionality in dedicated methods referencing them in the returned object. I could create 5 functions instead of the class for building encoders, decoders and createCompressionMethods but it looked to me more self explaining if I put them to one class.

Nevertheless I will follow your suggestions if you still think that it is more reasonable to do it by function within the CompressedPropertyTreeFactory.

public constructor(private readonly encodeFn, private readonly decodeFn) { }

private buildEncodeSummary() {
return (snapshotSummary: ISnapshotSummary): Buffer => {
const summaryStr = JSON.stringify(snapshotSummary);
const unzipped = new TextEncoder().encode(summaryStr);
const serializedSummary: Buffer = this.encodeFn(unzipped);
return serializedSummary;
};
}

function encodeMessage(change: IPropertyTreeMessage) {
const changeSetStr = JSON.stringify(change.changeSet);
const unzipped = new TextEncoder().encode(changeSetStr);
const zipped: Buffer = deflate(unzipped);
const zippedStr = bufferToString(zipped, "base64");
if (zippedStr.length < changeSetStr.length) {
// eslint-disable-next-line @typescript-eslint/dot-notation
change["isZipped"] = "1";
change.changeSet = zippedStr;
}
return change;
private buildDecodeSummary() {
return (serializedSummary): ISnapshotSummary => {
const unzipped = this.decodeFn(serializedSummary);
const summaryStr = new TextDecoder().decode(unzipped);
const snapshotSummary: ISnapshotSummary = JSON.parse(summaryStr);
return snapshotSummary;
};
}

private buildEncodeMessage() {
return (change: IPropertyTreeMessage) => {
const changeSetStr = JSON.stringify(change.changeSet);
const unzipped = new TextEncoder().encode(changeSetStr);
const zipped: Buffer = this.encodeFn(unzipped);
const zippedStr = bufferToString(zipped, "base64");
if (zippedStr.length < changeSetStr.length) {
// eslint-disable-next-line @typescript-eslint/dot-notation
change["isZipped"] = "1";
change.changeSet = zippedStr;
}
return change;
};
}

private buildDecodeMessage() {
return (transferChange: IPropertyTreeMessage) => {
// eslint-disable-next-line @typescript-eslint/dot-notation
if (transferChange["isZipped"]) {
const zipped = new Uint8Array(stringToBuffer(transferChange.changeSet, "base64"));
const unzipped = this.decodeFn(zipped);
const changeSetStr = new TextDecoder().decode(unzipped);
transferChange.changeSet = JSON.parse(changeSetStr);
}
return transferChange;
};
}

public buildEncDec(): ISharedPropertyTreeEncDec {
return {
messageEncoder: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: @justus-camp has landed runtime-level per-op compression yesterday:
#11208

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing to this. I will go through. At first time I do not see any reusable code which we can use here or in general summary compression (such which would determine the compression algorithm based on configuration, environment or any further conditions). The API is dedicated to the fluid operation message : unpackRuntimeMessage(message: ISequencedDocumentMessage) and then hard-coding usage of lz4. So I still need to keep this implementation as is. I would like to point you to #11366 where we could discuss the reusability.

encode: this.buildEncodeMessage(),
decode: this.buildDecodeMessage(),
},
summaryEncoder: {
encode: this.buildEncodeSummary(),
decode: this.buildDecodeSummary(),
},
};
}
}

function decodeMessage(transferChange: IPropertyTreeMessage) {
// eslint-disable-next-line @typescript-eslint/dot-notation
if (transferChange["isZipped"]) {
const zipped = stringToBuffer(transferChange.changeSet, "base64");
const unzipped = inflate(zipped);
const changeSetStr = new TextDecoder().decode(unzipped);
transferChange.changeSet = JSON.parse(changeSetStr);
export abstract class CompressedPropertyTreeFactory implements IChannelFactory {
public abstract get attributes();
public abstract get type();
public abstract getEncodeFce();
public abstract getDecodeFce();
public getEncDec(): ISharedPropertyTreeEncDec {
const compressionMeths = new CompressionMethods(this.getEncodeFce(), this.getDecodeFce());
return compressionMeths.buildEncDec();
}
public abstract newPropertyTree(
id: string,
runtime: IFluidDataStoreRuntime,
attributes: IChannelAttributes,
options: SharedPropertyTreeOptions,
propertyTreeConfig: IPropertyTreeConfig): SharedPropertyTree;

public async load(
runtime: IFluidDataStoreRuntime,
id: string,
services: IChannelServices,
attributes: IChannelAttributes,
url?: string,
): Promise<SharedPropertyTree> {
const options = {};
const instance = this.newPropertyTree(id, runtime, attributes,
options as SharedPropertyTreeOptions
, { encDec: this.getEncDec() });
await instance.load(services);
return instance;
}

public create(document: IFluidDataStoreRuntime, id: string, requestUrl?: string): SharedPropertyTree {
const options = {};
const cell = this.newPropertyTree(id, document,
this.attributes, options as SharedPropertyTreeOptions,
{ encDec: this.getEncDec() });
cell.initializeLocal();
return cell;
}
return transferChange;
}

const encDec: ISharedPropertyTreeEncDec = {
messageEncoder: {
encode: encodeMessage,
decode: decodeMessage,
},
summaryEncoder: {
encode: encodeSummary,
decode: decodeSummary,
},
};

export class DeflatedPropertyTreeFactory implements IChannelFactory {
export class DeflatedPropertyTreeFactory extends CompressedPropertyTreeFactory {
public static readonly Type = "DeflatedPropertyTree:84534a0fe613522101f6";

public static readonly Attributes: IChannelAttributes = {
Expand All @@ -72,6 +137,20 @@ export class DeflatedPropertyTreeFactory implements IChannelFactory {
packageVersion: "0.0.1",
};

public async load(
runtime: IFluidDataStoreRuntime,
id: string,
services: IChannelServices,
attributes: IChannelAttributes,
url?: string,
): Promise<DeflatedPropertyTree> {
return await super.load(runtime, id, services, attributes, url) as DeflatedPropertyTree;
}

public create(document: IFluidDataStoreRuntime, id: string, requestUrl?: string): DeflatedPropertyTree {
return super.create(document, id, requestUrl);
}

public get type() {
return DeflatedPropertyTreeFactory.Type;
}
Expand All @@ -80,25 +159,57 @@ export class DeflatedPropertyTreeFactory implements IChannelFactory {
return DeflatedPropertyTreeFactory.Attributes;
}

public getEncodeFce() { return deflate; }
public getDecodeFce() { return inflate; }
public newPropertyTree(
id: string,
runtime: IFluidDataStoreRuntime,
attributes: IChannelAttributes,
options: SharedPropertyTreeOptions,
propertyTreeConfig: IPropertyTreeConfig): SharedPropertyTree {
return new DeflatedPropertyTree(id, runtime, attributes, options, propertyTreeConfig);
}
}

export class LZ4PropertyTreeFactory extends CompressedPropertyTreeFactory {
public static readonly Type = "LZ4PropertyTree:84534a0fe613522101f6";

public static readonly Attributes: IChannelAttributes = {
type: LZ4PropertyTreeFactory.Type,
snapshotFormatVersion: "0.1",
packageVersion: "0.0.1",
};

public async load(
runtime: IFluidDataStoreRuntime,
id: string,
services: IChannelServices,
attributes: IChannelAttributes,
url?: string,
): Promise<DeflatedPropertyTree> {
const options = {};
const instance = new DeflatedPropertyTree(id, runtime, attributes, options as SharedPropertyTreeOptions
, { encDec });
await instance.load(services);
return instance;
): Promise<LZ4PropertyTree> {
return await super.load(runtime, id, services, attributes, url) as DeflatedPropertyTree;
}

public create(document: IFluidDataStoreRuntime, id: string, requestUrl?: string): DeflatedPropertyTree {
const options = {};
const cell = new DeflatedPropertyTree(id, document,
this.attributes, options as SharedPropertyTreeOptions, { encDec });
cell.initializeLocal();
return cell;
public create(document: IFluidDataStoreRuntime, id: string, requestUrl?: string): LZ4PropertyTree {
return super.create(document, id, requestUrl);
}

public get type() {
return DeflatedPropertyTreeFactory.Type;
}
DLehenbauer marked this conversation as resolved.
Show resolved Hide resolved

public get attributes() {
return DeflatedPropertyTreeFactory.Attributes;
}

public getEncodeFce() { return compress; }
public getDecodeFce() { return decompress; }
public newPropertyTree(
id: string,
runtime: IFluidDataStoreRuntime,
attributes: IChannelAttributes,
options: SharedPropertyTreeOptions,
propertyTreeConfig: IPropertyTreeConfig): SharedPropertyTree {
return new LZ4PropertyTree(id, runtime, attributes, options, propertyTreeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
TestFluidObjectFactory,
} from "@fluidframework/test-utils";
import { PropertyFactory, StringProperty, BaseProperty } from "@fluid-experimental/property-properties";
import { DeflatedPropertyTree } from "../propertyTreeExt";
import { DeflatedPropertyTree, LZ4PropertyTree } from "../propertyTreeExt";
import { SharedPropertyTree } from "../propertyTree";

describe("PropertyTree", () => {
Expand All @@ -39,6 +39,11 @@ describe("PropertyTree", () => {
describe("SharedPropertyTree", () => {
executePerPropertyTreeType(codeDetails, factory2, documentId, documentLoadUrl, propertyDdsId);
});

const factory3 = new TestFluidObjectFactory([[propertyDdsId, LZ4PropertyTree.getFactory()]]);
describe("LZ4PropertyTree", () => {
executePerPropertyTreeType(codeDetails, factory1, documentId, documentLoadUrl, propertyDdsId);
});
});
function executePerPropertyTreeType(codeDetails: IFluidCodeDetails,
factory: TestFluidObjectFactory, documentId: string, documentLoadUrl:
Expand Down