diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index aae1c6af6..22a70f29d 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -72,7 +72,7 @@ import ( "github.com/apache/answer/internal/service/action" activity2 "github.com/apache/answer/internal/service/activity" activity_common2 "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/internal/service/answer_common" auth2 "github.com/apache/answer/internal/service/auth" badge2 "github.com/apache/answer/internal/service/badge" @@ -83,14 +83,14 @@ import ( config2 "github.com/apache/answer/internal/service/config" "github.com/apache/answer/internal/service/content" "github.com/apache/answer/internal/service/dashboard" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" export2 "github.com/apache/answer/internal/service/export" file_record2 "github.com/apache/answer/internal/service/file_record" "github.com/apache/answer/internal/service/follow" "github.com/apache/answer/internal/service/importer" meta2 "github.com/apache/answer/internal/service/meta" "github.com/apache/answer/internal/service/meta_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/notification" "github.com/apache/answer/internal/service/notification_common" "github.com/apache/answer/internal/service/object_info" @@ -172,29 +172,29 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, tagRepo := tag.NewTagRepo(dataData, uniqueIDRepo) revisionRepo := revision.NewRevisionRepo(dataData, uniqueIDRepo) revisionService := revision_common.NewRevisionService(revisionRepo, userRepo) - activityQueueService := activity_queue.NewActivityQueueService() - tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService, activityQueueService) + v := activityqueue.NewService() + tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService, v) collectionRepo := collection.NewCollectionRepo(dataData, uniqueIDRepo) collectionCommon := collectioncommon.NewCollectionCommon(collectionRepo) answerCommon := answercommon.NewAnswerCommon(answerRepo) metaRepo := meta.NewMetaRepo(dataData) metaCommonService := metacommon.NewMetaCommonService(metaRepo) - questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaCommonService, configService, activityQueueService, revisionRepo, siteInfoCommonService, dataData) - eventQueueService := event_queue.NewEventQueueService() + questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaCommonService, configService, v, revisionRepo, siteInfoCommonService, dataData) + v2 := eventqueue.NewService() fileRecordRepo := file_record.NewFileRecordRepo(dataData) fileRecordService := file_record2.NewFileRecordService(fileRecordRepo, revisionRepo, serviceConf, siteInfoCommonService, userCommon) - userService := content.NewUserService(userRepo, userActiveActivityRepo, activityRepo, emailService, authService, siteInfoCommonService, userRoleRelService, userCommon, userExternalLoginService, userNotificationConfigRepo, userNotificationConfigService, questionCommon, eventQueueService, fileRecordService) + userService := content.NewUserService(userRepo, userActiveActivityRepo, activityRepo, emailService, authService, siteInfoCommonService, userRoleRelService, userCommon, userExternalLoginService, userNotificationConfigRepo, userNotificationConfigService, questionCommon, v2, fileRecordService) captchaRepo := captcha.NewCaptchaRepo(dataData) captchaService := action.NewCaptchaService(captchaRepo) userController := controller.NewUserController(authService, userService, captchaService, emailService, siteInfoCommonService, userNotificationConfigService) commentRepo := comment.NewCommentRepo(dataData, uniqueIDRepo) commentCommonRepo := comment.NewCommentCommonRepo(dataData, uniqueIDRepo) objService := object_info.NewObjService(answerRepo, questionRepo, commentCommonRepo, tagCommonRepo, tagCommonService) - notificationQueueService := notice_queue.NewNotificationQueueService() - externalNotificationQueueService := notice_queue.NewNewQuestionNotificationQueueService() + v3 := noticequeue.NewService() + v4 := noticequeue.NewExternalService() reviewRepo := review.NewReviewRepo(dataData) - reviewService := review2.NewReviewService(reviewRepo, objService, userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, externalNotificationQueueService, tagCommonService, questionCommon, notificationQueueService, siteInfoCommonService, commentCommonRepo) - commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, notificationQueueService, externalNotificationQueueService, activityQueueService, eventQueueService, reviewService) + reviewService := review2.NewReviewService(reviewRepo, objService, userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, v4, tagCommonService, questionCommon, v3, siteInfoCommonService, commentCommonRepo) + commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, v3, v4, v, v2, reviewService) rolePowerRelRepo := role.NewRolePowerRelRepo(dataData) rolePowerRelService := role2.NewRolePowerRelService(rolePowerRelRepo, userRoleRelService) rankService := rank2.NewRankService(userCommon, userRankRepo, objService, userRoleRelService, rolePowerRelService, configService) @@ -202,17 +202,17 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, rateLimitMiddleware := middleware.NewRateLimitMiddleware(limitRepo) commentController := controller.NewCommentController(commentService, rankService, captchaService, rateLimitMiddleware) reportRepo := report.NewReportRepo(dataData, uniqueIDRepo) - tagService := tag2.NewTagService(tagRepo, tagCommonService, revisionService, followRepo, siteInfoCommonService, activityQueueService) - answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo, notificationQueueService) + tagService := tag2.NewTagService(tagRepo, tagCommonService, revisionService, followRepo, siteInfoCommonService, v) + answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo, v3) answerActivityService := activity2.NewAnswerActivityService(answerActivityRepo, configService) - externalNotificationService := notification.NewExternalNotificationService(dataData, userNotificationConfigRepo, followRepo, emailService, userRepo, externalNotificationQueueService, userExternalLoginRepo, siteInfoCommonService) - questionService := content.NewQuestionService(activityRepo, questionRepo, answerRepo, tagCommonService, tagService, questionCommon, userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, collectionCommon, answerActivityService, emailService, notificationQueueService, externalNotificationQueueService, activityQueueService, siteInfoCommonService, externalNotificationService, reviewService, configService, eventQueueService, reviewRepo) - answerService := content.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, notificationQueueService, externalNotificationQueueService, activityQueueService, reviewService, eventQueueService) + externalNotificationService := notification.NewExternalNotificationService(dataData, userNotificationConfigRepo, followRepo, emailService, userRepo, v4, userExternalLoginRepo, siteInfoCommonService) + questionService := content.NewQuestionService(activityRepo, questionRepo, answerRepo, tagCommonService, tagService, questionCommon, userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, collectionCommon, answerActivityService, emailService, v3, v4, v, siteInfoCommonService, externalNotificationService, reviewService, configService, v2, reviewRepo) + answerService := content.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, v3, v4, v, reviewService, v2) reportHandle := report_handle.NewReportHandle(questionService, answerService, commentService) - reportService := report2.NewReportService(reportRepo, objService, userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, configService, eventQueueService) + reportService := report2.NewReportService(reportRepo, objService, userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, configService, v2) reportController := controller.NewReportController(reportService, rankService, captchaService) - contentVoteRepo := activity.NewVoteRepo(dataData, activityRepo, userRankRepo, notificationQueueService) - voteService := content.NewVoteService(contentVoteRepo, configService, questionRepo, answerRepo, commentCommonRepo, objService, eventQueueService) + contentVoteRepo := activity.NewVoteRepo(dataData, activityRepo, userRankRepo, v3) + voteService := content.NewVoteService(contentVoteRepo, configService, questionRepo, answerRepo, commentCommonRepo, objService, v2) voteController := controller.NewVoteController(voteService, rankService, captchaService) tagController := controller.NewTagController(tagService, tagCommonService, rankService) followFollowRepo := activity.NewFollowRepo(dataData, uniqueIDRepo, activityRepo) @@ -228,7 +228,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, searchService := content.NewSearchService(searchParser, searchRepo) searchController := controller.NewSearchController(searchService, captchaService) reviewActivityRepo := activity.NewReviewActivityRepo(dataData, activityRepo, userRankRepo, configService) - contentRevisionService := content.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService, notificationQueueService, activityQueueService, reportRepo, reviewService, reviewActivityRepo) + contentRevisionService := content.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService, v3, v, reportRepo, reviewService, reviewActivityRepo) revisionController := controller.NewRevisionController(contentRevisionService, rankService) rankController := controller.NewRankController(rankService) userAdminRepo := user.NewUserAdminRepo(dataData, authRepo) @@ -244,7 +244,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, siteInfoService := siteinfo.NewSiteInfoService(siteInfoRepo, siteInfoCommonService, emailService, tagCommonService, configService, questionCommon, fileRecordService) siteInfoController := controller_admin.NewSiteInfoController(siteInfoService) controllerSiteInfoController := controller.NewSiteInfoController(siteInfoCommonService) - notificationCommon := notificationcommon.NewNotificationCommon(dataData, notificationRepo, userCommon, activityRepo, followRepo, objService, notificationQueueService, userExternalLoginRepo, siteInfoCommonService) + notificationCommon := notificationcommon.NewNotificationCommon(dataData, notificationRepo, userCommon, activityRepo, followRepo, objService, v3, userExternalLoginRepo, siteInfoCommonService) badgeRepo := badge.NewBadgeRepo(dataData, uniqueIDRepo) notificationService := notification.NewNotificationService(dataData, notificationRepo, notificationCommon, revisionService, userRepo, reportRepo, reviewService, badgeRepo) notificationController := controller.NewNotificationController(notificationService, rankService) @@ -253,7 +253,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, uploaderService := uploader.NewUploaderService(serviceConf, siteInfoCommonService, fileRecordService) uploadController := controller.NewUploadController(uploaderService) activityActivityRepo := activity.NewActivityRepo(dataData, configService) - activityCommon := activity_common2.NewActivityCommon(activityRepo, activityQueueService) + activityCommon := activity_common2.NewActivityCommon(activityRepo, v) commentCommonService := comment_common.NewCommentCommonService(commentCommonRepo) activityService := activity2.NewActivityService(activityActivityRepo, userCommon, activityCommon, tagCommonService, objService, commentCommonService, revisionService, metaCommonService, configService) activityController := controller.NewActivityController(activityService) @@ -265,12 +265,12 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, permissionController := controller.NewPermissionController(rankService) userPluginController := controller.NewUserPluginController(pluginCommonService) reviewController := controller.NewReviewController(reviewService, rankService, captchaService) - metaService := meta2.NewMetaService(metaCommonService, userCommon, answerRepo, questionRepo, eventQueueService) + metaService := meta2.NewMetaService(metaCommonService, userCommon, answerRepo, questionRepo, v2) metaController := controller.NewMetaController(metaService) badgeGroupRepo := badge_group.NewBadgeGroupRepo(dataData, uniqueIDRepo) eventRuleRepo := badge.NewEventRuleRepo(dataData) - badgeAwardService := badge2.NewBadgeAwardService(badgeAwardRepo, badgeRepo, userCommon, objService, notificationQueueService) - badgeEventService := badge2.NewBadgeEventService(dataData, eventQueueService, badgeRepo, eventRuleRepo, badgeAwardService) + badgeAwardService := badge2.NewBadgeAwardService(badgeAwardRepo, badgeRepo, userCommon, objService, v3) + badgeEventService := badge2.NewBadgeEventService(dataData, v2, badgeRepo, eventRuleRepo, badgeAwardService) badgeService := badge2.NewBadgeService(badgeRepo, badgeGroupRepo, badgeAwardRepo, badgeEventService, siteInfoCommonService) badgeController := controller.NewBadgeController(badgeService, badgeAwardService) controller_adminBadgeController := controller_admin.NewBadgeController(badgeService) @@ -281,7 +281,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, avatarMiddleware := middleware.NewAvatarMiddleware(serviceConf, uploaderService) shortIDMiddleware := middleware.NewShortIDMiddleware(siteInfoCommonService) templateRenderController := templaterender.NewTemplateRenderController(questionService, userService, tagService, answerService, commentService, siteInfoCommonService, questionRepo) - templateController := controller.NewTemplateController(templateRenderController, siteInfoCommonService, eventQueueService, userService, questionService) + templateController := controller.NewTemplateController(templateRenderController, siteInfoCommonService, v2, userService, questionService) templateRouter := router.NewTemplateRouter(templateController, templateRenderController, siteInfoController, authUserMiddleware) connectorController := controller.NewConnectorController(siteInfoCommonService, emailService, userExternalLoginService) userCenterLoginService := user_external_login2.NewUserCenterLoginService(userRepo, userCommon, userExternalLoginRepo, userActiveActivityRepo, siteInfoCommonService) diff --git a/internal/base/queue/queue.go b/internal/base/queue/queue.go new file mode 100644 index 000000000..ae23d341c --- /dev/null +++ b/internal/base/queue/queue.go @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package queue + +import ( + "context" + "sync" + + "github.com/segmentfault/pacman/log" +) + +// Queue is a generic message queue service that processes messages asynchronously. +// It is thread-safe and supports graceful shutdown. +type Queue[T any] struct { + name string + queue chan T + handler func(ctx context.Context, msg T) error + mu sync.RWMutex + closed bool + wg sync.WaitGroup +} + +// New creates a new queue with the given name and buffer size. +func New[T any](name string, bufferSize int) *Queue[T] { + q := &Queue[T]{ + name: name, + queue: make(chan T, bufferSize), + } + q.startWorker() + return q +} + +// Send enqueues a message to be processed asynchronously. +// It will block if the queue is full. +func (q *Queue[T]) Send(ctx context.Context, msg T) { + q.mu.RLock() + closed := q.closed + q.mu.RUnlock() + + if closed { + log.Warnf("[%s] queue is closed, dropping message", q.name) + return + } + + select { + case q.queue <- msg: + log.Debugf("[%s] enqueued message: %+v", q.name, msg) + case <-ctx.Done(): + log.Warnf("[%s] context cancelled while sending message", q.name) + } +} + +// RegisterHandler sets the handler function for processing messages. +// This is thread-safe and can be called at any time. +func (q *Queue[T]) RegisterHandler(handler func(ctx context.Context, msg T) error) { + q.mu.Lock() + defer q.mu.Unlock() + q.handler = handler +} + +// Close gracefully shuts down the queue, waiting for pending messages to be processed. +func (q *Queue[T]) Close() { + q.mu.Lock() + if q.closed { + q.mu.Unlock() + return + } + q.closed = true + q.mu.Unlock() + + close(q.queue) + q.wg.Wait() + log.Infof("[%s] queue closed", q.name) +} + +// startWorker starts the background goroutine that processes messages. +func (q *Queue[T]) startWorker() { + q.wg.Add(1) + go func() { + defer q.wg.Done() + for msg := range q.queue { + q.processMessage(msg) + } + }() +} + +// processMessage handles a single message with proper synchronization. +func (q *Queue[T]) processMessage(msg T) { + q.mu.RLock() + handler := q.handler + q.mu.RUnlock() + + if handler == nil { + log.Warnf("[%s] no handler registered, dropping message: %+v", q.name, msg) + return + } + + // Use background context for async processing + // TODO: Consider adding timeout or using a derived context + if err := handler(context.TODO(), msg); err != nil { + log.Errorf("[%s] handler error: %v", q.name, err) + } +} diff --git a/internal/base/queue/queue_test.go b/internal/base/queue/queue_test.go new file mode 100644 index 000000000..79355fb72 --- /dev/null +++ b/internal/base/queue/queue_test.go @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package queue + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +type testMessage struct { + ID int + Data string +} + +func TestQueue_SendAndReceive(t *testing.T) { + q := New[*testMessage]("test", 10) + defer q.Close() + + received := make(chan *testMessage, 1) + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + received <- msg + return nil + }) + + msg := &testMessage{ID: 1, Data: "hello"} + q.Send(context.Background(), msg) + + select { + case r := <-received: + if r.ID != msg.ID || r.Data != msg.Data { + t.Errorf("received message mismatch: got %+v, want %+v", r, msg) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for message") + } +} + +func TestQueue_MultipleMessages(t *testing.T) { + q := New[*testMessage]("test", 10) + defer q.Close() + + var count atomic.Int32 + var wg sync.WaitGroup + numMessages := 100 + wg.Add(numMessages) + + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + count.Add(1) + wg.Done() + return nil + }) + + for i := range numMessages { + q.Send(context.Background(), &testMessage{ID: i}) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + if int(count.Load()) != numMessages { + t.Errorf("expected %d messages, got %d", numMessages, count.Load()) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout: only received %d of %d messages", count.Load(), numMessages) + } +} + +func TestQueue_NoHandlerDropsMessage(t *testing.T) { + q := New[*testMessage]("test", 10) + defer q.Close() + + // Send without handler - should not panic + q.Send(context.Background(), &testMessage{ID: 1}) + + // Give time for the message to be processed (dropped) + time.Sleep(100 * time.Millisecond) +} + +func TestQueue_RegisterHandlerAfterSend(t *testing.T) { + q := New[*testMessage]("test", 10) + defer q.Close() + + received := make(chan *testMessage, 1) + + // Send first + q.Send(context.Background(), &testMessage{ID: 1}) + + // Small delay then register handler + time.Sleep(50 * time.Millisecond) + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + received <- msg + return nil + }) + + // Send another message that should be received + q.Send(context.Background(), &testMessage{ID: 2}) + + select { + case r := <-received: + if r.ID != 2 { + // First message was dropped (no handler), second should be received + t.Logf("received message ID: %d", r.ID) + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for message") + } +} + +func TestQueue_Close(t *testing.T) { + q := New[*testMessage]("test", 10) + + var count atomic.Int32 + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + count.Add(1) + return nil + }) + + // Send some messages + for i := range 5 { + q.Send(context.Background(), &testMessage{ID: i}) + } + + // Close and wait + q.Close() + + // All messages should have been processed + if count.Load() != 5 { + t.Errorf("expected 5 messages processed, got %d", count.Load()) + } + + // Sending after close should not panic + q.Send(context.Background(), &testMessage{ID: 99}) +} + +func TestQueue_ConcurrentSend(t *testing.T) { + q := New[*testMessage]("test", 100) + defer q.Close() + + var count atomic.Int32 + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + count.Add(1) + return nil + }) + + var wg sync.WaitGroup + numGoroutines := 10 + messagesPerGoroutine := 100 + + for i := range numGoroutines { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := range messagesPerGoroutine { + q.Send(context.Background(), &testMessage{ID: id*1000 + j}) + } + }(i) + } + + wg.Wait() + + // Wait for processing + time.Sleep(500 * time.Millisecond) + + expected := int32(numGoroutines * messagesPerGoroutine) + if count.Load() != expected { + t.Errorf("expected %d messages, got %d", expected, count.Load()) + } +} + +func TestQueue_ConcurrentRegisterHandler(t *testing.T) { + q := New[*testMessage]("test", 10) + defer q.Close() + + // Concurrently register handlers - should not race + var wg sync.WaitGroup + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + return nil + }) + }() + } + wg.Wait() +} diff --git a/internal/controller/template_controller.go b/internal/controller/template_controller.go index 257b02fa4..e6b94f4f2 100644 --- a/internal/controller/template_controller.go +++ b/internal/controller/template_controller.go @@ -32,7 +32,7 @@ import ( "github.com/apache/answer/internal/base/middleware" "github.com/apache/answer/internal/base/pager" "github.com/apache/answer/internal/service/content" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/plugin" "github.com/apache/answer/internal/base/constant" @@ -59,7 +59,7 @@ type TemplateController struct { cssPath string templateRenderController *templaterender.TemplateRenderController siteInfoService siteinfo_common.SiteInfoCommonService - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service userService *content.UserService questionService *content.QuestionService } @@ -68,7 +68,7 @@ type TemplateController struct { func NewTemplateController( templateRenderController *templaterender.TemplateRenderController, siteInfoService siteinfo_common.SiteInfoCommonService, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, userService *content.UserService, questionService *content.QuestionService, ) *TemplateController { diff --git a/internal/repo/activity/answer_repo.go b/internal/repo/activity/answer_repo.go index 4aca874a7..96813f50c 100644 --- a/internal/repo/activity/answer_repo.go +++ b/internal/repo/activity/answer_repo.go @@ -34,7 +34,7 @@ import ( "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/rank" "github.com/apache/answer/pkg/converter" "github.com/segmentfault/pacman/errors" @@ -46,7 +46,7 @@ type AnswerActivityRepo struct { data *data.Data activityRepo activity_common.ActivityRepo userRankRepo rank.UserRankRepo - notificationQueueService notice_queue.NotificationQueueService + notificationQueueService noticequeue.Service } // NewAnswerActivityRepo new repository @@ -54,7 +54,7 @@ func NewAnswerActivityRepo( data *data.Data, activityRepo activity_common.ActivityRepo, userRankRepo rank.UserRankRepo, - notificationQueueService notice_queue.NotificationQueueService, + notificationQueueService noticequeue.Service, ) activity.AnswerActivityRepo { return &AnswerActivityRepo{ data: data, diff --git a/internal/repo/activity/vote_repo.go b/internal/repo/activity/vote_repo.go index f2d2be5f8..389ae18d8 100644 --- a/internal/repo/activity/vote_repo.go +++ b/internal/repo/activity/vote_repo.go @@ -28,7 +28,7 @@ import ( "github.com/segmentfault/pacman/log" "github.com/apache/answer/internal/base/constant" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/pkg/converter" "github.com/apache/answer/internal/base/pager" @@ -51,7 +51,7 @@ type VoteRepo struct { data *data.Data activityRepo activity_common.ActivityRepo userRankRepo rank.UserRankRepo - notificationQueueService notice_queue.NotificationQueueService + notificationQueueService noticequeue.Service } // NewVoteRepo new repository @@ -59,7 +59,7 @@ func NewVoteRepo( data *data.Data, activityRepo activity_common.ActivityRepo, userRankRepo rank.UserRankRepo, - notificationQueueService notice_queue.NotificationQueueService, + notificationQueueService noticequeue.Service, ) content.VoteRepo { return &VoteRepo{ data: data, diff --git a/internal/service/activity_common/activity.go b/internal/service/activity_common/activity.go index 74f73a755..3d2efd6a3 100644 --- a/internal/service/activity_common/activity.go +++ b/internal/service/activity_common/activity.go @@ -25,7 +25,7 @@ import ( "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/pkg/converter" "github.com/apache/answer/pkg/uid" "github.com/segmentfault/pacman/log" @@ -49,13 +49,13 @@ type ActivityRepo interface { type ActivityCommon struct { activityRepo ActivityRepo - activityQueueService activity_queue.ActivityQueueService + activityQueueService activityqueue.Service } // NewActivityCommon new activity common func NewActivityCommon( activityRepo ActivityRepo, - activityQueueService activity_queue.ActivityQueueService, + activityQueueService activityqueue.Service, ) *ActivityCommon { activity := &ActivityCommon{ activityRepo: activityRepo, diff --git a/internal/service/activity_queue/activity_queue.go b/internal/service/activity_queue/activity_queue.go deleted file mode 100644 index 7b8c1e3b8..000000000 --- a/internal/service/activity_queue/activity_queue.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package activity_queue - -import ( - "context" - - "github.com/apache/answer/internal/schema" - "github.com/segmentfault/pacman/log" -) - -type ActivityQueueService interface { - Send(ctx context.Context, msg *schema.ActivityMsg) - RegisterHandler(handler func(ctx context.Context, msg *schema.ActivityMsg) error) -} - -type activityQueueService struct { - Queue chan *schema.ActivityMsg - Handler func(ctx context.Context, msg *schema.ActivityMsg) error -} - -func (ns *activityQueueService) Send(ctx context.Context, msg *schema.ActivityMsg) { - ns.Queue <- msg -} - -func (ns *activityQueueService) RegisterHandler( - handler func(ctx context.Context, msg *schema.ActivityMsg) error) { - ns.Handler = handler -} - -func (ns *activityQueueService) working() { - go func() { - for msg := range ns.Queue { - log.Debugf("received activity %+v", msg) - if ns.Handler == nil { - log.Warnf("no handler for activity") - continue - } - if err := ns.Handler(context.Background(), msg); err != nil { - log.Error(err) - } - } - }() -} - -// NewActivityQueueService create a new activity queue service -func NewActivityQueueService() ActivityQueueService { - ns := &activityQueueService{} - ns.Queue = make(chan *schema.ActivityMsg, 128) - ns.working() - return ns -} diff --git a/internal/service/activityqueue/activity_queue.go b/internal/service/activityqueue/activity_queue.go new file mode 100644 index 000000000..d32caf5e9 --- /dev/null +++ b/internal/service/activityqueue/activity_queue.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package activityqueue + +import ( + "github.com/apache/answer/internal/base/queue" + "github.com/apache/answer/internal/schema" +) + +type Service = *queue.Queue[*schema.ActivityMsg] + +func NewService() Service { + return queue.New[*schema.ActivityMsg]("activity", 128) +} diff --git a/internal/service/badge/badge_award_service.go b/internal/service/badge/badge_award_service.go index 982c1d1a4..0799b87c0 100644 --- a/internal/service/badge/badge_award_service.go +++ b/internal/service/badge/badge_award_service.go @@ -28,7 +28,7 @@ import ( "github.com/apache/answer/internal/base/translator" "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/object_info" usercommon "github.com/apache/answer/internal/service/user_common" "github.com/apache/answer/pkg/uid" @@ -62,7 +62,7 @@ type BadgeAwardService struct { badgeRepo BadgeRepo userCommon *usercommon.UserCommon objectInfoService *object_info.ObjService - notificationQueueService notice_queue.NotificationQueueService + notificationQueueService noticequeue.Service } func NewBadgeAwardService( @@ -70,7 +70,7 @@ func NewBadgeAwardService( badgeRepo BadgeRepo, userCommon *usercommon.UserCommon, objectInfoService *object_info.ObjService, - notificationQueueService notice_queue.NotificationQueueService, + notificationQueueService noticequeue.Service, ) *BadgeAwardService { return &BadgeAwardService{ badgeAwardRepo: badgeAwardRepo, diff --git a/internal/service/badge/badge_event_handler.go b/internal/service/badge/badge_event_handler.go index 24cabf29b..0a9a84c0f 100644 --- a/internal/service/badge/badge_event_handler.go +++ b/internal/service/badge/badge_event_handler.go @@ -25,13 +25,13 @@ import ( "github.com/apache/answer/internal/base/data" "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/segmentfault/pacman/log" ) type BadgeEventService struct { data *data.Data - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service badgeRepo BadgeRepo eventRuleRepo EventRuleRepo badgeAwardService *BadgeAwardService @@ -45,7 +45,7 @@ type EventRuleRepo interface { func NewBadgeEventService( data *data.Data, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, badgeRepo BadgeRepo, eventRuleRepo EventRuleRepo, badgeAwardService *BadgeAwardService, diff --git a/internal/service/comment/comment_service.go b/internal/service/comment/comment_service.go index dc599e6df..30ff43c6b 100644 --- a/internal/service/comment/comment_service.go +++ b/internal/service/comment/comment_service.go @@ -22,7 +22,7 @@ package comment import ( "context" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/service/review" "time" @@ -33,10 +33,10 @@ import ( "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/internal/service/comment_common" "github.com/apache/answer/internal/service/export" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/object_info" "github.com/apache/answer/internal/service/permission" usercommon "github.com/apache/answer/internal/service/user_common" @@ -88,10 +88,10 @@ type CommentService struct { objectInfoService *object_info.ObjService emailService *export.EmailService userRepo usercommon.UserRepo - notificationQueueService notice_queue.NotificationQueueService - externalNotificationQueueService notice_queue.ExternalNotificationQueueService - activityQueueService activity_queue.ActivityQueueService - eventQueueService event_queue.EventQueueService + notificationQueueService noticequeue.Service + externalNotificationQueueService noticequeue.ExternalService + activityQueueService activityqueue.Service + eventQueueService eventqueue.Service reviewService *review.ReviewService } @@ -104,10 +104,10 @@ func NewCommentService( voteCommon activity_common.VoteRepo, emailService *export.EmailService, userRepo usercommon.UserRepo, - notificationQueueService notice_queue.NotificationQueueService, - externalNotificationQueueService notice_queue.ExternalNotificationQueueService, - activityQueueService activity_queue.ActivityQueueService, - eventQueueService event_queue.EventQueueService, + notificationQueueService noticequeue.Service, + externalNotificationQueueService noticequeue.ExternalService, + activityQueueService activityqueue.Service, + eventQueueService eventqueue.Service, reviewService *review.ReviewService, ) *CommentService { return &CommentService{ diff --git a/internal/service/content/answer_service.go b/internal/service/content/answer_service.go index f904b82f0..0e5914048 100644 --- a/internal/service/content/answer_service.go +++ b/internal/service/content/answer_service.go @@ -24,7 +24,7 @@ import ( "encoding/json" "time" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/base/constant" "github.com/apache/answer/internal/base/reason" @@ -32,11 +32,11 @@ import ( "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" answercommon "github.com/apache/answer/internal/service/answer_common" collectioncommon "github.com/apache/answer/internal/service/collection_common" "github.com/apache/answer/internal/service/export" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/permission" questioncommon "github.com/apache/answer/internal/service/question_common" "github.com/apache/answer/internal/service/review" @@ -65,11 +65,11 @@ type AnswerService struct { voteRepo activity_common.VoteRepo emailService *export.EmailService roleService *role.UserRoleRelService - notificationQueueService notice_queue.NotificationQueueService - externalNotificationQueueService notice_queue.ExternalNotificationQueueService - activityQueueService activity_queue.ActivityQueueService + notificationQueueService noticequeue.Service + externalNotificationQueueService noticequeue.ExternalService + activityQueueService activityqueue.Service reviewService *review.ReviewService - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service } func NewAnswerService( @@ -85,11 +85,11 @@ func NewAnswerService( voteRepo activity_common.VoteRepo, emailService *export.EmailService, roleService *role.UserRoleRelService, - notificationQueueService notice_queue.NotificationQueueService, - externalNotificationQueueService notice_queue.ExternalNotificationQueueService, - activityQueueService activity_queue.ActivityQueueService, + notificationQueueService noticequeue.Service, + externalNotificationQueueService noticequeue.ExternalService, + activityQueueService activityqueue.Service, reviewService *review.ReviewService, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, ) *AnswerService { return &AnswerService{ answerRepo: answerRepo, diff --git a/internal/service/content/question_service.go b/internal/service/content/question_service.go index b8372a72e..bc3ac0bb6 100644 --- a/internal/service/content/question_service.go +++ b/internal/service/content/question_service.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/plugin" "github.com/apache/answer/internal/base/constant" @@ -38,13 +38,13 @@ import ( "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" answercommon "github.com/apache/answer/internal/service/answer_common" collectioncommon "github.com/apache/answer/internal/service/collection_common" "github.com/apache/answer/internal/service/config" "github.com/apache/answer/internal/service/export" metacommon "github.com/apache/answer/internal/service/meta_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/notification" "github.com/apache/answer/internal/service/permission" questioncommon "github.com/apache/answer/internal/service/question_common" @@ -84,14 +84,14 @@ type QuestionService struct { collectionCommon *collectioncommon.CollectionCommon answerActivityService *activity.AnswerActivityService emailService *export.EmailService - notificationQueueService notice_queue.NotificationQueueService - externalNotificationQueueService notice_queue.ExternalNotificationQueueService - activityQueueService activity_queue.ActivityQueueService + notificationQueueService noticequeue.Service + externalNotificationQueueService noticequeue.ExternalService + activityQueueService activityqueue.Service siteInfoService siteinfo_common.SiteInfoCommonService newQuestionNotificationService *notification.ExternalNotificationService reviewService *review.ReviewService configService *config.ConfigService - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service reviewRepo review.ReviewRepo } @@ -110,14 +110,14 @@ func NewQuestionService( collectionCommon *collectioncommon.CollectionCommon, answerActivityService *activity.AnswerActivityService, emailService *export.EmailService, - notificationQueueService notice_queue.NotificationQueueService, - externalNotificationQueueService notice_queue.ExternalNotificationQueueService, - activityQueueService activity_queue.ActivityQueueService, + notificationQueueService noticequeue.Service, + externalNotificationQueueService noticequeue.ExternalService, + activityQueueService activityqueue.Service, siteInfoService siteinfo_common.SiteInfoCommonService, newQuestionNotificationService *notification.ExternalNotificationService, reviewService *review.ReviewService, configService *config.ConfigService, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, reviewRepo review.ReviewRepo, ) *QuestionService { return &QuestionService{ diff --git a/internal/service/content/revision_service.go b/internal/service/content/revision_service.go index 4ac08e769..13ec65b7d 100644 --- a/internal/service/content/revision_service.go +++ b/internal/service/content/revision_service.go @@ -32,9 +32,9 @@ import ( "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" answercommon "github.com/apache/answer/internal/service/answer_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/object_info" questioncommon "github.com/apache/answer/internal/service/question_common" "github.com/apache/answer/internal/service/report_common" @@ -62,8 +62,8 @@ type RevisionService struct { answerRepo answercommon.AnswerRepo tagRepo tag_common.TagRepo tagCommon *tag_common.TagCommonService - notificationQueueService notice_queue.NotificationQueueService - activityQueueService activity_queue.ActivityQueueService + notificationQueueService noticequeue.Service + activityQueueService activityqueue.Service reportRepo report_common.ReportRepo reviewService *review.ReviewService reviewActivity activity.ReviewActivityRepo @@ -79,8 +79,8 @@ func NewRevisionService( answerRepo answercommon.AnswerRepo, tagRepo tag_common.TagRepo, tagCommon *tag_common.TagCommonService, - notificationQueueService notice_queue.NotificationQueueService, - activityQueueService activity_queue.ActivityQueueService, + notificationQueueService noticequeue.Service, + activityQueueService activityqueue.Service, reportRepo report_common.ReportRepo, reviewService *review.ReviewService, reviewActivity activity.ReviewActivityRepo, diff --git a/internal/service/content/user_service.go b/internal/service/content/user_service.go index e9cc35788..711d6caa0 100644 --- a/internal/service/content/user_service.go +++ b/internal/service/content/user_service.go @@ -25,7 +25,7 @@ import ( "fmt" "time" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/pkg/token" "github.com/apache/answer/internal/base/constant" @@ -68,7 +68,7 @@ type UserService struct { userNotificationConfigRepo user_notification_config.UserNotificationConfigRepo userNotificationConfigService *user_notification_config.UserNotificationConfigService questionService *questioncommon.QuestionCommon - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service fileRecordService *file_record.FileRecordService } @@ -84,7 +84,7 @@ func NewUserService(userRepo usercommon.UserRepo, userNotificationConfigRepo user_notification_config.UserNotificationConfigRepo, userNotificationConfigService *user_notification_config.UserNotificationConfigService, questionService *questioncommon.QuestionCommon, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, fileRecordService *file_record.FileRecordService, ) *UserService { return &UserService{ diff --git a/internal/service/content/vote_service.go b/internal/service/content/vote_service.go index aa6150497..1f74769f5 100644 --- a/internal/service/content/vote_service.go +++ b/internal/service/content/vote_service.go @@ -24,7 +24,7 @@ import ( "fmt" "strings" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/base/constant" "github.com/apache/answer/internal/base/handler" @@ -62,7 +62,7 @@ type VoteService struct { answerRepo answercommon.AnswerRepo commentCommonRepo comment_common.CommentCommonRepo objectService *object_info.ObjService - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service } func NewVoteService( @@ -72,7 +72,7 @@ func NewVoteService( answerRepo answercommon.AnswerRepo, commentCommonRepo comment_common.CommentCommonRepo, objectService *object_info.ObjService, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, ) *VoteService { return &VoteService{ voteRepo: voteRepo, diff --git a/internal/service/event_queue/event_queue.go b/internal/service/event_queue/event_queue.go deleted file mode 100644 index 77dc302b5..000000000 --- a/internal/service/event_queue/event_queue.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package event_queue - -import ( - "context" - - "github.com/apache/answer/internal/schema" - "github.com/segmentfault/pacman/log" -) - -type EventQueueService interface { - Send(ctx context.Context, msg *schema.EventMsg) - RegisterHandler(handler func(ctx context.Context, msg *schema.EventMsg) error) -} - -type eventQueueService struct { - Queue chan *schema.EventMsg - Handler func(ctx context.Context, msg *schema.EventMsg) error -} - -func (ns *eventQueueService) Send(ctx context.Context, msg *schema.EventMsg) { - ns.Queue <- msg -} - -func (ns *eventQueueService) RegisterHandler( - handler func(ctx context.Context, msg *schema.EventMsg) error) { - ns.Handler = handler -} - -func (ns *eventQueueService) working() { - go func() { - for msg := range ns.Queue { - log.Debugf("received badge %+v", msg) - if ns.Handler == nil { - log.Warnf("no handler for badge") - continue - } - if err := ns.Handler(context.Background(), msg); err != nil { - log.Error(err) - } - } - }() -} - -// NewEventQueueService create a new badge queue service -func NewEventQueueService() EventQueueService { - ns := &eventQueueService{} - ns.Queue = make(chan *schema.EventMsg, 128) - ns.working() - return ns -} diff --git a/internal/service/eventqueue/event_queue.go b/internal/service/eventqueue/event_queue.go new file mode 100644 index 000000000..8d3a22392 --- /dev/null +++ b/internal/service/eventqueue/event_queue.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package eventqueue + +import ( + "github.com/apache/answer/internal/base/queue" + "github.com/apache/answer/internal/schema" +) + +type Service = *queue.Queue[*schema.EventMsg] + +func NewService() Service { + return queue.New[*schema.EventMsg]("event", 128) +} diff --git a/internal/service/meta/meta_service.go b/internal/service/meta/meta_service.go index c1ca7c619..e48e8f468 100644 --- a/internal/service/meta/meta_service.go +++ b/internal/service/meta/meta_service.go @@ -26,7 +26,7 @@ import ( "strconv" "strings" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/base/constant" "github.com/apache/answer/internal/base/handler" @@ -48,7 +48,7 @@ type MetaService struct { userCommon *usercommon.UserCommon questionRepo questioncommon.QuestionRepo answerRepo answercommon.AnswerRepo - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service } func NewMetaService( @@ -56,7 +56,7 @@ func NewMetaService( userCommon *usercommon.UserCommon, answerRepo answercommon.AnswerRepo, questionRepo questioncommon.QuestionRepo, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, ) *MetaService { return &MetaService{ metaCommonService: metaCommonService, diff --git a/internal/service/notice_queue/external_notification_queue.go b/internal/service/notice_queue/external_notification_queue.go deleted file mode 100644 index 6322a77ec..000000000 --- a/internal/service/notice_queue/external_notification_queue.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package notice_queue - -import ( - "context" - - "github.com/apache/answer/internal/schema" - "github.com/segmentfault/pacman/log" -) - -type ExternalNotificationQueueService interface { - Send(ctx context.Context, msg *schema.ExternalNotificationMsg) - RegisterHandler(handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error) -} - -type externalNotificationQueueService struct { - Queue chan *schema.ExternalNotificationMsg - Handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error -} - -func (ns *externalNotificationQueueService) Send(ctx context.Context, msg *schema.ExternalNotificationMsg) { - ns.Queue <- msg -} - -func (ns *externalNotificationQueueService) RegisterHandler( - handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error) { - ns.Handler = handler -} - -func (ns *externalNotificationQueueService) working() { - go func() { - for msg := range ns.Queue { - log.Debugf("received notification %+v", msg) - if ns.Handler == nil { - log.Warnf("no handler for notification") - continue - } - if err := ns.Handler(context.Background(), msg); err != nil { - log.Error(err) - } - } - }() -} - -// NewNewQuestionNotificationQueueService create a new notification queue service -func NewNewQuestionNotificationQueueService() ExternalNotificationQueueService { - ns := &externalNotificationQueueService{} - ns.Queue = make(chan *schema.ExternalNotificationMsg, 128) - ns.working() - return ns -} diff --git a/internal/service/notice_queue/notice_queue.go b/internal/service/notice_queue/notice_queue.go deleted file mode 100644 index 22b733e32..000000000 --- a/internal/service/notice_queue/notice_queue.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package notice_queue - -import ( - "context" - - "github.com/apache/answer/internal/schema" - "github.com/segmentfault/pacman/log" -) - -type NotificationQueueService interface { - Send(ctx context.Context, msg *schema.NotificationMsg) - RegisterHandler(handler func(ctx context.Context, msg *schema.NotificationMsg) error) -} - -type notificationQueueService struct { - Queue chan *schema.NotificationMsg - Handler func(ctx context.Context, msg *schema.NotificationMsg) error -} - -func (ns *notificationQueueService) Send(ctx context.Context, msg *schema.NotificationMsg) { - ns.Queue <- msg -} - -func (ns *notificationQueueService) RegisterHandler( - handler func(ctx context.Context, msg *schema.NotificationMsg) error) { - ns.Handler = handler -} - -func (ns *notificationQueueService) working() { - go func() { - for msg := range ns.Queue { - log.Debugf("received notification %+v", msg) - if ns.Handler == nil { - log.Warnf("no handler for notification") - continue - } - if err := ns.Handler(context.Background(), msg); err != nil { - log.Error(err) - } - } - }() -} - -// NewNotificationQueueService create a new notification queue service -func NewNotificationQueueService() NotificationQueueService { - ns := ¬ificationQueueService{} - ns.Queue = make(chan *schema.NotificationMsg, 128) - ns.working() - return ns -} diff --git a/internal/service/noticequeue/notice_queue.go b/internal/service/noticequeue/notice_queue.go new file mode 100644 index 000000000..138f9ce61 --- /dev/null +++ b/internal/service/noticequeue/notice_queue.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package noticequeue + +import ( + "github.com/apache/answer/internal/base/queue" + "github.com/apache/answer/internal/schema" +) + +type Service = *queue.Queue[*schema.NotificationMsg] + +func NewService() Service { + return queue.New[*schema.NotificationMsg]("notification", 128) +} + +type ExternalService = *queue.Queue[*schema.ExternalNotificationMsg] + +func NewExternalService() ExternalService { + return queue.New[*schema.ExternalNotificationMsg]("external_notification", 128) +} diff --git a/internal/service/notification/external_notification.go b/internal/service/notification/external_notification.go index d6bdd2fb7..425a8c2bb 100644 --- a/internal/service/notification/external_notification.go +++ b/internal/service/notification/external_notification.go @@ -28,7 +28,7 @@ import ( "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity_common" "github.com/apache/answer/internal/service/export" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/siteinfo_common" usercommon "github.com/apache/answer/internal/service/user_common" "github.com/apache/answer/internal/service/user_external_login" @@ -42,7 +42,7 @@ type ExternalNotificationService struct { followRepo activity_common.FollowRepo emailService *export.EmailService userRepo usercommon.UserRepo - notificationQueueService notice_queue.ExternalNotificationQueueService + notificationQueueService noticequeue.ExternalService userExternalLoginRepo user_external_login.UserExternalLoginRepo siteInfoService siteinfo_common.SiteInfoCommonService } @@ -53,7 +53,7 @@ func NewExternalNotificationService( followRepo activity_common.FollowRepo, emailService *export.EmailService, userRepo usercommon.UserRepo, - notificationQueueService notice_queue.ExternalNotificationQueueService, + notificationQueueService noticequeue.ExternalService, userExternalLoginRepo user_external_login.UserExternalLoginRepo, siteInfoService siteinfo_common.SiteInfoCommonService, ) *ExternalNotificationService { diff --git a/internal/service/notification_common/notification.go b/internal/service/notification_common/notification.go index 0bbd1865f..166387066 100644 --- a/internal/service/notification_common/notification.go +++ b/internal/service/notification_common/notification.go @@ -35,7 +35,7 @@ import ( "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/object_info" usercommon "github.com/apache/answer/internal/service/user_common" "github.com/apache/answer/pkg/uid" @@ -66,7 +66,7 @@ type NotificationCommon struct { followRepo activity_common.FollowRepo userCommon *usercommon.UserCommon objectInfoService *object_info.ObjService - notificationQueueService notice_queue.NotificationQueueService + notificationQueueService noticequeue.Service userExternalLoginRepo user_external_login.UserExternalLoginRepo siteInfoService siteinfo_common.SiteInfoCommonService } @@ -78,7 +78,7 @@ func NewNotificationCommon( activityRepo activity_common.ActivityRepo, followRepo activity_common.FollowRepo, objectInfoService *object_info.ObjService, - notificationQueueService notice_queue.NotificationQueueService, + notificationQueueService noticequeue.Service, userExternalLoginRepo user_external_login.UserExternalLoginRepo, siteInfoService siteinfo_common.SiteInfoCommonService, ) *NotificationCommon { diff --git a/internal/service/provider.go b/internal/service/provider.go index 65535f41b..f6d954709 100644 --- a/internal/service/provider.go +++ b/internal/service/provider.go @@ -23,7 +23,7 @@ import ( "github.com/apache/answer/internal/service/action" "github.com/apache/answer/internal/service/activity" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" answercommon "github.com/apache/answer/internal/service/answer_common" "github.com/apache/answer/internal/service/auth" "github.com/apache/answer/internal/service/badge" @@ -34,14 +34,14 @@ import ( "github.com/apache/answer/internal/service/config" "github.com/apache/answer/internal/service/content" "github.com/apache/answer/internal/service/dashboard" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/service/export" "github.com/apache/answer/internal/service/file_record" "github.com/apache/answer/internal/service/follow" "github.com/apache/answer/internal/service/importer" "github.com/apache/answer/internal/service/meta" metacommon "github.com/apache/answer/internal/service/meta_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/notification" notficationcommon "github.com/apache/answer/internal/service/notification_common" "github.com/apache/answer/internal/service/object_info" @@ -114,14 +114,14 @@ var ProviderSetService = wire.NewSet( user_external_login.NewUserCenterLoginService, plugin_common.NewPluginCommonService, config.NewConfigService, - notice_queue.NewNotificationQueueService, - activity_queue.NewActivityQueueService, + noticequeue.NewService, + activityqueue.NewService, user_notification_config.NewUserNotificationConfigService, notification.NewExternalNotificationService, - notice_queue.NewNewQuestionNotificationQueueService, + noticequeue.NewExternalService, review.NewReviewService, meta.NewMetaService, - event_queue.NewEventQueueService, + eventqueue.NewService, badge.NewBadgeService, badge.NewBadgeEventService, badge.NewBadgeAwardService, diff --git a/internal/service/question_common/question.go b/internal/service/question_common/question.go index 846dea894..557a5db15 100644 --- a/internal/service/question_common/question.go +++ b/internal/service/question_common/question.go @@ -34,7 +34,7 @@ import ( "github.com/apache/answer/internal/base/handler" "github.com/apache/answer/internal/base/reason" "github.com/apache/answer/internal/service/activity_common" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/internal/service/config" metacommon "github.com/apache/answer/internal/service/meta_common" "github.com/apache/answer/internal/service/revision" @@ -103,7 +103,7 @@ type QuestionCommon struct { AnswerCommon *answercommon.AnswerCommon metaCommonService *metacommon.MetaCommonService configService *config.ConfigService - activityQueueService activity_queue.ActivityQueueService + activityQueueService activityqueue.Service revisionRepo revision.RevisionRepo siteInfoService siteinfo_common.SiteInfoCommonService data *data.Data @@ -119,7 +119,7 @@ func NewQuestionCommon(questionRepo QuestionRepo, answerCommon *answercommon.AnswerCommon, metaCommonService *metacommon.MetaCommonService, configService *config.ConfigService, - activityQueueService activity_queue.ActivityQueueService, + activityQueueService activityqueue.Service, revisionRepo revision.RevisionRepo, siteInfoService siteinfo_common.SiteInfoCommonService, data *data.Data, diff --git a/internal/service/report/report_service.go b/internal/service/report/report_service.go index d32ccdabf..84c15d597 100644 --- a/internal/service/report/report_service.go +++ b/internal/service/report/report_service.go @@ -22,7 +22,7 @@ package report import ( "encoding/json" - "github.com/apache/answer/internal/service/event_queue" + "github.com/apache/answer/internal/service/eventqueue" "github.com/apache/answer/internal/base/constant" "github.com/apache/answer/internal/base/handler" @@ -57,7 +57,7 @@ type ReportService struct { commentCommonRepo comment_common.CommentCommonRepo reportHandle *report_handle.ReportHandle configService *config.ConfigService - eventQueueService event_queue.EventQueueService + eventQueueService eventqueue.Service } // NewReportService new report service @@ -70,7 +70,7 @@ func NewReportService( commentCommonRepo comment_common.CommentCommonRepo, reportHandle *report_handle.ReportHandle, configService *config.ConfigService, - eventQueueService event_queue.EventQueueService, + eventQueueService eventqueue.Service, ) *ReportService { return &ReportService{ reportRepo: reportRepo, diff --git a/internal/service/review/review_service.go b/internal/service/review/review_service.go index a23b9ee43..bbb142894 100644 --- a/internal/service/review/review_service.go +++ b/internal/service/review/review_service.go @@ -29,7 +29,7 @@ import ( "github.com/apache/answer/internal/schema" answercommon "github.com/apache/answer/internal/service/answer_common" commentcommon "github.com/apache/answer/internal/service/comment_common" - "github.com/apache/answer/internal/service/notice_queue" + "github.com/apache/answer/internal/service/noticequeue" "github.com/apache/answer/internal/service/object_info" questioncommon "github.com/apache/answer/internal/service/question_common" "github.com/apache/answer/internal/service/role" @@ -66,8 +66,8 @@ type ReviewService struct { userRoleService *role.UserRoleRelService tagCommon *tagcommon.TagCommonService questionCommon *questioncommon.QuestionCommon - externalNotificationQueueService notice_queue.ExternalNotificationQueueService - notificationQueueService notice_queue.NotificationQueueService + externalNotificationQueueService noticequeue.ExternalService + notificationQueueService noticequeue.Service siteInfoService siteinfo_common.SiteInfoCommonService commentCommonRepo commentcommon.CommentCommonRepo } @@ -81,10 +81,10 @@ func NewReviewService( questionRepo questioncommon.QuestionRepo, answerRepo answercommon.AnswerRepo, userRoleService *role.UserRoleRelService, - externalNotificationQueueService notice_queue.ExternalNotificationQueueService, + externalNotificationQueueService noticequeue.ExternalService, tagCommon *tagcommon.TagCommonService, questionCommon *questioncommon.QuestionCommon, - notificationQueueService notice_queue.NotificationQueueService, + notificationQueueService noticequeue.Service, siteInfoService siteinfo_common.SiteInfoCommonService, commentCommonRepo commentcommon.CommentCommonRepo, ) *ReviewService { diff --git a/internal/service/tag/tag_service.go b/internal/service/tag/tag_service.go index e61bfa06e..640f06b69 100644 --- a/internal/service/tag/tag_service.go +++ b/internal/service/tag/tag_service.go @@ -25,7 +25,7 @@ import ( "strings" "github.com/apache/answer/internal/base/constant" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/internal/service/revision_common" "github.com/apache/answer/internal/service/siteinfo_common" tagcommonser "github.com/apache/answer/internal/service/tag_common" @@ -50,7 +50,7 @@ type TagService struct { revisionService *revision_common.RevisionService followCommon activity_common.FollowRepo siteInfoService siteinfo_common.SiteInfoCommonService - activityQueueService activity_queue.ActivityQueueService + activityQueueService activityqueue.Service } // NewTagService new tag service @@ -60,7 +60,7 @@ func NewTagService( revisionService *revision_common.RevisionService, followCommon activity_common.FollowRepo, siteInfoService siteinfo_common.SiteInfoCommonService, - activityQueueService activity_queue.ActivityQueueService, + activityQueueService activityqueue.Service, ) *TagService { return &TagService{ tagRepo: tagRepo, diff --git a/internal/service/tag_common/tag_common.go b/internal/service/tag_common/tag_common.go index 87c10bcc9..9ca8e100f 100644 --- a/internal/service/tag_common/tag_common.go +++ b/internal/service/tag_common/tag_common.go @@ -33,7 +33,7 @@ import ( "github.com/apache/answer/internal/base/validator" "github.com/apache/answer/internal/entity" "github.com/apache/answer/internal/schema" - "github.com/apache/answer/internal/service/activity_queue" + "github.com/apache/answer/internal/service/activityqueue" "github.com/apache/answer/internal/service/revision_common" "github.com/apache/answer/internal/service/siteinfo_common" "github.com/apache/answer/pkg/converter" @@ -89,7 +89,7 @@ type TagCommonService struct { tagRelRepo TagRelRepo tagRepo TagRepo siteInfoService siteinfo_common.SiteInfoCommonService - activityQueueService activity_queue.ActivityQueueService + activityQueueService activityqueue.Service } // NewTagCommonService new tag service @@ -99,7 +99,7 @@ func NewTagCommonService( tagRepo TagRepo, revisionService *revision_common.RevisionService, siteInfoService siteinfo_common.SiteInfoCommonService, - activityQueueService activity_queue.ActivityQueueService, + activityQueueService activityqueue.Service, ) *TagCommonService { return &TagCommonService{ tagCommonRepo: tagCommonRepo,