Polling Inside a Subscription

I would like to subscribe to a channel that polls for data and, when the data changes, pushes it to the client. I am getting this Redis-specific error, but I am also wondering whether this is the best way to handle this.
Error: Connection in subscriber mode, only subscriber commands may be used

import { isEqual } from 'apollo-utilities/lib/util/isEqual';

/**
 * Gateway data source that polls for emergency alarms and publishes an
 * `EMERGENCY_ALARMS_UPDATED` event whenever the polled alarms change.
 */
class LiveAlarmsDataSource extends GatewayDataSource {
  constructor(gatewayUrl) {
    super(gatewayUrl);
  }

  /**
   * Starts a polling loop that queries the gateway every 3 seconds.
   *
   * NOTE(review): every call starts a new interval that is never cleared, so
   * calling this once per subscription accumulates timers.
   */
  async pollForAlarms() {
    const GET_ALARMS = gql`
      query getAlarms($alarmState: AlarmState) {
        alarms(alarmState: $alarmState) {
          alarms {
            id
          }
        }
      }
    `;

    // Last successfully fetched `data.alarms`; undefined until the first poll.
    let previous;

    setInterval(async () => {
      try {
        const response = await this.query(GET_ALARMS, {
          variables: { alarmState: 'EMERGENCY' },
        });
        const current = response?.data?.alarms;

        // A poll that produced no data is not a change; keep the last value.
        if (current === undefined) {
          return;
        }

        // Compare like with like (alarms vs. alarms) and publish only when
        // the value actually differs from the previous poll.
        if (!isEqual(previous, current)) {
          pubsub.publish('EMERGENCY_ALARMS_UPDATED', current);
        }
        previous = current;
      } catch (error) {
        // Keep a rejected query from becoming an unhandled rejection inside
        // the timer callback.
        console.log('pollForAlarms error', error);
      }
    }, 3000);
  }
}


/**
 * Subscription resolvers. `emergencyAlarms` kicks off polling on the
 * per-request data source and streams `EMERGENCY_ALARMS_UPDATED` events.
 */
const subscribe = {
  emergencyAlarms: {
    // The published payload is forwarded to the client unchanged.
    resolve(payload: any, args: any, context: any, info: any) {
      return payload;
    },
    subscribe(
      parent: any,
      args: any,
      context: IApolloServerContext,
      info: any
    ) {
      const { liveAlarmsDataSource } = context.dataSources;
      const triggers = ['EMERGENCY_ALARMS_UPDATED'];

      liveAlarmsDataSource.pollForAlarms();

      return pubsub.asyncIterator(triggers);
    },
  },
};
1 Like

What error are you getting? I don’t see one in your snippet.

This is the error: Error: Connection in subscriber mode, only subscriber commands may be used
I was able to fix this by using two Redis clients. Is it fine if they are both using the same port?

// Two connections to the same Redis server. Sharing one connection for both
// roles is what produces "Connection in subscriber mode, only subscriber
// commands may be used", so each connection gets exactly one role.
export const redis = new Redis(6379, REDIS_HOST_ADDRESS);
export const redis2 = new Redis(6379, REDIS_HOST_ADDRESS);

// `redis` only publishes; `redis2` only subscribes.
export const pubsub = new RedisPubSub({
  publisher: redis,
  subscriber: redis2,
});

// Kept for backward compatibility with callers that import `pubsubPoll`.
// One engine with split connections makes a second instance unnecessary.
export const pubsubPoll = pubsub;

What if you make one connection strictly a publisher and the other connection strictly a subscriber?

I imagine redis does this because there’s no good reason why a client connected to it would want to know what it’s currently sending to redis.

1 Like

I could really use some help on this.
It is almost there. The problem now is that dynamic subscriptions seem to publish to other subscriptions.

/**
 * Boots the subscription server: loads the gateway schema, builds the
 * subscription schema from it, and serves GraphQL subscriptions over a
 * WebSocket server attached to a bare HTTP server.
 */
