refactor(perf): user first heartbeats query

This commit is contained in:
Ferdinand Mütsch
2024-11-25 20:21:37 +01:00
parent 24751ea2d0
commit f162accfb4
4 changed files with 158 additions and 162 deletions

View File

@@ -134,20 +134,22 @@ func (r *HeartbeatRepository) GetLatestByFilters(user *models.User, filterMap ma
func (r *HeartbeatRepository) GetFirstByUsers() ([]*models.TimeByUser, error) { func (r *HeartbeatRepository) GetFirstByUsers() ([]*models.TimeByUser, error) {
var result []*models.TimeByUser var result []*models.TimeByUser
r.db.Model(&models.User{}). r.db.Raw("with agg as (select " + utils.QuoteSql(r.db, "user_id, min(time) as %s", "time") + " from heartbeats group by user_id) " +
Select(utils.QuoteSql(r.db, "users.id as %s, min(time) as %s", "user", "time")). "select " + utils.QuoteSql(r.db, "id as %s, time ", "user") +
Joins("left join heartbeats on users.id = heartbeats.user_id"). "from users " +
Group("users.id"). "left join agg on agg.user_id = id " +
"order by users.id").
Scan(&result) Scan(&result)
return result, nil return result, nil
} }
func (r *HeartbeatRepository) GetLastByUsers() ([]*models.TimeByUser, error) { func (r *HeartbeatRepository) GetLastByUsers() ([]*models.TimeByUser, error) {
var result []*models.TimeByUser var result []*models.TimeByUser
r.db.Model(&models.User{}). r.db.Raw("with agg as (select " + utils.QuoteSql(r.db, "user_id, max(time) as %s", "time") + " from heartbeats group by user_id) " +
Select(utils.QuoteSql(r.db, "users.id as %s, max(time) as %s", "user", "time")). "select " + utils.QuoteSql(r.db, "id as %s, time ", "user") +
Joins("left join heartbeats on users.id = heartbeats.user_id"). "from users " +
Group("user"). "left join agg on agg.user_id = id " +
"order by users.id").
Scan(&result) Scan(&result)
return result, nil return result, nil
} }

View File

@@ -68,14 +68,14 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri
slog.Info("generating summaries") slog.Info("generating summaries")
// Get a map from user ids to the time of their latest summary or nil if none exists yet // Get a map from user ids to the time of their latest summary or nil if none exists yet
lastUserSummaryTimes, err := srv.summaryService.GetLatestByUser() lastUserSummaryTimes, err := srv.summaryService.GetLatestByUser() // TODO: build user-specific variant of this query for efficiency
if err != nil { if err != nil {
config.Log().Error("error occurred", "error", err) config.Log().Error("error occurred", "error", err)
return err return err
} }
// Get a map from user ids to the time of their earliest heartbeats or nil if none exists yet // Get a map from user ids to the time of their earliest heartbeats or nil if none exists yet
firstUserHeartbeatTimes, err := srv.heartbeatService.GetFirstByUsers() firstUserHeartbeatTimes, err := srv.heartbeatService.GetFirstByUsers() // TODO: build user-specific variant of this query for efficiency
if err != nil { if err != nil {
config.Log().Error("error occurred", "error", err) config.Log().Error("error occurred", "error", err)
return err return err

View File

@@ -1,205 +1,205 @@
package services package services
import ( import (
"github.com/duke-git/lancet/v2/slice" "github.com/duke-git/lancet/v2/slice"
"github.com/muety/artifex/v2" "github.com/muety/artifex/v2"
"github.com/muety/wakapi/config" "github.com/muety/wakapi/config"
"github.com/muety/wakapi/models" "github.com/muety/wakapi/models"
"github.com/muety/wakapi/utils" "github.com/muety/wakapi/utils"
"log/slog" "log/slog"
"time" "time"
) )
type HousekeepingService struct { type HousekeepingService struct {
config *config.Config config *config.Config
userSrvc IUserService userSrvc IUserService
heartbeatSrvc IHeartbeatService heartbeatSrvc IHeartbeatService
summarySrvc ISummaryService summarySrvc ISummaryService
queueDefault *artifex.Dispatcher queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher queueWorkers *artifex.Dispatcher
} }
func NewHousekeepingService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService) *HousekeepingService { func NewHousekeepingService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService) *HousekeepingService {
return &HousekeepingService{ return &HousekeepingService{
config: config.Get(), config: config.Get(),
userSrvc: userService, userSrvc: userService,
heartbeatSrvc: heartbeatService, heartbeatSrvc: heartbeatService,
summarySrvc: summaryService, summarySrvc: summaryService,
queueDefault: config.GetDefaultQueue(), queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueHousekeeping), queueWorkers: config.GetQueue(config.QueueHousekeeping),
} }
} }
func (s *HousekeepingService) Schedule() { func (s *HousekeepingService) Schedule() {
s.scheduleDataCleanups() s.scheduleDataCleanups()
s.scheduleInactiveUsersCleanup() s.scheduleInactiveUsersCleanup()
if s.config.App.WarmCaches { if s.config.App.WarmCaches {
s.scheduleProjectStatsCacheWarming() s.scheduleProjectStatsCacheWarming()
} }
} }
func (s *HousekeepingService) CleanUserDataBefore(user *models.User, before time.Time) error { func (s *HousekeepingService) CleanUserDataBefore(user *models.User, before time.Time) error {
slog.Warn("cleaning up user data older than", "userID", user.ID, "date", before) slog.Warn("cleaning up user data older than", "userID", user.ID, "date", before)
if s.config.App.DataCleanupDryRun { if s.config.App.DataCleanupDryRun {
slog.Info("skipping actual data deletion for dry run", "userID", user.ID) slog.Info("skipping actual data deletion for dry run", "userID", user.ID)
return nil return nil
} }
// clear old heartbeats // clear old heartbeats
if err := s.heartbeatSrvc.DeleteByUserBefore(user, before); err != nil { if err := s.heartbeatSrvc.DeleteByUserBefore(user, before); err != nil {
return err return err
} }
// clear old summaries // clear old summaries
slog.Info("clearing summaries for user older than", "userID", user.ID, "date", before) slog.Info("clearing summaries for user older than", "userID", user.ID, "date", before)
if err := s.summarySrvc.DeleteByUserBefore(user.ID, before); err != nil { if err := s.summarySrvc.DeleteByUserBefore(user.ID, before); err != nil {
return err return err
} }
return nil return nil
} }
func (s *HousekeepingService) CleanInactiveUsers(before time.Time) error { func (s *HousekeepingService) CleanInactiveUsers(before time.Time) error {
slog.Info("cleaning up users inactive since", "date", before) slog.Info("cleaning up users inactive since", "date", before)
users, err := s.userSrvc.GetAll() users, err := s.userSrvc.GetAll()
if err != nil { if err != nil {
return err return err
} }
var i int var i int
for _, u := range users { for _, u := range users {
if u.LastLoggedInAt.T().After(before) || u.HasData { if u.LastLoggedInAt.T().After(before) || u.HasData {
continue continue
} }
slog.Warn("deleting user due to inactivity and no data", "userID", u.ID) slog.Warn("deleting user due to inactivity and no data", "userID", u.ID)
if err := s.userSrvc.Delete(u); err != nil { if err := s.userSrvc.Delete(u); err != nil {
config.Log().Error("failed to delete user", "userID", u.ID) config.Log().Error("failed to delete user", "userID", u.ID)
} else { } else {
i++ i++
} }
} }
slog.Info("deleted users due to inactivity", "deletedCount", i, "totalCount", len(users)) slog.Info("deleted users due to inactivity", "deletedCount", i, "totalCount", len(users))
return nil return nil
} }
func (s *HousekeepingService) WarmUserProjectStatsCache(user *models.User) error { func (s *HousekeepingService) WarmUserProjectStatsCache(user *models.User) error {
slog.Info("pre-warming project stats cache for user", "userID", user.ID) slog.Info("pre-warming project stats cache for user", "userID", user.ID)
if _, err := s.heartbeatSrvc.GetUserProjectStats(user, time.Time{}, utils.BeginOfToday(time.Local), nil, true); err != nil { if _, err := s.heartbeatSrvc.GetUserProjectStats(user, time.Time{}, utils.BeginOfToday(time.Local), nil, true); err != nil {
config.Log().Error("failed to pre-warm project stats cache", "userID", user.ID, "error", err) config.Log().Error("failed to pre-warm project stats cache", "userID", user.ID, "error", err)
} }
return nil return nil
} }
func (s *HousekeepingService) runWarmProjectStatsCache() { func (s *HousekeepingService) runWarmProjectStatsCache() {
// fetch active users // fetch active users
users, err := s.userSrvc.GetActive(false) users, err := s.userSrvc.GetActive(false)
if err != nil { if err != nil {
config.Log().Error("failed to get active users for project stats cache warming", "error", err) config.Log().Error("failed to get active users for project stats cache warming", "error", err)
return return
} }
// fetch user heartbeat counts // fetch user heartbeat counts
userHeartbeatCounts, err := s.heartbeatSrvc.CountByUsers(users) userHeartbeatCounts, err := s.heartbeatSrvc.CountByUsers(users)
if err != nil { if err != nil {
config.Log().Error("failed to count user heartbeats for project stats cache warming", "error", err) config.Log().Error("failed to count user heartbeats for project stats cache warming", "error", err)
return return
} }
// schedule jobs // schedule jobs
for _, c := range userHeartbeatCounts { for _, c := range userHeartbeatCounts {
// only warm cache for users with >= 100k heartbeats (where calculation is expected to take unbearably long) // only warm cache for users with >= 100k heartbeats (where calculation is expected to take unbearably long)
if c.Count < 100_000 { if c.Count < 100_000 {
continue continue
} }
user, _ := slice.FindBy[*models.User](users, func(i int, u *models.User) bool { user, _ := slice.FindBy[*models.User](users, func(i int, u *models.User) bool {
return u.ID == c.User return u.ID == c.User
}) })
s.queueWorkers.Dispatch(func() { s.queueWorkers.Dispatch(func() {
if err := s.WarmUserProjectStatsCache(user); err != nil { if err := s.WarmUserProjectStatsCache(user); err != nil {
config.Log().Error("failed to pre-warm project stats cache", "userID", user.ID) config.Log().Error("failed to pre-warm project stats cache", "userID", user.ID)
} }
}) })
} }
} }
func (s *HousekeepingService) runCleanData() { func (s *HousekeepingService) runCleanData() {
// fetch all users // fetch all users
users, err := s.userSrvc.GetAll() users, err := s.userSrvc.GetAll()
if err != nil { if err != nil {
config.Log().Error("failed to get users for data cleanup", "error", err) config.Log().Error("failed to get users for data cleanup", "error", err)
return return
} }
// schedule jobs // schedule jobs
for _, u := range users { for _, u := range users {
// don't clean data for subscribed users or when they otherwise have unlimited data access // don't clean data for subscribed users or when they otherwise have unlimited data access
if u.MinDataAge().IsZero() { if u.MinDataAge().IsZero() {
continue continue
} }
user := *u user := *u
s.queueWorkers.Dispatch(func() { s.queueWorkers.Dispatch(func() {
if err := s.CleanUserDataBefore(&user, user.MinDataAge()); err != nil { if err := s.CleanUserDataBefore(&user, user.MinDataAge()); err != nil {
config.Log().Error("failed to clear old user data", "userID", user.ID) config.Log().Error("failed to clear old user data", "userID", user.ID)
} }
}) })
} }
} }
func (s *HousekeepingService) runCleanInactiveUsers() { func (s *HousekeepingService) runCleanInactiveUsers() {
s.queueWorkers.Dispatch(func() { s.queueWorkers.Dispatch(func() {
if s.config.App.MaxInactiveMonths <= 0 { if s.config.App.MaxInactiveMonths <= 0 {
return return
} }
if err := s.CleanInactiveUsers(time.Now().AddDate(0, -s.config.App.MaxInactiveMonths, 0)); err != nil { if err := s.CleanInactiveUsers(time.Now().AddDate(0, -s.config.App.MaxInactiveMonths, 0)); err != nil {
config.Log().Error("failed to clean up inactive users", "error", err) config.Log().Error("failed to clean up inactive users", "error", err)
} }
}) })
} }
// individual scheduling functions // individual scheduling functions
func (s *HousekeepingService) scheduleDataCleanups() { func (s *HousekeepingService) scheduleDataCleanups() {
if s.config.App.DataRetentionMonths <= 0 { if s.config.App.DataRetentionMonths <= 0 {
return return
} }
slog.Info("scheduling data cleanup") slog.Info("scheduling data cleanup")
_, err := s.queueDefault.DispatchCron(s.runCleanData, s.config.App.DataCleanupTime) _, err := s.queueDefault.DispatchCron(s.runCleanData, s.config.App.DataCleanupTime)
if err != nil { if err != nil {
config.Log().Error("failed to dispatch data cleanup jobs", "error", err) config.Log().Error("failed to dispatch data cleanup jobs", "error", err)
} }
} }
func (s *HousekeepingService) scheduleInactiveUsersCleanup() { func (s *HousekeepingService) scheduleInactiveUsersCleanup() {
if s.config.App.MaxInactiveMonths <= 0 { if s.config.App.MaxInactiveMonths <= 0 {
return return
} }
slog.Info("scheduling inactive users cleanup") slog.Info("scheduling inactive users cleanup")
_, err := s.queueDefault.DispatchCron(s.runCleanInactiveUsers, s.config.App.DataCleanupTime) _, err := s.queueDefault.DispatchCron(s.runCleanInactiveUsers, s.config.App.DataCleanupTime)
if err != nil { if err != nil {
config.Log().Error("failed to dispatch inactive users cleanup job", "error", err) config.Log().Error("failed to dispatch inactive users cleanup job", "error", err)
} }
} }
func (s *HousekeepingService) scheduleProjectStatsCacheWarming() { func (s *HousekeepingService) scheduleProjectStatsCacheWarming() {
slog.Info("scheduling project stats cache pre-warming") slog.Info("scheduling project stats cache pre-warming")
_, err := s.queueDefault.DispatchEvery(s.runWarmProjectStatsCache, 12*time.Hour) _, err := s.queueDefault.DispatchEvery(s.runWarmProjectStatsCache, 12*time.Hour)
if err != nil { if err != nil {
config.Log().Error("failed to dispatch pre-warming project stats cache", "error", err) config.Log().Error("failed to dispatch pre-warming project stats cache", "error", err)
} }
// run once initially, 1 min after start // run once initially, 1 min after start
if !s.config.QuickStart { if !s.config.QuickStart {
if err := s.queueDefault.DispatchIn(s.runWarmProjectStatsCache, 1*time.Minute); err != nil { if err := s.queueDefault.DispatchIn(s.runWarmProjectStatsCache, 1*time.Minute); err != nil {
config.Log().Error("failed to dispatch pre-warming project stats cache", "error", err) config.Log().Error("failed to dispatch pre-warming project stats cache", "error", err)
} }
} }
} }

