Introduction & purpose

This post is more related to the data engineering side of things, rather than the analytics, at least first sight. At the end, indeed it is about an Android app! The point is, we've been for quite some time now preparing a data laboratory, and our intention is to buy pretty soon one or two dozens of wristbands, have some people wear them for a few hours a day, upstream all the data to a database in a server, and then analyze the data to find insights.

Everything you would need for building such a lab is pretty much available: there's several providers for the hardware of the wristband, which also give an Android app to communicate with it. And Android itself has open and well documented libraries for nearly everything you need, thus it should be fairly easy to enhance the provider's app to your purpose. However, the sensors in the wristband produce an amount of data that is not negligible, i.e. if you want to process the raw data (commercial products pre-process the sensors' data in the hardware of the wristband, and only accumulated results go to the app in your smartphone). Besides, upstreaming is not really so common in any Android development, at least not in a fast and continuous manner. Thus, finally it was not so easy to prepare the app for the lab, and that's why we thought it could make a hopefully interesting and useful post.

This post aims to explain the details of such an app, with code snippets and links to the full solution, and being agnostic about the hardware.

A cache object and an upstream service

In this post you will find the description of two main objects of the app:

The complete app (see here for the full code), contains several Android Activities which handle the interaction with the user and the connection with the device. The cache is tightly linked to one of the activities, while the communication with the server is implemented as an Android Service. (Note that, in Android development, an Activity runs in the foreground and serves as the user's GUI, while a Service implements something that the user does not see directly, but it is important for the app, see here for more details.)

The cache in detail, including code snippets

You can find the full implementation of the object here. Main features follow as bullet list.

    public synchronized static AccDataCacheSingleton getInstance(Context context) {
          if (accDataCacheSingleton == null && context != null) {
              Log.d(TAG, "Creating cache singleton");
              accDataCacheSingleton = new AccDataCacheSingleton();
              accDataCacheSingleton.context = context;
              accDataCacheSingleton.initBuffer();
          }
          return accDataCacheSingleton;
      }
  
    private void initBuffer() {
            togglingBuffer = new Buffer[TOGGLING_BUFFER_SIZE];
            bufferPointer = 0;
            unitsPointer = 0;
            bufferBackupPending = new HashMap<>();
            togglingBuffer[accDataCacheSingleton.bufferPointer] = new Buffer();
        }
    
    public synchronized void add(FraaStreamDataUnit unit) {
          if (unitsPointer == LENGTH_OF_BUFFER) {
              Log.e(TAG, "Error, index has been skipped!!!");
              toggleBuffer();
          }
          togglingBuffer[bufferPointer].units[unitsPointer++] = unit;
          if (unitsPointer == LENGTH_OF_BUFFER) {
              Log.d(TAG, "Recreating singleton buffer");
              UUID id = toggleBuffer();
              new InsertIntoDatabaseTask().execute(id.toString());
          }
      }
  
    private static class Buffer {
            public FraaStreamDataUnit[] units;

            public Buffer() {
                this.units = new FraaStreamDataUnit[LENGTH_OF_BUFFER];
            }
        }

        private UUID toggleBuffer() {
            if (bufferBackupPending.get(bufferPointer) != null) {
                Log.e(TAG, "Data would get lost in this case... consider adding a semaphore?");
            }
            UUID id = UUID.randomUUID();
            // use database semaphore...
            getDbHelperWhenAvailable();
            bufferBackupPending.put(bufferPointer++, id);
            release();
            bufferPointer = (bufferPointer == TOGGLING_BUFFER_SIZE) ? 0 : bufferPointer;
            togglingBuffer[bufferPointer] = new Buffer();
            unitsPointer = 0;
            return id;
        }
    
    private class InsertIntoDatabaseTask extends AsyncTask<String, Integer, Integer> {
            @Override
            protected Integer doInBackground(String... params) {
                UUID id = UUID.fromString(params[0]);
                Integer bufferToBackup = null;
                FraaDbHelper fraaDbHelper = getDbHelperWhenAvailable();
                SQLiteDatabase db = fraaDbHelper.getWritableDatabase();
                for (Integer key : bufferBackupPending.keySet()) {
                    UUID candidate = bufferBackupPending.get(key);
                    if (candidate != null && candidate.equals(id)) {
                        bufferToBackup = key;
                    }
                }
                if (bufferToBackup == null) {
                    Log.e(TAG, "Data got lost... Don't find " + id.toString() + " anymore");
                }

                for (FraaStreamDataUnit unit : togglingBuffer[bufferToBackup].units) {
                    //Log.d(StreamingActivity.TAG, "count:" + count++);
                    ContentValues values = new ContentValues();
                    values.put(AccDataContract.AccDataEntry.COLUMN_NAME_HEADER_ID, getHeaderId());
                    values.put(AccDataContract.AccDataEntry.COLUMN_NAME_INDEX, unit.getIndex());
                    values.put(AccDataContract.AccDataEntry.COLUMN_NAME_X, unit.getX());
                    values.put(AccDataContract.AccDataEntry.COLUMN_NAME_Y, unit.getY());
                    values.put(AccDataContract.AccDataEntry.COLUMN_NAME_Z, unit.getZ());
                    db.insert(AccDataContract.AccDataEntry.TABLE_NAME,
                            null,
                            values);
                }

                //db.endTransaction();
                bufferBackupPending.put(bufferToBackup, null);
                Log.d(TAG, "copy to SQLite ok (" + bufferToBackup + ")");
                db.close();
                release();
                return 1;
            }
        }
    
    private final Semaphore available = new Semaphore(1);

    private FraaDbHelper getDbHelperWhenAvailable() {
        available.acquireUninterruptibly();
        return new FraaDbHelper(this.context);
    }
  

The upstream service in detail, including code snippets

The upstream service periodically uploads the available data into the server. As stated above, every time it needs to access the SQLite database, it asks it to the cache object (a unique singleton instance). Note that:

You can find the full implementation of the object here. Main features follow as bullet list.

    private void sendToServer(FraaStreamData data) {
        RequestQueue queue = Volley.newRequestQueue(this);
        String url = server_url + "data";
        GsonRequest postRequest = new GsonRequest<FraaStreamData, UUID>(Request.Method.POST, url, data, null,
                new Response.Listener<UUID>() {
                    @Override
                    public void onResponse(UUID response) {
                        Log.i(StreamingActivity.TAG, "Response id to be removed: " + response + "(" + pendingRequests.get(response).getHeaderId() + ")");
                        // update new serverHeaderId
                        AccDataCacheSingleton obj = AccDataCacheSingleton.getInstance();
                        obj.removeFromDatabase(pendingRequests.get(response));
                        Log.i(StreamingActivity.TAG, "Response id finished removing: " + response + "(" + pendingRequests.get(response).getHeaderId() + ")");
                    }
                }, new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Log.i(StreamingActivity.TAG, "sendToServer didn't work!");
                Log.i(StreamingActivity.TAG, error.toString());
            }
        }, UUID.class);
        // avoid sending the data twice (big message over a slow network)
        postRequest.setRetryPolicy(new DefaultRetryPolicy(0,
                DefaultRetryPolicy.DEFAULT_MAX_RETRIES,
                DefaultRetryPolicy.DEFAULT_BACKOFF_MULT));
        // Add the request to the RequestQueue.
        queue.add(postRequest);
    }
  
    public void checkCacheDataStream() {
          final Runnable checkData = new Runnable() {
              public void run() {
                  AccDataCacheSingleton obj = AccDataCacheSingleton.getInstance();
                  Collection<FraaStreamData> list = convertWithMaxDataSize(obj.selectRowsHeaderEqualTo(getHeaderId()));
                  boolean dataPending = false;
                  for (FraaStreamData data : list) {
                      if (data.getDataUnits().length == MAX_NUMBER_DATA_UNITS_UPSTREAM) {
                          addDataToPendingRequests(data);
                          dataPending = true;
                      }
                  }
                  if (dataPending && dataProcessHandle.isCancelled()) {
                      Log.d(StreamingActivity.TAG, "restarting data process");
                      checkDataEveryInterval();
                  }
              }
          };
          // runs every 2 minutes, never stops
          // TODO adjust time interval depending on size of buffers & sampling rate
          cacheScheduler.scheduleAtFixedRate(checkData, 0, 2, TimeUnit.MINUTES);
      }
  

Note on the server

You can find the full code here. The server just does enough to handle the REST requests and flush the data into a database. It is implemented in Java, using Hibernate for handling the database. It uses maven to produce a war file, which may be deployed e.g. using tomcat.

Related posts