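// Import script for the Algolia CLI: reads a JSON or CSV file (or a directory
// of such files), optionally applies user-provided record transformations,
// then batches and indexes the records in Algolia with bounded concurrency.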
const fs = require('fs');
const JSONStream = require('JSONStream');
const through = require('through');
const transform = require('stream-transform');
const Batch = require('batch-stream');
const async = require('async');
const csv = require('csvtojson');
const regexParser = require('regex-parser');
const chalk = require('chalk');
const algolia = require('algoliasearch');
const Base = require('./Base.js');

class ImportScript extends Base {
  constructor() {
    super();
    // Bind class methods
    this.defaultTransformations = this.defaultTransformations.bind(this);
    this.suggestions = this.suggestions.bind(this);
    this.checkMemoryUsage = this.checkMemoryUsage.bind(this);
    this.handleHighMemoryUsage = this.handleHighMemoryUsage.bind(this);
    this.handleExtremeMemoryUsage = this.handleExtremeMemoryUsage.bind(this);
    this.setIndex = this.setIndex.bind(this);
    this.setTransformations = this.setTransformations.bind(this);
    this.setCsvOptions = this.setCsvOptions.bind(this);
    this.conditionallyParseCsv = this.conditionallyParseCsv.bind(this);
    this.setBatchSize = this.setBatchSize.bind(this);
    this.estimateBatchSize = this.estimateBatchSize.bind(this);
    this.updateBatchSize = this.updateBatchSize.bind(this);
    this.importToAlgolia = this.importToAlgolia.bind(this);
    this.retryImport = this.retryImport.bind(this);
    this.indexFiles = this.indexFiles.bind(this);
    this.start = this.start.bind(this);
    // Define validation constants
    this.message =
      '\nExample: $ algolia import -s sourcefilepath -a algoliaappid -k algoliaapikey -n algoliaindexname -b batchsize -t transformationfilepath -m maxconcurrency -p csvtojsonparams\n\n';
    this.params = [
      'sourcefilepath',
      'algoliaappid',
      'algoliaapikey',
      'algoliaindexname',
    ];
  }
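  // Identity transformation applied when no custom transformations file is supplied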
  defaultTransformations(data, cb) {
    cb(null, data);
  }
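  // Builds remediation hints appended to memory and import error output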
  suggestions() {
    let output = `\nConsider reducing <batchSize> (currently ${this.batchSize}).`;
    if (this.maxConcurrency > 1)
      output += `\nConsider reducing <maxConcurrency> (currently ${this.maxConcurrency}).`;
    return output;
  }
  checkMemoryUsage() {
    // Exit early if a high memory usage warning was issued too recently
    if (this.highMemoryUsage) return false;
    // Get memory usage
    const { usedMb, percentUsed } = this.getMemoryUsage();
    // At 70% of the heap allocation for this node process, reduce the batch
    // size; at 90%, additionally warn the user
    if (percentUsed >= 70) this.handleHighMemoryUsage(percentUsed);
    if (percentUsed >= 90) this.handleExtremeMemoryUsage(usedMb, percentUsed);
    return false;
  }
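  // Halve the batch size to relieve memory pressure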
  handleHighMemoryUsage(percentUsed) {
    const newBatchSize = Math.floor(this.batchSize / 2);
    this.updateBatchSize(newBatchSize);
    this.writeProgress(
      `High memory usage (${percentUsed}%). Reducing batchSize to ${newBatchSize}`
    );
  }
  handleExtremeMemoryUsage(usedMb, percentUsed) {
    // Issue warning
    const name = 'Warning: High memory usage';
    const message = `Memory usage at ${usedMb} MB (${percentUsed}% of heap allocation for this process).`;
    // Set class instance flag to debounce future warnings
    this.highMemoryUsage = true;
    // Output warning
    console.log(
      chalk.white.bgRed(`\n${name}`),
      chalk.red(`\n${message}`),
      chalk.red(this.suggestions())
    );
    // Reset flag in 30 seconds
    setTimeout(() => {
      this.highMemoryUsage = false;
    }, 30000);
  }
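  // Note: the client calls below (algolia(appId, apiKey), initIndex, and
  // addObjects further down) assume the algoliasearch v3-style JavaScript API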
  setIndex(options) {
    // Set Algolia index
    this.client = algolia(options.appId, options.apiKey);
    this.index = this.client.initIndex(options.indexName);
  }
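  // Loads the optional user-provided transformations module, which must export
  // a single function with the same (data, cb) signature as
  // defaultTransformations above. Illustrative sketch of such a module
  // (field names are hypothetical):
  //
  //   // transformations.js
  //   module.exports = (record, cb) => {
  //     record.objectID = record.sku;
  //     cb(null, record);
  //   };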
  setTransformations(options) {
    // Set JSON record transformations
    const transformations = options.transformations
      ? require(this.normalizePath(options.transformations))
      : null;
    // Validate transformations function input param
    const valid = transformations && typeof transformations === 'function';
    // Assign our transformations function using the provided custom
    // transformations file if it exists
    this.formatRecord = valid ? transformations : this.defaultTransformations;
  }
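  // Parses the optional csvtojson params JSON string; includeColumns and
  // ignoreColumns are converted from strings into RegExp objects because JSON
  // cannot encode regular expressions directly. Illustrative example
  // (hypothetical values):
  //
  //   -p '{"delimiter":";","includeColumns":"/name|price/"}'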
  setCsvOptions(options) {
    this.csvOptions = options.csvToJsonParams
      ? JSON.parse(options.csvToJsonParams)
      : null;
    if (!this.csvOptions) return;
    const csvToJsonRegexPropertyList = ['includeColumns', 'ignoreColumns'];
    csvToJsonRegexPropertyList.forEach(prop => {
      if (this.csvOptions.hasOwnProperty(prop)) {
        this.csvOptions[prop] = regexParser(this.csvOptions[prop]);
      }
    });
  }
  conditionallyParseCsv(isCsv) {
    // Return the appropriate stream for piping depending on filetype
    return isCsv
      ? csv(this.csvOptions) // Convert from CSV to JSON
      : through(); // Pass data through unchanged
  }
  async setBatchSize(options) {
    // If the user provided a batchSize, use it and exit early;
    // otherwise calculate and set an optimal batch size
    if (options.objectsPerBatch !== null) {
      // CLI option values arrive as strings
      this.batchSize = parseInt(options.objectsPerBatch, 10);
      return;
    }
    // Sample the source files to estimate an optimal batch size
    const estimatedBatchSize = await this.estimateBatchSize();
    // Test network upload speed
    const uploadSpeedMb = await this.getNetworkSpeed();
    // Calculate optimal batch size
    this.writeProgress('Calculating optimal batch size...');
    let batchSize;
    // Scale the batch size down when upload bandwidth is the bottleneck
    if (uploadSpeedMb >= this.desiredBatchSizeMb)
      batchSize = Math.floor(estimatedBatchSize);
    else
      batchSize = Math.floor(
        (uploadSpeedMb / this.desiredBatchSizeMb) * estimatedBatchSize
      );
    // Ensure minimum batch size is enforced
    batchSize = Math.max(this.minBatchSize, batchSize);
    console.log(chalk.blue(`\nOptimal batch size: ${batchSize}`));
    // Set batch size
    this.batchSize = batchSize;
  }
  estimateBatchSize() {
    // Read the first file and estimate the average record size, then derive a
    // batch size targeting desiredBatchSizeMb per batch, divided by
    // maxConcurrency so concurrent batches share the budget
    return new Promise((resolve, reject) => {
      try {
        const filename = this.filenames[0];
        const file = `${this.directory}/${filename}`;
        const isCsv = filename.split('.').pop() === 'csv';
        const fileStream = fs.createReadStream(file, {
          autoClose: true,
          flags: 'r',
        });
        fileStream.on('error', reject);
        this.writeProgress(`Estimating data size...`);
        const jsonStreamOption = isCsv ? null : '*';
        fileStream
          .pipe(this.conditionallyParseCsv(isCsv))
          .pipe(JSONStream.parse(jsonStreamOption))
          .pipe(transform(this.formatRecord))
          .pipe(new Batch({ size: 10000 }))
          .pipe(
            through(data => {
              // Measure the serialized size of the sample batch
              const count = data.length;
              const string = JSON.stringify(data);
              const batchSizeMb = this.getStringSizeMb(string);
              const avgRecordSizeMb = batchSizeMb / count;
              const avgRecordSizeKb = Math.ceil(avgRecordSizeMb * 1000);
              const roughBatchSize = this.desiredBatchSizeMb / avgRecordSizeMb;
              const estimatedBatchSize = Math.floor(
                roughBatchSize / this.maxConcurrency
              );
              console.log(
                chalk.blue(`\nAverage record size: ${avgRecordSizeKb} KB`)
              );
              // Only the first sample batch is needed; stop reading
              fileStream.destroy();
              resolve(estimatedBatchSize);
            })
          );
      } catch (e) {
        reject(e);
      }
    });
  }
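  // Setter used when memory pressure or retries force a smaller batch size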
  updateBatchSize(newSize) {
    this.batchSize = newSize;
  }
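  // Creates a fresh batching stream sized to the current batch size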
  getBatchStream() {
    return new Batch({ size: this.batchSize });
  }
  async importToAlgolia(data) {
    // Method to index batches of records in Algolia
    try {
      await this.index.addObjects(data);
      this.importCount += data.length;
      this.writeProgress(`Records indexed: ${this.importCount}`);
    } catch (e) {
      let message = e.message;
      let addendum = e.stack;
      const isTimeout = e.name === 'AlgoliaSearchRequestTimeoutError';
      if (isTimeout) {
        message = `You may be attempting to import batches too large for the network connection.`;
        addendum = this.suggestions();
        this.retryImport(data);
      }
      console.log(
        chalk.white.bgRed(`\nImport error: ${e.name}`),
        chalk.red(`\n${message}`),
        chalk.red(addendum)
      );
      // Timed-out batches have already been re-queued; only propagate other errors
      if (!isTimeout) throw e;
    }
  }
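  // Retry strategy for failed batches: split the failed batch in half, shrink
  // the running batch size to match, and re-queue both halves; gives up once
  // the retry count is high both in absolute terms and relative to the number
  // of successfully imported batches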
  retryImport(data) {
    try {
      this.retryCount++;
      console.log(`\n(${this.retryCount}) Retrying batch...`);
      const importedBatchCount = Math.floor(this.importCount / this.batchSize);
      const retryLimitReached =
        this.retryCount > 15 && this.retryCount > importedBatchCount / 2;
      if (retryLimitReached) {
        console.log(
          chalk.white.bgRed(`\nError: Failure to index data`),
          chalk.red(`\nRetry limit reached.`),
          chalk.red(this.suggestions())
        );
        return;
      }
      // Split the data in half
      const middle = Math.floor(data.length / 2);
      const firstHalf = data.splice(0, middle);
      // Reduce batch size
      if (this.batchSize > middle) this.updateBatchSize(middle);
      // Push each half of the data into the import queue
      this.queue.push([firstHalf]);
      this.queue.push([data]);
    } catch (e) {
      console.error('Retry error:', e);
      throw e;
    }
  }
  indexFiles(filenames) {
    // Recursive method that iterates through an array of filenames, opens a
    // read stream for each file, then pipes the read stream through a series
    // of transformations (parse CSV/JSON objects, transform them, batch them,
    // index them in Algolia) while imposing a queue so that at most
    // maxConcurrency indexing tasks run in parallel
    if (filenames.length <= 0) {
      console.log('\nDone reading files');
      return;
    }
    // Start new file read stream
    // Note: filenames is a reference to the mutable class instance variable
    // this.filenames
    const filename = filenames.pop();
    const file = `${this.directory}/${filename}`;
    const isCsv = filename.split('.').pop() === 'csv';
    const fileStream = fs.createReadStream(file, {
      autoClose: true,
      flags: 'r',
    });
    fileStream.on('data', () => {
      // If the async upload queue is full, pause reading from the file stream
      if (this.queue.length() >= this.maxConcurrency) {
        fileStream.pause();
      }
    });
    fileStream.on('end', () => {
      // File complete, process next file
      this.indexFiles(filenames);
    });
    // Once the async upload queue is drained, resume reading from the file
    // stream (drain is assigned as a property, which assumes the async v2
    // queue API)
    this.queue.drain = () => {
      fileStream.resume();
    };
    // Handle parsing, transforming, batching, and indexing JSON and CSV files
    console.log(`\nImporting [${filename}]`);
    const jsonStreamOption = isCsv ? null : '*';
    fileStream
      .pipe(this.conditionallyParseCsv(isCsv))
      .pipe(JSONStream.parse(jsonStreamOption))
      .pipe(transform(this.formatRecord))
      .pipe(this.getBatchStream())
      .pipe(
        through(data => {
          this.checkMemoryUsage();
          this.queue.push([data]);
        })
      );
  }
  async start(program) {
    // Script reads a JSON or CSV file, or a directory of such files,
    // optionally applies transformations, then batches and indexes the data
    // in Algolia.
    // Validate command; if invalid, display help text and exit
    this.validate(program, this.message, this.params);
    // Config params
    const options = {
      sourceFilepath: program.sourcefilepath,
      appId: program.algoliaappid,
      apiKey: program.algoliaapikey,
      indexName: program.algoliaindexname,
      objectsPerBatch: program.batchsize || null,
      transformations: program.transformationfilepath || null,
      maxConcurrency: program.maxconcurrency || 2,
      csvToJsonParams: program.params || null,
    };
    // Configure Algolia (this.client, this.index)
    this.setIndex(options);
    // Configure source paths (this.directory, this.filenames)
    this.setSource(options);
    // Configure transformations (this.formatRecord)
    this.setTransformations(options);
    // Configure optional csvtojson params (this.csvOptions)
    this.setCsvOptions(options);
    // Configure data upload parameters
    this.maxConcurrency = options.maxConcurrency;
    // Theoretically desirable batch size in MB
    this.desiredBatchSizeMb = 10;
    // Minimum batch size
    this.minBatchSize = 100;
    // Configure number of records to index per batch (this.batchSize)
    await this.setBatchSize(options);
    // Assign dangerous memory usage flag
    this.highMemoryUsage = false;
    // Assign import count
    this.importCount = 0;
    // Assign retry count
    this.retryCount = 0;
    // Assign async queue
    this.queue = async.queue(this.importToAlgolia, this.maxConcurrency);
    // Execute import
    console.log(chalk.bgGreen.white('Starting import...'));
    return this.indexFiles(this.filenames);
  }
}
const importScript = new ImportScript();
module.exports = importScript;