Skip to content

Commit

Permalink
Merge pull request #49 from TidierOrg/add_streaming_duckdb
Browse files Browse the repository at this point in the history
Add streaming, multiple file reading and customized file reading with DuckDB backend
  • Loading branch information
drizk1 authored Jul 28, 2024
2 parents 8c98381 + cb89abf commit 210f315
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 9 deletions.
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# TidierDB.jl updates

## v0.3. - 2024-07-25
## v0.3.1 - 2024-07-28
- adds support for reading from multiple files at once as a vector of paths in `db_table` when using DuckDB
- ie `db_table(db, ["path1", "path2"])`
- adds streaming support when using DuckDB with `@collect(stream = true)`
- allows the user to customize file reading via `db_table(db, "read_*(path, args)")` when using DuckDB

## v0.3.0 - 2024-07-25
- Introduces package extensions for:
- Postgres, ClickHouse, MySQL, MsSQL, SQLite, Oracle, Athena, and Google BigQuery
- (Documentation)[https://tidierorg.github.io/TidierDB.jl/latest/examples/generated/UserGuide/getting_started/] updated for using these backends.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "TidierDB"
uuid = "86993f9b-bbba-4084-97c5-ee15961ad48b"
authors = ["Daniel Rizk <[email protected]> and contributors"]
version = "0.3.0"
version = "0.3.1"

[deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
Expand Down
46 changes: 44 additions & 2 deletions src/TBD_macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,54 @@ function final_collect(sqlquery::SQLQuery, ::Type{<:snowflake})
result = execute_snowflake(sqlquery.db, final_query)
return DataFrame(result)
end
#using TidierDB
"""
    stream_collect(sqlquery::SQLQuery)

Execute `sqlquery` against its DuckDB connection using DuckDB's streaming
result API and return a `DataFrame`, accumulating the result
chunk-by-chunk rather than materializing everything at once.
"""
function stream_collect(sqlquery::SQLQuery)
    final_query = finalize_query(sqlquery)
    res = DBInterface.execute(sqlquery.db, final_query, DuckDB.StreamResult)

    # Helper to get the non-Missing type from a Union{Missing, T}.
    # NOTE(review): for a union such as Union{Missing, Int}, the test
    # `T <: Union{Missing}` (i.e. `T <: Missing`) is false, so the union is
    # returned unchanged. Columns therefore keep their Missing-tolerant
    # eltype, which the `append!` below relies on when chunks contain
    # missing values — confirm this is the intended behavior.
    function non_missing_type(T)
        T === Missing && return Any
        T <: Union{Missing} ? non_missing_type(Base.typesplit(T)[2]) : T
    end

    # Initialize an empty DataFrame with one typed column per result column.
    df = DataFrame([name => Vector{non_missing_type(t)}() for (name, t) in zip(res.names, res.types)])

    while true
        chunk = DuckDB.nextDataChunk(res)
        chunk === missing && break  # all chunks processed

        for (col_idx, col_name) in enumerate(res.names)
            # Convert the DuckDB chunk column into native Julia values.
            duckdb_logical_type = DuckDB.LogicalType(DuckDB.duckdb_column_logical_type(res.handle, col_idx))
            duckdb_conversion_state = DuckDB.ColumnConversionData([chunk], col_idx, duckdb_logical_type, nothing)
            col_data = DuckDB.convert_column(duckdb_conversion_state)

            # Append this chunk's column data to the growing DataFrame.
            append!(df[!, col_name], col_data)
        end

        # Free the chunk before fetching the next one to bound memory use.
        DuckDB.destroy_data_chunk(chunk)
    end

    return df
end


"""
$docstring_collect
"""
macro collect(sqlquery, stream = false)
return quote
backend = current_sql_mode[]
if backend == duckdb()
final_collect($(esc(sqlquery)), duckdb)
if $stream
println("streaming")
stream_collect($(esc(sqlquery)))
else
final_collect($(esc(sqlquery)), duckdb)
end
elseif backend == clickhouse()
final_collect($(esc(sqlquery)), clickhouse)
elseif backend == sqlite()
Expand Down
32 changes: 31 additions & 1 deletion src/TidierDB.jl
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ function db_table(db, table, athena_params::Any=nothing; iceberg::Bool=false, de
table_name2 = "delta_scan('$table_name')"
# println(table_name2)
metadata = get_table_metadata(db, table_name2)
elseif startswith(table_name, "read")
table_name2 = "$table_name"
metadata = get_table_metadata(db, table_name2)
elseif occursin(r"[:/]", table_name)
table_name2 = "'$table_name'"
metadata = get_table_metadata(db, table_name2)
Expand All @@ -229,15 +232,42 @@ function db_table(db, table, athena_params::Any=nothing; iceberg::Bool=false, de
"iceberg_scan('$table_name', allow_moved_paths = true)"
elseif delta
"delta_scan('$table_name')"
elseif occursin(r"[:/]", table_name) && !(iceberg || delta)
elseif occursin(r"[:/]", table_name) && !(iceberg || delta) && !startswith(table_name, "read")
"'$table_name'"
elseif startswith(table_name, "read")
"$table_name"
else
table_name
end

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)
end

"""
    db_table(db, table::Vector{String}, athena_params=nothing)

Build a `SQLQuery` that reads multiple CSV or Parquet files at once with
the DuckDB backend, e.g. `db_table(db, ["path1.csv", "path2.csv"])`.

Column metadata is taken from the first file only, so all files are
expected to share the same schema.

# Throws
- `ErrorException` if `table` is empty, if the files do not all share the
  same extension, or if the extension is not `.csv`/`.parquet`.
"""
function db_table(db, table::Vector{String}, athena_params::Any=nothing)
    isempty(table) && error("Empty vector of file paths provided")

    # Determine the reader from the first file's extension and make sure
    # every path agrees — silently mixing CSV and Parquet in a single
    # read_* call would otherwise fail inside DuckDB with a confusing error.
    file_type = lowercase(splitext(first(table))[2])
    if any(path -> lowercase(splitext(path)[2]) != file_type, table)
        error("All file paths must share the same extension; expected $file_type")
    end

    # Wrap each path in single quotes and join them into DuckDB's
    # list-literal syntax: read_csv(['a.csv', 'b.csv']).
    formatted_paths = join(map(path -> "'$path'", table), ", ")

    formatted_table_name = if file_type == ".csv"
        "read_csv([$formatted_paths])"
    elseif file_type == ".parquet"
        "read_parquet([$formatted_paths])"
    else
        error("Unsupported file type: $file_type")
    end

    # Metadata (column names/types) comes from the first file only.
    first_path = first(table)
    metadata = get_table_metadata(db, "'$first_path'")

    return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)
end

"""
$docstring_copy_to
"""
Expand Down
48 changes: 44 additions & 4 deletions src/docstrings.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,8 @@ const docstring_db_table =
`db_table` starts the underlying SQL query struct, adding the metadata and table.
#arguments
# Arguments
`database`: The Database or connection object
`table_name`: tablename as a string. Table name can be a name of a table on the database or paths to the following types
-CSV
Expand All @@ -1060,8 +1061,9 @@ const docstring_db_table =
-Iceberg
-Delta
-S3 tables from AWS or Google Cloud
`delta`: must be true to read delta
`iceberg`: must be true to read iceberg
- vector of CSV or Parquet paths to read multiple at once
`delta`: must be true to read delta files
`iceberg`: must be true to read iceberg files
# Example
```julia
Expand All @@ -1085,4 +1087,42 @@ TidierDB.SQLQuery("", "df_mem", "", "", "", "", "", "", false, false, 4×4 DataF
3 │ value BIGINT 1 df_mem
4 │ percent DOUBLE 1 df_mem, false, DuckDB.Connection(":memory:"), TidierDB.CTE[], 0, nothing)
```
"""
"""

const docstring_collect =
"""
@collect(sql_query, stream = false)
`@collect` executes the SQL query and returns the results as a DataFrame.
# Arguments
- `sql_query`: The SQL query to operate on.
- `stream`: optional; streams query results during collection when using the DuckDB backend. Defaults to `false`
# Example
```julia
julia> db = connect(duckdb());
julia> df = DataFrame(id = [string('A' + i ÷ 26, 'A' + i % 26) for i in 0:9],
groups = [i % 2 == 0 ? "aa" : "bb" for i in 1:10],
value = repeat(1:5, 2),
percent = 0.1:0.1:1.0);
julia> copy_to(db, df, "df_mem");
julia> @collect db_table(db, "df_mem")
10×4 DataFrame
Row │ id groups value percent
│ String? String? Int64? Float64?
─────┼────────────────────────────────────
1 │ AA bb 1 0.1
2 │ AB aa 2 0.2
3 │ AC bb 3 0.3
4 │ AD aa 4 0.4
5 │ AE bb 5 0.5
6 │ AF aa 1 0.6
7 │ AG bb 2 0.7
8 │ AH aa 3 0.8
9 │ AI bb 4 0.9
10 │ AJ aa 5 1.0
```
"""

2 comments on commit 210f315

@drizk1
Copy link
Member Author

@drizk1 drizk1 commented on 210f315 Jul 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register

Release notes:

  • adds support for reading from multiple files at once as a vector of paths in db_table when using DuckDB
    • ie db_table(db, ["path1", "path2"])
  • adds streaming support when using DuckDB with @collect(stream = true)
  • allows user to customize file reading via db_table(db, "read_*(path, args)") when using DuckDB

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/111943

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.3.1 -m "<description of version>" 210f315f0b3f16219497ca40d55d5ae796a26d27
git push origin v0.3.1

Please sign in to comment.