I could really use some help with this.
It is almost working. The remaining problem is that events published for one dynamic subscription also seem to be delivered to other subscriptions.
/**
 * Bootstraps a subscription-only WebSocket server.
 *
 * Loads the Apollo gateway, rebuilds the executable subscription schema every
 * time the gateway reports a schema change, and serves graphql-ws
 * subscriptions over an HTTP server that answers every plain HTTP request
 * with an empty 200.
 */
(async () => {
  const GATEWAY_ENDPOINT = process.env.GATEWAY_ENDPOINT;
  const APOLLO_KEY = process.env.APOLLO_KEY;
  const APOLLO_GRAPH_VARIANT = process.env.APOLLO_GRAPH_VARIANT || 'current';
  const HOST_PORT = process.env.HOST_PORT;

  // Latest executable schema; reassigned by the gateway's change callback.
  let schema;

  // Cleanup callbacks grouped per connection context and then per operation
  // id. Operation ids are only read from the incoming message, so a single
  // table shared by all connections would let two connections that use the
  // same id overwrite (and later trigger) each other's cleanup.
  const cleanupsByConnection = new WeakMap();

  // Remembers how to stop the data source created for one operation.
  const registerCleanup = (connection, subID, cleanup) => {
    let cleanups = cleanupsByConnection.get(connection);
    if (!cleanups) {
      cleanups = new Map();
      cleanupsByConnection.set(connection, cleanups);
    }
    cleanups.set(subID, cleanup);
  };

  // Runs and forgets the cleanup of a single operation, if one is registered.
  const runCleanup = (connection, subID) => {
    const cleanups = cleanupsByConnection.get(connection);
    const cleanup = cleanups?.get(subID);
    if (cleanup) {
      cleanups.delete(subID);
      cleanup();
    }
  };

  // Runs and forgets every cleanup registered for a connection.
  const runAllCleanups = connection => {
    const cleanups = cleanupsByConnection.get(connection);
    if (!cleanups) {
      return;
    }
    cleanupsByConnection.delete(connection);
    for (const cleanup of cleanups.values()) {
      cleanup();
    }
  };

  const gateway = new ApolloGateway();
  gateway.onSchemaChange(gatewaySchema => {
    schema = makeSubscriptionSchema({ gatewaySchema, typeDefs, resolvers });
  });

  const apolloConfig = getGatewayApolloConfig(APOLLO_KEY, APOLLO_GRAPH_VARIANT);
  await gateway.load({ ...(apolloConfig && { apollo: apolloConfig }) });

  const httpServer = http.createServer(function weServeSocketsOnly(_, res) {
    res.writeHead(200);
    res.end();
  });

  const wsServer = new ws.Server({
    server: httpServer,
    path: '/',
  });

  useServer(
    {
      execute,
      subscribe,
      connectionInitWaitTimeout: undefined,

      // Builds the resolver context for one operation. Every operation gets
      // its own data source, initialised with the connection context.
      context: (context, message) => {
        const subID = message?.id;
        const headers = context.connectionParams;
        const liveAlarmsDataSource = new LiveAlarmsDataSource(GATEWAY_ENDPOINT);

        // Replacing the data source's context with an empty subscription
        // table makes its poll loop stop rescheduling itself.
        registerCleanup(context, subID, () =>
          liveAlarmsDataSource.updateContext({
            context: { subscriptions: {} },
          })
        );
        liveAlarmsDataSource.initialize({ context, cache: false });

        // Return the complete context for the request
        return { dataSources: { liveAlarmsDataSource }, headers, subID };
      },

      // Release the data source of an operation as soon as it completes,
      // instead of keeping it registered until the socket closes.
      onComplete: (context, message) => {
        runCleanup(context, message?.id);
      },

      // The socket is gone: stop everything that belongs to this connection.
      onClose: context => {
        runAllCleanups(context);
        return undefined;
      },

      // Validates the incoming operation and returns either the execution
      // arguments or a list of errors to send back to the client.
      onSubscribe: (context, msg) => {
        if (!schema) {
          return [new GraphQLError('Schema is not available yet')];
        }

        // A malformed query must be reported to the client as an error list
        // rather than escaping this callback as an exception.
        let document;
        try {
          document = parse(msg?.payload?.query);
        } catch (error) {
          return [
            error instanceof GraphQLError
              ? error
              : new GraphQLError(String(error?.message ?? error)),
          ];
        }

        // Construct the execution arguments
        const args = {
          schema,
          operationName: msg?.payload?.operationName,
          document,
          variableValues: msg?.payload?.variables,
        };

        const operationAST = getOperationAST(args.document, args.operationName);
        // Stops the subscription and sends an error message
        if (!operationAST) {
          return [new GraphQLError('Unable to identify operation')];
        }
        // Handle mutation and query requests
        if (operationAST.operation !== 'subscription') {
          return [
            new GraphQLError('Only subscription operations are supported'),
          ];
        }

        // Validate the operation document
        const errors = validate(args.schema, args.document);
        if (errors.length > 0) {
          console.log('errors', errors);
          return errors;
        }

        // Ready execution arguments
        return args;
      },

      onError: (context, message, error) => {
        console.log(context, message, error);
      },
    },
    wsServer
  );

  httpServer.listen({ port: HOST_PORT }, () => {
    console.log(
      `🚀 Subscriptions ready at ws://localhost:${HOST_PORT}${wsServer.options.path}`
    );
  });
})();
Datasource
/**
 * Data source that repeatedly queries the gateway for one subscription and
 * publishes results to the shared pubsub under a caller-supplied event name.
 *
 * A poll loop keeps rescheduling itself only while its subscription id is a
 * key of `this.context.subscriptions`; replacing the context through
 * `updateContext` with an object that lacks the id therefore ends the loop.
 */
class LiveAlarmsDataSource extends GatewayDataSource {
  constructor(gatewayUrl) {
    super(gatewayUrl);
    // poll is handed to setTimeout as a bare function, so it needs a fixed
    // receiver.
    this.poll = this.poll.bind(this);
    this.pollForAlarms = this.pollForAlarms.bind(this);
  }

  /** Sends the connection parameters of the current context as request headers. */
  willSendRequest(request) {
    const headers = this.context.connectionParams;
    request.headers = headers;
  }

  /** Replaces the context this data source reads from. */
  updateContext(config) {
    this.context = config.context;
  }

  /**
   * Reports whether the subscription id is still a key of the current
   * context's subscription table. A missing context or table counts as
   * "not active" instead of raising.
   */
  isSubscriptionActive(subscriptionId) {
    const subscriptions = this.context?.subscriptions;
    return (
      subscriptions !== null &&
      typeof subscriptions === 'object' &&
      subscriptionId in subscriptions
    );
  }

  /**
   * Runs `this.query` for a document and returns `data[field]` of the result,
   * or undefined when the request fails (the failure is logged).
   */
  async queryDataField(document, field, options) {
    try {
      // Only pass an options argument when the caller supplied one.
      const ping = options
        ? await this.query(document, options)
        : await this.query(document);
      return ping?.data?.[field];
    } catch (error) {
      console.log('apiCall error', error);
      return undefined;
    }
  }

  /**
   * Performs one poll iteration and schedules the next one.
   *
   * `publish` is called with the new response when a previous response
   * exists, the new response has a truthy `success`, and the two are not
   * deep-equal. Consequently the very first response is never published.
   *
   * @param subscriptionId id checked against the context's subscription table
   * @param query          async function returning the current response
   * @param publish        function invoked with a changed response
   * @param interval       delay in milliseconds before the next iteration
   * @param previous       response of the preceding iteration, if any
   */
  async poll({
    subscriptionId,
    query,
    publish,
    interval,
    previous = undefined,
  }) {
    let latest;
    try {
      const response = await query();
      // `query` may resolve to undefined, hence the optional access.
      if (previous && response?.success && !isEqual(previous, response)) {
        // Awaited so that a rejected publish is handled here instead of
        // surfacing as an unhandled rejection.
        await publish(response);
      }
      latest = response;
    } catch (error) {
      console.log('poll error', error);
      // Forget the baseline after a failure, as the next iteration cannot
      // trust it.
      latest = undefined;
    }

    if (this.isSubscriptionActive(subscriptionId)) {
      setTimeout(this.poll, interval, {
        subscriptionId,
        query,
        publish,
        interval,
        previous: latest,
      });
    }
  }

