Skip to content

yuridevx/app

Repository files navigation

App

A Go framework for building concurrent applications with component-based architecture, lifecycle management, and middleware support.

go get github.com/yuridevx/app

Full Example

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/yuridevx/app"
    "github.com/yuridevx/app/appch"
    "github.com/yuridevx/app/apperr"
    "github.com/yuridevx/app/apptrace"
    "github.com/yuridevx/app/options"
)

type Order struct {
    ID       int     `json:"id"`
    Amount   float64 `json:"amount"`
    Customer string  `json:"customer"`
}

var (
    orderQueue   = make(chan Order, 1000)
    submitOrder  app.ProxyFn
    ErrQueueFull = apperr.Error("QueueFullError")
)

func main() {
    wg := &sync.WaitGroup{}
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        <-sig
        cancel()
    }()

    traceMiddleware := apptrace.NewTraceMiddleware(nil, nil)

    builder := app.NewBuilder(func(opts *options.ApplicationOptions) {
        opts.Middleware = append(opts.Middleware, traceMiddleware)
        opts.AppShutdownTimeout = 30 * time.Second
        opts.ComponentShutdownTimeout = 40 * time.Second
    })

    // Order processing with parallel workers
    builder.C("order-processor").
        OnStart(func(ctx context.Context) error {
            return initDB()
        }).
        PConsume(appch.ToInterfaceChan(orderQueue), 10, func(ctx context.Context, order Order) error {
            trace := apptrace.FromContext(ctx)
            trace.WithAttributes("orderId", order.ID, "customer", order.Customer)
            return saveOrder(ctx, order)
        }).
        OnShutdown(func(ctx context.Context) error {
            return closeDB()
        })

    // HTTP API
    httpComponent := builder.C("http-server")

    submitOrder = httpComponent.Proxy(func(ctx context.Context, wg *sync.WaitGroup, order Order) error {
        defer wg.Done()
        select {
        case orderQueue <- order:
            return nil
        default:
            return ErrQueueFull("queue at capacity", apperr.WithAttribute("queueLen", len(orderQueue)))
        }
    })

    httpComponent.PBlocking(func(ctx context.Context) {
        mux := http.NewServeMux()
        mux.HandleFunc("/order", handleOrder)
        server := &http.Server{Addr: ":8080", Handler: mux}

        go func() {
            <-ctx.Done()
            shutdownCtx, c := context.WithTimeout(context.Background(), 10*time.Second)
            defer c()
            server.Shutdown(shutdownCtx)
        }()

        server.ListenAndServe()
    })

    // Metrics reporting with dynamic interval
    builder.C("metrics").
        CPeriodIndexFn(func(i int32) (int32, time.Duration) {
            if i == 0 {
                return 1, time.Nanosecond // immediate first run
            }
            return i, time.Minute
        }, func(ctx context.Context) {
            fmt.Printf("Queue: %d/%d\n", len(orderQueue), cap(orderQueue))
        })

    // Competing consumers example - both share same goroutine
    highPriority := make(chan interface{}, 10)
    lowPriority := make(chan interface{}, 100)

    builder.C("priority-handler").
        CConsume(highPriority, func(ctx context.Context, msg interface{}) {
            fmt.Println("HIGH:", msg)
        }).
        CConsume(lowPriority, func(ctx context.Context, msg interface{}) {
            fmt.Println("LOW:", msg)
        }).
        CPeriod(5*time.Second, func() {
            fmt.Println("Heartbeat")
        })

    // Conditional periodic task with enable/disable
    enableSync := make(chan bool)
    builder.C("sync-service").
        CPeriodFn(func() time.Duration {
            return 30 * time.Second
        }, func(ctx context.Context) {
            syncExternalService(ctx)
        }, func(opts *options.CPeriodOptions) {
            opts.SwitchCh = enableSync // waits for `true` before starting
        })

    builder.OnShutdown(func(ctx context.Context) {
        fmt.Println("Shutdown complete")
    })

    builder.Build().Run(ctx, wg)
    wg.Wait()
}

func handleOrder(w http.ResponseWriter, r *http.Request) {
    var order Order
    if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
        http.Error(w, err.Error(), 400)
        return
    }
    if err := submitOrder(r.Context(), order); err != nil {
        http.Error(w, err.Error(), 503)
        return
    }
    w.WriteHeader(202)
}

Handler Types

Parallel (P-prefix) - Run in separate goroutines

PConsume - Multiple goroutines consuming from a channel:

builder.C("workers").
    PConsume(taskCh, 20, func(ctx context.Context, task Task) error {
        return process(task)
    })

PBlocking - Long-running daemon tasks:

builder.C("server").
    PBlocking(func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                return
            case conn := <-listener.Accept():
                go handle(conn)
            }
        }
    })

Competing (C-prefix) - Share single goroutine, compete for execution

CConsume - Channel consumer:

