refactor(wip): drop heartbeat padding in summary aggregation (see #675)

This commit is contained in:
Ferdinand Mütsch
2025-02-19 16:32:27 +01:00
parent 2b3f1d9ef4
commit 8d3a049f4d
9 changed files with 571 additions and 584 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -12,9 +12,10 @@ import (
)
const (
DefaultHeartbeatsTimeout = 2 * time.Minute
MinHeartbeatsTimeout = 30 * time.Second
MaxHeartbeatsTimeout = 5 * time.Minute
DefaultHeartbeatsTimeout = 10 * time.Minute
DefaultHeartbeatsTimeoutLegacy = 2 * time.Minute
MinHeartbeatsTimeout = 1 * time.Minute
MaxHeartbeatsTimeout = 1 * time.Hour
)
func init() {
@@ -49,7 +50,7 @@ type User struct {
StripeCustomerId string `json:"-"`
InvitedBy string `json:"-"`
ExcludeUnknownProjects bool `json:"-"`
HeartbeatsTimeoutSec int `json:"-" gorm:"default:120"` // https://github.com/muety/wakapi/issues/156
HeartbeatsTimeoutSec int `json:"-" gorm:"default:600"` // https://github.com/muety/wakapi/issues/156
}
type Login struct {

View File

@@ -33,7 +33,6 @@ func (r *HeartbeatRepository) GetAll() ([]*models.Heartbeat, error) {
}
func (r *HeartbeatRepository) InsertBatch(heartbeats []*models.Heartbeat) error {
// sqlserver on conflict has bug https://github.com/go-gorm/sqlserver/issues/100
// As a workaround, insert one by one, and ignore duplicate key error
if r.db.Dialector.Name() == (sqlserver.Dialector{}).Name() {

View File

@@ -376,10 +376,11 @@ func (h *SettingsHandler) actionUpdateHeartbeatsTimeout(w http.ResponseWriter, r
defer h.userSrvc.FlushCache()
val, err := strconv.ParseInt(r.PostFormValue("heartbeats_timeout"), 0, 0)
if dur := time.Duration(val) * time.Second; err != nil || dur < models.MinHeartbeatsTimeout || dur > models.MaxHeartbeatsTimeout {
dur := time.Duration(val) * time.Minute
if err != nil || dur < models.MinHeartbeatsTimeout || dur > models.MaxHeartbeatsTimeout {
return actionResult{http.StatusBadRequest, "", "invalid input", nil}
}
user.HeartbeatsTimeoutSec = int(val)
user.HeartbeatsTimeoutSec = int(dur.Seconds())
if _, err := h.userSrvc.Update(user); err != nil {
return actionResult{http.StatusInternalServerError, "", "internal sever error", nil}

View File

@@ -1,9 +1,13 @@
SELECT project, language, editor, operating_system, machine, branch, SUM(GREATEST(1, diff)) as 'sum'
FROM (
SELECT project, language, editor, operating_system, machine, branch, TIME_TO_SEC(LEAST(TIMEDIFF(time, LAG(time) over w), '00:02:00')) as 'diff'
FROM heartbeats
WHERE user_id = 'n1try'
WINDOW w AS (ORDER BY time)
) s2
SELECT project, language, editor, operating_system, machine, branch, SUM(diff) as 'sum'
FROM (SELECT project,
language,
editor,
operating_system,
machine,
branch,
TIME_TO_SEC(LEAST(TIMEDIFF(time, LAG(time) over w), '00:00:00')) as 'diff' -- time constant ~ heartbeats padding (none by default, formerly 2 mins)
FROM heartbeats
WHERE user_id = 'n1try'
WINDOW w AS (ORDER BY time)) s2
WHERE diff IS NOT NULL
GROUP BY project, language, editor, operating_system, machine, branch;

View File

@@ -1,9 +1,14 @@
SELECT project, language, editor, operating_system, machine, branch, SUM(GREATEST(1, diff)) as 'sum'
FROM (
SELECT project, language, editor, operating_system, machine, branch, EXTRACT(EPOCH FROM LEAST(time - LAG(time) OVER w, INTERVAL '2 minutes')) as diff
FROM heartbeats
WHERE user_id = 'n1try'
WINDOW w AS (ORDER BY time)
) s2
SELECT project, language, editor, operating_system, machine, branch, SUM(diff) as sum
FROM (SELECT project,
language,
editor,
operating_system,
machine,
branch,
EXTRACT(EPOCH FROM
LEAST(time - LAG(time) OVER w, INTERVAL '0 minutes')) as diff -- time constant ~ heartbeats padding (none by default, formerly 2 mins)
FROM heartbeats
WHERE user_id = 'n1try'
WINDOW w AS (ORDER BY time)) s2
WHERE diff IS NOT NULL
GROUP BY project, language, editor, operating_system, machine, branch;

View File

@@ -1,13 +1,15 @@
package services
import (
"github.com/duke-git/lancet/v2/condition"
"github.com/duke-git/lancet/v2/datetime"
"github.com/duke-git/lancet/v2/mathutil"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"time"
)
const heartbeatPadding = 0 * time.Second
type DurationService struct {
config *config.Config
heartbeatService IHeartbeatService
@@ -30,11 +32,11 @@ func (srv *DurationService) Get(from, to time.Time, user *models.User, filters *
}
// Aggregation
// the below logic is approximately equivalent to the SQL query at scripts/aggregate_durations_mysql.sql
// a postgres-compatible script was contributed by @cwilby and is available at scripts/aggregate_durations_postgres.sql
// i'm hesitant to replicate that logic for sqlite and mssql too (because probably painful to impossible), but we could
// think about adding a distrinctio here to use pure-sql aggregation for mysql and postgres, and traditional, programmatic
// aggregation for all other databases
// The logic below is approximately equivalent to the SQL query at scripts/aggregate_durations_mysql.sql
// (except that the query does no filtering and no "same day" check).
// A Postgres-compatible script was contributed by @cwilby and is available at scripts/aggregate_durations_postgres.sql.
// I'm hesitant to replicate that logic for SQLite and MSSQL too (it would probably be painful, if not impossible), but we could
// think about adding a distinction here to use pure-SQL aggregation for MySQL and Postgres, and traditional, programmatic
// aggregation for all other databases.
var count int
var latest *models.Duration
@@ -43,39 +45,31 @@ func (srv *DurationService) Get(from, to time.Time, user *models.User, filters *
for _, h := range heartbeats {
d1 := models.NewDurationFromHeartbeat(h).WithEntityIgnored().Hashed()
// initialize map entry
if list, ok := mapping[d1.GroupHash]; !ok || len(list) < 1 {
mapping[d1.GroupHash] = []*models.Duration{d1}
mapping[d1.GroupHash] = []*models.Duration{}
}
// first heartbeat
if latest == nil {
mapping[d1.GroupHash] = append(mapping[d1.GroupHash], d1)
latest = d1
continue
}
// Skip heartbeats that span across two adjacent summaries (assuming there is no more than one summary per day).
// This avoids a discrepancy between summaries generated from raw heartbeats and summaries aggregated from pre-generated ones.
// In the latter case, the very last heartbeat of a day won't be counted, so we don't want to count it here either.
sameDay := datetime.BeginOfDay(d1.Time.T()) == datetime.BeginOfDay(latest.Time.T())
dur := time.Duration(mathutil.Min(
int64(d1.Time.T().Sub(latest.Time.T().Add(latest.Duration))),
int64(heartbeatsTimeout),
))
dur := condition.Ternary[bool, time.Duration](sameDay, d1.Time.T().Sub(latest.Time.T().Add(latest.Duration)), 0)
latest.Duration += condition.Ternary[bool, time.Duration](dur < heartbeatsTimeout, dur, heartbeatPadding)
// skip heartbeats that span across two adjacent summaries (assuming there are no more than 1 summary per day)
// this is relevant to prevent the time difference between generating summaries from raw heartbeats and aggregating pre-generated summaries
// for the latter case, the very last heartbeat of a day won't be counted, so we don't want to count it here either
// another option would be to adapt the Summarize() method to always append up to DefaultHeartbeatsTimeout seconds to a day's very last duration
if !sameDay {
dur = 0
}
latest.Duration += dur
// start new "group" if:
// (a) heartbeats were too far apart each other,
// (b) if they are of a different entity or,
// (c) if they span across two days
// Start a new "group" if:
// (a) the heartbeats are too far apart from each other,
// (b) they are of a different entity, or
// (c) they span across two days
if dur >= heartbeatsTimeout || latest.GroupHash != d1.GroupHash || !sameDay {
list := mapping[d1.GroupHash]
if d0 := list[len(list)-1]; d0 != d1 {
mapping[d1.GroupHash] = append(mapping[d1.GroupHash], d1)
}
mapping[d1.GroupHash] = append(mapping[d1.GroupHash], d1)
latest = d1
} else {
latest.NumHeartbeats++
@@ -88,31 +82,23 @@ func (srv *DurationService) Get(from, to time.Time, user *models.User, filters *
for _, list := range mapping {
for _, d := range list {
// even when filters are applied, we'll still have to compute the whole summary first and then filter out non-matching durations
// if we fetched only matching heartbeats in the first place, there will be false positive gaps (see DefaultHeartbeatsTimeout)
// in case the user worked on different projects in parallel
// see https://github.com/muety/wakapi/issues/535
// Even when filters are applied, we'll still have to compute the whole summary first and then filter out non-matching durations.
// If we fetched only matching heartbeats in the first place, there will be false positive gaps (see heartbeatsTimeout)
// in case the user worked on different projects in parallel.
// See https://github.com/muety/wakapi/issues/535, https://github.com/muety/wakapi/issues/716
if filters != nil && !filters.MatchDuration(d) {
continue
}
if user.ExcludeUnknownProjects && d.Project == "" {
continue
}
// will only happen if two heartbeats with different hashes (e.g. different project) have the same timestamp
// that, in turn, will most likely only happen for mysql, where `time` column's precision was set to second for a while
// assume that two non-identical heartbeats with identical time are sub-second apart from each other, so round up to expectancy value
// also see https://github.com/muety/wakapi/issues/340
if d.Duration == 0 {
d.Duration = 500 * time.Millisecond
}
durations = append(durations, d)
}
}
if len(heartbeats) == 1 && len(durations) == 1 {
durations[0].Duration = heartbeatsTimeout
durations[0].Duration = heartbeatPadding
}
return durations.Sorted(), nil

View File

@@ -45,7 +45,7 @@ type DurationServiceTestSuite struct {
}
func (suite *DurationServiceTestSuite) SetupSuite() {
suite.TestUser = &models.User{ID: TestUserId}
suite.TestUser = &models.User{ID: TestUserId, HeartbeatsTimeoutSec: int(models.DefaultHeartbeatsTimeoutLegacy / time.Second)}
// https://anchr.io/i/F0HEK.jpg
suite.TestStartTime = time.Unix(0, MinUnixTime1)
@@ -160,7 +160,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get() {
assert.Nil(suite.T(), err)
assert.Len(suite.T(), durations, 1)
assert.Equal(suite.T(), models.DefaultHeartbeatsTimeout, durations.First().Duration)
assert.Equal(suite.T(), time.Duration(0), durations.First().Duration)
assert.Equal(suite.T(), 1, durations.First().NumHeartbeats)
/* TEST 3 */
@@ -171,7 +171,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get() {
assert.Nil(suite.T(), err)
assert.Len(suite.T(), durations, 3)
assert.Equal(suite.T(), 150*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 30*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 20*time.Second, durations[1].Duration)
assert.Equal(suite.T(), 15*time.Second, durations[2].Duration)
assert.Equal(suite.T(), TestEditorGoland, durations[0].Editor)
@@ -198,6 +198,8 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get_Filtered() {
durations, err = sut.Get(from, to, suite.TestUser, models.NewFiltersWith(models.SummaryEditor, TestEditorGoland))
assert.Nil(suite.T(), err)
assert.Len(suite.T(), durations, 2)
assert.Equal(suite.T(), 30*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 20*time.Second, durations[1].Duration)
for _, d := range durations {
assert.Equal(suite.T(), TestEditorGoland, d.Editor)
}
@@ -213,7 +215,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get_CustomTimeout() {
)
defer func() {
suite.TestUser.HeartbeatsTimeoutSec = int(models.DefaultHeartbeatsTimeout / time.Second) // revert to defaults
suite.TestUser.HeartbeatsTimeoutSec = int(models.DefaultHeartbeatsTimeoutLegacy / time.Second) // revert to defaults
}()
from, to = suite.TestStartTime, suite.TestStartTime.Add(1*time.Hour)
@@ -224,7 +226,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get_CustomTimeout() {
durations, _ = sut.Get(from, to, suite.TestUser, nil)
assert.Len(suite.T(), durations, 3)
assert.Equal(suite.T(), 90*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 30*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 20*time.Second, durations[1].Duration)
assert.Equal(suite.T(), 15*time.Second, durations[2].Duration)
assert.Equal(suite.T(), 3, durations[0].NumHeartbeats)
@@ -236,7 +238,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get_CustomTimeout() {
durations, _ = sut.Get(from, to, suite.TestUser, nil)
assert.Len(suite.T(), durations, 3)
assert.Equal(suite.T(), 160*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 30*time.Second, durations[0].Duration)
assert.Equal(suite.T(), 20*time.Second, durations[1].Duration)
assert.Equal(suite.T(), 15*time.Second, durations[2].Duration)
assert.Equal(suite.T(), 3, durations[0].NumHeartbeats)
@@ -244,7 +246,7 @@ func (suite *DurationServiceTestSuite) TestDurationService_Get_CustomTimeout() {
assert.Equal(suite.T(), 3, durations[2].NumHeartbeats)
/* Test 3 */
suite.TestUser.HeartbeatsTimeoutSec = 300
suite.TestUser.HeartbeatsTimeoutSec = 140
durations, _ = sut.Get(from, to, suite.TestUser, nil)
assert.Len(suite.T(), durations, 2)

View File

@@ -466,8 +466,8 @@
<div class="flex flex-col flex-grow gap-y-1">
<label class="font-semibold text-gray-300" for="heartbeats_timeout">Timeout / offset (minutes)</label>
<div class="flex gap-x-2 items-center">
<input class="input-default" type="number" id="heartbeats_timeout" name="heartbeats_timeout" style="max-width: 100px;" placeholder="120" min="30" max="300" step="10" required value="{{ .User.HeartbeatsTimeoutSec }}">
<span class="text-gray-600 text-sm">(min. 30 seconds, max. 5 minutes)</span>
<input class="input-default" type="number" id="heartbeats_timeout" name="heartbeats_timeout" style="max-width: 100px;" placeholder="1" min="1" max="60" step="1" required value="{{ .User.HeartbeatsTimeoutSec }}">
<span class="text-gray-600 text-sm">(min. 1 min, max. 60 min)</span>
</div>
</div>
<button type="submit" class="btn-primary h-min">Save</button>