(async () => {
  let schema;

  const GATEWAY_ENDPOINT = process.env.GATEWAY_ENDPOINT;
  const APOLLO_KEY = process.env.APOLLO_KEY;
  const APOLLO_GRAPH_VARIANT = process.env.APOLLO_GRAPH_VARIANT || 'current';
  const HOST_PORT = process.env.HOST_PORT;

  // Cleanup callbacks keyed by subscription (message) id. Each one resets the
  // matching data source's context.
  const DSC: Record<string, () => void> = {};

  const gateway = new ApolloGateway();

  // Rebuild the executable subscription schema whenever the gateway schema
  // changes.
  gateway.onSchemaChange(gatewaySchema => {
    schema = makeSubscriptionSchema({ gatewaySchema, typeDefs, resolvers });
  });

  const apolloConfig = getGatewayApolloConfig(APOLLO_KEY, APOLLO_GRAPH_VARIANT);

  await gateway.load({ ...(apolloConfig && { apollo: apolloConfig }) });

  // Plain HTTP requests get an empty 200; only the WebSocket upgrade matters.
  const httpServer = http.createServer(function weServeSocketsOnly(_, res) {
    res.writeHead(200);
    res.end();
  });

  const wsServer = new ws.Server({
    server: httpServer,
    path: '/',
  });

  useServer(
    {
      execute,
      subscribe,
      connectionInitWaitTimeout: undefined,
      context: (context, message) => {
        const subID = message?.id;
        const headers = context.connectionParams;

        // One data source instance per operation.
        const liveAlarmsDataSource = new LiveAlarmsDataSource(GATEWAY_ENDPOINT);

        // Swapping in a context with no subscriptions makes the data source's
        // poll loop stop rescheduling itself.
        DSC[subID] = () =>
          liveAlarmsDataSource.updateContext({
            context: { subscriptions: {} },
          });

        liveAlarmsDataSource.initialize({ context, cache: false });

        const dataSources = { liveAlarmsDataSource };

        // Return the complete context for the request
        return { dataSources, headers, subID };
      },
      onClose: context => {
        if (context?.subscriptions) {
          for (const key of Object.keys(context.subscriptions)) {
            if (key in DSC) {
              DSC[key]();
              delete DSC[key];
            }
          }
        }

        return undefined;
      },
      onSubscribe: (context, msg) => {
        // A malformed query makes `parse` throw; report it to the client as a
        // GraphQL error instead of letting the exception escape the hook.
        let document;
        try {
          document = parse(msg?.payload?.query);
        } catch (error) {
          return [
            error instanceof GraphQLError
              ? error
              : new GraphQLError(String(error)),
          ];
        }

        // Construct the execution arguments
        const args = {
          schema,
          operationName: msg?.payload?.operationName,
          document,
          variableValues: msg?.payload?.variables,
        };

        const operationAST = getOperationAST(args.document, args.operationName);

        // Stops the subscription and sends an error message
        if (!operationAST) {
          return [new GraphQLError('Unable to identify operation')];
        }

        // Handle mutation and query requests
        if (operationAST.operation !== 'subscription') {
          return [
            new GraphQLError('Only subscription operations are supported'),
          ];
        }

        // Validate the operation document
        const errors = validate(args.schema, args.document);

        if (errors.length > 0) {
          console.log('errors', errors);
          return errors;
        }

        // Ready execution arguments
        return args;
      },
      onError: (context, message, error) => {
        console.log(context, message, error);
      },
    },
    wsServer
  );

  httpServer.listen({ port: HOST_PORT }, () => {
    console.log(
      `🚀 Subscriptions ready at ws://localhost:${HOST_PORT}${wsServer.options.path}`
    );
  });
})();

Datasource

/**
 * Gateway data source that polls for agents, responders and alarms on behalf
 * of a subscription and publishes pub/sub events when the results change.
 *
 * Polling continues only while the subscription id is still a key of
 * `this.context.subscriptions`.
 */
class LiveAlarmsDataSource extends GatewayDataSource {
  constructor(gatewayUrl) {
    super(gatewayUrl);
    // `poll` is handed to setTimeout, so it must keep its `this`.
    this.poll = this.poll.bind(this);
    this.pollForAlarms = this.pollForAlarms.bind(this);
  }

  /** Copies the connection params from the context onto the outgoing request headers. */
  willSendRequest(request) {
    const headers = this.context.connectionParams;
    request.headers = headers;
  }