  /**
   * Starts polling GET_AGENTS and publishing changed results to `event`.
   *
   * @param interval polling delay in milliseconds (defaults to 2000)
   */
  async pollAgents({ subscriptionId, event, interval = 2000 }) {
    const query = () => this.queryDataField(GET_AGENTS, 'agents');
    const publish = async response => {
      pubsub.publish(event, response);
    };
    // The loop runs in the background; poll handles its own failures.
    void this.poll({ query, publish, interval, subscriptionId });
  }

  /**
   * Starts polling GET_RESPONDERS and publishing changed results to `event`.
   *
   * @param interval polling delay in milliseconds (defaults to 100)
   */
  async pollResponders({ subscriptionId, event, interval = 100 }) {
    const query = () => this.queryDataField(GET_RESPONDERS, 'responders');
    const publish = async response => {
      pubsub.publish(event, response);
    };
    void this.poll({ query, publish, interval, subscriptionId });
  }

  /**
   * Starts polling GET_ALARMS_POLL for the given alarm state. When a change
   * is detected, GET_ALARMS is queried and its `alarms` are published to
   * `event` (rather than the polled response itself).
   *
   * @param interval polling delay in milliseconds (defaults to 100)
   */
  async pollForAlarms({ subscriptionId, event, alarmState, interval = 100 }) {
    const query = () =>
      this.queryDataField(GET_ALARMS_POLL, 'alarms', {
        variables: { alarmState },
      });
    const publish = async () => {
      try {
        const ping = await this.query(GET_ALARMS, {
          variables: { alarmState },
        });
        pubsub.publish(event, ping?.data?.alarms);
        return ping?.data?.alarms;
      } catch (error) {
        console.log('onUpdate error', error);
      }
    };
    void this.poll({ query, publish, interval, subscriptionId });
  }
}
export default LiveAlarmsDataSource;
Resolver
import { pubsub } from '../../../../redis';
import { withFilter } from 'graphql-subscriptions';
/** Arguments handed to the data source's polling methods. */
interface PollRequest {
  subscriptionId: string;
  event: string;
  alarmState?: string;
}

/** The parts of the per-operation context that these resolvers read. */
interface SubscriptionContext {
  headers?: Record<string, unknown> | null;
  subID: string;
  dataSources: {
    liveAlarmsDataSource: {
      pollAgents(request: PollRequest): unknown;
      pollResponders(request: PollRequest): unknown;
      pollForAlarms(request: PollRequest): unknown;
    };
  };
}

/**
 * Reads a header from the context and JSON-parses it.
 *
 * @throws Error when the header is absent, so the client gets a readable
 *         message instead of a JSON syntax error about "undefined".
 */
function parseHeaderJson(context: SubscriptionContext, name: string): unknown {
  const raw = context.headers?.[name];
  if (raw == null) {
    throw new Error(`Missing required connection header "${name}"`);
  }
  return JSON.parse(String(raw));
}

/** Reads the agent id from the JSON-encoded `agent-id` header. */
function agentIdFromIdHeader(context: SubscriptionContext): unknown {
  return parseHeaderJson(context, 'agent-id');
}

/** Reads the `id` property from the JSON-encoded `agent` header. */
function agentIdFromAgentHeader(context: SubscriptionContext): unknown {
  const agent = parseHeaderJson(context, 'agent');
  return agent !== null && typeof agent === 'object'
    ? (agent as { id?: unknown }).id
    : undefined;
}

/**
 * Builds the pubsub event name for one SSP/agent pair.
 *
 * The ids are separated by ':' because plain concatenation is ambiguous:
 * (ssp 1, agent 23) and (ssp 12, agent 3) would otherwise share one channel
 * and receive each other's publications.
 *
 * @throws Error when either id is missing, since an event name containing
 *         "undefined" would be shared by every such client.
 */
function scopedEvent(
  prefix: string,
  context: SubscriptionContext,
  agentId: unknown
): string {
  const sspId = parseHeaderJson(context, 'ssp-id');
  if (sspId == null || agentId == null) {
    throw new Error(`Cannot scope ${prefix}: ssp id or agent id is missing`);
  }
  return `${prefix}:${String(sspId)}:${String(agentId)}`;
}

/** Shallow-copies the published payload into the resolved value. */
function copyPayload(payload: object): object {
  return { ...payload };
}

/**
 * Subscription resolvers. Each dynamic subscription derives an event name
 * from the connection headers, asks the data source to start polling for
 * that event, and returns an async iterator over the same event.
 */
const subscribe = {
  test: {
    resolve(_payload: unknown, _args: unknown, _context: unknown, _info: unknown) {
      return 'test';
    },
    subscribe(_parent: unknown, _args: unknown, _context: unknown, _info: unknown) {
      return pubsub.asyncIterator('TEST');
    },
  },
  agentsUpdated: {
    resolve(payload: object, _args: unknown, _context: unknown, _info: unknown) {
      return copyPayload(payload);
    },
    subscribe(
      _parent: unknown,
      _args: unknown,
      context: SubscriptionContext,
      _info: unknown
    ) {
      // NOTE(review): this resolver reads the `agent-id` header while the
      // others read `agent`.id — confirm that difference is intended.
      const event = scopedEvent(
        'AGENTS_UPDATED',
        context,
        agentIdFromIdHeader(context)
      );
      console.log(event);
      context.dataSources.liveAlarmsDataSource.pollAgents({
        subscriptionId: context.subID,
        event,
      });
      return pubsub.asyncIterator([event]);
    },
  },
  respondersUpdated: {
    resolve(payload: object, _args: unknown, _context: unknown, _info: unknown) {
      return copyPayload(payload);
    },
    subscribe(
      _parent: unknown,
      _args: unknown,
      context: SubscriptionContext,
      _info: unknown
    ) {
      const event = scopedEvent(
        'RESPONDERS_UPDATED',
        context,
        agentIdFromAgentHeader(context)
      );
      console.log(event);
      context.dataSources.liveAlarmsDataSource.pollResponders({
        subscriptionId: context.subID,
        event,
      });
      return pubsub.asyncIterator([event]);
    },
  },
  emergencyAlarms: {
    resolve(payload: object, _args: unknown, _context: unknown, _info: unknown) {
      return copyPayload(payload);
    },
    subscribe(
      _parent: unknown,
      _args: unknown,
      context: SubscriptionContext,
      _info: unknown
    ) {
      const event = scopedEvent(
        'EMERGENCY_ALARMS_UPDATED',
        context,
        agentIdFromAgentHeader(context)
      );
      console.log(event);
      context.dataSources.liveAlarmsDataSource.pollForAlarms({
        subscriptionId: context.subID,
        event,
        alarmState: 'EMERGENCY',
      });
      return pubsub.asyncIterator([event]);
    },
  },
  enrouteAlarms: {
    resolve(payload: object, _args: unknown, _context: unknown, _info: unknown) {
      return copyPayload(payload);
    },
    subscribe(
      _parent: unknown,
      _args: unknown,
      context: SubscriptionContext,
      _info: unknown
    ) {
      const event = scopedEvent(
        'ENROUTE_ALARMS_UPDATED',
        context,
        agentIdFromAgentHeader(context)
      );
      context.dataSources.liveAlarmsDataSource.pollForAlarms({
        subscriptionId: context.subID,
        event,
        alarmState: 'ENROUTE',
      });
      return pubsub.asyncIterator([event]);
    },
  },
  nonEmergencyAlarms: {
    resolve(payload: object, _args: unknown, _context: unknown, _info: unknown) {
      return copyPayload(payload);
    },
    subscribe(
      _parent: unknown,
      _args: unknown,
      context: SubscriptionContext,
      _info: unknown
    ) {
      const event = scopedEvent(
        'NON_EMERGENCY_ALARMS_UPDATED',
        context,
        agentIdFromAgentHeader(context)
      );
      context.dataSources.liveAlarmsDataSource.pollForAlarms({
        subscriptionId: context.subID,
        event,
        alarmState: 'NON_EMERGENCY',
      });
      return pubsub.asyncIterator([event]);
    },
  },
};
export default subscribe;