diff --git a/NAMESPACE b/NAMESPACE index 361b706d3..c93a0a641 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -20,6 +20,7 @@ export(rbindlist) export(fifelse) export(fcase) export(fread) +export(binary_reopener) export(fwrite) export(foverlaps) export(shift) diff --git a/NEWS.md b/NEWS.md index 66648c41c..95cd86718 100644 --- a/NEWS.md +++ b/NEWS.md @@ -296,7 +296,10 @@ See [#2611](https://github.com/Rdatatable/data.table/issues/2611) for details. T # user system elapsed # 0.028 0.000 0.005 ``` - 20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation. + +20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation. + +21. `fread()` can now read from connections directly by spilling to a temporary file first, [#561](https://github.com/Rdatatable/data.table/issues/561). For the best throughput, point `tmpdir=` (or the global temp directory) to fast storage like an SSD or RAM. Thanks to Chris Neff for the report and @ben-schwen for the implementation. 
### BUG FIXES diff --git a/R/fread.R b/R/fread.R index 16a72ed24..172477894 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,3 +1,39 @@ +# nocov start +# S3 generic that returns a function to open connections in binary mode +binary_reopener = function(con, ...) { + UseMethod("binary_reopener") +} + +binary_reopener.default = function(con, ...) { + con_class = class1(con) + stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class) +} + +binary_reopener.file = function(con, ...) { + function(description) file(description, "rb", ...) +} + +binary_reopener.gzfile = function(con, ...) { + function(description) gzfile(description, "rb", ...) +} + +binary_reopener.bzfile = function(con, ...) { + function(description) bzfile(description, "rb", ...) +} + +binary_reopener.url = function(con, ...) { + function(description) url(description, "rb", ...) +} + +binary_reopener.unz = function(con, ...) { + function(description) unz(sub(":[^:]*$", "", description), sub("^.*:", "", description), "rb", ...) # a unz description is "zipfile:member"; split at the last ':' because unz() takes the member name as its 2nd argument and the open mode as its 3rd +} + +binary_reopener.pipe = function(con, ...) { + function(description) pipe(description, "rb", ...) 
+} +# nocov end + fread = function( input="", file=NULL, text=NULL, cmd=NULL, sep="auto", sep2="auto", dec="auto", quote="\"", nrows=Inf, header="auto", na.strings=getOption("datatable.na.strings","NA"), stringsAsFactors=FALSE, verbose=getOption("datatable.verbose",FALSE), @@ -55,7 +91,16 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") input = text } } - else if (is.null(cmd)) { + # Check if input (or file) is a connection; if so its contents are spilled to a temporary file below + input_is_con = FALSE + if (!missing(input) && inherits(input, "connection")) { + input_is_con = TRUE + } else if (!is.null(file) && inherits(file, "connection")) { + input = file + input_is_con = TRUE + file = NULL + } + if (!input_is_con && is.null(cmd) && is.null(text)) { if (!is.character(input) || length(input)!=1L) { stopf("input= must be a single character string containing a file name, a system command containing at least one space, a URL starting 'http[s]://', 'ftp[s]://' or 'file://', or, the input data itself containing at least one \\n or \\r") } @@ -81,6 +126,51 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") } file = tmpFile } + connection_spill_info = NULL + if (input_is_con) { + if (verbose) { + catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into a temporary file... 
", toString(class(input))) + flush.console() + } + spill_started.at = proc.time() + con_open = isOpen(input) + + needs_reopen = FALSE + if (con_open) { + con_summary = summary(input) + binary_modes = c("rb", "r+b") + if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE + } + + close_con = NULL + + if (needs_reopen) { + close(input) + input = binary_reopener(input)(con_summary$description) + close_con = input + } else if (!con_open) { + open(input, "rb") + close_con = input + } + if (!is.null(close_con)) on.exit(close(close_con), add=TRUE) + tmpFile = tempfile(tmpdir=tmpdir) + on.exit(unlink(tmpFile), add=TRUE) + bytes_copied = .Call(CspillConnectionToFile, input, tmpFile, as.numeric(nrows)) + spill_elapsed = (proc.time() - spill_started.at)[["elapsed"]] + + if (bytes_copied == 0) { + warningf("Connection has size 0. Returning a NULL %s.", if (data.table) 'data.table' else 'data.frame') + return(if (data.table) data.table(NULL) else data.frame(NULL)) + } + + if (verbose) { + catf("done in %s\n", timetaken(spill_started.at)) + flush.console() + } + connection_spill_info = c(spill_elapsed, bytes_copied) + input = tmpFile + file = tmpFile + } if (!is.null(file)) { if (!is.character(file) || length(file)!=1L) stopf("file= must be a single character string containing a filename, or URL starting 'http[s]://', 'ftp[s]://' or 'file://'") @@ -293,7 +383,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") tz="UTC" } ans = .Call(CfreadR,input,identical(input,file),sep,dec,quote,header,nrows,skip,na.strings,strip.white,blank.lines.skip,comment.char, - fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC") + fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC",connection_spill_info) if (!length(ans)) return(null.data.table()) # test 1743.308 drops all columns nr = length(ans[[1L]]) 
require_bit64_if_needed(ans) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index e6ef9be1d..b5b7e5d61 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -2737,11 +2737,13 @@ if (test_bit64) { # getwd() has been set by test.data.table() to the location of this tests.Rraw file. Test files should be in the same directory. if (test_R.utils) { f = testDir("ch11b.dat.bz2") # http://www.stats.ox.ac.uk/pub/datasets/csb/ch11b.dat - test(900.1, fread(f, logical01=FALSE), as.data.table(read.table(f))) + test(900.1, DT<-fread(f, logical01=FALSE), as.data.table(read.table(f))) + test(900.15, fread(file(f), logical01=FALSE), DT) test(900.2, fread(f, logical01=TRUE), as.data.table(read.table(f))[,V5:=as.logical(V5)]) f = testDir("1206FUT.txt.bz2") # a CRLF line ending file (DOS) test(901.1, DT<-fread(f,strip.white=FALSE), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class))))) + test(901.15, fread(file(f), strip.white=FALSE), DT) test(901.2, DT<-fread(f), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)),strip.white=TRUE))) } @@ -6654,8 +6656,10 @@ if (test_bit64 && test_R.utils) { ZBJBLOAJAQI = c("LHCYS AYE ZLEMYA IFU HEI JG FEYE", "", ""), JKCRUUBAVQ = c("", ".\\YAPCNXJ\\004570_850034_757\\VWBZSS_848482_600874_487_PEKT-6-KQTVIL-7_30\\IRVQT\\HUZWLBSJYHZ\\XFWPXQ-WSPJHC-00-0770000855383.KKZ", "") ) - test(1449.1, fread(testDir("quoted_multiline.csv.bz2"))[c(1L, 43:44), c(1L, 22:24)], DT) - test(1449.2, fread(testDir("quoted_multiline.csv.bz2"), integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)]) + f = testDir("quoted_multiline.csv.bz2") + test(1449.1, fread(f)[c(1L, 43:44), c(1L, 22:24)], DT) + test(1449.15, fread(file(f))[c(1L, 43:44), c(1L, 22:24)], DT) + test(1449.2, fread(f, integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)]) } # Fix for #927 @@ -21858,3 +21862,17 @@ test(2344.04, key(DT[, .(V4 = c("b", "a"), 
V2, V5 = c("y", "x"), V1)]), c("V1", # fread with quotes and single column #7366 test(2345, fread('"this_that"\n"2025-01-01 00:00:01"'), data.table(this_that = as.POSIXct("2025-01-01 00:00:01", tz="UTC"))) + +# fread supports connections #561 +f = testDir("russellCRLF.csv") +test(2346.1, fread(file=file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile") +test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L)) +test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) +test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) +# test with open connection consuming part of the connection before fread +con = file(f, "rb") +test(2346.5, {readLines(con, n=3); fread(con)}, fread(f, skip=3L)) +close(con) +file.create(f <- tempfile()) +test(2346.6, fread(file(f)), data.table(), warning="Connection has size 0.") +unlink(f) diff --git a/man/connection_opener.Rd b/man/connection_opener.Rd new file mode 100644 index 000000000..c22927c73 --- /dev/null +++ b/man/connection_opener.Rd @@ -0,0 +1,40 @@ +\name{binary_reopener} +\alias{binary_reopener} +\alias{binary_reopener.default} +\alias{binary_reopener.file} +\alias{binary_reopener.gzfile} +\alias{binary_reopener.bzfile} +\alias{binary_reopener.url} +\alias{binary_reopener.unz} +\alias{binary_reopener.pipe} +\title{ Create a function to open connections in binary mode } +\description{ + S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. +} +\usage{ +binary_reopener(con, ...) +} +\arguments{ + \item{con}{ A connection object. } + \item{...}{ Additional arguments passed to the connection constructor. } +} +\details{ + Returns a function that accepts a description argument and opens a connection in binary read mode (\code{"rb"}). 
Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections. + + To support custom connection types with \code{fread}, define a method for your connection class that returns an opener function. +} +\value{ + A function that accepts a description argument and returns a connection object opened in binary read mode. +} +\examples{ +\dontrun{ +# Define a method for a custom connection class +binary_reopener.my_con = function(con, ...) { + function(description) my_con(description, mode = "rb", ...) +} +} +} +\seealso{ + \code{\link{fread}} +} +\keyword{ data } diff --git a/src/data.table.h b/src/data.table.h index 663f0adb4..5d4773b77 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -361,7 +361,8 @@ SEXP setcharvec(SEXP, SEXP, SEXP); SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); -SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP spillConnectionToFile(SEXP, SEXP, SEXP); SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); diff --git a/src/fread.c b/src/fread.c index 51b08af3f..e4be0b6e5 100644 --- a/src/fread.c +++ b/src/fread.c @@ -1575,9 +1575,16 @@ int freadMain(freadMainArgs _args) CloseHandle(hFile); // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa366537(v=vs.85).aspx if (mmp == NULL) { #endif - int nbit = 8 * sizeof(char*); // #nocov - STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. 
%s."), filesize_to_str(fileSize), nbit, // # nocov - nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov + // # nocov start + int nbit = 8 * sizeof(char*); + if (nrowLimit < INT64_MAX) { + STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. Since you specified nrows=%"PRId64", try wrapping the file in a connection: fread(file('filename'), nrows=%"PRId64")."), + filesize_to_str(fileSize), nbit, nrowLimit, nrowLimit); + } else { + STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, + nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); + } + // # nocov end } sof = (const char*) mmp; if (verbose) DTPRINT(_(" Memory mapped ok\n")); @@ -2971,7 +2978,10 @@ int freadMain(freadMainArgs _args) if (verbose) { DTPRINT("=============================\n"); // # notranslate + tTot = tTot + (args.connectionSpillActive ? args.connectionSpillSeconds : 0.0); if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s + if (args.connectionSpillActive) + DTPRINT(_("%8.3fs (%3.0f%%) Spill connection to tempfile (%.3fGiB)\n"), args.connectionSpillSeconds, 100.0 * args.connectionSpillSeconds / tTot, args.connectionSpillBytes / (1024.0 * 1024.0 * 1024.0)); DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024)); DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot); DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? 
"'\\n'" : "'%c'"), sep); // # notranslate diff --git a/src/freadR.c b/src/freadR.c index d586aaed0..e7d7cb8b0 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -1,6 +1,8 @@ #include "fread.h" #include "freadR.h" #include "data.table.h" +#include <R_ext/Connections.h> // Rconnection, R_GetConnection, R_ReadConnection, R_CONNECTIONS_VERSION +#include <errno.h> // errno, reported via strerror() when the spill file cannot be opened or written /***** TO DO ***** Restore test 1339 (balanced embedded quotes, see ?fread already updated). @@ -28,6 +30,7 @@ Secondary separator for list() columns, such as columns 11 and 12 in BED (no nee static int typeSxp[NUT] = { NILSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, INTSXP, REALSXP, REALSXP, REALSXP, REALSXP, INTSXP, REALSXP, STRSXP, REALSXP, STRSXP }; static char typeRName[NUT][10] = { "NULL", "logical", "logical", "logical", "logical", "logical", "logical", "integer", "integer64", "double", "double", "double", "IDate", "POSIXct", "character", "numeric", "CLASS" }; static int typeEnum[NUT] = { CT_DROP, CT_EMPTY, CT_BOOL8_N, CT_BOOL8_U, CT_BOOL8_T, CT_BOOL8_L, CT_BOOL8_Y, CT_INT32, CT_INT64, CT_FLOAT64, CT_FLOAT64_HEX, CT_FLOAT64_EXT, CT_ISO8601_DATE, CT_ISO8601_TIME, CT_STRING, CT_FLOAT64, CT_STRING }; + static colType readInt64As = CT_INT64; static SEXP selectSxp; static SEXP dropSxp; @@ -77,7 +80,8 @@ SEXP freadR( SEXP integer64Arg, SEXP encodingArg, SEXP keepLeadingZerosArgs, - SEXP noTZasUTC + SEXP noTZasUTC, + SEXP connectionSpillArg ) { verbose = LOGICAL(verboseArg)[0]; @@ -170,6 +174,19 @@ SEXP freadR( args.warningsAreErrors = warningsAreErrors; args.keepLeadingZeros = LOGICAL(keepLeadingZerosArgs)[0]; args.noTZasUTC = LOGICAL(noTZasUTC)[0]; + args.connectionSpillActive = false; + args.connectionSpillSeconds = 0.0; + args.connectionSpillBytes = 0.0; + if (!isNull(connectionSpillArg)) { + if (!isReal(connectionSpillArg) || LENGTH(connectionSpillArg) != 2) + internal_error(__func__, "connectionSpillArg must be length-2 real vector"); // # nocov + const double *spill = REAL(connectionSpillArg); + args.connectionSpillSeconds = spill[0]; + args.connectionSpillBytes = spill[1]; + if 
(!R_FINITE(args.connectionSpillSeconds) || args.connectionSpillSeconds < 0) args.connectionSpillSeconds = 0.0; + if (!R_FINITE(args.connectionSpillBytes) || args.connectionSpillBytes < 0) args.connectionSpillBytes = 0.0; + args.connectionSpillActive = true; + } // === extras used for callbacks === if (!isString(integer64Arg) || LENGTH(integer64Arg) != 1) error(_("'integer64' must be a single character string")); @@ -724,6 +741,111 @@ void progress(int p, int eta) } // # nocov end +#if R_CONNECTIONS_VERSION == 1 +typedef struct { + Rconnection con; + const char *filepath; + size_t row_limit; + FILE *outfile; + char *buffer; +} SpillState; + +static void spill_cleanup(void *data) +{ + SpillState *state = (SpillState *)data; + if (!state) return; + free(state->buffer); // free(NULL) is safe no-op + if (state->outfile) { + fclose(state->outfile); + } +} + +static SEXP do_spill(void *data) +{ + SpillState *state = (SpillState *)data; + const size_t chunk_size = 256 * 1024; // TODO tune chunk size + + state->outfile = fopen(state->filepath, "wb"); + if (state->outfile == NULL) { + STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing: %s"), state->filepath, strerror(errno)); // # nocov + } + + state->buffer = malloc(chunk_size); + if (!state->buffer) { + STOP(_("spillConnectionToFile: failed to allocate buffer")); // # nocov + } + const bool limit_rows = state->row_limit > 0; + size_t total_read = 0; + size_t nrows_seen = 0; + + while (true) { + size_t nread = R_ReadConnection(state->con, state->buffer, chunk_size); + if (nread == 0) { + break; // EOF + } + + size_t bytes_to_write = nread; + if (limit_rows && nrows_seen < state->row_limit) { + for (size_t i = 0; i < nread; i++) { + if (state->buffer[i] == '\n') { + nrows_seen++; + if (nrows_seen >= state->row_limit) { + bytes_to_write = i + 1; + break; + } + } + } + } + + size_t nwritten = fwrite(state->buffer, 1, bytes_to_write, state->outfile); + if (nwritten != bytes_to_write) { + 
STOP(_("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)"), strerror(errno), nwritten, bytes_to_write); // # nocov + } + total_read += bytes_to_write; + + if (limit_rows && nrows_seen >= state->row_limit) { + break; + } + } + + return ScalarReal((double)total_read); +} +#endif // R_CONNECTIONS_VERSION == 1 + +// Spill connection contents to a tempfile so R-level fread can treat it like a filename +SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { +#if R_CONNECTIONS_VERSION == 1 + if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov + } + + if (!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); // # nocov + } + + SpillState state = { + .con = R_GetConnection(connection), + .filepath = translateChar(STRING_ELT(tempfile_path, 0)), + .row_limit = 0, + .outfile = NULL, + .buffer = NULL + }; + + const double nrows_max = REAL_RO(nrows_limit)[0]; + if (R_FINITE(nrows_max) && nrows_max >= 0.0) { + if (nrows_max > SIZE_MAX) + STOP(_("spillConnectionToFile: nrows_limit (%g) must fit into a native-size unsigned integer (<= %zu)"), nrows_max, (size_t)SIZE_MAX); // # nocov + state.row_limit = (size_t)nrows_max; + if (state.row_limit == 0) state.row_limit = 100; // read at least 100 rows if nrows==0 + state.row_limit++; // cater for potential header row + } + + return R_ExecWithCleanup(do_spill, &state, spill_cleanup, &state); +#else // R_CONNECTIONS_VERSION != 1 + INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov +#endif +} + void halt__(bool warn, const char *format, ...) 
{ // Solves: http://stackoverflow.com/questions/18597123/fread-data-table-locks-files diff --git a/src/freadR.h b/src/freadR.h index a13d2a1df..7e48cfe54 100644 --- a/src/freadR.h +++ b/src/freadR.h @@ -8,7 +8,10 @@ #include "po.h" #define FREAD_MAIN_ARGS_EXTRA_FIELDS \ - bool oldNoDateTime; + bool oldNoDateTime; \ + bool connectionSpillActive; \ + double connectionSpillSeconds; \ + double connectionSpillBytes; #define FREAD_PUSH_BUFFERS_EXTRA_FIELDS \ int nStringCols; \ diff --git a/src/init.c b/src/init.c index ef81a7a0e..a6befca52 100644 --- a/src/init.c +++ b/src/init.c @@ -67,6 +67,7 @@ R_CallMethodDef callMethods[] = { {"Cchmatchdup", (DL_FUNC) &chmatchdup_R, -1}, {"Cchin", (DL_FUNC) &chin_R, -1}, {"CfreadR", (DL_FUNC) &freadR, -1}, +{"CspillConnectionToFile", (DL_FUNC) &spillConnectionToFile, -1}, {"CfwriteR", (DL_FUNC) &fwriteR, -1}, {"Creorder", (DL_FUNC) &reorder, -1}, {"Crbindlist", (DL_FUNC) &rbindlist, -1},