Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Running using Databricks Connect #582 #583

Merged
merged 25 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e54046d
using dbfs
vikasgupta78 May 2, 2023
64f7a06
refactor labeller to move common methods to training helper
vikasgupta78 May 5, 2023
681465e
supporting both normal and data bricks connect client
vikasgupta78 May 6, 2023
e0371aa
getting spark session thru new method in client
vikasgupta78 May 7, 2023
3eab8d7
getting spark session thru new method in client
vikasgupta78 May 7, 2023
99e9ca1
getting spark session thru new method in client
vikasgupta78 May 7, 2023
fe01f2c
getting spark session and JVM thru new method in client
vikasgupta78 May 8, 2023
6764867
label updater should overwrite and not append
vikasgupta78 May 8, 2023
4051246
Running using Databricks Connect
vikasgupta78 May 8, 2023
7097759
indentation issue
vikasgupta78 May 11, 2023
f23c1b2
util method to write using pandas df
vikasgupta78 May 12, 2023
645b03d
Merge pull request #1 from zinggAI/0.3.5
vikasgupta78 May 26, 2023
48b2134
refactor view and model into separate classes 1st cut
vikasgupta78 May 26, 2023
36878b7
extra null check removed
vikasgupta78 May 30, 2023
552d091
constant QUIT_LABELING = 9 defined
vikasgupta78 May 30, 2023
ea7e8f4
constant INCREMENT = 1 defined
vikasgupta78 May 30, 2023
47b493a
refactoring
vikasgupta78 May 31, 2023
18f9c77
lazy initialization
vikasgupta78 May 31, 2023
40b817a
compile error
vikasgupta78 May 31, 2023
26f1135
label update methods and refactoring
vikasgupta78 May 31, 2023
2df6704
validity check added
vikasgupta78 May 31, 2023
c5abe56
compile issues resolved
vikasgupta78 May 31, 2023
53ae447
DB Connect check added
vikasgupta78 May 31, 2023
880b4f6
syntax issues
vikasgupta78 May 31, 2023
a3ddd46
shell script changes
vikasgupta78 Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
@JsonInclude(Include.NON_NULL)
public class Arguments implements Serializable {

private static final long serialVersionUID = 1L;
// creates DriverArgs and invokes the main object
Pipe[] output;
Pipe[] data;
Expand Down
5 changes: 5 additions & 0 deletions common/client/src/main/java/zingg/common/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*
*/
public abstract class Client<S,D,R,C,T> implements Serializable {
private static final long serialVersionUID = 1L;
protected Arguments arguments;
protected IZingg<S,D,R,C> zingg;
protected ClientOptions options;
Expand Down Expand Up @@ -283,4 +284,8 @@ public ZFrame<D,R,C> getUnmarkedRecords() {
return zingg.getUnmarkedRecords();
}

public ITrainingHelper<S, D, R, C> getTrainingHelper() throws UnsupportedOperationException {
return zingg.getTrainingHelper();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package zingg.common.client;

import java.util.List;

import zingg.common.client.pipe.Pipe;

public interface ITrainingHelper<S, D, R, C> {

public void setMarkedRecordsStat(ZFrame<D, R, C> markedRecords);

public Long getMarkedRecordsStat(ZFrame<D, R, C> markedRecords, long value);

public Long getMatchedMarkedRecordsStat(ZFrame<D, R, C> markedRecords);

public Long getUnmatchedMarkedRecordsStat(ZFrame<D, R, C> markedRecords);

public Long getUnsureMarkedRecordsStat(ZFrame<D, R, C> markedRecords);

public ZFrame<D, R, C> getClusterIdsFrame(ZFrame<D, R, C> lines);

public List<R> getClusterIds(ZFrame<D, R, C> lines);

public List<C> getDisplayColumns(ZFrame<D, R, C> lines, Arguments args);

public ZFrame<D, R, C> getCurrentPair(ZFrame<D, R, C> lines, int index, List<R> clusterIds, ZFrame<D, R, C> clusterLines);

public double getScore(ZFrame<D, R, C> currentPair);

public double getPrediction(ZFrame<D, R, C> currentPair);

public String getMsg1(int index, int totalPairs);

public String getMsg2(double prediction, double score);

public void displayRecords(ZFrame<D, R, C> records, String preMessage, String postMessage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if:

we have two interfaces here. TrainingDataModel and LabelDataViewHelper. The data model has methods for reading and writing training pairs, getting scores etc. The view has messages.

TrainingDataModel should extend from ZinggBase and automatically gets pipeutil and other context stuff. ZinggBase already has the methods to get stats etc..and other methods can be moved there. You can use TDM in labeller and labelupdater just like we use the trainer and matcher in trainmatcher.

TDM and LabelDataViewHelper are returned from Client methods and used in python.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1st draft available in commit 48b2134 please review


public ZFrame<D, R, C> updateRecords(int matchValue, ZFrame<D, R, C> newRecords, ZFrame<D, R, C> updatedRecords);

public void updateLabellerStat(int selected_option, int increment);

public void printMarkedRecordsStat();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wont this go in the view?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kept update in model and print in view , commit 48b2134, please review


public void writeLabelledOutput(ZFrame<D, R, C> records, Arguments args) throws ZinggClientException;

public void writeLabelledOutput(ZFrame<D,R,C> records, Arguments args, Pipe p) throws ZinggClientException;

}
2 changes: 2 additions & 0 deletions common/client/src/main/java/zingg/common/client/IZingg.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ public void init(Arguments args, String license)
public ClientOptions getClientOptions();

public void setSession(S session);

public ITrainingHelper<S, D, R, C> getTrainingHelper() throws UnsupportedOperationException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import zingg.common.core.util.LabelMatchType;

public abstract class LabelUpdater<S,D,R,C,T> extends Labeller<S,D,R,C,T> {
private static final long serialVersionUID = 1L;
protected static String name = "zingg.LabelUpdater";
public static final Log LOG = LogFactory.getLog(LabelUpdater.class);

public LabelUpdater() {
setZinggOptions(ZinggOptions.UPDATE_LABEL);
setTrainingHelper(new TrainingHelper<S,D,R,C>());
}

public void execute() throws ZinggClientException {
Expand All @@ -33,12 +35,12 @@ public void execute() throws ZinggClientException {
}
}

public void processRecordsCli(ZFrame<D,R,C> lines) throws ZinggClientException {
public ZFrame<D,R,C> processRecordsCli(ZFrame<D,R,C> lines) throws ZinggClientException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to return a zframe here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done so that writing of labelled output happens in a a separate method. This is needed for python api to work.

LOG.info("Processing Records for CLI updateLabelling");

if (lines != null && lines.count() > 0) {
getMarkedRecordsStat(lines);
printMarkedRecordsStat();
getTrainingHelper().setMarkedRecordsStat(lines);
getTrainingHelper().printMarkedRecordsStat();

List<C> displayCols = getDSUtil().getFieldDefColumns(lines, args, false, args.getShowConcise());
try {
Expand Down Expand Up @@ -67,9 +69,9 @@ public void processRecordsCli(ZFrame<D,R,C> lines) throws ZinggClientException {
String matchType = LabelMatchType.get(matchFlag).msg;
postMsg = String.format("\tThe above pair is labeled as %s\n", matchType);
selectedOption = displayRecordsAndGetUserInput(getDSUtil().select(currentPair, displayCols), preMsg, postMsg);
updateLabellerStat(selectedOption, +1);
updateLabellerStat(matchFlag, -1);
printMarkedRecordsStat();
getTrainingHelper().updateLabellerStat(selectedOption, +1);
getTrainingHelper().updateLabellerStat(matchFlag, -1);
getTrainingHelper().printMarkedRecordsStat();
if (selectedOption == 9) {
LOG.info("User has quit in the middle. Updating the records.");
break;
Expand All @@ -80,15 +82,16 @@ public void processRecordsCli(ZFrame<D,R,C> lines) throws ZinggClientException {
updatedRecords = updatedRecords
.filter(updatedRecords.notEqual(ColName.CLUSTER_COLUMN,cluster_id));
}
updatedRecords = updateRecords(selectedOption, currentPair, updatedRecords);
updatedRecords = getTrainingHelper().updateRecords(selectedOption, currentPair, updatedRecords);
} while (selectedOption != 9);

if (updatedRecords != null) {
updatedRecords = updatedRecords.union(recordsToUpdate);
}
writeLabelledOutput(updatedRecords);
getTrainingHelper().writeLabelledOutput(updatedRecords,args,getOutputPipe());
sc.close();
LOG.info("Processing finished.");
return updatedRecords;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
e.printStackTrace();
Expand All @@ -98,6 +101,7 @@ public void processRecordsCli(ZFrame<D,R,C> lines) throws ZinggClientException {
}
} else {
LOG.info("There is no marked record for updating. Please run findTrainingData/label jobs to generate training data.");
return null;
}
}

Expand Down
Loading