mirror of
https://github.com/alice-lg/alice-lg.git
synced 2024-05-11 05:55:03 +00:00
use transactions
This commit is contained in:
@ -32,23 +32,35 @@ func (b *NeighborsBackend) SetNeighbors(
|
||||
sourceID string,
|
||||
neighbors api.Neighbors,
|
||||
) error {
|
||||
// Clear current neighbors
|
||||
now := time.Now().UTC()
|
||||
|
||||
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{
|
||||
IsoLevel: pgx.ReadCommitted,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
// Clear current neighbors
|
||||
if err := b.clear(ctx, tx, sourceID, now); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set neighbors
|
||||
for _, n := range neighbors {
|
||||
if err := b.persist(ctx, sourceID, n, now); err != nil {
|
||||
if err := b.persist(ctx, tx, sourceID, n, now); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Remove old neighbors
|
||||
if err := b.deleteStale(ctx, sourceID, now); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
// Private persist saves a neighbor to the database
|
||||
func (b *NeighborsBackend) persist(
|
||||
ctx context.Context,
|
||||
tx pgx.Tx,
|
||||
sourceID string,
|
||||
neighbor *api.Neighbor,
|
||||
now time.Time,
|
||||
@ -57,27 +69,22 @@ func (b *NeighborsBackend) persist(
|
||||
INSERT INTO neighbors (
|
||||
id, rs_id, neighbor, updated_at
|
||||
) VALUES ( $1, $2, $3, $4 )
|
||||
ON CONFLICT ON CONSTRAINT neighbors_pkey DO UPDATE
|
||||
SET neighbor = EXCLUDED.neighbor,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
`
|
||||
_, err := b.pool.Exec(ctx, qry, neighbor.ID, sourceID, neighbor, now)
|
||||
_, err := tx.Exec(ctx, qry, neighbor.ID, sourceID, neighbor, now)
|
||||
return err
|
||||
}
|
||||
|
||||
// Private deleteStale removes all neighbors not inserted or
|
||||
// updated at a specific time.
|
||||
func (b *NeighborsBackend) deleteStale(
|
||||
// Private clear removes all neighbors for a RS
|
||||
func (b *NeighborsBackend) clear(
|
||||
ctx context.Context,
|
||||
tx pgx.Tx,
|
||||
sourceID string,
|
||||
t time.Time,
|
||||
) error {
|
||||
qry := `
|
||||
DELETE FROM neighbors
|
||||
WHERE rs_id = $1
|
||||
AND updated_at <> $2
|
||||
DELETE FROM neighbors WHERE rs_id = $1
|
||||
`
|
||||
_, err := b.pool.Exec(ctx, qry, sourceID, t)
|
||||
_, err := tx.Exec(ctx, qry, sourceID, t)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -85,6 +92,7 @@ func (b *NeighborsBackend) deleteStale(
|
||||
// for a given sourceID
|
||||
func (b *NeighborsBackend) queryNeighborsAt(
|
||||
ctx context.Context,
|
||||
tx pgx.Tx,
|
||||
sourceID string,
|
||||
) (pgx.Rows, error) {
|
||||
qry := `
|
||||
@ -92,7 +100,7 @@ func (b *NeighborsBackend) queryNeighborsAt(
|
||||
FROM neighbors
|
||||
WHERE rs_id = $1
|
||||
`
|
||||
return b.pool.Query(ctx, qry, sourceID)
|
||||
return tx.Query(ctx, qry, sourceID)
|
||||
}
|
||||
|
||||
// GetNeighborsAt retrieves all neighbors associated
|
||||
@ -101,7 +109,15 @@ func (b *NeighborsBackend) GetNeighborsAt(
|
||||
ctx context.Context,
|
||||
sourceID string,
|
||||
) (api.Neighbors, error) {
|
||||
rows, err := b.queryNeighborsAt(ctx, sourceID)
|
||||
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{
|
||||
IsoLevel: pgx.ReadCommitted,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
rows, err := b.queryNeighborsAt(ctx, tx, sourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -123,7 +139,14 @@ func (b *NeighborsBackend) GetNeighborsMapAt(
|
||||
ctx context.Context,
|
||||
sourceID string,
|
||||
) (map[string]*api.Neighbor, error) {
|
||||
rows, err := b.queryNeighborsAt(ctx, sourceID)
|
||||
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{
|
||||
IsoLevel: pgx.ReadCommitted,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
rows, err := b.queryNeighborsAt(ctx, tx, sourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -144,12 +167,18 @@ func (b *NeighborsBackend) CountNeighborsAt(
|
||||
ctx context.Context,
|
||||
sourceID string,
|
||||
) (int, error) {
|
||||
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{
|
||||
IsoLevel: pgx.ReadCommitted,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
qry := `
|
||||
SELECT COUNT(1) FROM neighbors
|
||||
WHERE rs_id = $1
|
||||
SELECT COUNT(1) FROM neighbors WHERE rs_id = $1
|
||||
`
|
||||
count := 0
|
||||
err := b.pool.QueryRow(ctx, qry, sourceID).Scan(&count)
|
||||
err = tx.QueryRow(ctx, qry, sourceID).Scan(&count)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user