If you’ve been working with Node.js streams for a while, you’re likely familiar with the EventEmitter-based API, especially .on() and .once() for handling events like data, end, or error.
But here’s a nuance that can quietly ruin your day: using .once() to read from a stream can make it unusable for further consumption. No errors. No warnings. Just… silence.
Let me show you exactly what happened to me, how I found out, and what you should do instead.
The Subtle Bug
Here’s the setup:
I’m uploading a CSV file through a NestJS controller and parsing it using csv-parse. I first want to peek into the stream to check the headers (to validate them before processing), and then go ahead and parse the entire stream in batches.
@Public()
@UseInterceptors(FileInterceptor('file'))
async uploadBvnResponseCsv(
  @UploadedFile(
    new ParseFilePipe({
      validators: [
        new MaxFileSizeValidator({ maxSize: 2 * 1024 * 1024 * 1024 }), // 2GB in bytes
        new FileTypeValidator({ fileType: 'text/csv' }),
      ],
      fileIsRequired: true,
      errorHttpStatusCode: 400,
    }),
  )
  file: Express.Multer.File,
) {
  return await this.clientService.uploadBvnResponse(file);
}
async uploadBvnResponse(file: Express.Multer.File) {
  const fileStream = this.streamParseCsv(file);

  const fileHeaders = await new Promise<string[]>((resolve, reject) => {
    let firstRow: any = null;
    fileStream.once('data', (data: BvnResponse) => {
      firstRow = data;
      resolve(Object.keys(data));
    });
    fileStream.once('error', (err: any) => reject(err));
    fileStream.once('end', () => {
      if (!firstRow) {
        reject(new BadRequestException('No data found in CSV file'));
      }
    });
  });

  const isValidHeader = expectedHeaders.every((header) =>
    fileHeaders.includes(header),
  );
  if (!isValidHeader) {
    throw new BadRequestException(
      'CSV file headers do not match the expected schema.',
    );
  }

  const results = {
    recordsFound: 0,
    recordsCreated: 0,
    duplicateRecords: 0,
    failedToCreateRecords: 0,
  };

  const BATCH_SIZE = 100;
  let batch: BvnResponse[] = [];

  // The problematic part of the code
  try {
    for await (const data of fileStream) {
      results.recordsFound++;
      const bvnResponse = this.mapCsvDataToBvnResponse(data);
      batch.push(bvnResponse);
      if (batch.length >= BATCH_SIZE) {
        await this.processBatch(batch, results);
        batch = [];
      }
    }
    // Process remaining records
    if (batch.length > 0) {
      await this.processBatch(batch, results);
    }
  } catch (error) {
    this.logger.error(`Error processing CSV: ${error.message}`);
    throw new InternalServerErrorException('Error processing CSV file');
  }

  return results;
}

// we use a stream to parse the csv file for large files
// we do this because the csv file is too large to be parsed in one go
// we expect at the very least 5000 records in a single csv file
private streamParseCsv(file: Express.Multer.File) {
  if (!file.buffer || file.buffer.length === 0) {
    this.logger.warn('Empty file buffer received');
  }

  const parser = csv.parse({
    delimiter: '\t', // from my observation, the delimiter is a tab not a comma, parsing the csv file with a comma will result in an error
    columns: true,
    skip_empty_lines: true,
    trim: true,
    relax_quotes: true,
    bom: true,
    quote: "'",
  });

  // Convert buffer to stream
  const readableStream = new Readable();
  readableStream.push(file.buffer);
  readableStream.push(null);

  const parsedStream = readableStream.pipe(parser);
  return parsedStream;
}
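As a side note, the manual new Readable() / push(buffer) / push(null) dance can also be written with Readable.from, which is a bit less ceremony. A small sketch of that variant (the standalone function name is mine, and it reuses the same csv-parse options as above):

import { Readable } from 'node:stream';
import * as csv from 'csv-parse';

// Hypothetical standalone variant of streamParseCsv. In modern Node,
// Readable.from(buffer) emits the whole Buffer as a single chunk rather
// than iterating it byte by byte, so the parser sees the same input.
function streamParseCsvFromBuffer(buffer: Buffer) {
  const parser = csv.parse({
    delimiter: '\t',
    columns: true,
    skip_empty_lines: true,
    trim: true,
    relax_quotes: true,
    bom: true,
    quote: "'",
  });
  return Readable.from(buffer).pipe(parser);
}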
Here’s the simplified flow:
const fileStream = this.streamParseCsv(file);

const fileHeaders = await new Promise<string[]>((resolve, reject) => {
  let firstRow = null;

  fileStream.once("data", (data) => {
    firstRow = data;
    resolve(Object.keys(data));
  });

  fileStream.once("error", (err) => reject(err));

  fileStream.once("end", () => {
    if (!firstRow) {
      reject(new BadRequestException("No data found in CSV file"));
    }
  });
});
So far so good.
But then this fails silently:
for await (const data of fileStream) {
  // This block never runs
}
Why This Happens
The issue is that attaching a .once('data') listener switches the stream into flowing mode. The listener fires for the first row and removes itself, but the stream keeps flowing: the remaining rows are emitted with nobody listening and are simply dropped. By the time the for await...of loop attaches, there is nothing left to read.
In simpler terms, you consumed part of the stream when peeking (and let the rest drain away), and you can’t rewind it unless you’re working with something like a buffer-based or custom duplex stream.
Node streams do not support rewinding. Once consumed, that portion is gone.
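Here’s a minimal, standalone sketch of the same behaviour (an object-mode stream standing in for the parsed CSV), just to show it has nothing to do with csv-parse or NestJS:

import { Readable } from 'node:stream';
import { once } from 'node:events';

async function main() {
  // An object-mode stream with three "rows", standing in for the parsed CSV.
  const stream = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

  // Peek the "wrong" way: attaching a 'data' listener switches the stream
  // into flowing mode. The listener fires once and removes itself, but the
  // remaining rows keep flowing with nobody attached and are dropped.
  stream.once('data', (row) => console.log('peeked:', row));

  // Wait until the stream has finished flowing.
  await once(stream, 'end');

  let count = 0;
  for await (const row of stream) {
    console.log('for await saw:', row); // never reached
    count++;
  }
  console.log('rows seen by for await:', count); // 0
}

main().catch(console.error);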
The Fix
Option 1: Clone or recreate the stream
Instead of using the same stream instance twice, take advantage of the fact that multer already gives you the whole file as a buffer, and create a fresh stream from it for each pass:
// First, read headers
const headerStream = this.streamParseCsv(file);
const headers = await this.peekHeaders(headerStream);

// Then recreate the stream for full parsing
const dataStream = this.streamParseCsv(file);
for await (const data of dataStream) {
  // Now this works
}
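The snippet above assumes a peekHeaders helper. One possible shape for it (hypothetical, not from the original service) is to read the first parsed row from the throwaway header stream and then destroy that stream, since it is never reused:

// Hypothetical helper: resolve with the column names from the first parsed
// row, then destroy the throwaway stream because we never read it again.
private peekHeaders(stream: Readable): Promise<string[]> {
  return new Promise((resolve, reject) => {
    stream.once('data', (row: Record<string, string>) => {
      stream.destroy(); // we only needed the first row
      resolve(Object.keys(row));
    });
    stream.once('error', (err) => reject(err));
    stream.once('end', () =>
      reject(new BadRequestException('No data found in CSV file')),
    );
  });
}

Destroying the header stream here is fine precisely because it is throwaway; the second stream created by streamParseCsv does the real work.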
Option 2: Peek within the same stream logic
If you’re okay with adding some complexity, you can peek, hold on to the row you consumed, and feed it back in when you process the rest:
let firstRowConsumed = null;
const stream = this.streamParseCsv(file);

stream.once("data", (data) => {
  firstRowConsumed = data;
});

// Then push `firstRowConsumed` manually before the loop
But that often gets messy with async iterators.
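If you do want a single pass, one way to keep it manageable is to do the peeking through the same async iterator that later drives the batch loop, so nothing has to be pushed back by hand. A rough sketch (the function name and the processRow callback are hypothetical, not from the original code):

import { BadRequestException } from '@nestjs/common';

// Peek at the first row and keep consuming the same async iterator,
// so nothing is read twice and nothing is lost.
async function consumeWithHeaderCheck(
  rows: AsyncIterable<Record<string, string>>,
  expectedHeaders: string[],
  processRow: (row: Record<string, string>) => Promise<void>, // hypothetical callback
) {
  const iterator = rows[Symbol.asyncIterator]();

  // Pull the first row through the iterator itself, not via .once('data').
  const first = await iterator.next();
  if (first.done) {
    throw new BadRequestException('No data found in CSV file');
  }

  const headers = Object.keys(first.value);
  if (!expectedHeaders.every((h) => headers.includes(h))) {
    throw new BadRequestException(
      'CSV file headers do not match the expected schema.',
    );
  }

  // Don't forget the row we already pulled out.
  await processRow(first.value);

  // Continue with the rest of the stream from the same iterator.
  for await (const row of { [Symbol.asyncIterator]: () => iterator }) {
    await processRow(row);
  }
}

Because the header check and the batch loop share one iterator, there is no 'data' listener racing the loop, and the first row still ends up being processed.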
Lesson Learnt
Node.js streams can be deceptive. Just because you didn’t get an error doesn’t mean everything is fine. Silent bugs like this are why it’s so important to understand how EventEmitter and stream internals work together.
If you’re using .once() or .on() to pre-process streams, make sure you’re not sabotaging future access to the data.
💡TL;DR
- Don’t call .once('data') and then try to iterate the same stream.
- Either recreate the stream, or buffer the data and reprocess it.
- Streams are read-once. Consuming them prematurely can leave you with nothing.