Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .changeset/loud-cows-prove.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
"@effect/platform-node-shared": minor
"@effect/platform-node": minor
"@effect/platform-bun": minor
"@effect/cluster": minor
"@effect/rpc": patch
"@effect/workflow": minor
---

backport @effect/cluster from effect v4

@effect/cluster no longer requires a Shard Manager, and instead relies on the
`RunnerStorage` service to track runner state.

To migrate, remove any Shard Manager deployments and use the updated layers in
`@effect/platform-node` or `@effect/platform-bun`.

# Breaking Changes

- `ShardManager` module has been removed
- `EntityNotManagedByRunner` error has been removed
- Shard locks now use database advisory locks, which requires stable sessions
for database connections. This means load balancers or proxies that rotate
connections may cause issues.
- `@effect/platform-node/NodeClusterSocketRunner` is now
`@effect/cluster/NodeClusterSocket`
- `@effect/platform-node/NodeClusterHttpRunner` is now
`@effect/cluster/NodeClusterHttp`
- `@effect/platform-bun/BunClusterSocketRunner` is now
`@effect/cluster/BunClusterSocket`
- `@effect/platform-bun/BunClusterHttpRunner` is now
`@effect/cluster/BunClusterHttp`

# New Features

- `RunnerHealth.layerK8s` has been added, which uses the Kubernetes API to track
runner health and liveness. To use it, you will need a service account with
permissions to read pod information.
5 changes: 5 additions & 0 deletions .changeset/petite-signs-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/sql-pg": patch
---

disable pg onnotice by default
5 changes: 5 additions & 0 deletions .changeset/plenty-bats-ask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

expose Layer output in HttpLayerRouter.serve
5 changes: 5 additions & 0 deletions .changeset/sad-bags-fall.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/sql-pg": minor
---

Use "pg" npm library for @effect/sql-pg backend
5 changes: 5 additions & 0 deletions .changeset/warm-aliens-dig.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add experimental HashRing module
24 changes: 0 additions & 24 deletions packages/cluster/src/ClusterError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,6 @@ export class EntityNotAssignedToRunner extends Schema.TaggedError<EntityNotAssig
}
}

/**
* Represents an error that occurs when a Runner receives a message for an entity
* that it is not responsible for managing.
*
* @since 1.0.0
* @category errors
*/
export class EntityNotManagedByRunner extends Schema.TaggedError<EntityNotManagedByRunner>()(
"EntityNotManagedByRunner",
{ address: EntityAddress }
) {
/**
* @since 1.0.0
*/
readonly [TypeId] = TypeId

/**
* @since 1.0.0
*/
static is(u: unknown): u is EntityNotManagedByRunner {
return hasProperty(u, TypeId) && isTagged(u, "EntityNotManagedByRunner")
}
}

/**
* Represents an error that occurs when a message fails to be properly
* deserialized by an entity.
Expand Down
28 changes: 12 additions & 16 deletions packages/cluster/src/ClusterMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
*/
import * as Metric from "effect/Metric"

/**
* @since 1.0.0
* @category metrics
*/
export const shards = Metric.gauge("effect_cluster_shards")

/**
* @since 1.0.0
* @category metrics
Expand All @@ -21,28 +15,30 @@ export const entities = Metric.gauge("effect_cluster_entities", {
* @since 1.0.0
* @category metrics
*/
export const singletons = Metric.gauge("effect_cluster_singletons")

/**
* @since 1.0.0
* @category metrics
*/
export const runners = Metric.gauge("effect_cluster_runners")
export const singletons = Metric.gauge("effect_cluster_singletons", {
bigint: true
})

/**
* @since 1.0.0
* @category metrics
*/
export const assignedShards = Metric.gauge("effect_cluster_shards_assigned")
export const runners = Metric.gauge("effect_cluster_runners", {
bigint: true
})

/**
* @since 1.0.0
* @category metrics
*/
export const unassignedShards = Metric.gauge("effect_cluster_shards_unassigned")
export const runnersHealthy = Metric.gauge("effect_cluster_runners_healthy", {
bigint: true
})