View File

@@ -66,22 +66,16 @@ func (s stringWriter) WriteString(str string) (int, error) {
// QuoteDbIdentifier quotes a column name used in a query. // QuoteDbIdentifier quotes a column name used in a query.
func QuoteDbIdentifier(db *gorm.DB, identifier string) string { func QuoteDbIdentifier(db *gorm.DB, identifier string) string {
builder := stringWriter{Builder: &strings.Builder{}} builder := stringWriter{Builder: &strings.Builder{}}
db.Dialector.QuoteTo(builder, identifier) db.Dialector.QuoteTo(builder, identifier)
return builder.Builder.String() return builder.Builder.String()
} }
// QuoteSql quotes a SQL statement with the given identifiers. // QuoteSql quotes a SQL statement with the given identifiers.
func QuoteSql(db *gorm.DB, queryTemplate string, identifiers ...string) string { func QuoteSql(db *gorm.DB, queryTemplate string, identifiers ...string) string {
quotedIdentifiers := make([]interface{}, len(identifiers)) quotedIdentifiers := make([]interface{}, len(identifiers))
for i, identifier := range identifiers { for i, identifier := range identifiers {
quotedIdentifiers[i] = QuoteDbIdentifier(db, identifier) quotedIdentifiers[i] = QuoteDbIdentifier(db, identifier)
} }
return fmt.Sprintf(queryTemplate, quotedIdentifiers...) return fmt.Sprintf(queryTemplate, quotedIdentifiers...)
} }