[ckip ci] Replace getline with fread, update parser

This commit is contained in:
ado 2023-08-12 21:15:44 +02:00
parent 4bedc32b63
commit 0b3b719155
2 changed files with 174 additions and 288 deletions

View File

@ -25,60 +25,4 @@ inline void assert_throw_on_error_not_defined() {
"'throw_on_error' is enabled"); "'throw_on_error' is enabled");
} }
#if __unix__
inline ssize_t get_line(char** lineptr, size_t* n, FILE* stream) {
return getline(lineptr, n, stream);
}
#else
using ssize_t = int64_t;
inline ssize_t get_line(char** lineptr, size_t* n, FILE* stream) {
size_t pos;
int c;
if (lineptr == nullptr || stream == nullptr || n == nullptr) {
errno = EINVAL;
return -1;
}
c = getc(stream);
if (c == EOF) {
return -1;
}
if (*lineptr == nullptr) {
*lineptr = static_cast<char*>(malloc(128));
if (*lineptr == nullptr) {
return -1;
}
*n = 128;
}
pos = 0;
while (c != EOF) {
if (pos + 1 >= *n) {
size_t new_size = *n + (*n >> 2);
if (new_size < 128) {
new_size = 128;
}
char* new_ptr = static_cast<char*>(
realloc(static_cast<void*>(*lineptr), new_size));
if (new_ptr == nullptr) {
return -1;
}
*n = new_size;
*lineptr = new_ptr;
}
(*lineptr)[pos++] = c;
if (c == '\n') {
break;
}
c = getc(stream);
}
(*lineptr)[pos] = '\0';
return pos;
}
#endif
} /* ss */ } /* ss */

View File

@ -1,5 +1,12 @@
#pragma once #pragma once
// TODO remove or rename
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#define U_IF(OP) if (unlikely(OP))
#define L_IF(OP) if (likely(OP))
#include "common.hpp" #include "common.hpp"
#include "converter.hpp" #include "converter.hpp"
#include "exception.hpp" #include "exception.hpp"
@ -36,11 +43,12 @@ public:
const std::string& delim = ss::default_delimiter) const std::string& delim = ss::default_delimiter)
: file_name_{file_name}, reader_{file_name_, delim} { : file_name_{file_name}, reader_{file_name_, delim} {
if (reader_.file_) { if (reader_.file_) {
read_line(); // read_line();
if constexpr (ignore_header) { if constexpr (ignore_header) {
ignore_next(); ignore_next();
} else { } else {
raw_header_ = reader_.get_buffer(); // TODO handle
// raw_header_ = reader_.get_buffer();
} }
} else { } else {
handle_error_file_not_open(); handle_error_file_not_open();
@ -84,34 +92,11 @@ public:
} }
size_t line() const { size_t line() const {
return reader_.line_number_ > 1 ? reader_.line_number_ - 1 return reader_.line_number_;
: reader_.line_number_;
} }
template <typename T, typename... Ts> template <typename T, typename... Ts>
no_void_validator_tup_t<T, Ts...> get_next() { no_void_validator_tup_t<T, Ts...> get_next() {
std::optional<std::string> error;
if (!eof_) {
if constexpr (throw_on_error) {
try {
reader_.parse();
} catch (const ss::exception& e) {
read_line();
decorate_rethrow(e);
}
} else {
reader_.parse();
}
}
reader_.update();
if (!reader_.converter_.valid()) {
handle_error_invalid_conversion();
read_line();
return {};
}
clear_error(); clear_error();
if (eof_) { if (eof_) {
@ -121,22 +106,24 @@ public:
if constexpr (throw_on_error) { if constexpr (throw_on_error) {
try { try {
auto value = reader_.converter_.template convert<T, Ts...>();
read_line(); read_line();
return value; return reader_.converter_.template convert<T, Ts...>(
reader_.split_data_);
} catch (const ss::exception& e) { } catch (const ss::exception& e) {
read_line();
decorate_rethrow(e); decorate_rethrow(e);
} }
} }
auto value = reader_.converter_.template convert<T, Ts...>(); read_line();
// TODO check valid read
auto value =
reader_.converter_.template convert<T, Ts...>(reader_.split_data_);
if (!reader_.converter_.valid()) { if (!reader_.converter_.valid()) {
handle_error_invalid_conversion(); handle_error_invalid_conversion();
} }
read_line();
return value; return value;
} }
@ -189,8 +176,6 @@ public:
} }
reader_.converter_.set_column_mapping(column_mappings, header_.size()); reader_.converter_.set_column_mapping(column_mappings, header_.size());
reader_.next_line_converter_.set_column_mapping(column_mappings,
header_.size());
if (line() == 1) { if (line() == 1) {
ignore_next(); ignore_next();
@ -636,258 +621,215 @@ private:
eof_ = !reader_.read_next(); eof_ = !reader_.read_next();
} }
////////////////
// new reader
////////////////
struct reader { struct reader {
reader(const std::string& file_name_, const std::string& delim) reader(const reader& other) = delete;
: delim_{delim}, file_{fopen(file_name_.c_str(), "rb")} { reader& operator=(const reader& other) = delete;
}
// TODO update
reader(reader&& other) reader(reader&& other)
: buffer_{other.buffer_}, : delim_{std::move(other.delim_)}, file_{other.file_},
next_line_buffer_{other.next_line_buffer_}, converter_{std::move(other.converter_)}, buff_{other.buff_},
helper_buffer_{other.helper_buffer_}, converter_{std::move( line_number_{other.line_number_}, buff_size_{other.buff_size_},
other.converter_)}, buff_filled_{other.buff_filled_},
next_line_converter_{std::move(other.next_line_converter_)}, buff_processed_{other.buff_processed_},
buffer_size_{other.buffer_size_}, delim_char_{other.delim_char_}, begin_{other.begin_},
next_line_buffer_size_{other.next_line_buffer_size_}, curr_{other.curr_}, end_{other.end_},
helper_size_{other.helper_size_}, delim_{std::move(other.delim_)}, last_read_{other.last_read_}, split_data_{
file_{other.file_}, crlf_{other.crlf_}, std::move(other.split_data_)} {
line_number_{other.line_number_}, next_line_size_{
other.next_line_size_} {
other.buffer_ = nullptr;
other.next_line_buffer_ = nullptr;
other.helper_buffer_ = nullptr;
other.file_ = nullptr; other.file_ = nullptr;
other.buff_ = nullptr;
} }
reader& operator=(reader&& other) { reader& operator=(reader&& other) {
if (this != &other) { if (this != &other) {
buffer_ = other.buffer_;
next_line_buffer_ = other.next_line_buffer_;
helper_buffer_ = other.helper_buffer_;
converter_ = std::move(other.converter_);
next_line_converter_ = std::move(other.next_line_converter_);
buffer_size_ = other.buffer_size_;
next_line_buffer_size_ = other.next_line_buffer_size_;
helper_size_ = other.helper_size_;
delim_ = std::move(other.delim_); delim_ = std::move(other.delim_);
file_ = other.file_; file_ = other.file_;
crlf_ = other.crlf_; converter_ = std::move(other.converter_);
buff_ = other.buff_;
line_number_ = other.line_number_; line_number_ = other.line_number_;
next_line_size_ = other.next_line_size_; buff_filled_ = other.buff_filled_;
buff_size_ = other.buff_size_;
buff_processed_ = other.buff_processed_;
delim_char_ = other.delim_char_;
begin_ = other.begin_;
curr_ = other.curr_;
end_ = other.end_;
last_read_ = other.last_read_;
split_data_ = std::move(other.split_data_);
other.buffer_ = nullptr;
other.next_line_buffer_ = nullptr;
other.helper_buffer_ = nullptr;
other.file_ = nullptr; other.file_ = nullptr;
other.buff_ = nullptr;
} }
return *this; return *this;
} }
reader(const std::string& file_name, const std::string& delim)
: delim_{delim} {
// TODO update
delim_char_ = delim_.at(0);
// TODO check file
file_ = std::fopen(file_name.c_str(), "rb");
if (!file_) {
return;
}
// TODO check buff_
buff_ = static_cast<char*>(std::malloc(buff_size_));
// TODO check buff_filled
buff_filled_ = std::fread(buff_, 1, buff_size_, file_);
if (buff_filled_ != buff_size_) {
last_read_ = true;
}
// TODO handle differently
if (buff_filled_ == 0 || (buff_filled_ == 1 && buff_[0] == '\n') ||
(buff_filled_ == 2 && buff_[0] == '\r' && buff_[1] == '\n')) {
free(file_);
file_ = nullptr;
}
begin_ = buff_;
curr_ = buff_;
end_ = buff_ + buff_filled_;
}
~reader() { ~reader() {
free(buffer_); free(buff_);
free(next_line_buffer_);
free(helper_buffer_);
if (file_) { if (file_) {
fclose(file_); fclose(file_);
} }
} }
reader() = delete; void shift_read_next() {
reader(const reader& other) = delete; buff_filled_ -= buff_processed_;
reader& operator=(const reader& other) = delete;
// shift back data that is not processed
memcpy(buff_, buff_ + buff_processed_, buff_filled_);
// read data to fill the rest of the buffer
buff_filled_ +=
fread(buff_ + buff_filled_, 1, buff_processed_, file_);
}
void handle_buffer_end_reached() {
buff_size_ *= 8;
// TODO handle NULL
buff_ = static_cast<char*>(std::realloc(buff_, buff_size_));
// fill the rest of the buffer
buff_filled_ += fread(buff_ + buff_filled_, 1,
buff_size_ - buff_filled_, file_);
if (buff_filled_ != buff_size_) {
last_read_ = true;
}
};
// read next line each time in order to set eof_ // read next line each time in order to set eof_
bool read_next() { bool read_next() {
next_line_converter_.clear_error(); // TODO update division value
ssize_t ssize = 0; if (buff_processed_ > buff_filled_ / 2) {
size_t size = 0; if (!last_read_) {
while (size == 0) { shift_read_next();
++line_number_;
if (next_line_buffer_size_ > 0) { if (buff_filled_ != buff_size_) {
next_line_buffer_[0] = '\0'; last_read_ = true;
}
curr_ = buff_;
end_ = buff_ + buff_filled_;
} }
ssize = get_line(&next_line_buffer_, &next_line_buffer_size_, }
file_);
if (ssize == -1) { split_data_.clear();
return false; begin_ = curr_;
}
size = remove_eol(next_line_buffer_, ssize); while (true) {
if (*curr_ == '\n') {
if constexpr (!ignore_empty) { split_data_.emplace_back(begin_, curr_);
break; break;
} }
}
next_line_size_ = size; if (*curr_ == '\r' && *(curr_ + 1) == '\n') {
return true; split_data_.emplace_back(begin_, curr_);
} ++curr_;
break;
}
void parse() { if (*curr_ == delim_char_) {
size_t limit = 0; split_data_.emplace_back(begin_, curr_);
begin_ = curr_ + 1;
}
if constexpr (escaped_multiline_enabled) { ++curr_;
while (escaped_eol(next_line_size_)) {
if (multiline_limit_reached(limit)) { if (curr_ == end_) {
return; auto old_buff = buff_;
if (last_read_) {
// TODO handle
throw std::string{"no new line at eof"};
} }
if (!append_next_line_to_buffer(next_line_buffer_, handle_buffer_end_reached();
next_line_size_)) { end_ = buff_ + buff_filled_;
next_line_converter_.handle_error_unterminated_escape();
return; for (auto& [begin, end] : split_data_) {
begin = begin - old_buff + buff_;
end = end - old_buff + buff_;
} }
begin_ = begin_ - old_buff + buff_;
curr_ = curr_ - old_buff + buff_;
} }
} }
next_line_converter_.split(next_line_buffer_, delim_); ++curr_;
buff_processed_ = curr_ - buff_;
if constexpr (quoted_multiline_enabled) { // TODO check where to put this
while (unterminated_quote()) { ++line_number_;
next_line_size_ -= next_line_converter_.size_shifted();
if (multiline_limit_reached(limit)) { if (last_read_ && curr_ >= end_) {
return;
}
if (!append_next_line_to_buffer(next_line_buffer_,
next_line_size_)) {
next_line_converter_.handle_error_unterminated_quote();
return;
}
if constexpr (escaped_multiline_enabled) {
while (escaped_eol(next_line_size_)) {
if (multiline_limit_reached(limit)) {
return;
}
if (!append_next_line_to_buffer(next_line_buffer_,
next_line_size_)) {
next_line_converter_
.handle_error_unterminated_escape();
return;
}
}
}
next_line_converter_.resplit(next_line_buffer_,
next_line_size_, delim_);
}
}
}
void update() {
std::swap(buffer_, next_line_buffer_);
std::swap(buffer_size_, next_line_buffer_size_);
std::swap(converter_, next_line_converter_);
}
bool multiline_limit_reached(size_t& limit) {
if constexpr (multiline::size > 0) {
if (limit++ >= multiline::size) {
next_line_converter_.handle_error_multiline_limit_reached();
return true;
}
}
return false;
}
bool escaped_eol(size_t size) {
const char* curr;
for (curr = next_line_buffer_ + size - 1;
curr >= next_line_buffer_ &&
setup<Options...>::escape::match(*curr);
--curr) {
}
return (next_line_buffer_ - curr + size) % 2 == 0;
}
bool unterminated_quote() {
return next_line_converter_.unterminated_quote();
}
void undo_remove_eol(char* buffer, size_t& string_end) {
if (crlf_) {
std::copy_n("\r\n\0", 3, buffer + string_end);
string_end += 2;
} else {
std::copy_n("\n\0", 2, buffer + string_end);
string_end += 1;
}
}
size_t remove_eol(char*& buffer, size_t ssize) {
size_t size = ssize - 1;
if (ssize >= 2 && buffer[ssize - 2] == '\r') {
crlf_ = true;
size--;
} else {
crlf_ = false;
}
buffer[size] = '\0';
return size;
}
void realloc_concat(char*& first, size_t& first_size,
const char* const second, size_t second_size) {
// TODO make buffer_size an argument
next_line_buffer_size_ = first_size + second_size + 3;
auto new_first = static_cast<char*>(
realloc(static_cast<void*>(first), next_line_buffer_size_));
if (!first) {
throw std::bad_alloc{};
}
first = new_first;
std::copy_n(second, second_size + 1, first + first_size);
first_size += second_size;
}
bool append_next_line_to_buffer(char*& buffer, size_t& size) {
undo_remove_eol(buffer, size);
ssize_t next_ssize =
get_line(&helper_buffer_, &helper_size_, file_);
if (next_ssize == -1) {
return false; return false;
} }
++line_number_;
size_t next_size = remove_eol(helper_buffer_, next_ssize);
realloc_concat(buffer, size, helper_buffer_, next_size);
return true; return true;
} }
std::string get_buffer() { std::string delim_{};
return std::string{next_line_buffer_, next_line_buffer_size_};
}
////////////////
// members
////////////////
char* buffer_{nullptr};
char* next_line_buffer_{nullptr};
char* helper_buffer_{nullptr};
converter<Options...> converter_;
converter<Options...> next_line_converter_;
size_t buffer_size_{0};
size_t next_line_buffer_size_{0};
size_t helper_size_{0};
std::string delim_;
FILE* file_{nullptr}; FILE* file_{nullptr};
bool crlf_{false}; converter<Options...> converter_;
char* buff_{nullptr};
size_t line_number_{0}; size_t line_number_{0};
size_t next_line_size_{0}; // TODO set initial buffer size
size_t buff_size_{1};
size_t buff_filled_{0};
size_t buff_processed_{0};
// TODO update
char delim_char_;
// TODO move to splitter
char* begin_{nullptr};
char* curr_{nullptr};
char* end_{nullptr};
bool last_read_{false};
split_data split_data_;
}; };
//////////////// ////////////////