Why Your Node.js Stream Suddenly Stops Working After .once()

Node.js Streams

If you’ve been working with Node.js streams for a while, you’re likely familiar with the EventEmitter-based API, especially .on() and .once(), for handling events like data, end, or error.

But here’s a nuance that can quietly ruin your day: using .once() to read from a stream can make it unusable for further consumption. No errors. No warnings. Just… silence.

Let me show you exactly what happened to me, how I found out, and what you should do instead.

The Subtle Bug

Here’s the setup: I’m uploading a CSV file through a NestJS controller and parsing it using csv-parse. I first want to peek into the stream to check the headers (to validate them before processing), and then go ahead and parse the entire stream in batches.

  @Public()
  @UseInterceptors(FileInterceptor('file'))
  async uploadBvnResponseCsv(
    @UploadedFile(
      new ParseFilePipe({
        validators: [
          new MaxFileSizeValidator({ maxSize: 2 * 1024 * 1024 * 1024 }), // 2GB in bytes
          new FileTypeValidator({ fileType: 'text/csv' }),
        ],
        fileIsRequired: true,
        errorHttpStatusCode: 400,
      }),
    )
    file: Express.Multer.File,
  ) {
    return await this.clientService.uploadBvnResponse(file);
  }
  async uploadBvnResponse(file: Express.Multer.File) {
    const fileStream = this.streamParseCsv(file);

    const fileHeaders = await new Promise<string[]>((resolve, reject) => {
      let firstRow: any = null;

      fileStream.once('data', (data: BvnResponse) => {
        firstRow = data;
        resolve(Object.keys(data));
      });
      fileStream.once('error', (err: any) => reject(err));

      fileStream.once('end', () => {
        if (!firstRow) {
          reject(new BadRequestException('No data found in CSV file'));
        }
      });
    });

    const isValidHeader = expectedHeaders.every((header) =>
      fileHeaders.includes(header),
    );

    if (!isValidHeader) {
      throw new BadRequestException(
        'CSV file headers do not match the expected schema.',
      );
    }

    const results = {
      recordsFound: 0,
      recordsCreated: 0,
      duplicateRecords: 0,
      failedToCreateRecords: 0,
    };

    const BATCH_SIZE = 100;
    let batch: BvnResponse[] = [];

    // The problematic part of the code
    try {
      for await (const data of fileStream) {
        results.recordsFound++;

        const bvnResponse = this.mapCsvDataToBvnResponse(data);
        batch.push(bvnResponse);

        if (batch.length >= BATCH_SIZE) {
          await this.processBatch(batch, results);
          batch = [];
        }
      }

      // Process remaining records
      if (batch.length > 0) {
        await this.processBatch(batch, results);
      }
    } catch (error) {
      this.logger.error(`Error processing CSV: ${error.message}`);
      throw new InternalServerErrorException('Error processing CSV file');
    }

    return results;
  }
  // We use a stream to parse the CSV file because large files are too big to
  // be parsed in one go; we expect at the very least 5,000 records in a single file.
  private streamParseCsv(file: Express.Multer.File) {
    if (!file.buffer || file.buffer.length === 0) {
      this.logger.warn('Empty file buffer received');
    }
    const parser = csv.parse({
      delimiter: '\t', // from my observation, the delimiter is a tab not a comma, parsing the csv file with a comma will result in an error
      columns: true,
      skip_empty_lines: true,
      trim: true,
      relax_quotes: true,
      bom: true,
      quote: "'",
    });

    // Convert buffer to stream
    const readableStream = new Readable();
    readableStream.push(file.buffer);
    readableStream.push(null);

    const parsedStream = readableStream.pipe(parser);

    return parsedStream;
  }
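
(A small aside: on Node 12 and later, the manual push / push(null) dance can be replaced with Readable.from. Wrapping the buffer in an array makes it come through as a single chunk; the resulting stream behaves the same as the one above, this is just a slightly tidier way to build it.)

// Equivalent buffer-to-stream conversion using Readable.from (Node 12+)
const readableStream = Readable.from([file.buffer]);
const parsedStream = readableStream.pipe(parser);
return parsedStream;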

Here’s the simplified flow:

const fileStream = this.streamParseCsv(file);

const fileHeaders = await new Promise<string[]>((resolve, reject) => {
  let firstRow = null;

  fileStream.once("data", (data) => {
    firstRow = data;
    resolve(Object.keys(data));
  });

  fileStream.once("error", (err) => reject(err));

  fileStream.once("end", () => {
    if (!firstRow) {
      reject(new BadRequestException("No data found in CSV file"));
    }
  });
});

So far so good.

But then this fails silently:

for await (const data of fileStream) {
  // This block never runs
}

Why This Happens

The issue is that attaching a 'data' listener, even with .once(), switches the stream into flowing mode. The listener is removed after the first chunk, but Node does not automatically pause the stream just because its last 'data' listener is gone; the remaining rows keep flowing and are emitted with nobody listening.

In simpler terms, you consumed the first row when peeking and let the rest of the stream drain unobserved. By the time the for await…of loop attaches its own consumer, the stream has usually already ended, so the loop body never runs.

Node streams do not support rewinding. Once a chunk has been consumed (or dropped), it is gone unless you buffered it yourself.
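
You can see this in isolation with a few lines of plain Node (a minimal repro of my own, using a simple Readable rather than the csv-parse stream above):

import { Readable } from 'stream';

async function demo() {
  const stream = Readable.from(['row1\n', 'row2\n', 'row3\n']);

  // Peek at the first chunk; attaching the listener switches the stream into flowing mode
  const first = await new Promise((resolve) => stream.once('data', resolve));
  console.log('peeked:', first);

  // Give the event loop a moment; the remaining chunks flow out with no listener attached
  await new Promise((resolve) => setImmediate(resolve));

  let seen = 0;
  for await (const _chunk of stream) {
    seen++;
  }
  console.log('rows seen by for await:', seen); // typically 0: the data is already gone
}

demo();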

The Fix

Option 1: Clone or recreate the stream

Instead of using the same stream instance twice, create a fresh stream for each pass. The file is already sitting in memory as file.buffer, so streamParseCsv can simply be called again:

// First, read headers
const headerStream = this.streamParseCsv(file);
const headers = await this.peekHeaders(headerStream);

// Then recreate the stream for full parsing
const dataStream = this.streamParseCsv(file);
for await (const data of dataStream) {
  // Now this works
}
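
The peekHeaders helper isn’t shown in my service, so here’s a minimal sketch of what it could look like (it assumes the same Readable and BadRequestException imports as the rest of the service, and it destroys its stream once the first row arrives, since that instance exists only for peeking):

private peekHeaders(stream: Readable): Promise<string[]> {
  return new Promise<string[]>((resolve, reject) => {
    stream.once('data', (row: Record<string, unknown>) => {
      resolve(Object.keys(row));
      stream.destroy(); // this instance is only used for peeking, so discard it
    });
    stream.once('error', (err) => reject(err));
    stream.once('end', () =>
      reject(new BadRequestException('No data found in CSV file')),
    );
  });
}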

Option 2: Peek within the same stream logic

If you’re okay with adding some complexity, you can peek, pause the stream immediately, and retain the first row to reprocess later:

let firstRowConsumed = null;

const stream = this.streamParseCsv(file);
stream.once("data", (data) => {
  firstRowConsumed = data;
});

// Then push `firstRowConsumed` manually before the loop

But that often gets messy with async iterators.
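
A cleaner variant of the same idea (my sketch, not code from the service above) is to drop .once() entirely and peek through the stream’s async iterator, then keep pulling rows from that same iterator:

const stream = this.streamParseCsv(file);
const iterator = stream[Symbol.asyncIterator]();

// Peek: pull the first row through the iterator instead of .once('data')
const first = await iterator.next();
if (first.done) {
  throw new BadRequestException('No data found in CSV file');
}
const headers = Object.keys(first.value);
// ...validate headers against expectedHeaders here...

// Process the peeked row first, then the rest of the stream from the same iterator
for (let row = first; !row.done; row = await iterator.next()) {
  const data = row.value;
  // ...map and batch `data` exactly as before...
}

Because every row comes through one iterator, nothing is consumed behind the loop’s back.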

Lesson Learnt

Node.js streams can be deceptive. Just because you didn’t get an error doesn’t mean everything is fine. Silent bugs like this are why it’s so important to understand how EventEmitter and stream internals work together.

If you’re using .once() or .on() to pre-process streams, make sure you’re not sabotaging future access to the data.

💡TL;DR

  • Don’t call .once('data') and then try to iterate the same stream.

  • Either recreate the stream or buffer and reprocess.

  • Streams are read-once. Consuming them prematurely can leave you with nothing.