Need Help Connecting to Subscription Resolvers

My ApolloServer setup is working great for queries and mutations. It’s successfully sending my userId and pubsub in context.

But for some reason, it isn’t connecting to my Subscription Resolvers.

Here’s my Subscription Resolver:

    Subscription: {
        IncomingMessage_Subscription: {
            subscribe: withFilter(
                (parent, args, context, info) => {
                    console.log(`subscribed to incoming messages for ${context.userId}`);
                    return context.pubsub.asyncIterator(INCOMING_MESSAGE_CHANNEL)
                },
                (objectToPublish, args, context) => {
                    if (typeof (objectToPublish) === 'undefined') {
                        return false;
                    }

That console.log message has always printed, showing that my app is connecting to the Subscription Resolver.

But my new ApolloServer code, isn’t triggering that. Here is my ApolloServer setup:

// npm install @apollo/server express graphql cors
import { ApolloServer } from '@apollo/server';
import { ExpressContextFunctionArgument, expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import express from 'express';
import http from 'http';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import bodyParser from 'body-parser';
import cors from 'cors';
import { readFileSync } from 'fs';
import path from 'path';
import * as url from 'url';
import mongoose from 'mongoose';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { createServer } from 'http';
import { MongodbPubSub } from 'graphql-mongodb-subscriptions';
import {getUserIdByLoginToken} from "./utils_server/meteor-apollo-utils";
import typeDefs from "./api/schema";
import {resolvers} from "./api/resolvers";


const PORT = 4000;
const MONGODB_URI = `mongodb://localhost:3001/meteor`;

const connectToDb = async () => {
    await mongoose.connect(MONGODB_URI);
};

await connectToDb();
console.log('🎉 Connected to database successfully');

const mongodbpubsub = new MongodbPubSub({
    connectionDb: mongoose.connections[0].db
});


// Create schema, which will be used separately by ApolloServer and
// the WebSocket server.
const schema = makeExecutableSchema({
    typeDefs,
    resolvers
});

// Create an Express app and HTTP server; we will attach the WebSocket
// server and the ApolloServer to this HTTP server.
const app = express();
const httpServer = createServer(app);

// Set up WebSocket server.
const wsServer = new WebSocketServer({
    server: httpServer,
    path: '/graphql'
});
const serverCleanup = useServer({
    schema,
    context: ({req})  => {
        return { pubsub: mongodbpubsub };
    }
}, wsServer);

// Set up ApolloServer.
const server = new ApolloServer({
    schema,
    plugins: [
        // Proper shutdown for the HTTP server.
        ApolloServerPluginDrainHttpServer({ httpServer }),

        // Proper shutdown for the WebSocket server.
        {
            async serverWillStart() {
                return {
                    async drainServer() {
                        await serverCleanup.dispose();
                    }
                };
            }
        }
    ]
});

await server.start();
app.use(
    '/graphql',
    cors(),
    bodyParser.json(),
    expressMiddleware(server, {
        // Adding a context property lets you add data to your GraphQL operation contextValue
        // @ts-ignore
        context: async (ctx, msg, args) => {
            // You can define your own function for setting a dynamic context
            // or provide a static value
            // return getDynamicContext(ctx, msg, args);
            let token = ctx.req.headers['token']
            let user = null;
            let userId = null;
            try {
                if (token !== "null") {
                    [user, userId] = await getUserIdByLoginToken(token);
                }
            } catch (error) {
                console.log('context: ', error)
            }

            let clientIp = '';
            try{
                clientIp = ctx.req.headers['x-forwarded-for'] || ctx.req.connection.remoteAddress;
            }
            catch{
                console.log("Couldn't get clientIp in ctx.req")
            }

            return {
                user: user,
                userId: userId,
                clientIp: clientIp,
                pubsub: mongodbpubsub
            };
        }
    }));

// Now that our HTTP server is fully set up, actually listen.
httpServer.listen(PORT, () => {
    console.log(`🚀 Query endpoint ready at http://localhost:${PORT}/graphql`);
    console.log(`🚀 Subscription endpoint ready at ws://localhost:${PORT}/graphql`);
});



// https://github.com/mjwheatley/graphql-mongodb-subscriptions/issues/43
// https://github.com/mjwheatley/apollo-graphql-mongodb/blob/main/src/index.ts
// https://www.apollographql.com/docs/apollo-server/migration/#migrate-from-apollo-server-express
// …in the section following the text:
//     The context function's syntax is similar for the expressMiddleware function:
// https://www.apollographql.com/docs/apollo-server/data/subscriptions
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

…and here is my ApolloClient setup:

import { ApolloClient, HttpLink, InMemoryCache, ApolloLink, ApolloProvider } from "@apollo/client";

// Create an HttpLink pointing to your GraphQL endpoint
const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' });

// Middleware to add custom headers
const customHeadersMiddleware = new ApolloLink((operation, forward) => {
    // Define your custom headers
    const customHeaders = {
        "token": localStorage.getItem("Meteor.loginToken")
    };

    // Use operation.setContext to add the custom headers to the request
    operation.setContext(({ headers }) => ({
        headers: {
            ...headers,
            ...customHeaders,
        },
    }));

    return forward(operation);
});

// Combine the middleware with the HttpLink
const apolloClient = new ApolloClient({
    link: customHeadersMiddleware.concat(httpLink),
    cache: new InMemoryCache(),
});

export {apolloClient};

I have a feeling it’s just some minor detail I’m leaving out.

What am I missing?

I just tried again with a duplicate of the ApolloServer setup code provided at Subscriptions in Apollo Server, with the addition only of adding some context variables. But it is not yet connecting to my subscription resolvers. Am I using mismatching libraries of some kind?

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { createServer } from 'http';
import express from 'express';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import cors from 'cors';
import { MongodbPubSub } from 'graphql-mongodb-subscriptions';
import {getUserIdByLoginToken} from "./utils_server/meteor-apollo-utils";
import typeDefs from "./api/schema";
import {resolvers} from "./api/resolvers";
import mongoose from "mongoose";
import bodyParser from "body-parser";

const MONGODB_URI = `mongodb://localhost:3001/meteor`;

const connectToDb = async () => {
    await mongoose.connect(MONGODB_URI);
};

await connectToDb();
console.log('🎉 Connected to database successfully');

const mongodbpubsub = new MongodbPubSub({
    connectionDb: mongoose.connections[0].db
});
// Create the schema, which will be used separately by ApolloServer and
// the WebSocket server.
const schema = makeExecutableSchema({ typeDefs, resolvers });

// Create an Express app and HTTP server; we will attach both the WebSocket
// server and the ApolloServer to this HTTP server.
const app = express();
const httpServer = createServer(app);

// Create our WebSocket server using the HTTP server we just set up.
const wsServer = new WebSocketServer({
    server: httpServer,
    path: '/subscriptions',
});
// Save the returned server's info so we can shutdown this server later
const serverCleanup = useServer({ schema }, wsServer);

// Set up ApolloServer.
const server = new ApolloServer({
    schema,
    plugins: [
        // Proper shutdown for the HTTP server.
        ApolloServerPluginDrainHttpServer({ httpServer }),

        // Proper shutdown for the WebSocket server.
        {
            async serverWillStart() {
                return {
                    async drainServer() {
                        await serverCleanup.dispose();
                    },
                };
            },
        },
    ],
});

await server.start();
app.use(
    '/graphql',
    cors(),
    bodyParser.json(),
    expressMiddleware(server, {
        // Adding a context property lets you add data to your GraphQL operation contextValue
        // @ts-ignore
        context: async (ctx, msg, args) => {
            // You can define your own function for setting a dynamic context
            // or provide a static value
            // return getDynamicContext(ctx, msg, args);
            let token = ctx.req.headers['token']
            let user = null;
            let userId = null;
            try {
                if (token !== "null") {
                    [user, userId] = await getUserIdByLoginToken(token);
                }
            } catch (error) {
                console.log('context: ', error)
            }

            let clientIp = '';
            try{
                clientIp = ctx.req.headers['x-forwarded-for'] || ctx.req.connection.remoteAddress;
            }
            catch{
                console.log("Couldn't get clientIp in ctx.req")
            }

            return {
                user: user,
                userId: userId,
                clientIp: clientIp,
                pubsub: mongodbpubsub
            };
        }
    }));

const PORT = 4000;
// Now that our HTTP server is fully set up, we can listen to it.
httpServer.listen(PORT, () => {
    console.log(`Server is now running on http://localhost:${PORT}/graphql`);
});



// https://github.com/mjwheatley/graphql-mongodb-subscriptions/issues/43
// https://github.com/mjwheatley/apollo-graphql-mongodb/blob/main/src/index.ts
// https://www.apollographql.com/docs/apollo-server/migration/#migrate-from-apollo-server-express
// …in the section following the text:
//     The context function's syntax is similar for the expressMiddleware function:
// https://www.apollographql.com/docs/apollo-server/data/subscriptions
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

@hwillson I would appreciate your guidance!

I may have learned something. I have subscriptions on websockets and queries/mutations on http. So I need to tweak my client setup with a link split. So far I’ve only been able to find old code examples, so this is what I have so far:

ApolloClient Setup

import { ApolloClient, InMemoryCache, ApolloLink, ApolloProvider } from "@apollo/client";

import { split, HttpLink } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

const httpLink = new HttpLink({
    uri: 'http://localhost:4000/graphql'
});

const wsLink = new GraphQLWsLink(createClient({
    url: 'ws://localhost:4000/subscriptions',
}));

// The split function takes three parameters:
//
// * A function that's called for each operation to execute
// * The Link to use for an operation if the function returns a "truthy" value
// * The Link to use for an operation if the function returns a "falsy" value
const splitLink = split(
    ({ query }) => {
        const definition = getMainDefinition(query);
        return (
            definition.kind === 'OperationDefinition' &&
            definition.operation === 'subscription'
        );
    },
    wsLink,
    httpLink,
);

// Middleware to add custom headers
const customHeadersMiddleware = new ApolloLink((operation, forward) => {
    // Define your custom headers
    const customHeaders = {
        "token": localStorage.getItem("Meteor.loginToken")
    };

    // Use operation.setContext to add the custom headers to the request
    operation.setContext(({ headers }) => ({
        headers: {
            ...headers,
            ...customHeaders,
        },
    }));

    return forward(operation);
});

// Combine the middleware with the HttpLink
const apolloClient = new ApolloClient({
    link: customHeadersMiddleware.concat(httpLink),
    cache: new InMemoryCache(),
});

export {apolloClient};

It’s not working yet. Could someone please advise on how to fix it?

Also, in case it’s relevant, here is my latest server setup:

ApolloServer

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { createServer } from 'http';
import express from 'express';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import cors from 'cors';
import { MongodbPubSub } from 'graphql-mongodb-subscriptions';
import {getUserIdByLoginToken} from "./utils_server/meteor-apollo-utils";
import typeDefs from "./api/schema";
import {resolvers} from "./api/resolvers";
import mongoose from "mongoose";
import bodyParser from "body-parser";

const MONGODB_URI = `mongodb://localhost:3001/meteor`;

const connectToDb = async () => {
    await mongoose.connect(MONGODB_URI);
};

await connectToDb();
console.log('🎉 Connected to database successfully');

const mongodbpubsub = new MongodbPubSub({
    connectionDb: mongoose.connections[0].db
});
// Create the schema, which will be used separately by ApolloServer and
// the WebSocket server.
const schema = makeExecutableSchema({ typeDefs, resolvers });

// https://www.youtube.com/watch?v=AcZ5dcYMwA4
const app = express();
const httpServer = createServer(app);
const server = new WebSocketServer({
// port: 4000,
    path: "/graphql",
    server: httpServer,
});

const serverCleanUp = useServer({schema}, server);
const apolloServer = new ApolloServer({
    schema,
    plugins: [
        ApolloServerPluginDrainHttpServer({httpServer}),
        {
            async serverWillStart() {
                return {
                    async drainServer() {
                        await serverCleanUp.dispose();
                    }
                }
            }
        }
    ],
    subscriptions: true
});

await apolloServer.start();

app.use(
    "/graphql",
    cors({
        origin: "http://localhost:3000",
        credentials: true,
    }),
    express.json(),
    expressMiddleware(apolloServer, {
        context: async (ctx, msg, args) => {
            // You can define your own function for setting a dynamic context
            // or provide a static value
            // return getDynamicContext(ctx, msg, args);
            let token = ctx.req.headers['token']
            let user = null;
            let userId = null;
            try {
                if ((!!token) && (token !== "null")) {
                    [user, userId] = await getUserIdByLoginToken(token);
                }
            } catch (error) {
                console.log('context: ', error)
            }

            let clientIp = '';
            try{
                clientIp = ctx.req.headers['x-forwarded-for'] || ctx.req.connection.remoteAddress;
            }
            catch{
                console.log("Couldn't get clientIp in ctx.req")
            }

            return {
                user: user,
                userId: userId,
                clientIp: clientIp,
                pubsub: mongodbpubsub
            };
        }
    })
);

await new Promise((resolve) => httpServer.listen({port: 4000}, resolve));

const PORT = 4000;
console.log(`Server is now running on http://localhost:${PORT}/graphql`);

// Now that our HTTP server is fully set up, we can listen to it.
// httpServer.listen(PORT, () => {
//     console.log(`Server is now running on http://localhost:${PORT}/graphql`);
// });


// https://github.com/mjwheatley/graphql-mongodb-subscriptions/issues/43
// https://github.com/mjwheatley/apollo-graphql-mongodb/blob/main/src/index.ts
// https://www.apollographql.com/docs/apollo-server/migration/#migrate-from-apollo-server-express
// …in the section following the text:
//     The context function's syntax is similar for the expressMiddleware function:
// https://www.apollographql.com/docs/apollo-server/data/subscriptions
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries