Skip to content

Commit

Permalink
Updated airgradient adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
caparker committed Mar 27, 2024
1 parent 6a85435 commit 791aae1
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 67 deletions.
3 changes: 1 addition & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
LCS_API=https://api.openaq.org
STACK=lcs-etl-pipeline
SECRET_STACK=lcs-etl-pipeline
BUCKET=openaq-fetches
VERBOSE=1
TOPIC_ARN=arn:aws:sns:us-east-1:470049585876:NewFetchResults
3 changes: 2 additions & 1 deletion cdk/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const stack = new EtlPipeline(app, "lcs-etl-pipeline", {
schedulerModuleDir: "scheduler",
sources: require('../fetcher/sources'),
bucketName: process.env.BUCKET || 'openaq-fetches',
lcsApi: process.env.LCS_API || 'https://api.openaq.org'
lcsApi: process.env.LCS_API || 'https://api.openaq.org',
topicArn: process.env.TOPIC_ARN
});


Expand Down
15 changes: 14 additions & 1 deletion cdk/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class EtlPipeline extends cdk.Stack {
sources,
lcsApi,
bucketName,
topicArn,
...props
}: StackProps
) {
Expand All @@ -36,6 +37,7 @@ export class EtlPipeline extends cdk.Stack {
queue,
bucket,
lcsApi,
topicArn,
});
this.buildSchedulerLambdas({
moduleDir: schedulerModuleDir,
Expand All @@ -49,6 +51,7 @@ export class EtlPipeline extends cdk.Stack {
queue: sqs.Queue;
bucket: s3.IBucket;
lcsApi: string;
topicArn: string;
}): lambda.Function {
this.prepareNodeModules(props.moduleDir);
const handler = new lambda.Function(this, 'Fetcher', {
Expand All @@ -61,8 +64,8 @@ export class EtlPipeline extends cdk.Stack {
environment: {
BUCKET: props.bucket.bucketName,
STACK: cdk.Stack.of(this).stackName,
VERBOSE: '1',
LCS_API: props.lcsApi,
TOPIC_ARN: props.topicArn,
},
});
handler.addEventSource(
Expand All @@ -72,6 +75,14 @@ export class EtlPipeline extends cdk.Stack {
);
props.queue.grantConsumeMessages(handler);
props.bucket.grantReadWrite(handler);
handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['sns:Publish'],
resources: [props.topicArn],
})
);

handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
Expand All @@ -86,6 +97,7 @@ export class EtlPipeline extends cdk.Stack {
],
})
);

return handler;
}

