logo
Published on

File upload using NATS JetStream in 5 minutes

Authors

TLDR: you can use JetStream Object Storage to upload, mange versions, and delete files on the client and server in a very simple way.


For some reason, I have always been afraid from any kind of file uploading. I have always thought that it is a very complex process. But it is not. It is very simple. In this article, I will show how to upload a file using NATS JetStream Object Storage in 5 minutes.

first things first

  • Assumptions: using NATS.ws for client/server communications instead of http, why?
  • Benifits: Simply, Jetstream ObjectStore provides an easy to use client AND server APIs to deal with files. It is a very simple API that you can use to upload, download, delete, and list files.

So..

updated useNats.ts which creates the nats cliet with some helpers to access JetStream, KV, and ObjectStore

import {
  connect,
  headers,
  JetStreamClient,
  JSONCodec,
  jwtAuthenticator,
  KV,
  NatsConnection,
  ObjectStore,
} from "nats.ws";
import { useCallback, useEffect, useState } from "react";

export const useNATS = (authApiUrl: string) => {
  const [nats, setNATS] = useState<NatsConnection | undefined>(undefined);
  const [natsConnected, setNATSConnected] = useState(false);
  const [natsError, setNATSError] = useState<any>(undefined);
  const [loading, setLoading] = useState(false);

  const connectNATS = useCallback(async () => {
    if (natsConnected) {
      return;
    }

    try {
      console.log("connecting to NATS...");

      const resp = await fetch(authApiUrl);

      const { jwt, natsUrl } = await resp.json();

      const authenticator = jwtAuthenticator(jwt);

      const nc = await connect({
        servers: natsUrl,
        authenticator,
      });

      setNATS(nc);
      setNATSConnected(true);

      console.log("NATS connected:", natsUrl);
    } catch (error: any) {
      setNATSError(error);
      console.log("NATS error ==> ", error);
    }
  }, [natsConnected]);

  useEffect(() => {
    connectNATS();
  }, []);

  async function request<R, S>(
    topic: string,
    data: R,
    headersObj: object = {}
  ): Promise<S | Error> {
    if (!nats || !natsConnected) {
      return new Error("nats not connected !!");
    }

    const h = headers();

    for (const [key, value] of Object.entries(headersObj)) {
      h.append(key, value);
    }

    console.time("request: " + topic);
    const jc = JSONCodec<R>();
    try {
      const res = await nats.request(topic, jc.encode(data), {
        // todo: move to config
        timeout: 3000,
        headers: h,
      });
      console.timeEnd("request: " + topic);
      return jc.decode(res.data) as unknown as S;
    } catch (error: any) {
      console.timeEnd("request: " + topic);

      return error;
    }
  }

  function parseJson<T>(data: Uint8Array): T | Error {
    const jc = JSONCodec<T>();
    try {
      return jc.decode(data) as unknown as T;
    } catch (error: any) {
      return error;
    }
  }

  async function publish(topic: string, data: any, headersObj: object = {}) {
    if (!nats || !natsConnected) {
      return;
    }

    console.time("publish: " + topic);
    const jc = JSONCodec();

    const h = headers();

    for (const [key, value] of Object.entries(headersObj)) {
      h.append(key, value);
    }

    nats.publish(topic, jc.encode(data), {
      headers: h,
    });
    console.timeEnd("publish: " + topic);
  }

  async function subscribe(
    topic: string,
    callbackFn: (err: any, data: any) => void
  ) {
    if (!nats || !natsConnected) {
      return;
    }

    nats.subscribe(topic, {
      callback: (err, msg) => {
        const jc = JSONCodec();
        const data = jc.decode(msg.data);
        callbackFn(err, data);
      },
    });
  }

  function JetStream(): JetStreamClient | Error {
    if (!nats || !natsConnected) {
      return new Error("nats not connected !!");
    }

    return nats.jetstream();
  }

  // todo: move to a separate hook
  async function KV(bucket: string): Promise<KV | Error> {
    if (!nats || !natsConnected) {
      return new Error("nats not connected !!");
    }
    const js = JetStream();

    if (js instanceof Error) {
      return js;
    }

    return js.views.kv(bucket);
  }

  // todo: move to a separate hook
  async function ObjectStore(bucket: string): Promise<ObjectStore | Error> {
    const js = JetStream();

    if (js instanceof Error) {
      return js;
    }

    return js.views.os(bucket);
  }

  return {
    nats,
    JetStream,
    KV,
    ObjectStore,
    natsConnected,
    natsError,
    request,
    publish,
    subscribe,
    parseJson,
    loading,
  };
};

Client side react data upload component (push to object store)

const { natsConnected, request, publish, nats, JetStream, KV, ObjectStore, parseJson } =
  useContext(NatsContext)

async function handleFileUploadFormSubmit(data: any) {
  const { file } = data

  const bucket = 'test-bucket'

  const reader = new FileReader()

  reader.onloadend = async () => {
    const fileContent = reader.result

    const os = await ObjectStore(bucket)

    if (os instanceof Error) {
      return
    }

    const name = file.name.replace(/\.[^/.]+$/, '') + '-' + Date.now()

    const meta = {
      'Content-Type': file.type,
      name,
    }

    const stream = stringToReadableStream(fileContent)

    const result = await os.put(meta, stream)
    console.log({ result })
  }

  reader.readAsText(file)
}

// return html with a file upload form

return (
  <div>
    <form onSubmit={handleFileUploadFormSubmit}>
      <div>
        <input type="file" />
      </div>
      <div>
        <button type="submit">Upload</button>
      </div>
    </form>
  </div>
)

Then the magic happens on the server, (Go in my case)


js, err := NatsClient.JetStream()
	if err != nil {
		util.NotifyError(err, ServiceName)
		return
	}

	// Obtain a kv bucket
	os, err := js.ObjectStore(bucket)
	if err != nil {
		util.NotifyError(err, ServiceName)
		return
	}

	valBytes, err := os.GetBytes(key)

	if err != nil {
		return
	}

The beauty of using NATS.WS with Jetstream materialized views (KV, Object Store) is that we can create reactive services that lives across the transboundries of client and server, so once the file is pushed to the object store on the client, the server can WATCH for the Object store bucket and react to any changes on the bucket.

in the next blog i will discuss the implementaion on the row couner in the video and a simple yet powerful JobQueue using JetStream KV buckets.

Peace ✌️