diff --git a/src/commands.js b/src/commands.js
new file mode 100644
index 0000000..6d8b0a9
--- /dev/null
+++ b/src/commands.js
@@ -0,0 +1,24 @@
+import { countHandler } from './commands/count.js';
+import { hashHandler } from './commands/hash.js';
+import { hashCompareHandler } from './commands/hashCompare.js';
+import { csvToJsonHandler } from './commands/csvToJson.js';
+import { jsonToCsvHandler } from './commands/jsonToCsv.js';
+import { upHandler, cdHandler, lsHandler } from './navigation.js';
+import { logStatsHandler } from './commands/logStats.js';
+import { encryptHandler } from './commands/encrypt.js';
+import { decryptHandler } from './commands/decrypt.js';
+
+
+export const COMMAND_HANDLERS_MAP = {
+  up: upHandler,
+  cd: cdHandler,
+  ls: lsHandler,
+  'csv-to-json': csvToJsonHandler,
+  'json-to-csv': jsonToCsvHandler,
+  count: countHandler,
+  hash: hashHandler,
+  'hash-compare': hashCompareHandler,
+  'log-stats': logStatsHandler,
+  encrypt: encryptHandler,
+  decrypt: decryptHandler,
+}
diff --git a/src/commands/count.js b/src/commands/count.js
index e69de29..ab1757b 100644
--- a/src/commands/count.js
+++ b/src/commands/count.js
@@ -0,0 +1,49 @@
+
+import { createReadStream } from 'node:fs';
+import { argParser } from '../utils/argParser.js';
+import { LINE_SEPARATOR, WORD_SEPARATOR } from '../constants.js';
+import { validateFileExtention } from '../utils/validateFileExtention.js';
+
+
+const getWordsFromString = (str) => str.split(WORD_SEPARATOR).filter(w => w.length > 0);
+const getSum = (arr) => arr.reduce((acc, wc) => acc + wc, 0);
+
+export const countHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+  });
+
+  validateFileExtention(parsedArgs.input, '.txt');
+
+  const readableStream = createReadStream(parsedArgs.input, { encoding: 'utf-8' });
+
+  let buffer = '';
+  let linesCount = 0;
+  let wordsCount = 0;
+  let charsCount = 0;
+
+  for await (const chunk of readableStream) {
+    buffer += chunk;
+
+    const lines = buffer.split(LINE_SEPARATOR);
+    buffer = lines.pop();
+    // Count symbols of completed lines only (the remainder stays in buffer) and each line separator as 1 char
+    charsCount += getSum(lines.map((line) => line.length)) + lines.length;
+
+    linesCount += lines.length;
+
+    wordsCount += getSum(lines.map(line => getWordsFromString(line).length));
+  }
+
+  // Add last line, even it's empty
+  linesCount++;
+
+  if (buffer.length) {
+    wordsCount += getWordsFromString(buffer).length;
+    charsCount += buffer.length;
+  }
+
+  console.log(`Lines: ${linesCount}`);
+  console.log(`Words: ${wordsCount}`);
+  console.log(`Characters: ${charsCount}`);
+}
\ No newline at end of file
diff --git a/src/commands/csvToJson.js b/src/commands/csvToJson.js
index e69de29..4e541d1 100644
--- a/src/commands/csvToJson.js
+++ b/src/commands/csvToJson.js
@@ -0,0 +1,81 @@
+import { Transform } from 'node:stream';
+import { fileConfersionHandler } from '../utils/fileConversionHandler.js';
+import { LINE_SEPARATOR } from '../constants.js';
+
+
+const EXTRA_HEADER_NAME = 'Extra';
+const INDENT = 2;
+
+const getCsvToJsonTransformSteam = () => {
+  let transformBuffer = '';
+  let headers = [];
+  let isFirstDataRow = true;
+
+  const getDataPrefix = () => {
+    let prefix = `,\n${' '.repeat(INDENT)}`;
+    if (isFirstDataRow) {
+      prefix = ' '.repeat(INDENT);
+      isFirstDataRow = false;
+    }
+    return prefix;
+  }
+
+  const transformDataToJsonString = (data) => {
+    const obj = {};
+    let unknownHeaderCounter = 1;
+    data.forEach((d, i) => {
+      let header;
+      if (headers[i]) {
+        header = headers[i];
+      } else {
+        header = `${EXTRA_HEADER_NAME}${unknownHeaderCounter}`;
+        unknownHeaderCounter++;
+      }
+      obj[header] = d;
+    });
+    return JSON.stringify(obj);
+  }
+
+  return new Transform({
+    transform(chunk, _, callback) {
+      transformBuffer += chunk;
+
+      const lines = transformBuffer.split(LINE_SEPARATOR);
+      transformBuffer = lines.pop();
+
+      lines.forEach((line) => {
+        if (!line.trim()) {
+          return;
+        }
+
+        const parsedLineData = line.split(',');
+
+        if (!headers.length) {
+          headers = parsedLineData.map(h => h.trim());
+          this.push('[\n');
+        } else {
+          this.push(`${getDataPrefix()}${transformDataToJsonString(parsedLineData)}`);
+        }
+      });
+
+      callback();
+    },
+    flush(callback) {
+      if (!transformBuffer.trim()) {
+        this.push('\n]');
+        return callback();
+      }
+
+      const parsedLineData = transformBuffer.split(',');
+      this.push(`${getDataPrefix()}${transformDataToJsonString(parsedLineData)}\n]\n`);
+
+      transformBuffer = '';
+      callback();
+    }
+  });
+}
+
+export const csvToJsonHandler = async (args) => {
+  const transformStream = getCsvToJsonTransformSteam();
+  return fileConfersionHandler(args, '.csv', '.json', transformStream);
+}
diff --git a/src/commands/decrypt.js b/src/commands/decrypt.js
index e69de29..de02b7b 100644
--- a/src/commands/decrypt.js
+++ b/src/commands/decrypt.js
@@ -0,0 +1,53 @@
+import { createDecipheriv, scrypt } from 'node:crypto';
+import { createReadStream, createWriteStream } from 'node:fs';
+import { open, stat } from 'node:fs/promises';
+import { pipeline } from 'node:stream/promises';
+import { promisify } from 'node:util';
+import { SALT_SIZE, IV_SIZE, AUTH_TAG_SIZE, KEY_SIZE, ENCRYPTION_ALGORITHM } from '../constants.js';
+import { argParser } from '../utils/argParser.js';
+
+
+const getDecryptEntities = async (inputFilePath, size) => {
+  const handle = await open(inputFilePath, 'r');
+
+  const salt = Buffer.alloc(SALT_SIZE);
+  const iv = Buffer.alloc(IV_SIZE);
+  const authTag = Buffer.alloc(AUTH_TAG_SIZE);
+
+  try {
+    await handle.read(salt, 0, SALT_SIZE, 0);
+    await handle.read(iv, 0, IV_SIZE, SALT_SIZE);
+    await handle.read(authTag, 0, AUTH_TAG_SIZE, size - AUTH_TAG_SIZE);
+  } finally {
+    if (handle) await handle.close();
+  }
+
+  return {
+    salt,
+    iv,
+    authTag,
+  }
+}
+
+export const decryptHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    output: { type: 'path', required: true },
+    password: { type: 'string', required: true },
+  });
+
+  const { size } = await stat(parsedArgs.input);
+  const { salt, iv, authTag } = await getDecryptEntities(parsedArgs.input, size);
+
+  const key = await promisify(scrypt)(parsedArgs.password, salt, KEY_SIZE);
+  const decipher = createDecipheriv(ENCRYPTION_ALGORITHM, key, iv);
+  decipher.setAuthTag(authTag);
+
+  const input = createReadStream(parsedArgs.input, {
+    start: SALT_SIZE + IV_SIZE,
+    end: size - AUTH_TAG_SIZE - 1
+  });
+  const output = createWriteStream(parsedArgs.output);
+
+  await pipeline(input, decipher, output);
+};
\ No newline at end of file
diff --git a/src/commands/encrypt.js b/src/commands/encrypt.js
index e69de29..0cd45fd 100644
--- a/src/commands/encrypt.js
+++ b/src/commands/encrypt.js
@@ -0,0 +1,32 @@
+import { createCipheriv, scrypt, randomBytes } from 'node:crypto';
+import { createReadStream, createWriteStream } from 'node:fs';
+import { pipeline, finished } from 'node:stream/promises';
+import { promisify } from 'node:util';
+import { ENCRYPTION_ALGORITHM, IV_SIZE, KEY_SIZE, SALT_SIZE } from '../constants.js';
+import { argParser } from '../utils/argParser.js'
+
+export const encryptHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    output: { type: 'path', required: true },
+    password: { type: 'string', required: true },
+  });
+
+  const salt = randomBytes(SALT_SIZE);
+  const iv = randomBytes(IV_SIZE);
+  const key = await promisify(scrypt)(parsedArgs.password, salt, KEY_SIZE);
+  const cipher = createCipheriv(ENCRYPTION_ALGORITHM, key, iv);
+
+  const output = createWriteStream(parsedArgs.output);
+
+  output.write(salt);
+  output.write(iv);
+
+  await pipeline(createReadStream(parsedArgs.input), cipher, output, { end: false });
+
+  const authTag = cipher.getAuthTag();
+  output.write(authTag);
+  output.end();
+
+  await finished(output);
+}
\ No newline at end of file
diff --git a/src/commands/hash.js b/src/commands/hash.js
index e69de29..eec3b5d 100644
--- a/src/commands/hash.js
+++ b/src/commands/hash.js
@@ -0,0 +1,34 @@
+import { createHash } from 'node:crypto';
+import { pipeline } from 'node:stream/promises';
+import { Readable } from 'node:stream';
+import { createReadStream, createWriteStream } from 'node:fs';
+import { argParser } from '../utils/argParser.js';
+import { SUPPORTED_ALGORYTHMS } from '../constants.js';
+
+
+export const hashHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    algorithm: { type: 'string', default: 'sha256' },
+    save: { type: 'boolean', default: false },
+  });
+
+  if (!SUPPORTED_ALGORYTHMS.includes(parsedArgs.algorithm)) {
+    throw new Error('Algorithm is not supported');
+  }
+
+  const hash = createHash(parsedArgs.algorithm);
+  const readableStream = createReadStream(parsedArgs.input);
+
+  await pipeline(readableStream, hash);
+
+  const hashedValue = hash.digest('hex');
+
+  if (parsedArgs.save) {
+    const outputPath = `${parsedArgs.input}.${parsedArgs.algorithm}`;
+
+    await pipeline(Readable.from(hashedValue), createWriteStream(outputPath));
+  }
+
+  console.log(`${parsedArgs.algorithm}: ${hashedValue}`);
+}
\ No newline at end of file
diff --git a/src/commands/hashCompare.js b/src/commands/hashCompare.js
index e69de29..6b050f2 100644
--- a/src/commands/hashCompare.js
+++ b/src/commands/hashCompare.js
@@ -0,0 +1,35 @@
+import { createHash } from 'node:crypto';
+import { pipeline } from 'node:stream/promises';
+import { readFile } from 'node:fs/promises';
+import { createReadStream } from 'node:fs';
+import { argParser } from '../utils/argParser.js';
+import { SUPPORTED_ALGORYTHMS } from '../constants.js';
+import { validateFileExtention } from '../utils/validateFileExtention.js';
+
+
+export const hashCompareHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    hash: { type: 'path', required: true },
+    algorithm: { type: 'string', default: 'sha256' },
+  });
+
+  if (!SUPPORTED_ALGORYTHMS.includes(parsedArgs.algorithm)) {
+    throw new Error('Algorithm is not supported');
+  }
+
+  validateFileExtention(parsedArgs.hash, `.${parsedArgs.algorithm.toLowerCase()}`);
+
+  const hash = createHash(parsedArgs.algorithm);
+  const readableStream = createReadStream(parsedArgs.input);
+
+  await pipeline(readableStream, hash);
+
+  const hashedValue = hash.digest('hex');
+
+  const savedHash = await readFile(parsedArgs.hash, { encoding: 'utf-8' });
+
+  const isValid = hashedValue === savedHash.trim().toLowerCase();
+
+  console.log(isValid ? 'OK' : 'MISMATCH');
+}
\ No newline at end of file
diff --git a/src/commands/jsonToCsv.js b/src/commands/jsonToCsv.js
index e69de29..a15b970 100644
--- a/src/commands/jsonToCsv.js
+++ b/src/commands/jsonToCsv.js
@@ -0,0 +1,39 @@
+import { Transform } from 'node:stream';
+import { fileConfersionHandler } from '../utils/fileConversionHandler.js';
+
+
+const getJsonToCsvTransformSteam = () => {
+  let jsonStringBuffer = '';
+
+  return new Transform({
+    transform(chunk, _, callback) {
+      jsonStringBuffer += chunk;
+      callback();
+    },
+    flush(callback) {
+      const jsonData = JSON.parse(jsonStringBuffer);
+      jsonStringBuffer = '';
+      if (!Array.isArray(jsonData)) {
+        throw new Error('Invalid json type');
+      }
+
+      const headerSet = new Set();
+      jsonData.map((row) => Object.keys(row).forEach(h => headerSet.add(h)));
+      const headers = Array.from(headerSet);
+
+      this.push(`${headers.join(',')}\n`)
+
+      jsonData.forEach(row => {
+        const valuesInHeadersOrder = headers.map((header) => row[header] ?? '');
+        this.push(`${valuesInHeadersOrder.join(',')}\n`);
+      })
+
+      callback();
+    }
+  });
+}
+
+export const jsonToCsvHandler = async (args) => {
+  const transformStream = getJsonToCsvTransformSteam();
+  return fileConfersionHandler(args, '.json', '.csv', transformStream);
+}
diff --git a/src/commands/logStats.js b/src/commands/logStats.js
index e69de29..9135847 100644
--- a/src/commands/logStats.js
+++ b/src/commands/logStats.js
@@ -0,0 +1,70 @@
+import { stat } from 'node:fs/promises';
+import { availableParallelism } from 'node:os';
+import { Worker } from 'node:worker_threads';
+import { createWriteStream } from 'node:fs';
+import { Readable } from 'node:stream';
+import { pipeline } from 'node:stream/promises';
+import { argParser } from '../utils/argParser.js';
+import { INITIAL_LOG_STATS, mergeStats } from '../utils/mergeLogStats.js';
+import { validateFileExtention } from '../utils/validateFileExtention.js';
+
+
+const runWorker = (data) => {
+  const workerPath = new URL('../workers/logWorker.js', import.meta.url);
+
+  return new Promise((resolve, reject) => {
+    const worker = new Worker(workerPath);
+    worker.on('message', (data) => {
+      resolve(data);
+      worker.terminate();
+    });
+    worker.on('error', (error) => {
+      reject(error);
+      worker.terminate();
+    });
+    worker.on('exit', (code) => {
+      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
+    });
+
+    worker.postMessage(data);
+  });
+}
+
+export const logStatsHandler = async (args) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    output: { type: 'path', required: true },
+  });
+
+  validateFileExtention(parsedArgs.input, ['.txt', '.log']);
+
+  const maxWorkersAvailable = availableParallelism();
+  const inputFileStats = await stat(parsedArgs.input);
+  const inputFileBitesLength = inputFileStats.size;
+
+  const bytesPerWorker = Math.floor(inputFileBitesLength / maxWorkersAvailable);
+  const positionsToReadByWorker = Array.from({ length: maxWorkersAvailable }).map((_, i) => ({
+    filePath: parsedArgs.input,
+    start: i * bytesPerWorker,
+    end: (i === (maxWorkersAvailable - 1)) ? inputFileBitesLength : i * bytesPerWorker + bytesPerWorker,
+  }));
+
+  const workerChunkStats = await Promise.all(positionsToReadByWorker.map(runWorker));
+  const mergedStats = workerChunkStats.reduce(mergeStats, INITIAL_LOG_STATS);
+
+  // Need just 2 most popular
+  const topPaths = Object.entries(mergedStats.paths)
+    .sort(([, a], [, b]) => b - a)
+    .slice(0, 2)
+    .map(([path, count]) => ({ path, count }))
+
+  const mergedFormattedStats = {
+    total: mergedStats.total,
+    levels: mergedStats.levels,
+    status: mergedStats.status,
+    topPaths,
+    avgResponseTimeMs: mergedStats.avgResponseTimeMs.toFixed(2),
+  }
+
+  await pipeline(Readable.from(JSON.stringify(mergedFormattedStats, null, 2)), createWriteStream(parsedArgs.output, 'utf-8'));
+}
\ No newline at end of file
diff --git a/src/constants.js b/src/constants.js
new file mode 100644
index 0000000..800cdea
--- /dev/null
+++ b/src/constants.js
@@ -0,0 +1,21 @@
+
+export const SPACE_SEPARATOR = /\s+/;
+export const LINE_SEPARATOR = /\r?\n/;
+export const WORD_SEPARATOR = /\s+/;
+
+export const SUPPORTED_ALGORYTHMS = ['sha256', 'md5', 'sha512'];
+
+export const INVALID_INPUT_ERROR_CODE = 'INVALID_INPUT';
+
+export const ANSI_COLORS = {
+  red: '\x1b[31m',
+  green: '\x1b[32m',
+};
+
+export const ANSI_COLOR_RESET = '\x1b[0m';
+
+export const ENCRYPTION_ALGORITHM = 'AES-256-GCM';
+export const SALT_SIZE = 16;
+export const IV_SIZE = 12;
+export const AUTH_TAG_SIZE = 16;
+export const KEY_SIZE = 32;
\ No newline at end of file
diff --git a/src/cwdState.js b/src/cwdState.js
new file mode 100644
index 0000000..d45ab00
--- /dev/null
+++ b/src/cwdState.js
@@ -0,0 +1,18 @@
+import os from 'node:os';
+import { InvalidInputError } from './utils/errors.js';
+
+
+export const initialCwd = os.homedir();
+let cwd = initialCwd;
+
+export const getCwd = () => {
+  return cwd;
+}
+
+// dir - absolute path
+export const chCwd = (dir) => {
+  if (!dir.startsWith(initialCwd)) {
+    throw new InvalidInputError('Cannot go higher than initial directory');
+  }
+  cwd = dir;
+}
\ No newline at end of file
diff --git a/src/main.js b/src/main.js
index e69de29..6490aaf 100644
--- a/src/main.js
+++ b/src/main.js
@@ -0,0 +1,8 @@
+import { initRepl } from './repl.js';
+
+
+const init = () => {
+  initRepl();
+}
+
+init();
diff --git a/src/navigation.js b/src/navigation.js
index e69de29..4bc0c5b 100644
--- a/src/navigation.js
+++ b/src/navigation.js
@@ -0,0 +1,56 @@
+import fs from 'node:fs/promises';
+import { getCwd, chCwd, initialCwd } from './cwdState.js';
+import { InvalidInputError } from './utils/errors.js';
+import { pathResolver } from './utils/pathResolver.js';
+
+
+export const upHandler = () => {
+  if (getCwd() !== initialCwd) {
+    const newDirectory = pathResolver(`${getCwd()}/..`);
+    chCwd(newDirectory);
+  }
+}
+
+export const cdHandler = async (args) => {
+  const pathToDirectory = args[0];
+  if (!pathToDirectory) {
+    throw new InvalidInputError('No path/to/directory');
+  }
+  const newDirPath = pathResolver(pathToDirectory);
+
+  const stats = await fs.stat(newDirPath);
+
+  if (stats.isDirectory()) {
+    chCwd(newDirPath)
+  } else {
+    throw new InvalidInputError('Path is not a directory');
+  }
+}
+
+export const lsHandler = async () => {
+  const dirEntities = await fs.readdir(getCwd(), { withFileTypes: true });
+
+  const folders = [];
+  const files = [];
+
+  let maxEntitName = 0;
+
+  dirEntities.forEach((dirEnt) => {
+    maxEntitName = Math.max(maxEntitName, dirEnt.name.length);
+
+    if (dirEnt.isDirectory()) {
+      folders.push(dirEnt.name);
+    } else if (dirEnt.isFile()) {
+      files.push(dirEnt.name);
+    }
+  });
+
+  const formatEntityName = (entitiName) => `${entitiName}${' '.repeat(maxEntitName - entitiName.length)}`
+
+  folders.sort((a, b) => a.localeCompare(b)).forEach((ent) => {
+    console.log(`${formatEntityName(ent)} [folder]`)
+  });
+  files.sort((a, b) => a.localeCompare(b)).forEach((ent) => {
+    console.log(`${formatEntityName(ent)} [file]`)
+  });
+}
diff --git a/src/repl.js b/src/repl.js
index e69de29..0472fc8 100644
--- a/src/repl.js
+++ b/src/repl.js
@@ -0,0 +1,75 @@
+
+
+import readline from 'node:readline';
+import process from 'node:process';
+import { getCwd } from './cwdState.js';
+import { InvalidInputError } from './utils/errors.js';
+import { COMMAND_HANDLERS_MAP } from './commands.js';
+import { ANSI_COLORS, ANSI_COLOR_RESET, INVALID_INPUT_ERROR_CODE, SPACE_SEPARATOR } from './constants.js';
+
+
+const WELCOME_TEXT = 'Welcome to Data Processing CLI!';
+const EXIT_TEXT = 'Thank you for using Data Processing CLI!';
+const INVALID_COMMAND_TEXT = 'Invalid input';
+const OPERATION_FAILED_TEXT = 'Operation failed';
+const CURRENT_DIRECTORY_PREFIX = 'You are currently in';
+
+const onSuccess = () => {
+  console.log(`${ANSI_COLORS.green}${CURRENT_DIRECTORY_PREFIX} ${getCwd()}${ANSI_COLOR_RESET}`);
+}
+
+const onError = (err) => {
+  if (err.code === INVALID_INPUT_ERROR_CODE) {
+    console.log(`${ANSI_COLORS.red}${INVALID_COMMAND_TEXT}${ANSI_COLOR_RESET}`);
+  } else {
+    console.log(`${ANSI_COLORS.red}${OPERATION_FAILED_TEXT}${ANSI_COLOR_RESET}`);
+  }
+}
+
+const handleCommand = async (command, commandArgs) => {
+  if (command in COMMAND_HANDLERS_MAP) {
+    try {
+      await COMMAND_HANDLERS_MAP[command](commandArgs);
+      onSuccess()
+    } catch (err) {
+      onError(err)
+    }
+
+  } else {
+    onError(new InvalidInputError('Invalid command'));
+  }
+}
+
+export const initRepl = () => {
+  const rl = readline.createInterface({
+    input: process.stdin,
+    output: process.stdout,
+    prompt: '> '
+  });
+
+  rl.on('line', async (line) => {
+    const lineTrimmed = line.trim();
+    const [command, ...commandArgs] = lineTrimmed.split(SPACE_SEPARATOR);
+
+    if (command === '.exit') {
+      rl.close()
+    } else {
+      await handleCommand(command, commandArgs);
+      rl.prompt();
+    }
+  });
+
+  rl.on('SIGINT', () => {
+    rl.close();
+  });
+
+  rl.on('close', () => {
+    console.log(EXIT_TEXT)
+    process.exit(0);
+  });
+
+  console.log(WELCOME_TEXT);
+  // Initial display of directory
+  onSuccess();
+  rl.prompt();
+}
diff --git a/src/utils/argParser.js b/src/utils/argParser.js
index e69de29..53e10e9 100644
--- a/src/utils/argParser.js
+++ b/src/utils/argParser.js
@@ -0,0 +1,48 @@
+import { parseArgs } from 'node:util';
+import { pathResolver } from './pathResolver.js';
+import { InvalidInputError } from './errors.js';
+
+
+const formatArgValue = (type, value) => {
+  switch (type) {
+    case 'path': {
+      return pathResolver(value);
+    }
+    default:
+      return value;
+  }
+}
+
+export const argParser = (args, options) => {
+  const argOptions = Object.keys(options)
+    .reduce((acc, key) => {
+      return {
+        ...acc,
+        [key]: {
+          type: options[key].type === 'boolean' ? 'boolean' : 'string',
+          default: options[key].default
+        }
+      }
+    }, {});
+
+  let parsedArgs;
+
+  try {
+    parsedArgs = parseArgs({ args, options: argOptions }).values;
+  } catch (err) {
+    throw new InvalidInputError(err.message);
+  }
+
+  const parsedArgsKeys = Object.keys(parsedArgs);
+
+  if (Object.keys(options).some((key) => (options[key].required && parsedArgs[key] === undefined))) {
+    throw new InvalidInputError('Invalid input');
+  }
+
+  return parsedArgsKeys.reduce((acc, key) => {
+    return {
+      ...acc,
+      [key]: formatArgValue(options[key].type, parsedArgs[key])
+    }
+  }, {})
+}
diff --git a/src/utils/errors.js b/src/utils/errors.js
new file mode 100644
index 0000000..cebad98
--- /dev/null
+++ b/src/utils/errors.js
@@ -0,0 +1,12 @@
+import { INVALID_INPUT_ERROR_CODE } from '../constants.js';
+
+
+export class InvalidInputError extends Error {
+  constructor(message) {
+    super(message);
+    this.name = 'InvalidInputError';
+    this.code = INVALID_INPUT_ERROR_CODE;
+
+    Error.captureStackTrace(this, this.constructor);
+  }
+}
diff --git a/src/utils/fileConversionHandler.js b/src/utils/fileConversionHandler.js
new file mode 100644
index 0000000..43a5ace
--- /dev/null
+++ b/src/utils/fileConversionHandler.js
@@ -0,0 +1,20 @@
+import { pipeline } from 'node:stream/promises';
+import { createReadStream, createWriteStream } from 'node:fs';
+import { argParser } from '../utils/argParser.js';
+import { validateFileExtention } from './validateFileExtention.js';
+
+
+export const fileConfersionHandler = async (args, inputExt, outputExt, transformStream) => {
+  const parsedArgs = argParser(args, {
+    input: { type: 'path', required: true },
+    output: { type: 'path', required: true },
+  });
+
+  validateFileExtention(parsedArgs.input, inputExt);
+  validateFileExtention(parsedArgs.output, outputExt);
+
+  const inputStream = createReadStream(parsedArgs.input, { encoding: 'utf-8' });
+  const outputStream = createWriteStream(parsedArgs.output, { encoding: 'utf-8' });
+
+  await pipeline(inputStream, transformStream, outputStream);
+};
\ No newline at end of file
diff --git a/src/utils/mergeLogStats.js b/src/utils/mergeLogStats.js
new file mode 100644
index 0000000..dbcf4e6
--- /dev/null
+++ b/src/utils/mergeLogStats.js
@@ -0,0 +1,30 @@
+export const INITIAL_LOG_STATS = {
+  total: 0,
+  levels: { INFO: 0, WARN: 0, ERROR: 0 },
+  status: { '2xx': 0, '3xx': 0, '4xx': 0, '5xx': 0 },
+  paths: {},
+  avgResponseTimeMs: 0
+};
+
+export const mergeStats = (stat1, stat2) => {
+  const combinesStats = structuredClone(INITIAL_LOG_STATS);
+
+  combinesStats.total = stat1.total + stat2.total;
+  combinesStats.avgResponseTimeMs = combinesStats.total === 0 ? 0 : (stat1.avgResponseTimeMs * stat1.total + stat2.avgResponseTimeMs * stat2.total) / combinesStats.total;
+
+  Object.keys(combinesStats.levels).forEach((l) => {
+    combinesStats.levels[l] = stat1.levels[l] + stat2.levels[l]
+  })
+
+  Object.keys(combinesStats.status).forEach((s) => {
+    combinesStats.status[s] = stat1.status[s] + stat2.status[s]
+  });
+
+  const pathsSet = new Set([...Object.keys(stat1.paths), ...Object.keys(stat2.paths)]);
+
+  Array.from(pathsSet).forEach((p) => {
+    combinesStats.paths[p] = (stat1.paths[p] || 0) + (stat2.paths[p] || 0)
+  });
+
+  return combinesStats;
+}
diff --git a/src/utils/pathResolver.js b/src/utils/pathResolver.js
index e69de29..17437ba 100644
--- a/src/utils/pathResolver.js
+++ b/src/utils/pathResolver.js
@@ -0,0 +1,7 @@
+import path from 'node:path';
+import { getCwd } from '../cwdState.js';
+
+
+export const pathResolver = (pathArg) => {
+  return path.resolve(getCwd(), pathArg);
+}
\ No newline at end of file
diff --git a/src/utils/validateFileExtention.js b/src/utils/validateFileExtention.js
new file mode 100644
index 0000000..fa49396
--- /dev/null
+++ b/src/utils/validateFileExtention.js
@@ -0,0 +1,11 @@
+import { extname } from 'node:path';
+import { InvalidInputError } from './errors.js';
+
+
+export const validateFileExtention = (fileName, ext) => {
+  const allowedExt = Array.isArray(ext) ? ext.map((e) => e.toLowerCase()) : [ext.toLowerCase()];
+  const fileExt = extname(fileName).toLowerCase();
+  if (!allowedExt.includes(fileExt)) {
+    throw new InvalidInputError('Invalid file extention');
+  }
+}
diff --git a/src/workers/logWorker.js b/src/workers/logWorker.js
index e69de29..7d691ad 100644
--- a/src/workers/logWorker.js
+++ b/src/workers/logWorker.js
@@ -0,0 +1,94 @@
+import { parentPort } from 'worker_threads';
+import { createReadStream } from 'node:fs';
+import { LINE_SEPARATOR, SPACE_SEPARATOR } from '../constants.js';
+import { mergeStats, INITIAL_LOG_STATS } from '../utils/mergeLogStats.js';
+
+
+let readBufferLength = 0;
+let isBufferRead = false;
+
+const getBuffersToProcess = (chunk, maxBufferLength) => {
+  const newReadBufferLength = readBufferLength + chunk.length;
+
+  let chunkWithinRange;
+  let chunkToFinishLastLine;
+
+  if (newReadBufferLength < maxBufferLength) {
+    chunkWithinRange = chunk;
+    readBufferLength = newReadBufferLength;
+  } else {
+    if (!isBufferRead) {
+      const requiredLenghtFromChunk = maxBufferLength - readBufferLength;
+      chunkWithinRange = chunk.subarray(0, requiredLenghtFromChunk);
+      readBufferLength += requiredLenghtFromChunk;
+      chunkToFinishLastLine = chunk.subarray(requiredLenghtFromChunk);
+
+      isBufferRead = true;
+    } else {
+      chunkToFinishLastLine = chunk;
+    }
+  }
+
+  return [chunkWithinRange, chunkToFinishLastLine];
+}
+
+const processLine = (stats, line) => {
+  const [, level, , statusCode, responseTime, , path] = line.split(SPACE_SEPARATOR);
+
+  const lineStat = structuredClone(INITIAL_LOG_STATS);
+  lineStat.total++;
+  lineStat.levels[level]++;
+  lineStat.status[`${statusCode.toString().charAt(0)}xx`]++;
+  lineStat.avgResponseTimeMs = Number(responseTime);
+  lineStat.paths = { [path]: 1 }
+
+  return mergeStats(stats, lineStat);
+}
+
+
+parentPort.on('message', async ({ filePath, start, end }) => {
+  let stats = INITIAL_LOG_STATS;
+
+  const readableStream = createReadStream(filePath, { start });
+  const maBufferLength = end - start;
+  let shouldExcluseFirstLine = start !== 0;
+  let buffer = '';
+
+  for await (const chunk of readableStream) {
+    const [chunkWithinRange, chunkToFinishLastLine] = getBuffersToProcess(chunk, maBufferLength);
+
+    if (chunkWithinRange) {
+      buffer += chunkWithinRange.toString('utf-8');
+      let lines = buffer.split(LINE_SEPARATOR);
+      buffer = lines.pop();
+
+      // We don't count first line if not first part worker
+      if (shouldExcluseFirstLine) {
+        lines.shift();
+        shouldExcluseFirstLine = false
+      }
+
+      lines.forEach(l => {
+        stats = processLine(stats, l);
+      });
+    }
+
+    if (chunkToFinishLastLine) {
+      const lastPieceBuffer = buffer + chunkToFinishLastLine.toString('utf-8');
+      let lines = lastPieceBuffer.split(LINE_SEPARATOR);
+      buffer = lines.shift();
+
+      // If we no lines left - we din't have last new line in buffer
+      if (lines.length !== 0) {
+        break;
+      }
+    }
+  }
+
+  // Processing of last line
+  if (buffer.length) {
+    stats = processLine(stats, buffer)
+  }
+
+  parentPort.postMessage(stats);
+});