Expand Down Expand Up @@ -147,6 +159,7 @@ interface StackProps extends cdk.StackProps {
fetcherModuleDir: string;
schedulerModuleDir: string;
lcsApi: string;
topicArn: string;
bucketName: string;
sources: Source[];
}
23 changes: 9 additions & 14 deletions fetcher/lib/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Providers {
*/
async publish(message, subject) {
console.log('Publishing:', subject, message);
if (process.env.TOPIC_ARN) {
if (process.env.TOPIC_ARN && message) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Expand Down Expand Up @@ -111,20 +111,21 @@ class Providers {
prettyPrintStation(currentData);
}
} catch (err) {
if (err.statusCode !== 404) throw err;
if (err.Code !== 'NoSuchKey') throw err;
}

const compressedString = await gzip(newData);

if (!DRYRUN) {
if (VERBOSE) console.debug(`Saving station to ${Bucket}/${Key}`);
await putObject({
await putObject(
compressedString,

Check failure on line 122 in fetcher/lib/providers.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 16 spaces but found 8 tabs
Bucket,
Key,
Body: compressedString,
ContentType: 'application/json',
ContentEncoding: 'gzip'
}).promise();
false,
'application/json',
'gzip'
);
}
if (VERBOSE) console.log(`finished station: ${providerStation}\n------------------------`);
}
Expand Down Expand Up @@ -152,13 +153,7 @@ class Providers {
}
if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`);

return putObject({
Bucket,
Key,
Body: compressedString,
ContentType: 'text/csv',
ContentEncoding: 'gzip'
}).promise();
return await putObject(compressedString, Bucket, Key, false, 'text/csv', 'gzip');
}
}

Expand Down
13 changes: 10 additions & 3 deletions fetcher/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@ const request = promisify(require('request'));

const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager');
const { S3Client, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3');
const { NodeHttpHandler } = require('@smithy/node-http-handler');

Check failure on line 7 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

'NodeHttpHandler' is assigned a value but never used
const https = require('https');

Check failure on line 8 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

'https' is assigned a value but never used

const VERBOSE = !!process.env.VERBOSE;
const DRYRUN = !!process.env.DRYRUN;

const s3 = new S3Client({
maxRetries: 10
maxRetries: 10,

Check failure on line 14 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Unexpected trailing comma
// requestHandler: new NodeHttpHandler({

Check failure on line 15 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
// httpsAgent: new https.Agent({

Check failure on line 16 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
// maxSockets: 1000

Check failure on line 17 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
// }),

Check failure on line 18 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
// socketAcquisitionWarningTimeout: 6000,

Check failure on line 19 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
// })

Check failure on line 20 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 2 tabs
});

const gzip = promisify(zlib.gzip);
Expand All @@ -31,8 +39,7 @@ async function getObject(Bucket, Key) {
return currentData;
}

async function putObject(text, Bucket, Key, gzip = true, ContentType = 'application/json') {
let ContentEncoding = null;
async function putObject(text, Bucket, Key, gzip = true, ContentType = 'application/json', ContentEncoding = null) {
if (gzip) {
text = await gzip(text);
ContentEncoding = 'gzip';
Expand Down
25 changes: 18 additions & 7 deletions fetcher/providers/airgradient.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const lookup = {
'pm01': ['pm1', 'µg/m³'],
'pm02': ['pm25', 'µg/m³'],
'pm10': ['pm10', 'µg/m³'],
'pm003count': ['um003', 'particles/cm³'],
'pm003Count': ['um003', 'particles/cm³'],
'rhum': ['relativehumidity', '%'],
'atmp': ['temperature', 'c']
};
Expand Down Expand Up @@ -47,15 +47,26 @@ function getLatestReading(sensorData) {
// and the data we are looking for is not always ready when we first check
// so we are going back 3 hrs in order to cover missing data
// if we still see gaps we can increase the lag time
const d = new Date();
d.setHours(d.getHours() - 3);
d.setMinutes(0);
d.setSeconds(0);
d.setMilliseconds(0);
const offset = 3;
const from = new Date();
const to = new Date();
from.setHours(from.getHours() - offset);
from.setMinutes(0);
from.setSeconds(0);
from.setMilliseconds(0);

// the current hour is always wrong because its a rolling average
to.setHours(to.getHours() - 1);
to.setMinutes(0);
to.setSeconds(0);
to.setMilliseconds(0);

const params = Object.keys(lookup);
const measurements = sensorData
.filter((o) => new Date(o.date).getTime() >= d.getTime())
.filter((o) => {
const now = new Date(o.date).getTime();
return now >= from.getTime() && now <= to.getTime();
})
.map((o) => {
const timestamp = new Date(o.date);
// convert to hour ending to match our system
Expand Down
15 changes: 9 additions & 6 deletions fetcher/providers/habitatmap.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ const lookup = {

async function processor(source) {
const measurands = await Measurand.getSupportedMeasurands(lookup);
await process_fixed_locations(source, measurands);
await process_mobile_locations(source, measurands);
const fixed = await process_fixed_locations(source, measurands);
const mobile = await process_mobile_locations(source, measurands);
return fixed;
}

async function process_fixed_locations(source, measurands) {
Expand Down Expand Up @@ -64,10 +65,11 @@ async function process_fixed_locations(source, measurands) {
}

await Promise.all(stations);
console.log(`ok - all ${stations.length} fixed stations pushed`);
//console.log(`ok - all ${stations.length} fixed stations pushed`);

await Providers.put_measures(source.provider, measures);
console.log(`ok - all ${measures.length} fixed measures pushed`);
//console.log(`ok - all ${measures.length} fixed measures pushed`);
return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to };
}

async function process_mobile_locations(source, measurands) {
Expand Down Expand Up @@ -119,10 +121,11 @@ async function process_mobile_locations(source, measurands) {
}

await Promise.all(stations);
console.log(`ok - all ${stations.length} mobile stations pushed`);
//console.log(`ok - all ${stations.length} mobile stations pushed`);

await Providers.put_measures(source.provider, measures);
console.log(`ok - all ${measures.length} mobile measures pushed`);
//console.log(`ok - all ${measures.length} mobile measures pushed`);
return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to };
}

async function fixed_locations(source) {
Expand Down
2 changes: 1 addition & 1 deletion fetcher/providers/purpleair.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async function fetchSensorData(source) {
// if we are looking for a specific sourceid lets not limit
if (!process.env.SOURCEID) {
// Filter results to only include sensors modified or updated within the last number of seconds.
url.searchParams.append('max_age', 75);
url.searchParams.append('max_age', 100);
// Filter results to only include outdoor sensors.
url.searchParams.append('location_type', 0);
}
Expand Down
3 changes: 2 additions & 1 deletion fetcher/sources/airgradient.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"schema": "v1",
"provider": "airgradient",
"frequency": "hour",
"secretKey": "airgradient",
"secretKey": "airgradient",
"active": true,
"meta": {
"url": "https://api.airgradient.com"
}
Expand Down
15 changes: 8 additions & 7 deletions fetcher/sources/clarity.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"schema": "v1",
"provider": "clarity",
"frequency": "hour",
"secretKey": "clarity-keys",
"meta": {
"url": "https://clarity-data-api.clarity.io"
}
"schema": "v1",
"provider": "clarity",
"frequency": "hour",
"secretKey": "clarity-keys",
"active": true,
"meta": {
"url": "https://clarity-data-api.clarity.io"
}
}
13 changes: 7 additions & 6 deletions fetcher/sources/cmu.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"schema": "v1",
"provider": "cmu",
"frequency": "hour",
"meta": {
"folderId": "1Mp_a-OyGGlk5tGkezYK41iZ2qybnrPzp"
}
"schema": "v1",
"provider": "cmu",
"frequency": "hour",
"active": false,
"meta": {
"folderId": "1Mp_a-OyGGlk5tGkezYK41iZ2qybnrPzp"
}
}
13 changes: 7 additions & 6 deletions fetcher/sources/habitatmap.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"schema": "v1",
"provider": "habitatmap",
"frequency": "minute",
"meta": {
"url": "http://aircasting.habitatmap.org"
}
"schema": "v1",
"provider": "habitatmap",
"frequency": "minute",
"active": true,
"meta": {
"url": "http://aircasting.habitatmap.org"
}
}
3 changes: 2 additions & 1 deletion fetcher/sources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ const fs = require('fs');
/** @type {Source[]} */
module.exports = fs.readdirSync(__dirname)
.filter((f) => f.endsWith('.json'))
.map((f) => require(`./${f}`));
.map((f) => require(`./${f}`))
.filter((f) => f.active);
15 changes: 8 additions & 7 deletions fetcher/sources/purpleair.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"schema": "v1",
"provider": "purpleair",
"frequency": "minute",
"secretKey": "purpleair",
"meta": {
"url": "https://api.purpleair.com/"
}
"schema": "v1",
"provider": "purpleair",
"frequency": "minute",
"secretKey": "purpleair",
"active": false,
"meta": {
"url": "https://api.purpleair.com/"
}
}
2 changes: 1 addition & 1 deletion fetcher/sources/senstate.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"schema": "v1",
"provider": "senstate",
"frequency": "minute",
"active": true,
"meta": {
"url": "https://open-data.senstate.cloud"
}
}

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@aws-sdk/client-secrets-manager": "^3.523.0",
"@aws-sdk/client-sns": "^3.521.0",
"@aws-sdk/client-sqs": "^3.525.0",
"@smithy/node-http-handler": "^2.4.1",
"csv-parser": "^3.0.0",
"csv-writer": "^1.6.0",
"dayjs": "^1.11.10",
Expand Down
6 changes: 3 additions & 3 deletions test/source.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const ajv = new Ajv({
});

ajv.addMetaSchema(
require('ajv/lib/refs/json-schema-draft-04.json'),
'http://json-schema.org/draft-04/schema#'
require('ajv/lib/refs/json-schema-draft-07.json'),
'http://json-schema.org/draft-07/schema#'
);

const validate = ajv.compile(schema);
Expand All @@ -21,7 +21,7 @@ tape('validate', (t) => {

// find all the sources, has to be synchronous for tape
sources.forEach((source) => {
tape(`tests for ${source}`, (t) => {
tape(`tests for ${source}`, (t) => {
try {
const valid = validate(source);

Expand Down

0 comments on commit 791aae1

Please sign in to comment.