
Potential Parquet File Metadata Corruption After Process Timeout #879

Open
alordthorsen opened this issue Aug 22, 2023 · 7 comments

@alordthorsen

alordthorsen commented Aug 22, 2023

Describe the issue:
After hitting a timeout in an AWS Lambda, I'm no longer able to read from a parquet file. I'm hitting this stack trace:

python scripts/read_bucket.py
Traceback (most recent call last):
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/read_bucket.py", line 5, in <module>
    pf = ParquetFile('prod-data-platform-data-lake-shippoprod-com/transformed/flattened_rates/', open_with=myopen)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/api.py", line 153, in __init__
    self._parse_header(f, verify)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/api.py", line 225, in _parse_header
    for rg in fmd[4]:
TypeError: 'NoneType' object is not iterable

I believe my _metadata file is corrupted; I suspect we hit the timeout in the middle of the write command, potentially during the metadata write.

I attempted to upload the files for this report, but GitHub won't allow binary file uploads.

The contents of _metadata are just PAR1, which feels incorrect, but I don't know enough about the parquet format to be sure without digging deeper into the standard.

% /bin/cat ~/Downloads/_metadata
PAR1%
% hexdump ~/Downloads/_metadata
0000000 4150 3152
0000004

The _common_metadata file looks more correct to me. I have a couple hundred columns in my case, so I won't post the whole thing here unless that's required, but a sample from the end of the file looks like:

"field_name": "shipment__is_test", "metadata": null, "name": "shipment__is_test", "numpy_type": "bool", "pandas_type": "bool"}], "creator": {"library": "fastparquet", "version": "2023.7.0"}, "index_columns": [{"kind": "range", "name": null, "start": 0, "step": 1, "stop": 9}], "pandas_version": "2.0.3", "partition_columns": [{"field_name": "year", "metadata": null, "name": "year", "numpy_type": "int64", "pandas_type": "int64"}, {"field_name": "month", "metadata": null, "name": "month", "numpy_type": "int64", "pandas_type": "int64"}]}-fastparquet-python version 2023.7.0 (build 0)PAR1

Minimal Complete Verifiable Example:

import s3fs
from fastparquet import ParquetFile

s3 = s3fs.S3FileSystem()
myopen = s3.open
# Fails during header parsing with: TypeError: 'NoneType' object is not iterable
pf = ParquetFile("my_s3_bucket", open_with=myopen)
breakpoint()
df = pf.to_pandas()

Anything else we need to know?:
I'm looking for some way to recover this metadata file, either on the fly as part of my write process or as a script I can run manually.

I plan to avoid timeouts like this in the future, but I'm wondering if there's anything I can do to ensure I don't hit this issue again.

Environment:

  • Dask version: Using fastparquet directly
  • fastparquet: 2023.7.0
  • Python version: 3.9
  • Operating System: Amazon Linux 2
  • Install method (conda, pip, source): Pip
@martindurant
Member

Indeed, the metadata should contain more than just b"PAR1", which is only the magic marker saying this is a parquet file. The utility function fastparquet.writer.merge should help you recreate it. However, you can also delete the corrupted file; it is not essential, just an optimisation.
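(A minimal sketch of that second option, assuming s3fs and the bucket/prefix names that come up later in this thread:)

import s3fs
from fastparquet import ParquetFile

s3 = s3fs.S3FileSystem()
root = "eng-40472-test-bucket/transformed/flattened_rates"  # bucket/prefix from later in the thread

# The _metadata file is only an optimisation; drop the truncated copy.
s3.rm(f"{root}/_metadata")

# Point ParquetFile at the data files themselves; each file's own footer is read instead.
parts = s3.glob(f"{root}/year=*/month=*/*.parquet")
pf = ParquetFile(parts, open_with=s3.open, root=root)
df = pf.to_pandas()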

@alordthorsen
Author

I'm going to use fastparquet.writer.merge for now, just to increase my own understanding of how things work.

If I have an S3 structure like

transformed/flattened_rates/_common_metadata
transformed/flattened_rates/_metadata
transformed/flattened_rates/year=2023/month=8/part.0.parquet
transformed/flattened_rates/year=2023/month=8/part.1.parquet
transformed/flattened_rates/year=2023/month=8/part.10.parquet
transformed/flattened_rates/year=2023/month=8/part.100.parquet
transformed/flattened_rates/year=2023/month=8/part.101.parquet
transformed/flattened_rates/year=2023/month=8/part.102.parquet

Does that mean I should call

fastparquet.writer.merge(s3_paths, open_with=s3.open, root="transformed/flattened_rates/")

in this particular case?

@martindurant
Member

martindurant commented Aug 22, 2023 via email

@alordthorsen
Author

alordthorsen commented Aug 22, 2023

I was originally hitting

Traceback (most recent call last):
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/recover_corrupted_metadata_file.py", line 47, in <module>
    app()
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 328, in __call__
    raise e
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 311, in __call__
    return get_command(self)(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/core.py", line 716, in main
    return _main(
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/core.py", line 216, in _main
    rv = self.invoke(ctx)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 683, in wrapper
    return callback(**use_params)  # type: ignore
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/recover_corrupted_metadata_file.py", line 39, in main
    attempt_recovery(parquet_file_keys, metadata_root=s3_key)
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/recover_corrupted_metadata_file.py", line 23, in attempt_recovery
    merge(object_paths, open_with=myopen, root=metadata_root)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/writer.py", line 1465, in merge
    out = ParquetFile(file_list, verify_schema, open_with, root)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/api.py", line 124, in __init__
    basepath, fmd = metadata_from_many(fn, verify_schema=verify,
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/util.py", line 217, in metadata_from_many
    basepath, file_list = analyse_paths(file_list, root=root)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/util.py", line 368, in analyse_paths
    assert all(p[:l] == basepath for p in path_parts_list
AssertionError: All paths must begin with the given root

with a root argument of 'transformed/flattened_rates'. It took me a second to figure out that I needed to use {s3_bucket}/{s3_path} and not just the s3_path.
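(For anyone else hitting that AssertionError, a minimal sketch of the merge call with bucket-qualified paths; the bucket and key are the test values used below, and the listing pattern is an assumption:)

import s3fs
from fastparquet.writer import merge

s3 = s3fs.S3FileSystem()
bucket = "eng-40472-test-bucket"
key = "transformed/flattened_rates"
root = f"{bucket}/{key}"  # root must include the bucket, not just the key

# Every path handed to merge has to start with root, so list with the same prefix.
parts = s3.glob(f"{root}/year=*/month=*/*.parquet")
merge(parts, open_with=s3.open, root=root)  # rewrites _metadata (and _common_metadata) under root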

I think I have this working now:

% python3 scripts/read_bucket.py --s3-bucket=eng-40472-test-bucket --s3-key=transformed/flattened_rates
Traceback (most recent call last):
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/read_bucket.py", line 17, in <module>
    app()
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 328, in __call__
    raise e
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 311, in __call__
    return get_command(self)(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/core.py", line 716, in main
    return _main(
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/core.py", line 216, in _main
    rv = self.invoke(ctx)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/typer/main.py", line 683, in wrapper
    return callback(**use_params)  # type: ignore
  File "/Users/alexlordthorsen/git/rates/flatten-rates-etl/scripts/read_bucket.py", line 13, in main
    pf = ParquetFile(s3_path, open_with=myopen)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/api.py", line 153, in __init__
    self._parse_header(f, verify)
  File "/Users/alexlordthorsen/.venvs/data_platform_39/lib/python3.9/site-packages/fastparquet/api.py", line 225, in _parse_header
    for rg in fmd[4]:
TypeError: 'NoneType' object is not iterable
% python3 scripts/recover_corrupted_metadata_file.py --s3-bucket=eng-40472-test-bucket --s3-key=transformed/flattened_rates
Metadata file has been recovered.

@martindurant would you be open to a PR that changes these TypeErrors into more specific error types with more specific messages, plus a doc change to writer.merge explaining that root needs to be a full S3 path (bucket included) if you're operating on S3 objects?

@martindurant
Member

Yes, catching that error would be fine. It essentially means parsing failed.

@rawrgulmuffins

rawrgulmuffins commented Aug 23, 2023

Hmmmm, I'm still hitting this error even after a rebuild.

I'm now re-examining this error handling in my code:

    except s3fs.utils.FileExpired as error:
        # We've hit an issue where the `_metadata` file for this data set has
        # been modified in the middle of the `write` operation. We now need to
        # attempt to write our rows to the data set with the understanding that
        # the metadata file might be modified again mid-write.
        successful_write = False
        while not successful_write:
            logger.warning(
                f"Metadata File Was Updated And Deleted mid-write. Most likely this "
                f"means a new column was added by another process and we need to "
                f"reload the parquet schema before we commit our data. Exact error: "
                f"{error}"
            )
            s3_fs.invalidate_cache()
            try:
                write(
                    full_s3_path,
                    df,
                    file_scheme="hive",
                    append=True,
                    partition_on=partition_by_keys,
                    object_encoding=parquet_schema,
                    open_with=myopen,
                    mkdirs=noop,
                    stats=True,
                )

Looking at the timing of when the error recurs and when I'm seeing these logs, I'm pretty sure this is my core issue.
[Two screenshots showing the error and log timing, omitted here.]

I originally grabbed this idea from here

@martindurant
Member

I guess the solution is to write the metadata file only once, when everything else is done, as dask does. Or maybe remove it...
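(A minimal sketch of that first approach with fastparquet alone, assuming each batch can go to its own part file; this skips hive partitioning for brevity, and `batches` and the bucket/prefix are placeholders:)

import s3fs
from fastparquet import write
from fastparquet.writer import merge

s3 = s3fs.S3FileSystem()
root = "eng-40472-test-bucket/transformed/flattened_rates"

# Write each batch to its own part file; no shared _metadata is touched here,
# so a timeout can only lose the part that was mid-write.
for i, df in enumerate(batches):  # `batches` is a placeholder iterable of DataFrames
    write(f"{root}/part.{i}.parquet", df, file_scheme="simple", open_with=s3.open)

# Only once every part has landed, consolidate the footers into _metadata.
parts = s3.glob(f"{root}/part.*.parquet")
merge(parts, open_with=s3.open, root=root)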
