/**
 * Watches the local Docker daemon and mirrors label-driven announcements to a
 * WebSocket endpoint (taken from the URL environment variable).
 *
 * For every running (and, when a healthcheck exists, healthy) container that
 * carries `com.keganmyers.unhinged.*` labels, one `<type>/announce` message is
 * sent per matching label. When such a container stops qualifying, the
 * matching `<type>/retract` messages are sent. Any unexpected condition ends
 * the process with exit code 1.
 */
const Docker = require('dockerode');
const WebSocket = require('ws');

// Prefix shared by every container label this script reacts to.
const LABEL_PREFIX = 'com.keganmyers.unhinged.';

// Values of the 4th dot-separated label segment that produce announcements.
const ANNOUNCEMENT_TYPES = new Set(['http', 'http-middleware-forward-auth']);

// Container event actions that trigger a re-inspection of the container.
const HANDLED_ACTIONS = new Set([
  'create',
  'destroy',
  'die',
  'health_status',
  'kill',
  'oom',
  'pause',
  'restart',
  'start',
  'stop',
  'unpause',
  'update',
]);

// Number of consecutive unparseable event lines tolerated before giving up.
const MAX_CONSECUTIVE_PARSE_FAILS = 10;

const ws = new WebSocket(process.env.URL, {
  perMessageDeflate: false,
});

// Resolves once the WebSocket handshake has completed. The listener is
// attached synchronously so the 'open' event cannot be missed. Connection
// failures are handled by the 'error' listener below, which exits.
const socketOpen = new Promise((resolve) => {
  ws.once('open', resolve);
});

ws.on('close', () => {
  console.log('connection closed');
  process.exit(1);
});

ws.on('error', (err) => {
  console.log('error');
  console.log(err);
  process.exit(1);
});

// The peer is not expected to talk back; treat any message as fatal.
ws.on('message', (msg) => {
  console.log('unexpected message');
  console.log(msg);
  process.exit(1);
});

const containerClient = new Docker({ socketPath: '/var/run/docker.sock' });
const eventClient = new Docker({ socketPath: '/var/run/docker.sock' });

// containerId -> JSON string of the announcements currently published for it.
const cache = new Map();

/**
 * Decides whether a Docker event should trigger a container re-check.
 *
 * @param {object} event - Parsed event from the Docker event stream.
 * @returns {boolean} true when the event is a container event with a string
 *   id and one of the handled actions.
 */
const shouldHandleEvent = (event) => {
  if (event?.Type !== 'container') {
    return false;
  }

  if (typeof event.id !== 'string') {
    console.log(`ignoring event due to non-string container id: ${event.id}`);
    return false;
  }

  if (typeof event.status !== 'string') {
    return false;
  }

  // Docker reports health changes as "health_status: healthy" /
  // "health_status: unhealthy", so only the part before the colon names the
  // action. Comparing the raw status would never match 'health_status'.
  const [action] = event.status.split(':');
  return HANDLED_ACTIONS.has(action);
};

/**
 * Decides whether a container currently qualifies for announcements.
 *
 * @param {object} container - Result of inspecting a container.
 * @returns {boolean} true when the container is running, is healthy if it
 *   reports health at all, and has at least one label with LABEL_PREFIX.
 */
const shouldAnnounce = (container) => {
  if (container.State.Status !== 'running') {
    return false;
  }

  if (
    Object.hasOwn(container.State, 'Health') &&
    container.State.Health.Status !== 'healthy'
  ) {
    return false;
  }

  // NOTE(review): a null/absent label map is treated as "no labels" so that
  // Object.keys does not throw; confirm whether the daemon can return that.
  return Object.keys(container.Config.Labels ?? {}).some((key) =>
    key.startsWith(LABEL_PREFIX)
  );
};

/**
 * Yields one announcement per supported `com.keganmyers.unhinged.*` label.
 *
 * @param {object} container - Result of inspecting a container.
 * @yields {{type: string, payload: *}} The label's 4th segment as `type` and
 *   its JSON-decoded value as `payload`.
 * @throws {Error} When a supported label's value is not valid JSON; the
 *   original SyntaxError is attached as `cause`.
 */
function* toAnnouncements(container) {
  for (const [key, value] of Object.entries(container.Config.Labels ?? {})) {
    if (!key.startsWith(LABEL_PREFIX)) {
      continue;
    }

    // Segments 0-2 are "com", "keganmyers", "unhinged"; the next one is the type.
    const [type] = key.split('.').slice(3);
    if (!ANNOUNCEMENT_TYPES.has(type)) {
      continue;
    }

    let payload;
    try {
      payload = JSON.parse(value);
    } catch (cause) {
      // Still fatal, but say which label is broken instead of a bare SyntaxError.
      throw new Error(`label ${key} does not contain valid JSON`, { cause });
    }

    yield { type, payload };
  }
}