  /** Replaces the data source context with `config.context`. */
  updateContext(config) {
    this.context = config.context;
  }

  /**
   * Runs one polling step and schedules the next one.
   *
   * @param subscriptionId id checked against `this.context.subscriptions`
   * @param query async function returning the latest snapshot
   * @param publish called with the new snapshot when it differs from `previous`
   * @param interval delay in milliseconds before the next step
   * @param previous snapshot from the preceding step; undefined on the first
   *   step and after a failed step, so nothing is published until a baseline
   *   exists
   */
  async poll({
    subscriptionId,
    query,
    publish,
    interval,
    previous = undefined,
  }) {
    let next;

    try {
      const response = await query();

      if (previous && response?.success && !isEqual(previous, response)) {
        // Fire-and-forget, as before, but a rejected publish is now logged
        // instead of becoming an unhandled rejection.
        Promise.resolve(publish(response)).catch(error =>
          console.log('publish error', error)
        );
      }
      next = response;
    } catch (error) {
      console.log('poll error', error);
      // Drop the baseline so the first result after a failure is not
      // treated as a change.
      next = undefined;
    }

    // Single reschedule point. The fallback object keeps a missing context
    // or subscriptions map from throwing here.
    if (subscriptionId in (this.context?.subscriptions ?? {})) {
      setTimeout(this.poll, interval, {
        subscriptionId,
        query,
        publish,
        interval,
        previous: next,
      });
    }
  }

  /** Polls `GET_AGENTS` every 2000 ms and publishes changed results on `event`. */
  async pollAgents({ subscriptionId, event }) {
    const query = async () => {
      try {
        const ping = await this.query(GET_AGENTS);
        const fetchedAgents = ping?.data?.agents;

        return fetchedAgents;
      } catch (error) {
        console.log('apiCall error', error);
        return undefined;
      }
    };

    // Awaited so a failed publish surfaces through the returned promise.
    const publish = async response => {
      await pubsub.publish(event, response);
    };

    void this.poll({
      query,
      publish,
      interval: 2000,
      subscriptionId,
    });
  }

  /** Polls `GET_RESPONDERS` every 100 ms and publishes changed results on `event`. */
  async pollResponders({ subscriptionId, event }) {
    const query = async () => {
      try {
        const ping = await this.query(GET_RESPONDERS);
        const fetchedResponders = ping?.data?.responders;

        return fetchedResponders;
      } catch (error) {
        console.log('apiCall error', error);
        return undefined;
      }
    };

    // Awaited so a failed publish surfaces through the returned promise.
    const publish = async response => {
      await pubsub.publish(event, response);
    };

    void this.poll({
      query,
      publish,
      interval: 100,
      subscriptionId,
    });
  }

  /**
   * Polls `GET_ALARMS_POLL` every 100 ms for the given alarm state. When the
   * polled result changes, runs `GET_ALARMS` and publishes that result on
   * `event`.
   */
  async pollForAlarms({ subscriptionId, event, alarmState }) {
    const query = async () => {
      try {
        const ping = await this.query(GET_ALARMS_POLL, {
          variables: { alarmState },
        });
        const fetchedAlarms = ping?.data?.alarms;

        return fetchedAlarms;
      } catch (error) {
        console.log('apiCall error', error);
        return undefined;
      }
    };

    const publish = async () => {
      try {
        const ping = await this.query(GET_ALARMS, {
          variables: { alarmState },
        });

        // Awaited so a failed publish is caught by this try/catch.
        await pubsub.publish(event, ping?.data?.alarms);
        return ping?.data?.alarms;
      } catch (error) {
        console.log('onUpdate error', error);
        return undefined;
      }
    };

    void this.poll({
      query,
      publish,
      interval: 100,
      subscriptionId,
    });
  }
}

export default LiveAlarmsDataSource;

Resolver

import { pubsub } from '../../../../redis';
import { withFilter } from 'graphql-subscriptions';

/**
 * Builds a pub/sub channel name scoped to one ssp/agent pair.
 *
 * The ids are separated by ':' so that different pairs can never produce the
 * same channel (without it, 12 + 3 and 1 + 23 both yield "123").
 */
