diff --git a/pkg/store/backends/postgres/neighbors_backend.go b/pkg/store/backends/postgres/neighbors_backend.go index 2191a15..3c99c1e 100644 --- a/pkg/store/backends/postgres/neighbors_backend.go +++ b/pkg/store/backends/postgres/neighbors_backend.go @@ -32,23 +32,35 @@ func (b *NeighborsBackend) SetNeighbors( sourceID string, neighbors api.Neighbors, ) error { - // Clear current neighbors now := time.Now().UTC() + + tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.ReadCommitted, + }) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + // Clear current neighbors + if err := b.clear(ctx, tx, sourceID, now); err != nil { + return err + } + + // Set neighbors for _, n := range neighbors { - if err := b.persist(ctx, sourceID, n, now); err != nil { + if err := b.persist(ctx, tx, sourceID, n, now); err != nil { return err } } - // Remove old neighbors - if err := b.deleteStale(ctx, sourceID, now); err != nil { - return err - } - return nil + + return tx.Commit(ctx) } // Private persist saves a neighbor to the database func (b *NeighborsBackend) persist( ctx context.Context, + tx pgx.Tx, sourceID string, neighbor *api.Neighbor, now time.Time, @@ -57,27 +69,22 @@ func (b *NeighborsBackend) persist( INSERT INTO neighbors ( id, rs_id, neighbor, updated_at ) VALUES ( $1, $2, $3, $4 ) - ON CONFLICT ON CONSTRAINT neighbors_pkey DO UPDATE - SET neighbor = EXCLUDED.neighbor, - updated_at = EXCLUDED.updated_at ` - _, err := b.pool.Exec(ctx, qry, neighbor.ID, sourceID, neighbor, now) + _, err := tx.Exec(ctx, qry, neighbor.ID, sourceID, neighbor, now) return err } -// Private deleteStale removes all neighbors not inserted or -// updated at a specific time. -func (b *NeighborsBackend) deleteStale( +// Private clear removes all neighbors for a RS +func (b *NeighborsBackend) clear( ctx context.Context, + tx pgx.Tx, sourceID string, t time.Time, ) error { qry := ` - DELETE FROM neighbors - WHERE rs_id = $1 - AND updated_at <> $2 + DELETE FROM neighbors WHERE rs_id = $1 ` - _, err := b.pool.Exec(ctx, qry, sourceID, t) + _, err := tx.Exec(ctx, qry, sourceID, t) return err } @@ -85,6 +92,7 @@ func (b *NeighborsBackend) deleteStale( // for a given sourceID func (b *NeighborsBackend) queryNeighborsAt( ctx context.Context, + tx pgx.Tx, sourceID string, ) (pgx.Rows, error) { qry := ` @@ -92,7 +100,7 @@ func (b *NeighborsBackend) queryNeighborsAt( FROM neighbors WHERE rs_id = $1 ` - return b.pool.Query(ctx, qry, sourceID) + return tx.Query(ctx, qry, sourceID) } // GetNeighborsAt retrieves all neighbors associated @@ -101,7 +109,15 @@ func (b *NeighborsBackend) GetNeighborsAt( ctx context.Context, sourceID string, ) (api.Neighbors, error) { - rows, err := b.queryNeighborsAt(ctx, sourceID) + tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.ReadCommitted, + }) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + rows, err := b.queryNeighborsAt(ctx, tx, sourceID) if err != nil { return nil, err } @@ -123,7 +139,14 @@ func (b *NeighborsBackend) GetNeighborsMapAt( ctx context.Context, sourceID string, ) (map[string]*api.Neighbor, error) { - rows, err := b.queryNeighborsAt(ctx, sourceID) + tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.ReadCommitted, + }) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + rows, err := b.queryNeighborsAt(ctx, tx, sourceID) if err != nil { return nil, err } @@ -144,12 +167,18 @@ func (b *NeighborsBackend) CountNeighborsAt( ctx context.Context, sourceID string, ) (int, error) { + tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.ReadCommitted, + }) + if err != nil { + return 0, err + } + defer tx.Rollback(ctx) qry := ` - SELECT COUNT(1) FROM neighbors - WHERE rs_id = $1 + SELECT COUNT(1) FROM neighbors WHERE rs_id = $1 ` count := 0 - err := b.pool.QueryRow(ctx, qry, sourceID).Scan(&count) + err = tx.QueryRow(ctx, qry, sourceID).Scan(&count) if err != nil { return 0, err }