traefik/announcer/index.js
2023-06-16 09:11:54 -05:00

191 lines
4.5 KiB
JavaScript

const Docker = require('dockerode');
const WebSocket = require('ws');
// Outbound WebSocket to the endpoint named by the URL environment variable.
// All announce/retract messages in this file are written to this socket.
const ws = new WebSocket(process.env.URL, { perMessageDeflate: false });

// This process only ever sends. An inbound message, a socket error, or the
// connection closing are all logged and then terminate the process with
// exit code 1.
ws.on('message', function onMessage(data) {
  console.log('unexpected message');
  console.log(data);
  process.exit(1);
});

ws.on('error', function onError(error) {
  console.log('error');
  console.log(error);
  process.exit(1);
});

ws.on('close', function onClose() {
  console.log('connection closed');
  process.exit(1);
});
// Docker API clients, both connected to the local daemon's Unix socket.
// containerClient serves the per-container inspect calls in checkContainer;
// eventClient serves the container listing and the event stream in
// discoverContainerUpdates.
// NOTE(review): presumably two clients are used so the long-lived event
// stream does not share a client with inspect requests — confirm.
const containerClient = new Docker({ socketPath: '/var/run/docker.sock' });
const eventClient = new Docker({ socketPath: '/var/run/docker.sock' });
// container id -> JSON string of the announcements last sent for that
// container. Written by announce, read and cleared by retract.
const cache = new Map();
/**
 * Decides whether a Docker event should trigger a re-check of its container.
 *
 * Only container events carrying a string `id` are considered, and only for
 * lifecycle statuses that can change whether the container is announceable.
 *
 * @param {object} event - A decoded event object from the Docker event stream.
 * @returns {boolean} true when the container named by `event.id` should be
 *   re-inspected.
 */
const shouldHandleEvent = (event) => {
  if (event.Type !== 'container') {
    return false;
  }
  if (typeof event.id !== 'string') {
    console.log(`ignoring event due to non-string container id: ${event.id}`);
    return false;
  }
  const { status } = event;
  if (typeof status !== 'string') {
    return false;
  }
  // Docker reports health transitions with the result appended to the
  // status ("health_status: healthy", "health_status: unhealthy"), so an
  // exact match on 'health_status' never fires. Without this prefix match a
  // container with a healthcheck is never re-checked once it turns healthy.
  if (status.startsWith('health_status: ')) {
    return true;
  }
  switch (status) {
    case 'create':
    case 'destroy':
    case 'die':
    case 'health_status':
    case 'kill':
    case 'oom':
    case 'pause':
    case 'restart':
    case 'start':
    case 'stop':
    case 'unpause':
    case 'update':
      return true;
    default:
      return false;
  }
};
/**
 * Tells whether an inspected container is currently eligible for
 * announcement.
 *
 * A container qualifies when it is running, is healthy if it reports health
 * at all, and carries at least one label under the
 * `com.keganmyers.unhinged.` prefix.
 *
 * @param {object} container - Result of a container inspect call; `State`
 *   and `Config.Labels` are read.
 * @returns {boolean} true when the container should be announced.
 */
const shouldAnnounce = (container) => {
  const state = container.State;
  const isRunning = state.Status === 'running';
  if (!isRunning) {
    return false;
  }

  // Containers without a healthcheck have no Health entry and pass this gate.
  const reportsHealth = Object.hasOwn(state, 'Health');
  if (reportsHealth && state.Health.Status !== 'healthy') {
    return false;
  }

  for (const label of Object.keys(container.Config.Labels)) {
    if (label.startsWith('com.keganmyers.unhinged.')) {
      return true;
    }
  }
  return false;
};
/**
 * Yields one announcement per recognised label on an inspected container.
 *
 * Labels must start with `com.keganmyers.unhinged.`; the segment right after
 * that prefix selects the announcement type, and only `http` and
 * `http-middleware-forward-auth` are emitted. The label value is parsed as
 * JSON and becomes the payload, so a malformed value throws from
 * `JSON.parse`.
 *
 * @param {object} container - Result of a container inspect call;
 *   `Config.Labels` is read.
 * @yields {{type: string, payload: *}} One announcement per matching label.
 */
function* toAnnouncements(container) {
  const prefix = 'com.keganmyers.unhinged.';
  const knownTypes = ['http', 'http-middleware-forward-auth'];

  for (const [label, rawValue] of Object.entries(container.Config.Labels)) {
    if (label.startsWith(prefix)) {
      // The prefix spans three dot-separated segments; the fourth is the type.
      const [kind] = label.split('.').slice(3);
      if (knownTypes.includes(kind)) {
        yield { type: kind, payload: JSON.parse(rawValue) };
      }
    }
  }
}
/**
 * Sends an `<type>/announce` message over the WebSocket for every
 * announcement, then records the announcements in the cache under the
 * container id so they can be retracted later.
 *
 * @param {string} containerId - Full container id; its first 8 characters
 *   are used in log lines.
 * @param {Array<{type: string}>} announcements - Announcements to publish.
 */
