chore: replace precomputed user heartbeat ranges by grouping view query

This commit is contained in:
Ferdinand Mütsch
2025-09-07 21:45:42 +02:00
parent 73d7213656
commit 2082624132
14 changed files with 2703 additions and 2558 deletions

View File

@@ -35,7 +35,6 @@ const (
KeyLatestTotalUsers = "latest_total_users"
KeyLastImport = "last_import" // import attempt
KeyLastImportSuccess = "last_successful_import" // last actual successful import
KeyFirstHeartbeat = "first_heartbeat"
KeySubscriptionNotificationSent = "sub_reminder"
KeyNewsbox = "newsbox"
KeyInviteCode = "invite"

File diff suppressed because it is too large Load Diff

View File

@@ -232,7 +232,7 @@ func main() {
shieldV1BadgeHandler := shieldsV1Routes.NewBadgeHandler(summaryService, userService)
// MVC Handlers
summaryHandler := routes.NewSummaryHandler(summaryService, userService, keyValueService, durationService, aliasService)
summaryHandler := routes.NewSummaryHandler(summaryService, userService, heartbeatService, durationService, aliasService)
settingsHandler := routes.NewSettingsHandler(userService, heartbeatService, durationService, summaryService, aliasService, aggregationService, languageMappingService, projectLabelService, keyValueService, mailService)
subscriptionHandler := routes.NewSubscriptionHandler(userService, mailService, keyValueService)
projectsHandler := routes.NewProjectsHandler(userService, heartbeatService)

View File

@@ -0,0 +1,51 @@
package migrations
import (
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/utils"
"gorm.io/gorm"
)
func init() {
const name = "20250907-add_user_heartbeats_range_view"
f := migrationFunc{
name: name,
background: true,
f: func(db *gorm.DB, cfg *config.Config) error {
if hasRun(name, db) {
return nil
}
const q = "select u.id as user_id, min(h.time) as first, max(h.time) as last " +
"from users u left join heartbeats h on u.id = h.user_id " +
"group by u.id"
if err := db.Transaction(func(tx *gorm.DB) error {
// https://stackoverflow.com/a/1236008/3112139
if cfg.Db.IsSQLite() {
if err := tx.Migrator().DropView("user_heartbeats_range"); err != nil {
return err
}
}
if err := tx.Migrator().CreateView("user_heartbeats_range", gorm.ViewOption{
Query: db.Raw(q),
Replace: !cfg.Db.IsSQLite(),
}); err != nil {
return err
}
if err := tx.Exec("delete from key_string_values where "+utils.QuoteSql(db, "%s like ?", "key"), "first_heartbeat_%").Error; err != nil {
return err
}
return nil
}); err != nil {
return err
}
setHasRun(name, db)
return nil
},
}
registerPostMigration(f)
}

View File

@@ -57,11 +57,31 @@ func (m *HeartbeatServiceMock) StreamAllWithinByFilters(t time.Time, t2 time.Tim
return args.Get(0).(chan *models.Heartbeat), args.Error(1)
}
func (m *HeartbeatServiceMock) GetFirstByUsers() ([]*models.TimeByUser, error) {
func (m *HeartbeatServiceMock) GetFirstAll() ([]*models.TimeByUser, error) {
args := m.Called()
return args.Get(0).([]*models.TimeByUser), args.Error(1)
}
func (m *HeartbeatServiceMock) GetLastAll() ([]*models.TimeByUser, error) {
args := m.Called()
return args.Get(0).([]*models.TimeByUser), args.Error(1)
}
func (m *HeartbeatServiceMock) GetFirstByUser(u *models.User) (time.Time, error) {
args := m.Called(u)
return args.Get(0).(time.Time), args.Error(1)
}
func (m *HeartbeatServiceMock) GetLastByUser(u *models.User) (time.Time, error) {
args := m.Called(u)
return args.Get(0).(time.Time), args.Error(1)
}
func (m *HeartbeatServiceMock) GetRangeByUser(u *models.User) (*models.RangeByUser, error) {
args := m.Called(u)
return args.Get(0).(*models.RangeByUser), args.Error(1)
}
func (m *HeartbeatServiceMock) GetLatestByUser(user *models.User) (*models.Heartbeat, error) {
args := m.Called(user)
return args.Get(0).(*models.Heartbeat), args.Error(1)

View File

@@ -96,6 +96,12 @@ type UserDataUpdate struct {
PublicLeaderboard bool `schema:"public_leaderboard"`
}
type RangeByUser struct {
User string
First CustomTime
Last CustomTime
}
type TimeByUser struct {
User string
Time CustomTime

View File

@@ -168,26 +168,22 @@ func (r *HeartbeatRepository) GetLatestByFilters(user *models.User, filterMap ma
return heartbeat, nil
}
func (r *HeartbeatRepository) GetFirstByUsers() ([]*models.TimeByUser, error) {
func (r *HeartbeatRepository) GetFirstAll() ([]*models.TimeByUser, error) {
var result []*models.TimeByUser
r.db.Raw("with agg as (select " + utils.QuoteSql(r.db, "user_id, min(time) as %s", "time") + " from heartbeats group by user_id) " +
"select " + utils.QuoteSql(r.db, "id as %s, time ", "user") +
"from users " +
"left join agg on agg.user_id = id " +
"order by users.id").
Scan(&result)
return result, nil
err := r.db.Raw("select user_id as user, first as time from user_heartbeats_range").Scan(&result).Error
return result, err
}
func (r *HeartbeatRepository) GetLastByUsers() ([]*models.TimeByUser, error) {
func (r *HeartbeatRepository) GetLastAll() ([]*models.TimeByUser, error) {
var result []*models.TimeByUser
r.db.Raw("with agg as (select " + utils.QuoteSql(r.db, "user_id, max(time) as %s", "time") + " from heartbeats group by user_id) " +
"select " + utils.QuoteSql(r.db, "id as %s, time ", "user") +
"from users " +
"left join agg on agg.user_id = id " +
"order by users.id").
Scan(&result)
return result, nil
err := r.db.Raw("select user_id as user, last as time from user_heartbeats_range").Scan(&result).Error
return result, err
}
func (r *HeartbeatRepository) GetRangeByUser(user *models.User) (*models.RangeByUser, error) {
var result *models.RangeByUser
err := r.db.Raw("select user_id as user, first, last from user_heartbeats_range where user_id = ?", user.ID).Scan(&result).Error
return result, err
}
func (r *HeartbeatRepository) Count(approximate bool) (count int64, err error) {

View File

@@ -32,8 +32,9 @@ type IHeartbeatRepository interface {
GetWithin(time.Time, time.Time, *models.User) ([]*models.Heartbeat, error)
GetAllWithinByFilters(time.Time, time.Time, *models.User, map[string][]string) ([]*models.Heartbeat, error)
GetLatestByFilters(*models.User, map[string][]string) (*models.Heartbeat, error)
GetFirstByUsers() ([]*models.TimeByUser, error)
GetLastByUsers() ([]*models.TimeByUser, error)
GetFirstAll() ([]*models.TimeByUser, error)
GetLastAll() ([]*models.TimeByUser, error)
GetRangeByUser(*models.User) (*models.RangeByUser, error)
GetLatestByUser(*models.User) (*models.Heartbeat, error)
GetLatestByOriginAndUser(string, *models.User) (*models.Heartbeat, error)
StreamWithin(time.Time, time.Time, *models.User) (chan *models.Heartbeat, error)

View File

@@ -949,15 +949,20 @@ func (h *SettingsHandler) buildViewModel(r *http.Request, w http.ResponseWriter,
}
// user first data
var firstData time.Time
firstDataKv := h.keyValueSrvc.MustGetString(fmt.Sprintf("%s_%s", conf.KeyFirstHeartbeat, user.ID))
if firstDataKv.Value != "" {
firstData, _ = time.Parse(time.RFC822Z, firstDataKv.Value)
firstData, err := h.heartbeatSrvc.GetFirstByUser(user)
if err != nil {
conf.Log().Request(r).Error("error while user's heartbeats range", "user", user.ID, "error", err)
return &view.SettingsViewModel{
SharedLoggedInViewModel: view.SharedLoggedInViewModel{
SharedViewModel: view.NewSharedViewModel(h.config, &view.Messages{Error: criticalError}),
User: user,
},
}
}
// invite link
inviteCode := getVal[string](args, valueInviteCode, "")
inviteLink := condition.TernaryOperator[bool, string](inviteCode == "", "", fmt.Sprintf("%s/signup?invite=%s", h.config.Server.GetPublicUrl(), inviteCode))
inviteLink := condition.Ternary[bool, string](inviteCode == "", "", fmt.Sprintf("%s/signup?invite=%s", h.config.Server.GetPublicUrl(), inviteCode))
vm := &view.SettingsViewModel{
SharedLoggedInViewModel: view.SharedLoggedInViewModel{

View File

@@ -22,22 +22,22 @@ const (
)
type SummaryHandler struct {
config *conf.Config
userSrvc services.IUserService
summarySrvc services.ISummaryService
durationSrvc services.IDurationService
aliasSrvc services.IAliasService
keyValueSrvc services.IKeyValueService
config *conf.Config
userSrvc services.IUserService
summarySrvc services.ISummaryService
durationSrvc services.IDurationService
aliasSrvc services.IAliasService
heartbeatsSrvc services.IHeartbeatService
}
func NewSummaryHandler(summaryService services.ISummaryService, userService services.IUserService, keyValueService services.IKeyValueService, durationService services.IDurationService, aliasService services.IAliasService) *SummaryHandler {
func NewSummaryHandler(summaryService services.ISummaryService, userService services.IUserService, heartbeatsService services.IHeartbeatService, durationService services.IDurationService, aliasService services.IAliasService) *SummaryHandler {
return &SummaryHandler{
summarySrvc: summaryService,
userSrvc: userService,
keyValueSrvc: keyValueService,
durationSrvc: durationService,
aliasSrvc: aliasService,
config: conf.Get(),
summarySrvc: summaryService,
userSrvc: userService,
heartbeatsSrvc: heartbeatsService,
durationSrvc: durationService,
aliasSrvc: aliasService,
config: conf.Get(),
}
}
@@ -77,8 +77,8 @@ func (h *SummaryHandler) GetIndex(w http.ResponseWriter, r *http.Request) {
summaryParams, _ := helpers.ParseSummaryParams(r)
summary, err, status := su.LoadUserSummary(h.summarySrvc, r)
if err != nil {
w.WriteHeader(status)
conf.Log().Request(r).Error("failed to load summary", "error", err)
w.WriteHeader(status)
templates[conf.SummaryTemplate].Execute(w, h.buildViewModel(r, w).WithError(err.Error()))
return
}
@@ -91,10 +91,12 @@ func (h *SummaryHandler) GetIndex(w http.ResponseWriter, r *http.Request) {
}
// user first data
var firstData time.Time
firstDataKv := h.keyValueSrvc.MustGetString(fmt.Sprintf("%s_%s", conf.KeyFirstHeartbeat, user.ID))
if firstDataKv.Value != "" {
firstData, _ = time.Parse(time.RFC822Z, firstDataKv.Value)
firstData, err := h.heartbeatsSrvc.GetFirstByUser(user)
if err != nil {
conf.Log().Request(r).Error("error while user's heartbeats range", "user", user.ID, "error", err)
w.WriteHeader(http.StatusInternalServerError)
templates[conf.SummaryTemplate].Execute(w, h.buildViewModel(r, w).WithError(err.Error()))
return
}
var timeline []*view.TimelineViewModel

View File

@@ -79,7 +79,7 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri
}
// Get a map from user ids to the time of their earliest heartbeats or nil if none exists yet
firstUserHeartbeatTimes, err := srv.heartbeatService.GetFirstByUsers() // TODO: build user-specific variant of this query for efficiency
firstUserHeartbeatTimes, err := srv.heartbeatService.GetFirstAll() // TODO: build user-specific variant of this query for efficiency
if err != nil {
config.Log().Error("error occurred", "error", err.Error())
return err

View File

@@ -47,6 +47,7 @@ func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, langua
srv.cache.IncrementInt64(srv.countByUserCacheKey(heartbeat.UserID), 1) // increment doesn't update expiration time
srv.cache.IncrementInt64(srv.countTotalCacheKey(), 1)
srv.checkInvalidateProjectStatsCache(heartbeat)
srv.checkInvalidateRangeCache(heartbeat)
}
}(&sub1)
@@ -190,8 +191,46 @@ func (srv *HeartbeatService) GetLatestByFilters(user *models.User, filters *mode
return srv.repository.GetLatestByFilters(user, srv.filtersToColumnMap(filters))
}
func (srv *HeartbeatService) GetFirstByUsers() ([]*models.TimeByUser, error) {
return srv.repository.GetFirstByUsers()
func (srv *HeartbeatService) GetFirstAll() ([]*models.TimeByUser, error) {
return srv.repository.GetFirstAll()
}
func (srv *HeartbeatService) GetLastAll() ([]*models.TimeByUser, error) {
return srv.repository.GetLastAll()
}
func (srv *HeartbeatService) GetFirstByUser(user *models.User) (time.Time, error) {
cacheKey := srv.getUserFirstCacheKey(user.ID)
if result, found := srv.cache.Get(cacheKey); found {
return result.(time.Time), nil
}
result, err := srv.repository.GetRangeByUser(user)
if err != nil {
return time.Time{}, err
}
srv.cache.Set(cacheKey, result.First.T(), cache.NoExpiration)
return result.First.T(), nil
}
func (srv *HeartbeatService) GetLastByUser(user *models.User) (time.Time, error) {
cacheKey := srv.getUserLastCacheKey(user.ID)
if result, found := srv.cache.Get(cacheKey); found {
return result.(time.Time), nil
}
result, err := srv.repository.GetRangeByUser(user)
if err != nil {
return time.Time{}, err
}
srv.cache.Set(cacheKey, result.Last.T(), cache.NoExpiration)
return result.Last.T(), nil
}
func (srv *HeartbeatService) GetRangeByUser(user *models.User) (*models.RangeByUser, error) {
return srv.repository.GetRangeByUser(user)
}
func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, userId string) ([]string, error) {
@@ -311,6 +350,14 @@ func (srv *HeartbeatService) getUserProjectsCacheKey(userId string) string {
return fmt.Sprintf("unique_projects_%s", userId)
}
func (srv *HeartbeatService) getUserFirstCacheKey(userId string) string {
return fmt.Sprintf("user_first_%s", userId)
}
func (srv *HeartbeatService) getUserLastCacheKey(userId string) string {
return fmt.Sprintf("user_last_%s", userId)
}
func (srv *HeartbeatService) updateEntityUserCache(entityType uint8, entityKey string, userId string) {
cacheKey := srv.getEntityUserCacheKey(entityType, userId)
if entities, found := srv.cache.Get(cacheKey); found {
@@ -394,3 +441,17 @@ func (srv *HeartbeatService) checkInvalidateProjectStatsCache(newHeartbeat *mode
go srv.populateUniqueUserProjects(newHeartbeat.UserID)
}
}
func (srv *HeartbeatService) checkInvalidateRangeCache(newHeartbeat *models.Heartbeat) {
keyFirst, keyLast := srv.getUserFirstCacheKey(newHeartbeat.UserID), srv.getUserLastCacheKey(newHeartbeat.UserID)
first, found := srv.cache.Get(keyFirst)
if found && newHeartbeat.Time.T().Before(first.(time.Time)) {
srv.cache.Delete(keyFirst)
}
last, found := srv.cache.Get(keyLast)
if found && newHeartbeat.Time.T().After(last.(time.Time)) {
srv.cache.Delete(keyLast)
}
}

View File

@@ -2,17 +2,18 @@ package services
import (
"fmt"
"github.com/duke-git/lancet/v2/slice"
"github.com/muety/artifex/v2"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/utils"
"go.uber.org/atomic"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/duke-git/lancet/v2/slice"
"github.com/muety/artifex/v2"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/utils"
"go.uber.org/atomic"
"github.com/muety/wakapi/models"
)
@@ -61,11 +62,6 @@ func (srv *MiscService) Schedule() {
config.Log().Error("failed to schedule user counting jobs", "error", err)
}
slog.Info("scheduling first data computing")
if _, err := srv.queueDefault.DispatchEvery(srv.ComputeOldestHeartbeats, computeOldestDataEvery); err != nil {
config.Log().Error("failed to schedule first data computing jobs", "error", err)
}
if srv.config.Subscriptions.Enabled && srv.config.Subscriptions.ExpiryNotifications && srv.config.App.DataRetentionMonths > 0 {
slog.Info("scheduling subscription notifications")
if _, err := srv.queueDefault.DispatchEvery(srv.NotifyExpiringSubscription, notifyExpiringSubscriptionsEvery); err != nil {
@@ -79,11 +75,6 @@ func (srv *MiscService) Schedule() {
config.Log().Error("failed to dispatch user counting jobs", "error", err)
}
}
if !srv.existsUsersFirstData() {
if err := srv.queueDefault.Dispatch(srv.ComputeOldestHeartbeats); err != nil {
config.Log().Error("failed to dispatch first data computing jobs", "error", err)
}
}
if !srv.existsSubscriptionNotifications() && srv.config.Subscriptions.Enabled && srv.config.Subscriptions.ExpiryNotifications && srv.config.App.DataRetentionMonths > 0 {
if err := srv.queueDefault.Dispatch(srv.NotifyExpiringSubscription); err != nil {
config.Log().Error("failed to schedule subscription notification jobs", "error", err)
@@ -141,40 +132,6 @@ func (srv *MiscService) CountTotalTime() {
}(&pendingJobs)
}
func (srv *MiscService) ComputeOldestHeartbeats() {
slog.Info("computing users' first data")
if err := srv.queueWorkers.Dispatch(func() {
if ok := firstDataLock.TryLock(); !ok {
config.Log().Warn("couldn't acquire lock for computing users' first data, job is still pending")
return
}
defer firstDataLock.Unlock()
results, err := srv.heartbeatService.GetFirstByUsers()
if err != nil {
config.Log().Error("failed to compute users' first data", "error", err)
return
}
for _, entry := range results {
if entry.Time.T().IsZero() {
continue
}
kvKey := fmt.Sprintf("%s_%s", config.KeyFirstHeartbeat, entry.User)
if err := srv.keyValueService.PutString(&models.KeyStringValue{
Key: kvKey,
Value: entry.Time.T().Format(time.RFC822Z),
}); err != nil {
config.Log().Error("failed to save user's first heartbeat time", "error", err)
}
}
}); err != nil {
config.Log().Error("failed to enqueue computing first data for user", "error", err)
}
}
// NotifyExpiringSubscription sends a reminder e-mail to all users, notifying them if their subscription has expired or is about to, given these conditions:
// - Data cleanup is enabled on the server (non-zero retention time)
// - Subscriptions are enabled on the server (aka. users can do something about their old data getting cleaned up)
@@ -275,14 +232,6 @@ func (srv *MiscService) existsUsersTotalTime() bool {
return len(results) > 0
}
func (srv *MiscService) existsUsersFirstData() bool {
results, err := srv.keyValueService.GetByPrefix(config.KeyFirstHeartbeat)
if err != nil {
config.Log().Error("failed to fetch first heartbeats key-values", "error", err)
}
return len(results) > 0
}
func (srv *MiscService) existsSubscriptionNotifications() bool {
results, err := srv.keyValueService.GetByPrefix(config.KeySubscriptionNotificationSent)
if err != nil {

View File

@@ -40,7 +40,11 @@ type IHeartbeatService interface {
CountByUsers([]*models.User) ([]*models.CountByUser, error)
GetAllWithin(time.Time, time.Time, *models.User) ([]*models.Heartbeat, error)
GetAllWithinByFilters(time.Time, time.Time, *models.User, *models.Filters) ([]*models.Heartbeat, error)
GetFirstByUsers() ([]*models.TimeByUser, error)
GetFirstAll() ([]*models.TimeByUser, error)
GetLastAll() ([]*models.TimeByUser, error)
GetRangeByUser(*models.User) (*models.RangeByUser, error)
GetFirstByUser(*models.User) (time.Time, error)
GetLastByUser(*models.User) (time.Time, error)
GetLatestByUser(*models.User) (*models.Heartbeat, error)
GetLatestByOriginAndUser(string, *models.User) (*models.Heartbeat, error)
GetLatestByFilters(*models.User, *models.Filters) (*models.Heartbeat, error)