const buildEvent = (prefix: string, sspId: unknown, agentId: unknown): string =>
  `${prefix}:${sspId}:${agentId}`;

/**
 * Shared `subscribe` implementation for the alarm subscriptions: derives the
 * channel from the connection headers, starts polling for `alarmState`, and
 * returns the async iterator for that channel.
 */
const subscribeToAlarms = (
  context: any,
  prefix: string,
  alarmState: string,
  logEvent = false
) => {
  const sspId = JSON.parse(context.headers['ssp-id']);
  const agentId = JSON.parse(context.headers.agent).id;
  const event = buildEvent(prefix, sspId, agentId);

  if (logEvent) {
    console.log(event);
  }

  context.dataSources.liveAlarmsDataSource.pollForAlarms({
    subscriptionId: context.subID,
    event,
    alarmState,
  });

  return pubsub.asyncIterator([event]);
};

/**
 * Subscription resolvers. Each polling subscription publishes to a channel
 * derived from the `ssp-id` and agent headers of the connection.
 */
const subscribe = {
  test: {
    resolve(payload: any, args: any, context: any, info: any) {
      return 'test';
    },
    subscribe(parent: any, args, context, info: any) {
      return pubsub.asyncIterator('TEST');
    },
  },
  agentsUpdated: {
    resolve(payload: any, args: any, context: any, info: any) {
      return { ...payload };
    },
    subscribe(parent: any, args, context, info: any) {
      const sspId = JSON.parse(context.headers['ssp-id']);
      // NOTE(review): this resolver reads the `agent-id` header while the
      // others read `agent` and take its `id` — confirm which the client sends.
      const agentId = JSON.parse(context.headers['agent-id']);
      const event = buildEvent('AGENTS_UPDATED', sspId, agentId);

      console.log(event);

      context.dataSources.liveAlarmsDataSource.pollAgents({
        subscriptionId: context.subID,
        event,
      });

      return pubsub.asyncIterator([event]);
    },
  },
  respondersUpdated: {
    resolve(payload: any, args: any, context: any, info: any) {
      return { ...payload };
    },
    subscribe(parent: any, args, context, info: any) {
      const sspId = JSON.parse(context.headers['ssp-id']);
      const agentId = JSON.parse(context.headers.agent).id;
      const event = buildEvent('RESPONDERS_UPDATED', sspId, agentId);

      console.log(event);

      context.dataSources.liveAlarmsDataSource.pollResponders({
        subscriptionId: context.subID,
        event,
      });

      return pubsub.asyncIterator([event]);
    },
  },
  emergencyAlarms: {
    resolve(payload: any, args: any, context: any, info: any) {
      return { ...payload };
    },
    subscribe(parent: any, args, context, info: any) {
      return subscribeToAlarms(
        context,
        'EMERGENCY_ALARMS_UPDATED',
        'EMERGENCY',
        true
      );
    },
  },
  enrouteAlarms: {
    resolve(payload: any, args: any, context: any, info: any) {
      return { ...payload };
    },
    subscribe(parent: any, args, context, info: any) {
      return subscribeToAlarms(context, 'ENROUTE_ALARMS_UPDATED', 'ENROUTE');
    },
  },
  nonEmergencyAlarms: {
    resolve(payload: any, args: any, context: any, info: any) {
      return { ...payload };
    },
    subscribe(parent: any, args, context, info: any) {
      return subscribeToAlarms(
        context,
        'NON_EMERGENCY_ALARMS_UPDATED',
        'NON_EMERGENCY'
      );
    },
  },
};

export default subscribe;

Hey I ended up using this approach, thanks!
Do you have any idea why my dynamic subscriptions are returning data from one another?

1 Like

Is it fine if they are both using the same port?

Yes, devices open transient inbound ports when establishing connections to other systems.

The redis server, on the other hand, only listens for incoming connections on 1 port.

It’s the redis server’s job to be able to identify which connection is which, but specifying a connection ID or the like on the client can make things easier to know which client is doing what.

The problem now is that dynamic subscriptions seem to publish to other subscriptions.

Please describe what your expected behavior is and what the actual behavior is.

If you subscribe to topic A and then publish to topic A, it’s 100% expected to receive an event from topic A on your subscriber.

