Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid allocating ADBC inputs and outputs twice #97

Merged
merged 23 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 56 additions & 76 deletions c_src/adbc_arrow_array.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#ifndef ADBC_ARROW_ARRAY_HPP
#define ADBC_ARROW_ARRAY_HPP
#pragma once

#include <stdio.h>
Expand All @@ -9,9 +10,10 @@
#include <adbc.h>
#include <erl_nif.h>
#include "adbc_half_float.hpp"
#include "adbc_arrow_metadata.hpp"

static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series = nullptr, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series = nullptr, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check = false);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &value_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check = false);
static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
static int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error);
Expand Down Expand Up @@ -227,11 +229,11 @@ int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema

int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error) {
if (schema->n_children > 0 && schema->children == nullptr) {
error = erlang::nif::error(env, "invalid ArrowSchema, schema->children == nullptr, however, schema->n_children > 0");
error = erlang::nif::error(env, "invalid ArrowSchema, schema->children == nullptr while schema->n_children > 0");
return 1;
}
if (values->n_children > 0 && values->children == nullptr) {
error = erlang::nif::error(env, "invalid ArrowArray, values->children == nullptr, however, values->n_children > 0");
error = erlang::nif::error(env, "invalid ArrowArray, values->children == nullptr while values->n_children > 0");
return 1;
}
if (values->n_children != schema->n_children) {
Expand All @@ -257,7 +259,7 @@ int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowAr
if (enif_is_identical(childrens[1], kAtomNil)) {
children[child_i] = kAtomNil;
} else {
children[child_i] = make_adbc_column(env, schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
children[child_i] = make_adbc_column(env, child_schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
}
}
}
Expand All @@ -275,10 +277,10 @@ int get_arrow_dictionary(ErlNifEnv *env,
std::vector<ERL_NIF_TERM> keys, values;
ERL_NIF_TERM index_type, index_metadata;
ERL_NIF_TERM value_type, value_metadata;
if (arrow_array_to_nif_term(env, index_schema, index_array, offset, count, level + 1, keys, index_type, index_metadata, error, nullptr, true) == 1) {
if (arrow_array_to_nif_term(env, index_schema, index_array, offset, count, level + 1, keys, index_type, index_metadata, error, true) == 1) {
return 1;
}
if (arrow_array_to_nif_term(env, value_schema, value_array, offset, count, level + 1, values, value_type, value_metadata, error, nullptr, false) == 1) {
if (arrow_array_to_nif_term(env, value_schema, value_array, offset, count, level + 1, values, value_type, value_metadata, error, false) == 1) {
return 1;
}

Expand Down Expand Up @@ -327,46 +329,50 @@ ERL_NIF_TERM get_arrow_array_map_children(ErlNifEnv *env, struct ArrowSchema * s

struct ArrowSchema * entries_schema = schema->children[0];
struct ArrowArray * entries_values = values->children[0];
if (strncmp("entries", entries_schema->name, 7) != 0) {
if (strcmp("entries", entries_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (map), its single child is not named entries");
}
if (count == -1) count = entries_values->n_children;
if (entries_schema->n_children != 2) {
return erlang::nif::error(env, "invalid ArrowSchema (map), its entries n_children != 2");
}

struct ArrowSchema * key_schema, * value_schema;
struct ArrowArray * key_values, * value_values;
if (strcmp("key", entries_schema->children[0]->name) == 0 && strcmp("value", entries_schema->children[1]->name) == 0) {
key_schema = entries_schema->children[0];
key_values = entries_values->children[0];
value_schema = entries_schema->children[1];
value_values = entries_values->children[1];
} else if (strcmp("key", entries_schema->children[1]->name) == 0 && strcmp("value", entries_schema->children[0]->name) == 0) {
key_schema = entries_schema->children[1];
key_values = entries_values->children[1];
value_schema = entries_schema->children[0];
value_values = entries_values->children[0];
} else {
return erlang::nif::error(env, "invalid map entries, key or value or both are missing");
}

std::vector<ERL_NIF_TERM> nif_keys, nif_values;
bool failed = false;
for (int64_t child_i = offset; child_i < offset + count; child_i++) {
struct ArrowSchema * entry_schema = entries_schema->children[child_i];
struct ArrowArray * entry_values = entries_values->children[child_i];
if (strncmp("key", entry_schema->name, 3) == 0) {
if (get_arrow_array_children_as_list(env, entry_schema, entry_values, level + 1, nif_keys, error) == 1) {
failed = true;
break;
}
} else if (strncmp("value", entry_schema->name, 5) == 0 && entry_schema->n_children == 1) {
struct ArrowSchema * item_schema = entry_schema->children[0];
struct ArrowArray * item_values = entry_values->children[0];
if (get_arrow_array_children_as_list(env, item_schema, item_values, level + 1, nif_values, error) == 1) {
failed = true;
break;
}
} else {
failed = true;
}
ERL_NIF_TERM key_type, key_metadata;
ERL_NIF_TERM value_type, value_metadata;
if (arrow_array_to_nif_term(env, key_schema, key_values, offset, count, level + 1, nif_keys, key_type, key_metadata, error) == 1) {
return erlang::nif::error(env, "failed to get map keys");
}

if (!failed) {
if (nif_keys.size() != nif_values.size()) {
return erlang::nif::error(env, "number of keys and values doesn't match");
}

if (!enif_make_map_from_arrays(env, nif_keys.data(), nif_values.data(), (unsigned)nif_keys.size(), &map_out)) {
return erlang::nif::error(env, "map contains duplicated keys");
} else {
return map_out;
}
} else {
return erlang::nif::error(env, "invalid map");
if (arrow_array_to_nif_term(env, value_schema, value_values, offset, count, level + 1, nif_values, value_type, value_metadata, error) == 1) {
return erlang::nif::error(env, "failed to get map values");
}

ERL_NIF_TERM map_keys[] = {
kAtomKey,
kAtomValue
};
ERL_NIF_TERM map_values[] = {
make_adbc_column(env, key_schema, key_values, nif_keys[0], key_type, key_schema->flags & ARROW_FLAG_NULLABLE, key_metadata, nif_keys[1]),
make_adbc_column(env, value_schema, value_values, nif_values[0], value_type, value_schema->flags & ARROW_FLAG_NULLABLE, value_metadata, nif_values[1])
};

enif_make_map_from_arrays(env, map_keys, map_values, (unsigned)(sizeof(map_keys)/sizeof(map_keys[0])), &map_out);
return map_out;
}

ERL_NIF_TERM get_arrow_array_map_children(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level) {
Expand Down Expand Up @@ -511,10 +517,10 @@ ERL_NIF_TERM get_arrow_run_end_encoded(ErlNifEnv *env, struct ArrowSchema * sche
if (schema->children == nullptr || values->children == nullptr) {
return erlang::nif::error(env, "invalid ArrowArray (run_end_encoded), schema->children == nullptr || values->children == nullptr");
}
if (strncmp("run_ends", schema->children[0]->name, 8) != 0) {
if (strcmp("run_ends", schema->children[0]->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its first child is not named run_ends");
}
if (strncmp("values", schema->children[1]->name, 6) != 0) {
if (strcmp("values", schema->children[1]->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its second child is not named values");
}

Expand Down Expand Up @@ -574,7 +580,7 @@ ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSchema *
const uint8_t * bitmap_buffer = (const uint8_t *)values->buffers[bitmap_buffer_index];
struct ArrowSchema * items_schema = schema->children[0];
struct ArrowArray * items_values = values->children[0];
if (strncmp("item", items_schema->name, 4) != 0) {
if (strcmp("item", items_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (list), its single child is not named item");
}

Expand Down Expand Up @@ -698,7 +704,7 @@ ERL_NIF_TERM get_arrow_array_list_view(ErlNifEnv *env, struct ArrowSchema * sche

struct ArrowSchema * items_schema = schema->children[0];
struct ArrowArray * items_values = values->children[0];
if (strncmp("item", items_schema->name, 4) != 0) {
if (strcmp("item", items_schema->name) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (list), its single child is not named item");
}
if (count == -1) count = values->length;
Expand Down Expand Up @@ -753,7 +759,7 @@ ERL_NIF_TERM get_arrow_array_list_view(ErlNifEnv *env, struct ArrowSchema * sche
return get_arrow_array_list_view(env, schema, values, 0, -1, level, list_type);
}

int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &term_type, ERL_NIF_TERM &arrow_metadata, ERL_NIF_TERM &error, bool *end_of_series, bool skip_dictionary_check) {
int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, int64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &term_type, ERL_NIF_TERM &arrow_metadata, ERL_NIF_TERM &error, bool skip_dictionary_check) {
if (schema == nullptr) {
error = erlang::nif::error(env, "invalid ArrowSchema (nullptr) when invoking next");
return 1;
Expand All @@ -769,39 +775,14 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
ERL_NIF_TERM current_term{}, children_term{};
size_t format_len = strlen(format);

if (level == 0 && format_len == 2 && strncmp("+s", format, 2) == 0 && values->length == 0) {
if (end_of_series) {
*end_of_series = true;
}
out_terms.clear();
out_terms.emplace_back(kAtomEndOfSeries);
return 0;
}

term_type = kAtomNil;
arrow_metadata = kAtomNil;
std::vector<ERL_NIF_TERM> children;

constexpr int64_t bitmap_buffer_index = 0;
int64_t data_buffer_index = 1;
int64_t offset_buffer_index = 2;

std::vector<ERL_NIF_TERM> metadata_keys, metadata_values;
if (schema->metadata) {
struct ArrowMetadataReader metadata_reader{};
struct ArrowStringView key;
struct ArrowStringView value;
if (ArrowMetadataReaderInit(&metadata_reader, schema->metadata) == NANOARROW_OK) {
while (ArrowMetadataReaderRead(&metadata_reader, &key, &value) == NANOARROW_OK) {
// printf("key: %.*s, value: %.*s\n", (int)key.size_bytes, key.data, (int)value.size_bytes, value.data);
metadata_keys.push_back(erlang::nif::make_binary(env, key.data, (size_t)key.size_bytes));
metadata_values.push_back(erlang::nif::make_binary(env, value.data, (size_t)value.size_bytes));
}
if (metadata_keys.size() > 0) {
enif_make_map_from_arrays(env, metadata_keys.data(), metadata_values.data(), (unsigned)metadata_keys.size(), &arrow_metadata);
}
}
}
NANOARROW_RETURN_NOT_OK(arrow_metadata_to_nif_term(env, schema->metadata, &arrow_metadata));

if (!skip_dictionary_check) {
if (schema->dictionary != nullptr && values->dictionary != nullptr) {
Expand Down Expand Up @@ -1703,7 +1684,7 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}

if (!format_processed) {
snprintf(err_msg_buf, 255, "not yet implemented for format: `%s`", schema->format);
snprintf(err_msg_buf, sizeof(err_msg_buf)/sizeof(err_msg_buf[0]), "not yet implemented for format: `%s`", schema->format);
error = erlang::nif::error(env, erlang::nif::make_binary(env, err_msg_buf));
return 1;
// printf("not implemented for format: `%s`\r\n", schema->format);
Expand All @@ -1716,7 +1697,6 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}

out_terms.clear();

if (is_struct) {
if (level > 0) {
out_terms.emplace_back(erlang::nif::make_binary(env, name));
Expand All @@ -1735,8 +1715,8 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
return 0;
}

int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &out_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool *end_of_series, bool skip_dictionary_check) {
return arrow_array_to_nif_term(env, schema, values, 0, -1, level, out_terms, out_type, metadata, error, end_of_series, skip_dictionary_check);
int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &out_type, ERL_NIF_TERM &metadata, ERL_NIF_TERM &error, bool skip_dictionary_check) {
return arrow_array_to_nif_term(env, schema, values, 0, -1, level, out_terms, out_type, metadata, error, skip_dictionary_check);
}

#endif // ADBC_ARROW_ARRAY_HPP
50 changes: 50 additions & 0 deletions c_src/adbc_arrow_array_stream_record.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
#define ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
#pragma once

#include <adbc.h>

struct ArrowArrayStreamRecord {
struct ArrowSchema *schema = nullptr;
struct ArrowArray *values = nullptr;

/// Allocate memory for schema and values
/// @return 0 if success, 1 if failed
int allocate_schema_and_values() {
this->schema = (struct ArrowSchema *)enif_alloc(sizeof(struct ArrowSchema));
if (this->schema == nullptr) {
return 1;
}
memset(this->schema, 0, sizeof(struct ArrowSchema));

this->values = (struct ArrowArray *)enif_alloc(sizeof(struct ArrowArray));
if (this->values == nullptr) {
enif_free(this->schema);
this->schema = nullptr;
return 1;
}
memset(this->values, 0, sizeof(struct ArrowArray));

return 0;
}

void release_schema_and_values() {
if (this->schema) {
if (this->schema->release) {
this->schema->release(this->schema);
}
enif_free(this->schema);
this->schema = nullptr;
}

if (this->values) {
if (this->values->release) {
this->values->release(this->values);
}
enif_free(this->values);
this->values = nullptr;
}
}
};

#endif // ADBC_ARROW_ARRAY_STREAM_RECORD_HPP
32 changes: 32 additions & 0 deletions c_src/adbc_arrow_metadata.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef ADBC_ARROW_METADATA_HPP
#define ADBC_ARROW_METADATA_HPP
#pragma once

#include <stdio.h>
#include <vector>
#include <adbc.h>
#include <erl_nif.h>
#include "adbc_consts.h"
#include "nif_utils.hpp"

static int arrow_metadata_to_nif_term(ErlNifEnv *env, const char * metadata, ERL_NIF_TERM * out_metadata) {
std::vector<ERL_NIF_TERM> metadata_keys, metadata_values;
*out_metadata = kAtomNil;
if (metadata == nullptr) return NANOARROW_OK;

struct ArrowMetadataReader metadata_reader{};
struct ArrowStringView key;
struct ArrowStringView value;
NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata_reader, metadata));
while (ArrowMetadataReaderRead(&metadata_reader, &key, &value) == NANOARROW_OK) {
// printf("key: %.*s, value: %.*s\n", (int)key.size_bytes, key.data, (int)value.size_bytes, value.data);
metadata_keys.push_back(erlang::nif::make_binary(env, key.data, (size_t)key.size_bytes));
metadata_values.push_back(erlang::nif::make_binary(env, value.data, (size_t)value.size_bytes));
}
if (metadata_keys.size() > 0) {
enif_make_map_from_arrays(env, metadata_keys.data(), metadata_values.data(), (unsigned)metadata_keys.size(), out_metadata);
}
return NANOARROW_OK;
}

#endif // ADBC_ARROW_METADATA_HPP
Loading