Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalized merging function #72

Merged
merged 6 commits into from
Oct 29, 2022
Merged

Generalized merging function #72

merged 6 commits into from
Oct 29, 2022

Conversation

kelindar
Copy link
Owner

@kelindar kelindar commented Oct 29, 2022

This PR generalizes Merge() function and removes now redundant Add() functions for numbers. When Merge() function is called on numerical columns without specifying a custom merge option on the column, the behavior of Add() will be observed (i.e. increments by default).

Example

In this example we are creating a column called location that contains a JSON-encoded position and velocity. The position is updated by calling a MergeString() function with the velocity vector, the updates are then merged atomically using a merging function specified.

The merge happens when transaction is committed to ensure consistency. Hence, this technique allows for two concurrent transactions to update the same position.

// Movement represents a movement with a position and velocity
type Movement struct {
	Position [2]float64 `json:"position,omitempty"`
	Velocity [2]float64 `json:"velocity,omitempty"`
}

func main() {

	// A merging function that accepts a velocity vector and updates
	// the movement structure accordingly.
	mergeVectors := func(value, delta string) string {
		movement, ok := parseMovement(value)
		if !ok {
			movement = Movement{
				Position: [2]float64{0, 0},
			}
		}

		// Parse the incoming delta value
		velocity, ok := parseVector(delta)
		if !ok {
			return value
		}

		// Update the current velocity and recalculate the position
		movement.Velocity = velocity
		movement.Position[0] += velocity[0] // Update X
		movement.Position[1] += velocity[1] // Update Y

		// Encode the movement as JSON and return the updated value
		return encode(movement)
	}

	// Create a column with a specified merge function
	db := column.NewCollection()
	db.CreateColumn("location", column.ForString(
		column.WithMerge(mergeVectors), // use our merging function
	))

	// Insert an empty row
	id, _ := db.Insert(func(r column.Row) error {
		r.SetString("location", "{}")
		return nil
	})

	// Update 20 times
	for i := 0; i < 20; i++ {

		// Move the location by applying a same velocity vector
		db.Query(func(txn *column.Txn) error {
			location := txn.String("location")
			return txn.QueryAt(id, func(r column.Row) error {
				location.Merge(encode([2]float64{1, 2}))
				return nil
			})
		})

		// Print out current location
		db.Query(func(txn *column.Txn) error {
			location := txn.String("location")
			return txn.QueryAt(id, func(r column.Row) error {
				value, _ := location.Get()
				fmt.Printf("%.2d: %v \n", i, value)
				return nil
			})
		})
	}

}

// parseMovement parses a value string into a Movement struct
func parseMovement(value string) (out Movement, ok bool) {
	if err := json.Unmarshal([]byte(value), &out); err != nil {
		return Movement{}, false
	}
	return out, true
}

// parseVector parses a value string into 2 dimensional array
func parseVector(value string) (out [2]float64, ok bool) {
	if err := json.Unmarshal([]byte(value), &out); err != nil {
		return [2]float64{}, false
	}
	return out, true
}

// encodes encodes the value as JSON
func encode(value any) string {
	encoded, _ := json.Marshal(value)
	return string(encoded)
}

@kelindar kelindar merged commit 7d654c0 into main Oct 29, 2022
@kelindar kelindar deleted the merge branch October 29, 2022 19:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant