diff --git a/packages/core/package.json b/packages/core/package.json index c684220..ec4a309 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -37,7 +37,7 @@ "ini": "^2.0.0", "js-yaml": "^4.0.0", "ramda": "^0.27.1", - "rxjs": "^7.3.0" + "strict-callbag-basics": "^0.24.2" }, "gitHead": "a5aa6a6e10b62a957de11c4bc55ac4d3ee9a2ece" } diff --git a/packages/core/src/generate.ts b/packages/core/src/generate.ts index 3d395ad..9c455b1 100644 --- a/packages/core/src/generate.ts +++ b/packages/core/src/generate.ts @@ -2,16 +2,15 @@ import * as fs from "fs"; import * as Ini from "ini"; import * as Yaml from "js-yaml"; import * as path from "path"; -import * as Rx from "rxjs"; -import * as RxOp from "rxjs/operators"; -import { configs$, configsToFiles } from "./internal/config"; +import { configs, configsToFiles } from "./internal/config"; import { calculatePatch, executePatch, IInputContents, toFileTree, } from "./internal/fileTrees"; -import { files$ } from "./internal/fs"; +import { filesSource } from "./internal/fs"; +import * as CB from "strict-callbag-basics"; export interface IGenerateOpts { context: any; @@ -40,30 +39,37 @@ export function generate( fs.mkdirSync(outDir); } - const inputConfigs$ = configs$(inputDir, context, formats, format, ignore); - const inputConfigsArray$ = inputConfigs$.pipe(RxOp.toArray()); - const outputFT$ = files$(outDir).pipe(toFileTree(outDir)); - const inputFT$ = inputConfigs$.pipe(configsToFiles(), toFileTree(inputDir)); - - return Rx.zip(inputConfigsArray$, inputFT$, outputFT$) - .pipe( - RxOp.flatMap(([configs, inputFT, outputFT]) => { - const contents = configs.reduce((acc, c) => { - acc[c.file] = c.contents; - return acc; - }, {} as any) as IInputContents; - - const patch = calculatePatch(inputFT, outputFT, { - contents, - outDir, - }); - return Rx.from(patch).pipe(executePatch(contents, outDir)); - }), - ) - .toPromise() - .finally(() => { - process.chdir(startDir); - }); + const inputConfigs = CB.share( + configs(inputDir, context, formats, format, ignore), + ); + const inputConfigsArray = CB.toArray(inputConfigs); + const outputFT = CB.pipe(filesSource(outDir), toFileTree(outDir)); + const inputFT = CB.pipe(inputConfigs, configsToFiles(), toFileTree(inputDir)); + + return CB.pipe( + inputConfigsArray, + + CB.zip(inputFT), + CB.zip(outputFT), + + CB.chain(([[configs, inputFT], outputFT]) => { + const contents = configs.reduce((acc, c) => { + acc[c.file] = c.contents; + return acc; + }, {} as any) as IInputContents; + + const patch = calculatePatch(inputFT, outputFT, { + contents, + outDir, + }); + + return CB.pipe(CB.fromIter(patch), executePatch(contents, outDir)); + }), + + CB.run_, + ).finally(() => { + process.chdir(startDir); + }); } export interface IFormat { diff --git a/packages/core/src/internal/config.ts b/packages/core/src/internal/config.ts index 0db2d68..ba7527f 100644 --- a/packages/core/src/internal/config.ts +++ b/packages/core/src/internal/config.ts @@ -1,10 +1,9 @@ -import * as Rx from "rxjs"; -import * as RxOp from "rxjs/operators"; +import * as Fs from "fs"; import * as path from "path"; import * as R from "ramda"; -import { files$ } from "./fs"; +import * as CB from "strict-callbag-basics"; import { IFormat } from "../generate"; -import { promises as fsp } from "fs"; +import { filesSource } from "./fs"; export interface IConfig extends Readonly<{ @@ -15,23 +14,24 @@ export interface IConfig /** * Streams a list of configuration files from a source directory. */ -export function configs$( +export function configs( dir: string, context: any, formats: Map, format: string, ignore?: string, -): Rx.Observable { - return files$(dir, ignore).pipe( - RxOp.flatMap((file) => { - if ([".js", ".ts"].includes(path.extname(file))) { - return Rx.of(file).pipe( - resolveConfigFromExports(dir, context, formats, format), - ); - } - - return Rx.of(file).pipe(resolveConfigFromContents(dir)); - }), +): CB.Source { + return CB.pipe( + filesSource(dir, ignore), + CB.groupBy((file) => [".js", ".ts"].includes(path.extname(file))), + CB.chainPar(([source, isScript]) => + isScript + ? CB.pipe( + source, + resolveConfigFromExports(dir, context, formats, format), + ) + : CB.pipe(source, resolveConfigFromContents(dir)), + ), ); } @@ -40,19 +40,20 @@ export function configs$( */ export const resolveConfigFromExports = (dir: string, context: any, formats: Map, format: string) => - (input$: Rx.Observable) => - input$.pipe( + (inputSource: CB.Source): CB.Source => + CB.pipe( + inputSource, // For each file, require() it and load its exports - RxOp.map((file) => ({ + CB.map((file) => ({ file, exports: resolveFile(file), })), // We only want files with a "default" export - RxOp.filter(R.hasPath(["exports", "default"])), + CB.filter(R.hasPath(["exports", "default"])), // Remove file extensions and de-nest "default" exports - RxOp.map(({ file, exports }) => ({ + CB.map(({ file, exports }) => ({ relativePath: R.pipe( R.split("."), R.remove(-1, 1), @@ -62,7 +63,7 @@ export const resolveConfigFromExports = })), // If we have an "index" file, don't create a directory for it - RxOp.map( + CB.map( R.when( R.pipe( (s: { relativePath: string; exports: any }) => s, @@ -75,9 +76,13 @@ export const resolveConfigFromExports = ), // Map functions / promises to the actual configuration - RxOp.flatMap(({ relativePath, exports }) => - Rx.from(resolveContents(context, exports)).pipe( - RxOp.map((contents) => ({ + CB.chainPar(({ relativePath, exports }) => + CB.pipe( + CB.fromPromise_( + () => resolveContents(context, exports), + (e) => e, + ), + CB.map((contents) => ({ relativePath, contents, })), @@ -86,10 +91,9 @@ export const resolveConfigFromExports = // For each key in the configuration create a file with the correct // extension for the format. - RxOp.flatMap(({ relativePath, contents }) => - Rx.from(Object.keys(contents)).pipe( - // Determine the format for the file - RxOp.map((file) => { + CB.chain(({ relativePath, contents }) => + CB.fromIter( + Object.keys(contents).map((file) => { const fileContents = contents[file]; let formatOverride = fileFormat(formats)(file); return formatOverride @@ -109,7 +113,7 @@ export const resolveConfigFromExports = // Map functions / promises for file contents, then encode it to the correct // format. - RxOp.map(({ file, format, contents }) => ({ + CB.map(({ file, format, contents }) => ({ file, contents: encodeContents(formats, format, contents), })), @@ -119,12 +123,15 @@ export const resolveConfigFromExports = * Streams configuration files from non-module files. */ export const resolveConfigFromContents = - (dir: string) => (input$: Rx.Observable) => - input$.pipe( + (dir: string) => + (inputSource: CB.Source): CB.Source => + CB.pipe( + inputSource, // Load contents from file - RxOp.flatMap((file) => - Rx.from(fsp.readFile(file)).pipe( - RxOp.map((contents) => ({ + CB.chainPar((file) => + CB.pipe( + CB.fromCallback((cb) => Fs.readFile(file, cb)), + CB.map((contents) => ({ file, contents, })), @@ -132,7 +139,7 @@ export const resolveConfigFromContents = ), // Make file path relative - RxOp.map(({ file, contents }) => ({ + CB.map(({ file, contents }) => ({ file: path.relative(dir, file), contents, })), @@ -142,8 +149,8 @@ export const resolveConfigFromContents = * Transforms config objects to file paths */ export function configsToFiles() { - return (input$: Rx.Observable) => - input$.pipe(RxOp.map((config) => config.file)); + return (inputSource: CB.Source) => + CB.map_(inputSource, (c) => c.file); } function resolveFile(file: string) { diff --git a/packages/core/src/internal/fileTrees.ts b/packages/core/src/internal/fileTrees.ts index 970e79e..04f772f 100644 --- a/packages/core/src/internal/fileTrees.ts +++ b/packages/core/src/internal/fileTrees.ts @@ -1,19 +1,17 @@ -import * as Rx from "rxjs"; -import * as RxOp from "rxjs/operators"; +import * as Fs from "fs"; import { Operation } from "fs-tree-diff"; -import * as fs from "fs"; -import { promises as fsp } from "fs"; import * as path from "path"; -import { bufferUntil } from "./operators"; +import * as CB from "strict-callbag-basics"; import FSTree = require("fs-tree-diff"); export function toFileTree(dir: string) { - return (input$: Rx.Observable) => - input$.pipe( - RxOp.map((file) => path.relative(dir, file)), - RxOp.toArray(), - RxOp.map((files) => FSTree.fromPaths(files, { sortAndExpand: true })), + return (inputSource: CB.Source) => + CB.pipe( + inputSource, + CB.map((file) => path.relative(dir, file)), + CB.toArray, + CB.map((files) => FSTree.fromPaths(files, { sortAndExpand: true })), ); } @@ -36,35 +34,57 @@ export function calculatePatch( typeof contents[a.relativePath] === "string" ? Buffer.from(contents[a.relativePath]) : (contents[a.relativePath] as Buffer); - const bContent = fs.readFileSync(path.join(outDir, b.relativePath)); + const bContent = Fs.readFileSync(path.join(outDir, b.relativePath)); return aContent.compare(bContent) === 0; }); } +export type ExecutePatchError = { + _tag: "fs"; + op: string; + cause: unknown; +}; + +const runFs = (op: string, f: (cb: CB.Callback) => void) => + CB.pipe( + CB.fromCallback(f), + CB.mapError( + (cause): ExecutePatchError => ({ + _tag: "fs", + op, + cause, + }), + ), + ); + export function executePatch(contents: IInputContents, outDir: string) { - return (input$: Rx.Observable) => - input$.pipe( - bufferUntil(([op]) => op === "mkdir" || op === "rmdir"), - RxOp.concatMap((ops) => - Rx.from(ops).pipe( - RxOp.mergeMap(([op, file, _entry]) => { + return (source: CB.Source) => + CB.pipe( + source, + CB.batchUntil(([op]) => op === "mkdir" || op === "rmdir", true), + CB.chain((ops) => + CB.chainPar_( + CB.fromIter(ops), + ([op, file, _entry]): CB.Source => { const path = `${outDir}/${file}`; console.log(op.toUpperCase(), file); switch (op) { case "mkdir": - return Rx.from(fsp.mkdir(path)); + return runFs(op, (cb) => Fs.mkdir(path, cb)); case "rmdir": - return Rx.from(fsp.rmdir(path)); + return runFs(op, (cb) => Fs.rmdir(path, cb)); case "change": case "create": - return Rx.from(fsp.writeFile(path, contents[file])); + return runFs(op, (cb) => + Fs.writeFile(path, contents[file], cb), + ); case "unlink": - return Rx.from(fsp.unlink(path)); + return runFs(op, (cb) => Fs.unlink(path, cb)); } - }), + }, ), ), ); diff --git a/packages/core/src/internal/fs.ts b/packages/core/src/internal/fs.ts index 263604c..3c84a59 100644 --- a/packages/core/src/internal/fs.ts +++ b/packages/core/src/internal/fs.ts @@ -1,20 +1,29 @@ -import * as Rx from "rxjs"; -import * as RxOp from "rxjs/operators"; -import { promises as fs } from "fs"; +import * as FS from "fs"; +import * as CB from "strict-callbag-basics"; const glob = require("glob-to-regexp"); -export function files$(dir: string, ignore?: string): Rx.Observable { +export type FilesError = { _tag: "readdir"; cause: unknown }; + +export function filesSource( + dir: string, + ignore?: string, +): CB.Source { const ignoreRegExp = ignore ? glob(ignore) : undefined; - return Rx.from(fs.readdir(dir, { withFileTypes: true })).pipe( - RxOp.flatMap((f) => f), - RxOp.filter((f) => !f.name.startsWith(".")), - RxOp.filter((f) => (ignoreRegExp ? !ignoreRegExp.test(f.name) : true)), - RxOp.flatMap((f) => + return CB.pipe( + CB.fromCallback((cb) => + FS.readdir(dir, { withFileTypes: true }, cb), + ), + CB.mapError((cause): FilesError => ({ _tag: "readdir", cause })), + + CB.chain((f) => CB.fromIter(f)), + CB.filter((f) => !f.name.startsWith(".")), + CB.filter((f) => (ignoreRegExp ? !ignoreRegExp.test(f.name) : true)), + CB.chainPar((f) => f.isDirectory() - ? files$(`${dir}/${f.name}`, ignore) - : Rx.of(`${dir}/${f.name}`), + ? filesSource(`${dir}/${f.name}`, ignore) + : CB.of(`${dir}/${f.name}`), ), ); } diff --git a/packages/core/src/internal/operators.ts b/packages/core/src/internal/operators.ts deleted file mode 100644 index 99c98ee..0000000 --- a/packages/core/src/internal/operators.ts +++ /dev/null @@ -1,36 +0,0 @@ -import * as Rx from "rxjs"; - -export const bufferUntil = - (predicate: (value: T) => boolean) => - (source$: Rx.Observable) => - new Rx.Observable((s) => { - let buffer: Array = []; - - const maybeEmit = () => { - if (buffer.length) { - emit(); - } - }; - - const emit = () => { - s.next(buffer); - buffer = []; - }; - - source$.subscribe( - (v) => { - if (predicate(v)) { - maybeEmit(); - buffer = [v]; - emit(); - } else { - buffer.push(v); - } - }, - (err) => s.error(err), - () => { - maybeEmit(); - s.complete(); - }, - ); - }); diff --git a/yarn.lock b/yarn.lock index 8e13868..a8b014d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4594,7 +4594,7 @@ rxjs@^6.6.0: dependencies: tslib "^1.9.0" -rxjs@^7.3.0, rxjs@^7.5.5: +rxjs@^7.5.5: version "7.5.5" resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-7.5.5.tgz#2ebad89af0f560f460ad5cc4213219e1f7dd4e9f" integrity sha512-sy+H0pQofO95VDmFLzyaw9xNJU4KTRSwQIGM6+iG3SypAtCiLDzpeG8sJrNCWn2Up9km+KhkvTdbkrdy+yzZdw== @@ -4840,6 +4840,29 @@ ssri@^8.0.0, ssri@^8.0.1: dependencies: minipass "^3.1.1" +strict-callbag-basics@^0.24.2: + version "0.24.2" + resolved "https://registry.yarnpkg.com/strict-callbag-basics/-/strict-callbag-basics-0.24.2.tgz#b7eb5b28877180214db912ab197438317af63072" + integrity sha512-YtP3eKy/C9sDTfLIkGNQoXUPd8FvhsXCDg67WJHkcs19N5MjmHlKTr70hWXQaq8ccIMdiYLJb3ghtj7b7bOj6A== + dependencies: + callbag-buffer "^1.0.0" + callbag-buffer-time "^1.0.0" + callbag-concat "^1.2.1" + callbag-filter "^1.1.0" + callbag-flatten "^1.7.0" + callbag-from-iter "^1.3.0" + callbag-from-obs "^1.2.0" + callbag-interval "^1.2.0" + callbag-map "^1.1.0" + callbag-scan "^1.1.0" + callbag-share "^1.3.0" + callbag-start-with "^3.1.0" + callbag-take "^1.5.0" + callbag-take-while "^2.0.0" + callbag-to-async-iterable "^1.0.0" + strict-callbag "^0.9.0" + symbol-observable "^4.0.0" + strict-callbag-basics@^0.25.0: version "0.25.0" resolved "https://registry.yarnpkg.com/strict-callbag-basics/-/strict-callbag-basics-0.25.0.tgz#bd4596b0031604c1927727d8c0044dcd5fc9d0c9"