Skip to content

Commit

Permalink
Merge branch 'callbag'
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Apr 6, 2022
2 parents da381b3 + 78afa52 commit 580360b
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 137 deletions.
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"ini": "^2.0.0",
"js-yaml": "^4.0.0",
"ramda": "^0.27.1",
"rxjs": "^7.3.0"
"strict-callbag-basics": "^0.24.2"
},
"gitHead": "a5aa6a6e10b62a957de11c4bc55ac4d3ee9a2ece"
}
62 changes: 34 additions & 28 deletions packages/core/src/generate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ import * as fs from "fs";
import * as Ini from "ini";
import * as Yaml from "js-yaml";
import * as path from "path";
import * as Rx from "rxjs";
import * as RxOp from "rxjs/operators";
import { configs$, configsToFiles } from "./internal/config";
import { configs, configsToFiles } from "./internal/config";
import {
calculatePatch,
executePatch,
IInputContents,
toFileTree,
} from "./internal/fileTrees";
import { files$ } from "./internal/fs";
import { filesSource } from "./internal/fs";
import * as CB from "strict-callbag-basics";

export interface IGenerateOpts {
context: any;
Expand Down Expand Up @@ -40,30 +39,37 @@ export function generate(
fs.mkdirSync(outDir);
}

const inputConfigs$ = configs$(inputDir, context, formats, format, ignore);
const inputConfigsArray$ = inputConfigs$.pipe(RxOp.toArray());
const outputFT$ = files$(outDir).pipe(toFileTree(outDir));
const inputFT$ = inputConfigs$.pipe(configsToFiles(), toFileTree(inputDir));

return Rx.zip(inputConfigsArray$, inputFT$, outputFT$)
.pipe(
RxOp.flatMap(([configs, inputFT, outputFT]) => {
const contents = configs.reduce((acc, c) => {
acc[c.file] = c.contents;
return acc;
}, {} as any) as IInputContents;

const patch = calculatePatch(inputFT, outputFT, {
contents,
outDir,
});
return Rx.from(patch).pipe(executePatch(contents, outDir));
}),
)
.toPromise()
.finally(() => {
process.chdir(startDir);
});
const inputConfigs = CB.share(
configs(inputDir, context, formats, format, ignore),
);
const inputConfigsArray = CB.toArray(inputConfigs);
const outputFT = CB.pipe(filesSource(outDir), toFileTree(outDir));
const inputFT = CB.pipe(inputConfigs, configsToFiles(), toFileTree(inputDir));

return CB.pipe(
inputConfigsArray,

CB.zip(inputFT),
CB.zip(outputFT),

CB.chain(([[configs, inputFT], outputFT]) => {
const contents = configs.reduce((acc, c) => {
acc[c.file] = c.contents;
return acc;
}, {} as any) as IInputContents;

const patch = calculatePatch(inputFT, outputFT, {
contents,
outDir,
});

return CB.pipe(CB.fromIter(patch), executePatch(contents, outDir));
}),

CB.run_,
).finally(() => {
process.chdir(startDir);
});
}

export interface IFormat {
Expand Down
83 changes: 45 additions & 38 deletions packages/core/src/internal/config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import * as Rx from "rxjs";
import * as RxOp from "rxjs/operators";
import * as Fs from "fs";
import * as path from "path";
import * as R from "ramda";
import { files$ } from "./fs";
import * as CB from "strict-callbag-basics";
import { IFormat } from "../generate";
import { promises as fsp } from "fs";
import { filesSource } from "./fs";

export interface IConfig
extends Readonly<{
Expand All @@ -15,23 +14,24 @@ export interface IConfig
/**
* Streams a list of configuration files from a source directory.
*/
export function configs$(
export function configs(
dir: string,
context: any,
formats: Map<string, IFormat>,
format: string,
ignore?: string,
): Rx.Observable<IConfig> {
return files$(dir, ignore).pipe(
RxOp.flatMap((file) => {
if ([".js", ".ts"].includes(path.extname(file))) {
return Rx.of(file).pipe(
resolveConfigFromExports(dir, context, formats, format),
);
}

return Rx.of(file).pipe(resolveConfigFromContents(dir));
}),
): CB.Source<IConfig> {
return CB.pipe(
filesSource(dir, ignore),
CB.groupBy((file) => [".js", ".ts"].includes(path.extname(file))),
CB.chainPar(([source, isScript]) =>
isScript
? CB.pipe(
source,
resolveConfigFromExports(dir, context, formats, format),
)
: CB.pipe(source, resolveConfigFromContents(dir)),
),
);
}

Expand All @@ -40,19 +40,20 @@ export function configs$(
*/
export const resolveConfigFromExports =
(dir: string, context: any, formats: Map<string, IFormat>, format: string) =>
(input$: Rx.Observable<string>) =>
input$.pipe(
(inputSource: CB.Source<string>): CB.Source<IConfig> =>
CB.pipe(
inputSource,
// For each file, require() it and load its exports
RxOp.map((file) => ({
CB.map((file) => ({
file,
exports: resolveFile(file),
})),

// We only want files with a "default" export
RxOp.filter(R.hasPath(["exports", "default"])),
CB.filter(R.hasPath(["exports", "default"])),

// Remove file extensions and de-nest "default" exports
RxOp.map(({ file, exports }) => ({
CB.map(({ file, exports }) => ({
relativePath: R.pipe(
R.split("."),
R.remove(-1, 1),
Expand All @@ -62,7 +63,7 @@ export const resolveConfigFromExports =
})),

// If we have an "index" file, don't create a directory for it
RxOp.map(
CB.map(
R.when(
R.pipe(
(s: { relativePath: string; exports: any }) => s,
Expand All @@ -75,9 +76,13 @@ export const resolveConfigFromExports =
),

// Map functions / promises to the actual configuration
RxOp.flatMap(({ relativePath, exports }) =>
Rx.from(resolveContents(context, exports)).pipe(
RxOp.map((contents) => ({
CB.chainPar(({ relativePath, exports }) =>
CB.pipe(
CB.fromPromise_(
() => resolveContents(context, exports),
(e) => e,
),
CB.map((contents) => ({
relativePath,
contents,
})),
Expand All @@ -86,10 +91,9 @@ export const resolveConfigFromExports =

// For each key in the configuration create a file with the correct
// extension for the format.
RxOp.flatMap(({ relativePath, contents }) =>
Rx.from(Object.keys(contents)).pipe(
// Determine the format for the file
RxOp.map((file) => {
CB.chain(({ relativePath, contents }) =>
CB.fromIter(
Object.keys(contents).map((file) => {
const fileContents = contents[file];
let formatOverride = fileFormat(formats)(file);
return formatOverride
Expand All @@ -109,7 +113,7 @@ export const resolveConfigFromExports =

// Map functions / promises for file contents, then encode it to the correct
// format.
RxOp.map(({ file, format, contents }) => ({
CB.map(({ file, format, contents }) => ({
file,
contents: encodeContents(formats, format, contents),
})),
Expand All @@ -119,20 +123,23 @@ export const resolveConfigFromExports =
* Streams configuration files from non-module files.
*/
export const resolveConfigFromContents =
(dir: string) => (input$: Rx.Observable<string>) =>
input$.pipe(
(dir: string) =>
(inputSource: CB.Source<string>): CB.Source<IConfig> =>
CB.pipe(
inputSource,
// Load contents from file
RxOp.flatMap((file) =>
Rx.from(fsp.readFile(file)).pipe(
RxOp.map((contents) => ({
CB.chainPar((file) =>
CB.pipe(
CB.fromCallback<Buffer>((cb) => Fs.readFile(file, cb)),
CB.map((contents) => ({
file,
contents,
})),
),
),

// Make file path relative
RxOp.map(({ file, contents }) => ({
CB.map(({ file, contents }) => ({
file: path.relative(dir, file),
contents,
})),
Expand All @@ -142,8 +149,8 @@ export const resolveConfigFromContents =
* Transforms config objects to file paths
*/
export function configsToFiles() {
return (input$: Rx.Observable<IConfig>) =>
input$.pipe(RxOp.map((config) => config.file));
return (inputSource: CB.Source<IConfig>) =>
CB.map_(inputSource, (c) => c.file);
}

function resolveFile(file: string) {
Expand Down
64 changes: 42 additions & 22 deletions packages/core/src/internal/fileTrees.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import * as Rx from "rxjs";
import * as RxOp from "rxjs/operators";
import * as Fs from "fs";
import { Operation } from "fs-tree-diff";
import * as fs from "fs";
import { promises as fsp } from "fs";
import * as path from "path";
import { bufferUntil } from "./operators";
import * as CB from "strict-callbag-basics";

import FSTree = require("fs-tree-diff");

export function toFileTree(dir: string) {
return (input$: Rx.Observable<string>) =>
input$.pipe(
RxOp.map((file) => path.relative(dir, file)),
RxOp.toArray(),
RxOp.map((files) => FSTree.fromPaths(files, { sortAndExpand: true })),
return (inputSource: CB.Source<string>) =>
CB.pipe(
inputSource,
CB.map((file) => path.relative(dir, file)),
CB.toArray,
CB.map((files) => FSTree.fromPaths(files, { sortAndExpand: true })),
);
}

Expand All @@ -36,35 +34,57 @@ export function calculatePatch(
typeof contents[a.relativePath] === "string"
? Buffer.from(contents[a.relativePath])
: (contents[a.relativePath] as Buffer);
const bContent = fs.readFileSync(path.join(outDir, b.relativePath));
const bContent = Fs.readFileSync(path.join(outDir, b.relativePath));

return aContent.compare(bContent) === 0;
});
}

export type ExecutePatchError = {
_tag: "fs";
op: string;
cause: unknown;
};

const runFs = (op: string, f: (cb: CB.Callback<void, unknown>) => void) =>
CB.pipe(
CB.fromCallback(f),
CB.mapError(
(cause): ExecutePatchError => ({
_tag: "fs",
op,
cause,
}),
),
);

export function executePatch(contents: IInputContents, outDir: string) {
return (input$: Rx.Observable<Operation>) =>
input$.pipe(
bufferUntil(([op]) => op === "mkdir" || op === "rmdir"),
RxOp.concatMap((ops) =>
Rx.from(ops).pipe(
RxOp.mergeMap(([op, file, _entry]) => {
return (source: CB.Source<Operation>) =>
CB.pipe(
source,
CB.batchUntil(([op]) => op === "mkdir" || op === "rmdir", true),
CB.chain((ops) =>
CB.chainPar_(
CB.fromIter(ops),
([op, file, _entry]): CB.Source<void, ExecutePatchError> => {
const path = `${outDir}/${file}`;

console.log(op.toUpperCase(), file);

switch (op) {
case "mkdir":
return Rx.from(fsp.mkdir(path));
return runFs(op, (cb) => Fs.mkdir(path, cb));
case "rmdir":
return Rx.from(fsp.rmdir(path));
return runFs(op, (cb) => Fs.rmdir(path, cb));
case "change":
case "create":
return Rx.from(fsp.writeFile(path, contents[file]));
return runFs(op, (cb) =>
Fs.writeFile(path, contents[file], cb),
);
case "unlink":
return Rx.from(fsp.unlink(path));
return runFs(op, (cb) => Fs.unlink(path, cb));
}
}),
},
),
),
);
Expand Down
31 changes: 20 additions & 11 deletions packages/core/src/internal/fs.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import * as Rx from "rxjs";
import * as RxOp from "rxjs/operators";
import { promises as fs } from "fs";
import * as FS from "fs";
import * as CB from "strict-callbag-basics";

const glob = require("glob-to-regexp");

export function files$(dir: string, ignore?: string): Rx.Observable<string> {
export type FilesError = { _tag: "readdir"; cause: unknown };

export function filesSource(
dir: string,
ignore?: string,
): CB.Source<string, FilesError> {
const ignoreRegExp = ignore ? glob(ignore) : undefined;

return Rx.from(fs.readdir(dir, { withFileTypes: true })).pipe(
RxOp.flatMap((f) => f),
RxOp.filter((f) => !f.name.startsWith(".")),
RxOp.filter((f) => (ignoreRegExp ? !ignoreRegExp.test(f.name) : true)),
RxOp.flatMap((f) =>
return CB.pipe(
CB.fromCallback<FS.Dirent[], unknown>((cb) =>
FS.readdir(dir, { withFileTypes: true }, cb),
),
CB.mapError((cause): FilesError => ({ _tag: "readdir", cause })),

CB.chain((f) => CB.fromIter(f)),
CB.filter((f) => !f.name.startsWith(".")),
CB.filter((f) => (ignoreRegExp ? !ignoreRegExp.test(f.name) : true)),
CB.chainPar((f) =>
f.isDirectory()
? files$(`${dir}/${f.name}`, ignore)
: Rx.of(`${dir}/${f.name}`),
? filesSource(`${dir}/${f.name}`, ignore)
: CB.of(`${dir}/${f.name}`),
),
);
}
Loading

0 comments on commit 580360b

Please sign in to comment.