Analizar enormes archivos de registro en Node.js – leer en línea por línea

Necesito hacer algunos análisis de archivos de registro grandes (5-10 Gb) en Javascript / Node.js (estoy usando Cube).

El logline se ve algo así como:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS". 

Necesitamos leer cada línea, hacer algunos análisis (por ejemplo, quitar 5 , 7 y SUCCESS ), luego bombear estos datos en Cube ( https://github.com/square/cube ) usando su cliente JS.

En primer lugar, ¿cuál es la forma canónica en Node para leer en un archivo, línea por línea?

Parece ser una pregunta bastante común en línea:

  • http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
  • ¿Lee un archivo una línea a la vez en node.js?

Muchas de las respuestas parecen apuntar a un grupo de módulos de terceros:

  • https://github.com/nickewing/line-reader
  • https://github.com/jahewson/node-byline
  • https://github.com/pkrumins/node-lazy
  • https://github.com/Gagle/Node-BufferedReader

Sin embargo, esto parece una tarea bastante básica: seguramente, hay una manera simple dentro del archivo stdlib de leer en un archivo de texto, línea por línea.

En segundo lugar, necesito procesar cada línea (por ejemplo, convertir la marca de tiempo en un objeto Date y extraer campos útiles).

¿Cuál es la mejor manera de hacerlo, maximizando el rendimiento? ¿Hay alguna manera de que no se bloquee en la lectura en cada línea o en el envío a Cube?

En tercer lugar, supongo que usar divisiones de cadena, y el equivalente JS de contiene (IndexOf! = -1?) Será mucho más rápido que las expresiones regulares. ¿Alguien ha tenido mucha experiencia en analizar cantidades masivas de datos de texto en Node.js?

Saludos, Victor

Busqué una solución para analizar archivos muy grandes (gbs) línea por línea usando una secuencia. Todas las bibliotecas y ejemplos de terceros no se ajustaban a mis necesidades, ya que procesaban los archivos, línea por línea (como 1, 2, 3, 4 …) o leían todo el archivo en la memoria.

La siguiente solución puede analizar archivos muy grandes, línea por línea usando stream & pipe. Para las pruebas utilicé un archivo de 2.1 gb con 17.000.000 de registros. El uso de Ram no excedió los 60 mb.

 var fs = require('fs') , es = require('event-stream'); var lineNr = 0; var s = fs.createReadStream('very-large-file.csv') .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); lineNr += 1; // process line here and call s.resume() when rdy // function below was for logging memory usage logMemoryUsage(lineNr); // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(err){ console.log('Error while reading file.', err); }) .on('end', function(){ console.log('Read entire file.') }) ); 

enter image description here

¡Por favor déjame saber cómo va!