Most of the time, a publisher and subscriber are expected to be different devices, because in most systems there is no reason to generate an event and consume it in the same place.

1 Like

Continuing the discussion from Polling Inside a Subscription:

Thank you so much for the response, I can’t express enough how grateful I am. Feeling the pressure of a Monday push to prod.

I have attached a video demonstrating the bug.

One thing I noticed (at the end of the video): the publish function for sub A sometimes returns the query results that are expected from sub B.

VIDEO

I followed this tutorial: Using Subscriptions with Your Federated Data Graph - Apollo GraphQL Blog

Not sure why this.query is calling the other datasource class.

I create an Apollo Client datasource for each subscription but the return from the queries is still giving me results from the other query.

import {
  ApolloClient,
  ApolloLink,
  createHttpLink,
  DefaultOptions,
  DocumentNode,
  InMemoryCache,
} from '@apollo/client';
import { onError } from 'apollo-link-error';
import {
  ApolloError,
  AuthenticationError,
  ForbiddenError,
} from 'apollo-server-errors';
import fetch from 'node-fetch';

/**
 * Shape of a `fetch` function taking DOM-style `RequestInfo`/`RequestInit`
 * and resolving to a `Response`. Used in this file as the cast target when
 * passing node-fetch to `createHttpLink`.
 */
declare interface GlobalFetch {
  fetch(input: RequestInfo, init?: RequestInit): Promise<Response>;
}

/**
 * Client-wide defaults: one-shot queries and watched queries both use the
 * 'no-cache' fetch policy. No `errorPolicy` override is applied to either.
 */
const defaultOptions: DefaultOptions = {
  query: { fetchPolicy: 'no-cache' },
  watchQuery: { fetchPolicy: 'no-cache' },
};

/**
 * Base class for data sources that talk to a GraphQL endpoint through their
 * own Apollo Client instance.
 *
 * Each instance builds a dedicated client whose HTTP link carries the
 * `connectionParams` of the supplied context as request headers.
 */
export abstract class GraphQLDataSource<TContext = any> {
  protected client: ApolloClient<unknown>;

  public context!: TContext;

  /**
   * @param context request/connection context; its `connectionParams`, when
   *   present, become the HTTP headers of every request
   * @param uri GraphQL endpoint to query
   */
  protected constructor(context: any, uri: string) {
    this.context = context;

    this.client = new ApolloClient({
      cache: new InMemoryCache(),
      defaultOptions,
      link: this.buildLinks(uri, context),
    });
  }

  /** Replaces the stored context with `config.context`. */
  updateContext(config: { context: TContext }) {
    this.context = config.context;
  }

  /** Builds the link chain: currently a single HTTP link using node-fetch. */
  // eslint-disable-next-line class-methods-use-this
  private buildLinks(uri: string, context: any): ApolloLink {
    return ApolloLink.from([
      // this.onErrorLink(),
      // this.onRequestLink(),
      createHttpLink({
        fetch: fetch as unknown as GlobalFetch['fetch'],
        headers: {
          // Optional chaining: a missing context simply yields no headers.
          ...context?.connectionParams,
        },
        uri,
      }),
    ]);
  }

  /**
   * Executes `query` through the client.
   *
   * @param query parsed GraphQL document
   * @param options extra options forwarded to `client.query` (e.g. variables)
   * @returns the client response
   * @throws ApolloError (or a subclass) when the client call fails
   */
  public async query(query: DocumentNode, options?: Record<string, any>) {
    try {
      const response = await this.client.query({ ...options, query });
      return response;
    } catch (error) {
      // Always throws; `return` makes that explicit to the type checker.
      return this.didEncounterError(error);
    }
  }

  /**
   * Converts a client failure into the matching Apollo error and throws it.
   * The status is read from `error.statusCode`; the message from
   * `error.bodyText`, falling back to `error.message`.
   */
  // eslint-disable-next-line class-methods-use-this
  private didEncounterError(error: any): never {
    const status = error?.statusCode ? error.statusCode : null;
    // Fall back to the error's own message so the thrown error never carries
    // a null message.
    const message = error?.bodyText || error?.message || 'Unknown error';

    switch (status) {
      case 401:
        throw new AuthenticationError(message);
      case 403:
        throw new ForbiddenError(message);
      case 502:
        throw new ApolloError('Bad Gateway', status);
      default:
        throw new ApolloError(message, status);
    }
  }

