Skip to content

Commit

Permalink
bitwiseStoring
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 8, 2024
1 parent 2ce860c commit 69f8dcf
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 29 deletions.
17 changes: 16 additions & 1 deletion rxlib/src/main/java/org/rx/bean/DataRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.rx.core.Reflects;
import org.rx.core.StringBuilder;
import org.rx.core.Strings;

import java.io.Serializable;
Expand All @@ -14,9 +15,10 @@
@NoArgsConstructor
public class DataRange<T extends Comparable<T>> implements Serializable {
private static final long serialVersionUID = 2698228026798507997L;
static final String DELIMITER = " - ";

public static <T extends Comparable<T>> DataRange<T> of(String expr, Class<T> type) {
String[] vals = Strings.split(expr, "-", 2);
String[] vals = Strings.split(expr, DELIMITER, 2);
return new DataRange<>(Reflects.changeType(vals[0], type), Reflects.changeType(vals[1], type));
}

Expand All @@ -33,4 +35,17 @@ public DataRange(@NonNull T start, @NonNull T end) {
public boolean has(T data) {
return start.compareTo(data) <= 0 && end.compareTo(data) > 0;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
if (start != null) {
buf.append(start.toString());
}
buf.append(DELIMITER);
if (end != null) {
buf.append(end.toString());
}
return buf.toString();
}
}
11 changes: 11 additions & 0 deletions rxlib/src/main/java/org/rx/core/RxConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.rx.annotation.Metadata;
import org.rx.bean.LogStrategy;
import org.rx.net.Sockets;
import org.springframework.core.env.Environment;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -111,6 +112,7 @@ public static class ThreadPoolConfig {
String traceName;
int maxTraceDepth;
int slowMethodSamplingPercent;
final List<String> slowMethodAutoSampleTime = newConcurrentList(true);

int cpuLoadWarningThreshold;
long samplingPeriod;
Expand Down Expand Up @@ -217,6 +219,14 @@ public int getIntId() {
private RxConfig() {
}

public void refreshFrom(Environment env, int flags) {
Map<String, Object> rsProps = Linq.from(Reflects.getFieldMap(RxConfig.ConfigNames.class).values()).select(p -> {
String k = (String) p.get(null);
return new AbstractMap.SimpleEntry<>(k, env.getProperty(k));
}).where(p -> p.getValue() != null).toMap();
refreshFrom(rsProps, flags);
}

public void refreshFrom(Map<String, Object> props) {
refreshFrom(props, 0);
}
Expand All @@ -239,6 +249,7 @@ void afterSet() {
id = Sockets.getLocalAddress().getHostAddress() + "-" + Strings.randomValue(99);
}
}

// public void refreshFromSystemProperty() {
// Map<String, Object> sysProps = new HashMap<>((Map) System.getProperties());
// reset(net.lanIps, ConfigNames.NET_LAN_IPS);
Expand Down
33 changes: 15 additions & 18 deletions rxlib/src/main/java/org/rx/core/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,16 @@ private void doNotify() {
}

static class Task<T> implements Runnable, Callable<T>, Supplier<T> {
static <T> Task<T> adapt(Callable<T> fn) {
return adapt(fn, null, null);
}
//减少stackTrace
// static <T> Task<T> adapt(Callable<T> fn) {
// return adapt(fn, null, null);
// }

static <T> Task<T> adapt(Callable<T> fn, FlagsEnum<RunFlag> flags, Object id) {
Task<T> t = as(fn);
return t != null && t.id == id ? t : new Task<>(fn, flags, id);
}

static <T> Task<T> adapt(Runnable fn) {
return adapt(fn, null, null);
}

static <T> Task<T> adapt(Runnable fn, FlagsEnum<RunFlag> flags, Object id) {
Task<T> t = as(fn);
return t != null && t.id == id ? t : new Task<>(() -> {
Expand Down Expand Up @@ -433,7 +430,7 @@ public void setDynamicSize(IntWaterMark cpuWaterMark) {
//region v1
@Override
public void execute(Runnable command) {
super.execute(Task.adapt(command));
super.execute(Task.adapt(command, null, null));
}

@Override
Expand All @@ -459,32 +456,32 @@ public <T> Future<T> submit(Callable<T> task) {

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTaskAdapter<>(Task.adapt(runnable), value);
return new FutureTaskAdapter<>(Task.adapt(runnable, null, null), value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTaskAdapter<>(Task.adapt(callable));
return new FutureTaskAdapter<>(Task.adapt(callable, null, null));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p)).toList());
return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p, null, null)).toList());
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p)).toList(), timeout, unit);
return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p, null, null)).toList(), timeout, unit);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p)).toList());
return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p, null, null)).toList());
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p)).toList(), timeout, unit);
return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p, null, null)).toList(), timeout, unit);
}

public Future<Void> run(Action task) {
Expand All @@ -505,13 +502,13 @@ public <T> Future<T> run(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags)

@SneakyThrows
public <T> T runAny(Iterable<Func<T>> tasks, long timeoutMillis) {
List<Callable<T>> callables = Linq.from(tasks).select(p -> (Callable<T>) Task.adapt(p)).toList();
List<Callable<T>> callables = Linq.from(tasks).select(p -> (Callable<T>) Task.adapt(p, null, null)).toList();
return timeoutMillis > 0 ? super.invokeAny(callables, timeoutMillis, TimeUnit.MILLISECONDS) : super.invokeAny(callables);
}

@SneakyThrows
public <T> List<Future<T>> runAll(Iterable<Func<T>> tasks, long timeoutMillis) {
List<Callable<T>> callables = Linq.from(tasks).select(p -> (Callable<T>) Task.adapt(p)).toList();
List<Callable<T>> callables = Linq.from(tasks).select(p -> (Callable<T>) Task.adapt(p, null, null)).toList();
return timeoutMillis > 0 ? super.invokeAll(callables, timeoutMillis, TimeUnit.MILLISECONDS) : super.invokeAll(callables);
}

Expand Down Expand Up @@ -583,15 +580,15 @@ <T> CompletableFuture<T> runSerialAsync(@NonNull Func<T> task, @NonNull Object t

public <T> MultiTaskFuture<T, T> runAnyAsync(Iterable<Func<T>> tasks) {
CompletableFuture<T>[] futures = Linq.from(tasks).select(task -> {
Task<T> t = Task.adapt(task);
Task<T> t = Task.adapt(task, null, null);
return CompletableFuture.supplyAsync(t, asyncExecutor);
}).toArray();
return new MultiTaskFuture<>((CompletableFuture<T>) CompletableFuture.anyOf(futures), futures);
}

public <T> MultiTaskFuture<Void, T> runAllAsync(Iterable<Func<T>> tasks) {
CompletableFuture<T>[] futures = Linq.from(tasks).select(task -> {
Task<T> t = Task.adapt(task);
Task<T> t = Task.adapt(task, null, null);
//allOf().join() will hang
// return wrap(CompletableFuture.supplyAsync(t, this), t.traceId);
return CompletableFuture.supplyAsync(t, asyncExecutor);
Expand Down
4 changes: 4 additions & 0 deletions rxlib/src/main/java/org/rx/core/WheelTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public void run(Timeout timeout) throws Exception {
final HashedWheelTimer timer = new HashedWheelTimer(ThreadPool.newThreadFactory("TIMER"), TICK_DURATION, TimeUnit.MILLISECONDS);
final EmptyTimeout nonTask = new EmptyTimeout();

public TimeoutFuture<?> getFutureById(Object taskId) {
return holder.get(taskId);
}

public TimeoutFuture<?> setTimeout(Action fn, LongUnaryOperator nextDelay) {
return setTimeout(fn, nextDelay, null, null);
}
Expand Down
12 changes: 6 additions & 6 deletions rxlib/src/main/java/org/rx/exception/TraceHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ public void uncaughtException(Thread t, Throwable e) {
}

public void saveExceptionTrace(Thread t, String msg, Throwable e) {
queue.offer(new Object[]{t.getName(), DateTime.now(), msg, e});
queue.offer(new Object[]{t.getName(), Sys.getMDCCtxMap(), DateTime.now(), msg, e});
}

void innerSave(String thread, DateTime now, String msg, Throwable e) {
void innerSave(String thread, Map<String, String> mdc, DateTime now, String msg, Throwable e) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
String stackTrace = ExceptionUtils.getStackTrace(e);
String eMsg = Strings.EMPTY;
Expand Down Expand Up @@ -223,7 +223,7 @@ void innerSave(String thread, DateTime now, String msg, Throwable e) {
StringBuilder b = new StringBuilder();
b.appendMessageFormat("{}\t{}{}", now.toDateTimeString(), eMsg, msg);
call.put("message", b.toString());
call.put("MDC", Sys.getMDCCtxMap());
call.put("MDC", mdc);
queue.offer(call);
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
Expand Down Expand Up @@ -270,10 +270,10 @@ public List<ExceptionEntity> queryExceptionTraces(Date startTime, Date endTime,

public void saveMethodTrace(Thread t, String declaringTypeName, String methodName, Object[] parameters,
Object returnValue, Throwable e, long elapsedMicros) {
queue.offer(new Object[]{t.getName(), DateTime.now(), declaringTypeName, methodName, parameters, returnValue, e, elapsedMicros});
queue.offer(new Object[]{t.getName(), Sys.getMDCCtxMap(), DateTime.now(), declaringTypeName, methodName, parameters, returnValue, e, elapsedMicros});
}

void innerSave(String thread, DateTime now, String declaringTypeName, String methodName, Object[] parameters,
void innerSave(String thread, Map<String, String> mdc, DateTime now, String declaringTypeName, String methodName, Object[] parameters,
Object returnValue, Throwable error, long elapsedNanos) {
RxConfig.TraceConfig conf = RxConfig.INSTANCE.getTrace();
long elapsedMicros;
Expand Down Expand Up @@ -302,7 +302,7 @@ void innerSave(String thread, DateTime now, String declaringTypeName, String met
} else if (returnValue != null) {
entity.setReturnValue(toJsonString(returnValue));
}
entity.setMDC(Sys.getMDCCtxMap());
entity.setMDC(mdc);
entity.elapsedMicros = Math.max(entity.elapsedMicros, elapsedMicros);
entity.occurCount++;
entity.setAppName(RxConfig.INSTANCE.getId());
Expand Down
17 changes: 13 additions & 4 deletions rxlib/src/main/java/org/rx/io/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.*;
import lombok.SneakyThrows;
import org.h2.util.Bits;
import org.rx.core.Constants;

import java.io.InputStream;
Expand Down Expand Up @@ -64,12 +65,20 @@ public static ByteBuf copyInputStream(InputStream in, int length) {
return buf;
}

public static long wrap(int a, int b) {
return (((long) a) << 32) | (b & 0xffffffffL);
public static long wrap(int high, int low) {
return (((long) high) << 32) | (low & 0xffffffffL);
}

public static int[] unwrap(long l) {
return new int[]{(int) (l >> 32), (int) l};
public static int[] unwrap(long n) {
return new int[]{(int) (n >> 32), (int) n};
}

public static int wrap(short high, short low) {
return (high << 16) | (low & 0xffff);
}

public static short[] unwrap(int n) {
return new short[]{(short) (n >> 16), (short) n};
}

public static int findText(ByteBuf byteBuf, String str) {
Expand Down
33 changes: 33 additions & 0 deletions rxlib/src/test/java/org/rx/io/TestIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -750,4 +750,37 @@ public void listDirectories() {
System.out.println(p);
}
}

@Test
public void bitwiseStoring() {
int a = Integer.MAX_VALUE, b = 200;
int c = -50, d = Integer.MIN_VALUE;

long n = Bytes.wrap(a, b);
int[] unwrap = Bytes.unwrap(n);
assert unwrap[0] == a && unwrap[1] == b;

n = Bytes.wrap(c, d);
unwrap = Bytes.unwrap(n);
assert unwrap[0] == c && unwrap[1] == d;


short w = 1, x = 50;
short y = -50, z = Short.MIN_VALUE;

int sn = Bytes.wrap(w, x);
System.out.println("n3-b:" + sn);
short[] sunwrap = Bytes.unwrap(sn);
System.out.println("n3-a:" + toJsonString(sunwrap));
assert sunwrap[0] == w && sunwrap[1] == x;

sn = Bytes.wrap(y, z);
System.out.println("n4:" + sn);
sunwrap = Bytes.unwrap(sn);
assert sunwrap[0] == y && sunwrap[1] == z;

int[] ax = {0};
ax[0]++;
System.out.println(ax[0]);
}
}

0 comments on commit 69f8dcf

Please sign in to comment.