Hero image for post.

Building a fault-tolerant automation system

November 21, 2024

One of the features we built into Ambit is a powerful automation feature to enable event-driven actions within the platform.

Automations are actions performed automatically when certain other actions take place (when A happens, then do B).

Here's how we built simple, robust, and scalable automation system.

Requirements

First, let's identify the requirements.

The automation system should be:

  1. Resilient

We don't want an incomplete/half-finished automation state.

Automations can vary in length, duration, and complexity.

We need to guarantee that they complete, considering worst-case scenarios like the API crashing halfway through.

  1. Modular

Reduce redundant code, make it extensible to add additional step types/actions.

  1. Observable

The underlying state should be simple to understand and easily queried.

AWS

Starting with requirement #1, we'll achieve this by using serverless AWS components that scale on their own and require little to no maintenance.

The most important things to keep in mind:

  • Automation execution should be asynchronous — it should happen in the background, and shouldn't affect request-response lifecycle of the initial request.
  • In the case our API crashes right as an automation is kicked off, the automation should still carry out when the server is restored.

To achieve this, we'll build an event-driven system by combining three AWS products: SNS, SQS, and Lambda.

SNS

SNS is a fast and cost-effective notification or pub/sub system.

It lets us quickly and cheaply publish messages, and connect various subscribers.

It will have no problem processing large numbers of automations.

However, SNS on its own isn't enough to ensure resilience.

It doesn't retain messages after they're sent.

Meaning, if our server goes down in the middle of an automation run, the notification will try to reach our down server, fail, and be lost.

We can account for this by combining it with a queue.

SQS

SQS is a serverless queue.

It integrates nicely with SNS, storing notifications and retrying them in the event of failure.

It's a simple and cost-effective way of adding retry capability to our notifications.

Lambda

Lambda is AWS's serverless function offering.

We'll create a Lambda function that subscribes to the queue and calls back to our API with notification data.

Setting up AWS

SNS

Connecting to SNS from the backend:

Wrapper around the AWS SDK:

class SnsService {
  client: SNSClient;

  constructor(private readonly configService: ConfigService) {
    this.client = new SNSClient({
      credentials: {
        accessKeyId: configService.get("AWS_ACCESS_KEY_ID"),
        secretAccessKey: configService.get("AWS_SECRET_ACCESS_KEY"),
      },
      region: configService.get("AWS_REGION"),
    });
  }

  async publish(data: Omit<PublishCommandInput, "TopicArn">) {
    const response = await this.client.send(
      new PublishCommand({
        ...data,
        TopicArn: this.configService.get("AWS_SNS_NOTIFICATIONS_TOPIC_ARN"),
      })
    );
    return response;
  }
}

A nicer interface around the SNS client to publish notifications with typing support:

class NotificationsService {
  constructor(private readonly snsService: SnsService) {}

  async publish<T extends NotificationType>(
    workspaceId: string,
    type: T,
    data: NotificationPayloads[T]
  ) {
    return await this.snsService.publish({
      MessageAttributes: {
        workspaceId: {
          DataType: "String",
          StringValue: workspaceId,
        },
        type: {
          DataType: "String",
          StringValue: type,
        },
      },
      Message: JSON.stringify(data),
    });
  }
}

Now we can call it like this to publish to SNS.

await this.notificationsService.publish(
  workspaceId,
  "TAG_ASSIGN",
  {
    contactId: "ecf7f7fd-1de8-4b50-88ea-f688b300129a",
    tagId: "c1612cc1-dc1d-4d20-82e9-3079b1e5d61f"
  }
);

Lambda

The Lambda script is simple.

A batch of SQS messages is available at event.Records.

We'll just send them as-is to our backend and process them there.

export const handler = async (event) => {
    const response = await fetch("https://myapp.example.com/notifications/track", {
    method: "POST",
    headers: {
      "Content-type": "application/json",
    },
    body: JSON.stringify(event.Records),
  });

  if (response.ok) {
    return response;
  } else {
    throw new Error("An error occurred while processing batch.");
  }
};
  • If Lambda returns a value, it's considered a success and the SQS batch will be deleted.

  • If it throws an error, it's considered an error and the SQS batch will be retried.

We return the response we get from our backend, so if the request times out or we get an error code, the batch will be retried.

Putting it together

A high-level overview of how data will move through AWS.

Notification flow through AWS services

Automation flow

Now let's drive application actions via these cloud services.

An overview of the flow:

Automation logic flow


  1. Business operations publish notifications to SNS.
  2. SNS notifications are queued and re-emitted to API.
  3. When a notification is received, check for automations that should trigger.
  4. If automation(s) are found, begin new automation runs, execute the first action, and emit an AUTOMATION_PROGRESS_SUCCESS notification.
  5. When an AUTOMATION_PROGRESS_SUCCESS notification is received, execute the next action, and emit another AUTOMATION_PROGRESS_SUCCESS notification.
  6. Repeat step 5 until the last action has been executed.

Database schema

We'll define our business entities like this:

  1. Automations

    Top-level organizational unit.

  2. Automation steps

    Individual steps for each automation.

  3. Automation runs

    Individual instances of when the automation was executed.

Automations

Table automations

Column nameTypeDescription
iduuidPrimary key.
workspace_iduuidForeign key on containing workspace.
nametextDescriptive front-end facing name.
is_activebooleanToggle whether automation is able to be run.
versionintegerGets incremented when automation is re-built.
created_atdatetimeCreation timestamp.
updated_atdatetimeLast updated timestamp.

Automation Steps

Table automation_steps

Stores linked steps within an automation.

Each automation has a single trigger, and one or more actions.

A step is a trigger if:

  • parent_automation_step_id is null.
  • trigger_type is defined.

A step is an action if:

  • parent_automation_step_id is defined.
  • trigger_type is null.
  • action_type is defined.
Column nameTypeDescription
iduuidPrimary key.
automation_iduuidForeign key on containing automation.
parent_automation_step_iduuidRecursive foreign key on the parent automation step.
trigger_typetextDefined if this step is the trigger, which kicks off the automation. Else null.
action_typetextDefined if this step is an action that performs work. Else null.
inputsjsonbParameters for the executed code.
versionintegerAt time of creation gets the value of the parent automation's version.
indexintegerTracks ordering of steps in chain.

Automation Runs

Table automation_runs

Column nameTypeDescription
iduuidPrimary key.
contact_iduuidForeign key on the contact that kicked off the automation.
automation_iduuidForeign key on the automation this run is for.
automation_step_iduuidForeign key for the automation step this run is currently on.
statustextTracks run completion status.
created_atdatetimeCreation timestamp.
updated_atdatetimeLast updated timestamp.

Writing some code

The backend logic is initiated when the Lambda callback endpoint is hit.

This is what the handler looks like:

async handle<T extends NotificationType>(data: {
    workspaceId: string;
    type: T;
    payload: NotificationPayloads[T];
    messageId: string;
}) {
    if (this.triggerExistsForNotificationType(data.type)) {
        const triggerType = this.mapNotificationsToTriggers[data.type];

        const contactId = this.extractContactIdFromNotification[data.type](
            data.payload,
        );

        const openAutomationTriggers = await this.findAutomationsByTrigger(
            data.workspaceId,
            triggerType,
        );

        for (const automation of openAutomationTriggers) {
            await this.beginAutomationRun(
                data.workspaceId,
                contactId,
                automation.automation_id,
            );
        }
    } else {
        switch (data.type) {
            case "AUTOMATION_PROGRESS_SUCCESS":
                await this.executeAutomationRun(
                    data.workspaceId,
                    (data.payload as AutomationProgressSuccessPayload)
                        .automationRunId,
                );
                break;
            default:
                break;
        }
    }
}

Notification handler


  1. Map notification types to automation trigger types. Check if the incoming notification type maps to an automation type.
  2. If yes, retrieve the automation trigger type and extract the ID of the contact for whom the action was performed.
  3. Find automations that are valid for starting a new run.
  4. Begin a new automation run for each.
  5. If no at 2, check if the notification is driving an in-progress automation run.
  6. If the notification is of type AUTOMATION_PROGRESS_SUCCESS, continue the in-progress automation run.

mapNotificationsToTriggers: Record<NotificationType, AutomationTriggerType> = {
  CONTACT_ADD: "CONTACT_ADD",
  TAG_ASSIGN: "TAG_ASSIGN",
  CONTACT_PIPELINE_MOVE: "STAGE_ENTER",
  CONTACT_PIPELINE_ENTER: "PIPELINE_ENTER",
  CONTACT_PIPELINE_EXIT: "PIPELINE_EXIT",
  CONTACT_PIPELINE_STAGE_EXIT: "STAGE_EXIT",
  FORM_SUBMISSION: "FORM_SUBMISSION",
};

Map notification types to trigger types

extractContactIdFromNotification: Record<
  NotificationType,
  (payload: NotificationPayloads[NotificationType]) => string
> = {
  TAG_ASSIGN: (payload: TagAssignPayload) => payload.contactId,
  CONTACT_ADD: (payload: ContactAddPayload) => payload.id,
  CONTACT_PIPELINE_ENTER: (payload: ContactPipelineEnterPayload) =>
    payload.contactId,
  CONTACT_PIPELINE_EXIT: (payload: ContactPipelineExitPayload) =>
    payload.contactId,
  CONTACT_PIPELINE_STAGE_EXIT: (payload: ContactPipelineStageExitPayload) =>
    payload.contactId,
  FORM_SUBMISSION: (payload: any) => payload.contactId,
  CONTACT_PIPELINE_STAGE_ENTER: (payload: ContactPipelineStageEnterPayload) =>
    payload.contactId,
  AUTOMATION_PROGRESS_SUCCESS: (payload: AutomationProgressSuccessPayload) =>
    payload.contactId,
};

Helper fns to extract contact ID from each notification payload type

Executing work

How do we actually execute code?

Let's take a look at the function executeAutomationRun.

But first...

There's one more database table I didn't mention.

Table automation_delays

Column nameTypeDescription
iduuidPrimary key.
automation_run_iduuidForeign key on automation run that created delay.
automation_step_iduuidForeign key on corresponding DELAY automation step.
statustextDelay status.
triggers_atdatetimeWhen the delay should trigger.
created_atdatetimeCreation timestamp.

This table drives delay type automation steps.

These are automation steps that pause the automation for a period of time.

Simply set the triggers_at column for when the automation should be continued.

Use a cron job to periodically poll the database and resume the automation for delays after their trigger date.

Let's take a look at the code.

async executeAutomationRun(workspaceId: string, automationRunId: string) {
    const automationRun = /* Find automation run with ID */

    if (!automationRun) {
        throw new NotFoundException(
            "No in-progress automation instance was found with the supplied ID.",
        );
    }

    switch (automationRun.action_type) {
        case "DELAY":
            await this.createDelay(
                workspaceId,
                automationRun.automation_run_id,
                automationRun.inputs,
            );
            break;
        default:
            try {
                await this.executeAutomationStep(
                    workspaceId,
                    automationRun,
                );
            catch (error) {}

            await this.advanceAutomationRun(
                workspaceId,
                automationRun,
            );

            break;
    }
}

Executing an automation run

  1. Find the automation run with the supplied ID.
  2. Check its type.
  3. If it's a delay type automation, create a new automation delay to postpone the flow.
  4. Otherwise, execute the automation step's work and advance the automation run.

Command and Conquer

The last missing piece is a robust system for dynamically executing chunks of code.

The command pattern solves this for us.

It lets us store units of code execution as business entities.

Let's write the interface for a command.

interface ICommand<D, R> {
  execute(
    workspaceId: string,
    resourceId: string | undefined,
    data: D | undefined,
    metadata?: unknown
  ): Promise<R>;

  getType(): CommandType;
}

We'll implement this interface every time we create a new command.

Let's also create some command types:

const CommandTypeValues = [
  "CREATE_CONTACT",
  "ASSIGN_TAG",
  "UNASSIGN_TAG",
  "ADD_CONTACT_TO_PIPELINE",
  "MOVE_CONTACT_TO_STAGE",
  "REMOVE_CONTACT_FROM_PIPELINE",
  "SEND_EMAIL",
] as const;
type CommandType = (typeof CommandTypeValues)[number];

Up next is an example implementation, for the CREATE_CONTACT command.

Notice we can perform side-effects alongside the main business action.

This is how we'll publish the initialnotifications that will kick off the automation process.

class CreateContactCommand implements ICommand<InsertContactData, Contact> {
  constructor(private readonly contactsService: ContactsService) {}

  async execute(
    workspaceId: string,
    _resourceId: undefined,
    data: InsertContactData
  ) {
      const contact = await this.contactsService.create(workspaceId, data);

      try {
          await this.notificationsService.publish(
              workspaceId,
              "CONTACT_ADD",
              contact,
          );
          return contact;
      } catch (error) {
          await this.contactsService.destroy(
              workspaceId,
              contact.id,
          );
          throw error;
      }
  }

  getType(): CommandType {
    return "CREATE_CONTACT";
  }
}

Command example implementation

To recap, the command pattern gives us several benefits:

  1. Standard interface for executing well-defined "actions" within our application.
  2. Can write complex logic, potentially combining methods from multiple services, into a single logical unit of work.
  3. Can add in additional side-effects when the command is run.

This is the service method we'll use to dispatch these commands:

async execute<D, R>(
  commandConstructor: Type<ICommand<D, R>>,
  workspaceId: string,
  data: ExecuteCommandData<D, R>,
): Promise<R> {
  const commandInstance: ICommand<D, R> =
      this.moduleRef.get(commandConstructor);
  if (!commandInstance)
      throw new Error(
          "Error: Requested command not found in module context.",
      );

  const commandRecord = await this.track(workspaceId, {
      commandType: commandInstance.getType(),
      issuerType: data.issuerType,
      issuerId: data.issuerId,
      resourceId: data.resourceId ?? null,
      data: data.data ?? null,
      metadata: data.metadata ?? null,
  });

  try {
      const commandResult = await commandInstance.execute(
          workspaceId,
          data.resourceId,
          data.data,
          data.metadata,
      );

      await this.update(workspaceId, commandRecord.id, {
          status: "COMPLETED",
      });

      return commandResult;
  } catch (error) {
      await this.update(workspaceId, commandRecord.id, {
          status: "FAILED",
          error: error,
      });
      throw error;
  }
}

CommandService::execute implementation

This function does three things:

  1. Grab an instance of the specified command
  2. Create a new record in the commands table pending execution
  3. Attempt to run the command's work
  4. Update the commands record to COMPLETED if successful, FAILED if an error occurred (and re-throw the error)

Finally, to execute a command, it looks like this:

await this.commandsService.execute(
  CreateContactCommand,
  workspaceId,
  {
      issuerType: "USER",
      issuerId: userId,
      data: createContactDto,
  },
),

This will run CreateContactCommand's execute function, return the value from it, and as a side-effect, log its execution.

Executing automation steps

We'll use commands to execute actions from automations.

This is the function executeAutomationStep that gets called from executeAutomationRun.

async executeAutomationStep(
       workspaceId: string,
       automationRun: AutomationRun & Automation & AutomationStep,
   ) {
       switch (automationRun.action_type) {
           case "TAG_ASSIGN":
               return await Promise.any(
                   automationRun.inputs.tagIds.map((tagId) =>
                       this.commandsService.execute(
                           AssignTagCommand,
                           workspaceId,
                           {
                               issuerType: "AUTOMATION",
                               issuerId: automationRun.automation_id,
                               resourceId: automationRun.contact_id,
                               data: { tagId },
                           },
                       ),
                   ),
               );
           case "TAG_REMOVE":
               return await Promise.any(
                   automationRun.inputs.tagIds.map((tagId) =>
                       this.commandsService.execute(
                           UnassignTagCommand,
                           workspaceId,
                           {
                               issuerType: "AUTOMATION",
                               issuerId: automationRun.automation_id,
                               resourceId: automationRun.contact_id,
                               data: { tagId },
                           },
                       ),
                   ),
               );
           case "CONTACT_UPDATE":
               return await this.commandsService.execute(
                   UpdateContactCommand,
                   workspaceId,
                   {
                       issuerType: "AUTOMATION",
                       issuerId: automationRun.automation_id,
                       resourceId: automationRun.contact_id,
                       data: automationRun.inputs,
                   },
               );
           case "ADD_TO_PIPELINE_STAGE":
               return await this.commandsService.execute(
                   AddContactToPipelineCommand,
                   workspaceId,
                   {
                       issuerType: "AUTOMATION",
                       issuerId: automationRun.automation_id,
                       data: {
                           contactId: automationRun.contact_id,
                           pipelineStageId:
                               automationRun.inputs.pipelineStageId,
                       },
                   },
               );
           case "REMOVE_FROM_PIPELINE":
               return await this.commandsService.execute(
                   RemoveContactFromPipelineCommand,
                   workspaceId,
                   {
                       issuerType: "AUTOMATION",
                       issuerId: automationRun.automation_id,
                       resourceId: automationRun.inputs.pipelineId,
                       data: {
                           contactId: automationRun.contact_id,
                       },
                   },
               );
           case "SEND_EMAIL":
               return await this.commandsService.execute(
                   SendEmailCommand,
                   workspaceId,
                   {
                       issuerType: "AUTOMATION",
                       issuerId: automationRun.automation_id,
                       resourceId: automationRun.inputs.emailId,
                       data: { contactId: automationRun.contact_id },
                   },
               );
           case "END":
               return;
           default:
               return;
       }
}

Executing automation steps

  1. Check the type of the current automation step.
  2. Execute a command for each type.
  3. Pass in the automation step's inputs as parameters to the command.

And we're done!

To add new step types, all we have to do is add cases to this switch statement.

Querying

We've addressed requirements 1 and 2.

💡We solved the resiliency requirement by bringing in serverless AWS components that will scale with usage.

💡We solved the modularity requirement by using the command pattern and map structures to create a unified interface for executing automation work, with limited edge cases/conditionals.

💡The third requirement of observability is solved for us due to our database design!

The automation_runs table is useful for reporting.

Some example queries to run:

  • Filter by contact ID to get a contact's history of automations.

  • Filter by workspace ID to and aggregate to summarize in-progress or completed automations.

  • Additionally filter by status to find prior automation runs that were completed, or runs that are in progress.

Conclusion

Thank you for reading! I hope you enjoyed this deep dive.

In the interest of brevity, I only covered the core components in this article.

Some behavior I didn't mention is:

  • Building automations from JSON data (to connect to a front-end builder, for example).
  • Managing versioning, so that updates to automations don't interrupt existing runs.
  • Aborting/purging in-progress automation runs.

Shoot me an email or DM me on LinkedIn if you have any questions or comments, or would just like to get in touch!