chore(perf): generate and persist durations incrementally

This commit is contained in:
Ferdinand Mütsch
2025-02-21 07:35:58 +01:00
parent 77bb01020f
commit f9835fde71
5 changed files with 38 additions and 6 deletions

View File

@@ -26,6 +26,11 @@ func (m *DurationRepositoryMock) GetAllWithinByFilters(t time.Time, t2 time.Time
return args.Get(0).([]*models.Duration), args.Error(1)
}
func (m *DurationRepositoryMock) GetLatestByUser(u *models.User) (*models.Duration, error) {
args := m.Called(u)
return args.Get(0).(*models.Duration), args.Error(1)
}
func (m *DurationRepositoryMock) StreamAllWithin(t time.Time, t2 time.Time, u *models.User) (chan *models.Duration, error) {
args := m.Called(t, t2, u)
return args.Get(0).(chan *models.Duration), args.Error(1)

View File

@@ -40,6 +40,16 @@ func (r *DurationRepository) GetAllWithinByFilters(from, to time.Time, user *mod
return durations, nil
}
func (r *DurationRepository) GetLatestByUser(user *models.User) (*models.Duration, error) {
var duration *models.Duration
err := r.db.
Where(&models.Duration{UserID: user.ID}).
Order("time desc").
First(&duration).
Error
return duration, err
}
// note: streaming is only sensible if results are aggregated in some way in the calling function, otherwise we'll end up with an entire list anyway
func (r *DurationRepository) StreamAllWithin(from, to time.Time, user *models.User) (chan *models.Duration, error) {

View File

@@ -51,6 +51,7 @@ type IDurationRepository interface {
InsertBatch([]*models.Duration) error
GetAllWithin(time.Time, time.Time, *models.User) ([]*models.Duration, error)
GetAllWithinByFilters(time.Time, time.Time, *models.User, map[string][]string) ([]*models.Duration, error)
GetLatestByUser(*models.User) (*models.Duration, error)
StreamAllWithin(time.Time, time.Time, *models.User) (chan *models.Duration, error)
StreamAllWithinByFilters(time.Time, time.Time, *models.User, map[string][]string) (chan *models.Duration, error)
DeleteByUser(*models.User) error

View File

@@ -0,0 +1,5 @@
SELECT table_schema name,
ROUND(SUM(data_length + index_length) / 1024 / 1024, 1) size
FROM information_schema.tables
GROUP BY table_schema
ORDER BY size DESC;

View File

@@ -98,16 +98,27 @@ func (srv *DurationService) Get(from, to time.Time, user *models.User, filters *
func (srv *DurationService) Regenerate(user *models.User, forceAll bool) {
slog.Info("generating ephemeral durations for user up until now", "user", user.ID)
durations, err := srv.Get(time.Time{}, time.Now(), user, nil, forceAll)
if err != nil {
config.Log().Error("failed to Regenerate ephemeral durations for user up until now", "user", user.ID, "error", err)
return
var from time.Time
latest, err := srv.durationRepository.GetLatestByUser(user)
if err == nil && latest != nil {
from = latest.TimeEnd()
}
if err := srv.durationRepository.DeleteByUser(user); err != nil {
config.Log().Error("failed to delete old durations while generating ephemeral new ones", "user", user.ID, "error", err)
durations, err := srv.Get(from, time.Now(), user, nil, forceAll)
if err != nil {
config.Log().Error("failed to regenerate ephemeral durations for user up until now", "user", user.ID, "error", err)
return
}
if len(durations) > 0 && durations[0].Time.T().Before(from) && !forceAll {
config.Log().Warn("got generated duration before requested min date", "user", user.ID, "time", durations[0].Time.T(), "group_hash", durations[0].GroupHash, "min_date", from)
}
if forceAll {
if err := srv.durationRepository.DeleteByUser(user); err != nil {
config.Log().Error("failed to delete old durations while generating ephemeral new ones", "user", user.ID, "error", err)
return
}
}
if err := srv.durationRepository.InsertBatch(durations); err != nil {
config.Log().Error("failed to persist new ephemeral durations for user", "user", user.ID, "error", err)