  /** Error link that logs GraphQL and network errors (currently not wired into `buildLinks`). */
  // eslint-disable-next-line class-methods-use-this
  private onErrorLink() {
    return onError(({ graphQLErrors, networkError }) => {
      if (graphQLErrors) {
        graphQLErrors.forEach(graphqlError =>
          console.error(`[GraphQL error]: ${graphqlError.message}`)
        );
      }

      if (networkError) {
        console.log(`[Network Error]: ${networkError}`);
      }
    });
  }
}

POLL FN

import { isEqual } from 'apollo-utilities/lib/util/isEqual';

import { pubsub } from '../redis';

import { GraphQLDataSource } from './GraphQLDataSource';
import { GET_ALARMS, GET_ALARMS_POLL } from './gql/alarm';
import { GET_RESPONDERS } from './gql/responder';
import { GET_AGENTS } from './gql/agent';

/**
 * Data source that polls the gateway for alarms on behalf of a subscription
 * and publishes a pub/sub event when the polled result changes.
 *
 * Polling continues only while the subscription id is still a key of
 * `this.context.subscriptions`.
 */
export class SEONDatasource extends GraphQLDataSource {
  public constructor({ context, uri }) {
    super(context, uri);
    // `poll` is handed to setTimeout, so it must keep its `this`.
    this.poll = this.poll.bind(this);
  }

  /**
   * Runs one polling step and schedules the next one.
   *
   * @param subscriptionId id checked against `this.context.subscriptions`
   * @param query async function returning the latest snapshot
   * @param publish called with the new snapshot when it differs from `previous`
   * @param interval delay in milliseconds before the next step
   * @param previous snapshot from the preceding step; undefined on the first
   *   step and after a failed step
   * @param event channel name, used here only for logging
   */
  private async poll({
    subscriptionId,
    query,
    publish,
    interval,
    previous = undefined,
    event,
  }) {
    let next;

    try {
      const response = await query();
      console.log('poll', event, response?.total);

      if (previous && response?.success && !isEqual(previous, response)) {
        // Fire-and-forget, as before, but a rejected publish is now logged
        // instead of becoming an unhandled rejection.
        Promise.resolve(publish(response)).catch(error =>
          console.log('publish error', error)
        );
      }
      next = response;
    } catch (error) {
      console.log('poll error', error);
      // Drop the baseline so the first result after a failure is not
      // treated as a change.
      next = undefined;
    }

    // Single reschedule point. The fallback object keeps a missing context
    // or subscriptions map from throwing here.
    if (subscriptionId in (this.context?.subscriptions ?? {})) {
      setTimeout(this.poll, interval, {
        event,
        interval,
        previous: next,
        publish,
        query,
        subscriptionId,
      });
    }
  }

  /**
   * Polls `GET_ALARMS_POLL` every 7000 ms for `alarmState`. When the polled
   * result changes, runs `GET_ALARMS` and publishes that result on `event`.
   *
   * NOTE(review): `event` is also sent as a query variable — confirm the
   * queries actually declare it.
   */
  async pollForAlarms({ alarmState, event, subscriptionId }) {
    const query = async () => {
      try {
        const ping = await this.query(GET_ALARMS_POLL, {
          variables: { alarmState, event },
        });

        const fetchedAlarms = ping?.data?.alarms;

        return fetchedAlarms;
      } catch (error) {
        console.log('apiCall error', error);
        return undefined;
      }
    };

    const publish = async () => {
      try {
        const ping = await this.query(GET_ALARMS, {
          variables: { alarmState, event },
        });

        // Awaited so a failed publish is caught by this try/catch.
        await pubsub.publish(event, ping?.data?.alarms);
        return ping?.data?.alarms;
      } catch (error) {
        console.log('onUpdate error', error);
        return undefined;
      }
    };

    void this.poll({
      event,
      interval: 7000,
      publish,
      query,
      subscriptionId,
    });
  }
}

Then, looking at this line in the poll function: console.log('poll', event, response?.total);
The event EMERGENCY_ALARMS_UPDATED:123123 should only return 2 alarms, not 200.

It logs this:

poll EMERGENCY_ALARMS_UPDATED:123123 200
poll EMERGENCY_ALARMS_UPDATED:123123 2
poll EMERGENCY_ALARMS_UPDATED:159159 200
poll EMERGENCY_ALARMS_UPDATED:123123 200
poll EMERGENCY_ALARMS_UPDATED:159159 200
poll EMERGENCY_ALARMS_UPDATED:123123 2
poll EMERGENCY_ALARMS_UPDATED:123123 2
poll EMERGENCY_ALARMS_UPDATED:159159 200
poll EMERGENCY_ALARMS_UPDATED:123123 2
poll EMERGENCY_ALARMS_UPDATED:159159 200
poll EMERGENCY_ALARMS_UPDATED:123123 2
poll EMERGENCY_ALARMS_UPDATED:123123 200
poll EMERGENCY_ALARMS_UPDATED:159159 200
poll EMERGENCY_ALARMS_UPDATED:123123 2

This is how the datasources are initialised


  // Wires graphql-ws to the WebSocket server. Every subscribe message gets
  // its own SEONDatasource plus a cleanup callback stored in DSC.
  useServer(
    {
      connectionInitWaitTimeout: undefined,
      context: (context, message) => {
        // Id of the subscribe message; used as the key for cleanup and polling.
        const subID = message?.id;
        const headers = context.connectionParams;

        const seonDS = new SEONDatasource({ context, uri: GATEWAY_ENDPOINT });

        // Reset this instance's context (not the class) so its poll loop
        // stops rescheduling itself.
        DSC[subID] = () =>
          seonDS.updateContext({
            context: { subscriptions: {} },
          });

        const dataSources = {
          seonDS,
        };

        return { dataSources, headers, subID };
      },
      execute,
      onClose: context => {
        if (context?.subscriptions) {
          for (const key of Object.keys(context.subscriptions)) {
            if (key in DSC) {
              DSC[key]();
              delete DSC[key];
            }
          }
        }

        return undefined;
      },
      onError: (context, message, error) => {
        console.log('onError', error);
      },
      onSubscribe: (context, msg) => {
        // A malformed query makes `parse` throw; report it to the client as a
        // GraphQL error instead of letting the exception escape the hook.
        let document;
        try {
          document = parse(msg?.payload?.query);
        } catch (error) {
          return [
            error instanceof GraphQLError
              ? error
              : new GraphQLError(String(error)),
          ];
        }

        // Construct the execution arguments
        const args = {
          document,
          operationName: msg?.payload?.operationName,
          schema,
          variableValues: msg?.payload?.variables,
        };

        const operationAST = getOperationAST(args.document, args.operationName);

        // Stops the subscription and sends an error message
        if (!operationAST) {
          return [new GraphQLError('Unable to identify operation')];
        }

        // Handle mutation and query requests
        if (operationAST.operation !== 'subscription') {
          return [
            new GraphQLError('Only subscription operations are supported'),
          ];
        }

        // Validate the operation document
        const errors = validate(args.schema, args.document);

        if (errors.length > 0) {
          console.log('errors', errors);
          return errors;
        }

        // Ready execution arguments
        return args;
      },
      subscribe,
    },
    wsServer
  );



Resolution: the RestDatasource setup in the alarms graph needed to be changed from a single shared object to a function that creates fresh instances,
from

// Object form ("before"): `new SeonRestAPI(...)` runs once, when this
// statement is evaluated, so the same instance is reused wherever
// `dataSources` is read.
// NOTE(review): presumably this shared instance is what let one request's
// state leak into another — the thread reports that replacing it with a
// factory function resolved the cross-talk.
const dataSources: DataSources<IDataSources> = {
  seonRestAPI: new SeonRestAPI(SEON_RESTAPI_BASEURL),
};

TO

// Factory form ("after"): each call constructs a new SeonRestAPI, so no
// instance is shared between callers.
const dataSources = (): DataSources<IDataSources> => {
  const seonRestAPI = new SeonRestAPI(SEON_RESTAPI_BASEURL);
  return { seonRestAPI };
};

:pensive: