
December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency, GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves readability, concurrency scope, composition, and flow control of concurrent code, and adds better support for error handling, cancellation, and timeout. I have been using GO on a number of projects over the last few years, and I will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in the above series how structured concurrency considers the GO statement harmful, similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit blocks, if-then, loops, and function calls, structured concurrency provides a scope of concurrency where the parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.
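
Throughout this post I will use async/await-style primitives from my async_await.go. Since only the usage is shown below, here is a minimal sketch of what such a library might look like, inferred from how its primitives are used in the examples (the real implementation differs in details):

package async

import (
	"context"
	"time"
)

// Handler performs the asynchronous work; payload carries optional input.
type Handler func(ctx context.Context, payload interface{}) (interface{}, error)

// NoAbort is a no-op abort handler for tasks that need no cleanup.
var NoAbort Handler = func(ctx context.Context, payload interface{}) (interface{}, error) {
	return nil, nil
}

// Awaiter lets the parent scope wait for an asynchronous task.
type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

type response struct {
	result interface{}
	err    error
}

type task struct {
	abort   Handler
	payload interface{}
	resCh   chan response
}

// Execute runs handler in a goroutine and returns an Awaiter for its result.
func Execute(ctx context.Context, handler Handler, abort Handler, payload interface{}) Awaiter {
	t := &task{abort: abort, payload: payload, resCh: make(chan response, 1)}
	go func() {
		res, err := handler(ctx, payload)
		t.resCh <- response{result: res, err: err}
	}()
	return t
}

// Await blocks until the task completes, the context is cancelled, or the
// timeout elapses; on cancellation or timeout it invokes the abort handler.
func (t *task) Await(ctx context.Context, timeout time.Duration) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case r := <-t.resCh:
		return r.result, r.err
	case <-ctx.Done():
		_, _ = t.abort(ctx, t.payload)
		return nil, ctx.Err()
	}
}

The later sketches in this post (ExecuteRacer, AwaitAll, ExecutePolling, ExecuteWatchdog) build on these types.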

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks such as requesting data from a remote API or database. For example, here is sample code from gitlab-runner that uses a goroutine to wait for completion of a remote SSH command:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
	waitCh := make(chan error)
	go func() {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
		waitCh <- err
	}()

	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
		return <-waitCh

	case err := <-waitCh:
		return err
	}
}

Here is an async/await-based version of the same logic, using async_await.go, that applies structured concurrency:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
    timeout := ...
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        err := session.Wait()
        if _, ok := err.(*ssh.ExitError); ok {
            err = &ExitError{Inner: err}
        }
        return nil, err
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
        _ = session.Signal(ssh.SIGKILL)
        _ = session.Close()
        return nil, nil
    }
    _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
    return err
}

The above code defines the scope of concurrency and adds support for timeout, while making the code easier to comprehend.

Note: due to the lack of generics in GO, interface{} is used to accept different types that may be passed to asynchronous tasks.

Racing Asynchronous Tasks

A common use of goroutines is to spawn multiple asynchronous tasks and take the result of whichever completes first. For example, here is sample code from gitlab-runner that uses goroutines to copy the stdout and stdin streams of a container:

	stdoutErrCh := make(chan error)
	go func() {
		_, errCopy := stdcopy.StdCopy(output, output, hijacked.Reader)
		stdoutErrCh <- errCopy
	}()

	// Write the input to the container and close its STDIN to get it to finish
	stdinErrCh := make(chan error)
	go func() {
		_, errCopy := io.Copy(hijacked.Conn, input)
		_ = hijacked.CloseWrite()
		if errCopy != nil {
			stdinErrCh <- errCopy
		}
	}()

	// Wait until either:
	// - the job is aborted/cancelled/deadline exceeded
	// - stdin has an error
	// - stdout returns an error or nil, indicating the stream has ended and
	//   the container has exited
	select {
	case <-ctx.Done():
		err = errors.New("aborted")
	case err = <-stdinErrCh:
	case err = <-stdoutErrCh:
	}

The above code creates the stdoutErrCh channel to capture errors from stdout and the stdinErrCh channel to capture errors from stdin, and then waits for either to finish.

Here is the equivalent code using structured concurrency with async/await primitives from async_racer.go:

    ctx := context.Background()
    timeout := ...
    handler1 := func(ctx context.Context) (interface{}, error) {
        _, err := stdcopy.StdCopy(output, output, hijacked.Reader)
        return nil, err
    }
    handler2 := func(ctx context.Context) (interface{}, error) {
        defer hijacked.CloseWrite()
        _, err := io.Copy(hijacked.Conn, input)
        return nil, err
    }
    future, _ := async.ExecuteRacer(ctx, handler1, handler2)
    _, err := future.Await(ctx, timeout)

The above code uses async/await syntax to define the scope of concurrency and clarifies the intent of the business logic without the distraction of concurrency plumbing.
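
For reference, a racer primitive such as ExecuteRacer might be sketched as follows, reusing the Awaiter and response types from the first sketch and adding "errors" to its imports (illustrative; the actual async_racer.go may differ):

// SimpleHandler performs work without an input payload.
type SimpleHandler func(ctx context.Context) (interface{}, error)

type racerTask struct {
	resCh chan response
}

// ExecuteRacer runs all handlers concurrently; the buffered channel lets
// the losing handlers finish without leaking goroutines.
func ExecuteRacer(ctx context.Context, handlers ...SimpleHandler) (Awaiter, error) {
	if len(handlers) == 0 {
		return nil, errors.New("no handlers to race")
	}
	t := &racerTask{resCh: make(chan response, len(handlers))}
	for _, h := range handlers {
		go func(h SimpleHandler) {
			res, err := h(ctx)
			t.resCh <- response{result: res, err: err}
		}(h)
	}
	return t, nil
}

// Await returns the first result to arrive, or an error on cancellation/timeout.
func (t *racerTask) Await(ctx context.Context, timeout time.Duration) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case r := <-t.resCh:
		return r.result, r.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}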

Performing cleanup when a task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process when the goroutine's task is cancelled or times out. For example, here is sample code from gitlab-runner that calls the KillAndWait function to terminate the external process when the context is cancelled:

   func (c *command) Run() error {
       err := c.cmd.Start()
       if err != nil {
           return fmt.Errorf("failed to start command: %w", err)
       }

       go c.waitForCommand()

       select {
       case err = <-c.waitCh:
           return err

       case <-c.context.Done():
           return newProcessKillWaiter(c.logger, c.gracefulKillTimeout, c.forceKillTimeout).
               KillAndWait(c.cmd, c.waitCh)
       }
   }

In the above code, the Run method starts the command, waits for its completion in a separate goroutine, and then listens for a response on the waitCh and context.Done channels.

Here is how the async/await structure from async_await.go can apply structured concurrency to the above code:

func (c *command) Run() error {
    timeout := ...
    ctx := context.Background()
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        return nil, c.cmd.Run()
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
        return nil, KillAndWait(c.cmd, c.waitCh)
    }
    _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
    return err
}

Using GO Channels as data pipe/queue

GO channels are designed around CSP rendezvous primitives, where both sender and receiver have to wait to exchange messages. However, you can add buffering to turn these channels into a bounded queue (for back-pressure). Here is example code from gitlab-runner that uses channels to stream log messages:

 func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*bufio.Scanner, <-chan string) {
     logsScanner := bufio.NewScanner(logs)

     linesCh := make(chan string)
     go func() {
         defer close(linesCh)

         // This goroutine will exit when the calling method closes the logs stream or the context is cancelled
         for logsScanner.Scan() {
             select {
             case <-ctx.Done():
                 return
             case linesCh <- logsScanner.Text():
             }
         }
     }()

     return logsScanner, linesCh
 }

The above code creates an unbuffered channel linesCh and then starts a goroutine that reads the logs and sends them to the channel. As mentioned above, this blocks the sender until the receiver is ready to receive each log message, and you may lose log messages if the receiver is slow and the goroutine is killed before all logs can be read. Structured concurrency cannot help in this case, but we can simply use an event bus or a local message queue to stream these logs.
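
To illustrate the back-pressure point above, here is a small self-contained sketch: with a buffer, the producer can run ahead by up to the buffer size and only blocks once the buffer is full, so a slow consumer throttles the producer instead of stalling it on every line:

package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"
)

func main() {
	logs := strings.NewReader("line-1\nline-2\nline-3\nline-4\n")
	scanner := bufio.NewScanner(logs)

	// A buffer of 2 turns the channel into a bounded queue: the producer
	// can run ahead by up to two lines before back-pressure kicks in.
	linesCh := make(chan string, 2)
	go func() {
		defer close(linesCh)
		for scanner.Scan() {
			linesCh <- scanner.Text() // blocks only when the buffer is full
		}
	}()

	for line := range linesCh {
		time.Sleep(10 * time.Millisecond) // simulate a slow consumer
		fmt.Println(line)
	}
}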

WaitGroup to wait for completion of goroutines

The GO language provides sync.WaitGroup to wait for the completion of goroutines, but it's redundant if you are also using channels to receive replies. For example, here is sample code from gitlab-runner that uses WaitGroup to wait for the completion of goroutines:

 func (e *executor) waitForServices() {
...
     // wait for all services to come up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
         wg := sync.WaitGroup{}
         for _, service := range e.services {
             wg.Add(1)
             go func(service *types.Container) {
                 _ = e.waitForServiceContainer(service, time.Duration(waitForServicesTimeout)*time.Second)
                 wg.Done()
             }(service)
         }
         wg.Wait()
     }
 }

Here is how the above code can be replaced with async/await syntax from my async_await.go:

func (e *executor) waitForServices() {
...
    // wait for all services to come up
    if waitForServicesTimeout > 0 && len(e.services) > 0 {
        ctx := context.Background()
        timeout := time.Duration(waitForServicesTimeout) * time.Second
        futures := make([]async.Awaiter, len(e.services))
        handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
            return nil, e.waitForServiceContainer(payload.(*types.Container), timeout)
        }
        for i := 0; i < len(e.services); i++ {
            futures[i] = async.Execute(ctx, handler, async.NoAbort, e.services[i])
        }
        async.AwaitAll(ctx, timeout, futures...)
    }
}

The above code is not shorter than the original, but it is more readable and eliminates subtle bugs where you might use WaitGroup incorrectly and end up with a deadlock.
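
For example, here is a sketch of one such subtle bug (waitForService is a hypothetical stand-in): skipping wg.Done on an error path leaves the counter above zero, so wg.Wait blocks forever:

func waitForAll(services []string, waitForService func(string) error) {
	var wg sync.WaitGroup
	for _, service := range services {
		wg.Add(1)
		go func(svc string) {
			if err := waitForService(svc); err != nil {
				// BUG: returning without wg.Done leaves the counter
				// non-zero, so wg.Wait below blocks forever.
				return
			}
			wg.Done() // safer: `defer wg.Done()` as the first statement
		}(service)
	}
	wg.Wait()
}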

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion, similar to the fork-join pattern. For example, here is sample code from gitlab-runner that uses goroutines to clean up multiple services and send the results back via a channel.

func (s *executor) cleanupServices() {
	ch := make(chan serviceDeleteResponse)
	var wg sync.WaitGroup
	wg.Add(len(s.services))

	for _, service := range s.services {
		go s.deleteKubernetesService(service.ObjectMeta.Name, ch, &wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.err))
		}
	}
}

func (s *executor) deleteKubernetesService(serviceName string, ch chan<- serviceDeleteResponse, wg *sync.WaitGroup) {
	defer wg.Done()

	err := s.kubeClient.CoreV1().
		Services(s.configurationOverwrites.namespace).
		Delete(serviceName, &metav1.DeleteOptions{})
	ch <- serviceDeleteResponse{serviceName: serviceName, err: err}
}

The cleanupServices method above goes through a collection of services and calls deleteKubernetesService in a goroutine, which calls the Kubernetes API to remove the given service. It waits for all background tasks using a WaitGroup, then receives any errors from the error channel and logs them.

Here is how you can use async/await code from async_await.go that applies structured concurrency by abstracting low-level goroutines and channels:

func (s *executor) cleanupServices() {
    ctx := context.Background()
    timeout := 5 * time.Second
    futures := make([]async.Awaiter, len(s.services))
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        serviceName := payload.(string)
        err := s.kubeClient.CoreV1().
            Services(s.configurationOverwrites.namespace).
            Delete(serviceName, &metav1.DeleteOptions{})
        return serviceName, err
    }
    for i := 0; i < len(s.services); i++ {
        futures[i] = async.Execute(ctx, handler, async.NoAbort, s.services[i].ObjectMeta.Name)
    }
    results := async.AwaitAll(ctx, timeout, futures...)
    for _, res := range results {
        if res.Err != nil {
            s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.Result, res.Err))
        }
    }
}

The above code uses structured concurrency by defining the scope of the asynchronous code in cleanupServices, and it adds better support for cancellation, timeout, and error handling. Also, you no longer need a WaitGroup to wait for completion.
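
An AwaitAll helper over the Awaiter interface from the first sketch might look roughly like this (illustrative; the result type and its field names are my assumption, not the actual async_await.go API):

// Result pairs a future's value with its error.
type Result struct {
	Result interface{}
	Err    error
}

// AwaitAll waits for every future under a shared deadline and collects
// the results in order.
func AwaitAll(ctx context.Context, timeout time.Duration, futures ...Awaiter) []Result {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	results := make([]Result, len(futures))
	for i, f := range futures {
		res, err := f.Await(ctx, timeout)
		results[i] = Result{Result: res, Err: err}
	}
	return results
}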

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion. For example, here is sample code from gitlab-runner that waits for a pod until it's running:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	pollInterval := config.GetPollInterval()
	pollAttempts := config.GetPollAttempts()
	for i := 0; i <= pollAttempts; i++ {
		select {
		case r := <-triggerPodPhaseCheck(c, pod, out):
			if !r.done {
				time.Sleep(time.Duration(pollInterval) * time.Second)
				continue
			}
			return r.phase, r.err
		case <-ctx.Done():
			return api.PodUnknown, ctx.Err()
		}
	}
	return api.PodUnknown, errors.New("timed out waiting for pod to start")
}

func triggerPodPhaseCheck(c *kubernetes.Clientset, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
	errc := make(chan podPhaseResponse)
	go func() {
		defer close(errc)
		errc <- getPodPhase(c, pod, out)
	}()
	return errc
}

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine that invokes the Kubernetes API to get the pod status and returns it on a channel that waitForPodRunning listens to.

Here is the equivalent code using async/await from async_polling.go:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	timeout := config.getPodRunningTimeout()
	pollInterval := config.GetPollInterval()
	handler := func(ctx context.Context, payload interface{}) (bool, interface{}, error) {
		r := getPodPhase(c, pod, out)
		return r.done, r.phase, r.err
	}
	phase, err := async.ExecutePolling(ctx, handler, async.NoAbort, 0, pollInterval).
		Await(ctx, timeout)
	if err != nil {
		return api.PodUnknown, err
	}
	return phase.(api.PodPhase), nil
}

The above code removes the complexity of manually polling and managing goroutines/channels.
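
A polling primitive along these lines might be sketched as a loop around the handler, reusing the task and response types from the first sketch (illustrative; the actual async_polling.go and its exact signature may differ). The handler's extra bool reports completion:

// PollingHandler returns done=true when polling should stop.
type PollingHandler func(ctx context.Context, payload interface{}) (bool, interface{}, error)

// ExecutePolling invokes handler every pollInterval until it reports done,
// fails, or the context is cancelled; Await then yields the final result.
func ExecutePolling(ctx context.Context, handler PollingHandler, abort Handler,
	payload interface{}, pollInterval time.Duration) Awaiter {
	t := &task{abort: abort, payload: payload, resCh: make(chan response, 1)}
	go func() {
		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()
		for {
			done, res, err := handler(ctx, payload)
			if done || err != nil {
				t.resCh <- response{result: res, err: err}
				return
			}
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return t
}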

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task and then launching another background goroutine to monitor the task or its runtime environment, so that the background task can be terminated if the watchdog finds any errors. For example, here is sample code from gitlab-runner that executes a command inside a Kubernetes pod and launches another goroutine to monitor the pod status:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
	containerName = ...
	containerCommand = ...
...

	podStatusCh := s.watchPodStatus(ctx)

	select {
	case err := <-s.runInContainer(containerName, containerCommand):
		s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
		var terminatedError *commandTerminatedError
		if err != nil && errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}

		return err
	case err := <-podStatusCh:
		return &common.BuildError{Inner: err}
	case <-ctx.Done():
		return fmt.Errorf("build aborted")
	}
}

func (s *executor) watchPodStatus(ctx context.Context) <-chan error {
	// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
	// and the consumer is no longer reading from the channel while we try to write to it
	ch := make(chan error, 1)

	go func() {
		defer close(ch)

		t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				err := s.checkPodStatus()
				if err != nil {
					ch <- err
					return
				}
			}
		}
	}()

	return ch
}

func (s *executor) runInContainer(name string, command []string) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)

		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		err := retryable.Run()
		if err != nil {
			errCh <- err
		}

		exitStatus := <-s.remoteProcessTerminated
		if *exitStatus.CommandExitCode == 0 {
			errCh <- nil
			return
		}

		errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
	}()

	return errCh
}

In the above code, runWithAttach calls watchPodStatus to monitor the status of the pod in a background goroutine and then executes the command via runInContainer.

Here is the equivalent code using async/await from async_watchdog.go:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
	containerName = ...
	containerCommand = ...
	timeout := ...
	poll := ...
...
	handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		return nil, s.runInContainer(containerName, containerCommand)
	}
	watchdogHandler := func(ctx context.Context, payload interface{}) error {
		return s.checkPodStatus()
	}

	_, err := async.ExecuteWatchdog(ctx, handler, watchdogHandler, async.NoAbort, nil, poll).
		Await(ctx, timeout)
	var terminatedError *commandTerminatedError
	if err != nil && errors.As(err, &terminatedError) {
		return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
	}
	return err
}

func (s *executor) runInContainer(name string, command []string) error {
	...
	retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
	return retryable.Run()
}

The above code removes the extraneous complexity of concurrency logic embedded in functional code and makes it easier to comprehend, with improved support for concurrency scope, error handling, timeout, and cancellation.
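
For reference, a watchdog primitive along these lines might be sketched as two goroutines racing into a shared result channel, again reusing the task and response types from the first sketch (illustrative; the actual async_watchdog.go may differ):

// WatchdogHandler performs one health check; a non-nil error fails the task.
type WatchdogHandler func(ctx context.Context, payload interface{}) error

// ExecuteWatchdog runs handler in one goroutine and polls watchdog in
// another; whichever finishes (or fails) first provides the awaited result.
func ExecuteWatchdog(ctx context.Context, handler Handler, watchdog WatchdogHandler,
	abort Handler, payload interface{}, pollInterval time.Duration) Awaiter {
	ctx, cancel := context.WithCancel(ctx)
	t := &task{abort: abort, payload: payload, resCh: make(chan response, 2)}
	go func() {
		res, err := handler(ctx, payload)
		t.resCh <- response{result: res, err: err}
		cancel() // the main task finished; stop the watchdog loop
	}()
	go func() {
		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := watchdog(ctx, payload); err != nil {
					t.resCh <- response{err: err}
					cancel() // a failed check cancels the main task
					return
				}
			}
		}
	}()
	return t
}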

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that add accidental complexity in gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract the executor framework so that it can use different platforms such as Docker, Kubernetes, SSH, Shell, etc. to execute processes. However, it doesn't define an interface for behavior common to these executors, such as managing runtime containers or executing a command; a sketch of such an interface follows.
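
For example, a common executor interface might look like the following sketch (all types here are hypothetical, not gitlab-runner's actual API):

// JobConfig and Command are hypothetical placeholders for job settings.
type JobConfig struct{ Image string }
type Command struct{ Script []string }

// Executor is a hypothetical common interface that the Docker, Kubernetes,
// SSH, and Shell executors could each implement.
type Executor interface {
	// Prepare provisions the runtime environment (container, pod, or session).
	Prepare(ctx context.Context, config JobConfig) error
	// Run executes a single command inside the prepared environment.
	Run(ctx context.Context, cmd Command) error
	// Cleanup releases containers, pods, and connections.
	Cleanup(ctx context.Context) error
}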

Adapter/Gateway pattern

A common way to abstract a third-party library or API is the adapter or gateway pattern, but gitlab-runner mixes external APIs with internal executor logic. The Kubernetes executor in gitlab-runner defines logic both for interacting with the external Kubernetes server and for managing Kubernetes pods or executing processes inside those pods. For example, my initial intent in looking at gitlab-runner was to adopt its APIs for interacting with Kubernetes, but I could not reuse any code as a library and instead had to copy the relevant code for my use case.

Separation of Concerns

The gitlab-runner is not only riddled with concurrency primitives such as goroutines and channels, but it also mixes in other aspects such as configuration, feature flags, logging, and monitoring. For example, it uses configuration to define containers, services, and volumes for Kubernetes, but it hard-codes various internal configurations for the build, helper, and monitoring containers instead of injecting them via external configuration. Similarly, it hard-codes volumes for the repo, cache, logging, etc.

A common design practice is to use layers of abstraction, or separation of concerns, where each layer or module is responsible for a single concern (the single-responsibility principle). For example, you could divide executors into three layers: an adapter layer, a middle layer, and a high-level layer, where the adapter layer strictly interacts with the underlying platform such as Kubernetes and no other layer needs to know the internal types or APIs of Kubernetes. The middle layer uses the adapter layer to delegate any executor-specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers, and configuration to execute jobs within the gitlab environment. I understand some of these abstractions might become leaky if there are significant differences among executors, but such a design allows broader reuse of the lower-level layers in other contexts; a rough sketch of this layering follows.
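
Here is a rough sketch of that three-layer split, reusing the hypothetical JobConfig above (again, all names are illustrative):

// Adapter layer: the only code that talks to the Kubernetes API; no other
// layer needs to import Kubernetes client types.
type PodAdapter interface {
	CreatePod(ctx context.Context, spec PodSpec) (name string, err error)
	PodPhase(ctx context.Context, name string) (string, error)
	DeletePod(ctx context.Context, name string) error
}

// PodSpec is a hypothetical platform-neutral pod description.
type PodSpec struct {
	Name    string
	Image   string
	Command []string
}

// Middle layer: configures containers and delegates platform-specific
// behavior to the adapter.
type ContainerManager interface {
	StartJobContainers(ctx context.Context, job JobConfig) error
	ExecInContainer(ctx context.Context, name string, cmd []string) error
}

// High-level layer: injects dependent services, containers, and
// configuration to run a job within the gitlab environment.
type JobRunner interface {
	RunJob(ctx context.Context, job JobConfig) error
}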

Summary

Modern scalable and performant applications demand concurrency, but sprinkling low-level concurrency patterns all over your code adds significant accidental and extraneous complexity. In this blog, I demonstrated how encapsulating concurrent code with a library of structured concurrency patterns can simplify your code. Concurrency is hard, especially with lower-level primitives such as WaitGroup, Mutex, goroutines, and channels in GO, and incorrect use of these primitives can lead to deadlocks or race conditions. Using a small library that abstracts common concurrency patterns can reduce the probability of concurrency-related bugs. Finally, due to the lack of immutability, strong ownership, generics, or a scope of concurrency in GO, you still have to watch for race conditions and analyze your code carefully for deadlocks, but applying structured concurrency can help you manage concurrent code better.