builder.C("processor").
    CConsume(eventCh, func(ctx context.Context, event Event) {
        handle(event)
    })

CPeriod - Periodic execution with three scheduling modes:

// Fixed interval
CPeriod(time.Minute, handler)

// Dynamic interval
CPeriodFn(func() time.Duration {
    if peak() { return time.Second }
    return time.Minute
}, handler)

// Indexed (useful for backoff, immediate first run, etc.)
CPeriodIndexFn(func(i int32) (int32, time.Duration) {
    return i + 1, time.Duration(i) * time.Second
}, handler)

Lifecycle

builder.C("service").
    OnStart(func(ctx context.Context) error {
        return initialize() // error aborts component startup
    }).
    OnShutdown(func(ctx context.Context) error {
        return cleanup()
    })

// Application-level shutdown (called after all components)
builder.OnShutdown(func(ctx context.Context) {
    globalCleanup()
})

Proxy - Callable from outside

var processRequest app.ProxyFn

processRequest = builder.C("api").
    Proxy(func(ctx context.Context, wg *sync.WaitGroup, req Request) error {
        defer wg.Done() // required when using *sync.WaitGroup
        return handle(req)
    })

// After app starts, call from anywhere:
err := processRequest(ctx, myRequest)

Middleware

type Middleware = func(ctx context.Context, input interface{}, call Call, next NextFn) error

Three levels - application, component, handler:

// Application-wide
builder := app.NewBuilder(func(opts *options.ApplicationOptions) {
    opts.Middleware = []options.Middleware{traceMiddleware, loggingMiddleware}
})

// Component-level
builder.C("service").
    Options(func(opts *options.ComponentOptions) {
        opts.Middleware = append(opts.Middleware, authMiddleware)
    })

// Handler-level
builder.C("service").
    CPeriod(time.Second, handler, func(opts *options.CPeriodOptions) {
        opts.Middleware = append(opts.Middleware, rateLimitMiddleware)
    })

Call types: CallStart, CallShutdown, CallCPeriod, CallCConsume, CallPConsume, CallPBlocking, CallProxy

Handler Signatures

Handlers accept flexible signatures - the framework auto-detects:

func()
func() error
func(ctx context.Context)
func(ctx context.Context) error
func(input interface{})
func(ctx context.Context, input MyType) error
func(ctx context.Context, wg *sync.WaitGroup, input interface{}) error

Packages

appch - Channel utilities

typedCh := make(chan MyMessage, 100)
interfaceCh := appch.ToInterfaceChan(typedCh)

apperr - Structured errors with stack traces

var ErrNotFound = apperr.Error("NotFoundError")
var ErrValidation = apperr.Error("ValidationError", apperr.WithAttribute("severity", "warning"))

// Usage
return ErrNotFound(originalErr, "user not found", apperr.WithAttribute("userId", id))
return ErrValidation("invalid email", apperr.WithAttributes(map[string]interface{}{"field": "email", "value": input}))

apptrace - Tracing context

// Middleware setup
traceMiddleware := apptrace.NewTraceMiddleware(nil, nil)

// In handlers
trace := apptrace.FromContext(ctx)
trace.WithName("custom-name")
trace.WithAttributes("key", value, "key2", value2)
trace.WithError(nonFatalErr) // record without failing
trace.WithLog(true)          // force logging

Custom naming:

apptrace.NewTraceMiddleware(func(call options.Call) string {
    return fmt.Sprintf("%s/%s", call.GetComponentDefinition(), call.GetCallType())
}, nil)

appzap - Zap logging

import "github.com/yuridevx/app/appzap"

logger, _ := zap.NewProduction()
zapMiddleware := appzap.ZapMiddleware(logger, nil)

// Force/suppress logging per handler
opts.Middleware = append(opts.Middleware, appzap.LogMeMiddleware)
opts.Middleware = append(opts.Middleware, appzap.DontLogMeMiddleware)

apprelic - New Relic APM

import "github.com/yuridevx/app/apprelic"

nrApp, _ := newrelic.NewApplication(...)
opts.Middleware = []options.Middleware{
    apptrace.NewTraceMiddleware(nil, nil),
    apprelic.NewRelicTransactionMiddleware(nrApp),
    apprelic.NewRelicTraceMiddleware(),
}

Gin Support

Trace context works with Gin's context:

func ginHandler(c *gin.Context) {
    ctx, trace := apptrace.TraceContextNew(c)
    trace.WithAttributes("path", c.Request.URL.Path)
    result, err := service.Do(ctx)
    // ...
}

Configuration

app.NewBuilder(func(opts *options.ApplicationOptions) {
    opts.StartTimeout = 60 * time.Second             // OnStart timeout
    opts.ComponentShutdownTimeout = 40 * time.Second // Component shutdown
    opts.AppShutdownTimeout = 20 * time.Second       // App-level shutdown
})

About

Application framework that lets you manage component goroutines and graceful shutdown

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages