A Go framework for building concurrent applications with component-based architecture, lifecycle management, and middleware support.
go get github.com/yuridevx/app

package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/yuridevx/app"
"github.com/yuridevx/app/appch"
"github.com/yuridevx/app/apperr"
"github.com/yuridevx/app/apptrace"
"github.com/yuridevx/app/options"
)
// Order is the message payload flowing through orderQueue. The JSON tags
// match the request body decoded by handleOrder.
type Order struct {
ID int `json:"id"`
Amount float64 `json:"amount"`
Customer string `json:"customer"`
}
var (
// orderQueue buffers accepted orders between the HTTP handler and the
// order-processor component's PConsume workers (capacity 1000).
orderQueue = make(chan Order, 1000)
// submitOrder is assigned in main via httpComponent.Proxy and invoked
// from handleOrder once the application is running.
submitOrder app.ProxyFn
// ErrQueueFull is the sentinel returned when a non-blocking send to
// orderQueue fails because the buffer is at capacity.
ErrQueueFull = apperr.Error("QueueFullError")
)
// main wires up four components (order-processor, http-server, metrics,
// priority-handler, sync-service) on a single app builder, then runs the
// application until SIGINT/SIGTERM cancels ctx and all goroutines tracked
// by wg have finished.
func main() {
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
// Translate SIGINT/SIGTERM into context cancellation, which triggers the
// framework's graceful shutdown sequence.
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
cancel()
}()
traceMiddleware := apptrace.NewTraceMiddleware(nil, nil)
// Application-level options: tracing middleware plus shutdown budgets
// (component shutdown is allowed longer than the app-level hooks here).
builder := app.NewBuilder(func(opts *options.ApplicationOptions) {
opts.Middleware = append(opts.Middleware, traceMiddleware)
opts.AppShutdownTimeout = 30 * time.Second
opts.ComponentShutdownTimeout = 40 * time.Second
})
// Order processing with parallel workers
builder.C("order-processor").
OnStart(func(ctx context.Context) error {
return initDB()
}).
// 10 parallel consumers drain orderQueue; each handler records trace
// attributes before persisting the order.
PConsume(appch.ToInterfaceChan(orderQueue), 10, func(ctx context.Context, order Order) error {
trace := apptrace.FromContext(ctx)
trace.WithAttributes("orderId", order.ID, "customer", order.Customer)
return saveOrder(ctx, order)
}).
OnShutdown(func(ctx context.Context) error {
return closeDB()
})
// HTTP API
httpComponent := builder.C("http-server")
// Proxy exposes a callable entry point (used by handleOrder) that runs
// inside the component's lifecycle; the non-blocking send rejects with
// ErrQueueFull instead of stalling the HTTP request.
submitOrder = httpComponent.Proxy(func(ctx context.Context, wg *sync.WaitGroup, order Order) error {
defer wg.Done()
select {
case orderQueue <- order:
return nil
default:
return ErrQueueFull("queue at capacity", apperr.WithAttribute("queueLen", len(orderQueue)))
}
})
// PBlocking hosts the HTTP server; a helper goroutine waits for ctx
// cancellation and gives the server 10s to drain in-flight requests.
httpComponent.PBlocking(func(ctx context.Context) {
mux := http.NewServeMux()
mux.HandleFunc("/order", handleOrder)
server := &http.Server{Addr: ":8080", Handler: mux}
go func() {
<-ctx.Done()
shutdownCtx, c := context.WithTimeout(context.Background(), 10*time.Second)
defer c()
server.Shutdown(shutdownCtx)
}()
server.ListenAndServe()
})
// Metrics reporting with dynamic interval
builder.C("metrics").
// Index-based scheduling: near-immediate first tick, then once a minute.
CPeriodIndexFn(func(i int32) (int32, time.Duration) {
if i == 0 {
return 1, time.Nanosecond // immediate first run
}
return i, time.Minute
}, func(ctx context.Context) {
fmt.Printf("Queue: %d/%d\n", len(orderQueue), cap(orderQueue))
})
// Competing consumers example - both share same goroutine
highPriority := make(chan interface{}, 10)
lowPriority := make(chan interface{}, 100)
builder.C("priority-handler").
CConsume(highPriority, func(ctx context.Context, msg interface{}) {
fmt.Println("HIGH:", msg)
}).
CConsume(lowPriority, func(ctx context.Context, msg interface{}) {
fmt.Println("LOW:", msg)
}).
CPeriod(5*time.Second, func() {
fmt.Println("Heartbeat")
})
// Conditional periodic task with enable/disable
enableSync := make(chan bool)
builder.C("sync-service").
CPeriodFn(func() time.Duration {
return 30 * time.Second
}, func(ctx context.Context) {
syncExternalService(ctx)
}, func(opts *options.CPeriodOptions) {
opts.SwitchCh = enableSync // waits for `true` before starting
})
// App-level hook, runs after all component shutdowns complete.
builder.OnShutdown(func(ctx context.Context) {
fmt.Println("Shutdown complete")
})
// Run returns after registering goroutines on wg; Wait blocks until the
// whole application has shut down.
builder.Build().Run(ctx, wg)
wg.Wait()
}
func handleOrder(w http.ResponseWriter, r *http.Request) {
var order Order
if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
http.Error(w, err.Error(), 400)
return
}
if err := submitOrder(r.Context(), order); err != nil {
http.Error(w, err.Error(), 503)
return
}
w.WriteHeader(202)
}PConsume - Multiple goroutines consuming from a channel:
builder.C("workers").
PConsume(taskCh, 20, func(ctx context.Context, task Task) error {
return process(task)
})

PBlocking - Long-running daemon tasks:
builder.C("server").
PBlocking(func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case conn := <-listener.Accept():
go handle(conn)
}
}
})

CConsume - Channel consumer:
builder.C("processor").
CConsume(eventCh, func(ctx context.Context, event Event) {
handle(event)
})

CPeriod - Periodic execution with three scheduling modes:
// Fixed interval
CPeriod(time.Minute, handler)
// Dynamic interval
CPeriodFn(func() time.Duration {
if peak() { return time.Second }
return time.Minute
}, handler)
// Indexed (useful for backoff, immediate first run, etc.)
CPeriodIndexFn(func(i int32) (int32, time.Duration) {
return i + 1, time.Duration(i) * time.Second
}, handler)

builder.C("service").
OnStart(func(ctx context.Context) error {
return initialize() // error aborts component startup
}).
OnShutdown(func(ctx context.Context) error {
return cleanup()
})
// Application-level shutdown (called after all components)
builder.OnShutdown(func(ctx context.Context) {
globalCleanup()
})

var processRequest app.ProxyFn
processRequest = builder.C("api").
Proxy(func(ctx context.Context, wg *sync.WaitGroup, req Request) error {
defer wg.Done() // required when using *sync.WaitGroup
return handle(req)
})
// After app starts, call from anywhere:
err := processRequest(ctx, myRequest)

type Middleware = func(ctx context.Context, input interface{}, call Call, next NextFn) error

Three levels - application, component, handler:
// Application-wide
builder := app.NewBuilder(func(opts *options.ApplicationOptions) {
opts.Middleware = []options.Middleware{traceMiddleware, loggingMiddleware}
})
// Component-level
builder.C("service").
Options(func(opts *options.ComponentOptions) {
opts.Middleware = append(opts.Middleware, authMiddleware)
})
// Handler-level
builder.C("service").
CPeriod(time.Second, handler, func(opts *options.CPeriodOptions) {
opts.Middleware = append(opts.Middleware, rateLimitMiddleware)
})

Call types: CallStart, CallShutdown, CallCPeriod, CallCConsume, CallPConsume, CallPBlocking, CallProxy
Handlers accept flexible signatures - the framework auto-detects:
func()
func() error
func(ctx context.Context)
func(ctx context.Context) error
func(input interface{})
func(ctx context.Context, input MyType) error
func(ctx context.Context, wg *sync.WaitGroup, input interface{}) error

typedCh := make(chan MyMessage, 100)
interfaceCh := appch.ToInterfaceChan(typedCh)

var ErrNotFound = apperr.Error("NotFoundError")
var ErrValidation = apperr.Error("ValidationError", apperr.WithAttribute("severity", "warning"))
// Usage
return ErrNotFound(originalErr, "user not found", apperr.WithAttribute("userId", id))
return ErrValidation("invalid email", apperr.WithAttributes(map[string]interface{}{"field": "email", "value": input}))

// Middleware setup
traceMiddleware := apptrace.NewTraceMiddleware(nil, nil)
// In handlers
trace := apptrace.FromContext(ctx)
trace.WithName("custom-name")
trace.WithAttributes("key", value, "key2", value2)
trace.WithError(nonFatalErr) // record without failing
trace.WithLog(true) // force logging

Custom naming:
apptrace.NewTraceMiddleware(func(call options.Call) string {
return fmt.Sprintf("%s/%s", call.GetComponentDefinition(), call.GetCallType())
}, nil)

import "github.com/yuridevx/app/appzap"
logger, _ := zap.NewProduction()
zapMiddleware := appzap.ZapMiddleware(logger, nil)
// Force/suppress logging per handler
opts.Middleware = append(opts.Middleware, appzap.LogMeMiddleware)
opts.Middleware = append(opts.Middleware, appzap.DontLogMeMiddleware)

import "github.com/yuridevx/app/apprelic"
nrApp, _ := newrelic.NewApplication(...)
opts.Middleware = []options.Middleware{
apptrace.NewTraceMiddleware(nil, nil),
apprelic.NewRelicTransactionMiddleware(nrApp),
apprelic.NewRelicTraceMiddleware(),
}

Trace context works with Gin's context:
func ginHandler(c *gin.Context) {
ctx, trace := apptrace.TraceContextNew(c)
trace.WithAttributes("path", c.Request.URL.Path)
result, err := service.Do(ctx)
// ...
}

app.NewBuilder(func(opts *options.ApplicationOptions) {
opts.StartTimeout = 60 * time.Second // OnStart timeout
opts.ComponentShutdownTimeout = 40 * time.Second // Component shutdown
opts.AppShutdownTimeout = 20 * time.Second // App-level shutdown
})