/**
* @since 1.0.0
* @category metrics
*/
export const rebalances = Metric.counter("effect_cluster_rebalances")
export const shards = Metric.gauge("effect_cluster_shards", {
bigint: true
})
116 changes: 38 additions & 78 deletions packages/cluster/src/ClusterWorkflowEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@
* @since 1.0.0
*/
import * as Rpc from "@effect/rpc/Rpc"
import * as RpcServer from "@effect/rpc/RpcServer"
import { DurableDeferred } from "@effect/workflow"
import * as Activity from "@effect/workflow/Activity"
import * as DurableClock from "@effect/workflow/DurableClock"
import * as Workflow from "@effect/workflow/Workflow"
import { WorkflowEngine, WorkflowInstance } from "@effect/workflow/WorkflowEngine"
import * as Arr from "effect/Array"
import * as Cause from "effect/Cause"
import * as Context from "effect/Context"
import * as DateTime from "effect/DateTime"
import * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import type * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import * as FiberId from "effect/FiberId"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import type * as ParseResult from "effect/ParseResult"
import * as PrimaryKey from "effect/PrimaryKey"
import * as RcMap from "effect/RcMap"
import * as Record from "effect/Record"
import type * as Record from "effect/Record"
import * as Runtime from "effect/Runtime"
import * as Schedule from "effect/Schedule"
import * as Schema from "effect/Schema"
Expand Down Expand Up @@ -190,16 +193,7 @@ export const make = Effect.gen(function*() {
times: 3,
schedule: Schedule.exponential(250)
}),
Effect.orDie,
(effect, { activity, attempt, executionId }) =>
Effect.withSpan(effect, "WorkflowEngine.resetActivityAttempt", {
captureStackTrace: false,
attributes: {
name: activity.name,
executionId,
attempt
}
})
Effect.orDie
)

