Shahzad Bhatti Welcome to my ramblings and rants!

February 22, 2024

Building a distributed orchestration and graph processing system

Filed under: Computing,GO — admin @ 8:45 pm

1. Introduction

The architecture of Formicary, a distributed orchestration engine will be described, which is intended for the execution of background tasks, jobs, and workflows. The operation is based on a directed acyclic graph of tasks, where each task is seen as a unit of work. Execution of these tasks can be achieved through a variety of protocols, including Docker, Kubernetes, Shell, HTTP, and Messaging. The Leader-Follower model (comprising a queen-leader and ant-workers) is employed by Formicary. Tasks are scheduled by the queen-leader, and their execution is carried out by the ant-workers. The Pipes and Filter and SEDA patterns are supported, permitting the output from one task to serve as the input for another. Parallel execution and result aggregation of tasks are made possible by the Fork/Join pattern. Distribution of tasks is guided by tags, labels, and executor protocols. The following is a list of its significant features:

  • Declarative Task/Job Definitions: Tasks and Jobs are defined as DAGs using simple YAML configuration files, with support for GO-based templates for customization.
  • Authentication & Authorization: The access to Formicary is secured using OAuth and OIDC standards.
  • Persistence of Artifacts: Artifacts and outputs from tasks can be stored and used by subsequent tasks or as job inputs.
  • Extensible Execution Methods: Supports a variety of execution protocols, including Docker, Kubernetes, HTTP, and custom protocols.
  • Quota: Limit maximum allowed CPU, memory, and disk quota usage for each task.
  • Caching: Supports caching for dependencies such as npm, maven, gradle, and python.
  • Encryption: Secures confidential configurations in databases and during network communication.
  • Scheduling: Cron-based scheduling for periodic job execution.
  • Optional and Finalized Tasks: Supports optional tasks that may fail and finalized tasks that run regardless of job success or failure.
  • Child Jobs: Supports spawning of child jobs based on Fork/Join patterns.
  • Retry Mechanisms: Supports retrying of tasks or jobs based on error/exit codes.
  • Job Filtering and Priority: Allows job/task execution filtering and prioritization.
  • Job prioritization, job/task retries, and cancellation.
  • Resource based Routing: Supports constraint-based routing of workloads for computing resources based on tags, labels, execution protocols, etc.
  • Monitoring, Alarms and Notifications: Offers job execution reports, real-time log streaming, and email notifications.
  • Other: Graceful and abrupt shutdown capabilities. Reporting and statistics on job outcomes and resource usage.

2. Use-Cases

The Formicary is designed for efficient and flexible job and task execution, adaptable to various complex scenarios, and capable of scaling according to the user base and task demands. Following is a list of its major use cases:

  • Complex Workflow Orchestration: Formicary is specially designed to run a series of integration tests, code analysis, and deployment tasks that depend on various conditions and outputs of previous tasks. Formicary can orchestrate this complex workflow across multiple environments, such as staging and production, with tasks running in parallel or sequence based on conditions.
  • Image Processing Pipeline: Formicary supports artifacts management for uploading images to S3 compatible storage including Minio. It allows orchestrating a series of tasks for image resizing, watermarking, and metadata extraction, with the final output stored in an object store.
  • Automate Build, Test and Release Workflows: A DevOps team can use Formicary to trigger a workflow that builds the project, runs tests, creates a Release, uploads build artifacts to the release, and publishes the package to a registry like npm or PyPI.
  • Scheduled Data ETL Job: A data engineering team can use Formicary to manage scheduled ETL jobs that extract data from multiple sources, transform it, and load it into a data warehouse, with tasks to validate and clean the data at each step.
  • Machine Learning Pipeline: A data science team can use Formicary pipeline to preprocess datasets, train machine learning models, evaluate their performance, and, based on certain metrics, decide whether to retrain the models or adjust preprocessing steps.

3. Architecture

The Formicary architecture is a complex system designed for task orchestration and execution, based on the Leader-Follower, SEDA and Fork/Join patterns.

3.1 Design Patterns

Here are some common design patterns used in the Formicary architecture:

  1. Microservices Architecture: Formicary architecture is decomposed into smaller, independent services that enhances scalability and facilitates independent deployment and updates.
  2. Pipeline Pattern: It structures the processing of tasks in a linear sequence of processing steps (stages).
  3. Distributed Task Queues: It manages task distribution among multiple worker nodes. This ensures load balancing and effective utilization of resources.
  4. Event-Driven Architecture: Formicary components communicate with events, triggering actions based on event occurrence for handling asynchronous processes and integrating various services.
  5. Load Balancer Pattern: It distributes incoming requests or tasks evenly across a pool of servers and prevents any single server from becoming a bottleneck.
  6. Circuit Breaker Pattern: It prevents a system from repeatedly trying to execute an operation that’s likely to fail.
  7. Retry Pattern: It automatically re-attempts failed operations a certain number of times before considering the operation failed.
  8. Observer Pattern: Formicary uses observer pattern for monitoring, logging, and metrics collection.
  9. Scheduler-Agent-Supervisor Pattern: The Formicary schedulers trigger tasks, agents to execute them, and supervisors to monitor task execution.
  10. Immutable Infrastructure: It treats infrastructure entities as immutable, replacing them for each deployment instead of updating them.
  11. Fork-Join Pattern: It decomposes a task into sub-tasks, processes them in parallel, and then combines the results.
  12. Caching Pattern: It stores intermediate build artifacts such as npm/maven/gradle libraries in a readily accessible location to reduce latency and improves performance.
  13. Back-Pressure Pattern: It controls the rate of task generation or data flow to prevent overwhelming the system.
  14. Idempotent Operations: It ensures that an operation produces the same result even if it’s executed multiple times.
  15. External Configuration Store Pattern: It manages job configuration and settings in a separate, external location, enabling easier changes and consistency across services.
  16. Blue-Green Deployment Pattern: It manages deployment by switching between two identical environments, one running the current version (blue) and one running the new version (green).

3.2 High-level Components

The architecture of Formicary is designed to manage and execute complex workflows where tasks are organized in a DAG structure. This architecture is inherently scalable and robust, catering to the needs of task scheduling, execution, and monitoring. Here’s an overview of its key functionalities and components:

components diagram

3.2.1 Functionalities

  • Job Processing: Formicary supports defining workflows as Job, where each node represents a task, and edges define dependencies. It ensures that tasks are executed in an order that respects their dependencies.
  • Task Distribution: Tasks, defined as units of work, are distributed among ant-workers based on tags and executor protocols (Kubernetes, Docker, Shell, HTTP, Websockets, etc.).
  • Scalability: Formicary scales to handle a large number of tasks and complex workflows. It supports horizontal scaling where more workers can be added to handle increased load.
  • Fault Tolerance and Reliability: It handles failures and retries of tasks.
  • Extensibility: It provides interfaces and plugins for extending its capabilities.
  • Resource Management: Efficiently allocates resources for task execution, optimizing for performance and cost.
  • Resource Quotas: It define maximum resource quotas for CPU, memory, disk space, and network usage for each job or task. This prevent any single job from over-consuming resources, ensuring fair resource allocation among all jobs.
  • Prioritization: It prioritize jobs based on criticality or predefined rules.
  • Job Throttling: It implement throttling mechanisms to control the rate at which jobs are fed into the system.
  • Kubernetes Clusters: Formicary allows for the creation of kubernetes clusters to supports auto-scaling and termination to optimize resource usage and cost.
  • Monitoring and Logging: It offers extensive monitoring and logging capabilities.
  • Authentication and Authorization: Formicary enforces strict authentication and authorization based on OAuth 2.0 and OIDC protocols before allowing access to the system.
  • Multitenancy: Formicary accommodates multiple tenants, allowing various organizations to sign up with one or more users, ensuring their data is safeguarded through robust authentication and authorization measures.
  • Common Plugins: Formicary allows the sharing of common plugins that function as sub-jobs for reusable features, which other users can then utilize.

3.2.2 Core Components

Following are core components of the Formicary system:

API Controller

The API controller defines an API that supports the following functions:

  • Checking the status of current, pending, or completed jobs
  • Submitting new jobs for execution
  • Looking up or modifying job specifications
  • Enrolling ant workers and overseeing resources for processing
  • Retrieving or uploading job-related artifacts
  • Handling settings, error codes, and resource allocation
  • Delivering both real-time and historical data reports
UI Controller

The UI controller offers the following features:

  • Displaying ongoing, queued, or completed jobs
  • Initiating new job submissions
  • Reviewing job specifications or introducing new ones
  • Supervising ant workers and execution units
  • Accessing or submitting artifacts
  • Configuring settings, error codes, and resource management
  • Providing access to both live and archived reports
Resource Manager

The resource manager enrolls ant workers and monitors the resources accessible for processing jobs. Ant workers regularly inform the resource manager about their available capacity and current workload. This continuous communication allows the resource manager to assess the maximum number of jobs that can run simultaneously without surpassing the capacity of the workers.

Job Scheduler

The job scheduler examines the queue for jobs awaiting execution and consults the resource manager to determine if a job can be allocated for execution. When sufficient resources are confirmed to be available, it dispatches a remote command to the Job-Launcher to initiate the job’s execution. Please note that the formicary architecture allows for multiple server instances, with the scheduler operating on the leader node. Meanwhile, other servers host the job-launcher and executor components, which are responsible for executing and orchestrating jobs.

Job Launcher

The job launcher remains attentive to incoming requests for job execution and initiates the process by engaging the Job-Supervisor. The Job-Supervisor then takes on the role of overseeing the execution of the job, ensuring its successful completion.

Job Supervisor

The job supervisor initiates a job in an asynchronous manner and manages the job’s execution. It oversees each task through the Task-Supervisor and determines the subsequent task to execute, guided by the status or exit code of the previously completed task.

Task Supervisor

The task supervisor initiates task execution by dispatching a remote instruction to the ant worker equipped to handle the specific task method, then stands by for a response. Upon receiving the outcome, the task supervisor records the results in the database for future reference and analysis.

Ant Workers

An ant worker registers with the queen server by specifying the types of tasks it can handle, using specific methods or tags for identification. Once registered, it remains vigilant for task requests, processing each one asynchronously according to the execution protocols defined for each task, and then relaying the results back to the server. Before starting on a task, the ant worker ensures all required artifacts are gathered and then uploads them once the task is completed. Moreover, the ant worker is responsible for managing the lifecycle of any external containers, such as those in Docker and Kubernetes systems, from initiation to termination.

To maintain system efficiency and prevent any single worker from becoming overwhelmed, the ant worker consistently updates the queen server with its current workload and capacity. This mechanism allows for a balanced distribution of tasks, ensuring that no worker is overloaded. The architecture is scalable, allowing for the addition of more ant workers to evenly spread the workload. These workers communicate with the queen server through messaging queues, enabling them to:

  • Regularly update the server on their workload and capacity.
  • Download necessary artifacts needed for task execution.
  • Execute tasks using the appropriate executors, such as Docker, HTTP, Kubernetes, Shell, or Websockets.
  • Upload the resulting artifacts upon completion of tasks.
  • Monitor and manage the lifecycle of Docker/Kubernetes containers, reporting back any significant events to the server.
Executors

The formicary system accommodates a range of executor methods, such as Kubernetes Pods, Docker containers, Shell commands, HTTP requests, and Messaging protocols, to abstract the runtime environment for executing tasks. The choice of executor within the formicary is defined through designated methods, with each method specifying a different execution environment.

ExecutorMethod
Kubernetes PodsKUBERNETES
Docker containersDOCKER
ShellSHELL
HTTP (GET POST, PUT, DELETE)HTTP_GET HTTP_POST_FORM HTTP_POST_JSON HTTP_PUT_FORM HTTP_PUT_JSON HTTP_DELETE WEBSOCKET
Fork/AwaitJOB_FORK, JOB_FORK_AWAIT
Artifact/ExpirationEXPIRE_ARTIFACTS
MessagingMESSAGING
Executor Methods

Note: These execution methods can be easily extended to support other executor protocols to provide greater flexibility in how tasks are executed and integrated with different environments.

Database

The formicary system employs a relational database to systematically store and manage a wide array of data, including job requests, detailed job definitions, resource allocations, error codes, and various configurations.

Artifacts and Object Store

The formicary system utilizes an object storage solution to maintain the artifacts produced during task execution, those generated within the image cache, or those uploaded directly by users. This method ensures a scalable and secure way to keep large volumes of unstructured data, facilitating easy access and retrieval of these critical components for operational efficiency and user interaction.

Messaging

Messaging enables seamless interaction between the scheduler and the workers, guaranteeing dependable dissemination of tasks across distributed settings.

Notification System

The notification system dispatches alerts and updates regarding the pipeline status to users.

3.3 Data Model

Here’s an overview of its key data model in Formicary system:

Domain Classes

3.3.1 Job Definition

A JobDefinition outlines a set of tasks arranged in a Directed Acyclic Graph (DAG), executed by worker entities. The workflow progresses based on the exit codes of tasks, determining the subsequent task to execute. Each task definition encapsulates a job’s specifics, and upon receiving a new job request, an instance of this job is initiated through JobExecution.

type JobDefinition struct {
	// ID defines UUID for primary key
	ID string `yaml:"-" json:"id" gorm:"primary_key"`
	// JobType defines a unique type of job
	JobType string `yaml:"job_type" json:"job_type"`
	// Version defines internal version of the job-definition, which is updated when a job is updated. The database
	// stores each version as a separate row but only latest version is used for new jobs.
	Version int32 `yaml:"-" json:"-"`
	// SemVersion - semantic version is used for external version, which can be used for public plugins.
	SemVersion string `yaml:"sem_version" json:"sem_version"`
	// URL defines url for job
	URL string `json:"url"`
	// UserID defines user who updated the job
	UserID string `json:"user_id"`
	// OrganizationID defines org who submitted the job
	OrganizationID string `json:"organization_id"`
	// Description of job
	Description string `yaml:"description,omitempty" json:"description"`
	// Platform can be OS platform or target runtime and a job can be targeted for specific platform that can be used for filtering
	Platform string `yaml:"platform,omitempty" json:"platform"`
	// CronTrigger can be used to run the job periodically
	CronTrigger string `yaml:"cron_trigger,omitempty" json:"cron_trigger"`
	// Timeout defines max time a job should take, otherwise the job is aborted
	Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout"`
	// Retry defines max number of tries a job can be retried where it re-runs failed job
	Retry int `yaml:"retry,omitempty" json:"retry"`
	// HardResetAfterRetries defines retry config when job is rerun and as opposed to re-running only failed tasks, all tasks are executed.
	HardResetAfterRetries int `yaml:"hard_reset_after_retries,omitempty" json:"hard_reset_after_retries"`
	// DelayBetweenRetries defines time between retry of job
	DelayBetweenRetries time.Duration `yaml:"delay_between_retries,omitempty" json:"delay_between_retries"`
	// MaxConcurrency defines max number of jobs that can be run concurrently
	MaxConcurrency int `yaml:"max_concurrency,omitempty" json:"max_concurrency"`
	// disabled is used to stop further processing of job, and it can be used during maintenance, upgrade or debugging.
	Disabled bool `yaml:"-" json:"disabled"`
	// PublicPlugin means job is public plugin
	PublicPlugin bool `yaml:"public_plugin,omitempty" json:"public_plugin"`
	// RequiredParams from job request (and plugin)
	RequiredParams []string `yaml:"required_params,omitempty" json:"required_params" gorm:"-"`
	// Tags are used to use specific followers that support the tags defined by ants.
	// Tags is aggregation of task tags
	Tags string `yaml:"tags,omitempty" json:"tags"`
	// Methods is aggregation of task methods
	Methods string `yaml:"methods,omitempty" json:"methods"`
	// Tasks defines one to many relationships between job and tasks, where a job defines
	// a directed acyclic graph of tasks that are executed for the job.
	Tasks []*TaskDefinition `yaml:"tasks" json:"tasks" gorm:"ForeignKey:JobDefinitionID" gorm:"auto_preload" gorm:"constraint:OnUpdate:CASCADE"`
	// Configs defines config properties of job that are used as parameters for the job template or task request when executing on a remote
	// ant follower. Both config and variables provide similar capabilities but config can be updated for all job versions and can store
	// sensitive data.
	Configs []*JobDefinitionConfig `yaml:"-" json:"-" gorm:"ForeignKey:JobDefinitionID" gorm:"auto_preload" gorm:"constraint:OnUpdate:CASCADE"`
	// Variables defines properties of job that are used as parameters for the job template or task request when executing on a remote
	// ant follower. Both config and variables provide similar capabilities but variables are part of the job yaml definition.
	Variables []*JobDefinitionVariable `yaml:"-" json:"-" gorm:"ForeignKey:JobDefinitionID" gorm:"auto_preload" gorm:"constraint:OnUpdate:CASCADE"`
	// CreatedAt job creation time
	CreatedAt time.Time `yaml:"-" json:"created_at"`
	// UpdatedAt job update time
	UpdatedAt time.Time `yaml:"-" json:"updated_at"`
}

3.3.2 Task Definition

A TaskDefinition outlines the work performed by worker entities. It specifies the task’s parameters and, upon a new job request, a TaskExecution instance is initiated to carry out the task. The task details, including its method and tags, guide the dispatch of task requests to a compatible remote worker. Upon task completion, the outcomes are recorded in the database for reference.

type TaskDefinition struct {
	// ID defines UUID for primary key
	ID string `yaml:"-" json:"id" gorm:"primary_key"`
	// JobDefinitionID defines foreign key for JobDefinition
	JobDefinitionID string `yaml:"-" json:"job_definition_id"`
	// TaskType defines type of task
	TaskType string `yaml:"task_type" json:"task_type"`
	// Method TaskMethod defines method of communication
	Method common.TaskMethod `yaml:"method" json:"method"`
	// Description of task
	Description string `yaml:"description,omitempty" json:"description"`
	// HostNetwork defines kubernetes/docker config for host_network
	HostNetwork string `json:"host_network,omitempty" yaml:"host_network,omitempty" gorm:"-"`
	// AllowFailure means the task is optional and can fail without failing entire job
	AllowFailure bool `yaml:"allow_failure,omitempty" json:"allow_failure"`
	// AllowStartIfCompleted  means the task is always run on retry even if it was completed successfully
	AllowStartIfCompleted bool `yaml:"allow_start_if_completed,omitempty" json:"allow_start_if_completed"`
	// AlwaysRun means the task is always run on execution even if the job fails. For example, a required task fails (without
	// AllowFailure), the job is aborted and remaining tasks are skipped but a task defined as `AlwaysRun` is run even if the job fails.
	AlwaysRun bool `yaml:"always_run,omitempty" json:"always_run"`
	// Timeout defines max time a task should take, otherwise the job is aborted
	Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout"`
	// Retry defines max number of tries a task can be retried where it re-runs failed tasks
	Retry int `yaml:"retry,omitempty" json:"retry"`
	// DelayBetweenRetries defines time between retry of task
	DelayBetweenRetries time.Duration `yaml:"delay_between_retries,omitempty" json:"delay_between_retries"`
	// Webhook config
	Webhook *common.Webhook `yaml:"webhook,omitempty" json:"webhook" gorm:"-"`
	// OnExitCodeSerialized defines next task to execute
	OnExitCodeSerialized string `yaml:"-" json:"-"`
	// OnExitCode defines next task to run based on exit code
	OnExitCode map[common.RequestState]string `yaml:"on_exit_code,omitempty" json:"on_exit_code" gorm:"-"`
	// OnCompleted defines next task to run based on completion
	OnCompleted string `yaml:"on_completed,omitempty" json:"on_completed" gorm:"on_completed"`
	// OnFailed defines next task to run based on failure
	OnFailed string `yaml:"on_failed,omitempty" json:"on_failed" gorm:"on_failed"`
	// Variables defines properties of task
	Variables []*TaskDefinitionVariable `yaml:"-" json:"-" gorm:"ForeignKey:TaskDefinitionID" gorm:"auto_preload" gorm:"constraint:OnUpdate:CASCADE"`
	TaskOrder int       `yaml:"-" json:"-" gorm:"task_order"`
	// ReportStdout is used to send stdout as a report
	ReportStdout bool `yaml:"report_stdout,omitempty" json:"report_stdout"`
	// Transient properties -- these are populated when AfterLoad or Validate is called
	NameValueVariables interface{} `yaml:"variables,omitempty" json:"variables" gorm:"-"`
	// Header defines HTTP headers
	Headers map[string]string `yaml:"headers,omitempty" json:"headers" gorm:"-"`
	// BeforeScript defines list of commands that are executed before main script
	BeforeScript []string `yaml:"before_script,omitempty" json:"before_script" gorm:"-"`
	// AfterScript defines list of commands that are executed after main script for cleanup
	AfterScript []string `yaml:"after_script,omitempty" json:"after_script" gorm:"-"`
	// Script defines list of commands to execute in container
	Script []string `yaml:"script,omitempty" json:"script" gorm:"-"`
	// Resources defines resources required by the task
	Resources BasicResource `yaml:"resources,omitempty" json:"resources" gorm:"-"`
	// Tags are used to use specific followers that support the tags defined by ants.
	// For example, you may start a follower that processes payments and the task will be routed to that follower
	Tags []string `yaml:"tags,omitempty" json:"tags" gorm:"-"`
	// Except is used to filter task execution based on certain condition
	Except string `yaml:"except,omitempty" json:"except" gorm:"-"`
	// JobVersion defines job version
	JobVersion string `yaml:"job_version,omitempty" json:"job_version" gorm:"-"`
	// Dependencies defines dependent tasks for downloading artifacts
	Dependencies []string `json:"dependencies,omitempty" yaml:"dependencies,omitempty" gorm:"-"`
	// ArtifactIDs defines id of artifacts that are automatically downloaded for job-execution
	ArtifactIDs []string `json:"artifact_ids,omitempty" yaml:"artifact_ids,omitempty" gorm:"-"`
	// ForkJobType defines type of job to work
	ForkJobType string `json:"fork_job_type,omitempty" yaml:"fork_job_type,omitempty" gorm:"-"`
	// URL to use
	URL string `json:"url,omitempty" yaml:"url,omitempty" gorm:"-"`
	// AwaitForkedTasks defines list of jobs to wait for completion
	AwaitForkedTasks      []string `json:"await_forked_tasks,omitempty" yaml:"await_forked_tasks,omitempty" gorm:"-"`
	MessagingRequestQueue string   `json:"messaging_request_queue,omitempty" yaml:"messaging_request_queue,omitempty" gorm:"-"`
	MessagingReplyQueue   string   `json:"messaging_reply_queue,omitempty" yaml:"messaging_reply_queue,omitempty" gorm:"-"`
	// CreatedAt job creation time
	CreatedAt time.Time `yaml:"-" json:"created_at"`
	// UpdatedAt job update time
	UpdatedAt time.Time `yaml:"-" json:"updated_at"`  
}

3.3.3 JobExecution

JobExecution refers to a specific instance of a job-definition that gets activated upon the submission of a job-request. When a job is initiated by the job-launcher, this triggers the creation of a job-execution instance, which is also recorded in the database. Following this initiation, the job-launcher transfers responsibility for the job to the job-supervisor, which then commences execution, updating the status of both the job request and execution to EXECUTING. The job supervisor manages the execution process, ultimately altering the status to COMPLETED or FAILED upon completion. Throughout this process, the formicary system emits job lifecycle events to reflect these status changes, which can be monitored by UI or API clients.

For every task outlined within the task-definition associated with the JobExecution, a corresponding TaskExecution instance is generated. This setup tracks the progress and state of both job and task executions within a database, and any outputs generated during the job execution process are preserved in object storage.

type JobExecution struct {
	// ID defines UUID for primary key
	ID string `json:"id" gorm:"primary_key"`
	// JobRequestID defines foreign key for job request
	JobRequestID uint64 `json:"job_request_id"`
	// JobType defines type for the job
	JobType    string `json:"job_type"`
	JobVersion string `json:"job_version"`
	// JobState defines state of job that is maintained throughout the lifecycle of a job
	JobState types.RequestState `json:"job_state"`
	// OrganizationID defines org who submitted the job
	OrganizationID string `json:"organization_id"`
	// UserID defines user who submitted the job
	UserID string `json:"user_id"`
	// ExitCode defines exit status from the job execution
	ExitCode string `json:"exit_code"`
	// ExitMessage defines exit message from the job execution
	ExitMessage string `json:"exit_message"`
	// ErrorCode captures error code at the end of job execution if it fails
	ErrorCode string `json:"error_code"`
	// ErrorMessage captures error message at the end of job execution if it fails
	ErrorMessage string `json:"error_message"`
	// Contexts defines context variables of job
	Contexts []*JobExecutionContext `json:"contexts" gorm:"ForeignKey:JobExecutionID" gorm:"auto_preload"`
	// Tasks defines list of tasks that are executed for the job
	Tasks []*TaskExecution `json:"tasks" gorm:"ForeignKey:JobExecutionID" gorm:"auto_preload"`
	// StartedAt job execution start time
	StartedAt time.Time `json:"started_at"`
	// EndedAt job execution end time
	EndedAt *time.Time `json:"ended_at"`
	// UpdatedAt job execution last update time
	UpdatedAt time.Time `json:"updated_at"`
	// CPUSecs execution time
	CPUSecs int64 `json:"cpu_secs"`
}

The state of job execution includes: PENDING, READY, COMPLETED, FAILED, EXECUTING, STARTED, PAUSED, and CANCELLED.

3.3.4 TaskExecution

TaskExecution records the execution of a task or a unit of work, carried out by ant-workers in accordance with the specifications of the task-definition. It captures the status and the outputs produced by the task execution, storing them in the database and the object-store. When a task begins, it is represented by a task-execution instance, initiated by the task supervisor. This instance is stored in the database by the task supervisor, which then assembles a task request to dispatch to a remote ant worker. The task supervisor awaits the worker’s response before updating the database with the outcome. Task execution concludes with either a COMPLETED or FAILED status, and it also accommodates an exit code provided by the worker. Based on the final status or exit code, orchestration rules determine the subsequent task to execute.

type TaskExecution struct {
	// ID defines UUID for primary key
	ID string `json:"id" gorm:"primary_key"`
	// JobExecutionID defines foreign key for JobExecution
	JobExecutionID string `json:"job_execution_id"`
	// TaskType defines type of task
	TaskType string `json:"task_type"`
	// Method defines method of communication
	Method types.TaskMethod `yaml:"method" json:"method"`
	// TaskState defines state of task that is maintained throughout the lifecycle of a task
	TaskState types.RequestState `json:"task_state"`
	// AllowFailure means the task is optional and can fail without failing entire job
	AllowFailure bool `json:"allow_failure"`
	// ExitCode defines exit status from the job execution
	ExitCode string `json:"exit_code"`
	// ExitMessage defines exit message from the job execution
	ExitMessage string `json:"exit_message"`
	// ErrorCode captures error code at the end of job execution if it fails
	ErrorCode string `json:"error_code"`
	// ErrorMessage captures error message at the end of job execution if it fails
	ErrorMessage string `json:"error_message"`
	// FailedCommand captures command that failed
	FailedCommand string `json:"failed_command"`
	// AntID - id of ant with version
	AntID string `json:"ant_id"`
	// AntHost - host where ant ran the task
	AntHost string `json:"ant_host"`
	// Retried keeps track of retry attempts
	Retried int `json:"retried"`
	// Contexts defines context variables of task
	Contexts []*TaskExecutionContext `json:"contexts" gorm:"ForeignKey:TaskExecutionID" gorm:"auto_preload"`
	// Artifacts defines list of artifacts that are generated for the task
	Artifacts []*types.Artifact `json:"artifacts" gorm:"ForeignKey:TaskExecutionID"`
	// TaskOrder
	TaskOrder int `json:"task_order"`
	// CountServices
	CountServices int `json:"count_services"`
	// CostFactor
	CostFactor float64 `json:"cost_factor"`
  	Stdout []string `json:"stdout" gorm:"-"`
	// StartedAt job creation time
	StartedAt time.Time `json:"started_at"`
	// EndedAt job update time
	EndedAt *time.Time `json:"ended_at"`
	// UpdatedAt job execution last update time
	UpdatedAt time.Time `json:"updated_at"`
}

The state of TaskExecution includes READY, STARTED, EXECUTING, COMPLETED, and FAILED.

3.3.5 JobRequest

JobRequest outlines a user’s request to execute a job as per its job-definition. Upon submission, a job-request is marked as PENDING in the database and later, it is asynchronously scheduled for execution by the job scheduler, depending on resource availability. It’s important to note that users have the option to schedule a job for a future date to avoid immediate execution. Additionally, a job definition can include a cron property, which automatically generates job requests at predetermined times for execution. Besides user-initiated requests, a job request might also be issued by a parent job to execute a child job in a fork/join manner.

type JobRequest struct {
	//gorm.Model
	// ID defines UUID for primary key
	ID uint64 `json:"id" gorm:"primary_key"`
	// ParentID defines id for parent job
	ParentID uint64 `json:"parent_id"`
	// UserKey defines user-defined UUID and can be used to detect duplicate jobs
	UserKey string `json:"user_key"`
	// JobDefinitionID points to the job-definition version
	JobDefinitionID string `json:"job_definition_id"`
	// JobExecutionID defines foreign key for JobExecution
	JobExecutionID string `json:"job_execution_id"`
	// LastJobExecutionID defines foreign key for JobExecution
	LastJobExecutionID string `json:"last_job_execution_id"`
	// OrganizationID defines org who submitted the job
	OrganizationID string `json:"organization_id"`
	// UserID defines user who submitted the job
	UserID string `json:"user_id"`
	// Permissions provides who can access this request 0 - all, 1 - Org must match, 2 - UserID must match from authentication
	Permissions int `json:"permissions"`
	// Description of the request
	Description string `json:"description"`
	// Platform overrides platform property for targeting job to a specific follower
	Platform string `json:"platform"`
	// JobType defines type for the job
	JobType    string `json:"job_type"`
	JobVersion string `json:"job_version"`
	// JobState defines state of job that is maintained throughout the lifecycle of a job
	JobState types.RequestState `json:"job_state"`
	// JobGroup defines a property for grouping related job
	JobGroup string `json:"job_group"`
	// JobPriority defines priority of the job
	JobPriority int `json:"job_priority"`
	// Timeout defines max time a job should take, otherwise the job is aborted
	Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout"`
	// ScheduleAttempts defines attempts of schedule
	ScheduleAttempts int `json:"schedule_attempts" gorm:"schedule_attempts"`
	// Retried keeps track of retry attempts
	Retried int `json:"retried"`
	// CronTriggered is true if request was triggered by cron
	CronTriggered bool `json:"cron_triggered"`
	// QuickSearch provides quick search to search a request by params
	QuickSearch string `json:"quick_search"`
	// ErrorCode captures error code at the end of job execution if it fails
	ErrorCode string `json:"error_code"`
	// ErrorMessage captures error message at the end of job execution if it fails
	ErrorMessage string `json:"error_message"`
	// Params are passed with job request
	Params []*JobRequestParam `yaml:"-" json:"-" gorm:"ForeignKey:JobRequestID" gorm:"auto_preload" gorm:"constraint:OnUpdate:CASCADE"`
	// Execution refers to job-Execution
	Execution       *JobExecution          `yaml:"-" json:"execution" gorm:"-"`
	Errors          map[string]string      `yaml:"-" json:"-" gorm:"-"`  
	// ScheduledAt defines schedule time when job will be submitted so that you can submit a job
	// that will be executed later
	ScheduledAt time.Time `json:"scheduled_at"`
	// CreatedAt job creation time
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt job update time
	UpdatedAt time.Time `json:"updated_at" gorm:"updated_at"`
}

3.3.6 TaskRequest

TaskRequest specifies the parameters for a task that is dispatched to a remote ant-worker for execution. This request is transmitted through a messaging middleware to the most appropriate ant-worker, selected based on its resource availability and capacity to handle the task efficiently.

type TaskRequest struct {
	UserID          string                   `json:"user_id" yaml:"user_id"`
	OrganizationID  string                   `json:"organization_id" yaml:"organization_id"`
	JobDefinitionID string                   `json:"job_definition_id" yaml:"job_definition_id"`
	JobRequestID    uint64                   `json:"job_request_id" yaml:"job_request_id"`
	JobType         string                   `json:"job_type" yaml:"job_type"`
	JobTypeVersion  string                   `json:"job_type_version" yaml:"job_type_version"`
	JobExecutionID  string                   `json:"job_execution_id" yaml:"job_execution_id"`
	TaskExecutionID string                   `json:"task_execution_id" yaml:"task_execution_id"`
	TaskType        string                   `json:"task_type" yaml:"task_type"`
	CoRelationID    string                   `json:"co_relation_id"`
	Platform        string                   `json:"platform" yaml:"platform"`
	Action          TaskAction               `json:"action" yaml:"action"`
	JobRetry        int                      `json:"job_retry" yaml:"job_retry"`
	TaskRetry       int                      `json:"task_retry" yaml:"task_retry"`
	AllowFailure    bool                     `json:"allow_failure" yaml:"allow_failure"`
	Tags            []string                 `json:"tags" yaml:"tags"`
	BeforeScript    []string                 `json:"before_script" yaml:"before_script"`
	AfterScript     []string                 `json:"after_script" yaml:"after_script"`
	Script          []string                 `json:"script" yaml:"script"`
	Timeout         time.Duration            `json:"timeout" yaml:"timeout"`
	Variables       map[string]VariableValue `json:"variables" yaml:"variables"`
	ExecutorOpts    *ExecutorOptions         `json:"executor_opts" yaml:"executor_opts"`
}

3.3.7 ExecutorOptions

ExecutorOptions specify the settings for the underlying executor, including Docker, Kubernetes, Shell, HTTP, etc., ensuring tasks are carried out using the suitable computational resources.

type ExecutorOptions struct {
	Name                       string                  `json:"name" yaml:"name"`
	Method                     TaskMethod              `json:"method" yaml:"method"`
	Environment                EnvironmentMap          `json:"environment,omitempty" yaml:"environment,omitempty"`
	HelperEnvironment          EnvironmentMap          `json:"helper_environment,omitempty" yaml:"helper_environment,omitempty"`
	WorkingDirectory           string                  `json:"working_dir,omitempty" yaml:"working_dir,omitempty"`
	ArtifactsDirectory         string                  `json:"artifacts_dir,omitempty" yaml:"artifacts_dir,omitempty"`
	Artifacts                  ArtifactsConfig         `json:"artifacts,omitempty" yaml:"artifacts,omitempty"`
	CacheDirectory             string                  `json:"cache_dir,omitempty" yaml:"cache_dir,omitempty"`
	Cache                      CacheConfig             `json:"cache,omitempty" yaml:"cache,omitempty"`
	DependentArtifactIDs       []string                `json:"dependent_artifact_ids,omitempty" yaml:"dependent_artifact_ids,omitempty"`
	MainContainer              *ContainerDefinition    `json:"container,omitempty" yaml:"container,omitempty"`
	HelperContainer            *ContainerDefinition    `json:"helper,omitempty" yaml:"helper,omitempty"`
	Services                   []Service               `json:"services,omitempty" yaml:"services,omitempty"`
	Privileged                 bool                    `json:"privileged,omitempty" yaml:"privileged,omitempty"`
	Affinity                   *KubernetesNodeAffinity `json:"affinity,omitempty" yaml:"affinity,omitempty"`
	NodeSelector               map[string]string       `json:"node_selector,omitempty" yaml:"node_selector,omitempty"`
	NodeTolerations            NodeTolerations         `json:"node_tolerations,omitempty" yaml:"node_tolerations,omitempty"`
	PodLabels                  map[string]string       `json:"pod_labels,omitempty" yaml:"pod_labels,omitempty"`
	PodAnnotations             map[string]string       `json:"pod_annotations,omitempty" yaml:"pod_annotations,omitempty"`
	NetworkMode                string                  `json:"network_mode,omitempty" yaml:"network_mode,omitempty"`
	HostNetwork                bool                    `json:"host_network,omitempty" yaml:"host_network,omitempty"`
	Headers                    map[string]string       `yaml:"headers,omitempty" json:"headers"`
	QueryParams                map[string]string       `yaml:"query,omitempty" json:"query"`
	MessagingRequestQueue      string                  `json:"messaging_request_queue,omitempty" yaml:"messaging_request_queue,omitempty"`
	MessagingReplyQueue        string                  `json:"messaging_reply_queue,omitempty" yaml:"messaging_reply_queue,omitempty"`
	ForkJobType                string                  `json:"fork_job_type,omitempty" yaml:"fork_job_type,omitempty"`
	ForkJobVersion             string                  `json:"fork_job_version,omitempty" yaml:"fork_job_version,omitempty"`
	ArtifactKeyPrefix          string                  `json:"artifact_key_prefix,omitempty" yaml:"artifact_key_prefix,omitempty"`
	AwaitForkedTasks           []string                `json:"await_forked_tasks,omitempty" yaml:"await_forked_tasks,omitempty"`
	CostFactor                 float64                 `json:"cost_factor,omitempty" yaml:"cost_factor,omitempty"`
}

3.3.8 TaskResponse

TaskResponse outlines the outcome of a task execution, encompassing its status, context, generated artifacts, and additional outputs.

type TaskResponse struct {
	JobRequestID    uint64                 `json:"job_request_id"`
	TaskExecutionID string                 `json:"task_execution_id"`
	JobType         string                 `json:"job_type"`
	JobTypeVersion  string                 `json:"job_type_version"`
	TaskType        string                 `json:"task_type"`
	CoRelationID    string                 `json:"co_relation_id"`
	Status          RequestState           `json:"status"`
	AntID           string                 `json:"ant_id"`
	Host            string                 `json:"host"`
	Namespace       string                 `json:"namespace"`
	Tags            []string               `json:"tags"`
	ErrorMessage    string                 `json:"error_message"`
	ErrorCode       string                 `json:"error_code"`
	ExitCode        string                 `json:"exit_code"`
	ExitMessage     string                 `json:"exit_message"`
	FailedCommand   string                 `json:"failed_command"`
	TaskContext     map[string]interface{} `json:"task_context"`
	JobContext      map[string]interface{} `json:"job_context"`
	Artifacts       []*Artifact            `json:"artifacts"`
	Warnings        []string               `json:"warnings"`
	Stdout          []string               `json:"stdout"`
	CostFactor      float64                `json:"cost_factor"`
	Timings         TaskResponseTimings    `json:"timings"`
}

3.4 Events Model

Here’s a summary of the principal events model within the Formicary system, which facilitates communication among the main components:

Formicary Events

In above diagram, the lifecycle events are published upon start and completion of a job-request, job-execution, task-execution, and containers. Other events are propagated upon health errors, logging and leader election for the job scheduler.

3.5 Physical Architecture

Following diagram depicts the physical architecture of the Formicary system:

physical architecture

The physical architecture of a Formicary system is structured as follows:

  1. Queen Server: It manages task scheduling, resource allocation, and system monitoring. The job requests, definitions, user data, and configuration settings are maintained in the database.
  2. Ant Workers: These are distributed computing resources that execute the tasks assigned by the central server. Each ant worker is equipped with the necessary software to perform various tasks, such as processing data, running applications, or handling web requests. Worker nodes report their status, capacity, and workload back to the central server to facilitate efficient task distribution.
  3. Storage Systems: Relational databases are used to store structured data such as job definitions, user accounts, and system configurations. Object storage systems hold unstructured data, including task artifacts, logs, and binary data.
  4. Messaging Middleware: Messaging queues and APIs facilitate asynchronous communication and integration with other systems.
  5. Execution Environments: Consist of container orchestration systems like Kubernetes and Docker for isolating and managing task executions. They provide scalable and flexible environments that support various execution methods, including shell scripts, HTTP requests, and custom executables.
  6. Monitoring and Alerting Tools: Formicary system integrates with Prometheus for monitoring solutions to track the health, performance, and resource usage of both the central server and worker nodes. Alerting mechanisms notify administrators and users about system events, performance bottlenecks, and potential issues.
  7. Security Infrastructure: Authentication and authorization mechanisms control access to resources and tasks based on user roles and permissions.

This architecture allows the Formicary system to scale horizontally by adding more worker nodes as needed to handle increased workloads, and vertically by enhancing the capabilities of the central server and worker nodes. The system’s design emphasizes reliability, scalability, and efficiency, making it suitable for a wide range of applications, from data processing and analysis to web hosting and content delivery.

4. API Overview

The Formicary API is a comprehensive orchestration engine designed for executing complex job workflows, represented as a directed acyclic graph (DAG) of tasks. It’s built on design principles such as Leader-Follower, Pipes-Filter, Fork-Join, and SEDA, catering to a range of execution strategies. The API supports task unit management, job definition, and workflow configurations, including conditional logic and parameterization. The API defines a range of models for different entities such as AntRegistration, Artifact, JobDefinition, JobRequest, and many more, providing a structured approach to orchestration. Key functionalities of the Formicary API include:

4.1 Ant Registration

Management of ant registrations, enabling queries and retrievals by ID such as:

  • GET /api/ants: Queries ant registration.
  • GET /api/ants/{id}: Retrieves ant-registration by its id.

4.2 Artifacts

Handling of artifacts, allowing for querying, uploading, and managing artifact data, including downloading and log retrieval.

  • GET /api/artifacts: Queries artifacts by name, task-type, etc.
  • POST /api/artifacts: Uploads artifact data from the request body and returns metadata for the uploaded data.
  • DELETE /api/artifacts:/{id}: Deletes an artifact by its ID.
  • GET /api/artifacts:/{id}: Retrieves an artifact by its ID.

4.3 System Configs

Creation, deletion, updating, and querying of system configurations.

  • GET /api/configs: Retrieves system configs.
  • POST /api/configs: Creates new system config based on request body.
  • DELETE /api/configs/{id}: Deletes an existing system config based on id.
  • GET /api/configs/{id}: Finds an existing system config based on id.
  • PUT /api/configs/{id}: Updates an existing system config based on request body.

4.4 Error Codes

Management of error codes, including creation, updating, deletion, and querying by type or regex.

  • GET /api/errors: Queries error-codes by type, regex.
  • POST /api/errors: Creates new error code based on request body.
  • PUT /api/errors: Updates new error code based on request body.
  • DELETE /api/errors/{id}: Deletes error code by id.
  • GET /api/errors/{id}: Finds error code by id.

4.5 Container Execution

Querying of container executions and management of container executors.

  • GET /api/executors: Queries container executions.
  • GET /api/executors/{id}: Deletes container-executor by its id.

4.6 Job Definitions

Comprehensive job definition management, including creation, deletion, querying, concurrency updates, enabling/disabling, and graphical representation (DOT format).

  • GET /api/jobs/definitions: Queries job definitions by criteria such as type, platform, etc.
  • POST /api/jobs/definitions: Uploads job definitions using JSON or YAML body based on content-type header.
  • DELETE /api/jobs/definitions/{id}: Deletes the job-definition by id.
  • GET /api/jobs/definitions/{id}: Finds the job-definition by id.
  • PUT /api/jobs/definitions/{id}/concurrency: Updates the concurrency for job-definition by id to limit the maximum jobs that can be executed at the same time.
  • POST /api/jobs/definitions/{id}/disable: disables job-definition so that no new requests are executed while in-progress jobs are allowed to complete.
  • GET /api/jobs/definitions/{id}/dot: Returns Graphviz DOT definition for the graph of tasks defined in the job.
  • GET /api/jobs/definitions/{id}/stats: Returns Real-time statistics of jobs running.
  • POST /api/jobs/definitions/{id}/enable: Enables job-definition so that new requests can start processing.
  • GET /api/jobs/definitions/{type}/yaml: Finds job-definition by type and returns response YAML format.
  • GET /api/jobs/plugins: Queries job definitions by criteria such as type, platform, etc.

4.7 Job Configs

Configuration management for jobs, including adding, deleting, finding, and updating configs.

  • GET /api/jobs/definitions/{jobId}/configs: Queries job configs by criteria such as name, type, etc.
  • POST /api/jobs/definitions/{jobId}/configs: Adds a config for the job.
  • DELETE /api/jobs/definitions/{jobId}/configs/{id}: Deletes a config for the job by id.
  • GET /api/jobs/definitions/{jobId}/configs/{id}: Finds a config for the job by id.
  • PUT /api/jobs/definitions/{jobId}/configs/{id}: Updates a config for the job.

4.8 Job Requests

Handling of job requests, including submission, querying, cancellation, restart, triggering, and wait time calculations.

  • GET /api/jobs/requests: Queries job requests by criteria such as type, platform, etc.
  • POST /api/jobs/requests: Submits a job-request for processing, which is saved in the database and is then scheduled for execution.
  • GET /api/jobs/requests/{id}: Finds the job-request by id.
  • POST /api/jobs/requests/{id}/cancel: Cancels a job-request that is pending for execution or already executing.
  • GET /api/jobs/requests/{id}/dot: Returns Graphviz DOT request for the graph of tasks defined in the job request.
  • GET /api/jobs/requests/{id}/dot.png: Returns Graphviz DOT image for the graph of tasks defined in the job.
  • POST /api/jobs/requests/{id}/restart: Restarts a previously failed job so that it can re-execute.
  • POST /api/jobs/requests/{id}/trigger: Triggers a scheduled job.
  • GET /api/jobs/requests/{id}/wait_time: Returns wait time for the job-request.
  • GET /api/jobs/requests/dead_ids: Returns job-request ids for recently completed jobs.
  • GET /api/jobs/requests/stats: Returns statistics for the job-request such as success rate, latency, etc.

4.9 Job Resources

Management of job resources, including adding, finding, updating, and configuring resources.

  • GET /api/jobs/resources: Queries job resources by criteria such as type, platform, etc.
  • POST /api/jobs/resources: Adds a job-resource that can be used for managing internal or external constraints.
  • GET /api/jobs/resources/{id}: Finds the job-resource by id.
  • PUT /api/jobs/resources/{id}: Updates a job-resource that can be used for managing internal or external constraints.
  • POST /api/jobs/resources/{id}/configs: Saves the configuration of a job-resource.
  • DELETE /api/jobs/resources/{id}/configs/{configId}: Deletes the configuration of a job-resource.
  • POST /api/jobs/resources/{id}/disable: Disables the job-resource so that any jobs requiring it will not be able to execute.

4.10 Organizations

Organization management, including creation, deletion, updating, and querying.

  • GET /api/orgs: Queries organizations by criteria such as org-unit, bundle, etc.
  • POST /api/orgs: Creates new organization.
  • DELETE /api/orgs/{id}: Deletes the organization by its id.
  • GET /api/orgs/{id}: Finds the organization by its id.
  • PUT /api/orgs/{id}: Updates the organization profile.
  • POST /api/orgs/{id}/invite: Invites a user to an organization.
  • POST /api/orgs/usage_report: Generates usage report for the organization.

4.11 Organization Configs

Management of organization-specific configs.

  • GET /api/orgs/{orgId}/configs: Queries organization configs by criteria such as name, type, etc.
  • POST /api/orgs/{orgId}/configs: Adds a config for the organization.
  • DELETE /api/orgs/{orgId}/configs/{id}: Deletes a config for the organization by id.
  • GET /api/orgs/{orgId}/configs/{id}: Finds a config for the organization by id.
  • PUT /api/orgs/{orgId}/configs/{id}: Updates a config for the organization.

4.12 Subscriptions

Management of system subscriptions.

  • GET /api/subscriptions: Finds an existing system subscription based on id.
  • POST /api/subscriptions: Creates new system subscription based on request body.
  • DELETE /api/subscriptions/{id}: Deletes an existing system subscription based on id.
  • GET /api/subscriptions/{id}: Finds an existing system subscription based on id.
  • PUT /api/subscriptions/{id}: Updates an existing system subscription based on request body.

4.13 Users

User management within an organization, including creation, deletion, and profile updates.

  • GET /api/users: Queries users within the organization that is allowed.
  • POST /api/users: Creates new user.
  • DELETE /api/users/{id}: Deletes the user profile by its id.
  • GET /api/users/{id}: Finds user profile by its id.
  • PUT /api/users/{id}: Updates user profile.
  • PUT /api/users/{id}/notify: Updates user notification.

4.14 User Tokens

Management of user tokens for API access.

  • GET /api/users/{userId}/tokens: Queries user-tokens for the API access.
  • POST /api/users/{userId}/tokens: Creates new user-token for the API access.
  • DELETE /api/users/{userId}/tokens/{id}: Deletes user-token by its id so that it cannot be used for the API access.

The API documentation is accessible at a URL and includes a Swagger YAML file for detailed API specifications. The API emphasizes support and extensibility, offering various endpoints for managing jobs, configurations, resources, and user interactions within the orchestration platform.

5. Getting Started

5.1 Declarative Job & Task Declaration

In the Formicary system, a job is structured as a declarative directed acyclic graph (DAG). This setup dictates the execution sequence, where the transition from one task to another is contingent upon the exit code or status of the preceding node or task.

5.1.1 Job Configuration

A job is described as a series of tasks arranged in a directed acyclic graph (DAG), with each task detailing the required environment, commands/APIs, and configuration parameters necessary for execution. The definition of a job encompasses the following attributes:

  • job_type: defines a short unique name of the job and as Formicary is a multi-tenant system, it’s only needs to be unique within an organization. For example:
    job_type: my_test_job
  • job_variables: defines variables for job context that are available for all tasks, e.g.
    job_variables:
    OSVersion: 10.1
    Architecture: ARM64
  • description: is an optional property to specify details about the job, e.g.,
    description: A test job for building a node application.
  • max_concurrency: defines max number of jobs that can be run concurrently, e.g.
    max_concurrency: 5
  • required_params: specifies list of parameter names that must be defined when submitting a job request, e.g.,
    required_params:
    Name
    Age
  • cron_trigger: uses cron syntax to schedule the job at regular intervals, for example, the following job is set to run every minute:
    cron_trigger: 0 * * * * * *
  • skip_if: allows a job to skip execution based on a conditional logic using GO template, e.g. following condition will skip processing if git branch name is not main:
    skip_if: {{if ne .GitBranch "main"}} true {{end}}
  • retry: A job may be configured to attempt retries a certain number of times. For example,
    retry: 3
  • delay_between_retries: specifies the pause duration between each attempt. For instance, following setting specifies delay of 10 seconds between each retry:
    delay_between_retries: 10s
  • hard_reset_after_retries: When a job fails, only the failed tasks are executed. However, you can use hard_reset_after_retries so that all tasks are executed due to persisted failure, e.g.:
    hard_reset_after_retries: 3
  • timeout: defines the maximum time that a job can take for the execution and if the job takes longer, then it’s aborted, e.g.,
    timeout: 5m
  • public_plugin: indicates the job is a public plugin so it can be shared by any other user in the system, e.g.,
    public_plugin: true
  • sem_version: specifies a semantic version of the public plugin, e.g.,
    sem_version: 1.2.5
  • tasks: defines an array of task definitions. The order of tasks is not important as formicary creates a graph based on dependencies between the tasks for execution.

5.1.2 Task Configuration

A task serves as a work segment carried out by an ant worker, encompassing the following attributes within its definition:

  • task_type: defines type or name of the task, e.g.:
    - task_type: lint-task
  • description: specifies details about the task, e.g.:
    description: This task verifies code quality with the lint tool.
  • method: defines executor to use for the task such as
    • DOCKER
    • KUBERNETES
    • SHELL
    • HTTP_GET
    • HTTP_POST_FORM
    • HTTP_POST_JSON
    • HTTP_PUT_FORM
    • HTTP_PUT_JSON
    • HTTP_DELETE
    • WEBSOCKET
    • MESSAGING
    • FORK_JOB
    • AWAIT_FORKED_JOB
    • EXPIRE_ARTIFACTS
  • on_completed: defines next task to run if task completes successfully, e.g.,:
    on_completed: build
  • on_failed: defines the next task to run if task fails, e.g.,:
    on_failed: cleanup
  • on_exit: is used to run the next task based on exit-code returned by the task, e.g.,
    on_exit_code:
    101: cleanup
    COMPLETED: deploy
  • environment: defines environment variables that will be available for commands that are executed, e.g.:
    environment:
    AWS_REGION: us-east-1
  • variables: define context property that can be used for scripts as template parameters or pass to the executors, e.g.,
    variables:
    max-string-len: 50
    service-name: myservice
  • after_script: is used to list commands that are executed after the main script regardless the main script succeeds or fails, e.g.:
    after_script:
    - echo cleaning up
  • before_script: is used to list commands that are executed before the main script, e.g.:
    before_script:
    - git clone https://{{.GithubToken}}@github.com/bhatti/go-cicd.git .
    - go mod vendor
  • timeout: defines the maximum time that a task can take for the execution otherwise it’s aborted, e.g.,:
    timeout: 5m
  • cache: allows caching for directories that store 3rd party dependencies, e.g., following example shows caching of pip dependencies so that they are not downloaded upon each execution:
    cache:
    key: cache-key
    paths:
    - .cache/pip
    - venv
  • retry: defines maximum attempts to execute a task if it fails, e.g.,:
    retry: 3
  • delay_between_retries: defines delay between a task retries, e.g.,:
    delay_between_retries: 10s
  • url: specifies URL to invoke for HTTP executor, e.g.,:
    method: HTTP_GET
    url: https://jsonplaceholder.typicode.com/todos/1
  • artifacts: defines list of files or directories that are persisted in the artifacts store and are available for dependent tasks or user download, e.g.,
    artifacts:
    paths:
    - output.json
  • except: is used to skip task execution based on certain condition, e.g.:
    except: {{if ne .GitBranch "main" }} true {{end}}
  • allow_failure: defines the task is optional and can fail without failing entire job, e.g.,
    allow_failure: true
  • allow_start_if_completed: Upon retry, only failed tasks are re-executed, but you can mark certain tasks to rerun previously completed task, e.g.
    allow_start_if_completed: true
  • always_run: A task can be marked as always_run so that they are run even when the job fails for cleanup purpose, e.g.,
    always_run: true
  • tags: is used to route the task to a specific ant worker that supports given tags, e.g.,:
    tags:
    - Mac
  • webhook: configures a job to call an external webhook API upon completion of a job or task, e.g.:
    webhook:
    url: {{.WebhookURL}}
    headers:
    Authorization: Bearer {{.WebhookAuth}}
    query:
    task_key: task_value
  • services: allows starting sidecar container(s) with the given image, e.g.,:
    services:
    – name: redis
    image: redis:6.2.2-alpine
    ports:
    – number: 6379
  • notify: configure job to receive email or slack notifications when a job completes successfully or with failure, e.g., (Note: when parameter can take always, onSuccess, onFailure or never values):
    notify:
    email:
    recipients:
    – myemail@mydomain.cc
    when: always
  • fork_job_type / await_forked_tasks: allows spawning other related jobs or marketplace plugins from a job, which are run concurrently, e.g.:
    - task_type: fork-task
    method: FORK_JOB
    fork_job_type: child-job
    on_completed: fork-wait
    - task_type: fork-wait
    method: AWAIT_FORKED_JOB
    await_forked_tasks:
    - fork-task
  • EXPIRE_ARTIFACTS: method can be used to expire old artifacts, e.g.:
    - task_type: expire
    method: EXPIRE_ARTIFACTS
  • MESSAGING: implements a customized executor by subscribing to the messaging queue, e.g.,
    - task_type: trigger
    method: MESSAGING
    messaging_request_queue: formicary-message-ant-request
    messaging_reply_queue: formicary-message-ant-response

5.1.3 GO Templates

The job and task definition supports GO templates, and you can use variables that are passed by job-request or task definitions, e.g.

- task_type: extract
  method: DOCKER
  container:
    image: python:3.8-buster
  script:
    - python -c 'import yfinance as yf;import json;stock = yf.Ticker("{{.Symbol}}");j = json.dumps(stock.info);print(j);' > stock.json

In addition, you can also use if/then conditions with templates, e.g.

- task_type: task1
  method: DOCKER
  container:
    image: alpine
  script:
    { { if .IsWindowsPlatform } }
    - ipconfig /all
    { { else } }
    - ifconfig -a
    { { end } }

5.2 Installation and Startup of Core Components

5.2.1 Prerequisites

5.2.2 Launching Server

Here is an example docker-compose file designed to launch the queen-server, database server, messaging server, and object-store:

version: '3.7'
services:
  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  minio:
    image: minio/minio:RELEASE.2024-02-09T21-25-16Z
    volumes:
      - minio-data:/data
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    command: server /data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3
  mysql:
    image: "mysql:8"
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    ports:
      - "3306:3306"
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
      DB_NAME: ${DB_NAME:-formicary_db}
      DB_USER: ${DB_USER:-formicary_user}
      DB_PASSWORD: ${DB_PASSWORD:-formicary_pass}
      DB_ROOT_USER: ${DB_ROOT_USER:-root}
      DB_ROOT_PASSWORD: ${DB_ROOT_PASSWORD:-rootroot}
      MYSQL_USER: ${DB_USER}
      MYSQL_PASSWORD: ${DB_PASSWORD}
      MYSQL_DATABASE: ${DB_NAME}
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootroot}
    healthcheck:
      test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
      timeout: 20s
      retries: 10
    volumes:
      - mysql-data:/var/lib/mysql
  formicary-server:
    build:
      context: .
      dockerfile: Dockerfile
    depends_on:
      - redis
      - mysql
      - minio
    environment:
      COMMON_DEBUG: '${DEBUG:-false}'
      COMMON_REDIS_HOST: 'redis'
      COMMON_REDIS_PORT: '${COMMON_REDIS_PORT:-6379}'
      COMMON_S3_ENDPOINT: 'minio:9000'
      COMMON_S3_ACCESS_KEY_ID: 'admin'
      COMMON_S3_SECRET_ACCESS_KEY: 'password'
      COMMON_S3_REGION: '${AWS_DEFAULT_REGION:-us-west-2}'
      COMMON_S3_BUCKET: '${BUCKET:-formicary-artifacts}'
      COMMON_S3_PREFIX: '${PREFIX:-formicary}'
      COMMON_AUTH_GITHUB_CLIENT_ID: '${COMMON_AUTH_GITHUB_CLIENT_ID}'
      COMMON_AUTH_GITHUB_CLIENT_SECRET: '${COMMON_AUTH_GITHUB_CLIENT_SECRET}'
      COMMON_AUTH_GOOGLE_CLIENT_ID: '${COMMON_AUTH_GOOGLE_CLIENT_ID}'
      COMMON_AUTH_GOOGLE_CLIENT_SECRET: '${COMMON_AUTH_GOOGLE_CLIENT_SECRET}'
      CONFIG_FILE: ${CONFIG_FILE:-/config/formicary-queen.yaml}
      COMMON_HTTP_PORT: ${HTTP_PORT:-7777}
      DB_USER: ${DB_USER:-formicary_user}
      DB_PASSWORD: ${DB_PASSWORD:-formicary_pass}
      DB_HOST: 'mysql'
      DB_TYPE: "mysql"
      DB_DATA_SOURCE: "${DB_USER:-formicary_user}:${DB_PASSWORD:-formicary_pass}@tcp(mysql:3306)/${DB_NAME:-formicary_db}?charset=utf8mb4&parseTime=true&loc=Local"
    ports:
      - 7777:7777
    volumes:
      - ./config:/config
    entrypoint: ["/bin/sh", "-c", "/migrations/mysql_setup_db.sh migrate-only && exec /formicary --config=/config/formicary-queen.yaml --id=formicary-server-id1"]
volumes:
  minio-data:
  redis-data:
  mysql-data:
  mysql-initdb:

You can then define the server configuration file as follows:

id: queen-server-id
subscription_quota_enabled: false
common:
  messaging_provider: REDIS_MESSAGING
  external_base_url: https://public-website
  auth:
    enabled: false
    secure: true
    jwt_secret: secret-key

Note: The configuration above supports OAuth 2.0 based authentication and allows enabling of the allocation of computing resource quotas per user. Furthermore, it supports setting up notifications through email and Slack.

You can then launch the server as follows:

docker-compose up

Once, the Formicary system starts up, you can use dashboard UI or API for managing jobs at the specified host and port.

5.2.3 Launching Ant Worker(s)

Here is an example docker-compose file designed to launch the ant-worker:

version: '3.7'
services:
  formicary-ant:
    network_mode: "host"
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      COMMON_DEBUG: '${DEBUG:-false}'
      COMMON_REDIS_HOST: '${QUEEN_SERVER:-192.168.1.102}'
      COMMON_REDIS_PORT: '${COMMON_REDIS_PORT:-6379}'
      COMMON_S3_ENDPOINT: '${QUEEN_SERVER:-192.168.1.102}:9000'
      COMMON_S3_ACCESS_KEY_ID: 'admin'
      COMMON_S3_SECRET_ACCESS_KEY: 'password'
      COMMON_S3_REGION: '${AWS_DEFAULT_REGION:-us-west-2}'
      COMMON_S3_BUCKET: '${BUCKET:-formicary-artifacts}'
      COMMON_S3_PREFIX: '${PREFIX:-formicary}'
      COMMON_HTTP_PORT: ${HTTP_PORT:-5555}
      CONFIG_FILE: ${CONFIG_FILE:-/config/formicary-ant.yaml}
    volumes:
      - ./config:/config
      - ./.kube:/home/formicary-user/.kube
    entrypoint: ["/bin/sh", "-c", "/formicary ant --config=/config/formicary-ant.yaml --id=formicary-ant-id1 --tags \"builder pulsar redis kotlin aws-lambda\""]

Above config shares config for the redis and minio so that ant workers can access queen server and store artifacts directly in the object-store. Here is a sample configuration for the ant worker:

methods:
  - DOCKER
  - KUBERNETES
  - HTTP_GET
  - HTTP_POST_FORM
  - HTTP_POST_JSON
  - HTTP_PUT_FORM
  - HTTP_PUT_JSON
  - HTTP_DELETE
docker:
  host: "tcp://192.168.1.102:2375"
common:
  id: ant-worker-id
  public_dir: "/public/"
  messaging_provider: REDIS_MESSAGING
  external_base_url: https://public-website
kubernetes:
  allow_privilege_escalation: true
  pod_security_context:
    run_as_user: 0

Above docker-compose file mounts a kubernetes config file that you can generate using microk8s.config such as:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: LS..
    server: https://192.168.1.120:16443
  name: microk8s-cluster
contexts:
- context:
    cluster: microk8s-cluster
    user: admin
  name: microk8s
current-context: microk8s
kind: Config
preferences: {}
users:
- name: admin
  user:
    token: V..

Above kubernetes configuration assumes that you are running your kubernetes cluster at 192.168.1.120 and you can change it accordingly. You can then launch the worker as follows:

docker-compose -f ant-docker-compose.yaml up

6. Usage with Examples

The Formicary system can be utilized for running batch jobs, orchestrating workflows, or managing CI/CD pipelines. In this system, the execution parameters are detailed in a job configuration file. Each job outlines a Directed Acyclic Graph (DAG) that represents the sequence in which tasks should be executed, essentially mapping out the workflow or the execution trajectory for the tasks. Each task is defined as a discrete unit of work, capable of being executed through various executors such as DOCKER, KUBERNETES, HTTP, WEBSOCKET, SHELL, MESSAGING, among others.

6.1 Workload & Pipeline Processing

A pipeline encapsulates various stages of data processing, adaptable to numerous scenarios such as CI/CD pipelines in software development, ETL processes for data import/export, or other types of batch processing. The formicary facilitates these pipelines through the concept of tasks and jobs, where a task represents a single unit of work, and a job is a collection of tasks organized in a DAG. Tasks within a job are executed in sequence, with the execution order of subsequent tasks contingent on the exit status of the preceding task.

The following example illustrates a job definition for a straightforward pipeline where a video pipeline first validates input, downloads a mock video and then mock encodes it:

job_type: video-encoding
description: Simple example of video encoding
max_concurrency: 1
tasks:
- task_type: validate
  script:
    - echo request must have URL {{.URL}}, InputEncoding {{.InputEncoding}} and OutputEncoding {{.OutputEncoding}}
  container:
    image: alpine
  on_completed: download
- task_type: download
  container:
    image: python:3.8-buster
  script:
    - curl -o video_file.{{.InputEncoding}} {{.URL}}
  artifacts:
    paths:
      - video_file.{{.InputEncoding}}
  on_completed: encode
- task_type: encode
  container:
    image: alpine
  script:
    - ls -l
    - mv video_file.{{.InputEncoding}} video_file.{{.OutputEncoding}}
  dependencies:
    - download
  artifacts:
    paths:
      - video_file.{{.OutputEncoding}}

6.1.1 Uploading Job Definition

You can upload above pipeline job using API as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/yaml" \
    --data-binary @video-encoding.yaml $SERVER/api/jobs/definitions

6.1.2 Submitting Job Request

You can then submit the job request as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: application/json" \
    --data '{"job_type": "video-encoding", "params": {"InputEncoding": "MP4", "OutputEncoding": "WebM", "URL": "https://github.com"}}' \
    $SERVER/api/jobs/requests

The above example kicks off video-encoding job and passes URL, InputEncoding, and OutputEncoding as parameters. You can then view status of the job using dashboard UI, e.g.,

video pipeline

Above UI also allows users to download artifacts generated by various tasks.

6.2 Parallel Workload & Pipeline Processing

You can then enhance workload & pipelines by adding multiple jobs that originate from the parent job and run concurrently in the background. For instance, a video-encoding job can achieve parallel video encoding by initiating multiple jobs as illustrated below:

job_type: parallel-video-encoding
description: Parallel example of video encoding
max_concurrency: 1
tasks:
- task_type: validate
  script:
    - echo request must have URL {{.URL}}, InputEncoding {{.InputEncoding}} and OutputEncoding {{.OutputEncoding}}
  container:
    image: alpine
  on_completed: download
- task_type: download
  container:
    image: python:3.8-buster
  script:
    - curl -o video_file.{{.InputEncoding}} {{.URL}}
  artifacts:
    paths:
      - video_file.{{.InputEncoding}}
  on_completed: split
- task_type: split
  container:
    image: alpine
  script:
    - ls -l
    - cp video_file.{{.InputEncoding}} video_file.{{.InputEncoding}}.1
    - cp video_file.{{.InputEncoding}} video_file.{{.InputEncoding}}.2
    - cp video_file.{{.InputEncoding}} video_file.{{.InputEncoding}}.3
  dependencies:
    - download
  artifacts:
    paths:
      - video_file.{{.InputEncoding}}.1
      - video_file.{{.InputEncoding}}.2
  on_completed: fork-encode1
- task_type: fork-encode1
  method: FORK_JOB
  fork_job_type: video-encoding
  variables:
    URL: {{.split_ArtifactURL_1}}
    InputEncoding: {{.InputEncoding}}
    OutputEncoding: {{.OutputEncoding}}
  on_completed: fork-encode2
- task_type: fork-encode2
  method: FORK_JOB
  fork_job_type: video-encoding
  variables:
    URL: {{.split_ArtifactURL_2}}
    InputEncoding: {{.InputEncoding}}
    OutputEncoding: {{.OutputEncoding}}
  on_completed: fork-await
- task_type: fork-await
  method: AWAIT_FORKED_JOB
  on_completed: combine
  await_forked_tasks:
    - fork-encode1
    - fork-encode2
- task_type: combine
  container:
    image: alpine
  script:
    - ls -l
    - cat video_file.{{.InputEncoding}}* >  video_file.{{.OutputEncoding}}
  dependencies:
    - fork-await
  artifacts:
    paths:
      - video_file.{{.OutputEncoding}}

Above definition defines validate and download tasks as before but split task splits video file into smaller video files that can be encoded in parallel. It then defines fork-encode1 and fork-encode2 tasks to fork child video-encoding job that was defined earlier and then wait for their completion in fork-await task. Finally, it combines output files into a single file. Following graph diagram shows the execution flow:

parallel video encoding

6.2.1 Fork Task

The task method with value of FORK_JOB spawns a child job where fork_job_type defines type of the job and variables define the input parameters to the job.

6.2.2 Waiting for completion of child jobs

The task method with value of AWAIT_FORKED_JOB waits for completion of child jobs where await_forked_tasks defines list of jobs to wait.

6.2.3 Uploading Job Definition

You can upload above pipeline job using API as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/yaml" \
    --data-binary @parallel-video-encoding.yaml $SERVER/api/jobs/definitions

6.2.4 Submitting Job Request

You can then submit the job request as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: application/json" \
    --data '{"job_type": "parallel-video-encoding", "params": {"InputEncoding": "MP4", "OutputEncoding": "WebM", "URL": "https://github.com"}}' \
    $SERVER/api/jobs/requests

The above example kicks off parallel-video-encoding job and passes URL, InputEncoding, and OutputEncoding as parameters.

6.3 CI/CD Pipelines

To implement CI/CD, you can create a job configuration and upload it to the server. The various stages of the build process, such as compilation, testing, and deployment, are represented by tasks within this job configuration. Organizations can use continuous integration to regularly integrating code changes into a shared repository where each integration is automatically built and tested, facilitating early detection of bugs and integration issues. Further continuous delivery and deployment automates the release of software to production, moving away from manual approvals and deployments. Here is a list of major features for supporting CI/CD pipelines in the Formicary system:

  • Artifacts management for any packages and binaries produced during the CI/CD pipeline or software deployment.
  • Job Parameters and Variables: Refer to the documentation on Variables and Request Parameters to set up job configuration variables and request parameters.
  • Environment Variables: Consult the section on Environment Variables to configure and access environment variables within your container.
  • Job / Organization Configs: For secure configurations at the job and organization level, see the Job / Organization Configs section.
  • Access Tokens for Source Code Repositories: Instructions for accessing source code repositories can be found in the relevant documentation.
  • Starting Jobs Manually: For manual job initiation, see the guide on Scheduling Jobs Manually. Jobs can be submitted as outlined there.
  • Scheduling Jobs for the Future: To schedule a job for a future time, refer to the Job Scheduling documentation.
  • Regular Interval Job Scheduling: For setting up jobs to run at regular intervals, see the Periodic Job Scheduling section.
  • GitHub Webhooks: Utilize GitHub webhooks for job scheduling as described in the GitHub-Webhooks documentation.
  • Post-Commit Hooks: For job scheduling using git post-commit hooks, consult the Post-commit hooks section.
  • Skipping Job Requests: To skip scheduled jobs, refer to the Job Skip documentation.

Following is an example of CI/CD pipeline for a simple GO project:

job_type: go-build-ci
max_concurrency: 1
# only run on main branch
skip_if: {{if ne .GitBranch "main"}} true {{end}}
tasks:
- task_type: build
  method: DOCKER
  working_dir: /sample
  container:
    image: golang:1.16-buster
  before_script:
    - git clone https://{{.GithubToken}}@github.com/bhatti/go-cicd.git .
    - git checkout -t origin/{{.GitBranch}} || git checkout {{.GitBranch}}
    - go mod download
    - go mod vendor
  script:
    - echo branch {{.GitBranch}}, Commit {{.GitCommitID}}
    - make build
  after_script:
    - ls -l
  cache:
    key_paths:
      - go.mod
    paths:
      - vendor
  on_completed: test
- task_type: test
  method: DOCKER
  container:
    image: golang:1.16-buster
  working_dir: /sample
  environment:
    GO111MODULE: on
    CGO_ENABLED: 0
  before_script:
    - git clone https://{{.GithubToken}}@github.com/bhatti/go-cicd.git .
    - git checkout -t origin/{{.GitBranch}} || git checkout {{.GitBranch}}
  script:
    - make test-coverage
  after_script:
    - ls -l
  dependencies:
    - build
  on_completed: deploy
- task_type: deploy
  method: DOCKER
  container:
    image: golang:1.16-buster
  working_dir: /sample
  before_script:
    - git clone https://{{.GithubToken}}@github.com/bhatti/go-cicd.git .
    - git checkout -t origin/{{.GitBranch}} || git checkout {{.GitBranch}}
  script:
    - make build
  dependencies:
    - test

Above job configuration demonstrates how different stages of build, test and deployment process can be defined with artifacts and vendor-cachings support.

6.3.1 Uploading Job Definition

You can upload above pipeline job using API as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/yaml" \
    --data-binary @go-build-ci.yaml $SERVER/api/jobs/definitions

6.3.2 Secure storage of access tokens

You can store access tokens for your git repository securely with encryption as follows:

curl -v -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/yaml" \
  $SERVER/api/jobs/definitions/<job-id>/configs -d '{"Name": "GithubToken", "Value": "<mytoken>", "Secret": true}'

6.3.3 Submitting Job Request

You can then submit the job request as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
   -H "Content-Type: application/json" \
   --data '{"job_type": "go-build-ci", "params": { "GitCommitID": "$COMMIT", "GitBranch": "$BRANCH", "GitCommitMessage": "$COMMIT_MESSAGE" } }' \
   $SERVER/api/jobs/requests

The above example kicks off go-build-ci job and passes GitCommitID, GitBranch, and GitMessage as parameters.

6.3.4 Github-Webhooks

See Github-Webhooks for scheduling above job with GitHub webhooks support.

6.3.5 PostCommit Hooks

See Post-commit hooks for scheduling above job using Git post-commit hooks.

6.4 ETL Jobs

Here is a sample ETL (Extract, Transform, Load) job that retrieves stock data, processes and extracts pricing information, and conducts calculations on the obtained data:

job_type: etl-stock-job
description: Simple ETL Stock pipeline example
max_concurrency: 1
tasks:
- task_type: extract
  method: KUBERNETES
  container:
    image: python:3.8-buster
  before_script:
    - pip install yfinance --upgrade --no-cache-dir
  script:
    - python -c 'import yfinance as yf;import json;stock = yf.Ticker("{{.Symbol}}");j = json.dumps(stock.info);print(j);' > stock.json
  artifacts:
    paths:
      - stock.json
  on_completed: transform
- task_type: transform
  method: KUBERNETES
  tags:
  - builder
  container:
    image: alpine
  dependencies:
    - extract
  before_script:
    - apk --update add jq && rm -rf /var/lib/apt/lists/* && rm /var/cache/apk/*
  script:
    - cat stock.json | jq '.ask,.bid' > askbid.txt
  artifacts:
    paths:
      - askbid.txt
  on_completed: load
- task_type: load
  method: KUBERNETES
  tags:
  - builder
  dependencies:
    - transform
  script:
    - awk '{ sum += $1; n++ } END { if (n > 0) print sum / n; }' askbid.txt > avg.txt
  after_script:
    - ls -l
  container:
    image: alpine
  artifacts:
    paths:
      - avg.txt

6.4.1 Uploading Job Definition

You can upload above ETL job using API as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/yaml" \
    --data-binary @etl-stock-job.yaml $SERVER/api/jobs/definitions

6.4.2 Submitting Job Request

You can then submit the job request as follows:

curl -v -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: application/json" \
    --data '{"job_type": "etl-stock-job", "params": {"Symbol": "AAPL"}}' \
    $SERVER/api/jobs/requests

The above example kicks off etl-stock-job and passes Symbol as a parameter. You can then download results after the job execution.

6.5 Scanning Containers

6.5.1 Trivy

Trivy is a simple and comprehensive vulnerability/misconfiguration scanner for containers and other artifacts. Following example shows scanning a docker in docker (dind) using Trivy:

job_type: trivy-scan-job
description: vulnerability/misconfiguration scanner for containers using Trivy
url: https://aquasecurity.github.io/trivy/v0.19.0/
max_concurrency: 1
job_variables:
  CI_COMMIT_SHA: db65c90a07e753e71db5143c877940f4c11a33e1
tasks:
  - task_type: scan
    working_dir: /trivy-ci-test
    variables:
      DOCKER_HOST: tcp://localhost:2375
      DOCKER_TLS_CERTDIR: ""
      IMAGE: trivy-ci-test:{{.CI_COMMIT_SHA}}
    container:
      image: docker:20.10-git
    privileged: true
    services:
      - name: docker-dind
        alias: docker
        image: docker:20.10-dind
        entrypoint: [ "env", "-u", "DOCKER_HOST" ]
        command: [ "dockerd-entrypoint.sh" ]
    allow_failure: true
    before_script:
      - echo image $IMAGE
      - git clone https://github.com/aquasecurity/trivy-ci-test.git .
      - wget -qO - "https://api.github.com/repos/aquasecurity/trivy/releases/latest" | grep '"tag_name":' | sed -E 's/.*"v([^"]+)".*/\1/'
      - export TRIVY_VERSION=$(wget -qO - "https://api.github.com/repos/aquasecurity/trivy/releases/latest" | grep '"tag_name":' | sed -E 's/.*"v([^"]+)".*/\1/')
      - echo $TRIVY_VERSION
      - apk add --update-cache --upgrade curl
      - curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin
      - mkdir -p /root/.docker/
      - curl -o /root/.docker/ca.pem https://gist.githubusercontent.com/bhatti/8a37691361c09afbef751cb168715867/raw/118f47230adec566cef72661e66370cf95ba1be8/ca.pem
    script:
      # Build image
      - docker build -t $IMAGE .
      - curl -o tmpl.tpl https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/gitlab-codequality.tpl
      # Build report
      - trivy --exit-code 0 --cache-dir .trivycache/ --no-progress --format template --template "tmpl.tpl" -o gl-container-scanning-report.json $IMAGE
      # Print report
      - trivy --exit-code 0 --cache-dir .trivycache/ --no-progress --severity HIGH $IMAGE
      # Fail on severe vulnerabilities
      - trivy --exit-code 1 --cache-dir .trivycache/ --severity CRITICAL --no-progress $IMAGE
    cache:
      paths:
        - .trivycache/
    artifacts:
      paths:
        - gl-container-scanning-report.json

6.6 Other Advanced Usage

Following examples illustrate advanced configuration of job orchestration (See Job and Task Definition Options for more details):

6.6.1 Kubernetes Jobs with Volumes and Services

Following example illustrates usage of Kubernetes Volume, Mounts and Services:

job_type: kube-example1
description: Simple Kubernetes example with volume mounts, secrets and ports
max_concurrency: 1
tasks:
- task_type: k8-task
  tags:
  - builder
  pod_labels:
    foor: bar
  script:
    - ls -l /myshared
    - ls -l /myempty
    - sleep 30
  method: KUBERNETES
  host_network: false
  services:
    - name: redis
      image: redis:6.2.2-alpine
      ports:
        - number: 6379
  container:
    image: ubuntu:16.04
    volumes:
      host_path:
        - name: mount1
          mount_path: /myshared
          host_path: /shared
      empty_dir:
        - name: mount2
          mount_path: /myempty
      projected:
        - name: oidc-token
          mount_path: /var/run/sigstore/cosign
          sources:
            - service_account_token:
              path: oidc-token
              expiration_seconds: 600
              audience: sigstore

You can use kubctl describe pod <podname> to verify labels, volumes or services such as:

 Labels:       AntID=formicary-ant-id1
               foor=bar
    ...
 Volumes:
   mount1:
     Type:          HostPath (bare host directory volume)
     Path:          /shared
    ... 
 Mounts:
   /myshared from mount1 (rw)     
    ...
 Containers:
   svc-redis:
     Container ID:
     Image:          redis:6.2.2-alpine
    ...

6.6.2 Kubernetes Jobs with Resources

Here is a sample job definition that specifies resources for Kubernetes:

job_type: kube-example2
description: Simple Kubernetes example with resources
max_concurrency: 1
tasks:
- task_type: kubby
  tags:
  - builder
  pod_labels:
    foor: bar
  script:
    - echo hello world
    - ls -l
    - sleep 21
  method: KUBERNETES
  container:
    image: ubuntu:16.04
    cpu_limit: "1"
    cpu_request: 500m
    memory_limit: 2Gi
    memory_request: 1Gi
    ephemeral_storage_limit: 2Gi
    ephemeral_storage_request: 1Gi

6.6.3 Scheduling a job in Future

You can submit a job at scheduled time by adding scheduled_at parameter as follows:

curl -v -H "Authorization:
  Bearer $TOKEN" -H "Content-Type:
    application/json" --data '{"job_type": "my-job", "scheduled_at": "2028-06-15T00:00:00.0-00:00", "params": { "Target": "bob" } }' \
    $SERVER/api/jobs/requests

6.6.4 Periodic Jobs

You can configure a job to execute periodically using a cron syntax as follows:

 job_type: cron-kube-build
 cron_trigger: 0 * * * * * *
 tasks:
...

6.6.5 Retries with Backoff

You can set up a job to automatically retry a failed task or the entire job up to a specified number of maximum attempts, incorporating a delay between each retry attempt as described below:

job_type: test-job
retry: 3
delay_between_retries: 10s
...

6.6.6 Timeout

You can configure a job to timeout if it does not complete with the allowed duration as shown below:

job_type: test-job
timeout: 5m
...

6.6.7 On-Exit Task

Besides determining the subsequent task through on_completed or on_failed, you can employ on_exit to initiate the next task according to the exit code produced by the task. This exit code, distinct from the task’s status, is generated by the command specified in the script. It’s important to note that on_exit assigns specific exit codes for COMPLETED and FAILED, allowing you to consolidate all exit conditions in a single location, for example:

job_type: taco-job
tasks:
- task_type: allocate
  container:
    image: alpine
  script:
    - echo allocating
  on_completed: check-date
- task_type: check-date
  container:
    image: alpine
  script:
    - echo monday && exit {{.ExitCode}}
  on_exit_code:
    1: monday
    2: tuesday
    3: friday 
  on_completed: deallocate
- task_type: monday
  container:
    image: alpine
  script:
    - echo monday
  on_completed: deallocate
- task_type: tuesday
  container:
    image: alpine
  script:
    - echo tuesday
  on_completed: taco-tuesday
- task_type: taco-tuesday
  container:
    image: alpine
  script:
    - echo taco tuesday
  on_completed: deallocate
- task_type: friday
  container:
    image: alpine
  script:
    - echo friday
  on_completed: party
- task_type: party
  container:
    image: alpine
  script:
    - echo tgif party
  on_completed: deallocate
- task_type: deallocate
  container:
    image: alpine
  always_run: true
  script:
    - echo deallocating

6.6.8 Building Docker images using Docker in Docker (dind)

Following example shows using docker in docker (dind) using TLS to build images:

job_type: dind-tls-job
max_concurrency: 1
tasks:
- task_type: build
  working_dir: /sample
  variables:
    DOCKER_HOST: tcp://localhost:2376
    DOCKER_TLS_VERIFY: 1
    DOCKER_TLS: 1
    DOCKER_TLS_CERTDIR: "/mycerts"
  container:
    image: docker:20.10-git
    volumes:
      empty_dir:
        - name: certs
          mount_path: /mycerts/client
  privileged: true
  services:
    - name: docker-dind
      alias: docker
      image: docker:20.10-dind
      entrypoint: ["env", "-u", "DOCKER_HOST"]
      command: ["dockerd-entrypoint.sh"]
      volumes:
        empty_dir:
          - name: certs
            mount_path: /mycerts/client
  before_script:
    - git clone https://github.com/aquasecurity/trivy-ci-test.git .
    - mkdir -p /root/.docker/ && cp /mycerts/client/* /root/.docker
    - apk --no-cache add ca-certificates
    - docker info
  script:
    # Build image
    - docker build -t my-image .

6.6.9 Sensor or Polling Job

Following example shows how exit_codes with EXECUTING state can be used for polling tasks:

job_type: sensor-job
tasks:
- task_type: first
  method: HTTP_GET
  environment:
    OLD_ENV_VAR: ijk
  allow_failure: true
  timeout: 15s
  delay_between_retries: 5s
  script:
    {{ if lt .JobElapsedSecs 3 }}
    - https://jsonplaceholder.typicode.com/blaaaaahtodos/1
    {{ else }}
    - https://jsonplaceholder.typicode.com/todos/1
    {{ end }}
  on_completed: second
  on_exit_code:
    404: EXECUTING
- task_type: second
  container:
    image: alpine
  script:
      - echo nonce {{.Nonce}}
      - exit {{ Random 0 5 }}
  on_exit_code:
    1: FAILED
    2: RESTART_JOB
    3: COMPLETED
    4: ERR_BLAH
    5: RESTART_TASK
  on_completed: third
- task_type: third
  container:
    image: alpine
  environment:
    OLD_ENV_VAR: ijk
  script:
    - date > date.txt
    - env NEW_ENV_VAR=xyz
    - echo variable value is $NEW_ENV_VAR
  artifacts:
    paths:
      - date.txt

6.6.10 Formicary Plugins

A plugin represents a publicly accessible job definition that other jobs can utilize. It can encompass a range of functions, from security assessments to data evaluation and beyond. While any member within an organization can share a job definition, a public plugin enables the creation of a job that’s accessible to all users. Organizations can publish a plugin by creating a job definition that starts with the organization’s bundle prefix and includes a semantic version number, like 1.0 or 1.2.1, indicating its version. Here is an example plugin:

job_type: io.formicary.stock-plugin
description: Simple Plugin example
public_plugin: true
sem_version: 1.0-dev
max_concurrency: 1
tasks:
  - task_type: extract
    method: KUBERNETES
    container:
      image: python:3.8-buster
    before_script:
      - pip install yfinance --upgrade --no-cache-dir
    script:
      - python -c 'import yfinance as yf;import json;stock = yf.Ticker("{{.Symbol}}");j = json.dumps(stock.info);print(j);' > stock.json
    artifacts:
      paths:
        - stock.json
    on_completed: transform
  - task_type: transform
    method: KUBERNETES
    tags:
      - builder
    container:
      image: alpine
    dependencies:
      - extract
    before_script:
      - apk --update add jq && rm -rf /var/lib/apt/lists/* && rm /var/cache/apk/*
    script:
      - jq '.ask,.bid' > askbid.txt
    artifacts:
      paths:
        - askbid.txt
    on_completed: load
  - task_type: load
    method: KUBERNETES
    tags:
      - builder
    dependencies:
      - transform
    script:
      - awk '{ sum += $1; n++ } END { if (n > 0) print sum / n; }' askbid.txt > avg.txt
    after_script:
      - ls -l
    container:
      image: alpine
    artifacts:
      paths:
        - avg.txt
Spawning Plugin Job

The formicary allows spawning other plugins from a job using FORK_JOB method, e.g.

 - task_type: spawn-plugin
   method: FORK_JOB
   fork_job_type: io.formicary.stock-plugin
   fork_job_version: 1.0-dev
   variables:
     Symbol: {{.Symbol}}
   on_completed: wait-plugin

7. Formicary Executors

In Formicary, an executor abstracts the runtime environment needed for executing a task. The type of executor is specified through the use of a method. Formicary natively supports the following executor methods:

7.1 Shell Executor

The shell executor forks a shell process from the ant worker to run commands specified in the script section. This executor doesn’t need extra configuration; however, assigning a unique user with appropriate permissions to the ant worker is advisable to ensure security.

7.2 REST APIs

The REST API Executor triggers external HTTP APIs through actions such as GET, POST, PUT, or DELETE, for example:

job_type: http-job
tasks:
- task_type: get
  method: HTTP_GET
  url: https://jsonplaceholder.typicode.com/todos/1
  on_completed: post
- task_type: post
  method: HTTP_POST_JSON
  url: https://jsonplaceholder.typicode.com/todos
  on_completed: put
- task_type: put
  method: HTTP_PUT_JSON
  url: https://jsonplaceholder.typicode.com/todos/1
  on_completed: delete
- task_type: delete
  method: HTTP_DELETE
  url: https://jsonplaceholder.typicode.com/todos/1

7.3 Websockets

The Websockets method enables browser-based or ant workers written in Python, Go, Java, and other languages to connect and carry out tasks, for example:

job_type: web-job
tasks:
- task_type: process
  method: WEBSOCKET
  tags:
    - web
    - js

The web or client uses websocket clients register with the server, e.g.

    const ws = new WebSocket(uri);
    ws.onopen = function () {
        const registration = {
            'ant_id': 'sample-web',
            'tags': ['js', 'web'],
            'methods': ['WEBSOCKET']
        }
        ws.send(JSON.stringify(registration));
    }

    ws.onmessage = function (evt) {
        const msg = JSON.parse(evt.data);
        // handle message
        msg.status = 'COMPLETED';
        ws.send(JSON.stringify(msg));
    }

7.4 Docker

The Docker executor initiates a primary container, named after the job or task, to run the specified script, along with a secondary ‘helper’ container, designated by a ‘-helper’ suffix, to handle artifact management, e.g.,

common:
  id: test-id
  messaging_provider: "REDIS_MESSAGING"
tags:
  - tag1
  - tag2
methods:
  - DOCKER
docker:
  registry:
    registry: docker-registry-server
    username: docker-registry-user
    password: docker-registry-pass
    pull_policy: if-not-present
  host: kubernetes-host

7.5 Kubernetes

The Kubernetes executor launches a primary container, which executes the script and is named according to the job or task, along with an auxiliary ‘helper’ container, identified by a ‘-helper’ suffix, for artifact management. Tasks can specify dependent services that are initiated with a ‘svc-‘ prefix, e.g.:

name: task1
method: KUBERNETES
environment:
  AWS-KEY: Mykey
container:
  image: ubuntu:16.04
  volumes:
    host_path:
      - name: mount1
        mount_path: /shared
        host_path: /host/shared
    pvc:
      - name: mount2
        mount_path: /mnt/sh1
    config_map:
      - name: mount3
        mount_path: /mnt/sh2
        items:
          item1: val1
    secret:
      - name: mount4
        mount_path: /mnt/sh3
        items:
          item1: val1
    empty_dir:
      - name: mount4
        mount_path: /mnt/sh3
    projected:
      - name: oidc-token
        mount_path: /var/run/sigstore/cosign
        sources:
          - service_account_token:
            path: oidc-token
            expiration_seconds: 600
            audience: sigstore
  volume_driver: voldriver
  devices:
    - devices
  bind_dir: /shared
  cpu_limit: "1"
  cpu_request: 500m
  memory_limit: 1Gi
  memory_request: 1Gi
services:
  - name: svc-name
    image: ubuntu:16.04
    command:
      - cmd1
    entrypoint:
      - /bin/bash
    volumes:
      host_path:
        - name: svc-mount1
          mount_path: /shared
          host_path: /host/shared
          read_only: false
      pvc:
        - name: svc-mount2
          mount_path: /mnt/sh1
          read_only: true
      config_map:
        - name: svc-mount3
          mount_path: /mnt/sh2
          read_only: true
          items:
            item1: val1
      secret:
        - name: svc-mount4
          mount_path: /mnt/sh3
          items:
            mysecret: file-name
      empty_dir:
        - name: svc-mount5
          mount_path: /mnt/sh3
      projected:
        - name: oidc-token
          mount_path: /var/run/sigstore/cosign
          sources:
            - service_account_token:
              path: oidc-token
              expiration_seconds: 600
              audience: sigstore
    cpu_limit: "1"
    cpu_request: 500m
    memory_limit: 1Gi
    memory_request: 1Gi
privileged: true
affinity:
  required_during_scheduling_ignored_during_execution:
    node_selector_terms:
      - match_expressions:
          - key: datacenter
            operator: In
            values:
              - seattle
        match_fields:
          - key: key2
            operator: In
            values:
              - val2
  preferred_during_scheduling_ignored_during_execution:
    - weight: 1
      preference:
        match_expressions:
          - key: datacenter
            operator: In
            values:
              - chicago
        match_fields:
          - key: color
            operator: In
            values:
              - blue
node_selector:
  formicary: "true"
node_tolerations:
  empty: PreferNoSchedule
  myrole: NoSchedule
pod_labels:
  foo: bar
pod_annotations:
  ann1: val
network_mode: mod1
host_network: true

7.6 Messaging or Customized Executor

You can create a custom executor by subscribing to a messaging queue. For instance, below is an example of a messaging executor:

// MessagingHandler structure
type MessagingHandler struct {
	id            string
	requestTopic  string
	responseTopic string
	queueClient   queue.Client
}

// NewMessagingHandler constructor
func NewMessagingHandler(
	id string,
	requestTopic string,
	responseTopic string,
	queueClient queue.Client,
) *MessagingHandler {
	return &MessagingHandler{
		id:            id,
		requestTopic:  requestTopic,
		responseTopic: responseTopic,
		queueClient:   queueClient,
	}
}

func (h *MessagingHandler) Start(
	ctx context.Context,
) (err error) {
	return h.queueClient.Subscribe(
		ctx,
		h.requestTopic,
		h.id,
		make(map[string]string),
		true, // shared subscription
		func(ctx context.Context, event *queue.MessageEvent) error {
			defer event.Ack()
			err = h.execute(ctx, event.Payload)
			if err != nil {
				logrus.WithFields(logrus.Fields{
					"Component": "MessagingHandler",
					"Payload":   string(event.Payload),
					"Target":    h.id,
					"Error":     err}).Error("failed to execute")
				return err
			}
			return nil
		},
	)
}

// Stop stops subscription
func (h *MessagingHandler) Stop(
	ctx context.Context,
) (err error) {
	return h.queueClient.UnSubscribe(
		ctx,
		h.requestTopic,
		h.id,
	)
}

// execute incoming request
func (h *MessagingHandler) execute(
	ctx context.Context,
	reqPayload []byte) (err error) {
	var req *types.TaskRequest
    req, err = types.UnmarshalTaskRequest(h.antCfg.EncryptionKey, reqPayload)
	if err != nil {
		return err
	}
	resp := types.NewTaskResponse(req)

	// Implement business logic below
	epoch := time.Now().Unix()
	if epoch%2 == 0 {
		resp.Status = types.COMPLETED
	} else {
		resp.ErrorCode = "ERR_MESSAGING_WORKER"
		resp.ErrorMessage = "mock error for messaging client"
		resp.Status = types.FAILED
	}
	resp.AddContext("epoch", epoch)

	// Send back reply
    resPayload, err := resp.Marshal(h.antCfg.EncryptionKey)
	if err != nil {
		return err
	}
	_, err = h.queueClient.Send(
		ctx,
		h.responseTopic,
		make(map[string]string),
		resPayload,
		false)
	return err
}

8. Summary

The Formicary is specially designed for executing background tasks, jobs, DAGs, and workflows in a complex, scalable environment. Each task represents a discrete unit of work that can be executed using a variety of protocols such as Docker, Kubernetes, Shell, HTTP, and Messaging. Its architecture employs a Leader-Follower model with a queen-leader that schedules tasks and ant-workers that execute them. It supports advanced patterns like Pipes and Filters, SEDA, and Fork/Join, enabling tasks to be interconnected, parallelized, and aggregated. Formicary distinguishes itself from other similar frameworks by supporting declarative job definitions, extensible execution, AuthN/AuthZ, artifacts management, quota management, caching and encryption, and advanced scheduling options including cron-based scheduling, task retries, and job prioritization, offer significant control over job execution. The resource management capabilities especially the dynamic allocation and monitoring of resources help to optimize task execution without overloading the system

Formicary‘s architecture supports a wide range of use cases from complex workflow orchestration across various environments to specific tasks like image processing pipelines, automated build/test/release workflows, scheduled data ETL jobs, and machine learning pipelines. Formicary supports advanced error handling and retry capabilities, essential for long-running and complex tasks. Its ability to handle complex, conditional workflows and scale according to demand makes it suitable for organizations with intricate processing requirements.

October 18, 2022

Mocking and Fuzz Testing Distributed Micro Services with Record/Play, Templates and OpenAPI Specifications

Filed under: GO,REST,Technology — admin @ 11:36 am

Building large distributed systems often requires integrating with multiple distributed micro-services that makes development a particularly difficult as it’s not always easy to deploy and test all dependent services in a local environment with constrained resources. In addition, you might be working on a large system with multiple teams where you may have received new API specs from another team but the API changes are not available yet. Though, you can use mocking frameworks based on API specs when writing a unit tests but integration or functional testing requires an access to the network service. A common solution that I have used in past projects is to configure a mock service that can simulate different API operations. I wrote a JVM based mock-service many years ago with following use-cases:

Use-Cases

  • As a service owner, I need to mock remote dependent service(s) by capturing/recording request/responses through an HTTP proxy so that I can play it back when testing the remote service(s) without connecting with them.
  • As a service owner, I need to mock remote dependent service(s) based on a open-api/swagger specifications so that my service client can test all service behavior per specifications for the remote service(s) even when remote service is not fully implemented or accessible.
  • As a service owner, I need to mock remote dependent service(s) based on a mock scenario defined in a template so that my service client can test service behavior per expected request/response in the template even when remote service is not fully implemented or accessible.
  • As a service owner, I need to inject various response behavior and faults to the output of a remote service so that I can build a robust client that prevents cascading failures and is more resilient to unexpected faults.
  • As a service owner, I need to define test cases with faulty or fuzz responses to test my own service so that I can predict how it will behave with various input data and assert the service response based on expected behavior.

Features

API mock service for REST/HTTP based services with following features:

  • Record API request/response by working as a HTTP proxy server (native http/https or via API) between client and remote service.
  • Playback API response that were previously recorded based on request parameters.
  • Define API behavior manually by specifying request parameters and response contents using static data or dynamic data based on GO templating language.
  • Generate API behavior from open standards such as Open API/Swagger and automatically create constraints and regex based on the specification.
  • Customize API behavior using a GO template language so that users can generate dynamic contents based on input parameters or other configuration.
  • Generate large responses using the template language with dynamic loops so that you can test performance of your system.
  • Define multiple test scenarios for the API based on different input parameters or simulating various error cases that are difficult to reproduce with real services.
  • Store API request/responses locally as files so that it’s easy to port stubbed request/responses to any machine.
  • Allow users to define API request/response with various formats such as XML/JSON/YAML and upload them to the mock service.
  • Support test fixtures that can be uploaded to the mock service and can be used to generate mock responses.
  • Define a collection of helper methods to generate different kind of random data such as UDID, dates, URI, Regex, text and numeric data.
  • Ability to playback all test scenarios or a specific scenario and change API behavior dynamically with different input parameters.
  • Support multiple mock scenarios for the same API that can be selected either using round-robin order, custom predicates based on parameters or based on scenario name.
  • Inject error conditions and artificial delays so that you can test how your system handles error conditions that are difficult to reproduce or use for game days/chaos testing.
  • Generate client requests for a remote API for chaos and stochastic testing where a set of requests are sent with a dynamic data generated based on regex or other constraints.

I used this service in many past projects, however I felt it needed a bit fresh approach to meet above goals so I rewrote it in GO language, which has a robust support for writing network services. You can download the new version from https://github.com/bhatti/api-mock-service. As, it’s written in GO, you can either download GO runtime environment or use Docker to install it locally. If you haven’t installed docker, you can download the community version from https://docs.docker.com/engine/installation/ or find installer for your OS on https://docs.docker.com/get-docker/.

docker build -t api-mock-service .
docker run -p 8000:8080 -p 8081:8081 -e HTTP_PORT=8080 PROXY_PORT=8081 \
	-e DATA_DIR=/tmp/mocks -e ASSET_DIR=/tmp/assets api-mock-service

or pull an image from docker hub (https://hub.docker.com/r/plexobject/api-mock-service), e.g.

docker pull plexobject/api-mock-service:latest
docker run -p 8000:8080 -p 8081:8081 -e HTTP_PORT=8080 PROXY_PORT=8081 -e DATA_DIR=/tmp/mocks \
	-e ASSET_DIR=/tmp/assets plexobject/api-mock-service:latest

Alternatively, you can run it locally with GO environment, e.g.,

make && ./out/bin/api-mock-service

For full command line options, execute api-mock-service -h that will show you command line options such as:

./out/bin/api-mock-service -h
Starts mock service

Usage:
  api-mock-service [flags]
  api-mock-service [command]

Available Commands:
  chaos       Executes chaos client
  completion  Generate the autocompletion script for the specified shell
  help        Help about any command
  version     Version will output the current build information

Flags:
      --assetDir string   asset dir to store static assets/fixtures
      --config string     config file
      --dataDir string    data dir to store mock scenarios
  -h, --help              help for api-mock-service
      --httpPort int      HTTP port to listen
      --proxyPort int     Proxy port to listen

Recording a Mock Scenario via HTTP/HTTPS Proxy

Once you have the API mock service running, the mock service will start two ports on startup, first port (default 8080) will be used to record/play mock scenarios, updating templates or uploading OpenAPIs. The second port (default 8081) will setup an HTTP/HTTPS proxy server that you can point to record your scenarios, e.g.

export http_proxy="http://localhost:8081"
export https_proxy="http://localhost:8081"

curl -k -v -H "Authorization: Bearer sk_test_xxxx" \
	https://api.stripe.com/v1/customers/cus_xxx/cash_balance

Above curl command will automatically record all requests and responses and create mock scenario to play it back. For example, if you call the same API again, it will return a local response instead of contacting the server. You can customize the proxy behavior for record by adding X-Mock-Record: true header to your request.

Recording a Mock Scenario via API

Alternatively, you can use invoke an internal API as a pass through to invoke a remote API so that you can automatically record API behavior and play it back later, e.g.

% curl -H "X-Mock-Url: https://api.stripe.com/v1/customers/cus_**/cash_balance" \
	-H "Authorization: Bearer sk_test_***" http://localhost:8080/_proxy

In above example, the curl command is passing the URL of real service as an HTTP header Mock-Url. In addition, you can pass other authorization headers as needed.

Viewing the Recorded Mock Scenario

The API mock-service will store the request/response in a YAML file under a data directory that you can specify. For example, you may see a file under:

default_mocks_data/v1/customers/cus_***/cash_balance/GET/recorded-scenario-***.scr

Note: the sensitive authentication or customer keys are masked in above example but you will see following contents in the captured data file:

method: GET
name: recorded-v1-customers-cus
path: /v1/customers/cus_**/cash_balance
description: recorded at 2022-10-29 04:26:17.24776 +0000 UTC
request:
     match_query_params: {}
     match_headers: {}
     match_content_type: ""
     match_contents: ""
     example_path_params: {}
     example_query_params: {}
     example_headers:
         Accept: '*/*'
         Authorization: Bearer sk_test_xxx
         User-Agent: curl/7.65.2
         X-Mock-Url: https://api.stripe.com/v1/customers/cus_/cash_balance
     example_contents: ""
response:
    headers:
        Access-Control-Allow-Credentials:
            - "true"
        Access-Control-Allow-Methods:
            - GET, POST, HEAD, OPTIONS, DELETE
        Access-Control-Allow-Origin:
            - '*'
        Access-Control-Expose-Headers:
            - Request-Id, Stripe-Manage-Version, X-Stripe-External-Auth-Required, X-Stripe-Privileged-Session-Required
        Access-Control-Max-Age:
            - "300"
        Cache-Control:
            - no-cache, no-store
        Content-Length:
            - "168"
        Content-Type:
            - application/json
        Date:
            - Sat, 29 Oct 2022 04:26:17 GMT
        Request-Id:
            - req_lOP4bCsPIi5hQC
        Server:
            - nginx
        Strict-Transport-Security:
            - max-age=63072000; includeSubDomains; preload
        Stripe-Version:
            - "2018-09-06"
    content_type: application/json
    contents: |-
        {
          "object": "cash_balance",
          "available": null,
          "customer": "cus_",
          "livemode": false,
          "settings": {
            "reconciliation_mode": "automatic"
          }
        }
    contents_file: ""
    status_code: 200
wait_before_reply: 0s

Above example defines a mock scenario for testing /v1/customers/cus_**/cash_balance path. A test scenario includes:

Predicate

  • This is a boolean condition if you need to enable or disable a scenario test based on dynamic parameters or request count.

Group

  • This specifies the group for related test scenarios.

Request Matching Parameters:

The matching request parameters will be used to select the mock scenario to execute and you can use regular expressions to validate:

  • URL Query Parameters
  • URL Request Headers
  • Request Body

You can use these parameters so that test scenario is executed only when the parameters match, e.g.

    match_query_params:
      name: [a-z0-9]{1,50}
    match_headers:
      Content-Type: "application/json"

The matching request parameters will be used to select the mock scenario to execute and you can use regular expressions to validate, e.g. above example will be matched if content-type is application/json and it will validate that name query parameter is alphanumeric from 1-50 size.

Example Request Parameters:

The example request parameters show the contents captured from the record/play so that you can use and customize to define matching parameters:

  • URL Query Parameters
  • URL Request Headers
  • Request Body

Response Properties

The response properties will include:

  • Response Headers
  • Response Body statically defined or loaded from a test fixture
  • Response can also be loaded from a test fixture file
  • Status Code
  • Matching header and contents
  • Assertions You can copy recorded scenario to another folder and use templates to customize it and then upload it for playback.

The matching header and contents use match_headers and match_contents similar to request to validate response in case you want to test response from a real service for chaos testing. Similarly, assertions defines a set of predicates to test against response from a real service:

    assertions:
        - VariableContains contents.id 10
        - VariableContains contents.title illo
        - VariableContains headers.Pragma no-cache 

Above example will check API response and verify that id property contains 10, title contains illo and result headers include Pragma: no-cache header.

Playback the Mock API Scenario

You can playback the recorded response from above example as follows:

% curl http://localhost:8080/v1/customers/cus_***/cash_balance

Which will return captured response such as:

{
  "object": "cash_balance",
  "available": null,
  "customer": "cus_***",
  "livemode": false,
  "settings": {
    "reconciliation_mode": "automatic"
  }
}%

Though, you can customize your template with dynamic properties or conditional logic but you can also send HTTP headers for X-Mock-Response-Status to override HTTP status to return or X-Mock-Wait-Before-Reply to add artificial latency using duration syntax.

Debug Headers from Playback

The playback request will return mock-headers to indicate the selected mock scenario, path and request count, e.g.

X-Mock-Path: /v1/jobs/{jobId}/state
X-Mock-Request-Count: 13
X-Mock-Scenario: setDefaultState-bfb86eb288c9abf2988822938ef6d4aa3bd654a15e77158b89f17b9319d6f4e4

Upload Mock API Scenario

You can customize above recorded scenario, e.g. you can add path variables to above API as follows:

method: GET
name: stripe-cash-balance
path: /v1/customers/:customer/cash_balance
request:
    match_headers:
        Authorization: Bearer sk_test_[0-9a-fA-F]{10}$
response:
    headers:
        Access-Control-Allow-Credentials:
            - "true"
        Access-Control-Allow-Methods:
            - GET, POST, HEAD, OPTIONS, DELETE
        Access-Control-Allow-Origin:
            - '*'
        Access-Control-Expose-Headers:
            - Request-Id, Stripe-Manage-Version, X-Stripe-External-Auth-Required, X-Stripe-Privileged-Session-Required
        Access-Control-Max-Age:
            - "300"
        Cache-Control:
            - no-cache, no-store
        Content-Type:
            - application/json
        Request-Id:
            - req_2
        Server:
            - nginx
        Strict-Transport-Security:
            - max-age=63072000; includeSubDomains; preload
        Stripe-Version:
            - "2018-09-06"
    content_type: application/json
    contents: |-
        {
          "object": "cash_balance",
          "available": null,
          "customer": {{.customer}}
          "livemode": false,
          "page": {{.page}}
          "pageSize": {{.pageSize}}
          "settings": {
            "reconciliation_mode": "automatic"
          }
        }
    status_code: 200
wait_before_reply: 1s

In above example, I assigned a name stripe-cash-balance to the mock scenario and changed API path to /v1/customers/:customer/cash_balance so that it can capture customer-id as a path variable. I added a regular expression to ensure that the HTTP request includes an Authorization header matching Bearer sk_test_[0-9a-fA-F]{10}$ and defined dynamic properties such as {{.customer}}, {{.page}} and {{.pageSize}} so that they will be replaced at runtime.

The mock scenario uses builtin template syntax of GO. You can then upload it as follows:

curl -H "Content-Type: application/yaml" --data-binary @fixtures/stripe-customer.yaml \
	http://localhost:8080/_scenarios

and then play it back as follows:

curl -v -H "Authorization: Bearer sk_test_0123456789" \
	"http://localhost:8080/v1/customers/123/cash_balance?page=2&pageSize=55"

and it will generate:

 HTTP/1.1 200 OK
