-
Notifications
You must be signed in to change notification settings - Fork 1k
add fread file connection support #7422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 20 commits
00dc7bb
78bce0e
0afd468
fa79c8c
9590e22
58b3386
995d2dc
3866b6d
8294c6f
1b7cec7
9b3c387
3da8943
f6f9ed3
5a98e62
d76c3a5
d520cd4
c3f7cf6
4235a5c
e37b0ee
2bcfc6c
2e67cc2
4383ae2
5e91780
5182c0c
441c557
63fa168
a609fda
daabbb7
1a98f38
5eef830
0c6eff5
236bc5c
6f4d90f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| \name{reopen_connection} | ||
| \alias{reopen_connection} | ||
ben-schwen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| \title{ Reopen a connection in binary mode } | ||
| \description{ | ||
| S3 generic to reopen a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. | ||
| } | ||
| \usage{ | ||
| reopen_connection(con, description, ...) | ||
| } | ||
| \arguments{ | ||
| \item{con}{ A connection object. } | ||
| \item{description}{ character string. A description of the connection. } | ||
| \item{...}{ Additional arguments for methods. } | ||
| } | ||
| \details{ | ||
| Reopens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections. | ||
|
|
||
| To support custom connection types with \code{fread}, define a method for your connection class that returns a new connection opened in binary mode. | ||
| } | ||
| \value{ | ||
| A connection object opened in binary read mode. | ||
| } | ||
| \seealso{ | ||
| \code{\link{fread}} | ||
| } | ||
| \keyword{ data } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,8 @@ | ||
| #include "fread.h" | ||
| #include "freadR.h" | ||
| #include "data.table.h" | ||
| #include <R_ext/Connections.h> | ||
| #include <errno.h> | ||
|
|
||
| /***** TO DO ***** | ||
| Restore test 1339 (balanced embedded quotes, see ?fread already updated). | ||
|
|
@@ -28,6 +30,7 @@ Secondary separator for list() columns, such as columns 11 and 12 in BED (no nee | |
| static int typeSxp[NUT] = { NILSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, INTSXP, REALSXP, REALSXP, REALSXP, REALSXP, INTSXP, REALSXP, STRSXP, REALSXP, STRSXP }; | ||
| static char typeRName[NUT][10] = { "NULL", "logical", "logical", "logical", "logical", "logical", "logical", "integer", "integer64", "double", "double", "double", "IDate", "POSIXct", "character", "numeric", "CLASS" }; | ||
| static int typeEnum[NUT] = { CT_DROP, CT_EMPTY, CT_BOOL8_N, CT_BOOL8_U, CT_BOOL8_T, CT_BOOL8_L, CT_BOOL8_Y, CT_INT32, CT_INT64, CT_FLOAT64, CT_FLOAT64_HEX, CT_FLOAT64_EXT, CT_ISO8601_DATE, CT_ISO8601_TIME, CT_STRING, CT_FLOAT64, CT_STRING }; | ||
|
|
||
| static colType readInt64As = CT_INT64; | ||
| static SEXP selectSxp; | ||
| static SEXP dropSxp; | ||
|
|
@@ -77,7 +80,8 @@ SEXP freadR( | |
| SEXP integer64Arg, | ||
| SEXP encodingArg, | ||
| SEXP keepLeadingZerosArgs, | ||
| SEXP noTZasUTC | ||
| SEXP noTZasUTC, | ||
| SEXP connectionSpillArg | ||
| ) | ||
| { | ||
| verbose = LOGICAL(verboseArg)[0]; | ||
|
|
@@ -170,6 +174,19 @@ SEXP freadR( | |
| args.warningsAreErrors = warningsAreErrors; | ||
| args.keepLeadingZeros = LOGICAL(keepLeadingZerosArgs)[0]; | ||
| args.noTZasUTC = LOGICAL(noTZasUTC)[0]; | ||
| args.connectionSpillActive = false; | ||
| args.connectionSpillSeconds = 0.0; | ||
| args.connectionSpillBytes = 0.0; | ||
| if (!isNull(connectionSpillArg)) { | ||
| if (!isReal(connectionSpillArg) || LENGTH(connectionSpillArg) != 2) | ||
| internal_error(__func__, "connectionSpillArg must be length-2 real vector"); // # nocov | ||
| const double *spill = REAL(connectionSpillArg); | ||
| args.connectionSpillSeconds = spill[0]; | ||
| args.connectionSpillBytes = spill[1]; | ||
| if (!R_FINITE(args.connectionSpillSeconds) || args.connectionSpillSeconds < 0) args.connectionSpillSeconds = 0.0; | ||
| if (!R_FINITE(args.connectionSpillBytes) || args.connectionSpillBytes < 0) args.connectionSpillBytes = 0.0; | ||
| args.connectionSpillActive = true; | ||
| } | ||
|
|
||
| // === extras used for callbacks === | ||
| if (!isString(integer64Arg) || LENGTH(integer64Arg) != 1) error(_("'integer64' must be a single character string")); | ||
|
|
@@ -724,6 +741,86 @@ void progress(int p, int eta) | |
| } | ||
| // # nocov end | ||
|
|
||
| // Spill connection contents to a tempfile so R-level fread can treat it like a filename | ||
| SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { | ||
ben-schwen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| #if R_CONNECTIONS_VERSION != 1 | ||
| INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov | ||
| #else | ||
| if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { | ||
| INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov | ||
| } | ||
|
|
||
| if (!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { | ||
| INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); // # nocov | ||
| } | ||
|
|
||
| Rconnection con = R_GetConnection(connection); | ||
| const char *filepath = CHAR(STRING_ELT(tempfile_path, 0)); | ||
| const double nrows_max = REAL_RO(nrows_limit)[0]; | ||
| const bool limit_rows = R_FINITE(nrows_max) && nrows_max >= 0.0; | ||
| size_t row_limit = 0; | ||
| if (limit_rows) { | ||
| row_limit = (size_t)nrows_max; | ||
| if (row_limit == 0) row_limit = 100; // read at least 100 rows if nrows==0 | ||
| row_limit++; // cater for potential header row | ||
| } | ||
|
|
||
| FILE *outfile = fopen(filepath, "wb"); | ||
| if (outfile == NULL) { | ||
| STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing: %s"), filepath, strerror(errno)); // # nocov | ||
| } | ||
|
|
||
| // Read and write in chunks // TODO tune chunk size | ||
| size_t chunk_size = 256 * 1024; | ||
| char *buffer = malloc(chunk_size); | ||
| if (!buffer) { | ||
| fclose(outfile); // # nocov | ||
| STOP(_("spillConnectionToFile: failed to allocate buffer")); // # nocov | ||
| } | ||
|
|
||
| size_t total_read = 0; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An R connection may produce more than 4 gigabytes of data even on a 32-bit system, because R enables Large File Support. |
||
| size_t nrows_seen = 0; | ||
|
|
||
| while (true) { | ||
| size_t nread = R_ReadConnection(con, buffer, chunk_size); | ||
|
||
| if (nread == 0) { | ||
| break; // EOF | ||
| } | ||
|
|
||
| size_t bytes_to_write = nread; | ||
| if (limit_rows && nrows_seen < row_limit) { | ||
| for (size_t i = 0; i < nread; i++) { | ||
| if (buffer[i] == '\n') { | ||
| nrows_seen++; | ||
| if (nrows_seen >= row_limit) { | ||
| bytes_to_write = i + 1; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| size_t nwritten = fwrite(buffer, 1, bytes_to_write, outfile); | ||
| if (nwritten != bytes_to_write) { | ||
| // # nocov start | ||
| free(buffer); | ||
| fclose(outfile); | ||
| STOP(_("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)"), strerror(errno), nwritten, bytes_to_write); | ||
| // # nocov end | ||
| } | ||
| total_read += bytes_to_write; | ||
|
|
||
| if (limit_rows && nrows_seen >= row_limit) { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| free(buffer); | ||
| fclose(outfile); | ||
| return ScalarReal((double)total_read); | ||
| #endif // was R_CONNECTIONS_VERSION not != 1? | ||
| } | ||
ben-schwen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| void halt__(bool warn, const char *format, ...) | ||
| { | ||
| // Solves: http://stackoverflow.com/questions/18597123/fread-data-table-locks-files | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.