Puede usar el paquete readline incorporado, consulte los documentos aquí . Yo uso stream para crear una nueva stream de salida.

 var fs = require('fs'), readline = require('readline'), stream = require('stream'); var instream = fs.createReadStream('/path/to/file'); var outstream = new stream; outstream.readable = true; outstream.writable = true; var rl = readline.createInterface({ input: instream, output: outstream, terminal: false }); rl.on('line', function(line) { console.log(line); //Do your stuff ... //Then write to outstream rl.write(cubestuff); }); 

Los archivos grandes tardarán un tiempo en procesarse. Dime si funciona.

Realmente me gustó la respuesta @gerard, que en realidad merece ser la respuesta correcta aquí. Hice algunas mejoras:

  • El código está en una clase (modular)
  • El análisis está incluido
  • La capacidad de reanudar se da al exterior en caso de que haya un trabajo asincrónico encadenado a leer el CSV, como insertar en DB, o una solicitud HTTP.
  • Lectura en bloques / tamaños de lote que el usuario puede declarar. También me encargué de la encoding en la transmisión, en caso de que tengas archivos con encoding diferente.

Aquí está el código:

 'use strict' const fs = require('fs'), util = require('util'), stream = require('stream'), es = require('event-stream'), parse = require("csv-parse"), iconv = require('iconv-lite'); class CSVReader { constructor(filename, batchSize, columns) { this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8')) this.batchSize = batchSize || 1000 this.lineNumber = 0 this.data = [] this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true} } read(callback) { this.reader .pipe(es.split()) .pipe(es.mapSync(line => { ++this.lineNumber parse(line, this.parseOptions, (err, d) => { this.data.push(d[0]) }) if (this.lineNumber % this.batchSize === 0) { callback(this.data) } }) .on('error', function(){ console.log('Error while reading file.') }) .on('end', function(){ console.log('Read entirefile.') })) } continue () { this.data = [] this.reader.resume() } } module.exports = CSVReader 

Entonces, básicamente, así es como lo usarás:

 let reader = CSVReader('path_to_file.csv') reader.read(() => reader.continue()) 

Probé esto con un archivo CSV de 35GB y funcionó para mí y es por eso que elegí comstackrlo en la respuesta de @gerard , las retroalimentaciones son bienvenidas.

Usé https://www.npmjs.com/package/line-by-line para leer más de 1 000 000 líneas de un archivo de texto. En este caso, una capacidad ocupada de RAM era de aproximadamente 50-60 megabytes.

  const LineByLineReader = require('line-by-line'), lr = new LineByLineReader('big_file.txt'); lr.on('error', function (err) { // 'err' contains error object }); lr.on('line', function (line) { // pause emitting of lines... lr.pause(); // ...do your asynchronous line processing.. setTimeout(function () { // ...and continue emitting lines. lr.resume(); }, 100); }); lr.on('end', function () { // All lines are read, file is closed now. }); 

Además de leer el gran archivo línea por línea, también puede leerlo en porciones. Para más, consulte este artículo

 var offset = 0; var chunkSize = 2048; var chunkBuffer = new Buffer(chunkSize); var fp = fs.openSync('filepath', 'r'); var bytesRead = 0; while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) { offset += bytesRead; var str = chunkBuffer.slice(0, bytesRead).toString(); var arr = str.split('\n'); if(bytesRead = chunkSize) { // the last item of the arr may be not a full line, leave it to the next chunk offset -= arr.pop().length; } lines.push(arr); } console.log(lines); 

Todavía tengo el mismo problema. Después de comparar varios módulos que parecen tener esta característica, decidí hacerlo yo mismo, es más simple de lo que pensaba.

esencia: https://gist.github.com/deemstone/8279565

 var fetchBlock = lineByline(filepath, onEnd); fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No. 

fetchBlock() el archivo abierto en un cierre, que fetchBlock() devuelto obtendrá un bloque del archivo, final dividir en matriz (tratará el segmento de la última búsqueda).

Establecí el tamaño del bloque en 1024 para cada operación de lectura. Esto puede tener errores, pero la lógica del código es obvia, pruébalo tú mismo.

node-byline usa streams, por lo que preferiría ese para tus enormes archivos.

para sus conversiones de fecha usaría moment.js .

para maximizar su rendimiento, podría pensar en utilizar un clúster de software. hay algunos bonitos módulos que envuelven bastante bien el módulo de clúster nativo del nodo. me gusta cluster-master de isaacs. por ejemplo, puedes crear un grupo de x trabajadores que calculen un archivo.

para la comparación de divisiones frente a las expresiones regulares, use benchmark.js . No lo he probado hasta ahora. benchmark.js está disponible como un nodo-módulo

He creado un módulo de nodos para leer archivos de gran tamaño de forma asíncrona o JSON. Probado en archivos grandes.

 var fs = require('fs') , util = require('util') , stream = require('stream') , es = require('event-stream'); module.exports = FileReader; function FileReader(){ } FileReader.prototype.read = function(pathToFile, callback){ var returnTxt = ''; var s = fs.createReadStream(pathToFile) .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); //console.log('reading line: '+line); returnTxt += line; // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(){ console.log('Error while reading file.'); }) .on('end', function(){ console.log('Read entire file.'); callback(returnTxt); }) ); }; FileReader.prototype.readJSON = function(pathToFile, callback){ try{ this.read(pathToFile, function(txt){callback(JSON.parse(txt));}); } catch(err){ throw new Error('json file is not valid! '+err.stack); } }; 

Simplemente guarde el archivo como file-reader.js, y úselo de esta manera:

 var FileReader = require('./file-reader'); var fileReader = new FileReader(); fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/}); 

En base a esta respuesta a las preguntas, implementé una clase que puede usar para leer un archivo de forma sincronizada línea por línea con fs.readSync() . Puede hacer esta “pausa” y “reanudar” usando una promesa Q ( jQuery parece requerir un DOM así que no puede ejecutarlo con nodejs ):

 var fs = require('fs'); var Q = require('q'); var lr = new LineReader(filenameToLoad); lr.open(); var promise; workOnLine = function () { var line = lr.readNextLine(); promise = complexLineTransformation(line).then( function() {console.log('ok');workOnLine();}, function() {console.log('error');} ); } workOnLine(); complexLineTransformation = function (line) { var deferred = Q.defer(); // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error)); return deferred.promise; } function LineReader (filename) { this.moreLinesAvailable = true; this.fd = undefined; this.bufferSize = 1024*1024; this.buffer = new Buffer(this.bufferSize); this.leftOver = ''; this.read = undefined; this.idxStart = undefined; this.idx = undefined; this.lineNumber = 0; this._bundleOfLines = []; this.open = function() { this.fd = fs.openSync(filename, 'r'); }; this.readNextLine = function () { if (this._bundleOfLines.length === 0) { this._readNextBundleOfLines(); } this.lineNumber++; var lineToReturn = this._bundleOfLines[0]; this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany) return lineToReturn; }; this.getLineNumber = function() { return this.lineNumber; }; this._readNextBundleOfLines = function() { var line = ""; while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver this.idxStart = 0 while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver line = this.leftOver.substring(this.idxStart, this.idx); this._bundleOfLines.push(line); this.idxStart = this.idx + 1; } this.leftOver = this.leftOver.substring(this.idxStart); if (line !== "") { break; } } }; }