Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/node_modules/
.plan/
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
sudo: required
language: node_js
node_js:
- '6'
- '10'
services:
- docker

Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM node:6
FROM node:10

WORKDIR '/opt/socket-redis'

ADD https://git.io/vyCoJ /usr/local/bin/wait-for-it
RUN chmod a+x /usr/local/bin/wait-for-it
COPY package.json ./
RUN npm install --only=production
COPY package.json package-lock.json ./
RUN npm ci --only=production
COPY . ./

EXPOSE 8085 8090 8091
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ Install dependencies:
```
npm install
```
Requires Node.js 10 or newer.

Build the docker image:
```
Expand Down
10 changes: 2 additions & 8 deletions bin/socket-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@
var socketRedis = require('../socket-redis.js');
var childProcess = require('child_process');
var utils = require('../lib/utils.js');
var optimist = require('optimist');
var cliOptions = require('../lib/cli-options.js');
var fs = require('fs');
var argv = optimist.default({
'log-dir': null,
'redis-host': 'localhost',
'redis-port': 6379,
'redis-pass': null
}).argv;
var argv = cliOptions.parse(process.argv.slice(2));
var redisConnection = {
'host': argv['redis-host'],
'port': argv['redis-port']
Expand All @@ -30,7 +25,6 @@ if (logDir) {
}

if (!process.send) {
argv = optimist.default('socket-ports', '8090').default('status-port', '8085').argv;
var socketPorts = String(argv['socket-ports']).split(',');
var publisher = new socketRedis.Server(redisConnection, argv['status-port'], argv['status-secret']);

Expand Down
33 changes: 33 additions & 0 deletions lib/cli-options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
var minimist = require('minimist');

var DEFAULTS = {
'log-dir': null,
'redis-host': 'localhost',
'redis-port': 6379,
'redis-pass': null,
'socket-ports': '8090',
'status-port': '8085'
};

function parse(args) {
return minimist(args, {
default: DEFAULTS,
string: [
'log-dir',
'redis-host',
'redis-pass',
'socket-ports',
'status-port',
'status-secret',
'sockjs-client-url',
'ssl-key',
'ssl-cert',
'ssl-pfx',
'ssl-passphrase'
]
});
}

module.exports = {
parse: parse
};
12 changes: 9 additions & 3 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ var redis = require('redis');
var _ = require('underscore');
var Promise = require('bluebird');
var StatusServer = require('./status-server');
var validator = require('validator');

function isMissing(value) {
return typeof value === 'undefined' || value === null || value === '';
}

/**
* @param {Object} redisConnection
Expand Down Expand Up @@ -174,7 +177,10 @@ Server.prototype._createRedisClient = function(connection, alias) {
var retry_strategy = function(options) {
return Math.min(options.attempt * 100, 1000);
};
var options = _.extend(connection, {retry_strategy: retry_strategy});
var options = _.extend({}, connection, {retry_strategy: retry_strategy});
if (isMissing(options.password)) {
delete options.password;
}
var client = redis.createClient(options);

['error', 'warning', 'connect', 'ready', 'reconnecting', 'end'].forEach(function(event) {
Expand Down Expand Up @@ -234,7 +240,7 @@ Server.prototype._handleClientDownMessage = function(event) {
if (typeof eventData.data === 'undefined') {
eventData.data = null;
}
if (validator.isNull(eventData.channel) || validator.isNull(eventData.event)) {
if (isMissing(eventData.channel) || isMissing(eventData.event)) {
throw new Error('Missing channel or event: `' + JSON.stringify(eventData) + '`')
}
this._sendDownPublish(eventData.channel, eventData.event, eventData.data);
Expand Down
39 changes: 32 additions & 7 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,40 @@ var mkdirRecursive = function(directory) {

var logProcessInto = function(process, logFile) {
mkdirRecursive(path.dirname(logFile));
log4js.clearAppenders();
log4js.loadAppender('file');
log4js.addAppender(log4js.appenders.file(logFile));
log4js.configure({
appenders: {
socketRedis: {
type: 'file',
filename: logFile
}
},
categories: {
default: {
appenders: ['socketRedis'],
level: 'debug'
}
}
});
var logger = log4js.getLogger();
process.stdout.write = function(content) {
return logger.debug(content);
process.stdout.write = function(content, encoding, callback) {
logger.debug(content);
if (typeof encoding === 'function') {
encoding();
}
if (typeof callback === 'function') {
callback();
}
return true;
};
process.stderr.write = function(content) {
return logger.error(content);
process.stderr.write = function(content, encoding, callback) {
logger.error(content);
if (typeof encoding === 'function') {
encoding();
}
if (typeof callback === 'function') {
callback();
}
return true;
};
};

Expand Down
25 changes: 19 additions & 6 deletions lib/worker-connection.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
var util = require('util');
var EventEmitter = require('events');
var _ = require('underscore');
var validator = require('validator');
var Subscriber = require('./subscriber');

function isMissing(value) {
return typeof value === 'undefined' || value === null || value === '';
}

function isInteger(value) {
if (typeof value === 'number') {
return isFinite(value) && Math.floor(value) === value;
}
if (typeof value !== 'string') {
return false;
}
return /^-?\d+$/.test(value);
}

/**
* @param {SockJSConnection} connection
* @param {ChannelList} workerChannels
Expand Down Expand Up @@ -127,29 +140,29 @@ WorkerConnection.prototype._onData = function(data) {
try {
data = JSON.parse(data);

if (validator.isNull(data.event)) {
if (isMissing(data.event)) {
throw new Error('Missing `data.event`: `' + JSON.stringify(data) + '`')
}
var eventData = data.data;
switch (data.event) {
case 'subscribe':
if (validator.isNull(eventData.channel) || validator.isNull(eventData.data) || !validator.isInt(eventData.start)) {
if (isMissing(eventData.channel) || isMissing(eventData.data) || !isInteger(eventData.start)) {
throw new Error('Missing data: `' + JSON.stringify(eventData) + '`')
}

this.subscribe(eventData.channel, eventData.data, eventData.start);
break;

case 'unsubscribe':
if (validator.isNull(eventData.channel)) {
if (isMissing(eventData.channel)) {
throw new Error('Missing `data.channel`: `' + JSON.stringify(eventData) + '`')
}

this.unsubscribe(eventData.channel);
break;

case 'message':
if (validator.isNull(eventData.data)) {
if (isMissing(eventData.data)) {
throw new Error('Missing `data.data`: `' + JSON.stringify(eventData) + '`')
}

Expand All @@ -160,7 +173,7 @@ WorkerConnection.prototype._onData = function(data) {
if (typeof eventData.data === 'undefined') {
eventData.data = null;
}
if (validator.isNull(eventData.channel) || validator.isNull(eventData.event)) {
if (isMissing(eventData.channel) || isMissing(eventData.event)) {
throw new Error('Missing channel or event: `' + JSON.stringify(eventData) + '`')
}

Expand Down
Loading