Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IcebergDocument as one implementation of VirtualDocument #3147

Open
wants to merge 59 commits into
base: master
Choose a base branch
from

Conversation

bobbai00
Copy link
Collaborator

@bobbai00 bobbai00 commented Dec 10, 2024

This PR introduces an implementation of result storage using Apache Iceberg.

How to enable the Iceberg result storage

Go to storage-config.yaml,

  • change result-storage-mode to iceberg
  • configure storage.iceberg.catalog.jdbc section,
iceberg:
    catalog:
      jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/
        url: "jdbc:mysql://localhost:3306/texera_iceberg?serverTimezone=UTC"
        username: ""
        password: ""

make sure the JDBC is accessible via the url, username, and password

Major changes

  • Introduced IcebergDocument: a thread-safe implementation of VirtualDocument for storing and reading results in Iceberg tables.
  • Introduced IcebergTableWriter: an append-only writer for Iceberg tables with configurable buffer size.
  • Added support for new configuration properties under storage.iceberg to specify catalog and table settings.

Introduced Dependencies

In workflow-core, some new packages are added

  • Iceberg-related packages
  • Hadoop common. The reason of adding this dependency is to pass the compilation: In the source code of iceberg-parquet, the line 160,
    although the file is not of type HadoopOutputFile, it still creats a Hadoop Configuration() as the placeholder. During the runtime, we don't have any dependency on Hadoop or HDFS.

Overview of the behavior IcebergDocument and IcebergWriter

  • IcebergDocument:

    • Handles reading and managing data in Iceberg tables.
    • Initializes the table during construction, creating it if it does not exist or overriding it if specified.
    • Supports iterator-based incremental read operations.
    • Thread-safe for read and clear operations.
  • IcebergTableWriter:

    • Writes data to Iceberg tables in an append-only manner.
    • Creates new Parquet files for every buffer flush, ensuring immutability.
    • Not thread-safe, so it should only be accessed by one thread at a time.

How the result will be stored via Iceberg tables

  • Given a storage key, a table named key will be created.
  • To append tuples to the table key, each worker will append immutable parquet files to the table's data space using IcebergTableWriter. To avoid the parquet filename collision, each worker will prefix its created file with ${workerIndex}_${fileIndex}, in which workerIndex is its index, and fileIndex is a number maintained that increased by 1 every time a new data file is created and flushed by the writer.
  • To read the tuples, the reader uses the iterator returned by IcebergDocument.get. This iterator can incrementally read new data while writers are appending tuples.

@bobbai00 bobbai00 self-assigned this Dec 10, 2024
@bobbai00 bobbai00 force-pushed the jiadong-add-file-result-storage branch 2 times, most recently from 6522779 to a83d779 Compare December 14, 2024 00:14
@bobbai00 bobbai00 force-pushed the jiadong-add-file-result-storage branch 2 times, most recently from 1edb551 to cef347b Compare December 21, 2024 02:56
@bobbai00 bobbai00 changed the title Add PartitionDocument and ItemizedFileDocument Add IcebergDocument as one implementation of VirtualDocument that can be used to store operator results Dec 22, 2024
@bobbai00 bobbai00 changed the title Add IcebergDocument as one implementation of VirtualDocument that can be used to store operator results Add IcebergDocument as one implementation of VirtualDocument Dec 22, 2024
# Conflicts:
#	core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala
#	core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
#	core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala
@transient lazy val catalog: Catalog = IcebergCatalogInstance.getInstance()

// During construction, create or override the table
synchronized {
Copy link
Collaborator

@shengquan-ni shengquan-ni Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this synchronized is unnecessary as it only locks this instance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants