Skip to content

Commit

Permalink
std: object API change
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Jan 31, 2025
1 parent d6dad21 commit 13f255a
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 72 deletions.
11 changes: 5 additions & 6 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func (a *advertModule) generate() {
a.seq++

// Produce the advertisement
name := a.dv.config.AdvertisementDataPrefix().
Append(enc.NewTimestampComponent(a.bootTime)).
WithVersion(a.seq)
name, err := a.dv.client.Produce(ndn.ProduceArgs{
Name: a.dv.config.AdvertisementDataPrefix().Append(
enc.NewTimestampComponent(a.bootTime),
),
Name: name,
Content: a.dv.rib.Advert().Encode(),
Version: a.seq,
FreshnessPeriod: 10 * time.Second,
})
if err != nil {
Expand All @@ -48,10 +48,9 @@ func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
enc.NewKeywordComponent("DV"),
enc.NewKeywordComponent("ADV"),
enc.NewTimestampComponent(bootTime),
enc.NewVersionComponent(seqNo),
)...)

a.dv.client.Consume(advName, func(state ndn.ConsumeState) {
a.dv.client.Consume(advName.WithVersion(seqNo), func(state ndn.ConsumeState) {
if !state.IsComplete() {
return
}
Expand Down
7 changes: 6 additions & 1 deletion dv/dv/prefix_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
// Fetch the prefix data object
log.Debug(dv.pfx, "Fetching prefix data", "router", nName, "known", router.Known, "latest", router.Latest)

// Get the versioned name of the next object to fetch
// This can be either a snapshot or a normal data depending on
// the state of the router in the prefix table
name := router.GetNextDataName()

// Fetch the object
dv.client.Consume(name, func(state ndn.ConsumeState) {
if !state.IsComplete() {
return
Expand All @@ -68,7 +73,7 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warn(dv.pfx, "Failed to fetch prefix data", "name", state.Name(), "err", fetchErr)
log.Warn(dv.pfx, "Failed to fetch prefix data", "name", name, "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
}

Expand Down
35 changes: 17 additions & 18 deletions dv/table/prefix_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,22 @@ func (pt *PrefixTable) Apply(ops *tlv.PrefixOpList) (dirty bool) {
// If the difference between Known and Latest is greater than the threshold,
// fetch the latest snapshot. Otherwise, fetch the next sequence number.
func (r *PrefixTableRouter) GetNextDataName() enc.Name {
// /<router>/32=DV/32=PFX/t=<boot>/32=SNAP/v=<seq>
// /<router>/32=DV/32=PFX/t=<boot>/seq=<seq>/v=0
name := r.Name.Append(
enc.NewKeywordComponent("DV"),
enc.NewKeywordComponent("PFX"),
enc.NewTimestampComponent(r.BootTime),
)

if r.Latest-r.Known > PrefixSnapThreshold {
// no version - discover the latest snapshot
// /<router>/32=DV/32=PFX/t=<boot>/32=SNAP/v=<seq>
name = append(name, enc.NewKeywordComponent(PrefixSnapKeyword))
} else {
// zero version - immutable
// /<router>/32=DV/32=PFX/t=<boot>/seq=<seq>/v=0
name = append(name, enc.NewSequenceNumComponent(r.Known+1), enc.NewVersionComponent(0))
return name.
Append(enc.NewKeywordComponent(PrefixSnapKeyword))
}
return name

return name.
Append(enc.NewSequenceNumComponent(r.Known + 1)).
WithVersion(enc.VersionImmutable)
}

// Process the received prefix data. Returns if dirty.
Expand Down Expand Up @@ -226,12 +227,11 @@ func (pt *PrefixTable) publishOp(content enc.Wire) {
// Produce the operation
// /<router>/32=DV/32=PFX/t=<boot>/seq=<seq>/v=0
name, err := pt.client.Produce(ndn.ProduceArgs{
Name: pt.config.PrefixTableDataPrefix().Append(
enc.NewTimestampComponent(pt.me.BootTime),
enc.NewSequenceNumComponent(seq),
),
Name: pt.config.PrefixTableDataPrefix().
Append(enc.NewTimestampComponent(pt.me.BootTime)).
Append(enc.NewSequenceNumComponent(seq)).
WithVersion(enc.VersionImmutable),
Content: content,
Version: ndn.VersionImmutable,
})
if err != nil {
log.Error(pt, "Failed to produce op", "err", err)
Expand Down Expand Up @@ -263,12 +263,11 @@ func (pt *PrefixTable) publishSnap() {
// Produce the snapshot
// /<router>/32=DV/32=PFX/t=<boot>/32=SNAP/v=<seq>
name, err := pt.client.Produce(ndn.ProduceArgs{
Name: pt.config.PrefixTableDataPrefix().Append(
enc.NewTimestampComponent(pt.me.BootTime),
enc.NewKeywordComponent(PrefixSnapKeyword),
),
Name: pt.config.PrefixTableDataPrefix().
Append(enc.NewTimestampComponent(pt.me.BootTime)).
Append(enc.NewKeywordComponent(PrefixSnapKeyword)).
WithVersion(pt.me.Latest),
Content: snap.Encode(),
Version: pt.me.Latest,
})
if err != nil {
log.Error(pt, "Failed to produce snap", "err", err)
Expand Down
3 changes: 1 addition & 2 deletions fw/mgmt/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ func (m *Thread) sendCtrlResp(interest *Interest, statusCode uint64, statusText
// Create a segmented status dataset and send the first segment to the internal transport
func (m *Thread) sendStatusDataset(interest *Interest, name enc.Name, dataset enc.Wire) {
objName, err := object.Produce(ndn.ProduceArgs{
Name: name,
Name: name.WithVersion(enc.VersionUnixMicro),
Content: dataset,
Version: ndn.VersionUnixMicro,
FreshnessPeriod: time.Millisecond,
NoMetadata: true,
}, m.store, m.signer)
Expand Down
29 changes: 29 additions & 0 deletions std/encoding/component_new.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package encoding

import "time"

// VersionImmutable is the version number for immutable objects.
// A version number of 0 will be used on the wire.
const VersionImmutable = uint64(0)

// VersionUnixMicro is the version number for objects with a unix timestamp.
// A version number of microseconds since the unix epoch will be used on the wire.
// Current unix time must be positive, or usage will panic.
const VersionUnixMicro = uint64(1<<63 - 16)

func NewBytesComponent(typ TLNum, val []byte) Component {
return Component{
Typ: typ,
Expand Down Expand Up @@ -80,3 +91,21 @@ func (c Component) IsVersion() bool {
func (c Component) IsTimestamp() bool {
return c.Typ == TypeTimestampNameComponent
}

// WithVersion appends a version component to the name.
func (n Name) WithVersion(v uint64) Name {
if n.At(-1).IsVersion() {
n = n.Prefix(-1) // pop old version
}
switch v {
case VersionImmutable:
v = 0
case VersionUnixMicro:
if now := time.Now().UnixMicro(); now > 0 { // > 1970
v = uint64(now)
} else {
panic("current unix time is negative")
}
}
return n.Append(NewVersionComponent(v))
}
39 changes: 18 additions & 21 deletions std/ndn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ import (
enc "github.com/named-data/ndnd/std/encoding"
)

// ObjectVersionImmutable is the version number for immutable objects.
// A version number of 0 will be used on the wire.
const VersionImmutable = uint64(1<<63 - 15)

// ObjectVersionUnixMicro is the version number for objects with a unix timestamp.
// A version number of microseconds since the unix epoch will be used on the wire.
// Current unix time must be positive to use this.
const VersionUnixMicro = uint64(1<<63 - 16)

// Client is the interface for the Object Client API
type Client interface {
// String is the instance log identifier.
Expand All @@ -27,20 +18,25 @@ type Client interface {
Engine() Engine
// Store gives the underlying data store.
Store() Store

// Produce generates and signs data, and inserts into the client's store.
// The input data will be freed as the object is segmented.
// Returns the final versioned name of the object.
Produce(args ProduceArgs) (enc.Name, error)
// Remove removes an object from the client's store by name.
Remove(name enc.Name) error
// Consume fetches an object with a given name
// Consume fetches an object with a given name.
// By default, Consume will attemt to discover the latest version of the object.
// To specify a particular version, use Name.WithVersion()
Consume(name enc.Name, callback func(status ConsumeState))
// ConsumeExt is a more advanced consume API that allows for
// more control over the fetching process.
ConsumeExt(args ConsumeExtArgs)

// ExpressR sends a single interest with reliability.
// Since this is a low-level API, the result is NOT validated.
ExpressR(args ExpressRArgs)

// Suggest suggests a signer for a given name.
// nil is returned if no signer is found.
SuggestSigner(name enc.Name) Signer
Expand All @@ -53,15 +49,11 @@ type Client interface {
// ProduceArgs are the arguments for the produce API.
type ProduceArgs struct {
// Name is the name of the object to produce.
// The version of the object MUST be set using WithVersion.
Name enc.Name
// Content is the raw data wire.
// Content can be larger than a single packet and will be segmented.
Content enc.Wire
// Version of the object. This option is required.
// Available magic values are:
// VersionImmutable for v=0
// VersionUnixMicro for v=current unix time in microseconds
Version uint64
// Time for which the object version can be cached (default 4s).
FreshnessPeriod time.Duration
// NoMetadata disables RDR metadata (advanced usage).
Expand All @@ -70,19 +62,24 @@ type ProduceArgs struct {

// ConsumeState is the state of the consume operation
type ConsumeState interface {
// Name of the object being consumed.
// Name of the object being consumed including version.
Name() enc.Name
// Error that occurred during fetching.
Error() error
// Version of the object being consumed.
Version() uint64

// IsComplete returns true if the content has been completely fetched.
IsComplete() bool
// Content is the currently available buffer in the content.
// any subsequent calls to Content() will return data after the previous call.
Content() enc.Wire
// Progress counter
Progress() int
// ProgressMax is the max value for the progress counter (-1 for unknown).
ProgressMax() int
// Error that occurred during fetching.
Error() error

// Content is the currently available buffer in the content.
// any subsequent calls to Content() will return data after the previous call.
Content() enc.Wire

// Cancel the consume operation.
Cancel()
}
Expand Down
8 changes: 8 additions & 0 deletions std/object/client_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func (a *ConsumeState) Name() enc.Name {
return a.fetchName
}

// returns the version of the object being consumed
func (a *ConsumeState) Version() uint64 {
if ver := a.fetchName.At(-1); ver.IsVersion() {
return ver.NumberVal()
}
return 0
}

// returns the error that occurred during fetching
func (a *ConsumeState) Error() error {
return a.err
Expand Down
36 changes: 14 additions & 22 deletions std/object/client_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,10 @@ func Produce(args ndn.ProduceArgs, store ndn.Store, signer ndn.Signer) (enc.Name
contentSize := content.Length()

// Get the correct version
version := args.Version
switch args.Version {
case 0: // nothing
return nil, errors.New("object version is not specified or zero")
case ndn.VersionImmutable:
version = 0
case ndn.VersionUnixMicro:
if now := time.Now().UnixMicro(); now > 0 { // > 1970
version = uint64(now)
} else {
return nil, errors.New("current unix time is negative")
}
if !args.Name.At(-1).IsVersion() {
return nil, errors.New("object version not set")
}
version := args.Name.At(-1).NumberVal()

// Use freshness period or default
if args.FreshnessPeriod == 0 {
Expand All @@ -47,23 +38,20 @@ func Produce(args ndn.ProduceArgs, store ndn.Store, signer ndn.Signer) (enc.Name
if contentSize > 0 {
lastSeg = uint64((contentSize - 1) / pSegmentSize)
}
finalBlockId := enc.NewSegmentComponent(lastSeg)

cfg := &ndn.DataConfig{
ContentType: utils.IdPtr(ndn.ContentTypeBlob),
Freshness: utils.IdPtr(args.FreshnessPeriod),
FinalBlockID: &finalBlockId,
FinalBlockID: utils.IdPtr(enc.NewSegmentComponent(lastSeg)),
}

basename := args.Name.Append(enc.NewVersionComponent(version))

// use a transaction to ensure the entire object is written
store.Begin()
defer store.Commit()

var seg uint64
for seg = 0; seg <= lastSeg; seg++ {
name := basename.Append(enc.NewSegmentComponent(seg))
name := args.Name.Append(enc.NewSegmentComponent(seg))

segContent := enc.Wire{}
segContentSize := 0
Expand Down Expand Up @@ -100,14 +88,14 @@ func Produce(args ndn.ProduceArgs, store ndn.Store, signer ndn.Signer) (enc.Name

if !args.NoMetadata {
// write metadata packet
name := args.Name.Append(
name := args.Name.Prefix(-1).Append(
enc.NewKeywordComponent(rdr.MetadataKeyword),
enc.NewVersionComponent(version),
enc.NewSegmentComponent(0),
)
content := rdr.MetaData{
Name: basename,
FinalBlockID: finalBlockId.Bytes(),
Name: args.Name,
FinalBlockID: cfg.FinalBlockID.Bytes(),
}

data, err := spec.Spec{}.MakeData(name, cfg, content.Encode(), signer)
Expand All @@ -121,13 +109,17 @@ func Produce(args ndn.ProduceArgs, store ndn.Store, signer ndn.Signer) (enc.Name
}
}

return basename, nil
return args.Name, nil
}

// Produce and sign data, and insert into the client's store.
// The input data will be freed as the object is segmented.
func (c *Client) Produce(args ndn.ProduceArgs) (enc.Name, error) {
signer := c.SuggestSigner(args.Name)
if !args.Name.At(-1).IsVersion() {
return nil, errors.New("object version not set")
}

signer := c.SuggestSigner(args.Name.Prefix(-1))
if signer == nil {
return nil, fmt.Errorf("no valid signer found for %s", args.Name)
}
Expand Down
3 changes: 1 addition & 2 deletions tools/putchunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ func (pc *PutChunks) run(_ *cobra.Command, args []string) {

// produce object
vname, err := cli.Produce(ndn.ProduceArgs{
Name: name,
Name: name.WithVersion(enc.VersionUnixMicro),
Content: content,
Version: ndn.VersionUnixMicro,
})
if err != nil {
log.Fatal(pc, "Unable to produce object", "err", err)
Expand Down

0 comments on commit 13f255a

Please sign in to comment.