
Stricter CSV Reader #385

Open
TimPossiblee opened this issue Sep 25, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@TimPossiblee

Feature Description

I would like to request the following improvements regarding CSV loading.

  1. Fail on different number of columns per file

    When, for example, the delimiter appears inside the data, rows will end up with different numbers of columns.
    I would like to have an option to error when not every row has the same number of columns, instead of omitting values or creating dummy columns.
    Different numbers of columns across files should still be allowed.
    The expected number of columns should be taken from the CSV header or the first row of data.

  2. Preserve raw values when column is defined as string

    I would expect sling to preserve the original value of a column when it is defined as string, e.g. columns={"*": "string"}.
    But even with SAMPLE_SIZE=0 the analyzer still seems to run on the first row of data.
    Disabling data sampling entirely when SAMPLE_SIZE is set to 0 would also be an option.

    Note how, in the example below, the first column of the first row is missing its surrounding whitespace.

  3. SourceOptions.TrimSpace seems to be a no-op

    If I want to trim whitespace on all rows beyond the SAMPLE_SIZE limit, trim_space=true seems to have no effect. Specifying it as a transform works; is this the intended way?
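For reference, trimming via a transform is easy to reason about: it runs per field after parsing, independently of sampling or type detection. A minimal sketch of that behavior (trimSpaceRow is a hypothetical helper for illustration, not sling's actual code):

```go
package main

import (
	"fmt"
	"strings"
)

// trimSpaceRow applies a trim_space-style transform to every field of a
// parsed CSV row. Hypothetical helper, not sling's implementation.
func trimSpaceRow(row []string) []string {
	out := make([]string, len(row))
	for i, v := range row {
		out[i] = strings.TrimSpace(v)
	}
	return out
}

func main() {
	fmt.Printf("%q\n", trimSpaceRow([]string{" col1_data ", "col2_data"}))
	// ["col1_data" "col2_data"]
}
```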

  • Version: 1.2.20
  • Operating System: linux
  • Sample CSV:
col1;col2;col3
 col1_data ;col2_data;col3_data
 col1_data ;col2_data;with_delimiter;col3_data
 col1_data ;col2_data
  • Replication Configuration:
source: LOCAL
target: SNOWFLAKE

defaults:
  mode: full-refresh
  single: true

  source_options:
    format: csv
    header: true
    delimiter: ';'
    trim_space: true
    columns:
      "*": string

  target_options:
    column_casing: target

streams:
  "file://./test.csv":
    object: 'PUBLIC.CSV_TEST'

env:
  SLING_STREAM_URL_COLUMN: true
  SAMPLE_SIZE: 0
  • Output in Snowflake:
COL1 COL2 COL3 _SLING_STREAM_URL
col1_data col2_data col3_data file://./test.csv
\scol1_data\s col2_data with_delimiter file://./test.csv
\scol1_data\s col2_data NULL file://./test.csv
@TimPossiblee TimPossiblee added the enhancement New feature or request label Sep 25, 2024
@flarco
Collaborator

flarco commented Nov 30, 2024

2 & 3 are done, feel free to try them in the latest dev build.
For #3, source_options.trim_space has been removed; it will only be available via a transform (trim_space).

1 is a bit trickier...

@flarco
Collaborator

flarco commented Nov 30, 2024

@TimPossiblee actually, can you try the fields_per_rec source option? It was undocumented, but I think it should work for your case.


@flarco
Collaborator

flarco commented Nov 30, 2024

It gets passed to the Go CSV parser


The default is -1, so no check is made. If you specify a positive number, and the number of columns doesn't match, it will fail.

@TimPossiblee
Author

  • Sling version (sling --version): 1.3.1

  • Operating System (linux, mac, windows): linux

  • Replication Configuration:

source: LOCAL
target: SNOWFLAKE

env:
  SLING_STREAM_URL_COLUMN: true
  SAMPLE_SIZE: 0

defaults:
  mode: full-refresh
  single: true

  source_options:
    format: csv
    header: true
    delimiter: ;
    fields_per_rec: 0

  target_options:
    column_casing: target

streams:
  "file://./test.csv":
    object: 'PUBLIC.CSV_TEST'
Log Output
2024-12-02 15:18:20 DBG opened "file" connection (conn-file-DnJ)
2024-12-02 15:18:20 INF Sling Replication | LOCAL -> SNOWFLAKE | file://./test.csv
2024-12-02 15:18:20 DBG Sling version: 1.3.1 (linux amd64)
2024-12-02 15:18:20 DBG type is file-db
2024-12-02 15:18:20 DBG using: {"columns":null,"mode":"full-refresh","transforms":null}
2024-12-02 15:18:20 DBG using source options: {"empty_as_null":true,"header":true,"fields_per_rec":0,"compression":"auto","format":"csv","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"delimiter":";","max_decimals":-1}
2024-12-02 15:18:20 DBG using target options: {"datetime_format":"auto","file_max_rows":0,"file_max_bytes":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"target"}
2024-12-02 15:18:20 DBG opened "snowflake" connection (conn-snowflake-iUc)
2024-12-02 15:18:21 INF connecting to target database (snowflake)
2024-12-02 15:18:21 INF reading from source file system (file)
2024-12-02 15:18:21 DBG opened "file" connection (conn-file-HDx)
2024-12-02 15:18:21 DBG reading single datastream from file://./test.csv [format=csv]
2024-12-02 15:18:21 DBG merging csv readers of 1 files [concurrency=3] from file://./test.csv
2024-12-02 15:18:21 DBG processing reader from file://./test.csv
2024-12-02 15:18:21 INF writing to target database [mode: full-refresh]
2024-12-02 15:18:21 DBG drop table if exists "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:21 DBG table "PUBLIC"."CSV_TEST_TMP" dropped
2024-12-02 15:18:21 DBG create transient table "PUBLIC"."CSV_TEST_TMP" ("COL1" varchar,
"COL2" varchar,
"COL3" varchar,
"_SLING_LOADED_AT" timestamp_tz,
"_SLING_STREAM_URL" varchar)
2024-12-02 15:18:21 INF created table "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:21 INF streaming data
2024-12-02 15:18:22 DBG USE SCHEMA PUBLIC
2024-12-02 15:18:22 DBG opened "file" connection (conn-file-4Ck)
2024-12-02 15:18:22 DBG writing to file:///home/tim/gitbox/eds-warehouse.git/v2/orchestrate/sling/tmp/c384ed37/snowflake/put/public.csv_test_tmp/2024-12-02T151822.003 [fileRowLimit=0 fileBytesLimit=0 compression= concurrency=7 useBufferedStream=false fileFormat=csv singleFile=true]
2024-12-02 15:18:22 DBG REMOVE @PUBLIC.sling_staging/public.csv_test_tmp/2024-12-02T151822.003
2024-12-02 15:18:22 DBG PUT 'file:///home/tim/gitbox/eds-warehouse.git/v2/orchestrate/sling/tmp/c384ed37/snowflake/put/public.csv_test_tmp/2024-12-02T151822.003' @PUBLIC.sling_staging/public.csv_test_tmp/2024-12-02T151822.003 PARALLEL=8 AUTO_COMPRESS=FALSE
2024-12-02 15:18:22 DBG COPY INTO "PUBLIC"."CSV_TEST_TMP" ("COL1", "COL2", "COL3", "_SLING_LOADED_AT", "_SLING_STREAM_URL")
from ( 
  select T.$1, T.$2, T.$3, T.$4, T.$5
  from @PUBLIC.sling_staging/public.csv_test_tmp/2024-12-02T151822.003 as T
)
FILE_FORMAT = (
  TYPE = CSV
  RECORD_DELIMITER = '\n'
  ESCAPE_UNENCLOSED_FIELD = NONE
  FIELD_OPTIONALLY_ENCLOSED_BY = '0x22'
  EMPTY_FIELD_AS_NULL = FALSE
  NULL_IF = '\\N'
  SKIP_HEADER = 1
  REPLACE_INVALID_CHARACTERS = TRUE
)
ON_ERROR = ABORT_STATEMENT
2024-12-02 15:18:22 DBG 
+-------------------------------------------------------------------------------+--------+-------------+-------------+
| FILE                                                                          | STATUS | ROWS_LOADED | ERRORS_SEEN |
+-------------------------------------------------------------------------------+--------+-------------+-------------+
| sling_staging/public.csv_test_tmp/2024-12-02T151822.003/2024-12-02T151822.003 | LOADED |           3 |           0 |
+-------------------------------------------------------------------------------+--------+-------------+-------------+
2024-12-02 15:18:22 DBG select count(*) cnt from "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:23 DBG drop table if exists "PUBLIC"."CSV_TEST"
2024-12-02 15:18:23 DBG table "PUBLIC"."CSV_TEST" dropped
2024-12-02 15:18:23 DBG create table "PUBLIC"."CSV_TEST" ("COL1" varchar,
"COL2" varchar,
"COL3" varchar,
"_SLING_LOADED_AT" timestamp_tz,
"_SLING_STREAM_URL" varchar)
2024-12-02 15:18:23 INF created table "PUBLIC"."CSV_TEST"
2024-12-02 15:18:23 DBG insert into "PUBLIC"."CSV_TEST" ("COL2", "COL3", "_SLING_LOADED_AT", "_SLING_STREAM_URL") select "COL2", "COL3", "_SLING_LOADED_AT", "_SLING_STREAM_URL" from "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:23 DBG inserted rows into "PUBLIC"."CSV_TEST" from temp table "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:23 INF inserted 3 rows into "PUBLIC"."CSV_TEST" in 3 secs [1 r/s]
2024-12-02 15:18:23 DBG drop table if exists "PUBLIC"."CSV_TEST_TMP"
2024-12-02 15:18:24 DBG table "PUBLIC"."CSV_TEST_TMP" dropped
2024-12-02 15:18:24 DBG closed "snowflake" connection (conn-snowflake-iUc)
2024-12-02 15:18:24 INF execution succeeded

2024-12-02 15:18:24 INF Sling Replication Completed in 3s | LOCAL -> SNOWFLAKE | 1 Successes | 0 Failures
  • Sample CSV:
col1;col2;col3
 col1_data ; col2_data ;col3_data
 col1_data ;" col2_data;with_delimiter ";col3_data
 col1_data ; col2_data ;
  • Output in Snowflake:
COL1 COL2 COL3 _SLING_LOADED_AT _SLING_STREAM_URL
NULL col2_data col3_data 1733147183 file://./test.csv
NULL \scol2_data;with_delimiter\s col3_data 1733147183 file://./test.csv
NULL \scol2_data\s NULL 1733147183 file://./test.csv
  • Comment:

I updated sling to the latest version and adjusted the yaml config.

Point 1:
fields_per_rec: 0 seems to work exactly like I wanted.
I could not load the original sample file without adjusting it accordingly (good).

Point 2:
I chose to use SAMPLE_SIZE: 0 to test the change.
First thing I noticed is that the first column is omitted in the final insert statement (visible in the logs).
If SAMPLE_SIZE is set to 1 or higher the insert statement is correct.

Secondly, I could not see a difference from before. The first row is trimmed while the other two rows keep their whitespace (see col2).
Maybe there is another reason for this, and datatype detection has nothing to do with it?
Maybe it's because of this line:

if ds.it.Counter == 1 && !ds.NoDebug {

  • Side note:

Using SLING_SAMPLE_SIZE instead of the now-legacy SAMPLE_SIZE does nothing for me.
It behaves as if the variable were not set at all.

@TimPossiblee
Author

v1.3.2 almost works

SAMPLE_SIZE: 0 + no explicit column casting == NULL values for the first row
SAMPLE_SIZE: 1 + no explicit column casting == like before first row with cleaned values
SAMPLE_SIZE: 0 + and all string columns == works values untouched
SAMPLE_SIZE: 1 + and all string columns == works values untouched

Example for SAMPLE_SIZE: 0 + no explicit column casting:

COL1 COL2 COL3 _SLING_LOADED_AT
NULL NULL NULL 1733298429
\scol1_data\s \scol2_data;with_delimiter\s col3_data 1733298429
\scol1_data\s \scol2_data\s NULL 1733298429
  • Side note:
    When casting all columns as strings, the _SLING_LOADED_AT column is also created as a string.
    This makes the unix timestamp a string instead of a number, as before.
    It also fails when using SLING_LOADED_AT_COLUMN: timestamp:
100035 (22007): Timestamp '2024-12-04 08:57:10.546234881 +0100 CET' is not recognized
  File 'public.csv_test_tmp/2024-12-04T085711.568/2024-12-04T085711.568', line 2, character 39
  Row 1, column "CSV_TEST_TMP"["_SLING_LOADED_AT":4]
  If you would like to continue loading when an error is encountered, use other values such as 'SKIP_FILE' or 'CONTINUE' for the ON_ERROR option. For more information on loading options, please run 'info loading_data' in a SQL client.