const announce = (containerId, announcements) => {
  for (const entry of announcements) {
    const { type, ...rest } = entry;
    const wireMessage = JSON.stringify(
      Object.assign({ type: `${type}/announce` }, rest)
    );
    ws.send(wireMessage);
    console.log(`${containerId.slice(0, 8)} announce: ${wireMessage}`);
  }

  // Stored as a JSON string; retract parses it back.
  const snapshot = JSON.stringify(announcements);
  cache.set(containerId, snapshot);
};
/**
 * Sends an `<type>/retract` message for every announcement previously cached
 * for the container, then forgets the container.
 *
 * A container with no cache entry produces no messages.
 *
 * @param {string} containerId - Full container id; its first 8 characters
 *   are used in log lines.
 */
const retract = (containerId) => {
  const cached = JSON.parse(cache.get(containerId) || '[]');
  // `type` must be pulled out of each cached announcement. Referencing it
  // without destructuring threw a ReferenceError on the first retraction of
  // a container that had actually been announced.
  for (const { type, ...announcement } of cached) {
    const astr = JSON.stringify({ type: `${type}/retract`, ...announcement });
    ws.send(astr);
    console.log(`${containerId.slice(0, 8)} retract: ${astr}`);
  }
  cache.delete(containerId);
};
/**
 * Reconciles one container against the announcement cache.
 *
 * Inspects the container and then either retracts its announcements (the
 * container is gone or no longer eligible), logs "ok" (already announced),
 * or announces it for the first time.
 *
 * @param {string} containerId - Full container id to inspect.
 * @returns {Promise<void>}
 * @throws Any inspect error whose `statusCode` is not 404.
 */
const checkContainer = async (containerId) => {
  let details = null;
  try {
    details = await containerClient.getContainer(containerId).inspect();
  } catch (err) {
    // Only a 404 (container no longer exists) is handled here.
    if (err.statusCode !== 404) {
      throw err;
    }
    retract(containerId);
    return;
  }

  if (shouldAnnounce(details)) {
    if (cache.has(containerId)) {
      console.log(`${containerId.slice(0, 8)} ok`);
    } else {
      announce(containerId, Array.from(toAnnouncements(details)));
    }
  } else {
    retract(containerId);
  }
};
/**
 * Async generator of container ids that need to be (re)checked.
 *
 * First yields the id of every container returned by `listContainers()`,
 * then follows the Docker event stream and yields the id of each event
 * accepted by `shouldHandleEvent`.
 *
 * Stream chunks are split on newlines and each non-empty piece is parsed as
 * JSON. A piece that does not parse is kept and prepended to the next piece,
 * which lets an event split across chunks be reassembled.
 *
 * @yields {string} A container id.
 * @throws {Error} After 10 consecutive pieces fail to parse.
 */
async function* discoverContainerUpdates() {
  const listed = await eventClient.listContainers();
  for (const container of listed) {
    if (typeof container.Id === 'string') {
      yield container.Id;
    }
  }

  const eventStream = await eventClient.getEvents({
    // the filters don't seem to work right...
    type: 'container',
    since: '0',
  });

  let pending = '';
  let consecutiveFailures = 0;
  for await (const chunk of eventStream) {
    const pieces = chunk
      .toString()
      .split('\n')
      .filter((piece) => piece !== '');

    for (const piece of pieces) {
      const candidate = pending + piece;
      let event;
      try {
        event = JSON.parse(candidate);
      } catch (e) {
        // Keep the unparsed text and retry with the next piece appended.
        pending = candidate;
        consecutiveFailures += 1;
        if (consecutiveFailures === 10) {
          throw new Error('too many errors while reading events');
        }
        continue;
      }
      consecutiveFailures = 0;
      pending = '';

      if (shouldHandleEvent(event)) {
        yield event.id;
      }
    }
  }
}
// Entry point: checks every container id produced by
// discoverContainerUpdates, one at a time. The event stream is not expected
// to end, so both normal completion and any error terminate the process with
// exit code 1.
(async () => {
  // ws.send() throws while the socket is still CONNECTING, and the first
  // announcement can be ready before the handshake completes. Wait for
  // 'open' first. A failed connection is handled by the 'error'/'close'
  // listeners, which exit the process.
  if (ws.readyState === WebSocket.CONNECTING) {
    await new Promise((resolve) => {
      ws.once('open', resolve);
    });
  }
  for await (const containerId of discoverContainerUpdates()) {
    await checkContainer(containerId);
  }
})().then(
  () => {
    console.log('end of event stream');
    process.exit(1);
  },
  (err) => {
    console.log(`unexpected error: ${err}`);
    process.exit(1);
  }
);