Merge remote-tracking branch 'origin/master' into merge-upstream

This commit is contained in:
David Yip
2017-11-17 17:40:00 -06:00
67 changed files with 851 additions and 218 deletions

View File

@@ -254,6 +254,26 @@ const startWorker = (workerId) => {
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
const authorizeListAccess = (id, req, next) => {
pgPool.connect((err, client, done) => {
if (err) {
next(false);
return;
}
client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
done();
if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
next(false);
return;
}
next(true);
});
});
};
const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
const streamType = notificationOnly ? ' (notification)' : '';
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`);
@@ -414,7 +434,22 @@ const startWorker = (workerId) => {
streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
app.get('/api/v1/streaming/list', (req, res) => {
const listId = req.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
return;
}
const channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
});
});
const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
wss.on('connection', ws => {
const req = ws.upgradeReq;
@@ -450,6 +485,19 @@ const startWorker = (workerId) => {
case 'hashtag:local':
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
const listId = location.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
ws.close();
return;
}
const channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
default:
ws.close();
}