< Content-Type: application/json
< X-Mock-Request-Count: 1
< X-Mock-Scenario: stripe-cash-balance
< Request-Id: req_2
< Server: nginx
< Strict-Transport-Security: max-age=63072000; includeSubDomains; preload
< Stripe-Version: 2018-09-06
< Date: Sat, 29 Oct 2022 17:29:12 GMT
< Content-Length: 179
<
{
  "object": "cash_balance",
  "available": null,
  "customer": 123
  "livemode": false,
  "page": 2
  "pageSize": 55
  "settings": {
    "reconciliation_mode": "automatic"
  }

As you can see, the values of customer, page and pageSize are dynamically updated and the response header includes name of mock scenario with request counts. You can upload multiple mock scenarios for the same API and the mock API service will play it back sequentially. For example, you can upload another scenario for above API as follows:

method: GET
name: stripe-customer-failure
path: /v1/customers/:customer/cash_balance
request:
    match_headers:
        Authorization: Bearer sk_test_[0-9a-fA-F]{10}$
response:
    headers:
        Stripe-Version:
            - "2018-09-06"
    content_type: application/json
    contents: My custom error
    status_code: 500
wait_before_reply: 1s

And then play it back as before:

curl -v -H "Authorization: Bearer sk_test_0123456789" \
	"http://localhost:8080/v1/customers/123/cash_balance?page=2&pageSize=55"

which will return response with following error response

> GET /v1/customers/123/cash_balance?page=2&pageSize=55 HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.65.2
> Accept: */*
> Authorization: Bearer sk_test_0123456789
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 500 Internal Server Error
< Content-Type: application/json
< Mock-Request-Count: 1
< X-Mock-Scenario: stripe-customer-failure
< Stripe-Version: 2018-09-06
< Vary: Origin
< Date: Sat, 29 Oct 2022 17:29:15 GMT
< Content-Length: 15

Dynamic Templates with Mock API Scenarios

You can use loops and conditional primitives of template language and custom functions provided by the API mock library to generate dynamic responses as follows:

method: GET
name: get_devices
path: /devices
description: ""
request:
    match_content_type: "application/json; charset=utf-8"
response:
    headers:
        "Server":
            - "SampleAPI"
        "Connection":
            - "keep-alive"
    content_type: application/json
    contents: >
     {
     "Devices": [
{{- range $val := Iterate .pageSize }}
      {
        "Udid": "{{SeededUdid $val}}",
        "Line": { {{SeededFileLine "lines.txt" $val}}, "Type": "Public", "IsManaged": false },
        "Amount": {{JSONFileProperty "props.yaml" "amount"}},        
        "SerialNumber": "{{Udid}}",
        "MacAddress": "{{Udid}}",
        "Imei": "{{Udid}}",
        "AssetNumber": "{{RandString 20}}",
        "LocationGroupId": {
         "Id": {
           "Value": {{RandNumMax 1000}},
         },
         "Name": "{{SeededCity $val}}",
         "Udid": "{{Udid}}"
        },
        "DeviceFriendlyName": "Device for {{SeededName $val}}",
        "LastSeen": "{{Time}}",
        "Email": "{{RandEmail}}",
        "Phone": "{{RandPhone}}",        
        "EnrollmentStatus": {{SeededBool $val}}
        "ComplianceStatus": {{RandRegex "^AC[0-9a-fA-F]{32}$"}}
        "Group": {{RandCity}},
        "Date": {{TimeFormat "3:04PM"}},
        "BatteryLevel": "{{RandNumMax 100}}%",
        "StrEnum": {{EnumString "ONE TWO THREE"}},
        "IntEnum": {{EnumInt 10 20 30}},
        "ProcessorArchitecture": {{RandNumMax 1000}},
        "TotalPhysicalMemory": {{RandNumMax 1000000}},
        "VirtualMemory": {{RandNumMax 1000000}},
        "AvailablePhysicalMemory": {{RandNumMax 1000000}},
        "CompromisedStatus": {{RandBool}},
        "Add": {{Add 2 1}},
      }{{if LastIter $val $.PageSize}}{{else}},  {{end}}
{{ end }}
     ],
     "Page": {{.page}},
     "PageSize": {{.pageSize}},
     "Total": {{.pageSize}}
     }
    {{if NthRequest 10 }}
    status_code: {{EnumInt 500 501}}
    {{else}}
    status_code: {{EnumInt 200 400}}
    {{end}}
wait_before_reply: {{.page}}s

Above example includes a number of template primitives and custom functions to generate dynamic contents such as:

Loops

GO template support loops that can be used to generate multiple data entries in the response, e.g.

{{- range $val := Iterate .pageSize }}

Builtin functions

GO template supports custom functions that you can add to your templates. The mock service includes a number of helper functions to generate random data such as:

Add numbers

  "Num": "{{Add 1 2}}",

Date/Time

  "LastSeen": "{{Time}}",
  "Date": {{Date}},
  "DateFormatted": {{TimeFormat "3:04PM"}},
  "LastSeen": "{{Time}}",

Comparison

  {{if EQ .MyVariable 10 }}
  {{if GE .MyVariable 10 }}
  {{if GT .MyVariable 10 }}
  {{if LE .MyVariable 10 }}
  {{if LT .MyVariable 10 }}
  {{if Nth .MyVariable 10 }}

Enums

  "StrEnum": {{EnumString "ONE TWO THREE"}},
  "IntEnum": {{EnumInt 10 20 30}},

Random Data

  "SerialNumber": "{{Udid}}",
  "AssetNumber": "{{RandString 20}}",
  "LastSeen": "{{Time}}",
  "Host": "{{RandHost}}",
  "Email": "{{RandEmail}}",
  "Phone": "{{RandPhone}}",
  "URL": "{{RandURL}}",
  "EnrollmentStatus": {{SeededBool $val}}
  "ComplianceStatus": {{RandRegex "^AC[0-9a-fA-F]{32}$"}}
  "City": {{RandCity}},
  "Country": {{RandCountry}},
  "CountryCode": {{RandCountryCode}},
  "Completed": {{RandBool}},
  "Date": {{TimeFormat "3:04PM"}},
  "BatteryLevel": "{{RandNumMax 100}}%",
  "Object": "{{RandDict}}",
  "IntHistory": {{RandIntArrayMinMax 1 10}},
  "StringHistory": {{RandStringArrayMinMax 1 10}},
  "FirstName": "{{SeededName 1 10}}",
  "LastName": "{{RandName}}",
  "Score": "{{RandNumMinMax 1 100}}",
  "Paragraph": "{{RandParagraph 1 10}}",
  "Word": "{{RandWord 1 1}}",
  "Sentence": "{{RandSentence 1 10}}",
  "Colony": "{{RandString}}",

Request count and Conditional Logic

{{if NthRequest 10 }}   -- for every 10th request
{{if GERequest 10 }}    -- if number of requests made to API so far are >= 10
{{if LTRequest 10 }}    -- if number of requests made to API so far are < 10

The template syntax allows you to define a conditional logic such as:

    {{if NthRequest 10 }}
    status_code: {{AnyInt 500 501}}
    {{else}}
    status_code: {{AnyInt 200 400}}
    {{end}}

In above example, the mock API will return HTTP status 500 or 501 for every 10th request and 200 or 400 for other requests. You can use conditional syntax to simulate different error status or customize response.

Loops

  {{- range $val := Iterate 10}}

     {{if LastIter $val 10}}{{else}},{{end}}
  {{ end }}

Variables

     {{if VariableContains "contents" "blah"}}
     {{if VariableEquals "contents" "blah"}}
     {{if VariableSizeEQ "contents" "blah"}}
     {{if VariableSizeGE "contents" "blah"}}
     {{if VariableSizeLE "contents" "blah"}}

Test fixtures

The mock service allows you to upload a test fixture that you can refer in your template, e.g.

  "Line": { {{SeededFileLine "lines.txt" $val}}, "Type": "Public", "IsManaged": false },

Above example loads a random line from a lines.txt fixture. As you may need to generate a deterministic random data in some cases, you can use Seeded functions to generate predictable data so that the service returns same data. Following example will read a text fixture to load a property from a file:

  "Amount": {{JSONFileProperty "props.yaml" "amount"}},

This template file will generate content as follows:

{ "Devices": [
 {
   "Udid": "fe49b338-4593-43c9-b1e9-67581d000000",
   "Line": { "ApplicationName": "Chase", "Version": "3.80", "ApplicationIdentifier": "com.chase.sig.android", "Type": "Public", "IsManaged": false },
   "Amount": {"currency":"$","value":100},
   "SerialNumber": "47c2d7c3-c930-4194-b560-f7b89b33bc2a",
   "MacAddress": "1e015eac-68d2-42ee-9e8f-73fb80958019",
   "Imei": "5f8cae1b-c5e3-4234-a238-1c38d296f73a",
   "AssetNumber": "9z0CZSA03ZbUNiQw2aiF",
   "LocationGroupId": {
    "Id": {
      "Value": 980
    },
    "Name": "Houston",
    "Udid": "3bde6570-c0d4-488f-8407-10f35902cd99"
   },
   "DeviceFriendlyName": "Device for Alexander",
   "LastSeen": "2022-10-29T11:25:25-07:00",
   "Email": "john.smith@abc.com",
   "Phone": "1-408-454-1507",
   "EnrollmentStatus": true,
   "ComplianceStatus": "ACa3E07B0F2cA00d0fbFe88f5c6DbC6a9e",
   "Group": "Chicago",
   "Date": "11:25AM",
   "BatteryLevel": "43%",
   "StrEnum": "ONE",
   "IntEnum": 20,
   "ProcessorArchitecture": 243,
   "TotalPhysicalMemory": 320177,
   "VirtualMemory": 768345,
   "AvailablePhysicalMemory": 596326,
   "CompromisedStatus": false,
   "Add": 3
 },
...
 ], "Page": 2, "PageSize": 55, "Total": 55 }  

Artificial Delays

You can specify artificial delay for the API request as follows:

wait_before_reply: {{.page}}s

Above example shows delay based on page number but you can use any parameter to customize this behavior.

Conditional Logic

The template syntax allows you to define a conditional logic such as:

    {{if NthRequest 10 }}
    status_code: {{AnyInt 500 501}}
    {{else}}
    status_code: {{AnyInt 200 400}}
    {{end}}

In above example, the mock API will return HTTP status 500 or 501 for every 10th request and 200 or 400 for other requests. You can use conditional syntax to simulate different error status or customize response.

Test fixtures

The mock service allows you to upload a test fixture that you can refer in your template, e.g.

"Line": { {{SeededFileLine "lines.txt" $val}}, "Type": "Public", "IsManaged": false },

Above example loads a random line from a lines.txt fixture. As you may need to generate a deterministic random data in some cases, you can use Seeded functions to generate predictable data so that the service returns same data. Following example will read a text fixture to load a property from a file:

"Amount": {{JSONFileProperty "props.yaml" "amount"}},

This template file will generate content as follows:

{ "Devices": [
 {
   "Udid": "fe49b338-4593-43c9-b1e9-67581d000000",
   "Line": { "ApplicationName": "Chase", "Version": "3.80", "ApplicationIdentifier": "com.chase.sig.android", "Type": "Public", "IsManaged": false },
   "Amount": {"currency":"$","value":100},   
   "SerialNumber": "47c2d7c3-c930-4194-b560-f7b89b33bc2a",
   "MacAddress": "1e015eac-68d2-42ee-9e8f-73fb80958019",
   "Imei": "5f8cae1b-c5e3-4234-a238-1c38d296f73a",
   "AssetNumber": "9z0CZSA03ZbUNiQw2aiF",
   "LocationGroupId": {
    "Id": {
      "Value": 980,
    },
    "Name": "Houston",
    "Udid": "3bde6570-c0d4-488f-8407-10f35902cd99"
   },
   "DeviceFriendlyName": "Device for Alexander",
   "LastSeen": "2022-10-29T11:25:25-07:00",
   "Email": "anthony.christian@abblhfgkpd.edu",
   "Phone": "1-573-993-7542",   
   "EnrollmentStatus": true
   "ComplianceStatus": ACa3E07B0F2cA00d0fbFe88f5c6DbC6a9e
   "Group": Chicago,
   "Date": 11:25AM,
   "BatteryLevel": "43%",
   "StrEnum": ONE,
   "IntEnum": 20,
   "ProcessorArchitecture": 243,
   "TotalPhysicalMemory": 320177,
   "VirtualMemory": 768345,
   "AvailablePhysicalMemory": 596326,
   "CompromisedStatus": false,
   "Add": 3,
   "Dict": map[one:1 three:3 two:2]
 },
...
 ], "Page": 2, "PageSize": 55, "Total": 55 }   

Playing back a specific mock scenario

You can pass a header for X-Mock-Scenario to specify the name of scenario if you have multiple scenarios for the same API, e.g.

curl -v -H "X-Mock-Scenario: stripe-cash-balance" -H "Authorization: Bearer sk_test_0123456789" \
	"http://localhost:8080/v1/customers/123/cash_balance?page=2&pageSize=55"

You can also customize response status by overriding the request header with X-Mock-Response-Status and delay before return by overriding X-Mock-Wait-Before-Reply header.

Using Test Fixtures

You can define a test data in your test fixtures and then upload as follows:

curl -H "Content-Type: application/yaml" --data-binary @fixtures/lines.txt \
	http://localhost:8080/_fixtures/GET/lines.txt/devices

curl -v -H "Content-Type: application/yaml" --data-binary @fixtures/props.yaml \
    http://localhost:8080/_fixtures/GET/props.yaml/devices

In above example, test fixtures for lines.txt and props.yaml will be uploaded and will be available for all GET requests under /devices URL path. You can then refer to above fixture in your templates. You can also use this to serve any binary files, e.g. you can define an image template file as follows:

method: GET
name: test-image
path: /images/mock_image
description: ""
request:
response:
    headers:
      "Last-Modified":
        - {{Time}}
      "ETag":
        - {{RandString 10}}
      "Cache-Control":
        - max-age={{RandNumMinMax 1000 5000}}
    content_type: image/png
    contents_file: mockup.png
    status_code: 200

Then upload a binary image using:

curl -H "Content-Type: application/yaml" --data-binary @fixtures/mockup.png \
	http://localhost:8080/_fixtures/GET/mockup.png/images/mock_image

And then serve the image using:

curl -v "http://localhost:8080/images/mock_image"

Custom Functions

The API mock service defines following custom functions that can be used to generate test data:

Numeric Random Data

Following functions can be used to generate numeric data within a range or with a seed to always generate deterministic test data:

  • Random
  • SeededRandom
  • RandNumMinMax
  • RandIntArrayMinMax

Text Random Data

Following functions can be used to generate numeric data within a range or with a seed to always generate deterministic test data:

  • RandStringMinMax
  • RandStringArrayMinMax
  • RandRegex
  • RandEmail
  • RandPhone
  • RandDict
  • RandCity
  • RandName
  • RandParagraph
  • RandPhone
  • RandSentence
  • RandString
  • RandStringMinMax
  • RandWord

Email/Host/URL

  • RandURL
  • RandEmail
  • RandHost

Boolean

Following functions can be used to generate boolean data:

  • RandBool
  • SeededBool

UDID

Following functions can be used to generate UDIDs:

  • Udid
  • SeededUdid

String Enums

Following functions can be used to generate a string from a set of Enum values:

  • EnumString

Integer Enums

Following functions can be used to generate an integer from a set of Enum values:

  • EnumInt

Random Names

Following functions can be used to generate random names:

  • RandName
  • SeededName

City Names

Following functions can be used to generate random city names:

  • RandCity
  • SeededCity

Country Names or Codes

Following functions can be used to generate random country names or codes:

  • RandCountry
  • SeededCountry
  • RandCountryCode
  • SeededCountryCode

File Fixture

Following functions can be used to generate random data from a fixture file:

  • RandFileLine
  • SeededFileLine
  • FileProperty
  • JSONFileProperty
  • YAMLFileProperty

Generate Mock API Behavior from OpenAPI or Swagger Specifications

If you are using Open API or Swagger for API specifications, you can simply upload a YAML based API specification. For example, here is a sample Open API specification from Twilio:

openapi: 3.0.1
paths:
  /v1/AuthTokens/Promote:
    servers:
    - url: https://accounts.twilio.com
    description: Auth Token promotion
    x-twilio:
      defaultOutputProperties:
      - account_sid
      - auth_token
      - date_created
      pathType: instance
      mountName: auth_token_promotion
    post:
      description: Promote the secondary Auth Token to primary. After promoting the
        new token, all requests to Twilio using your old primary Auth Token will result
        in an error.
      responses:
        '200':
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/accounts.v1.auth_token_promotion'
          description: OK
      security:

...


   schemas:
     accounts.v1.auth_token_promotion:
       type: object
       properties:
         account_sid:
           type: string
           minLength: 34
           maxLength: 34
           pattern: ^AC[0-9a-fA-F]{32}$
           nullable: true
           description: The SID of the Account that the secondary Auth Token was created
             for
         auth_token:
           type: string
           nullable: true
           description: The promoted Auth Token
         date_created:
           type: string
           format: date-time
           nullable: true
           description: The ISO 8601 formatted date and time in UTC when the resource
             was created
         date_updated:
           type: string
           format: date-time
           nullable: true
           description: The ISO 8601 formatted date and time in UTC when the resource
             was last updated
         url:
           type: string
           format: uri
           nullable: true
           description: The URI for this resource, relative to `https://accounts.twilio.com`
...           

You can then upload the API specification as:

curl -H "Content-Type: application/yaml" --data-binary @fixtures/oapi/twilio_accounts_v1.yaml \
		http://localhost:8080/_oapi

It will generate a mock scenarios for each API based on mime-type, status-code, parameter formats, regex, data ranges, e.g.,

name: UpdateAuthTokenPromotion-xx
path: /v1/AuthTokens/Promote
description: Promote the secondary Auth Token to primary. After promoting the new token, all requests to Twilio using your old primary Auth Token will result in an error.
request:
    match_query_params: {}
    match_headers: {}
    match_content_type: ""
    match_contents: ""
    example_path_params: {}
    example_query_params: {}
    example_headers: {}
    example_contents: ""
response:
    headers: {}
    content_type: application/json
    contents: '{"account_sid":"{{RandRegex `^AC[0-9a-fA-F]{32}$`}}",\
    "auth_token":"{{RandStringMinMax 0 0}}","date_created":"{{Time}}",\
    "date_updated":"{{Time}}","url":"https://{{RandName}}.com"}'
    contents_file: ""
    status_code: 200
wait_before_reply: 0s

In above example, the account_sid uses regex to generate data and URI format to generate URL. Then invoke the mock API as:

curl -v -X POST http://localhost:8080/v1/AuthTokens/Promote

Which will generate dynamic response as follows:

{
  "account_sid": "ACF3A7ea7f5c90f6482CEcA77BED07Fb91",
  "auth_token": "PaC7rKdGER73rXNi6rVKZMN1Jw0QYxPFeEkqyvnM7Ojw2nziOER7SMWkIV6N2hXYTKxAfDMfS9t0",
  "date_created": "2022-10-29T11:54:46-07:00",
  "date_updated": "2022-10-29T11:54:46-07:00",
  "url": "https://Billy.com"
}

Listing all Mock Scenarios

You can list all available mock APIs using:

curl -v http://localhost:8080/_scenarios

Which will return summary of APIs such as:

{
  "/_scenarios/GET/FetchCredentialAws-8b2fcf02dfb7dc190fb735a469e1bbaa3ccb5fd1a24726976d110374b13403c6/v1/Credentials/AWS/{Sid}": {
    "method": "GET",
    "name": "FetchCredentialAws-8b2fcf02dfb7dc190fb735a469e1bbaa3ccb5fd1a24726976d110374b13403c6",
    "path": "/v1/Credentials/AWS/{Sid}",
    "match_query_params": {},
    "match_headers": {},
    "match_content_type": "",
    "match_contents": "",
    "LastUsageTime": 0,
    "RequestCount": 0
  },
  "/_scenarios/GET/FetchCredentialPublicKey-60a01dcea5290e6d429ce604c7acf5bd59606045fc32c0bc835e57ac2b1b8eb6/v1/Credentials/PublicKeys/{Sid}": {
    "method": "GET",
    "name": "FetchCredentialPublicKey-60a01dcea5290e6d429ce604c7acf5bd59606045fc32c0bc835e57ac2b1b8eb6",
    "path": "/v1/Credentials/PublicKeys/{Sid}",
    "match_query_params": {},
    "match_headers": {},
    "match_content_type": "",
    "match_contents": "",
    "LastUsageTime": 0,
    "RequestCount": 0
  },
  "/_scenarios/GET/ListCredentialAws-28717701f05de4374a09ec002066d308043e73e30f25fec2dcd4c3d3c001d300/v1/Credentials/AWS": {
    "method": "GET",
    "name": "ListCredentialAws-28717701f05de4374a09ec002066d308043e73e30f25fec2dcd4c3d3c001d300",
    "path": "/v1/Credentials/AWS",
    "match_query_params": {
      "PageSize": "\\d+"
    },
    "match_headers": {},
    "match_content_type": "",
    "match_contents": "",
    "LastUsageTime": 0,
    "RequestCount": 0
  },
...  

Chaos Testing

In addition to serving a mock service, you can also use a builtin chaos client to test remote services for stochastic testing by generating random data based on regex or API specifications. For example, you may capture a test scenario for a remote API using http proxy such as:

export http_proxy="http://localhost:8081"
export https_proxy="http://localhost:8081"
curl -k https://jsonplaceholder.typicode.com/todos

This will capture a mock scenario such as:

method: GET
name: recorded-todos-ff9a8e133347f7f05273f15394f722a9bcc68bb0e734af05ba3dd98a6f2248d1
path: /todos
description: recorded at 2022-12-12 02:23:42.845176 +0000 UTC for https://jsonplaceholder.typicode.com:443/todos
group: todos
predicate: ""
request:
    match_query_params: {}
    match_headers:
        Content-Type: ""
    match_contents: '{}'
    example_path_params: {}
    example_query_params: {}
    example_headers:
        Accept: '*/*'
        User-Agent: curl/7.65.2
    example_contents: ""
response:
    headers:
        Access-Control-Allow-Credentials:
            - "true"
        Age:
            - "19075"
        Alt-Svc:
            - h3=":443"; ma=86400, h3-29=":443"; ma=86400
        Cache-Control:
            - max-age=43200
        Cf-Cache-Status:
            - HIT
        Cf-Ray:
            - 7782ffe4bd6bc62c-SEA
        Connection:
            - keep-alive
        Content-Type:
            - application/json; charset=utf-8
        Date:
            - Mon, 12 Dec 2022 02:23:42 GMT
        Etag:
            - W/"5ef7-4Ad6/n39KWY9q6Ykm/ULNQ2F5IM"
        Expires:
            - "-1"
        Nel:
            - '{"success_fraction":0,"report_to":"cf-nel","max_age":604800}'
        Pragma:
            - no-cache
    contents: |-
      [
        {
          "userId": 1,
          "id": 1,
          "title": "delectus aut autem",
          "completed": false
        },
        {
          "userId": 1,
          "id": 2,
          "title": "quis ut nam facilis et officia qui",
          "completed": false
        },
      ...
        ]
    contents_file: ""
    status_code: 200
    match_headers: {}
    match_contents: '{"completed":"__string__.+","id":"(__number__[+-]?[0-9]{1,10})","title":"(__string__\\w+)","userId":"(__number__[+-]?[0-9]{1,10})"}'
    assertions: []

You can then customize this scenario with additional assertions and you may remove all response contents as they won’t be used. Note that above scenario is defined with group todos. You can then submit a request for chaos testing as follows:

curl -k -v -X POST http://localhost:8080/_chaos/todos -d '{"base_url": "https://jsonplaceholder.typicode.com", "execution_times": 10}'

Above request will submit 10 requests to the todo server with random data and return response such as:

{"errors":null,"failed":0,"succeeded":10}

If you have a local captured data, you can also run chaos client with a command line without running mock server, e.g.:

go run main.go chaos --base_url https://jsonplaceholder.typicode.com --group todos --times 10

Static Assets

The mock service can serve any static assets from a user-defined folder and then serve it as follows:

cp static-file default_assets

# execute the API mock server
make && ./out/bin/api-mock-service

# access assets
curl http://localhost:8080/_assets/default_assets

API Reference

The API specification for the mock library defines details for managing mock scenarios and customizing the mocking behavior.

Summary

Building and testing distributed systems often requires deploying a deep stack of dependent services, which makes development hard on a local environment with limited resources. Ideally, you should be able to deploy and test entire stack without using network or requiring a remote access so that you can spend more time on building features instead of configuring your local environment. Above examples show how you use the https://github.com/bhatti/api-mock-service to mock APIs for testing purpose and define test scenarios for simulating both happy and error cases as well as injecting faults or network delays in your testing processes so that you can test for fault tolerance. This mock library can be used to define the API mock behavior using record/play, template language or API specification standards. I have found a great use of tools like this when developing micro services and hopefully you find it useful. Feel free to connect with your feedback or suggestions.

December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency,GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves readability, concurrency-scope, composition and flow-control of concurrent code and adds better support for error-handling, cancellation, and timeout. I have been using GO on a number of projects over last few years and I will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in above series how structured concurrency considers GO statement harmful similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit, if-then, loop, and functions calls; structured concurrency provides scope of concurrency where parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks where you might be requesting a data from remote API or database, e.g. here is a sample code from gitlab-runner that uses goroutines to copy archive artifacts:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
	waitCh := make(chan error)
	go func() {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
		waitCh <- err
	}()

	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
		return <-waitCh

	case err := <-waitCh:
		return err
	}
}

Here is a async/await based syntax based on async_await.go that performs similar task using structured concurrency:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
    ctx := context.Background()
    timeout := ..
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
        return nil, err
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
    }
    async.Execute(ctx, handler, abort).Await(ctx, timeout)
}

Above code defines scope of concurrency and adds support for timeout while making the code easier to comprehend.

Note: due to lack of generics in GO, interface{} are used to accept different types that may be passed to asynchronous tasks.

Racing Asynchronous Tasks

A common use of goroutines is spawning multiple asynchronous tasks and takes result of first task that completes e.g. here is a sample code from gitlab-runner that uses goroutines to copy stdout and stderr:

	stdoutErrCh := make(chan error)
	go func() {
		_, errCopy := stdcopy.StdCopy(output, output, hijacked.Reader)
		stdoutErrCh <- errCopy
	}()

	// Write the input to the container and close its STDIN to get it to finish
	stdinErrCh := make(chan error)
	go func() {
		_, errCopy := io.Copy(hijacked.Conn, input)
		_ = hijacked.CloseWrite()
		if errCopy != nil {
			stdinErrCh <- errCopy
		}
	}()

	// Wait until either:
	// - the job is aborted/cancelled/deadline exceeded
	// - stdin has an error
	// - stdout returns an error or nil, indicating the stream has ended and
	//   the container has exited
	select {
	case <-ctx.Done():
		err = errors.New("aborted")
	case err = <-stdinErrCh:
	case err = <-stdoutErrCh:
	}

Above code creates stdoutErrCh channel to capture errors from stdout and stdinErrCh channel to capture errors from stderr and then waits for either to finish.

Here is equivalent code that uses structured concurrency with async/await primitives from async_racer.go:

    ctx := context.Background()
    timeout := ..
    pollin
    handler1 := func(ctx context.Context) (interface{}, error) {
        return nil, stdcopy.StdCopy(output, output, hijacked.Reader)
    }
    handler2 := func(ctx context.Context) (interface{}, error) {
		defer hijacked.CloseWrite()
        return nil, io.Copy(hijacked.Conn, input)
    }
    future, _ := async. ExecuteRacer(ctx, handler1, handler2)
     _, err := future.Await(ctx, timeout)

Above code uses async/await syntax to define scope of concurrency and clarifies intent of the business logic without distraction of concurrency logic.

Performing cleanup when task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process in case goroutine task is cancelled or times out. For example, here is a sample code from gitlab-runner that calls KillAndWait function to terminate external process when context.Done() is invoked:

   func (c *command) Run() error {
       err := c.cmd.Start()
       if err != nil {
           return fmt.Errorf("failed to start command: %w", err)
       }

       go c.waitForCommand()

       select {
       case err = <-c.waitCh:
           return err

       case <-c.context.Done():
           return newProcessKillWaiter(c.logger, c.gracefulKillTimeout, c.forceKillTimeout).
               KillAndWait(c.cmd, c.waitCh)
       }
   }

In above, Run method starts a command in goroutine, waits for completion in another goroutine and then listens to response from waitCh and context.Done channels.

Here is how async/await structure from async_await.go can apply structured concurrency to above code:

   func (c *command) Run() error {
       timeout := ...
       ctx := context.Background()
       handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, c.cmd.Run()
       }
       abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, KillAndWait(c.cmd, c.waitCh)
       }
       Execute(ctx, handler, abort, nil).Await(ctx, timeout)
   }

Using GO Channels as data pipe/queue

GO channels are designed based on CSP rendezvous primitives where both sender and receiver have to wait to exchange messages. However, you can add buffering to make these channels as bounded queue (for back-pressure). Here is an example code from gitlab-runner that uses channels to stream log messages:

 func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*bufio.Scanner, <-chan string) {
     logsScanner := bufio.NewScanner(logs)

     linesCh := make(chan string)
     go func() {
         defer close(linesCh)

         // This goroutine will exit when the calling method closes the logs stream or the context is cancelled
         for logsScanner.Scan() {
             select {
             case <-ctx.Done():
                 return
             case linesCh <- logsScanner.Text():
             }
         }
     }()

     return logsScanner, linesCh
 }

Above code creates a channel linesCh without any buffer and then creates a goroutine where logs are read and sent to linesCh channel. As I mentioned, above code will block sender until the receiver is ready to receive these log messages and you may lose log messages if receiver is slow and goroutine is killed before logs can be read. Though, structured concurrency cannot help in this case but we can simply use an event-bus or a local message-queue to stream these logs.

WaitGroup to wait for completion of goroutines

GO language supports sync.WaitGroup to wait for completion of goroutines but it’s redundant if you are also using channels to receive for reply. For example, here is a sample code from gitlab-runner that uses WaitGroup to wait for completion of groutines:

 func (e *executor) waitForServices() {
...
     // wait for all services to came up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
         wg := sync.WaitGroup{}
         for _, service := range e.services {
             wg.Add(1)
             go func(service *types.Container) {
                 _ = e.waitForServiceContainer(service, time.Duration(waitForServicesTimeout)*time.Second)
                 wg.Done()
             }(service)
         }
         wg.Wait()
     }
 }

Here is how above code can be replaced by async/await syntax from my async_await.go:

 func (e *executor) waitForServices() {
...
     // wait for all services to came up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
        ctx := context.Background()
        timeout := time.Duration(waitForServicesTimeout)*time.Second
        futures := make([]async.Awaiter, len(e.services))
        handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
          return e.waitForServiceContainer(payload.(string)), nil
        }
        for i:=0; i<len(e.services); i++ {
          futures[i] = async.Execute(ctx, handler, async.NoAbort, e.services[i])
	    }
        async.AwaitAll(ctx, timeout, futures...)
     }
 }

Above code is not shorter than the original but it is more readable and eliminates subtle bugs where you might be using WaitGroup incorrectly, thus resulting in deadlock.

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion similar to fork-join pattern, e.g. here is a sample code from gitlab-runner that uses goroutines to perform cleanup of multiple services and sends back result via a channel.

func (s *executor) cleanupServices() {
	ch := make(chan serviceDeleteResponse)
	var wg sync.WaitGroup
	wg.Add(len(s.services))

	for _, service := range s.services {
		go s.deleteKubernetesService(service.ObjectMeta.Name, ch, &wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.err))
		}
	}
}

func (s *executor) deleteKubernetesService(serviceName string, ch chan<- serviceDeleteResponse, wg *sync.WaitGroup) {
	defer wg.Done()

	err := s.kubeClient.CoreV1().
		Services(s.configurationOverwrites.namespace).
		Delete(serviceName, &metav1.DeleteOptions{})
	ch <- serviceDeleteResponse{serviceName: serviceName, err: err}
}

The cleanupServices method above goes through a collection of services and then calls deleteKubernetesService in goroutine, which calls kubernetes API to remove given service. It waits for all background using WaitGroup and then receives any errors from error channel and logs them.

Here is how you can use async/await code from async_await.go that applies structured concurrency by abstracting low-level goroutines and channels:

func (s *executor) cleanupServices() {
    ctx := context.Background()
    timeout := time.Duration(5 * time.Second)
    futures := make([]async.Awaiter, len(s.services))
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         serviceName := payload.(string)
         err := s.kubeClient.CoreV1().
      		Services(s.configurationOverwrites.namespace).
			Delete(serviceName, &metav1.DeleteOptions{})
         return nil, err
    }
    for i:=0; i<len(s.services); i++ {
        futures[i] = async.Execute(ctx, handler, async.NoAbort, s.services[i])
	}
    results := async.AwaitAll(ctx, timeout, futures...)
	for res := range results {
		if res.Err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.Err))
		}
	}
}

Above code uses structured concurrency by defining scope of asynchronous code in cleanupServices and adds better support for cancellation, timeout and error handling. Also, you don’t need to use WaitGroup to wait for completion anymore.

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion, e.g. here is a sample code from gitlab-runner that waits for pod until it’s running:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	pollInterval := config.GetPollInterval()
	pollAttempts := config.GetPollAttempts()
	for i := 0; i <= pollAttempts; i++ {
		select {
		case r := <-triggerPodPhaseCheck(c, pod, out):
			if !r.done {
				time.Sleep(time.Duration(pollInterval) * time.Second)
				continue
			}
			return r.phase, r.err
		case <-ctx.Done():
			return api.PodUnknown, ctx.Err()
		}
	}
	return api.PodUnknown, errors.New("timed out waiting for pod to start")
}

func triggerPodPhaseCheck(c *kubernetes.Clientset, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
	errc := make(chan podPhaseResponse)
	go func() {
		defer close(errc)
		errc <- getPodPhase(c, pod, out)
	}()
	return errc
}

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine and then invokes kubernetes API to get pod status. It then returns pod status in a channel that waitForPodRunning listens to.

Here is equivalent code using async/await from async_polling.go:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
    timeout := config.getPodRunningTimeout()
    pollInterval := config.GetPollInterval()
    handler := func(ctx context.Context, payload interface{}) (bool, interface{}, error) {
         r := getPodPhase(c, pod, out)
         return r.done, r.phase, r.err
    }
    _, phase, err := async.ExecutePolling(ctx, handler, NoAbort, 0, pollInterval).
  						Await(ctx, timeout)
	return r.phase, r.err
}

Above code removes complexity due to manually polling and managing goroutines/channels.

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task but then launch another background process to monitor the task or its runtime environment so that it can terminate background task if watchdog finds any errors. For example, here is a sample code from gitlab-runner that executes a command inside kubernetes pod and then it launches another goroutine to monitor pod status, i.e.,

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...
...

	podStatusCh := s.watchPodStatus(ctx)

	select {
	case err := <-s.runInContainer(containerName, containerCommand):
		s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
		var terminatedError *commandTerminatedError
		if err != nil && errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}

		return err
	case err := <-podStatusCh:
		return &common.BuildError{Inner: err}
	case <-ctx.Done():
		return fmt.Errorf("build aborted")
	}
}

func (s *executor) watchPodStatus(ctx context.Context) <-chan error {
	// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
	// and the consumer is no longer reading from the channel while we try to write to it
	ch := make(chan error, 1)

	go func() {
		defer close(ch)

		t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				err := s.checkPodStatus()
				if err != nil {
					ch <- err
					return
				}
			}
		}
	}()

	return ch
}

func (s *executor) runInContainer(name string, command []string) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)

		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		err := retryable.Run()
		if err != nil {
			errCh <- err
		}

		exitStatus := <-s.remoteProcessTerminated
		if *exitStatus.CommandExitCode == 0 {
			errCh <- nil
			return
		}

		errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
	}()

	return errCh
}

In above code, runWithAttach calls watchPodStatus to monitor status of the pod in background goroutine and then executes command in runInContainer.

Here is equivalent code using async/await from async_watchdog.go:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...

...
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
      return nil, s.runInContainer(containerName, containerCommand)
    }
    watchdogHandler := func(ctx context.Context, payload interface{}) error {
         return nil, s.checkPodStatus()
    }

    res, err := async.ExecuteWatchdog(ctx, handler, watchdogHandler, async.NoAbort, nil, poll).
  				Await(ctx, timeout)
	if err != nil {
		return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
	}
}

func (s *executor) runInContainer(name string, command []string) error {
		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		return retryable.Run()
}

Above code removes extraneous complexity due to concurrent code embedded with functional code and makes it easier to comprehend with improved support of concurrency scope, error handling, timeout and cancellation.

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that adds accidental complexity in gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract executor framework so that it can use different platforms such as Docker, Kubernetes, SSH, Shell, etc to execute processes. However, it doesn’t define an interface for common behavior such as managing runtime containers or executing a command for these executors.

Adapter/Gateway pattern

A common pattern to abstract third party library or APIs is to use an adapter or gateway pattern but gitlab-runner mixes external APIs with internal executor logic. The kubernetes executor in gitlab-runner defines logic for both interacting with external Kubernetes server and managing Kubernetes Pod or executing processes inside those pods. For example, my initial intent for looking at the gitlab-runner was to adopt APIs for interacting with Kubernetes but I could not reuse any code as a library and instead I had to copy relevant code for my use-case.

Separation of Concerns

The gitlab-runner is not only riddled with concurrency related primitives such as goroutines and channels but it also mixes other aspects such as configuration, feature-flags, logging, monitoring, etc. For example, it uses configurations for defining containers, services, volumes for Kubernetes but it hard codes various internal configurations for build, helpers, monitoring containers instead of injecting them via external configuration. Similarly, it hard codes volumes for repo, cache, logging, etc. A common design pattern in building software is to use layers of abstractions or separation of concerns where each layer or a module is responsible for a single concern (single-responsibility principle). For example, you can divide executors into three layers: adapter-layer, middle-layer and high-level layer where adapter layer strictly interacts with underlying platform such as Kubernetes and no other layer needs to know internal types or APIs of Kubernetes. The middle layer uses adapter layer to delegate any executor specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers and configurations to execute jobs within the gitlab environment. I understand some of these abstractions might become leaky if there are significant differences among executors but such design allows broader reuse of lower-level layers in other contexts.

Summary

Though, modern scalable and performant applications demand concurrency but sprinkling low-level concurrency patterns all over your code adds significant accidental or extraneous complexity to your code. In this blog, I demonstrated how encapsulating concurrent code with library of structured concurrency patterns can simplify your code. Concurrency is hard especially with lower-level primitives such as WaitGroup, Mutex, goroutines, channels in GO. In addition, incorrect use of these concurrency primitives can lead to deadlocks or race conditions. Using a small library for abstracting common concurrency patterns can reduce probability of concurrency related bugs. Finally, due to lack of immutability, strong ownership, generics or scope of concurrency in GO, you still have to manage race conditions and analyze your code carefully for deadlocks but applying structured concurrency can help manage concurrent code better.

Powered by WordPress