/**
 * Sends a `<type>/announce` message for each announcement and remembers them
 * so they can be retracted later.
 *
 * @param {string} containerId - Full container id.
 * @param {Array<{type: string, payload: *}>} announcements - Items to publish.
 */
const announce = (containerId, announcements) => {
  for (const { type, ...announcement } of announcements) {
    const astr = JSON.stringify({ type: `${type}/announce`, ...announcement });
    ws.send(astr);
    console.log(`${containerId.slice(0, 8)} announce: ${astr}`);
  }

  cache.set(containerId, JSON.stringify(announcements));
};

/**
 * Sends a `<type>/retract` message for every announcement cached for the
 * container and forgets it. Does nothing but clear the cache entry when the
 * container has no cached announcements.
 *
 * @param {string} containerId - Full container id.
 */
const retract = (containerId) => {
  const cached = JSON.parse(cache.get(containerId) ?? '[]');

  for (const { type, ...announcement } of cached) {
    const astr = JSON.stringify({ type: `${type}/retract`, ...announcement });
    ws.send(astr);
    console.log(`${containerId.slice(0, 8)} retract: ${astr}`);
  }

  cache.delete(containerId);
};

/**
 * Inspects a container and announces or retracts it as appropriate.
 *
 * @param {string} containerId - Full container id.
 * @returns {Promise<void>}
 * @throws Rethrows any inspect failure other than a 404.
 */
const checkContainer = async (containerId) => {
  let container;
  try {
    container = await containerClient.getContainer(containerId).inspect();
  } catch (e) {
    // A 404 means the container is gone: retract whatever was published.
    if (e.statusCode === 404) {
      retract(containerId);
      return;
    }
    throw e;
  }

  if (!shouldAnnounce(container)) {
    retract(containerId);
    return;
  }

  // Already published; announcements are not refreshed while it stays cached.
  if (cache.has(containerId)) {
    console.log(`${containerId.slice(0, 8)} ok`);
    return;
  }

  announce(containerId, Array.from(toAnnouncements(container)));
};

/**
 * Yields ids of containers that should be (re-)checked: first every container
 * returned by listContainers(), then the id of each handled event read from
 * the Docker event stream.
 *
 * @yields {string} Container id.
 * @throws {Error} After MAX_CONSECUTIVE_PARSE_FAILS consecutive event lines
 *   fail to parse as JSON.
 */
async function* discoverContainerUpdates() {
  for (const container of await eventClient.listContainers()) {
    if (typeof container.Id === 'string') {
      yield container.Id;
    }
  }

  const eventStream = await eventClient.getEvents({
    // the filters don't seem to work right...
    type: 'container',
    since: '0',
  });

  // A streaming decoder keeps a multi-byte character that is split across two
  // chunks intact, which decoding each chunk on its own would not.
  const decoder = new TextDecoder();
  let parseFails = 0;
  let pending = '';

  for await (const chunk of eventStream) {
    pending +=
      typeof chunk === 'string'
        ? chunk
        : decoder.decode(chunk, { stream: true });

    // Assumes one JSON event per newline-terminated line. Whatever follows
    // the last newline is an incomplete line and is held until more arrives.
    const lines = pending.split('\n');
    pending = lines.pop();

    for (const line of lines) {
      if (line === '') {
        continue;
      }

      let event;
      try {
        event = JSON.parse(line);
      } catch (cause) {
        // Drop the bad line so it cannot corrupt the lines that follow it.
        parseFails += 1;
        console.log(`ignoring unparseable event line: ${cause.message}`);
        if (parseFails === MAX_CONSECUTIVE_PARSE_FAILS) {
          throw new Error('too many errors while reading events', { cause });
        }
        continue;
      }
      parseFails = 0;

      if (shouldHandleEvent(event)) {
        yield event.id;
      }
    }
  }
}

(async () => {
  // ws.send() throws while the socket is still connecting, so do not start
  // processing containers until the connection is open.
  await socketOpen;

  for await (const containerId of discoverContainerUpdates()) {
    await checkContainer(containerId);
  }
})().then(
  () => {
    console.log('end of event stream');
    process.exit(1);
  },
  (err) => {
    console.log(`unexpected error: ${err}`);
    process.exit(1);
  }
);