const fs = require('fs');
const JSONStream = require('JSONStream');
const through = require('through');
const transform = require('stream-transform');
const Batch = require('batch-stream');
const async = require('async');
const csv = require('csvtojson');
const regexParser = require('regex-parser');
const chalk = require('chalk');
const algolia = require('algoliasearch');
const Base = require('./Base.js');

class ImportScript extends Base {
  constructor() {
    super();
    // Bind class methods
    this.defaultTransformations = this.defaultTransformations.bind(this);
    this.suggestions = this.suggestions.bind(this);
    this.checkMemoryUsage = this.checkMemoryUsage.bind(this);
    this.handleHighMemoryUsage = this.handleHighMemoryUsage.bind(this);
    this.handleExtremeMemoryUsage = this.handleExtremeMemoryUsage.bind(this);
    this.setIndex = this.setIndex.bind(this);
    this.setTransformations = this.setTransformations.bind(this);
    this.setCsvOptions = this.setCsvOptions.bind(this);
    this.conditionallyParseCsv = this.conditionallyParseCsv.bind(this);
    this.setBatchSize = this.setBatchSize.bind(this);
    this.estimateBatchSize = this.estimateBatchSize.bind(this);
    this.updateBatchSize = this.updateBatchSize.bind(this);
    this.importToAlgolia = this.importToAlgolia.bind(this);
    this.retryImport = this.retryImport.bind(this);
    this.indexFiles = this.indexFiles.bind(this);
    this.start = this.start.bind(this);
    // Define validation constants
    this.message =
      '\nExample: $ algolia import -s sourcefilepath -a algoliaappid -k algoliaapikey -n algoliaindexname -b batchsize -t transformationfilepath -m maxconcurrency -p csvtojsonparams\n\n';
    this.params = [
      'sourcefilepath',
      'algoliaappid',
      'algoliaapikey',
      'algoliaindexname',
    ];
  }

  // Identity transformation used when no custom transformation file is provided
  defaultTransformations(data, cb) {
    cb(null, data);
  }

  // Build the remediation suggestions shown alongside memory and import warnings
  suggestions() {
    let output = `\nConsider reducing <batchSize> (currently ${
      this.batchSize
    }).`;
    if (this.maxConcurrency > 1)
      output += `\nConsider reducing <maxConcurrency> (currently ${
        this.maxConcurrency
      }).`;
    return output;
  }

  checkMemoryUsage() {
    // Exit early if a high memory usage warning was issued too recently
    if (this.highMemoryUsage) return false;
    // Get memory usage
    const { usedMb, percentUsed } = this.getMemoryUsage();
    // Handle heap usage exceeding thresholds of the estimated allocation for this
    // node process; at 90% or more both handlers run (batch size is halved and a
    // warning is issued)
    if (percentUsed >= 70) this.handleHighMemoryUsage(percentUsed);
    if (percentUsed >= 90) this.handleExtremeMemoryUsage(usedMb, percentUsed);
    return false;
  }

  handleHighMemoryUsage(percentUsed) {
    const newBatchSize = Math.floor(this.batchSize / 2);
    this.updateBatchSize(newBatchSize);
    this.writeProgress(
      `High memory usage (${percentUsed}%). Reducing batchSize to ${newBatchSize}`
    );
  }

  handleExtremeMemoryUsage(usedMb, percentUsed) {
    // Issue warning
    const name = `Warning: High memory usage`;
    const message = `Memory usage at ${usedMb} MB (${percentUsed}% of heap allocation for this process).`;
    // Set class instance flag to debounce future warnings
    this.highMemoryUsage = true;
    // Output warning
    console.log(
      chalk.white.bgRed(`\n${name}`),
      chalk.red(`\n${message}`),
      chalk.red(`${this.suggestions()}`)
    );
    // Reset flag in 30 seconds
    setTimeout(() => {
      this.highMemoryUsage = false;
    }, 30000);
  }

  setIndex(options) {
    // Set Algolia index
    this.client = algolia(options.appId, options.apiKey);
    this.index = this.client.initIndex(options.indexName);
  }

  setTransformations(options) {
    try {
      // Set JSON record transformations
      const transformations = options.transformations
        ? require(this.normalizePath(options.transformations))
        : null;
      // Validate that the provided transformations module exports a function
      const valid = transformations && typeof transformations === 'function';
      // Use the custom transformations function if one was provided and is valid
      this.formatRecord = valid ? transformations : this.defaultTransformations;
    } catch (e) {
      throw e;
    }
  }

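  // Illustrative transformation file (an assumption about its shape, not a file in
  // this repo): the module should export a single function with the same
  // (data, callback) signature as defaultTransformations above, e.g.
  //
  //   // transform.js
  //   module.exports = (record, cb) => {
  //     // add or rename fields here; the field names below are examples only
  //     cb(null, { ...record, objectID: record.id });
  //   };
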
  setCsvOptions(options) {
    try {
      this.csvOptions = options.csvToJsonParams
        ? JSON.parse(options.csvToJsonParams)
        : null;
      if (!this.csvOptions) return;
      // csvtojson expects these options as RegExp instances, but they arrive as
      // strings via JSON, so convert them
      const csvToJsonRegexPropertyList = ['includeColumns', 'ignoreColumns'];
      csvToJsonRegexPropertyList.forEach(prop => {
        if (this.csvOptions.hasOwnProperty(prop)) {
          this.csvOptions[prop] = regexParser(this.csvOptions[prop]);
        }
      });
    } catch (e) {
      throw e;
    }
  }

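  // Illustrative value for the csvtojsonparams flag (the option names are standard
  // csvtojson parameters; the values shown are examples only):
  //
  //   $ algolia import ... -p '{"delimiter":";","ignoreColumns":"/internal_id/"}'
  //
  // The regex-valued options arrive as strings and are converted to RegExp by
  // setCsvOptions above.
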
  conditionallyParseCsv(isCsv) {
    // Return the appropriate stream for piping depending on filetype
    return isCsv
      ? csv(this.csvOptions) // Convert from CSV to JSON
      : through(); // Pass data through unchanged
  }

  async setBatchSize(options) {
    try {
      // If the user provided a batchSize, use it and exit early;
      // otherwise calculate and set an optimal batch size
      if (options.objectsPerBatch !== null) {
        this.batchSize = options.objectsPerBatch;
        return;
      }
      // Sample the source files to estimate an optimal batch size
      const estimatedBatchSize = await this.estimateBatchSize();
      // Test network upload speed
      const uploadSpeedMb = await this.getNetworkSpeed();
      // Calculate optimal batch size
      this.writeProgress('Calculating optimal batch size...');
      let batchSize;
      // Reconcile batch size with network speed
      if (uploadSpeedMb >= this.desiredBatchSizeMb)
        batchSize = Math.floor(estimatedBatchSize);
      else
        batchSize = Math.floor(
          (uploadSpeedMb / this.desiredBatchSizeMb) * estimatedBatchSize
        );
      // Enforce the minimum batch size
      batchSize = Math.max(this.minBatchSize, batchSize);
      console.log(chalk.blue(`\nOptimal batch size: ${batchSize}`));
      // Set batch size
      this.batchSize = batchSize;
    } catch (e) {
      throw e;
    }
  }

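  // Worked example of the calculation above (numbers are illustrative only): with
  // desiredBatchSizeMb = 10, an estimated batch of 3,000 records, and a measured
  // upload speed of 4 MB, batchSize = floor((4 / 10) * 3000) = 1200, which is then
  // clamped to at least minBatchSize (100).
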
  estimateBatchSize() {
    // Read a sample of the first file, estimate the average record size, and
    // derive a batch size from it
    // Return the estimated batch size divided by maxConcurrency
    return new Promise((resolve, reject) => {
      try {
        const filename = this.filenames[0];
        const file = `${this.directory}/${filename}`;
        const isCsv = filename.split('.').pop() === 'csv';
        const fileStream = fs.createReadStream(file, {
          autoClose: true,
          flags: 'r',
        });
        this.writeProgress(`Estimating data size...`);
        const jsonStreamOption = isCsv ? null : '*';
        fileStream
          .pipe(this.conditionallyParseCsv(isCsv))
          .pipe(JSONStream.parse(jsonStreamOption))
          .pipe(transform(this.formatRecord))
          .pipe(new Batch({ size: 10000 }))
          .pipe(
            through(data => {
              const count = data.length;
              const string = JSON.stringify(data);
              const batchSizeMb = this.getStringSizeMb(string);
              const avgRecordSizeMb = batchSizeMb / count;
              const avgRecordSizeKb = Math.ceil(avgRecordSizeMb * 1000);
              const roughBatchSize = this.desiredBatchSizeMb / avgRecordSizeMb;
              const estimatedBatchSize = Math.floor(
                roughBatchSize / this.maxConcurrency
              );
              console.log(
                chalk.blue(`\nAverage record size: ${avgRecordSizeKb} KB`)
              );
              fileStream.destroy();
              resolve(estimatedBatchSize);
            })
          );
      } catch (e) {
        reject(e);
      }
    });
  }

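  // Worked example of the estimate above (illustrative numbers): if the first
  // 10,000 sampled records serialize to 25 MB, the average record is 0.0025 MB
  // (~3 KB after rounding up); a 10 MB target batch is then 10 / 0.0025 = 4000
  // records, divided by maxConcurrency = 2 to give an estimate of 2000.
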
  updateBatchSize(newSize) {
    this.batchSize = newSize;
  }

  getBatchStream() {
    // New batching stream sized to the current batchSize
    return new Batch({ size: this.batchSize });
  }

  async importToAlgolia(data) {
    // Index a batch of records in Algolia
    try {
      await this.index.addObjects(data);
      this.importCount += data.length;
      this.writeProgress(`Records indexed: ${this.importCount}`);
    } catch (e) {
      let message = e.message;
      let addendum = e.stack;
      if (e.name === 'AlgoliaSearchRequestTimeoutError') {
        message = `You may be attempting to import batches too large for the network connection.`;
        addendum = this.suggestions();
        this.retryImport(data);
      }
      console.log(
        chalk.white.bgRed(`\nImport error: ${e.name}`),
        chalk.red(`\n${message}`),
        chalk.red(addendum)
      );
      throw e;
    }
  }

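  // Note on the timeout path above: the failed batch is handed to retryImport
  // (which splits it in half and re-queues both halves) and the error is still
  // re-thrown, so the async queue's worker callback also sees the failure.
  // addObjects is the batch-indexing call of the pre-v4 algoliasearch JavaScript
  // client assumed by this script.
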
  retryImport(data) {
    // Algolia import retry strategy
    try {
      this.retryCount++;
      console.log(`\n(${this.retryCount}) Retrying batch...`);
      const importedBatchCount = Math.floor(this.importCount / this.batchSize);
      const retryLimitReached =
        this.retryCount > 15 && this.retryCount > importedBatchCount / 2;
      if (retryLimitReached) {
        console.log(
          chalk.white.bgRed(`\nError: Failure to index data`),
          chalk.red(`\nRetry limit reached.`),
          chalk.red(this.suggestions())
        );
        return;
      }
      // Split the batch in half
      const middle = Math.floor(data.length / 2);
      const firstHalf = data.splice(0, middle);
      // Reduce batchSize so it is no larger than the halves
      if (this.batchSize > middle) this.updateBatchSize(middle);
      // Push each half of the batch back onto the import queue
      this.queue.push([firstHalf]);
      this.queue.push([data]);
    } catch (e) {
      console.error('Retry error:', e);
      throw e;
    }
  }

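  // Example of the strategy above: if a 1,000-record batch times out, it is
  // re-queued as two 500-record batches, and this.batchSize is reduced to 500 so
  // that any batch stream created afterwards uses the smaller size.
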
  indexFiles(filenames) {
    // Recursive method that iterates through an array of filenames, opens a read
    // stream for each file, then pipes the read stream through a series of
    // transformations (parse CSV/JSON objects, transform them, batch them, index
    // them in Algolia) while imposing a queue so that only a limited number of
    // indexing requests run in parallel
    if (filenames.length <= 0) {
      console.log('\nDone reading files');
      return;
    }
    // Start new file read stream
    // Note: filenames is a reference to the mutable class instance variable this.filenames
    const filename = filenames.pop();
    const file = `${this.directory}/${filename}`;
    const isCsv = filename.split('.').pop() === 'csv';
    const fileStream = fs.createReadStream(file, {
      autoClose: true,
      flags: 'r',
    });

    fileStream.on('data', () => {
      if (this.queue.length() >= this.maxConcurrency) {
        // If the async upload queue is full, pause reading from the file stream
        fileStream.pause();
      }
    });

    fileStream.on('end', () => {
      // File complete, process next file
      this.indexFiles(filenames);
    });

    // Once the async upload queue is drained, resume reading from the file stream
    this.queue.drain = () => {
      fileStream.resume();
    };

    // Handle parsing, transforming, batching, and indexing JSON and CSV files
    console.log(`\nImporting [${filename}]`);
    const jsonStreamOption = isCsv ? null : '*';
    fileStream
      .pipe(this.conditionallyParseCsv(isCsv))
      .pipe(JSONStream.parse(jsonStreamOption))
      .pipe(transform(this.formatRecord))
      .pipe(this.getBatchStream())
      .pipe(
        through(data => {
          this.checkMemoryUsage();
          this.queue.push([data]);
        })
      );
  }

  async start(program) {
    // Script reads a JSON or CSV file, or a directory of such files, optionally
    // applies transformations, then batches and indexes the data in Algolia.

    // Validate command; if invalid, display help text and exit
    this.validate(program, this.message, this.params);

    // Config params
    const options = {
      sourceFilepath: program.sourcefilepath,
      appId: program.algoliaappid,
      apiKey: program.algoliaapikey,
      indexName: program.algoliaindexname,
      objectsPerBatch: program.batchsize || null,
      transformations: program.transformationfilepath || null,
      maxConcurrency: program.maxconcurrency || 2,
      csvToJsonParams: program.params || null,
    };
    // Configure Algolia (this.client, this.index)
    this.setIndex(options);
    // Configure source paths (this.directory, this.filenames)
    this.setSource(options);
    // Configure transformations (this.formatRecord)
    this.setTransformations(options);
    // Configure optional csvtojson params (this.csvOptions)
    this.setCsvOptions(options);
    // Configure data upload parameters
    this.maxConcurrency = options.maxConcurrency;
    // Theoretically desirable batch size in MB
    this.desiredBatchSizeMb = 10;
    // Minimum batch size
    this.minBatchSize = 100;
    // Configure number of records to index per batch (this.batchSize)
    await this.setBatchSize(options);
    // Initialize the high memory usage flag
    this.highMemoryUsage = false;
    // Initialize import count
    this.importCount = 0;
    // Initialize retry count
    this.retryCount = 0;
    // Assign async queue
    this.queue = async.queue(this.importToAlgolia, this.maxConcurrency);

    // Execute import
    console.log(chalk.bgGreen.white('Starting import...'));
    return this.indexFiles(this.filenames);
  }
}

const importScript = new ImportScript();
module.exports = importScript;
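
// Usage sketch (illustrative, not part of this module): the exported instance is
// expected to be driven by a CLI entry point that parses the flags listed in
// this.message into a `program` object before calling start, e.g.
//
//   const importScript = require('./Import.js'); // path is an assumption
//   importScript.start(program); // `program` supplies sourcefilepath, algoliaappid, etc.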