const clearClock = Effect.fnUntraced(function*(options: {
Expand Down Expand Up @@ -260,13 +254,12 @@ export const make = Effect.gen(function*() {
return {
run: (request: Entity.Request<any>) => {
const instance = WorkflowInstance.initial(workflow, executionId)
let payload = request.payload
const payload = request.payload
let parent: { workflowName: string; executionId: string } | undefined
if (payload[payloadParentKey]) {
parent = payload[payloadParentKey]
payload = Record.remove(payload, payloadParentKey)
}
return execute(payload, executionId).pipe(
return execute(workflow.payloadSchema.make(payload), executionId).pipe(
Effect.ensuring(Effect.suspend(() => {
if (!instance.suspended) {
return parent ? ensureSuccess(sendResumeParent(parent)) : Effect.void
Expand All @@ -291,17 +284,17 @@ export const make = Effect.gen(function*() {
) as any
},

activity: Effect.fnUntraced(
function*(request: Entity.Request<any>) {
const activityId = `${executionId}/${request.payload.name}`
activity(request: Entity.Request<any>) {
const activityId = `${executionId}/${request.payload.name}`
const instance = WorkflowInstance.initial(workflow, executionId)
return Effect.gen(function*() {
let entry = activities.get(activityId)
while (!entry) {
const latch = Effect.unsafeMakeLatch()
activityLatches.set(activityId, latch)
yield* latch.await
entry = activities.get(activityId)
}
const instance = WorkflowInstance.initial(workflow, executionId)
const contextMap = new Map(entry.runtime.context.unsafeMap)
contextMap.set(Activity.CurrentAttempt.key, request.payload.attempt)
contextMap.set(WorkflowInstance.key, instance)
Expand All @@ -311,23 +304,29 @@ export const make = Effect.gen(function*() {
runtimeFlags: Runtime.defaultRuntimeFlags
})
return yield* entry.activity.executeEncoded.pipe(
Effect.interruptible,
Effect.onInterrupt(() => {
instance.suspended = true
return Effect.void
}),
Workflow.intoResult,
Effect.provide(runtime),
Effect.ensuring(Effect.sync(() => {
activities.delete(activityId)
}))
Effect.provide(runtime)
)
},
Rpc.wrap({
fork: true,
uninterruptible: true
})
),
}).pipe(
Workflow.intoResult,
Effect.catchAllCause((cause) => {
const interruptors = Cause.interruptors(cause)
// we only want to store explicit interrupts
const ids = Array.from(interruptors, (id) => Array.from(FiberId.ids(id))).flat()
const suspend = ids.includes(RpcServer.fiberIdClientInterrupt.id) ||
ids.includes(RpcServer.fiberIdTransientInterrupt.id)
return suspend ? Effect.succeed(new Workflow.Suspended()) : Effect.failCause(cause)
}),
Effect.provideService(WorkflowInstance, instance),
Effect.provideService(Activity.CurrentAttempt, request.payload.attempt),
Effect.ensuring(Effect.sync(() => {
activities.delete(activityId)
})),
Rpc.wrap({
fork: true,
uninterruptible: true
})
)
},

deferred: Effect.fnUntraced(function*(request: Entity.Request<any>) {
yield* ensureSuccess(resume(workflow, executionId))
Expand Down Expand Up @@ -407,27 +406,10 @@ export const make = Effect.gen(function*() {
times: 3,
schedule: Schedule.exponential(250)
}),
Effect.orDie,
(effect, workflow, executionId) =>
Effect.withSpan(effect, "WorkflowEngine.interrupt", {
captureStackTrace: false,
attributes: {
name: workflow.name,
executionId
}
})
Effect.orDie
),

resume: (workflow, executionId) =>
ensureSuccess(resume(workflow, executionId)).pipe(
Effect.withSpan("WorkflowEngine.resume", {
captureStackTrace: false,
attributes: {
name: workflow.name,
executionId
}
})
),
resume: (workflow, executionId) => ensureSuccess(resume(workflow, executionId)),

activityExecute: Effect.fnUntraced(
function*({ activity, attempt }) {
Expand Down Expand Up @@ -460,15 +442,7 @@ export const make = Effect.gen(function*() {
return result
}
},
Effect.scoped,
(effect, { activity, attempt }) =>
Effect.withSpan(effect, "WorkflowEngine.activityExecute", {
captureStackTrace: false,
attributes: {
name: activity.name,
attempt
}
})
Effect.scoped
),

deferredResult: (deferred) =>
Expand All @@ -493,13 +467,7 @@ export const make = Effect.gen(function*() {
times: 3,
schedule: Schedule.exponential(250)
}),
Effect.orDie,
Effect.withSpan("WorkflowEngine.deferredResult", {
captureStackTrace: false,
attributes: {
name: deferred.name
}
})
Effect.orDie
),

deferredDone: Effect.fnUntraced(
Expand All @@ -512,15 +480,7 @@ export const make = Effect.gen(function*() {
}, { discard: true })
)
},
Effect.scoped,
(effect, { deferredName, executionId }) =>
Effect.withSpan(effect, "WorkflowEngine.deferredDone", {
captureStackTrace: false,
attributes: {
name: deferredName,
executionId
}
})
Effect.scoped
),

scheduleClock(options) {
Expand Down
9 changes: 2 additions & 7 deletions packages/cluster/src/Entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ import * as Predicate from "effect/Predicate"
import type * as Schedule from "effect/Schedule"
import { Scope } from "effect/Scope"
import type * as Stream from "effect/Stream"
import type {
AlreadyProcessingMessage,
EntityNotManagedByRunner,
MailboxFull,
PersistenceError
} from "./ClusterError.js"
import type { AlreadyProcessingMessage, MailboxFull, PersistenceError } from "./ClusterError.js"
import { ShardGroup } from "./ClusterSchema.js"
import { EntityAddress } from "./EntityAddress.js"
import type { EntityId } from "./EntityId.js"
Expand Down Expand Up @@ -114,7 +109,7 @@ export interface Entity<
entityId: string
) => RpcClient.RpcClient.From<
Rpcs,
MailboxFull | AlreadyProcessingMessage | PersistenceError | EntityNotManagedByRunner
MailboxFull | AlreadyProcessingMessage | PersistenceError
>,
never,
Sharding
Expand Down
10 changes: 10 additions & 0 deletions packages/cluster/src/EntityAddress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ export class EntityAddress extends Schema.Class<EntityAddress>(SymbolKey)({
export const EntityAddressFromSelf: Schema.Schema<EntityAddress> = Schema.typeSchema(
EntityAddress
)

/**
* @since 4.0.0
* @category constructors
*/
export const make = (options: {
readonly shardId: ShardId
readonly entityType: EntityType
readonly entityId: EntityId
}): EntityAddress => new EntityAddress(options, { disableValidation: true })
Loading